使用 Apache Beam 写入 Dataflow 管道上的 BigQuery 表时“TypeError: isinstance() arg 2 必须是类型、类型元组或联合”

问题描述 投票:0回答:1

我正在 Pthon 中开发数据流管道,使用

apache-beam==2.57.0
google-cloud-bigquery==3.26.0
从 Cloud SQL 数据库读取数据并将其写入 BigQuery 表。尝试写入 BigQuery 时,脚本遇到以下错误:

| "Write to BigQuery" >> beam.io.WriteToBigQuery(
                             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\x\Python\Python311\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2083, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File File "C:\x\Python\Python311\Lib\site-packages\apache_beam\io\gcp\bigquery_tools.py", line 263, in parse_table_reference
    if isinstance(table, TableReference):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

环境:

  • Python版本:3.11
  • Apache Beam 版本:2.57.0
  • google-cloud-bigquery 版本:3.26.0
  • Runner:DirectRunner(用于本地测试)

详情:

我已经根据 Apache Beam GitHub 存储库 (google-cloud-bigquery>=2.0.0,<4).
) 中的设置文件验证了我的版本是兼容的 table_id 的格式为

project_id:dataset_id.table_name
。 表模式被定义为字典。 示例:

 table_schema = {
        "fields": [
            {"name": "trip_id", "type": "STRING"},
            {"name": "vehicle_id", "type": "STRING"},
            {"name": "customer_id", "type": "STRING"},
            {"name": "driver_id", "type": "STRING"}
        ]
    }

即使在以下情况下也会出现问题:

schema='trip_id: STRING, vehicle_id: STRING, customer_id: STRING, driver_id: STRING,',

此外,我在尝试通过 pip 安装 apache-beam[gcp] 时遇到了问题:

$ pip install apache-beam[gcp]
ERROR: Could not find a version that satisfies the requirement google-apitools<0.5.32,>=0.5.31; extra == "gcp" (from apache-beam[gcp]) (from versions: 0.5.32)
ERROR: No matching distribution found for google-apitools<0.5.32,>=0.5.31; extra == "gcp"

我正在使用私有 PyPI 存储库 (https://pjfrog@/...)。这可能会影响安装。我清除了 pip 缓存并尝试使用 --index-url=https://pypi.org/simple 直接从公共 PyPI 存储库安装。

代码片段:

# Create the Beam pipeline
with beam.Pipeline(options=options) as p:
    (
        p
            | "Start" >> beam.Create([None]) 
            | "Read from Cloud SQL" >> beam.ParDo(ReadFromCloudSQL()) 
            | "Print results" >> beam.Map(print)  # Print each element to the console
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=table_id,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND 

             )
    )
python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam
1个回答
0
投票

将 Beam 版本更新到 2.59 并安装 apache-beam[gcp] 后,我能够解决此错误。

断开 VPN 连接后,我可以从 PyPi 存储库安装此软件包。

pip install apache-beam[gcp] --index-url=https://pypi.org/simple

© www.soinside.com 2019 - 2024. All rights reserved.