安排 Akka Streams 中的 Kafka Consumer 在一天中的特定时间运行

问题描述 投票:0回答:1

我使用 Akka Streams 编写了一个 Kafka 消费者:

RestartSource.withBackoff(consumerResetProps(),
                () -> Consumer.committablePartitionedSource(consumerProps(), Subscriptions.topics(topics))
                        .mapAsyncUnordered(parallelism, pair -> pair.second()
                                .via(flow())
                                .runWith(Committer.sink(commiterProps()), system)))
        .toMat(Sink.ignore(), Keep.both())
        .run(system);

我希望该消费者仅在一天中的特定时间(例如,凌晨 1 点到凌晨 4 点)处于活动状态,并在一天中的其余时间关闭。 Akka Streams 是否提供内置方法来安排此类行为,或者是否有推荐的模式来实现此目的?

akka akka-stream
1个回答
0
投票

Akka Streams 中没有任何内容,但您可以使用

ActorSystem
scheduler (
system.scheduler
) 来安排流在一定持续时间后运行,因此在应用程序启动时,您将确定流的数量时间直到凌晨 1 点,然后安排流运行。

Alpakka Kafka 流默认会永远运行,但您可以使用流具体化为的

Control
对象(请注意,如果使用重新启动逻辑包装,每次重新启动都会具体化一个新的
Control
与此类似的东西 将是必需):调用
drainAndShutdown
将导致消费者不再向 Kafka 请求任何消息,并等待所有正在处理的消息。 您还可以使用调度程序将
drainAndShutdown
通话推迟到凌晨 4 点。

请注意,通过将所有这些状态封装在一个 Actor 中(甚至可能是一个持久 Actor,可能作为集群单例运行?),可以变得更详细并涵盖更多故障场景,但这对于单实例应用程序来说应该足够了(或者,如果您不介意集群中的每个节点都尝试开始消费(在这种情况下,Kafka 消费者组可以处理组织)),那么您可以合理地期望 Kubernetes 之类的东西重新启动。

© www.soinside.com 2019 - 2024. All rights reserved.