我使用 Akka Streams 编写了一个 Kafka 消费者:
RestartSource.withBackoff(consumerResetProps(),
() -> Consumer.committablePartitionedSource(consumerProps(), Subscriptions.topics(topics))
.mapAsyncUnordered(parallelism, pair -> pair.second()
.via(flow())
.runWith(Committer.sink(commiterProps()), system)))
.toMat(Sink.ignore(), Keep.both())
.run(system);
我希望该消费者仅在一天中的特定时间(例如,凌晨 1 点到凌晨 4 点)处于活动状态,并在一天中的其余时间关闭。 Akka Streams 是否提供内置方法来安排此类行为,或者是否有推荐的模式来实现此目的?
Akka Streams 中没有任何内容,但您可以使用
ActorSystem
的 scheduler (system.scheduler
) 来安排流在一定持续时间后运行,因此在应用程序启动时,您将确定流的数量时间直到凌晨 1 点,然后安排流运行。
Alpakka Kafka 流默认会永远运行,但您可以使用流具体化为的
Control
对象(请注意,如果使用重新启动逻辑包装,每次重新启动都会具体化一个新的 Control
:与此类似的东西 将是必需):调用 drainAndShutdown
将导致消费者不再向 Kafka 请求任何消息,并等待所有正在处理的消息。 您还可以使用调度程序将 drainAndShutdown
通话推迟到凌晨 4 点。
请注意,通过将所有这些状态封装在一个 Actor 中(甚至可能是一个持久 Actor,可能作为集群单例运行?),可以变得更详细并涵盖更多故障场景,但这对于单实例应用程序来说应该足够了(或者,如果您不介意集群中的每个节点都尝试开始消费(在这种情况下,Kafka 消费者组可以处理组织)),那么您可以合理地期望 Kubernetes 之类的东西重新启动。