How do I avoid a TypeError when using Apache Beam for Dataflow in Python?


So I have a fairly basic pipeline in Python, using Apache Beam and Google Cloud, that pulls data from Cloud Storage, drops some columns, and moves it into BigQuery. It works until the very last step.

When using

WriteToBigQuery(table = ...)

I get the following error:

TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

This comes from the check

isinstance(table, TableReference)

inside the

parse_table_reference()

function. Strangely, if I check this manually:

table = TableReference.from_string(...)
print(isinstance(table, TableReference))

then it returns True.

I have tried formatting the table reference in various ways, but the result is always the same.

Is this a problem with how I am providing the table reference? Or is there another way to do this that avoids the error?


Traceback

    TypeError                                 Traceback (most recent call last)
    Input In [1], in <cell line: 37>()
         38 options = PipelineOptions()
         39 p = beam.Pipeline(options = options)
         41 (
         42 p 
         43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
         44 | "Split" >> beam.ParDo(Split())
         45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
    ---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
         47     table = 'other-lender-uploads-test:Lender_Data.Test_Lender', 
         48     schema = 'Date: STRING, Name: STRING', 
         49     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
         50 )
         52 result = p.run()
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
       2081 self._dataset = dataset
       2082 self._project = project
    -> 2083 self.table_reference = bigquery_tools.parse_table_reference(
       2084     table, dataset, project)
       2085 self.create_disposition = BigQueryDisposition.validate_create(
       2086     create_disposition)
       2087 self.write_disposition = BigQueryDisposition.validate_write(
       2088     write_disposition)
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
        212 def parse_table_reference(table, dataset=None, project=None):
        213   """Parses a table reference into a (project, dataset, table) tuple.
        214 
        215   Args:
       (...)
        237       format.
        238   """
    --> 240   if isinstance(table, TableReference):
        241     return TableReference(
        242         projectId=table.projectId,
        243         datasetId=table.datasetId,
        244         tableId=table.tableId)
        245   elif callable(table):
    
    TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
python google-cloud-platform google-cloud-dataflow apache-beam
2 Answers

1 vote

Please install apache-beam[gcp] instead of plain apache-beam. Try:

pip install apache-beam[gcp]
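A guess at why the extra matters (this is an assumption, not stated in the answer): without the gcp extra, Beam's optional BigQuery imports can fail, leaving the TableReference name bound to None rather than a class, and calling isinstance() with a non-type second argument raises exactly this TypeError. The failure mode can be reproduced without Beam installed:

```python
# Stand-in for an optional import that did not resolve: the name exists,
# but is bound to None instead of a class (an assumption about the cause).
TableReference = None

try:
    # isinstance() requires a type, tuple of types, or union as arg 2;
    # None is none of those, so this raises TypeError.
    isinstance("my-project:dataset.table", TableReference)
except TypeError as err:
    print(type(err).__name__, "-", err)
```

This also explains why the manual check in the question returns True: there, TableReference was imported directly and is a real class, so isinstance() works as expected.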

0 votes

In my case (macOS),

pip install apache-beam[gcp]

did not find the package, but

pip install 'apache-beam[gcp]'

(with quotes) did. The default macOS shell, zsh, treats the square brackets as a glob pattern, so without the quotes the literal package name never reaches pip.
