从文档here可以看到,可以使用
deltalake::open_table
方法打开位于文件系统上的Delta Lake表。这我已经能做到了。
但是我一直无法弄清楚如何在 s3 上打开 Delta Lake 表。
可以在
here找到
deltalake::open_table
的文档说
使用当前元数据从给定路径创建并加载 DeltaTable。从给定表路径中的方案推断要使用的存储后端。
我尝试将 url 传递到 s3 存储桶,因为文档说将推断存储后端。但这样做
open_table("s3://127.0.0.1:9000/deltalake/delta-table")
会导致恐慌
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: LoadCheckpoint { source: Storage { source: Generic { store: "S3", source: Error { retries: 0, message: "request error", source: Some(reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Ipv4(169.254.169.254)), port: None, path: "/latest/api/token", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 60, kind: TimedOut, message: "Operation timed out" })) }) } } } }', src/main.rs:71:10
stack backtrace:
0: rust_begin_unwind
我有一种感觉这应该是微不足道的,但不幸的是事实并非如此:(
那么有人知道如何让 open_table 打开位于 s3 存储桶上的表吗?
open_table_with_storage_options
来完成此操作,因为 AWS_ENDPOINT_URL
是选项之一。我在这个要点中有一个 在 Rust 中阅读 S3 的工作示例,你可以调整它。
use deltalake::open_table_with_storage_options;
use std::{collections::HashMap, error::Error};
use aws_credential_types::provider::ProvideCredentials;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_uri = "s3://my-valid-table";
// I am using a specific profile
let sdk_config = aws_config::load_from_env().await;
let cp = sdk_config
.credentials_provider().expect("credentials");
let creds = cp.provide_credentials().await?;
let mut options = HashMap::new();
options.insert("AWS_ACCESS_KEY_ID".into(), creds.access_key_id().to_string());
options.insert("AWS_SECRET_ACCESS_KEY".into(), creds.secret_access_key().to_string());
options.insert("AWS_SESSION_TOKEN".into(), creds.session_token().expect("got token").to_string());
// you need to be psychic to know to do this https://github.com/delta-io/delta-rs/releases/tag/rust-v0.17.0
deltalake::aws::register_handlers(None);
let dt = open_table_with_storage_options(table_uri, options).await.expect("got table");
println!("{:?} version: {}", table_uri, dt.version());
Ok(())
}