PyFlink watermark is stuck

Question (votes: 0, answers: 1)

This is my PyFlink job. No matter what I try, I cannot get the watermark to advance.

The output I see is:

Event: (1, 1000), Current Watermark: -9223372036854775808
Event: (2, 2000), Current Watermark: -9223372036854775808
Event: (3, 3000), Current Watermark: -9223372036854775808
Event: (4, 4000), Current Watermark: -9223372036854775808
Event: (5, 5000), Current Watermark: -9223372036854775808
(1, 1000)
(2, 2000)
(3, 3000)
(4, 4000)
(5, 5000)

My understanding is that the watermark is stuck at its initial value.

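When I do the same thing in Java, I do see the watermark advance, although there I add Thread.sleep(1000) between the elements. I tried doing the same in Python, but that does not work either.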

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.functions import ProcessFunction
from pyflink.common import Duration

# Custom TimestampAssigner
class CustomTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        return element[1]

# Custom ProcessFunction
class PrintWatermarkProcessFunction(ProcessFunction):
    def process_element(self, value, ctx: ProcessFunction.Context):
        current_watermark = ctx.timer_service().current_watermark()
        print(f"Event: {value}, Current Watermark: {current_watermark}")
        yield value  # Forward event

# Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Create a data source with more events to better illustrate watermarks
events = [
    (1, 1000),
    (2, 2000),
    (3, 3000),
    (4, 4000),
    (5, 5000)
]
source = env.from_collection(events)

# WatermarkStrategy with 1 second out-of-orderness
watermark_strategy = (
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(1))
    .with_timestamp_assigner(CustomTimestampAssigner())
)

# Assign watermarks
watermarked_stream = source.assign_timestamps_and_watermarks(watermark_strategy)

# Print watermark progression
processed_stream = watermarked_stream.process(PrintWatermarkProcessFunction())

# Print the events
processed_stream.print()

# Execute the job
env.execute("Flink Job with Proper Watermark Emission")
python apache-flink flink-streaming pyflink
1 Answer
0 votes

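By default, Flink generates a watermark every 200 ms. In your job, all events are therefore processed before any watermark has been generated, so every call to current_watermark() still returns the initial value (Long.MIN_VALUE, -9223372036854775808). Adding a delay slows processing down enough for watermarks to be generated between events, and I think that is what happens in your Java code.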
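
To make the timing argument concrete, here is a minimal plain-Python model of periodic emission. It is only a sketch and does not use the PyFlink API: the function name `simulate_periodic` and the `gap_ms` parameter are made up for illustration, and the simulated clock is a simplification of what the runtime does. It assumes the bounded-out-of-orderness rule of emitting `max_timestamp - out_of_orderness - 1` on each periodic tick.

```python
# Plain-Python model of periodic watermark emission (not PyFlink API).
MIN_WATERMARK = -(2 ** 63)  # Long.MIN_VALUE, the value printed in the question


def simulate_periodic(events, gap_ms, interval_ms=200, out_of_orderness_ms=1000):
    """Return (event, watermark seen while processing that event) pairs.

    events: list of (id, event_timestamp_ms) tuples
    gap_ms: simulated wall-clock time between two consecutive events
    """
    max_ts = None               # highest event timestamp seen so far
    watermark = MIN_WATERMARK   # last watermark actually emitted
    next_tick = interval_ms     # simulated time of the next periodic emit
    seen = []
    for i, event in enumerate(events):
        arrival = i * gap_ms
        # Fire every periodic emit that is due before this event arrives.
        while next_tick <= arrival:
            if max_ts is not None:
                watermark = max(watermark, max_ts - out_of_orderness_ms - 1)
            next_tick += interval_ms
        seen.append((event, watermark))
        max_ts = event[1] if max_ts is None else max(max_ts, event[1])
    return seen


events = [(1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)]

# All events arrive at the same instant: no periodic emit happens in between.
for event, wm in simulate_periodic(events, gap_ms=0):
    print(f"Event: {event}, Current Watermark: {wm}")

# One second between events: several periodic emits happen between events.
for event, wm in simulate_periodic(events, gap_ms=1000):
    print(f"Event: {event}, Current Watermark: {wm}")
```

With `gap_ms=0` every event sees the initial watermark, as in the question. With `gap_ms=1000` the watermark trails the previous event's timestamp by the out-of-orderness bound. The 200 ms default corresponds to the `pipeline.auto-watermark-interval` setting.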

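If you need to, you can use a punctuated watermark generator, which emits a watermark for every event instead of waiting for the periodic interval. See the documentation for how to create one.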
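
As a rough illustration of the punctuated idea, the sketch below models a generator in plain Python. The class `PunctuatedGenerator` is hypothetical and is not a PyFlink class. It only mirrors the two callbacks of Flink's Java `WatermarkGenerator` interface (`onEvent` and `onPeriodicEmit`). Whether a custom generator can be plugged into a PyFlink `WatermarkStrategy` depends on your version, so check the documentation before relying on it.

```python
class PunctuatedGenerator:
    """Plain-Python model of a punctuated generator (hypothetical, not PyFlink API)."""

    def __init__(self, out_of_orderness_ms=0):
        self.out_of_orderness_ms = out_of_orderness_ms
        self.watermark = -(2 ** 63)  # start at Long.MIN_VALUE

    def on_event(self, event_timestamp_ms):
        # Emit a watermark for every event; never let it move backwards.
        candidate = event_timestamp_ms - self.out_of_orderness_ms - 1
        self.watermark = max(self.watermark, candidate)
        return self.watermark

    def on_periodic_emit(self):
        # Nothing to do: watermarks are driven by events, not by the timer.
        return None


generator = PunctuatedGenerator(out_of_orderness_ms=1000)
for event in [(1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)]:
    print(f"Event: {event}, Watermark after event: {generator.on_event(event[1])}")
```

Because the watermark is updated inside `on_event`, it advances even when all five events arrive within a single periodic interval.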
