This is my PyFlink job. No matter what I try, I can't get the watermark to advance.
The output I see is:
```
Event: (1, 1000), Current Watermark: -9223372036854775808
Event: (2, 2000), Current Watermark: -9223372036854775808
Event: (3, 3000), Current Watermark: -9223372036854775808
Event: (4, 4000), Current Watermark: -9223372036854775808
Event: (5, 5000), Current Watermark: -9223372036854775808
(1, 1000)
(2, 2000)
(3, 3000)
(4, 4000)
(5, 5000)
```
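My understanding is that the watermark is stuck.
When I do the same thing in Java, I do see the watermark advance, although there I added a Thread.sleep(1000) between elements. I tried the same thing in Python, but it didn't work either.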
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.functions import ProcessFunction
from pyflink.common import Duration


# Custom TimestampAssigner: the second tuple field is the event time in ms
class CustomTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        return element[1]


# Custom ProcessFunction: prints the watermark visible when each event arrives
class PrintWatermarkProcessFunction(ProcessFunction):
    def process_element(self, value, ctx: ProcessFunction.Context):
        current_watermark = ctx.timer_service().current_watermark()
        print(f"Event: {value}, Current Watermark: {current_watermark}")
        yield value  # Forward event


# Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Create a data source with more events to better illustrate watermarks
events = [
    (1, 1000),
    (2, 2000),
    (3, 3000),
    (4, 4000),
    (5, 5000)
]
source = env.from_collection(events)

# WatermarkStrategy with 1 second out-of-orderness
watermark_strategy = (
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(1))
    .with_timestamp_assigner(CustomTimestampAssigner())
)

# Assign watermarks
watermarked_stream = source.assign_timestamps_and_watermarks(watermark_strategy)

# Print watermark progression
processed_stream = watermarked_stream.process(PrintWatermarkProcessFunction())

# Print the events
processed_stream.print()

# Execute the job
env.execute("Flink Job with Proper Watermark Emission")
```
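By default, Flink generates watermarks periodically, every 200 ms. In your job, all of the events are processed before any watermark has been generated, so each event sees the initial watermark, -9223372036854775808 (Java's Long.MIN_VALUE). Adding a delay slows things down enough for watermarks to be generated between events, and I think that is what happened with your Java code.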
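To make the timing argument concrete, here is a small pure-Python model. It is not PyFlink code, and the names `BoundedOutOfOrdernessModel` and `simulate` are hypothetical. As far as I know it mirrors the logic of Flink's bounded-out-of-orderness generator: each event only updates the maximum timestamp, and a watermark of `max_timestamp - out_of_orderness - 1` is emitted only when the periodic timer fires. The arrival times (in ms) are assumptions standing in for "no delay" and "sleep one second between elements".

```python
# Hypothetical pure-Python model of periodic watermark emission.
# Not PyFlink API; it only mirrors the bounded-out-of-orderness logic.

LONG_MIN = -(2 ** 63)  # Java Long.MIN_VALUE, the initial watermark


class BoundedOutOfOrdernessModel:
    """Tracks the max timestamp per event; emits only when the timer fires."""

    def __init__(self, out_of_orderness_ms):
        self.delay = out_of_orderness_ms
        # Chosen so an emission before any event yields exactly LONG_MIN
        self.max_ts = LONG_MIN + out_of_orderness_ms + 1

    def on_event(self, ts):
        # No watermark is emitted here, only the max timestamp is updated
        self.max_ts = max(self.max_ts, ts)

    def on_periodic_emit(self):
        return self.max_ts - self.delay - 1


def simulate(events, arrival_ms, interval_ms=200, delay_ms=1000):
    """Return (value, watermark seen downstream) for each event.

    arrival_ms are assumed processing times in ms, starting at 0."""
    gen = BoundedOutOfOrdernessModel(delay_ms)
    current_wm = LONG_MIN
    next_tick = interval_ms
    seen = []
    for (value, ts), t in zip(events, arrival_ms):
        # Fire every periodic tick that elapsed before this event arrived
        while next_tick <= t:
            current_wm = max(current_wm, gen.on_periodic_emit())
            next_tick += interval_ms
        # In this model the event is forwarded before it updates the generator
        seen.append((value, current_wm))
        gen.on_event(ts)
    return seen


if __name__ == "__main__":
    events = [(1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)]
    # No delay between elements: no tick has fired, every event sees LONG_MIN
    print(simulate(events, [0, 0, 0, 0, 0]))
    # One second between elements: ticks fire and the watermark advances
    print(simulate(events, [0, 1000, 2000, 3000, 4000]))
```

With no delay the model reproduces the stuck `-9223372036854775808` from the question; with one-second gaps each event sees a watermark derived from the previous event. The interval itself is configurable (in PyFlink via `env.get_config().set_auto_watermark_interval(...)`, in milliseconds), but with a five-element in-memory collection even a short interval is likely to elapse only after all elements have been processed.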
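If you need that, you can use a punctuated watermark generator, which emits a watermark for every event. See the documentation for how to create one.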
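For contrast, the same hypothetical model can be switched to punctuated emission. In Flink's Java `WatermarkGenerator` interface this corresponds to calling `output.emitWatermark(...)` inside `onEvent` instead of inside `onPeriodicEmit`. The sketch below is pure Python and does not assume that PyFlink lets you register a custom generator this way; check the documentation for your Flink version. `PunctuatedModel` and `simulate_punctuated` are illustrative names only.

```python
# Hypothetical pure-Python model of a punctuated watermark generator.
# Not PyFlink API: in Flink's Java WatermarkGenerator interface this
# corresponds to calling output.emitWatermark(...) inside onEvent().

LONG_MIN = -(2 ** 63)  # Java Long.MIN_VALUE


class PunctuatedModel:
    def __init__(self, out_of_orderness_ms):
        self.delay = out_of_orderness_ms
        self.max_ts = LONG_MIN + out_of_orderness_ms + 1

    def on_event(self, ts):
        # Update the max timestamp and emit a watermark right away
        self.max_ts = max(self.max_ts, ts)
        return self.max_ts - self.delay - 1


def simulate_punctuated(events, delay_ms=1000):
    """Return (value, watermark seen downstream) for each event, no timers."""
    gen = PunctuatedModel(delay_ms)
    current_wm = LONG_MIN
    seen = []
    for value, ts in events:
        # In this model the event is forwarded first, then its watermark
        seen.append((value, current_wm))
        current_wm = max(current_wm, gen.on_event(ts))
    return seen


if __name__ == "__main__":
    events = [(1, 1000), (2, 2000), (3, 3000), (4, 4000), (5, 5000)]
    # Each event sees the watermark produced by the previous one
    print(simulate_punctuated(events))
```

Here the watermark advances with no sleeps at all, because emission is driven by events rather than by a processing-time timer.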