I have a program that creates a topic in Pub/Sub and publishes messages to it. I also have an automated Dataflow job (built from a template) that saves those messages to my BigQuery table. Now I want to replace the template-based job with a Python pipeline whose requirements are: read from Pub/Sub, apply a transform, and save the data to BigQuery / publish it to another Pub/Sub topic. I started writing the script in Python and went through a lot of trial and error trying to make it work, but to my frustration I could not. The code looks like this:
import apache_beam as beam
from apache_beam.io import WriteToText

TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    p = beam.Pipeline(options=o)
    print("I reached here")
    # Read from PubSub into a PCollection.
    data = (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    )
    data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    print("Lines: ", data)

run()
I would really appreciate any help as early as possible. Note: my project is set up on Google Cloud, and my script runs locally.
Here is the working code.
import apache_beam as beam

TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

class PrintValue(beam.DoFn):
    def process(self, element):
        print(element)
        return [element]

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    # You can replace this with the --streaming execution parameter
    standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
    standard_options.streaming = True
    p = beam.Pipeline(options=o)

    print("I reached here")
    # Read from PubSub into a PCollection.
    data = (
        p
        | beam.io.ReadFromPubSub(topic=TOPIC_PATH)
        | beam.ParDo(PrintValue())
        | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    )
    # Don't forget to run the pipeline!
    result = p.run()
    result.wait_until_finish()

run()
To sum up: launch the code with the --streaming parameter, or set it programmatically as shown in my code. Note that streaming mode means listening to Pub/Sub indefinitely. If you run this on Dataflow, the pipeline stays up until you stop it, which can get expensive if you receive few messages. Make sure that is the intended model.
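For reference, you don't have to set streaming in code at all: PipelineOptions() parses the command-line arguments by default, so a minimal sketch (the script name pipeline.py is just an assumption) launched with python pipeline.py --streaming looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Run with:  python pipeline.py --streaming
# PipelineOptions() picks up sys.argv, so --streaming lands in StandardOptions.
options = PipelineOptions()
print("Streaming enabled:", options.view_as(StandardOptions).streaming)

with beam.Pipeline(options=options) as p:
    (p
     | beam.io.ReadFromPubSub(topic="projects/test-pipeline-253103/topics/test-pipeline-topic")
     | beam.io.WriteToPubSub(topic="projects/test-pipeline-253103/topics/topic-repub"))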
The other approach is to run the pipeline for a limited duration (one scheduler starts it, another stops it). In that case, however, you have to buffer the messages. So far you have been using a Topic as the pipeline's entry point. This option forces Beam to create a temporary subscription and listen for messages on that subscription, which means messages published before the subscription was created will not be received or processed.
The idea is instead to create a subscription yourself, so that messages accumulate in it (for up to 7 days by default), and then use the subscription name at the pipeline's entry point: beam.io.ReadFromPubSub(subscription=SUB_PATH). Beam will then drain and process the backlog (no ordering guarantee!).
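Here is a minimal sketch of that pattern; the subscription name test-pipeline-sub is an assumption, and it is expected to exist before the pipeline starts (for example created with gcloud pubsub subscriptions create):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Assumption: the subscription already exists, e.g. created with
#   gcloud pubsub subscriptions create test-pipeline-sub --topic=test-pipeline-topic
SUB_PATH = "projects/test-pipeline-253103/subscriptions/test-pipeline-sub"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     # Messages published while the pipeline was down are still delivered,
     # within the subscription's retention window (7 days by default).
     | "Read from subscription" >> beam.io.ReadFromPubSub(subscription=SUB_PATH)
     | "Republish" >> beam.io.WriteToPubSub(topic=OUTPUT_PATH))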
According to the Beam programming guide, you simply add transform steps to your pipeline. Here is an example of a transform:
class PrintValue(beam.DoFn):
    def process(self, element):
        print(element)
        return [element]
Add it to your pipeline:
data | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
You can add as many transforms as you need. You can inspect values and route elements into tagged PCollections (for multiple outputs) to fan out, or use side inputs with a PCollection to fan in.
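For the fan-out case, here is a minimal sketch using tagged outputs; the tag names and the length-based routing rule are only illustrative:

import apache_beam as beam
from apache_beam import pvalue

class RouteBySize(beam.DoFn):
    # Illustrative rule: short payloads go to the 'short' tag, the rest to the main output
    def process(self, element):
        if len(element) < 100:
            yield pvalue.TaggedOutput('short', element)
        else:
            yield element

with beam.Pipeline() as p:
    results = (
        p
        | beam.Create([b'tiny', b'x' * 200])
        | beam.ParDo(RouteBySize()).with_outputs('short', main='long')
    )
    results.short | "Print short" >> beam.Map(print)
    results.long | "Print long" >> beam.Map(print)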
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Set the Google Cloud project ID
project_id = 'your-project-id'

# Set the Pub/Sub topic and subscription names
topic_name = f'projects/{project_id}/topics/your-topic-name'
subscription_name = f'projects/{project_id}/subscriptions/your-subscription-name'

# Define the pipeline options
options = PipelineOptions(
    streaming=True,
    runner='DirectRunner',  # Use DirectRunner for testing, DataflowRunner for production
    project=project_id,
    region='us-central1',
)

# Create a Pub/Sub message publisher transform
class PubsubMessagePublisher(beam.DoFn):
    def __init__(self, topic_name):
        self.topic_name = topic_name

    def process(self, element):
        # Publish the message to the Pub/Sub topic
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        publisher.publish(self.topic_name, element.encode('utf-8'))

# Create a Pub/Sub message subscriber transform
class PubsubMessageSubscriber(beam.DoFn):
    def __init__(self, subscription_name):
        self.subscription_name = subscription_name

    def start_bundle(self):
        # Create a Pub/Sub subscriber client and subscribe to the subscription
        from google.cloud import pubsub_v1
        self.subscriber = pubsub_v1.SubscriberClient()
        self.future = self.subscriber.subscribe(self.subscription_name, callback=self.process_message)

    def process(self, element):
        # Elements delivered by ReadFromPubSub simply pass through
        yield element

    def process_message(self, message):
        # Process a message received via the callback subscription
        print(f"Received message: {message.data.decode('utf-8')}")
        message.ack()

    def finish_bundle(self):
        # Stop the streaming pull and close the subscriber client
        self.future.cancel()
        self.subscriber.close()

# Define the pipeline
with beam.Pipeline(options=options) as pipeline:
    # Send messages to Pub/Sub topic
    messages = ['Message 1', 'Message 2', 'Message 3']
    (
        pipeline
        | "Create Messages" >> beam.Create(messages)
        | "Publish Messages" >> beam.ParDo(PubsubMessagePublisher(topic_name))
    )

    # Receive messages from Pub/Sub subscription
    (
        pipeline
        | "Subscribe to Pubsub Subscription" >> beam.io.ReadFromPubSub(subscription=subscription_name)
        | "Process Messages" >> beam.ParDo(PubsubMessageSubscriber(subscription_name))
    )
#sending data to pubsub topic
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Set your Google Cloud project ID, Pub/Sub topic, and BigQuery table details
project_id = 'your-project-id'
pubsub_topic = f'projects/{project_id}/topics/your-topic-name'
bq_table = 'your-project-id.your_dataset.your_table'
# Define the Apache Beam pipeline options
options = PipelineOptions()
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).project = project_id
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).region = 'us-central1'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).job_name = 'bigquery-to-pubsub'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).staging_location = 'gs://your-bucket/staging'
options.view_as(beam.options.pipeline_options.GoogleCloudOptions).temp_location = 'gs://your-bucket/temp'
# Define the pipeline
pipeline = beam.Pipeline(options=options)
# Step 1: Read data from BigQuery
query = f'SELECT * FROM `{bq_table}`'
read_from_bq = pipeline | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
# Step 2: Send data to Pub/Sub
def format_message(row):
    # WriteToPubSub expects bytes; serialize the row dict as JSON (adjust to your needs)
    import json
    return json.dumps(row, default=str).encode('utf-8')
send_to_pubsub = read_from_bq | 'Format as Pub/Sub messages' >> beam.Map(format_message)
send_to_pubsub | 'Send to Pub/Sub' >> beam.io.WriteToPubSub(topic=pubsub_topic)
# Run the pipeline
result = pipeline.run()
result.wait_until_finish()
#receiving data from pubsub topic
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
# Define your Pub/Sub topic and subscription
pubsub_topic = 'projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC'
subscription = 'projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION'
# Define your BigQuery output table details
output_table = 'YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE'
def run():
    # Set up pipeline options
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    # Reading from Pub/Sub requires a streaming pipeline
    options.view_as(StandardOptions).streaming = True

    # Create a Pipeline using the defined options
    pipeline = beam.Pipeline(options=options)

    # Read messages from the Pub/Sub subscription
    messages = pipeline | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(subscription=subscription)

    # Transform the messages as needed
    transformed_data = messages | 'Transform Data' >> beam.Map(lambda msg: your_transform_function(msg))

    # Define the BigQuery schema for the output table
    table_schema = {
        'fields': [
            {'name': 'field1', 'type': 'STRING'},
            {'name': 'field2', 'type': 'INTEGER'},
            # Add more fields as needed
        ]
    }

    # Write the transformed data to BigQuery
    transformed_data | 'Write to BigQuery' >> WriteToBigQuery(
        table=output_table,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )

    # Run the pipeline
    pipeline.run().wait_until_finish()

def your_transform_function(message):
    # Implement your own transformation logic here.
    # Pub/Sub delivers raw bytes: decode and parse the payload, then map the
    # fields onto the BigQuery schema defined above.
    import json
    record = json.loads(message.decode('utf-8'))
    transformed_message = {
        'field1': record['field1'],
        'field2': int(record['field2']),
        # Add more fields as needed
    }
    return transformed_message

if __name__ == '__main__':
    run()
#read and print pubsub topic data
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Define your pipeline options
options = PipelineOptions()
# Reading from Pub/Sub requires a streaming pipeline
options.view_as(StandardOptions).streaming = True

# Define your pipeline
pipeline = beam.Pipeline(options=options)

# Define your Pub/Sub topic and subscription
topic = "projects/<your-project>/topics/<your-topic>"
subscription = "projects/<your-project>/subscriptions/<your-subscription>"

# Read data from Pub/Sub
# (ReadFromPubSub accepts either a topic or a subscription, not both; the subscription is used here)
messages = (
    pipeline
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=subscription)
)

# Process the messages (e.g., print them)
def process_message(message):
    print(message)
    return message

processed_messages = (
    messages
    | "Process messages" >> beam.Map(process_message)
)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Define your pipeline options
options = PipelineOptions()

# Define your pipeline
pipeline = beam.Pipeline(options=options)

# Define your BigQuery table
table_spec = "<your-project>:<your-dataset>.<your-table>"

# Read data from the BigQuery table
# (ReadFromBigQuery exports via GCS, so pass --temp_location or set gcs_location)
rows = (
    pipeline
    | "Read from BigQuery" >> beam.io.ReadFromBigQuery(table=table_spec)
)

# Transform the rows into Pub/Sub messages
def transform_to_pubsub(row):
    # Convert the row dict to a Pub/Sub message (WriteToPubSub expects bytes)
    message = str(row).encode('utf-8')
    return message

pubsub_messages = (
    rows
    | "Transform to Pub/Sub messages" >> beam.Map(transform_to_pubsub)
)

# Write the messages to a Pub/Sub topic
topic = "projects/<your-project>/topics/<your-topic>"
pubsub_messages | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=topic)

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()