如何从程序中停止 flink 流作业

问题描述 投票:0回答:5

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用

FlinkKafkaProducer09
FlinkKafkaConsumer09
从同一 kafka 主题读取数据。我在产品中传递测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用

TestListResultSink
.

通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍将继续运行。所以它没有来测试部分。

Flink
FlinkKafkaConsumer09
中有什么方法可以停止进程或运行特定时间吗?

junit apache-kafka apache-flink flink-streaming
5个回答
9
投票

潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行。

最好的方法,至少目前,是在你的流中插入一个特殊的控制消息,让源正确终止(通过离开读取循环简单地停止读取更多数据)。这样 Flink 会告诉所有下游的操作者,他们可以在消费完所有数据后停止。

或者,您可以在源代码中抛出一个特殊的异常(例如,一段时间后),这样您就可以区分“正确”终止和失败案例(通过检查错误原因)。在源中抛出异常将使程序失败。


3
投票

在您的测试中,您可以在单独的线程中开始作业执行,等待一段时间以允许它进行数据处理,取消线程(它将中断作业)并进行断言。

CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
    try {
        environment.execute(jobName);
    } catch (Exception e) {
        e.printStackTrace();
    }
});
try {
    handle.get(seconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
}

// Make assertions here

1
投票

你不能在 Deserializer 中使用 isEndOfStream 覆盖来停止从 Kafka 获取数据吗?如果我没看错的话,flink/Kafka09Fetcher 的 run 方法中有以下代码会中断事件循环

    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

我的想法是结合 isEndOfStream 方法使用 Till Rohrmann 的控制消息想法来告诉 KafkaConsumer 停止阅读。

有什么不可行的原因吗?或者我忽略了一些极端情况?

https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/ Kafka09Fetcher.java#L146


0
投票

关注@TillRohrman

如果使用 EmbeddedKafka 实例,您可以结合特殊的异常方法并在单元测试中处理它,然后读取 EmbeddedKafka 主题并断言消费者值。

我发现 https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java 在这方面非常有用。

唯一的问题是你会丢失触发异常的元素,但你总是可以调整你的测试数据来解决这个问题。


0
投票

我建立在@Mariusz W.给出的答案的基础上,通过使用

Awaitility
在本地测试期间运行我的Flink应用程序直到满足某些条件,这应该缩短测试的持续时间:

  • https://www.baeldung.com/awaitility-testing
  • 我的 Flink 应用程序正在将数据下沉到 Kinesis 数据流(使用 localstack 测试容器在本地模拟),所以我写了下面的代码来运行 Flink 应用程序,直到“x”条记录进入输出运动数据流
  • 就我个人而言,我发现这比装配某种机制来发送某种控制消息要容易得多
    private static void runFlinkUntilConditionIsTrue() {
        CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        try {
            Awaitility
                    .await()
                    .atMost(Duration.ofSeconds(30L)) // max duration for which to check the condition before giving up
                    .until(
                            // poll until some condition is met (or until max duration runs out)
                            // for me, this was getRecordsInLocalKinesisOutputTopic() == x
                            () -> someCondition()
                    );
        } catch (ConditionTimeoutException e) {
            System.out.println("Condition not met in time");
        } finally {
            try {
                handle.get(0, TimeUnit.SECONDS);
            } catch (TimeoutException | ExecutionException | InterruptedException e) {
                // this will interrupt the job execution thread, cancel and close the job
                handle.cancel(true);
            }
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.