**我想使用以下代码将数据提取到bigquery中,但它给了我以下错误 ** 我对 apache beam 很陌生,所以请帮助我更好地理解。
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\beam_bq.py", line 106, in <module>
poc1()
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\beam_bq.py", line 99, in poc1
result = (errors | 'PrintErrors' >> beam.Map(print))
~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\Lib\site-packages\apache_beam\transforms\ptransform.py", line 1110, in __ror__
return self.transform.__ror__(pvalueish, self.label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\Lib\site-packages\apache_beam\transforms\ptransform.py", line 616, in __ror__
replacements = {
^
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\Lib\site-packages\apache_beam\transforms\ptransform.py", line 617, in <dictcomp>
id(v): p | 'CreatePInput%s' % ix >> Create(v, reshuffle=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\Lib\site-packages\apache_beam\transforms\core.py", line 3723, in __init__
self.values = tuple(values)
^^^^^^^^^^^^^
File "C:\Users\Stranger\OneDrive - Stranger\Desktop\Coding\POC2\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2501, in __getitem__
raise AttributeError(
AttributeError: Error trying to access nonexistent attribute `0` in write result. Please see __documentation__ for available attributes.
def poc1():
import apache_beam as beam
# Create pipeline.
schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})
pipeline = beam.Pipeline()
errors = (pipeline
| 'Data' >> beam.Create([2, 2])
| 'CreateBrokenData' >> beam.Map(lambda src: {'a': src} if src == 2 else {'a': '2'})
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table='dummy_a_table',
dataset='PM_INGEST_TEMP',
project="bmas-eu-digi-pipe-dev",
schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
custom_gcs_temp_location='gs://destination_bucket_final'
# insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
# create_disposition='CREATE_IF_NEEDED',
# write_disposition='WRITE_APPEND')
)
)
result = (errors | 'PrintErrors' >> beam.Map(print))
if __name__ == '__main__':
# run()
poc1()
我想使用代码将数据提取到 bigquery 中,但它给了我错误。我正在使用 Windows 10、Python 3.10,并且希望在本地运行而不是在谷歌云数据流上运行。