I am trying to write to BigQuery with the Apache Beam PTransform WriteToBigQuery(), and I get an error when I give the table argument a lambda function that reads the value of the field "DEVICE". I did the same thing in a streaming job and there it works, but for some reason it does not work in this batch job.
My pipeline options:
import json

import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

options = pipeline_options.PipelineOptions(flags=[])

# Take the project from the default credentials and pin the region.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'

# Point the workers at a custom-built SDK tarball.
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
    beam.version.__version__)
My code:
p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
        | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
        | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
)

telemetry_data = (data
                  | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
                  | "Format telemetry data" >> beam.Map(format_telemetry)
                  | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
                      table=lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
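format_telemetry is not shown here; purely for context, a hypothetical sketch of such a formatter, assuming each parsed message carries a DEVICE identifier plus a nested payload of readings (none of these names are confirmed by the code above):

# Hypothetical sketch of a format_telemetry helper. It assumes a "DEVICE"
# key plus a nested "payload" dict of readings, and emits one flat dict per
# row with keys matching the destination table's columns, keeping DEVICE so
# the table lambda above can still read it.
def format_telemetry(element):
    row = {'DEVICE': element['DEVICE']}
    row.update(element.get('payload', {}))
    return row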
My full error message:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
13 table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
15 )
16 )
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
138
139 def __or__(self, ptransform):
--> 140 return self.pipeline.apply(ptransform, self)
141
142
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
596 if isinstance(transform, ptransform._NamedPTransform):
597 return self.apply(
--> 598 transform.transform, pvalueish, label or transform.label)
599
600 if not isinstance(transform, ptransform.PTransform):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
606 try:
607 old_label, transform.label = transform.label, label
--> 608 return self.apply(transform, pvalueish)
609 finally:
610 transform.label = old_label
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
649 transform.type_check_inputs(pvalueish)
650
--> 651 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
652
653 if type_options is not None and type_options.pipeline_type_check:
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
151 def apply(self, transform, input, options):
152 self._maybe_add_unified_worker_missing_options(options)
--> 153 return super(DataflowRunner, self).apply(transform, input, options)
154
155 def _get_unique_step_name(self):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
196 m = getattr(self, 'apply_%s' % cls.__name__, None)
197 if m:
--> 198 return m(transform, input, options)
199 raise NotImplementedError(
200 'Execution of [%s] not implemented in runner %s.' % (transform, self))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
833 return pcoll | 'WriteToBigQuery' >> beam.io.Write(
834 beam.io.BigQuerySink(
--> 835 transform.table_reference.tableId,
836 transform.table_reference.datasetId,
837 transform.table_reference.projectId,
AttributeError: 'function' object has no attribute 'tableId'
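The last frame points at the cause: in batch mode this DataflowRunner overrides apply_WriteToBigQuery and rebuilds the write on top of the native beam.io.BigQuerySink, reading transform.table_reference as if table were a static table spec; when table is a callable, that attribute access fails. One thing that might be worth trying, sketched under the assumption that this SDK's override skips the native sink when the use_beam_bq_sink experiment is set (contemporary dataflow_runner.py sources gate the override on exactly that flag):

# Sketch only: ask the runner to keep the portable Beam BigQuery sink
# instead of substituting the native BigQuerySink. This assumes the batch
# apply_WriteToBigQuery override checks DebugOptions.experiments for
# 'use_beam_bq_sink', which may not hold for this custom-built SDK.
from apache_beam.options.pipeline_options import DebugOptions

options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']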
According to the documentation and this thread https://stackoverflow.com/a/62146803/5283663 it looks like you need to specify the schema parameter.
Would this fix the problem?
p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
        | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
        | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
)

telemetry_data = (data
                  | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
                  | "Format telemetry data" >> beam.Map(format_telemetry)
                  | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
                      table=lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
                      schema=set_schema,
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
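set_schema is not defined above. WriteToBigQuery accepts a 'name:type,...' string or a table schema, and for dynamic destinations current SDKs also accept a callable that receives the destination and returns its schema; a hypothetical stand-in, with made-up field names:

# Hypothetical stand-in for set_schema: a callable that maps each
# destination table to its schema. All per-device tables are assumed to
# share one layout here, and the field names are illustrative only.
def set_schema(destination):
    return 'DEVICE:STRING,TIMESTAMP:TIMESTAMP,VALUE:FLOAT'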
It seems to be related to the DataflowRunner itself. I made a simple example and got the same error. I tried the SDKs from 2.11.0 up to 2.21, and I remember building this sample code with 2.13.0 about a year ago, so I think what has changed is the DataflowRunner itself.
With the DirectRunner it works fine. Sample code:
from apache_beam.io import WriteToBigQuery, BigQueryDisposition

with beam.Pipeline(options=pipeline_options) as p:
    elements = [
        {'number': 1, 'table': "table1"},
        {'number': 2, 'table': "table2"},
        {'number': 3, 'table': "table1"},
    ]

    schema = 'number:integer'

    def get_table(element):
        # Pull the destination table out of the element and drop the field
        # so it is not written as a column.
        table = element['table']
        element.pop('table')
        return f"{known_args.project}:{known_args.dataset}.{table}"

    dyn_bq = (
        p
        | beam.Create(elements)
        | WriteToBigQuery(table=get_table,
                          schema=schema,
                          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                          write_disposition=BigQueryDisposition.WRITE_APPEND)
    )
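known_args and pipeline_options are not defined in the snippet; a minimal sketch of the usual argparse boilerplate they imply, with --project and --dataset as assumed flags:

import argparse

from apache_beam.options.pipeline_options import PipelineOptions

# Assumed boilerplate producing the known_args / pipeline_options names used
# above; the --project and --dataset flags are illustrative only.
parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)
parser.add_argument('--dataset', required=True)
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)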
I can't see any workaround. I will post this publicly and update here.