python 数据流:GroupByKey 无法应用于具有全局窗口和默认触发器的无界 PCollection

问题描述 投票:0回答:1

我有一个简单的 python 数据流代码,它使用无界 pcollection 。简直就是

  • 从 pubsub 读取
  • 解析为带有输出标签 SUCCESS 和 FAILURE 的 json
  • 使用输出标签 SUCCESS 和 FAILURE 验证 json。它只是产生 如果验证不正确,则带有 FAILED 标签的元素,否则 它产生的元素为 SUCCESS
  • 打印成功标签

这是示例代码

         | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=str(user_options.input_sub))
         #  | 'WindowIntoFixedWindows' >> beam.WindowInto(FixedWindows(60), trigger=Repeatedly(AfterProcessingTime(60)), accumulation_mode=AccumulationMode.DISCARDING)
         #  | "Window into Fixed Intervals" >> beam.WindowInto(GlobalWindows(),trigger=Repeatedly(AfterAny(AfterCount(10), AfterProcessingTime(10))),  accumulation_mode=AccumulationMode.DISCARDING)
         | 'Parse into JSON' >> ParDo(functions.parse_into_json).with_outputs(FAILED_TAG, main=SUCCESS_TAG))
        
# Validate successful json records
validated_data = (items[SUCCESS_TAG]
      | 'Validate Raw Product JSON Data' >> ParDo(functions.validate_product_data).with_outputs(FAILED_TAG, main=SUCCESS_TAG))
  # # Persist successful json records in BQ as raw
   (validated_data[SUCCESS_TAG]
            | "Print each line " >> beam.Map(print))

当我在本地运行此代码时,它可以成功运行。这是我用来在本地运行的命令

python3 product_pubsub_to_bq_to_crs_dataflow.py --staging_location gs://SOMEBUCKET-dataflow1/stage --temp_location gs://SOMEBUCKET-dataflow1/temp --project SOME_GCP_PROJECT --region us-central1 --streaming --input_sub projects/SOME_GCP_PROJECT/subscriptions/sub-for-prd --bq_raw_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw --bq_raw_failure_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw_failed --bq_curated_dest_table=SOME_GCP_PROJECT:myds_raw.prd_curated --bq_curated_failure_dest_table=SOME_GCP_PROJECT:myds_raw.incr_prd_failed --crs_branch 0 --crs_project SOME_GCP_PROJECT --setup_file ./src/setup.py

当我在数据流上运行它时,它总是失败并出现此错误

ValueError: GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger

在这条线上

 (validated_data[SUCCESS_TAG]

这是我用来在数据流上运行的命令

gcloud dataflow flex-template run incr-prd-pubsub-to-bq-to-crs --template-file-gcs-location gs://SOME_BUCKET-dataflow1/templates/prd_raw_gcs_to_bq.json --additional-experiments enable_prime --enable-streaming-engine --region us-central1 --network default --service-account-email [email protected] --subnetwork https://www.googleapis.com/compute/v1/projects/SOME_GCP_PROJECT/regions/us-central1/subnetworks/default --parameters input_sub=projects/SOME_GCP_PROJECT/subscriptions/sub-for-prd --parameters bq_raw_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw --parameters bq_raw_failure_dest_table=SOME_GCP_PROJECT:myds_raw.prd_raw_failed --parameters bq_curated_dest_table=SOME_GCP_PROJECT:myds_raw.prd_curated --parameters bq_curated_failure_dest_table=SOME_GCP_PROJECT:myds_raw.incr_prd_failed --parameters crs_branch=0 --parameters crs_project=SOME_GCP_PROJECT

我尝试使用带有触发器的固定窗口和带有触发器的全局窗口,但它始终失败并出现相同的错误
在从 pubsub 读取数据后,我还在管道中添加了这个窗口步骤

|'WindowIntoFixedWindows' >> beam.WindowInto(FixedWindows(60), trigger=Repeatedly(AfterProcessingTime(60)), accumulation_mode=AccumulationMode.DISCARDING)

这里的示例 1 是解释无界集合的另一个主题
这里是一个类似的问题

但每当我尝试获取上一步的标记输出时,它仍然失败。

validated_data[SUCCESS_TAG]

很可能是由于无界集合中的标记输出引起的。如果我在流模式下没有无界的 pubsub pcollection,而只是批处理中有界的 pcollection,则相同的代码可以工作。

这是示例验证函数,它只是标记输出

def validate_product_data(self, elem):
        """
        Par Do Function to perform product validations
        """
        #TODO Add all validations here so when we transform later, we don't have to check 
        top_level_keys = elem.keys()
        self.helper.validate_key(elem, 'id', top_level_keys)
        self.helper.validate_key(elem, 'name', top_level_keys)
        self.helper.validate_key(elem, 'brand', top_level_keys)
        if FAILED_REASON in elem.keys():
            yield beam.pvalue.TaggedOutput(FAILED_TAG, elem)
        else:
            yield elem

这是原始代码的屏幕截图,没有窗口,但仍然失败

python google-cloud-dataflow apache-beam google-cloud-pubsub
1个回答
0
投票

有3个地方指定了流式传输 构建期间

gcloud dataflow flex-template build ${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_TEMPLATE_LOCATION} \
--image-gcr-path "${GCR_HOSTNAME}/${GCR_PROJECT_ID}/${DATAFLOW_IMAGE_REPO}/${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_IMAGE_NAME}:latest" \
--additional-experiments "use_network_tags_for_flex_templates=${NETWORK_TAG}" \
--enable-streaming-engine \

在代码中

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True

跑步时

gcloud dataflow flex-template run ${DF_INCR_PRODUCT_PUBSUB_TO_BQ_TO_CRS_JOB_NAME}-${prefix} \
--template-file-gcs-location ${DF_PRODUCT_PUBSUB_TO_BQ_TO_CRS_TEMPLATE_LOCATION} \
--additional-experiments enable_prime \
--enable-streaming-engine \
  • 如果我在构建期间保留它,请将其保留在代码中并在构建期间保留它 运行,无论如何总是失败

  • 如果我在构建期间将其删除,但将其保留在代码中和运行期间 ,效果很好

  • 如果我在构建期间删除,请将其从代码中删除,但在构建期间保留它 运行,失败

  • 如果我在构建期间删除它,将其保留在代码中,在运行期间删除它,它 效果也很好

所以显然在构建过程中使用它导致了这个问题。不需要开窗

© www.soinside.com 2019 - 2024. All rights reserved.