我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用
FlinkKafkaProducer09
和 FlinkKafkaConsumer09
从同一 kafka 主题读取数据。我在产品中传递测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
并检查相同的数据是否来自消费者:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
使用
TestListResultSink
.
通过打印流,我能够按预期看到来自消费者的数据。但无法获得 Junit 测试结果,因为即使在消息完成后消费者仍将继续运行。所以它没有来测试部分。
在
Flink
或FlinkKafkaConsumer09
中有什么方法可以停止进程或运行特定时间吗?
潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行。
最好的方法,至少目前,是在你的流中插入一个特殊的控制消息,让源正确终止(通过离开读取循环简单地停止读取更多数据)。这样 Flink 会告诉所有下游的操作者,他们可以在消费完所有数据后停止。
或者,您可以在源代码中抛出一个特殊的异常(例如,一段时间后),这样您就可以区分“正确”终止和失败案例(通过检查错误原因)。在源中抛出异常将使程序失败。
在您的测试中,您可以在单独的线程中开始作业执行,等待一段时间以允许它进行数据处理,取消线程(它将中断作业)并进行断言。
CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
try {
environment.execute(jobName);
} catch (Exception e) {
e.printStackTrace();
}
});
try {
handle.get(seconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
}
// Make assertions here
你不能在 Deserializer 中使用 isEndOfStream 覆盖来停止从 Kafka 获取数据吗?如果我没看错的话,flink/Kafka09Fetcher 的 run 方法中有以下代码会中断事件循环
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
running = false;
break;
}
我的想法是结合 isEndOfStream 方法使用 Till Rohrmann 的控制消息想法来告诉 KafkaConsumer 停止阅读。
有什么不可行的原因吗?或者我忽略了一些极端情况?
关注@TillRohrman
如果使用 EmbeddedKafka 实例,您可以结合特殊的异常方法并在单元测试中处理它,然后读取 EmbeddedKafka 主题并断言消费者值。
我发现 https://github.com/asmaier/mini-kafka/blob/master/src/test/java/de/am/KafkaProducerIT.java 在这方面非常有用。
唯一的问题是你会丢失触发异常的元素,但你总是可以调整你的测试数据来解决这个问题。
我建立在@Mariusz W.给出的答案的基础上,通过使用
Awaitility
在本地测试期间运行我的Flink应用程序直到满足某些条件,这应该缩短测试的持续时间:
private static void runFlinkUntilConditionIsTrue() {
CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
});
try {
Awaitility
.await()
.atMost(Duration.ofSeconds(30L)) // max duration for which to check the condition before giving up
.until(
// poll until some condition is met (or until max duration runs out)
// for me, this was getRecordsInLocalKinesisOutputTopic() == x
() -> someCondition()
);
} catch (ConditionTimeoutException e) {
System.out.println("Condition not met in time");
} finally {
try {
handle.get(0, TimeUnit.SECONDS);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
// this will interrupt the job execution thread, cancel and close the job
handle.cancel(true);
}
}
}