Commit 4056c7b: Added in S3 support

CHRISCARLON committed Sep 21, 2024
1 parent 78f7935 commit 4056c7b

Showing 3 changed files with 128 additions and 13 deletions.

134 changes: 123 additions & 11 deletions HerdingCats/herding_cats.py
@@ -2,11 +2,16 @@
 import pandas as pd
 import polars as pl
 import duckdb
+import boto3
+import pyarrow as pa
+import pyarrow.parquet as pq
+import uuid

 from io import BytesIO
 from typing import Any, Dict, Optional, Union, Literal, List
 from loguru import logger
 from urllib.parse import urlencode, urlparse
+from botocore.exceptions import ClientError

 from .api_endpoints import CkanApiPaths, CkanDataCatalogues
 from .cats_errors import CatExplorerError, CatSessionError
@@ -89,7 +94,7 @@ def start_session(self) -> None:
     def close_session(self) -> None:
         """Close the session."""
         self.session.close()
-        logger.info(f"Session closed for {self.domain}")
+        logger.success("Session closed")

     def __enter__(self):
         """Allow use with the `with` context manager"""
@@ -687,7 +692,7 @@ def extract_resource_url(
             url = item.get("resource_url")
             format = item.get("resource_format")
             if url and format:
-                logger.info(
+                logger.success(
                     f"Found URL for resource '{resource_name}'. Format is: {format}"
                 )
                 return [format, url]
@@ -1038,22 +1043,129 @@ def motherduck_data_loader(
         except Exception as e:
             logger.error(f"Unexpected error while connecting to MotherDuck: {e}")

+    def aws_s3_data_loader(
+        self,
+        resource_data: Optional[List[str]],
+        bucket_name: str,
+        custom_name: str,
+        mode: Literal["raw", "parquet"],
+    ):
+        """
+        Load resource data into remote S3 storage in its current file type or as a parquet file.
+        May add delta lake and iceberg support in the future.
+
+        Args:
+            resource_data: List containing the file format and URL.
+            bucket_name: Name of the target S3 bucket.
+            custom_name: Prefix used when naming the uploaded file.
+            mode: Choose whether the data is uploaded in its current format or as parquet.
+        """
+        # Enforce that resource_data is not None or empty
+        if not resource_data or len(resource_data) < 2:
+            logger.error("Invalid or insufficient resource data provided")
+            return

+        # Ensure bucket name is present before processing begins
+        if not bucket_name:
+            logger.error("No bucket name provided")
+            return
+
+        # Create an S3 client
+        s3_client = boto3.client("s3")
+        logger.success("S3 Client Created")
+
+        # Check if the bucket exists before processing the data
+        try:
+            s3_client.head_bucket(Bucket=bucket_name)
+            logger.success("Bucket Found")
+        except ClientError as e:
+            error_code = int(e.response["Error"]["Code"])
+            if error_code == 404:
+                logger.error(f"Bucket '{bucket_name}' does not exist.")
+            else:
+                logger.error(f"Error checking bucket '{bucket_name}': {e}")
+            return
+
+        url = resource_data[1]
+        file_format = resource_data[0].lower()
+        try:
+            response = requests.get(url)
+            response.raise_for_status()
+            binary_data = BytesIO(response.content)
+
+            # Generate a unique filename
+            filename = f"{custom_name}-{uuid.uuid4()}.{file_format}"
+
+            if mode == "raw":
+                # Upload the file in its original format
+                try:
+                    s3_client.upload_fileobj(binary_data, bucket_name, filename)
+                    logger.success("File uploaded successfully to S3")
+                except ClientError as e:
+                    logger.error(f"Error uploading file to S3: {e}")
+                    return
+
+            elif mode == "parquet":
+                # Convert to Parquet and upload
+                try:
+                    # Read the data based on the file format
+                    if file_format in ("spreadsheet", "xlsx"):
+                        df = pd.read_excel(binary_data)
+                    elif file_format == "csv":
+                        df = pd.read_csv(binary_data)
+                    elif file_format == "json":
+                        df = pd.read_json(binary_data)
+                    else:
+                        logger.error(
+                            f"Unsupported file format for Parquet conversion: {file_format}"
+                        )
+                        return
+
+                    # Convert to Parquet
+                    table = pa.Table.from_pandas(df)
+                    parquet_buffer = BytesIO()
+                    pq.write_table(table, parquet_buffer)
+                    parquet_buffer.seek(0)
+
+                    # Upload the Parquet file
+                    parquet_filename = f"{custom_name}-{uuid.uuid4()}.parquet"
+                    s3_client.upload_fileobj(
+                        parquet_buffer, bucket_name, parquet_filename
+                    )
+                    logger.success(
+                        "File converted to Parquet and uploaded successfully to S3"
+                    )
+                except Exception as e:
+                    logger.error(f"Error converting to Parquet or uploading to S3: {e}")
+                    return
+
+            else:
+                logger.error(f"Invalid mode specified: {mode}")
+                return
+
+        except requests.RequestException as e:
+            logger.error(f"Error fetching data from URL: {e}")
+            return


 # Example usage...
 if __name__ == "__main__":
     with CkanCatSession(CkanDataCatalogues.LONDON_DATA_STORE) as session:
         explore = CkanCatExplorer(session)
         all_packages = explore.package_list_dictionary()
-        data = all_packages.get("violence-reduction-unit")
+
+        data = all_packages.get("prevalence-common-mental-health-problems-borough")
         info = explore.package_show_info_json(data)
-        dl_link = explore.extract_resource_url(info, "VRU Q1 2023-24 Dataset")
-
-        analyse = CkanCatAnalyser()
-        motherd = analyse.motherduck_data_loader(
-            dl_link,
-            token="",
-            duckdb_name="test",
-            table_name="test_table_herding_cats",
-        )
+
+        dl_link = explore.extract_resource_url(
+            info, "mental-health-common-problems-borough.csv"
+        )
+
+        analyser = CkanCatAnalyser()
+
+        upload = analyser.aws_s3_data_loader(
+            dl_link, "gridwalk-remote-file-test-bucket", "mental-health-issues", "raw"
+        )
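
For comparison with the "raw" upload in the example above, here is a minimal sketch of the new "parquet" mode, reusing the same session/explorer/analyser flow. The import paths and the bucket name "example-bucket" are assumptions for illustration, not part of the commit:

from HerdingCats.herding_cats import CkanCatSession, CkanCatExplorer, CkanCatAnalyser
from HerdingCats.api_endpoints import CkanDataCatalogues

with CkanCatSession(CkanDataCatalogues.LONDON_DATA_STORE) as session:
    explore = CkanCatExplorer(session)
    all_packages = explore.package_list_dictionary()
    data = all_packages.get("prevalence-common-mental-health-problems-borough")
    info = explore.package_show_info_json(data)
    dl_link = explore.extract_resource_url(
        info, "mental-health-common-problems-borough.csv"
    )

    analyser = CkanCatAnalyser()
    # "parquet" reads the CSV with pandas, converts it via pyarrow, and uploads
    # a .parquet object instead of the raw file; requires AWS credentials
    # already configured for boto3, and "example-bucket" is a placeholder
    analyser.aws_s3_data_loader(
        dl_link, "example-bucket", "mental-health-issues", "parquet"
    )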
4 changes: 2 additions & 2 deletions README.md
@@ -36,14 +36,14 @@ This will improve and speed up how people:
 #### Planned
 - S3 Integration
   - duckdb
-    - direct
+    - direct 🚧
   - DeltaLake
   - Iceberg
 - Redshift
 - Databricks
 - Snowflake
 - Google Cloud Platform
-  - Google Cloud Storage
+  - Google Cloud Storage 🚧
   - Google BigQuery


3 changes: 3 additions & 0 deletions pyproject.toml
@@ -15,6 +15,9 @@ openpyxl = "^3.1.5"
 polars = "^1.5.0"
 fastexcel = "^0.11.6"
 duckdb = "1.0.0"
+boto3 = "^1.35.23"
+pyarrow = "^17.0.0"
+xlrd = "^2.0.1"

 [tool.poetry.group.dev.dependencies]
 watchdog = "^4.0.1"
