Apache Beam: "Number of components does not match number of coders" when using timers


My question is very similar to this one.

I am trying to add some timers so I can process some data later, but I get the following error:

Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.process
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 158, in process
    self.windowed_coder_impl.encode_to_stream(
  File "apache_beam/coders/coder_impl.py", line 1448, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 1467, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 1023, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream
ValueError: Number of components does not match number of coders.

My DoFn:

class WaitUntilDevicesExist(beam.DoFn):
    BUFFER_STATE = beam.transforms.userstate.BagStateSpec('buffer', beam.coders.StrUtf8Coder())
    TIMER = beam.transforms.userstate.TimerSpec('timer', beam.TimeDomain.REAL_TIME)

    BUFFER_TIMER = 15  # seconds

    ...

    def process(self, key_value, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        shard_id, batch = key_value

        for message in batch:
            logging.info(f"Checking = {message}")
            
            ...

            if (...):
                timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER)) 
                buffer.add(DeviceCheckHelper(message).to_string())
            else:
                yield message

    @beam.transforms.userstate.on_timer(TIMER)
    def expiry_callback(self, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        events = buffer.read()
        logging.info("Timer")
        new_buffer = []

        for row in events:
            message = DeviceCheckHelper.from_string(row)
            logging.info(message)
            
            ....

            if (...):
                if retry == 3:
                    logging.info(f"Waited 3 times, yielding ")
                    yield message.message
                else:
                    message.increase_retry()
                    new_buffer.append(message.to_string())
                    logging.info(f"retry = {message}")

        buffer.clear()
        timer.clear()

        logging.info(f"New buffer = {new_buffer}")
        if new_buffer:
            for row in new_buffer:
                logging.info(f"Adding {row}")
                buffer.add(row)

            timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER))

The pipeline looks like this:

# 1 filter messages
filtered_messages = (
    transformed_messages[TransformData.TAG_OK]
    | f"Clean Devices {tenant}" >> beam.ParDo(FilterMessages()).with_outputs(FilterMessages.DEVICE_TAG, FilterMessages.OBSERVATION_TAG)
)

# 2 Write observations
observation_results = (
    filtered_messages[FilterMessages.OBSERVATION_TAG]
    | f"{tenant} Check Devices" >> beam.ParDo(WaitUntilDevicesExist(...))
    | f"{tenant} Window Observations messages" >> GroupMessagesByShardedKey(max_messages=200, max_waiting_time=10, shard_key="obs", num_shards=10)
    | f"{tenant} Write Observations" >> beam.ParDo(Write(...)).with_outputs(FAILED_TAG)
)

If I move WaitUntilDevicesExist so it runs after GroupMessagesByShardedKey, everything works fine. What am I missing?

python google-cloud-dataflow apache-beam
1 Answer

It's hard to say for sure without seeing the other DoFns here, but it looks to me like you are yielding un-keyed output from WaitUntilDevicesExist and then calling GroupByKey (or something similar) inside GroupMessagesByShardedKey. Instead of just yielding the message, should you be doing something like the following?

yield shard_id, message

I think "Number of components does not match number of coders" is basically saying that the coders are expecting a key/value pair and you are just passing them a single value.
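For what it's worth, here is a minimal sketch of what that could look like, assuming GroupMessagesByShardedKey really does start with a GroupByKey-style grouping and that the buffered messages are plain strings. message_is_ready is a hypothetical stand-in for the check that is elided in the question, and beam.DoFn.KeyParam is used so the timer callback can re-attach the key as well:

import apache_beam as beam
from apache_beam.transforms import userstate
from apache_beam.utils import timestamp


def message_is_ready(message):
    # Hypothetical stand-in for the device check elided in the question.
    return True


class WaitUntilDevicesExist(beam.DoFn):
    BUFFER_STATE = userstate.BagStateSpec('buffer', beam.coders.StrUtf8Coder())
    TIMER = userstate.TimerSpec('timer', beam.TimeDomain.REAL_TIME)
    BUFFER_TIMER = 15  # seconds

    def process(self, key_value,
                timer=beam.DoFn.TimerParam(TIMER),
                buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        shard_id, batch = key_value
        for message in batch:
            if message_is_ready(message):
                # Keep the key on the output instead of yielding the bare message.
                yield shard_id, message
            else:
                timer.set(timestamp.Timestamp.now() +
                          timestamp.Duration(seconds=self.BUFFER_TIMER))
                buffer.add(message)  # assumes message is already a str

    @userstate.on_timer(TIMER)
    def expiry_callback(self,
                        key=beam.DoFn.KeyParam,
                        buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        # The timer callback has to emit keyed output too; DoFn.KeyParam gives
        # it the key of the element the timer was set for.
        for row in buffer.read():
            yield key, row
        buffer.clear()

The important part is that every yield, both in process and in the timer callback, produces a (key, value) tuple, which is the two-component shape the downstream KV coder expects.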
