Connecting from Dask to a Delta Lake hosted on MinIO


I'm trying to connect to a Delta Lake table stored on MinIO rather than S3. I can do this directly with the deltalake Python package, like so:

from deltalake import DeltaTable

# delta-rs storage options for MinIO: point at the local endpoint,
# allow plain HTTP, and permit S3 renames without a locking provider.
storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_REGION": "local",
    "AWS_ACCESS_KEY_ID": access_key,
    "AWS_SECRET_ACCESS_KEY": secret_key,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true",
}

dt = DeltaTable("s3a://my_bucket/data", storage_options=storage_options)
df = dt.to_pandas()

However, I want to read the data into a Dask DataFrame, so I tried dask-deltatable. Since it uses deltalake under the hood, I assumed the following would work:

import dask_deltatable

ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

However, it still seems to be trying to connect to AWS:

OSError                                   Traceback (most recent call last)
Cell In[3], line 1
----> 1 ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:285, in read_deltalake(path, catalog, database_name, table_name, version, columns, storage_options, datetime, delta_storage_options, **kwargs)
    282         raise ValueError("Please Provide Delta Table path")
    284     delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
--> 285     resultdf = _read_from_filesystem(
    286         path=path,
    287         version=version,
    288         columns=columns,
    289         storage_options=storage_options,
    290         datetime=datetime,
    291         delta_storage_options=delta_storage_options,
    292         **kwargs,
    293     )
    294 return resultdf

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:102, in _read_from_filesystem(path, version, columns, datetime, storage_options, delta_storage_options, **kwargs)
     99 delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
    101 fs, fs_token, _ = get_fs_token_paths(path, storage_options=storage_options)
--> 102 dt = DeltaTable(
    103     table_uri=path, version=version, storage_options=delta_storage_options
    104 )
    105 if datetime is not None:
    106     dt.load_as_version(datetime)

File ~/.local/lib/python3.10/site-packages/deltalake/table.py:297, in DeltaTable.__init__(self, table_uri, version, storage_options, without_files, log_buffer_size)
    277 """
    278 Create the Delta Table from a path with an optional version.
    279 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
   (...)
    294 
    295 """
    296 self._storage_options = storage_options
--> 297 self._table = RawDeltaTable(
    298     str(table_uri),
    299     version=version,
    300     storage_options=storage_options,
    301     without_files=without_files,
    302     log_buffer_size=log_buffer_size,
    303 )

OSError: Generic S3 error: Error after 10 retries in 13.6945151s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/latest/api/token)

Has anyone successfully read a Delta Lake table from MinIO into a Dask DataFrame, and if so, how?

python dask delta-lake minio
1 Answer

First: a reasonable workaround is to set these values as environment variables. That way, any S3 framework involved should be able to pick them up.
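A minimal sketch of that workaround, assuming delta-rs and any fsspec/boto-based layer both honor the standard AWS_* environment variables (the credential variables and bucket path are placeholders from the question):

import os

import dask_deltatable

# Assumption: with no explicit options passed, the S3 layers fall
# back to reading their configuration from the environment.
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"
os.environ["AWS_REGION"] = "local"
os.environ["AWS_ACCESS_KEY_ID"] = access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = secret_key
os.environ["AWS_ALLOW_HTTP"] = "true"

ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data")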

In the docstring we have:

storage_options : dict, default None
    Key/value pairs to be passed on to the fsspec backend, if any.

delta_storage_options : dict, default None
    Key/value pairs to be passed on to the delta-rs filesystem, if any.

I suspect the second set of options is exactly what you already have.
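In other words, a likely fix (a sketch, reusing the options dict from the question) is to pass those AWS_* keys via delta_storage_options rather than storage_options, since they are delta-rs settings, not fsspec settings:

import dask_deltatable

# Same dict as in the question: these AWS_* keys are delta-rs
# options, so they belong in delta_storage_options.
delta_storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_REGION": "local",
    "AWS_ACCESS_KEY_ID": access_key,
    "AWS_SECRET_ACCESS_KEY": secret_key,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true",
}

ddf = dask_deltatable.read_deltalake(
    "s3a://my_bucket/data",
    delta_storage_options=delta_storage_options,
)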

For fsspec, see the s3fs documentation for how to set the endpoint, region, and key/secret. The remaining values would be part of client_kwargs, which you would need to look up in the botocore documentation, but I suspect they aren't needed.
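For reference, a sketch of what the fsspec-side options might look like for MinIO, using s3fs's key/secret/client_kwargs conventions (untested against this setup; delta_storage_options refers to the dict sketched above):

import dask_deltatable

# fsspec/s3fs conventions: credentials as key/secret, and the
# MinIO endpoint forwarded to botocore via client_kwargs.
storage_options = {
    "key": access_key,
    "secret": secret_key,
    "client_kwargs": {
        "endpoint_url": "http://localhost:9000",
        "region_name": "local",
    },
}

ddf = dask_deltatable.read_deltalake(
    "s3a://my_bucket/data",
    storage_options=storage_options,              # for fsspec/s3fs
    delta_storage_options=delta_storage_options,  # for delta-rs
)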
