How to read data from a Pub/Sub topic, parse it in a Beam pipeline, and print it

Question · 0 votes · 5 answers

I have a program that creates a topic in Pub/Sub and publishes messages to it. I also have an automated Dataflow job (created from a template) that saves those messages into my BigQuery table. Now I intend to replace the template-based job with a Python pipeline, where my requirement is to read data from Pub/Sub, apply a transform, and save the data to BigQuery / publish it to another Pub/Sub topic. I started scripting this in Python and went through a lot of trial and error, but to my dismay I could not get it working. The code looks like this:

import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    p = beam.Pipeline(options=o)
    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    )
    data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    print("Lines: ", data)

run()

I would really appreciate some help as early as possible. Note: I have my project set up on Google Cloud and my script runs locally.

google-cloud-platform google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub
5 Answers
6 votes

Here is the working code.

import apache_beam as beam

TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"


class PrintValue(beam.DoFn):
    def process(self, element):
        print(element)
        return [element]

def run():

    o = beam.options.pipeline_options.PipelineOptions()
    # Can be replaced by the --streaming execution option
    standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
    standard_options.streaming = True
    p = beam.Pipeline(options=o)

    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = p | beam.io.ReadFromPubSub(topic=TOPIC_PATH) | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    # Don't forget to run the pipeline!
    result = p.run()
    result.wait_until_finish()

run()

To summarize:

  • You never actually ran the pipeline. Beam is a graph programming model: in your previous code you built the graph but never executed it. Here, at the end, the pipeline is run (a non-blocking call) and we then wait for it to finish (a blocking call).
  • When you start the pipeline, Beam tells you that Pub/Sub only works in streaming mode. So either launch your code with the --streaming argument, or set it programmatically as shown in my code and in the sketch after this list.
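A minimal sketch of both ways to enable streaming mode; the option names are standard Beam, the rest (script name, arguments) is purely illustrative:

import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Option 1: take the flag from the command line, e.g. `python pipeline.py --streaming`
options = PipelineOptions(sys.argv[1:])

# Option 2: toggle streaming programmatically
options.view_as(StandardOptions).streaming = True

p = beam.Pipeline(options=options)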

Note that streaming mode means listening on Pub/Sub indefinitely. If you run this on Dataflow, your pipeline will stay up until you stop it, which can get expensive if you only have a few messages. Make sure this is the model you want.

An alternative is to run the pipeline for a bounded time (one scheduler starts it, another one stops it). But in that case you have to let messages accumulate. Here you use a Topic as the pipeline's entry point. This option forces Beam to create a temporary subscription and listen on it, which means that messages published before that subscription was created will never be received or processed.

The idea is therefore to create a subscription beforehand, so that messages pile up in it (up to 7 days by default), and then to use the subscription name at the pipeline's entry point: beam.io.ReadFromPubSub(subscription=SUB_PATH). Beam will then drain and process the messages (order is not guaranteed!).
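A minimal sketch of that approach, assuming a subscription (here test-pipeline-sub, a made-up name) was created on the topic beforehand:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Hypothetical subscription created ahead of time so messages accumulate in it
SUB_PATH = "projects/test-pipeline-253103/subscriptions/test-pipeline-sub"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read from subscription" >> beam.io.ReadFromPubSub(subscription=SUB_PATH)
        | "Republish" >> beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    )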


0 votes

According to the Beam programming guide, you simply have to add a transform step to your pipeline. Here is an example of a transform:

class PrintValue(beam.DoFn):
  def process(self, element):
    print(element)
    return [element]

Add it to your pipeline:

 data |  beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)

You can add as many transforms as you want. You can test a value and emit elements to tagged PCollections (for multiple outputs) to fan out, or use side inputs with a PCollection to fan in.
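For illustration, a sketch of both patterns, reusing the data PCollection and the pipeline p from the snippets above (SplitBySize, the 'long' tag and the 100-byte threshold are made up for this example):

import apache_beam as beam

# Fan-out: route elements to tagged outputs depending on a test
class SplitBySize(beam.DoFn):
    def process(self, element):
        if len(element) > 100:
            yield beam.pvalue.TaggedOutput('long', element)
        else:
            yield element

results = data | beam.ParDo(SplitBySize()).with_outputs('long', main='short')
short_msgs = results.short
long_msgs = results['long']

# Fan-in: feed another PCollection into a transform as a side input
threshold = p | beam.Create([100])
flagged = data | beam.Map(
    lambda element, limit: (element, len(element) > limit),
    limit=beam.pvalue.AsSingleton(threshold),
)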


0 votes

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Set the Google Cloud project ID
project_id = 'your-project-id'

# Set the Pub/Sub topic and subscription names
topic_name = 'projects/{project_id}/topics/{topic_name}'
subscription_name = 'projects/{project_id}/subscriptions/{subscription_name}'

# Define the pipeline options
options = PipelineOptions(
    streaming=True,
    runner='DirectRunner',  # Use DirectRunner for testing, DataflowRunner for production
    project=project_id,
    region='us-central1',
)

# Create a Pub/Sub message publisher transform
class PubsubMessagePublisher(beam.DoFn):
    def __init__(self, topic_name):
        self.topic_name = topic_name

    def process(self, element):
        # Publish the message to the Pub/Sub topic
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        publisher.publish(self.topic_name, element.encode('utf-8'))

# Create a Pub/Sub message subscriber transform
class PubsubMessageSubscriber(beam.DoFn):
    def __init__(self, subscription_name):
        self.subscription_name = subscription_name

    def start_bundle(self):
        # Create a Pub/Sub subscriber client and listen on the subscription in
        # the background; keep the returned future so it can be cancelled later
        from google.cloud import pubsub_v1
        self.subscriber = pubsub_v1.SubscriberClient()
        self.streaming_pull_future = self.subscriber.subscribe(
            self.subscription_name, callback=self.process_message)

    def process(self, element):
        # Elements already delivered by ReadFromPubSub pass through unchanged
        yield element

    def process_message(self, message):
        # Process a message received by the background subscriber
        print(f"Received message: {message.data.decode('utf-8')}")
        message.ack()

    def finish_bundle(self):
        # Cancel the background streaming pull
        self.streaming_pull_future.cancel()

# Define the pipeline
with beam.Pipeline(options=options) as pipeline:
    # Send messages to Pub/Sub topic
    messages = ['Message 1', 'Message 2', 'Message 3']
    (
        pipeline
        | "Create Messages" >> beam.Create(messages)
        | "Publish Messages" >> beam.ParDo(PubsubMessagePublisher(topic_name))
    )

    # Receive messages from Pub/Sub subscription
    (
        pipeline
        | "Subscribe to Pubsub Subscription" >> beam.io.ReadFromPubSub(subscription=subscription_name)
        | "Process Messages" >> beam.ParDo(PubsubMessageSubscriber(subscription_name))
    )


0 votes

# Sending data to a Pub/Sub topic
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Set your Google Cloud project ID, Pub/Sub topic, and BigQuery table details
project_id = 'your-project-id'
pubsub_topic = 'projects/{project_id}/topics/{topic_name}'
bq_table = 'your-project-id.your_dataset.your_table'

# Define the Apache Beam pipeline options
options = PipelineOptions()
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).project = project_id
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).region = 'us-central1'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).job_name = 'bigquery-to-pubsub'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).staging_location = 'gs://your-bucket/staging'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).temp_location = 'gs://your-bucket/temp'

# Define the pipeline
pipeline = beam.Pipeline(options=options)

# Step 1: Read data from BigQuery
query = f'SELECT * FROM `{bq_table}`'  # backticks needed for standard SQL table names
read_from_bq = pipeline | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))

# Step 2: Send data to Pub/Sub
def format_message(row):
    # WriteToPubSub expects bytes; serialize each row dict to a JSON payload
    return json.dumps(row, default=str).encode('utf-8')

send_to_pubsub = read_from_bq | 'Format as Pub/Sub messages' >> beam.Map(format_message)
send_to_pubsub | 'Send to Pub/Sub' >> beam.io.WriteToPubSub(topic=pubsub_topic)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()

# Receiving data from a Pub/Sub topic
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery


# Define your Pub/Sub topic and subscription
pubsub_topic = 'projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC'
subscription = 'projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION'

# Define your BigQuery output table details
output_table = 'YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE'


def run():
    # Set up pipeline options (Pub/Sub reads require streaming mode)
    options = PipelineOptions(streaming=True)
    options.view_as(SetupOptions).save_main_session = True

    # Create a Pipeline using the defined options
    pipeline = beam.Pipeline(options=options)

    # Read messages from Pub/Sub topic
    messages = pipeline | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(subscription=subscription)

    # Transform the messages as needed
    transformed_data = messages | 'Transform Data' >> beam.Map(lambda msg: your_transform_function(msg))

    # Define the BigQuery schema for the output table
    table_schema = {
        'fields': [
            {'name': 'field1', 'type': 'STRING'},
            {'name': 'field2', 'type': 'INTEGER'},
            # Add more fields as needed
        ]
    }

    # Write the transformed data to BigQuery
    transformed_data | 'Write to BigQuery' >> WriteToBigQuery(
        table=output_table,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )

    # Run the pipeline
    pipeline.run().wait_until_finish()


def your_transform_function(message):
    # Implement your own transformation logic here.
    # Pub/Sub delivers raw bytes, so decode and parse the payload first,
    # then build a dictionary whose keys match the BigQuery schema fields.
    # For example, assuming each message is a JSON object:
    data = json.loads(message.decode('utf-8'))
    transformed_message = {
        'field1': data['field1'],
        'field2': int(data['field2']),
        # Add more fields as needed
    }
    return transformed_message


if __name__ == '__main__':
    run()

#read and print pubsub topic data
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options (Pub/Sub reads require streaming mode)
options = PipelineOptions(streaming=True)

# Define your pipeline
pipeline = beam.Pipeline(options=options)

# Define your Pub/Sub topic and subscription
topic = "projects/<your-project>/topics/<your-topic>"
subscription = "projects/<your-project>/subscriptions/<your-subscription>"

# Read data from the Pub/Sub subscription (ReadFromPubSub takes either
# topic= or subscription=, not both)
messages = (
    pipeline
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=subscription)
)

# Process the messages (e.g., print them)
def process_message(message):
    print(message)
    return message

processed_messages = (
    messages
    | "Process messages" >> beam.Map(process_message)
)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()


0 votes

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options
options = PipelineOptions()

# Define your pipeline
pipeline = beam.Pipeline(options=options)

# Define your BigQuery table
table_spec = "<your-project>:<your-dataset>.<your-table>"

# Read data from the BigQuery table
rows = (
    pipeline
    | "Read from BigQuery" >> beam.io.ReadFromBigQuery(table=table_spec)
)

# Transform the rows into Pub/Sub messages
def transform_to_pubsub(row):
    # WriteToPubSub expects bytes, so encode the stringified row
    return str(row).encode('utf-8')

pubsub_messages = (
    rows
    | "Transform to Pub/Sub messages" >> beam.Map(transform_to_pubsub)
)

# Write the messages to a Pub/Sub topic
topic = "projects/<your-project>/topics/<your-topic>"
pubsub_messages | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=topic)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options (Pub/Sub reads require streaming mode)
options = PipelineOptions(streaming=True)

# Define your pipeline
pipeline = beam.Pipeline(options=options)

# Define your Pub/Sub topic and subscription
topic = "projects/<your-project>/topics/<your-topic>"
subscription = "projects/<your-project>/subscriptions/<your-subscription>"

# Read data from the Pub/Sub subscription (ReadFromPubSub takes either
# topic= or subscription=, not both)
messages = (
    pipeline
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=subscription)
)

# Process the messages (e.g., print them)
def process_message(message):
    print(message)
    return message

processed_messages = (
    messages
    | "Process messages" >> beam.Map(process_message)
)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()
