Skip to content

Commit

Permalink
added in duckdb catalogue freshness method
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Sep 21, 2024
1 parent 096ffc7 commit 1d21ecd
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 37 deletions.
8 changes: 7 additions & 1 deletion HerdingCats/api_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from enum import Enum


class CkanApiPathsDocs:
    """Links to the CKAN 2.11 API documentation for individual actions."""

    # Documentation anchor for the `package_list` action.
    PACKAGE_LIST = "https://docs.ckan.org/en/2.11/api/index.html#ckan.logic.action.get.package_list"
    # Documentation anchor for the `package_search` action.
    PACKAGE_SEARCH = "https://docs.ckan.org/en/2.11/api/index.html#ckan.logic.action.get.package_search"
    # TODO: add documentation links for the remaining API actions.


class CkanApiPaths:
BASE_PATH = "/api/3/action/{}"
SITE_READ = BASE_PATH.format("site_read")
Expand All @@ -24,4 +30,4 @@ class CkanDataCatalogues(Enum):
SUBAK = "https://data.subak.org"
HUMANITARIAN = "https://data.humdata.org"
AFRICA = "https://open.africa"
# Add more catalogues as needed...
# Add more default catalogues as needed...
198 changes: 162 additions & 36 deletions HerdingCats/herding_cats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
import uuid
import json

from io import BytesIO
from typing import Any, Dict, Optional, Union, Literal, List
Expand All @@ -21,7 +22,7 @@
class CkanCatSession:
def __init__(self, domain: Union[str, CkanDataCatalogues]) -> None:
"""
Initialise CATExplore with a valid domain or predefined catalog.
Initialise a session with a valid domain or predefined catalog.
Args:
domain (url or catalogue item): str
Expand All @@ -35,14 +36,17 @@ def __init__(self, domain: Union[str, CkanDataCatalogues]) -> None:
)
self._validate_url()

# ----------------------------
# Initiate a Session
# ----------------------------
@staticmethod
def _process_domain(domain: Union[str, CkanDataCatalogues]) -> str:
"""
Process the domain to ensure it's in the correct format
This iterates through the CkanDataCatalogues Enum and checks for a match
Otherwise processes the url as normal
Otherwise it processes the url as normal
Args:
domain (url or catalogue item): str
Expand Down Expand Up @@ -106,7 +110,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close_session()


# FIND THE DATA YOU WANT / NEED
# FIND THE DATA YOU WANT / NEED / ISOLATE PACKAGES AND RESOURCES
class CkanCatExplorer:
def __init__(self, cat_session: CkanCatSession):
"""
Expand All @@ -126,6 +130,9 @@ def __init__(self, cat_session: CkanCatSession):
"""
self.cat_session = cat_session

# ----------------------------
# Check CKAN backend health
# ----------------------------
def check_site_health(self) -> None:
"""
Make sure the Ckan endpoints are healthy and reachable
Expand Down Expand Up @@ -155,11 +162,14 @@ def check_site_health(self) -> None:
"Health Check Failed: Something went wrong and CKAN is currently not available"
)

# ----------------------------
# Basic Available package lists + metadata
# ----------------------------
def get_package_count(self) -> int:
"""
Quick way to see how 'big' a data catalogue is
Especially how many datasets there are
E.g how many datasets (packages) there are
Returns:
package_count: int
Expand Down Expand Up @@ -228,7 +238,7 @@ def package_list_dataframe(
self, df_type: Literal["pandas", "polars"]
) -> Union[pd.DataFrame, "pl.DataFrame"]:
"""
Return all available packages as a dataframe for further use
Explore all packages that are available to query as a dataframe
Must specify a df type:
pandas
Expand Down Expand Up @@ -280,19 +290,13 @@ def package_list_dataframe(
try:
return pl.DataFrame(result)
except ImportError:
logger.warning(
"Polars is not installed. Please run 'pip install polars' to use this option."
)
raise ImportError(
"Polars is not installed. Please run 'pip install polars' to use this option."
)
case "pandas":
try:
return pd.DataFrame(result)
except ImportError:
logger.warning(
"Pandas is not installed. Please run 'pip install pandas' to use this option."
)
raise ImportError(
"Pandas is not installed. Please run 'pip install pandas' to use this option."
)
Expand All @@ -303,9 +307,21 @@ def package_list_dataframe(
logger.error(f"Failed to search datasets: {e}")
raise CatExplorerError(f"Failed to search datasets: {str(e)}")

def package_list_sorted_most_recent_extra_info(self):
def package_list_dictionary_extra(self):
"""
THIS NEEDS MORE WORK - BASICS ARE THERE
Explore all packages that are available to query.
With extra resource and meta information.
Sorted by most recently updated dataset first.
# Example usage...
if __name__ == "__main__":
with CkanCatSession("data.london.gov.uk") as session:
explore = CkanCatExplorer(session)
info_extra = explore.package_list_dictionary_extra()
pprint(info_extra)
"""
url = (
self.cat_session.base_url + CkanApiPaths.CURRENT_PACKAGE_LIST_WITH_RESOURCES
Expand All @@ -321,7 +337,10 @@ def package_list_sorted_most_recent_extra_info(self):
"name": entry.get("name"),
"title": entry.get("title"),
"maintainer": entry.get("maintainer"),
"metadata_created": entry.get("metadata_created"),
"metadata_modified": entry.get("metadata_modified"),
"resources": entry.get("resources"),
"groups": entry.get("groups"),
}
for entry in dictionary_prep
]
Expand All @@ -331,6 +350,68 @@ def package_list_sorted_most_recent_extra_info(self):
raise CatExplorerError(f"Failed to search datasets: {str(e)}")
return

# ----------------------------
# Show catalogue freshness
# ----------------------------
def catalogue_freshness(self) -> pd.DataFrame:
    """
    Summarise how recently the packages in the catalogue have been updated.

    Fetches every package (with its resources) from the endpoint referenced by
    CkanApiPaths.CURRENT_PACKAGE_LIST_WITH_RESOURCES, flattens it via
    _duckdb_explore and aggregates it per unique package name.

    Returns:
        A single-row DataFrame with the columns:
            total_packages
            total_resources
            updated_packages_last_6_months
            percentage_updated_packages_last_6_months
        The percentage is package based: the share of unique package names
        whose metadata_modified falls within the last 6 months.
        An empty DataFrame is returned if the DuckDB step fails.

    Raises:
        CatExplorerError: if the request fails or the response payload
        cannot be decoded / does not contain a "result" key.
    """
    url = (
        self.cat_session.base_url + CkanApiPaths.CURRENT_PACKAGE_LIST_WITH_RESOURCES
    )
    try:
        response = self.cat_session.session.get(url)
        response.raise_for_status()
        packages = response.json()["result"]
    except requests.RequestException as e:
        logger.error(f"Failed to fetch catalogue freshness data: {e}")
        raise CatExplorerError(
            f"Failed to fetch catalogue freshness data: {str(e)}"
        ) from e
    except (KeyError, TypeError, ValueError) as e:
        # Malformed payload: body is not valid JSON or has no "result" entry
        logger.error(f"Unexpected response when fetching catalogue data: {e}")
        raise CatExplorerError(
            f"Unexpected response when fetching catalogue data: {str(e)}"
        ) from e

    package_rows = [
        {
            "owner_org": entry.get("owner_org"),
            "name": entry.get("name"),
            "title": entry.get("title"),
            "maintainer": entry.get("maintainer"),
            "metadata_created": entry.get("metadata_created"),
            "metadata_modified": entry.get("metadata_modified"),
            # Normalise missing / None values to empty lists so the
            # flattening step can always iterate over them
            "resources": entry.get("resources") or [],
            "groups": entry.get("groups") or [],
        }
        for entry in packages
    ]

    # NOTE: the flattened table has one row per resource, but a package
    # without resources still contributes one row - so resource_count is 1
    # for such packages.
    # TRY_CAST yields NULL for unparseable timestamps, which means those
    # packages are never counted as recently updated.
    freshness_query = """
    WITH package_stats AS (
    SELECT
    name,
    COUNT(*) as resource_count,
    MAX(TRY_CAST(metadata_modified AS TIMESTAMP)) as last_update
    FROM freshness
    GROUP BY name
    )
    SELECT
    COUNT(DISTINCT name) as total_packages,
    SUM(resource_count) as total_resources,
    COUNT(DISTINCT CASE WHEN last_update >= CURRENT_TIMESTAMP - INTERVAL 6 MONTH THEN name END) as updated_packages_last_6_months,
    COUNT(DISTINCT CASE WHEN last_update >= CURRENT_TIMESTAMP - INTERVAL 6 MONTH THEN name END) * 100.0 / COUNT(DISTINCT name) as percentage_updated_packages_last_6_months
    FROM package_stats
    """

    return self._duckdb_explore(package_rows, "freshness", freshness_query)

# ----------------------------
# Show metadata using a package name
# ----------------------------
def package_show_info_json(self, package_name: Union[str, dict, Any]) -> List[Dict]:
"""
Pass in a package name as a string or as a value from a dictionary
Expand Down Expand Up @@ -364,17 +445,21 @@ def package_show_info_json(self, package_name: Union[str, dict, Any]) -> List[Di
data = response.json()
result_data = data["result"]

return self._extract_package_show_data(result_data)
return self._extract_resource_data(result_data)

except requests.RequestException as e:
logger.error(f"Failed to search datasets: {e}")
raise CatExplorerError(f"Failed to search datasets: {str(e)}")

# ----------------------------
# Search Packages and store in DataFrames / or keep as Dicts
# Unpack data or keep it packed (e.g. don't split resources out into their own columns)
# ----------------------------
def package_search_json(self, search_query: str, num_rows: int):
"""
Returns all available data for a particular search query
Specify the number of rows if the 'count' is large as the ouput is capped
Specify the number of rows if the 'count' is large
"""

Expand All @@ -400,7 +485,6 @@ def package_search_condense_json_unpacked(
self, search_query: str, num_rows: int
) -> Optional[List[Dict]]:
"""
Args:
Search query: str
Number of rows: int
Expand Down Expand Up @@ -661,14 +745,20 @@ def package_search_condense_dataframe_unpacked(
logger.error(f"Failed to search datasets: {e}")
raise CatExplorerError(f"Failed to search datasets: {str(e)}")

# ----------------------------
# Extract specific data from results
# OR flatten nested data structures
# ----------------------------
def extract_resource_url(
self, package_info: List[Dict], resource_name: str
) -> List[str] | None:
"""
Extracts the URL and format of a specific resource from a package.
Specify the name of the resource you want to use.
Returns:
List[format, url]: The URL of the specified resource + its format.
List[format, url]: The format of the resource and the URL.
Example:
if __name__ == "__main__":
Expand Down Expand Up @@ -776,9 +866,9 @@ def _create_polars_dataframe(data: List[Dict[str, Any]]) -> pl.DataFrame:
)

@staticmethod
def _extract_package_show_data(data: Dict[str, Any]) -> List[Dict[str, Any]]:
def _extract_resource_data(data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extracts specific fields from the package data and creates a list of dictionaries,
Extracts specific fields for a specific package and creates a list of dictionaries,
one for each resource, containing the specified fields.
Args:
Expand All @@ -803,9 +893,54 @@ def _extract_package_show_data(data: Dict[str, Any]) -> List[Dict[str, Any]]:

return result

@staticmethod
def _duckdb_explore(
    data: List[Dict[str, Any]],
    table_name: str,
    query: str = "",
) -> pd.DataFrame:
    """
    Create an in memory duckdb to explore catalogue data and isolate resources for further analysis

    Each package entry is flattened to one row per resource: the resource's
    keys are added as "resource_<key>" columns and "groups" becomes a list
    of group names. A package without resources contributes a single row
    with no resource columns.

    Args:
        data: package dictionaries, optionally holding "resources" and "groups"
        table_name: name under which the flattened data is registered in DuckDB
        query: SQL to execute against the registered table

    Returns:
        The query result as a pandas DataFrame.
        This is best effort: on any failure the error is logged and an
        empty DataFrame is returned instead of raising.
    """
    try:
        flattened_data = []
        for entry in data:
            base_entry = {
                k: v for k, v in entry.items() if k not in ("resources", "groups")
            }
            # Handle groups - store as a list of group names
            # `or []` also covers a key that is present but set to None
            base_entry["groups"] = [
                group["name"] for group in entry.get("groups") or []
            ]
            # Handle resources - one output row per resource
            resources = entry.get("resources") or []
            if resources:
                flattened_data.extend(
                    {
                        **base_entry,
                        **{f"resource_{k}": v for k, v in resource.items()},
                    }
                    for resource in resources
                )
            else:
                flattened_data.append(base_entry)

        # Convert the flattened data to a pandas DataFrame
        df = pd.DataFrame(flattened_data)
        logger.success("DataFrame Successfully Created")

        # Use a context manager so the DuckDB connection is always closed
        with duckdb.connect(":memory:") as con:
            # Register the pandas DataFrame as a table in DuckDB
            con.register(table_name, df)
            # fetchdf materialises the result before the connection closes
            return con.execute(query).fetchdf()
    except Exception as e:
        logger.error(f"Error when creating DuckDB DataFrame: {str(e)}")
        # Building the diagnostic sample must never raise out of the handler:
        # default=str copes with values json cannot serialise
        try:
            sample = json.dumps(data[:2], indent=2, default=str)
        except (TypeError, ValueError):
            sample = "<input data could not be serialised>"
        logger.error(f"First few elements of input data:\n{sample}")
        return pd.DataFrame()


# START TO WRANGLE / ANALYSE
class CkanCatAnalyser:
class CkanCatResourceLoader:
"""
Need to do:
Expand All @@ -826,7 +961,7 @@ class CkanCatAnalyser:
motherduck ✅
S3 (duckdb)
S3 (direct)
S3 (direct) ✅ - as both raw file or parquet
S3 (DeltaLake)
S3 (Iceberg)
Redshift
Expand All @@ -841,6 +976,9 @@ class CkanCatAnalyser:
def __init__(self):
pass

# ----------------------------
# Load data into a variety of formats/tools for aggregation and analysis
# ----------------------------
def polars_data_loader(
self, resource_data: Optional[List]
) -> Optional[pl.DataFrame]:
Expand Down Expand Up @@ -1153,19 +1291,7 @@ def aws_s3_data_loader(

# Example usage...
if __name__ == "__main__":
with CkanCatSession(CkanDataCatalogues.LONDON_DATA_STORE) as session:
with CkanCatSession(CkanDataCatalogues.SUBAK) as session:
explore = CkanCatExplorer(session)
all_packages = explore.package_list_dictionary()

data = all_packages.get("prevalence-common-mental-health-problems-borough")
info = explore.package_show_info_json(data)

dl_link = explore.extract_resource_url(
info, "mental-health-common-problems-borough.csv"
)

analyser = CkanCatAnalyser()

upload = analyser.aws_s3_data_loader(
dl_link, "gridwalk-remote-file-test-bucket", "mental-health-issues", "raw"
)
fresh = explore.catalogue_freshness()
print(fresh)

0 comments on commit 1d21ecd

Please sign in to comment.