AttributeError: 'function' object has no attribute 'tableId' with the Apache Beam Dataflow runner


I'm trying to write to BigQuery from the Apache Beam PTransform WriteToBigQuery(), passing a lambda function for the table that reads the value of the field "DEVICE", but I get an error. I did the same thing in a streaming job and it worked, but for some reason it does not work in this batch job.

My pipeline options:

import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

options = pipeline_options.PipelineOptions(flags=[])
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)

My code:

import json  # needed by the "Parse json" step below

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
    | "Parse json" >> beam.Map(json.loads)  # Map (not ParDo): json.loads yields one dict per line
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )
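
For reference, format_telemetry is not shown in the question. Whatever it does, each row it emits must be a dict that still contains the DEVICE field the table lambda reads. A hypothetical sketch (every field name other than DEVICE is made up):

def format_telemetry(element):
    # Hypothetical: shape the parsed MQTT message into a BigQuery row.
    # DEVICE must survive, since the table lambda routes on it; the
    # other fields are placeholders for whatever the payload contains.
    return {
        'DEVICE': element['DEVICE'],
        'TIMESTAMP': element.get('TIMESTAMP'),
        'VALUE': element.get('VALUE'),
    }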

The full error message:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
     13             table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
     15                                                             )
     16                      )

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
    138 
    139   def __or__(self, ptransform):
--> 140     return self.pipeline.apply(ptransform, self)
    141 
    142 

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    596     if isinstance(transform, ptransform._NamedPTransform):
    597       return self.apply(
--> 598           transform.transform, pvalueish, label or transform.label)
    599 
    600     if not isinstance(transform, ptransform.PTransform):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    606       try:
    607         old_label, transform.label = transform.label, label
--> 608         return self.apply(transform, pvalueish)
    609       finally:
    610         transform.label = old_label

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    649         transform.type_check_inputs(pvalueish)
    650 
--> 651       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    652 
    653       if type_options is not None and type_options.pipeline_type_check:

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
    151   def apply(self, transform, input, options):
    152     self._maybe_add_unified_worker_missing_options(options)
--> 153     return super(DataflowRunner, self).apply(transform, input, options)
    154 
    155   def _get_unique_step_name(self):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
    196       m = getattr(self, 'apply_%s' % cls.__name__, None)
    197       if m:
--> 198         return m(transform, input, options)
    199     raise NotImplementedError(
    200         'Execution of [%s] not implemented in runner %s.' % (transform, self))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
    833       return pcoll | 'WriteToBigQuery' >> beam.io.Write(
    834           beam.io.BigQuerySink(
--> 835               transform.table_reference.tableId,
    836               transform.table_reference.datasetId,
    837               transform.table_reference.projectId,

AttributeError: 'function' object has no attribute 'tableId'
python google-bigquery google-cloud-dataflow apache-beam
2 Answers

0 votes

According to the documentation and this thread, https://stackoverflow.com/a/62146803/5283663, it looks like you need to specify the schema parameter.

Does this fix the issue?

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        schema=set_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )
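
Note that set_schema is not defined in this answer. WriteToBigQuery accepts a schema given as a 'field:TYPE,field:TYPE' string, so a hypothetical value (using the same made-up field names as the format_telemetry sketch above) could be:

# Hypothetical schema string; swap in the fields format_telemetry really emits.
set_schema = 'DEVICE:STRING,TIMESTAMP:TIMESTAMP,VALUE:FLOAT'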

0 votes

This seems to be related to the DataflowRunner itself. I made a minimal example and got the same error. I tried SDKs from 2.11.0 up to 2.21, and I remember building this sample code with 2.13.0 about a year ago, so I think what has changed is the DataflowRunner itself.

It works fine if you use the DirectRunner. Sample code:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

with beam.Pipeline(options=pipeline_options) as p:
    elements = [
        {'number': 1, 'table': "table1"},
        {'number': 2, 'table': "table2"},
        {'number': 3, 'table': "table1"},
    ]

    schema = 'number:integer'

    def get_table(element):
        # Route each row to its own table and drop the routing field.
        table = element['table']
        element.pop('table')
        return f"{known_args.project}:{known_args.dataset}.{table}"

    dyn_bq = (
            p
            | beam.Create(elements)
            | WriteToBigQuery(table=get_table,
                              schema=schema,
                              create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                              write_disposition=BigQueryDisposition.WRITE_APPEND)
    )
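
known_args and pipeline_options are not defined in the snippet above; a minimal sketch of the usual argparse setup (the --project and --dataset flag names are assumptions):

import argparse
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)  # assumed flag name
parser.add_argument('--dataset', required=True)  # assumed flag name
known_args, pipeline_args = parser.parse_known_args()

# Any remaining flags (e.g. --runner=DirectRunner) become the pipeline options.
pipeline_options = PipelineOptions(pipeline_args)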

I don't see any workaround. I'll report this publicly and update the answer.
