我有一个拓扑结构,其中spout从Kafka读取数据并发送到bolt,然后依次调用REST API(A)和另一个REST API(B)。到目前为止,API B还没有节流。现在,他们已经实现了节流(每时钟分钟x最大呼叫数)。
我们需要实现节流处理程序。
选项A
起初,我们考虑在REST API(A)级别进行操作,并放置一个
Thread.sleep(x in millis)
一旦调用被REST API(B)限制
但是这将使所有的REST(A)呼叫等待那么长的时间(这将在1秒到59秒之间变化,并且可能会增加新呼叫的负载。
选项B
REST API(A)将有关节流的响应发送回Bolt。螺栓将处理失败通知Spout到
选项A可以直接实施,但我认为不是一个好的解决方案。
[试图弄清选项B是否可用于topology.max.spout.pending,但是如何动态地与Storm进行通信以限制节流。任何人都可以对这个选项分享一些想法。
选项C
REST API(B)限制来自REST(A)的调用,该调用将不处理该调用,但会将429响应代码发送到螺栓。螺栓会将消息重新排队到另一个风暴拓扑的错误主题部分。此消息可以包含重试计数,如果同一消息再次受到限制,我们可以使用++ retry count重新进行排队。
更新帖子,找到使选项B可行的解决方案。
选项D
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
* where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
* <p/>
* By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
* previous polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
* @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
* @param maxDelay maximum amount of time waiting before retrying
*
*/
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
this.initialDelay = initialDelay;
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
LOG.debug("Instantiated {}", this.toStringImpl());
}
步骤如下:
KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
collector.fail(tuple)
,它将根据第1步和第2步中的重试配置设置发出信号,通知spout再次处理元组。我有一个拓扑结构,其中spout从Kafka读取数据并发送到bolt,然后依次调用REST API(A)和另一个REST API(B)。到目前为止,API B还没有节流。现在他们有了...
您的选项D听起来不错,但是为了避免在对API A的调用中出现重复,我认为您应该考虑将拓扑分为两部分。