I want to use the pyiceberg library with Google Cloud Storage.
I created a catalog in Google Cloud Storage using PySpark, and I want to read those tables from there.
I found this documentation on creating a catalog object for GCS, but I don't really understand how to connect to it or how to build a configuration object for Google Cloud.
I tried:
catalog = load_catalog(
    uri="gs://catalog",
    type="gcsfs"
)
but I get this error:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
2 name="gcsfs",
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
210 catalog_type = None
211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212 catalog_type = CatalogType[provided_catalog_type.upper()]
213 elif not provided_catalog_type:
214 catalog_type = infer_catalog_type(name, conf)
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
788 def __getitem__(cls, name):
789 """
790 Return the member matching `name`.
791 """
--> 792 return cls._member_map_[name]
KeyError: 'GCSFS'
I installed the package pyiceberg[gcsfs].
In the PyIceberg GitHub repository I see:
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
CatalogType.REST: load_rest,
CatalogType.HIVE: load_hive,
CatalogType.GLUE: load_glue,
CatalogType.DYNAMODB: load_dynamodb,
CatalogType.SQL: load_sql,
}
PyIceberg is a Python library for working with Iceberg tables.
First, obtain an OAuth2 token using a service account file. I'm running this in Colab, so I need to do it this way; if you're running in a container, you can do it differently.
import google.auth
from google.auth.transport.requests import Request
from pyiceberg import catalog
def get_access_token(service_account_file, scopes):
    """
    Obtains refreshed Google Cloud credentials from a service account key file.

    Args:
        service_account_file: Path to the service account JSON key file.
        scopes: List of OAuth scopes required for your application.

    Returns:
        The refreshed Credentials object (it exposes .token and .expiry).
    """
    credentials, project_id = google.auth.load_credentials_from_file(
        service_account_file, scopes=scopes)
    request = Request()
    credentials.refresh(request)  # Fetches an access token; refreshes if expired
    return credentials
# Example usage
service_account_file = "/path-to-service-account-file.json" # Replace with your path
scopes = ["https://www.googleapis.com/auth/cloud-platform"] # Adjust scopes as needed
access_token = get_access_token(service_account_file, scopes)
Next, load the catalog. We use the OAuth2 credentials retrieved with our service account key. I've trimmed out the datetime_to_unix_ms function to keep the focus on the main task.
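For completeness, a minimal sketch of what the omitted helper could look like (this exact implementation is my assumption, not the original): it converts a datetime to Unix epoch milliseconds, treating the naive expiry that google-auth returns as UTC.

```python
from datetime import datetime, timezone

def datetime_to_unix_ms(dt: datetime) -> int:
    """Convert a datetime to Unix epoch milliseconds (naive values assumed UTC)."""
    if dt.tzinfo is None:
        # google-auth returns credentials.expiry as a naive UTC datetime
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)
```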
Since you're just getting started, I'd suggest keeping your implementation simple by using a registry database.
If you already have an EMR cluster, you should consider using a Hive metastore instead.
For this example we'll use a SQLite database as our central registry. You can replace it with any SQL database supported by the SQLAlchemy library.
REGISTRY_DATABASE_URI = "sqlite:///catalog.db" # replace this with your database URI
catalog_inst = catalog.load_catalog(
    "default",
    **{
        "uri": REGISTRY_DATABASE_URI,
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
        "gcs.project-id": "project-id",  # replace with your gcp project id
        "gcs.oauth2.token": access_token.token,
        "gcs.default-bucket-location": "gs://bucket/",  # replace with your gcs bucket
        "warehouse": "gs://bucket/"  # replace with your gcs bucket
    }
)
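As noted above, the SQLite registry can be swapped for any SQLAlchemy-supported database by changing only the URI. For example, a Postgres-backed registry might look like this (host, user, password, and database name here are placeholders):

```python
# Hypothetical Postgres registry URI; all connection details are placeholders
REGISTRY_DATABASE_URI = "postgresql+psycopg2://user:password@db-host:5432/iceberg_catalog"
```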
Finally, we create a sample table with some data using PyArrow:
import pyarrow as pa
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

try:
    catalog_inst.create_namespace("default")  # Replace this with your namespace
except NamespaceAlreadyExistsError:
    pass  # Namespace already exists from a previous run

# Define the schema for the book table
schema = pa.schema([
    ('title', pa.string())
])

try:
    catalog_inst.drop_table("default.books")  # Replace this with your table
except NoSuchTableError:
    pass  # Nothing to drop on the first run

table = catalog_inst.create_table("default.books", schema=schema)
# Create some sample data
titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]
# Create Arrow arrays from the data
title_array = pa.array(titles, type=pa.string())
table_data = pa.Table.from_arrays([title_array], names=schema.names)
table.append(table_data)