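I am developing a Dataflow pipeline in Python, using apache-beam==2.57.0 and google-cloud-bigquery==3.26.0, to read data from a Cloud SQL database and write it to a BigQuery table. When the script tries to write to BigQuery, it fails with the following error: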
| "Write to BigQuery" >> beam.io.WriteToBigQuery(
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\x\Python\Python311\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2083, in __init__
self.table_reference = bigquery_tools.parse_table_reference(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\x\Python\Python311\Lib\site-packages\apache_beam\io\gcp\bigquery_tools.py", line 263, in parse_table_reference
if isinstance(table, TableReference):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Environment:
Details:
I have verified against the setup file in the Apache Beam GitHub repository that my versions are compatible (google-cloud-bigquery>=2.0.0,<4).
The table_id has the format project_id:dataset_id.table_name.
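To rule out a malformed table string before it reaches Beam, the format can be checked up front. The following is a minimal sketch under stated assumptions: the helper name split_table_id and the regular expression are hypothetical and deliberately simplified (for example, they do not cover domain-scoped project IDs), and they are not Beam's own parsing logic.

```python
import re

# Simplified pattern for "project:dataset.table".
# Assumption: this is an illustrative regex, not the one Beam uses internally.
_TABLE_SPEC = re.compile(
    r"(?P<project>[\w-]+):(?P<dataset>\w+)\.(?P<table>[\w$-]+)"
)


def split_table_id(table_id):
    """Split 'project:dataset.table' into its three parts (hypothetical helper)."""
    match = _TABLE_SPEC.fullmatch(table_id)
    if match is None:
        raise ValueError(f"Expected project:dataset.table, got {table_id!r}")
    return match.group("project"), match.group("dataset"), match.group("table")


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(split_table_id("my-project:rides.trips"))
```

If this check passes, the table string itself is unlikely to be what triggers the TypeError.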
The table schema is defined as a dictionary.
Example:
table_schema = {
    "fields": [
        {"name": "trip_id", "type": "STRING"},
        {"name": "vehicle_id", "type": "STRING"},
        {"name": "customer_id", "type": "STRING"},
        {"name": "driver_id", "type": "STRING"}
    ]
}
The problem occurs even when the schema is passed as a string:
schema='trip_id: STRING, vehicle_id: STRING, customer_id: STRING, driver_id: STRING,',
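Since the same error appears with both schema forms, it can help to confirm that the two forms describe the same columns. The helper below is a sketch of one way to do that; the name schema_string_to_dict is hypothetical, it is not Beam's parser, and it deliberately skips the empty segment left by the trailing comma in the string above.

```python
def schema_string_to_dict(schema):
    """Convert 'name: TYPE, name: TYPE' into a {'fields': [...]} dictionary.

    Hypothetical helper for comparison purposes only, not part of Beam.
    """
    fields = []
    for part in schema.split(","):
        part = part.strip()
        if not part:
            # Skip empty segments, e.g. the one produced by a trailing comma.
            continue
        name, _, field_type = part.partition(":")
        fields.append({"name": name.strip(), "type": field_type.strip()})
    return {"fields": fields}


if __name__ == "__main__":
    as_string = (
        "trip_id: STRING, vehicle_id: STRING, "
        "customer_id: STRING, driver_id: STRING,"
    )
    print(schema_string_to_dict(as_string))
```

If the result matches the dictionary schema, the schema definition is probably not the source of the failure, which is consistent with the traceback pointing at the table reference instead.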
In addition, I ran into a problem when trying to install apache-beam[gcp] via pip:
$ pip install apache-beam[gcp]
ERROR: Could not find a version that satisfies the requirement google-apitools<0.5.32,>=0.5.31; extra == "gcp" (from apache-beam[gcp]) (from versions: 0.5.32)
ERROR: No matching distribution found for google-apitools<0.5.32,>=0.5.31; extra == "gcp"
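I am using a private PyPI repository (https://pjfrog@/...), which may be affecting the installation. I cleared the pip cache and tried installing directly from the public PyPI index with --index-url=https://pypi.org/simple.
Code snippet: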
# Create the Beam pipeline
with beam.Pipeline(options=options) as p:
    (
        p
        | "Start" >> beam.Create([None])
        | "Read from Cloud SQL" >> beam.ParDo(ReadFromCloudSQL())
        | "Print results" >> beam.Map(print)  # Print each element to the console
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            table=table_id,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND
        )
    )
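After updating Beam to version 2.59 and installing apache-beam[gcp], I was able to resolve this error.
After disconnecting from the VPN, I was able to install the package from the public PyPI repository: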
pip install apache-beam[gcp] --index-url=https://pypi.org/simple
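One plausible explanation for the original TypeError, offered as an assumption rather than something the traceback confirms: without the GCP extras, an optional import inside Beam may fall back to None, so TableReference is None by the time isinstance() runs. The sketch below reproduces that failure mode in plain Python and checks whether given modules can be found. The helper names reproduce_error and missing_modules are hypothetical, and the module names in the usage section are only examples of what the gcp extra might be expected to provide.

```python
import importlib.util

# Stand-in for a class whose optional import failed and was replaced by None.
# Assumption: this mimics the suspected state of TableReference inside Beam.
TableReference = None


def reproduce_error():
    """Return the message raised by isinstance() when its second argument is None."""
    try:
        isinstance("project:dataset.table", TableReference)
    except TypeError as exc:
        return str(exc)
    return None


def missing_modules(names):
    """Return the module names from `names` that cannot be located."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is absent or broken.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    print(reproduce_error())
    # Example module names; adjust to whatever the installation should provide.
    print(missing_modules(["apache_beam", "apitools", "google.cloud.bigquery"]))
```

An empty list from missing_modules after reinstalling would suggest the extras are now in place, although it does not prove that every optional import inside Beam succeeds.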