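I have a very basic pipeline in Python that uses Apache Beam on Google Cloud to read data from Cloud Storage, drop a few columns, and load the result into BigQuery. Everything works up to that last step.
When I use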
WriteToBigQuery(table = ...)
I get the following error:
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
It comes from the check
isinstance(table, TableReference)
which is part of the parse_table_reference()
function. The strange thing is that if I run the same check manually:
table = TableReference.from_string(...)
print(isinstance(table, TableReference))
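it returns True.
I have tried formatting the table reference in several different ways, but they all produce the same error.
Is this a problem with how I am supplying the table reference? Or is there another way to do this that avoids the error?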
Traceback:
TypeError Traceback (most recent call last)
Input In [1], in <cell line: 37>()
38 options = PipelineOptions()
39 p = beam.Pipeline(options = options)
41 (
42 p
43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
44 | "Split" >> beam.ParDo(Split())
45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
47 table = 'other-lender-uploads-test:Lender_Data.Test_Lender',
48 schema = 'Date: STRING, Name: STRING',
49 write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
50 )
52 result = p.run()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
2081 self._dataset = dataset
2082 self._project = project
-> 2083 self.table_reference = bigquery_tools.parse_table_reference(
2084 table, dataset, project)
2085 self.create_disposition = BigQueryDisposition.validate_create(
2086 create_disposition)
2087 self.write_disposition = BigQueryDisposition.validate_write(
2088 write_disposition)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
212 def parse_table_reference(table, dataset=None, project=None):
213 """Parses a table reference into a (project, dataset, table) tuple.
214
215 Args:
(...)
237 format.
238 """
--> 240 if isinstance(table, TableReference):
241 return TableReference(
242 projectId=table.projectId,
243 datasetId=table.datasetId,
244 tableId=table.tableId)
245 elif callable(table):
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
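As a side check on the table string itself: Beam documents the PROJECT:DATASET.TABLE and DATASET.TABLE forms, and the value in the traceback follows the first one. The sketch below uses a simplified, hypothetical pattern (not Beam's actual regex, and split_table_spec is an illustrative name) to show how such a string breaks down, which suggests the format is not what triggers the TypeError.

```python
import re

# Simplified, illustrative pattern for "PROJECT:DATASET.TABLE".
# The project part is optional. This is NOT the regex Beam uses internally.
TABLE_SPEC = re.compile(
    r"^(?:(?P<project>[^:]+):)?(?P<dataset>\w+)\.(?P<table>[\w$-]+)$"
)


def split_table_spec(spec):
    """Split a table spec string into (project, dataset, table)."""
    match = TABLE_SPEC.match(spec)
    if match is None:
        raise ValueError(f"Not a PROJECT:DATASET.TABLE string: {spec!r}")
    return match.group("project"), match.group("dataset"), match.group("table")


# The table string taken from the traceback above.
parts = split_table_spec("other-lender-uploads-test:Lender_Data.Test_Lender")
print(parts)
```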
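Install apache-beam[gcp] instead of plain apache-beam. WriteToBigQuery relies on Google Cloud dependencies that only the gcp extra installs. Try: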
pip install apache-beam[gcp]
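A likely reason this helps, stated as an assumption rather than something confirmed here: when the optional GCP packages cannot be imported, Beam appears to leave TableReference set to None, and isinstance() rejects None as its second argument. The minimal sketch below reproduces that failure in plain Python, without Beam; check is a hypothetical helper used only for illustration.

```python
# Stand-in for a class whose optional import failed.
# Assumption: Beam leaves TableReference as None in that situation.
TableReference = None


def check(table):
    """Run the same kind of isinstance() test and report what happens."""
    try:
        return isinstance(table, TableReference)
    except TypeError as exc:
        return f"TypeError: {exc}"


result = check("other-lender-uploads-test:Lender_Data.Test_Lender")
print(result)  # exact wording of the message depends on the Python version
```

This would also be consistent with the manual check in the question returning True, if the TableReference used there was imported from a different module than the one Beam consults.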
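In my case (macOS),
pip install apache-beam[gcp]
could not find the package, but
pip install 'apache-beam[gcp]'
(with quotes) worked. This is likely because zsh, the default shell on macOS, treats unquoted square brackets as a glob pattern.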
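To confirm that the install took effect before rerunning the pipeline, a quick check like the following can help. It is a sketch only: the module names in GCP_MODULES are assumed examples of packages that the gcp extra pulls in, and is_importable is a hypothetical helper, not part of Beam.

```python
import importlib.util


def is_importable(module_name):
    """Return True if module_name can be located on the import path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or a module has no spec.
        return False


# Assumed examples of modules installed by the gcp extra; adjust as needed.
GCP_MODULES = ["apitools", "google.cloud.bigquery"]

status = {name: is_importable(name) for name in GCP_MODULES}
for name, found in status.items():
    print(f"{name}: {'found' if found else 'MISSING'}")
```

If any entry is reported as missing, reinstalling with the quoted form of the pip command in the same environment that runs the notebook is the first thing to try.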