Flink 检查点缓慢/卡住

问题描述 投票:0回答:1

我有一个配置了检查点的 Pyflink 程序。然而,检查点有时会卡住或超时。

我主要检查Flink仪表板,一些子任务没有确认检查点请求。

由于每个操作员仅保留少量状态,因此我希望检查点能够更快地完成。非常感谢任何帮助。我附上了我一直在做的事情的一个最小例子。

谢谢你

import time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Configuration, WatermarkStrategy
from pyflink.datastream.functions import FlatMapFunction
from pyflink.datastream import CheckpointingMode
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

PRODUCER_PARALLELISM = 1
CONSUMER_PARALLELISM = 1

NUM_TASKS = 16 * 5 * 10000
TIME_UNIT = 0.5

class Producer(FlatMapFunction):
    def __init__(self):
        self.count = None

    def open(self, runtime_context: RuntimeContext):
        self.task_info = runtime_context.get_task_name_with_subtasks()
        self.task_index = runtime_context.get_index_of_this_subtask()
        self.attempt_number = runtime_context.get_attempt_number()

        descriptor = ValueStateDescriptor(
            "average",
            Types.INT(),
        )
        self.count = runtime_context.get_state(descriptor)
        print(f"Opened Producer {self.task_info} with index {self.task_index}")

    def flat_map(self, value):
        self.count.update((self.count.value() or 0) + 1)
        time.sleep(TIME_UNIT)
        print(f"count: {self.count.value()}, {value}")
        cnt = 0
        while cnt < 10000:
            cnt += 1
        yield value

def run_flink(env):
    start = time.perf_counter()
    number_source = NumberSequenceSource(0, NUM_TASKS)
    ds = env.from_source(
        source=number_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="file_source",
        type_info=Types.LONG(),
    ).set_parallelism(1)
    ds = ds.key_by(lambda x: x % PRODUCER_PARALLELISM)
    ds = ds.flat_map(Producer(), output_type=Types.PICKLED_BYTE_ARRAY()).set_parallelism(
        PRODUCER_PARALLELISM
    )

    env.execute("Example")
    end = time.perf_counter()
    print(f"Time: {end - start:.4f}s")

def run_experiment():
    config = Configuration()
    config.set_string("restart-strategy.type", "fixed-delay")
    config.set_string("restart-strategy.fixed-delay.attempts", "3")
    config.set_string("restart-strategy.fixed-delay.delay", "10000ms")
    config.set_string("state.backend.type", "hashmap")
    config.set_string(
        "state.checkpoints.dir",
        "file:///home/ubuntu/.../flink/flink-checkpoints",
    )

    env = StreamExecutionEnvironment.get_execution_environment(config)
    env = env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    run_flink(env)

def main():
    run_experiment()

if __name__ == "__main__":
    main()

我正在使用

flink-1.20.0
。我在 Flink 仪表板上看到
Source: file_source -> _stream_key_by_map_operator
已被确认,但还没有
FlatMap

apache-flink flink-streaming pyflink
1个回答
0
投票

我认为主要问题是你的 flat_map 处理的每个元素都有 0.5 秒的延迟 (

time.sleep
)。这将产生显着的背压,从而延迟检查点屏障到达您的函数的时间。如果您的 flat_map 实际上需要花费那么多时间来处理每条记录,您可能需要启用未对齐的检查点。

另外,你的 flat_map 之后没有水槽。即使图中没有终端接收器,Flink 仍然会运行您的操作符,但为了清洁起见,至少在 flat_map 之后放置一个 DiscardingSink。

© www.soinside.com 2019 - 2024. All rights reserved.