My problem is very similar to this one.
I'm trying to add some timers so that I can process some data later, but I get the following error:
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.process
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 158, in process
self.windowed_coder_impl.encode_to_stream(
File "apache_beam/coders/coder_impl.py", line 1448, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 1467, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 1023, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream
ValueError: Number of components does not match number of coders.
My DoFn:
import logging

import apache_beam as beam
from apache_beam.utils import timestamp


class WaitUntilDevicesExist(beam.DoFn):
    BUFFER_STATE = beam.transforms.userstate.BagStateSpec('buffer', beam.coders.StrUtf8Coder())
    TIMER = beam.transforms.userstate.TimerSpec('timer', beam.TimeDomain.REAL_TIME)
    BUFFER_TIMER = 15  # seconds
    ...

    def process(self, key_value, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        shard_id, batch = key_value
        for message in batch:
            logging.info(f"Checking = {message}")
            ...
            if (...):
                # Device not ready yet: buffer the message and (re)arm the timer.
                timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER))
                buffer.add(DeviceCheckHelper(message).to_string())
            else:
                yield message

    @beam.transforms.userstate.on_timer(TIMER)
    def expiry_callback(self, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        events = buffer.read()
        logging.info("Timer")
        new_buffer = []
        for row in events:
            message = DeviceCheckHelper.from_string(row)
            logging.info(message)
            ....
            if (...):
                if retry == 3:
                    logging.info(f"Waited 3 times, yielding ")
                    yield message.message
                else:
                    message.increase_retry()
                    new_buffer.append(message.to_string())
                    logging.info(f"retry = {message}")
        # Replace the buffered state with the messages that still need a retry.
        buffer.clear()
        timer.clear()
        logging.info(f"New buffer = {new_buffer}")
        if new_buffer:
            for row in new_buffer:
                logging.info(f"Adding {row}")
                buffer.add(row)
            timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER))
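DeviceCheckHelper is not shown in the question. Because the bag state is declared with StrUtf8Coder, whatever to_string() produces has to be a plain str that from_string() can turn back into the message plus its retry count. A minimal sketch of such a helper, assuming the message is JSON-serializable (the JSON format and the retry attribute are assumptions, not the asker's actual class):

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class DeviceCheckHelper:
    """Hypothetical helper: wraps a message together with its retry count."""
    message: Any
    retry: int = 0

    def increase_retry(self) -> None:
        self.retry += 1

    def to_string(self) -> str:
        # The bag state uses StrUtf8Coder, so each buffered entry must be a str.
        return json.dumps({"message": self.message, "retry": self.retry})

    @classmethod
    def from_string(cls, row: str) -> "DeviceCheckHelper":
        data = json.loads(row)
        return cls(data["message"], data["retry"])


helper = DeviceCheckHelper({"device": "d1"})
helper.increase_retry()
restored = DeviceCheckHelper.from_string(helper.to_string())
```

Storing the retry count inside the serialized string keeps all per-message bookkeeping in the single bag state, so the timer callback does not need a second state cell.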
The pipeline looks like this:
# 1 filter messages
filtered_messages = (
    transformed_messages[TransformData.TAG_OK]
    | f"Clean Devices {tenant}" >> beam.ParDo(FilterMessages()).with_outputs(FilterMessages.DEVICE_TAG, FilterMessages.OBSERVATION_TAG)
)
# 2 Write observations
observation_results = (
    filtered_messages[FilterMessages.OBSERVATION_TAG]
    | f"{tenant} Check Devices" >> beam.ParDo(WaitUntilDevicesExist(...))
    | f"{tenant} Window Observations messages" >> GroupMessagesByShardedKey(max_messages=200, max_waiting_time=10, shard_key="obs", num_shards=10)
    | f"{tenant} Write Observations" >> beam.ParDo(Write(...)).with_outputs(FAILED_TAG)
)
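GroupMessagesByShardedKey is a custom transform that is not shown. Judging only from its parameters, it appears to emit (sharded_key, batch) pairs, which is exactly the shape that process() unpacks with "shard_id, batch = key_value". A toy, in-memory model under that assumption (round-robin shard assignment; max_waiting_time is ignored; the function name is hypothetical):

```python
from typing import Dict, Iterable, List, Tuple


def group_messages_by_sharded_key(messages: Iterable[str], shard_key: str,
                                  num_shards: int, max_messages: int) -> List[Tuple[str, List[str]]]:
    """Toy model: spread messages over num_shards keys, then cut batches of at most max_messages."""
    shards: Dict[str, List[str]] = {}
    for i, message in enumerate(messages):
        # Round-robin assignment is an assumption; the real transform may shard randomly.
        shards.setdefault(f"{shard_key}-{i % num_shards}", []).append(message)
    batches: List[Tuple[str, List[str]]] = []
    for key, items in shards.items():
        for start in range(0, len(items), max_messages):
            batches.append((key, items[start:start + max_messages]))
    return batches


batches = group_messages_by_sharded_key(["m0", "m1", "m2", "m3", "m4"], "obs", num_shards=2, max_messages=2)
for shard_id, batch in batches:  # the same unpacking that process() performs
    pass
```

Every element produced this way is a keyed pair, whereas the individual messages coming out of FilterMessages are not.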
If I move WaitUntilDevicesExist after GroupMessagesByShardedKey, it works fine. What am I missing?
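It's hard to be sure without seeing the other DoFns here, but it looks to me like you're yielding unkeyed output from
WaitUntilDevicesExist
and then passing it to GroupMessagesByShardedKey, which presumably calls GroupByKey
(or a similar operation) internally. Instead of yielding just the message, should you be doing something like this?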
yield shard_id, message
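Note that expiry_callback also yields message.message without a key, so both code paths would need to emit the key. In Beam's Python SDK a timer callback can receive the current key through a key=beam.DoFn.KeyParam argument. The sketch below does not use Beam; it only models the data shapes of the two methods in plain Python, with is_ready as a hypothetical stand-in for the elided device check:

```python
from typing import Callable, Iterator, List, Tuple

KeyedMessage = Tuple[str, str]


def process_keyed(key_value: Tuple[str, List[str]], is_ready: Callable[[str], bool],
                  buffer: List[str]) -> Iterator[KeyedMessage]:
    """Models process(): every output keeps the shard key."""
    shard_id, batch = key_value
    for message in batch:
        if is_ready(message):
            yield shard_id, message  # keyed output instead of a bare message
        else:
            buffer.append(message)  # held back until the (simulated) timer fires


def expiry_keyed(key: str, buffer: List[str], is_ready: Callable[[str], bool]) -> Iterator[KeyedMessage]:
    """Models expiry_callback(); in Beam, key would come from beam.DoFn.KeyParam."""
    still_waiting: List[str] = []
    for message in buffer:
        if is_ready(message):
            yield key, message
        else:
            still_waiting.append(message)
    buffer[:] = still_waiting  # runs once the generator is fully consumed


ready = {"a", "c"}
buf: List[str] = []
first_pass = list(process_keyed(("obs-1", ["a", "b", "c"]), lambda m: m in ready, buf))
```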
I think
Number of components does not match number of coders.
basically means that the coder expects a key/value pair (two components), but you're passing it only a value.
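The last frame of the traceback, AbstractComponentCoderImpl.encode_to_stream, is where a composite coder checks that the value has one component per sub-coder. A simplified, pure-Python model of that check (ToyKVCoder and utf8_component are made-up names; this is not Beam's actual implementation):

```python
from typing import Any, Callable, Tuple

Encoder = Callable[[Any], bytes]


def utf8_component(s: str) -> bytes:
    """Length-prefixed UTF-8 encoding for a single component."""
    data = s.encode("utf-8")
    return len(data).to_bytes(4, "big") + data


class ToyKVCoder:
    """Toy composite coder: one encoder per component of the value."""

    def __init__(self, *encoders: Encoder) -> None:
        self.encoders = encoders

    def encode(self, value: Any) -> bytes:
        # Simplification: a tuple is split into its components; anything else counts as one.
        components: Tuple[Any, ...] = value if isinstance(value, tuple) else (value,)
        if len(components) != len(self.encoders):
            raise ValueError("Number of components does not match number of coders.")
        return b"".join(enc(c) for enc, c in zip(self.encoders, components))


kv_coder = ToyKVCoder(utf8_component, utf8_component)
encoded = kv_coder.encode(("obs", "m"))  # key + value: two components, two coders
```

Calling kv_coder.encode("m") with a bare value raises the same ValueError, which matches what happens when a keyed coder receives unkeyed output.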