I have a simple Python Dataflow pipeline that reads an unbounded PCollection. Here is the sample code:
items = (p  # hypothetical: `p` is the beam.Pipeline; the snippet's opening line was not shown
         | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=str(user_options.input_sub))
         # | 'WindowIntoFixedWindows' >> beam.WindowInto(FixedWindows(60), trigger=Repeatedly(AfterProcessingTime(60)), accumulation_mode=AccumulationMode.DISCARDING)
         # | "Window into Fixed Intervals" >> beam.WindowInto(GlobalWindows(),trigger=Repeatedly(AfterAny(AfterCount(10), AfterProcessingTime(10))), accumulation_mode=AccumulationMode.DISCARDING)
         | 'Parse into JSON' >> ParDo(functions.parse_into_json).with_outputs(FAILED_TAG, main=SUCCESS_TAG))
# Validate successful json records
validated_data = (items[SUCCESS_TAG]
                  | 'Validate Raw Product JSON Data' >> ParDo(functions.validate_product_data).with_outputs(FAILED_TAG, main=SUCCESS_TAG))
# # Persist successful json records in BQ as raw
(validated_data[SUCCESS_TAG]
 | "Print each line " >> beam.Map(print))
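Each `with_outputs(FAILED_TAG, main=SUCCESS_TAG)` step splits a ParDo's results into a main output and a tagged failure output, which is what `items[SUCCESS_TAG]` and `validated_data[SUCCESS_TAG]` select. `functions.parse_into_json` is not shown in the question, so the following is only a plain-Python model (no Beam dependency) of how such a step might route records; the tag values and the parsing logic are assumptions, and `(tag, value)` tuples stand in for the main output and `beam.pvalue.TaggedOutput`.

```python
import json

# Hypothetical tag values; the question does not show the real ones.
SUCCESS_TAG = "success"
FAILED_TAG = "failed"


def parse_into_json(message: bytes):
    """Hypothetical stand-in for functions.parse_into_json.

    Yields (tag, value) pairs: SUCCESS_TAG for parsed records (the main
    output in Beam), FAILED_TAG for records that could not be decoded.
    """
    try:
        yield SUCCESS_TAG, json.loads(message.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as err:
        yield FAILED_TAG, {"raw": message, "error": str(err)}


def split_by_tag(messages):
    """Collect outputs per tag, mimicking how items[TAG] exposes each output."""
    outputs = {SUCCESS_TAG: [], FAILED_TAG: []}
    for msg in messages:
        for tag, value in parse_into_json(msg):
            outputs[tag].append(value)
    return outputs


if __name__ == "__main__":
    result = split_by_tag([b'{"id": 1, "name": "a"}', b"not json"])
    print(result[SUCCESS_TAG])      # [{'id': 1, 'name': 'a'}]
    print(len(result[FAILED_TAG]))  # 1
```

Modelling the routing without Beam makes it easy to confirm that the tagging itself behaves the same for bounded and unbounded input; only the runner and windowing differ.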
When I run this code locally, it runs successfully. This is the command I use to run it locally:
python3 product_pubsub_to_bq_to_crs_dataflow.py --staging_location gs://SOMEBUCKET-dataflow1/stage --temp_location gs://SOMEBUCKET-dataflow1/temp --project SOME_GCP_PROJECT --region us-central1 --streaming --input_sub projects/SOME_GCP_PROJECT/subscriptions/sub-for-prd --bq_raw_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw --bq_raw_failure_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw_failed --bq_curated_dest_table=SOME_GCP_PROJECT:myds_raw.prd_curated --bq_curated_failure_dest_table=SOME_GCP_PROJECT:myds_raw.incr_prd_failed --crs_branch 0 --crs_project SOME_GCP_PROJECT --setup_file ./src/setup.py
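The command mixes job-specific flags (`--input_sub`, `--crs_branch`, the `--bq_*` tables) with Beam pipeline options (`--project`, `--region`, `--streaming`, `--setup_file`). How the script builds `user_options` is not shown; one common pattern, sketched here with plain `argparse` under that assumption, is to declare the custom flags and pass everything else through untouched so it can be given to `PipelineOptions`. The function name `split_args` is hypothetical.

```python
import argparse


def split_args(argv):
    """Separate the job's custom flags from the flags meant for Beam.

    Flag names come from the local command in the question; the parser
    itself is a hypothetical sketch, not the author's code.
    """
    # allow_abbrev=False stops argparse from matching Beam flags as prefixes.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--input_sub", required=True)
    parser.add_argument("--bq_raw_dest_table")
    parser.add_argument("--bq_raw_failure_dest_table")
    parser.add_argument("--bq_curated_dest_table")
    parser.add_argument("--bq_curated_failure_dest_table")
    parser.add_argument("--crs_branch")
    parser.add_argument("--crs_project")
    # Undeclared flags (--project, --region, --streaming, ...) and their
    # values are returned as a list, ready for PipelineOptions(beam_args).
    user_options, beam_args = parser.parse_known_args(argv)
    return user_options, beam_args
```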
When I run it on Dataflow, it always fails with this error:
ValueError: GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger
on this line:
(validated_data[SUCCESS_TAG]
This is the command I use to run it on Dataflow:
gcloud dataflow flex-template run incr-prd-pubsub-to-bq-to-crs --template-file-gcs-location gs://SOME_BUCKET-dataflow1/templates/prd_raw_gcs_to_bq.json --additional-experiments enable_prime --enable-streaming-engine --region us-central1 --network default --service-account-email SOME_SERVICE_ACCOUNT_EMAIL --subnetwork https://www.googleapis.com/compute/v1/projects/SOME_GCP_PROJECT/regions/us-central1/subnetworks/default --parameters input_sub=projects/SOME_GCP_PROJECT/subscriptions/sub-for-prd --parameters bq_raw_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw --parameters bq_raw_failure_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw_failed --parameters bq_curated_dest_table=SOME_GCP_PROJECT:myds_raw.prd_curated --parameters bq_curated_failure_dest_table=SOME_GCP_PROJECT:myds_raw.incr_prd_failed --parameters crs_branch=0 --parameters crs_project=SOME_GCP_PROJECT
I tried both fixed windows with a trigger and the global window with a trigger, but it always fails with the same error.
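The commented-out global-window variant is meant to fire a pane after 10 elements or 10 seconds of processing time, whichever comes first, and to discard elements once they have fired. The function below is a simplified plain-Python model of that `Repeatedly(AfterAny(AfterCount(10), AfterProcessingTime(10)))` plus `AccumulationMode.DISCARDING` combination, not Beam code: it only checks the processing-time timer when the next element arrives, and it emits any leftover elements at the end instead of at their timer. The name `simulate_panes` is hypothetical.

```python
def simulate_panes(arrivals, count=10, delay=10.0):
    """Simplified model of Repeatedly(AfterAny(AfterCount(count),
    AfterProcessingTime(delay))) with DISCARDING accumulation in one
    global window.

    arrivals: (processing_time_seconds, element) pairs sorted by time.
    Returns a list of panes; each pane holds only the elements that arrived
    since the previous firing, as discarding mode does.
    """
    panes = []
    pane = []
    deadline = None  # timer armed by the first element of the current pane
    for t, elem in arrivals:
        # The processing-time timer may have expired before this element arrived.
        if pane and t >= deadline:
            panes.append(pane)
            pane, deadline = [], None
        if not pane:
            deadline = t + delay
        pane.append(elem)
        # The element-count trigger fires as soon as the pane is full.
        if len(pane) >= count:
            panes.append(pane)
            pane, deadline = [], None
    if pane:
        # Leftovers would fire when their timer expires; the model emits them now.
        panes.append(pane)
    return panes
```

For example, 25 elements arriving every 0.5 seconds produce panes of 10, 10 and 5 elements, while a slow trickle is flushed by the 10-second timer instead.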
I also added this windowing step to the pipeline, right after reading from Pub/Sub:
|'WindowIntoFixedWindows' >> beam.WindowInto(FixedWindows(60), trigger=Repeatedly(AfterProcessingTime(60)), accumulation_mode=AccumulationMode.DISCARDING)
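`FixedWindows(60)` assigns each element to a 60-second window based on its event timestamp, using `start = timestamp - (timestamp - offset) % size` and the interval `[start, start + size)`. The helpers below reproduce that arithmetic in plain Python so you can check which window a given timestamp lands in; the function names are hypothetical and the timestamps are plain numbers of seconds rather than Beam `Timestamp` objects.

```python
from collections import defaultdict


def fixed_window_bounds(timestamp, size=60, offset=0):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp - offset) % size
    return start, start + size


def group_into_windows(events, size=60):
    """Group (timestamp, element) pairs by the fixed window they fall into."""
    windows = defaultdict(list)
    for ts, elem in events:
        windows[fixed_window_bounds(ts, size)].append(elem)
    return dict(windows)


if __name__ == "__main__":
    print(fixed_window_bounds(125))  # (120, 180)
    print(group_into_windows([(5, "a"), (61, "b"), (70, "c")]))
    # {(0, 60): ['a'], (60, 120): ['b', 'c']}
```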
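Example 1 here is from another thread that explains unbounded collections.
Here is a similar question.
But it still fails whenever I try to get the tagged output of the previous step: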
validated_data[SUCCESS_TAG]
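This is most likely caused by tagged outputs on an unbounded collection. If I run in batch mode with a bounded PCollection instead of the unbounded Pub/Sub PCollection in streaming mode, the same code works.
Here is the sample validation function; it only tags the output: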
def validate_product_data(self, elem):
    """
    ParDo function to perform product validations
    """
    # TODO Add all validations here so when we transform later, we don't have to check
    top_level_keys = elem.keys()
    self.helper.validate_key(elem, 'id', top_level_keys)
    self.helper.validate_key(elem, 'name', top_level_keys)
    self.helper.validate_key(elem, 'brand', top_level_keys)
    if FAILED_REASON in elem.keys():
        yield beam.pvalue.TaggedOutput(FAILED_TAG, elem)
    else:
        yield elem
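`self.helper.validate_key` and the value of `FAILED_REASON` are not shown. Assuming the helper records a failure reason on the element when a required key is missing, the routing can be exercised on its own, outside Beam, with a standalone version like this. The helper body and both constants are hypothetical, and `(tag, elem)` tuples stand in for `TaggedOutput` (a tag of `None` means the main output).

```python
# Hypothetical constants; the question does not show their real values.
FAILED_TAG = "failed"
FAILED_REASON = "failed_reason"


def validate_key(elem, key, top_level_keys):
    """Hypothetical stand-in for self.helper.validate_key: append a failure
    reason to the element when a required key is missing."""
    if key not in top_level_keys:
        elem.setdefault(FAILED_REASON, []).append(f"missing key: {key}")


def validate_product_data(elem):
    """Yield (FAILED_TAG, elem) for invalid records, (None, elem) otherwise."""
    top_level_keys = set(elem.keys())  # snapshot of the keys before validation
    for key in ("id", "name", "brand"):
        validate_key(elem, key, top_level_keys)
    if FAILED_REASON in elem:
        yield FAILED_TAG, elem
    else:
        yield None, elem
```

Because the function is a generator, like the original DoFn method, calling `list(...)` on it shows exactly what would be emitted for one input element.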
Here is a screenshot of the original code, without windowing, which still fails.
Streaming is specified in 3 places:
During the build:
gcloud dataflow flex-template build ${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_TEMPLATE_LOCATION} \
--image-gcr-path "${GCR_HOSTNAME}/${GCR_PROJECT_ID}/${DATAFLOW_IMAGE_REPO}/${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_IMAGE_NAME}:latest" \
--additional-experiments "use_network_tags_for_flex_templates=${NETWORK_TAG}" \
--enable-streaming-engine \
In the code:
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
At run time:
gcloud dataflow flex-template run ${DF_INCR_PRODUCT_PUBSUB_TO_BQ_TO_CRS_JOB_NAME}-${prefix} \
--template-file-gcs-location ${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_TEMPLATE_LOCATION} \
--additional-experiments enable_prime \
--enable-streaming-engine \
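If I keep the streaming setting in the build, in the code, and at run time, it always fails, no matter what.
If I remove it from the build but keep it in the code and at run time, it works fine.
If I remove it from the build and from the code but keep it at run time, it fails.
If I remove it from the build and at run time but keep it in the code, it also works fine.
So passing --enable-streaming-engine during the build is clearly what causes the problem. No windowing is needed.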