我有一个配置了检查点的 Pyflink 程序。然而,检查点有时会卡住或超时。
我主要检查Flink仪表板,一些子任务没有确认检查点请求。
由于每个操作员仅保留少量状态,因此我希望检查点能够更快地完成。非常感谢任何帮助。我附上了我一直在做的事情的一个最小例子。
谢谢你
import time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Configuration, WatermarkStrategy
from pyflink.datastream.functions import FlatMapFunction
from pyflink.datastream import CheckpointingMode
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
PRODUCER_PARALLELISM = 1
CONSUMER_PARALLELISM = 1
NUM_TASKS = 16 * 5 * 10000
TIME_UNIT = 0.5
class Producer(FlatMapFunction):
def __init__(self):
self.count = None
def open(self, runtime_context: RuntimeContext):
self.task_info = runtime_context.get_task_name_with_subtasks()
self.task_index = runtime_context.get_index_of_this_subtask()
self.attempt_number = runtime_context.get_attempt_number()
descriptor = ValueStateDescriptor(
"average",
Types.INT(),
)
self.count = runtime_context.get_state(descriptor)
print(f"Opened Producer {self.task_info} with index {self.task_index}")
def flat_map(self, value):
self.count.update((self.count.value() or 0) + 1)
time.sleep(TIME_UNIT)
print(f"count: {self.count.value()}, {value}")
cnt = 0
while cnt < 10000:
cnt += 1
yield value
def run_flink(env):
start = time.perf_counter()
number_source = NumberSequenceSource(0, NUM_TASKS)
ds = env.from_source(
source=number_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name="file_source",
type_info=Types.LONG(),
).set_parallelism(1)
ds = ds.key_by(lambda x: x % PRODUCER_PARALLELISM)
ds = ds.flat_map(Producer(), output_type=Types.PICKLED_BYTE_ARRAY()).set_parallelism(
PRODUCER_PARALLELISM
)
env.execute("Example")
end = time.perf_counter()
print(f"Time: {end - start:.4f}s")
def run_experiment():
config = Configuration()
config.set_string("restart-strategy.type", "fixed-delay")
config.set_string("restart-strategy.fixed-delay.attempts", "3")
config.set_string("restart-strategy.fixed-delay.delay", "10000ms")
config.set_string("state.backend.type", "hashmap")
config.set_string(
"state.checkpoints.dir",
"file:///home/ubuntu/.../flink/flink-checkpoints",
)
env = StreamExecutionEnvironment.get_execution_environment(config)
env = env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)
run_flink(env)
def main():
run_experiment()
if __name__ == "__main__":
main()
我正在使用
flink-1.20.0
。我在 Flink 仪表板上看到 Source: file_source -> _stream_key_by_map_operator
已被确认,但还没有 FlatMap
。
我认为主要问题是你的 flat_map 处理的每个元素都有 0.5 秒的延迟 (
time.sleep
)。这将产生显着的背压,从而延迟检查点屏障到达您的函数的时间。如果您的 flat_map 实际上需要花费那么多时间来处理每条记录,您可能需要启用未对齐的检查点。
另外,你的 flat_map 之后没有水槽。即使图中没有终端接收器,Flink 仍然会运行您的操作符,但为了清洁起见,至少在 flat_map 之后放置一个 DiscardingSink。