PyIceberg catalog in GCS: I can't use PyIceberg with Google Cloud Storage


I want to use the pyiceberg library with Google Cloud Storage.

I created a catalog in Google Cloud Storage using PySpark, and I want to read those tables from there.
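For reference, a Hadoop-style Iceberg catalog in GCS is typically created from PySpark along these lines (a sketch, not the exact setup: "my_catalog" and the warehouse path are placeholders, and it assumes the Iceberg Spark runtime and GCS connector jars are on the classpath):

from pyspark.sql import SparkSession

# "my_catalog" and gs://catalog are placeholders for illustration only
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "gs://catalog")
    .getOrCreate()
)

spark.sql("CREATE TABLE my_catalog.db.books (title STRING) USING iceberg")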

I saw this documentation on creating a catalog object for GCS, but I don't really understand how to connect to it, or how to create the configuration object for Google Cloud.

I tried:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    uri="gs://catalog",
    type="gcsfs"
)

But I get this error:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
      2     name="gcsfs", 
      
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
    210 catalog_type = None
    211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212     catalog_type = CatalogType[provided_catalog_type.upper()]
    213 elif not provided_catalog_type:
    214     catalog_type = infer_catalog_type(name, conf)

File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
    788 def __getitem__(cls, name):
    789     """
    790     Return the member matching `name`.
    791     """
--> 792     return cls._member_map_[name]

KeyError: 'GCSFS'

I installed the package pyiceberg[gcsfs].

I saw this in the PyIceberg GitHub repository:
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
}

python google-cloud-storage apache-iceberg
1 Answer

PyIceberg is a Python library for working with Iceberg tables.

First, get an OAuth2 token using a service account file. I'm running this in Colab, so I need to do it this way; if you're running in a container, you can do this differently.

import google.auth
from google.auth.transport.requests import Request
from pyiceberg import catalog


def get_access_token(service_account_file, scopes):
  """
  Retrieves OAuth2 credentials from Google Cloud Platform using a service account key file.

  Args:
      service_account_file: Path to the service account JSON key file.
      scopes: List of OAuth scopes required for your application.

  Returns:
      The refreshed credentials object (its .token and .expiry are used below).
  """

  credentials, project_id = google.auth.load_credentials_from_file(
      service_account_file, scopes=scopes)

  request = Request()
  credentials.refresh(request)  # Fetches a token, refreshing if needed
  return credentials

# Example usage
service_account_file = "/path-to-service-account-file.json"  # Replace with your path
scopes = ["https://www.googleapis.com/auth/cloud-platform"]  # Adjust scopes as needed

access_token = get_access_token(service_account_file, scopes)

Next, load the catalog. We use the OAuth2 credentials retrieved with our service account key.

I've omitted the datetime_to_unix_ms function to focus on the main task; a minimal sketch follows for completeness.
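Since that helper is referenced in the catalog config below, here is a minimal sketch of what it presumably does (an assumption on my part: google-auth returns a naive UTC datetime for the token expiry, and gcs.oauth2.token-expires-at expects epoch milliseconds):

from datetime import timezone

def datetime_to_unix_ms(dt):
  """Convert a naive UTC datetime to a Unix timestamp in milliseconds."""
  # Assumption: google-auth expiry datetimes are naive and expressed in UTC
  return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)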

Since you're just getting started, I recommend keeping your implementation simple by using a registry database.

If you already have an EMR cluster, you should consider using the Hive metastore instead.

For this example, we'll use a SQLite database as our central registry. You can replace it with any SQL database option supported by the SQLAlchemy library.

REGISTRY_DATABASE_URI = "sqlite:///catalog.db" # replace this with your database URI

catalog_inst = catalog.load_catalog(
    "default",
    **{
        "uri": REGISTRY_DATABASE_URI, 
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
        "gcs.project-id": "project-id", # replace with your gcp project id
        "gcs.oauth2.token": access_token.token,
        "gcs.default-bucket-location": "gs://bucket/", # replace with your gcs bucket
        "warehouse": "gs://bucket/" # replace with your gcs bucket
    }
)

Finally, we create a sample table with some data using PyArrow:

import pyarrow as pa
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

try:
    catalog_inst.create_namespace("default")  # Replace this with your namespace
except NamespaceAlreadyExistsError:
    pass

# Define the schema for the book table
schema = pa.schema([
    ('title', pa.string())
])

# Drop the table if it already exists so the example can be re-run
try:
    catalog_inst.drop_table("default.books")  # Replace this with your table
except NoSuchTableError:
    pass

table = catalog_inst.create_table("default.books", schema=schema)

# Create some sample data
titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]

# Create Arrow arrays from the data
title_array = pa.array(titles, type=pa.string())
table_data = pa.Table.from_arrays([title_array], names=schema.names)

table.append(table_data)
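
To check the write, you can scan the table back into Arrow (optional; table.scan().to_arrow() is PyIceberg's standard read path):

# Read the rows back to verify the append
print(table.scan().to_arrow())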