Hello, I'm learning apache_beam. Below is part of a script I wrote that is meant to pick up streaming data from a Pub/Sub subscription, transform it, and write the final result to BigQuery. However, I'm getting this error: AttributeError: 'BmsSchema' object has no attribute 'element_type' [while running 'Transforming columns with pandas']
Here is my code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "...........json"

# Define the schema
class BmsSchema(typing.NamedTuple):
    ident: str

beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)

# DoFn for parsing the Pub/Sub message from the subscription
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        # Create the main_dict that has all the columns
        all_columns = ['ident']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        # Parse the JSON message
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)
        yield {
            all_columns[0]: main_dict[all_columns[0]]
        }

# DoFn for formatting the respective columns using pandas
class PandasTransform(beam.DoFn):
    def process(self, element):
        df = to_dataframe(element)
        # Convert the DataFrame back to a list of dictionaries
        for _, row in df.iterrows():
            yield row.to_dict()

def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dw.......',
        runner='DirectRunner',
        streaming=True,
        temp_location='gs://............',
        staging_location='gs://..........',
        region='europe..........',
        job_name='pipeline-dataflow-test'
    )
    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/......./subscriptions/...........'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    | 'Transforming columns with pandas' >> beam.ParDo(PandasTransform())
                    )

        # Write to BigQuery with the explicit schema
        messages | 'Write to BigQuery' >> WriteToBigQuery(
            table='project.table_name',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........'
        )

if __name__ == '__main__':
    run()
to_dataframe expects a PCollection [1], but you are passing it a single element of a PCollection. Instead of calling
df = to_dataframe(element)
as part of PandasTransform, you can do the following:
messages = (p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
            | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
            )

df_messages = to_dataframe(messages)
You can then manipulate df_messages with dataframe operations. See https://beam.apache.org/documentation/dsls/dataframes/overview/ for more information.
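For completeness, here is a minimal sketch of how the rest of the pipeline could look once df_messages exists, reusing the definitions (ParsePubSubMessage, BmsSchema, options, input_subscription, table_schema) from the question. It uses to_pcollection, the documented counterpart of to_dataframe, to turn the deferred dataframe back into a PCollection that WriteToBigQuery can consume; the .str.strip() operation and the 'To dicts' step are illustrative assumptions, not part of the original pipeline:

from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline(options=options) as p:
    messages = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                )

    # Operate on the whole PCollection as a deferred dataframe
    df_messages = to_dataframe(messages)

    # Hypothetical dataframe operation for illustration: trim whitespace from ident
    df_messages['ident'] = df_messages['ident'].str.strip()

    # Convert the deferred dataframe back into a PCollection of schema'd rows
    results = to_pcollection(df_messages)

    (results
     | 'To dicts' >> beam.Map(lambda row: row._asdict())
     | 'Write to BigQuery' >> WriteToBigQuery(
         table='project.table_name',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         schema=table_schema,
         custom_gcs_temp_location='gs://........'))

The 'To dicts' step converts each named-tuple row from to_pcollection into a plain dict, since that is the element shape the rest of your pipeline already feeds to WriteToBigQuery.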