Getting error: AttributeError: 'MySchemaClassName' object has no attribute 'element_type' when using the apache_beam to_dataframe module


Hello, I am learning apache_beam. Below is part of a script I wrote that is meant to pick up streaming data from a Pub/Sub subscription, transform it, and write the final result to BigQuery. However, I am getting this error: AttributeError: 'BmsSchema' object has no attribute 'element_type' [while running 'Transforming columns with pandas']

Here is my code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "...........json"


# Define the schema
class BmsSchema(typing.NamedTuple):
    ident: str


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


# Do function for passing in pubsub message from the subscription
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        # Creating the main_dict that has all the columns
        all_columns = ['ident']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        # Parse the JSON message
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)

        yield {all_columns[0]: main_dict[all_columns[0]]}


# Do function for formatting the respective columns using Pandas
class PandasTransform(beam.DoFn):
    def process(self, element):
        df = to_dataframe(element)
        # Convert DataFrame back to a list of dictionaries
        for _, row in df.iterrows():
            yield row.to_dict()


def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dw.......',
        runner='DirectRunner',
        streaming=True,
        temp_location='gs://............',
        staging_location='gs://..........',
        region='europe..........',
        job_name='pipeline-dataflow-test'
    )

    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/......./subscriptions/...........'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    | 'Transforming columns with pandas' >> beam.ParDo(PandasTransform())
                    )

        # Write to BigQuery with schema autodetect
        messages | 'Write to BigQuery' >> WriteToBigQuery(
            table='project.table_name',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........'
        )


if __name__ == '__main__':
    run() 
python google-bigquery etl google-cloud-dataflow apache-beam
1 Answer

to_dataframe expects a PCollection [1], but you are passing it a single element of a PCollection. (Internally, to_dataframe reads the element_type attribute that every PCollection carries, which is why it raises that AttributeError when handed a plain BmsSchema instance.) Instead of calling df = to_dataframe(element) inside PandasTransform, you can do the following:

messages = (p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
            | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
            )
df_messages = to_dataframe(messages)

You can then manipulate df_messages with dataframe operations. See https://beam.apache.org/documentation/dsls/dataframes/overview/ for more information.
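As a minimal end-to-end sketch of how this fix could slot into the question's pipeline: manipulate the deferred dataframe, then convert it back to a PCollection with to_pcollection before writing to BigQuery. The str.strip() normalisation of ident is purely hypothetical, and options, input_subscription and table_schema are the values defined in the question:

from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline(options=options) as p:
    messages = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                )
    # A deferred dataframe over the whole PCollection, not a single element
    df_messages = to_dataframe(messages)

    # Hypothetical dataframe operation: trim whitespace from the ident column
    df_messages['ident'] = df_messages['ident'].str.strip()

    # Convert back to a PCollection (elements are named tuples matching
    # BmsSchema), then to dicts so WriteToBigQuery can map fields to columns
    (to_pcollection(df_messages)
     | 'To dicts' >> beam.Map(lambda row: {'ident': row.ident})
     | 'Write to BigQuery' >> WriteToBigQuery(
         table='project.table_name',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         schema=table_schema,
         custom_gcs_temp_location='gs://........'))

Note that to_pcollection yields schema'd rows as named tuples, which is why row.ident is available on each element without any extra conversion step.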

[1] https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe
