Skip to content

Commit

Permalink
Added in DeltaLake reading capability
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Oct 13, 2024
1 parent b426027 commit 008e196
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ aws-config = "1.5.8"
aws-credential-types = "1.2.0"
aws-sdk-sts = "1.39.0"
aws-types = "1.3.3"
deltalake = "0.20.1"
deltalake = { version = "0.20.1", features = [
"s3",
"datafusion",
"datafusion-ext",
] }
deltalake-aws = "0.3.0"
tokio = { version = "1.39.3", features = ["full"] }

Expand Down
58 changes: 50 additions & 8 deletions src/delta_lake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use aws_config;
use aws_config::BehaviorVersion;
use aws_sdk_sts::config::ProvideCredentials;
use deltalake::{open_table_with_storage_options, DeltaTableError};
use colored::Colorize;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::datafusion::execution::context::SessionContext;
use deltalake::{open_table_with_storage_options, DeltaOps, DeltaTableError};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

// Load AWS Creds into a hashmap for use with delta lake reader
Expand Down Expand Up @@ -58,14 +62,52 @@ pub async fn load_remote_delta_lake_table_info(
deltalake_aws::register_handlers(None);

// open delta lake table
let remote_delta_lake_table = open_table_with_storage_options(s3_uri, storage_options).await?;
let table = match open_table_with_storage_options(s3_uri, storage_options).await {
Ok(tbl) => tbl,
Err(_) => {
let ops = DeltaOps::try_from_uri(s3_uri).await?;
ops.create().with_table_name("data").await?
}
};

// Start DataFusion context
let ctx = SessionContext::new();

// Register table
ctx.register_table("data", Arc::new(table)).unwrap();

// Get and print the latest version
let version = remote_delta_lake_table.version();
println!("Current version: {:?}", version);
// Create batches
let batches = ctx
.sql("SELECT * FROM data")
.await
.unwrap()
.collect()
.await
.unwrap();

for batch in batches {
println!("{}", "DeltaLake Output: Columns and RecordBatch.".green());
process_record_batch(&batch);
println!("Record Batch: {:?}", batch);
}

// Get and print the table URI
let uri = remote_delta_lake_table.table_uri();
println!("Table URI: {}", uri);
Ok(())
}

/// Print a human-readable summary of an Arrow `RecordBatch`:
/// the column/row counts followed by each column's index, name,
/// and Arrow data type.
///
/// Output goes to stdout; the batch itself is only borrowed and
/// never modified.
fn process_record_batch(batch: &RecordBatch) {
    println!("Number of columns: {}", batch.num_columns());
    println!("Number of rows: {}", batch.num_rows());

    // Bind the SchemaRef so the borrow of its fields outlives the loop.
    let schema = batch.schema();

    // Iterate the schema's fields directly instead of indexing with
    // `schema.field(i)` in a manual `0..num_columns()` loop — same
    // output, no per-iteration positional lookup.
    for (i, field) in schema.fields().iter().enumerate() {
        println!(
            "Column {}: '{}' (Type: {:?})",
            i,
            field.name(),
            field.data_type()
        );
    }
}

0 comments on commit 008e196

Please sign in to comment.