我有一个在 Google Dataflow 上运行的 Apache Beam 管道,其工作相当简单:
这个 API 要求我批量发送 75 个项目。所以我构建了一个
DoFn
,它将事件累积在列表中,并在收到 75 个事件后通过此 API 发布它们。这结果太慢了,所以我想改为使用线程池在不同线程中执行这些 HTTP 请求。
我现在的实现如下所示:
private class WriteFn : DoFn<TheEvent, Void>() {
@Transient var api: TheApi
@Transient var currentBatch: MutableList<TheEvent>
@Transient var executor: ExecutorService
@Setup
fun setup() {
api = buildApi()
executor = Executors.newCachedThreadPool()
}
@StartBundle
fun startBundle() {
currentBatch = mutableListOf()
}
@ProcessElement
fun processElement(processContext: ProcessContext) {
val record = processContext.element()
currentBatch.add(record)
if (currentBatch.size >= 75) {
flush()
}
}
private fun flush() {
val payloadTrack = currentBatch.toList()
executor.submit {
api.sendToApi(payloadTrack)
}
currentBatch.clear()
}
@FinishBundle
fun finishBundle() {
if (currentBatch.isNotEmpty()) {
flush()
}
}
@Teardown
fun teardown() {
executor.shutdown()
executor.awaitTermination(30, TimeUnit.SECONDS)
}
}
从数据发送到 API 的意义上来说,这似乎工作得“很好”。但我不知道这是否是正确的方法,而且我感觉这非常慢。
我认为速度慢的原因是,在负载测试时(通过向 Pub/Sub 发送几百万个事件),管道将这些消息转发到 API(其响应时间为低于 8 毫秒)比我的笔记本电脑将它们输入 Pub/Sub 还要快。
我的实现有什么问题吗?这是我应该做的吗?
另外...我是否需要等待所有请求在我的
@FinishBundle
方法中完成(即通过获取执行者返回的 future 并等待它们)?
您这里有两个相互关联的问题:
@FinishBundle
等候吗?第二个答案:是的。但实际上你需要更彻底地冲洗,这一点将会变得清楚。
一旦您的
@FinishBundle
方法成功,Beam runner 将假定捆绑包已成功完成。但您的 @FinishBundle
仅发送请求 - 它并不能确保它们已成功。因此,如果请求随后失败,您可能会丢失数据。您的 @FinishBundle
方法实际上应该阻塞并等待 TheApi
的成功确认。顺便说一句,上述所有内容都应该是幂等的,因为完成捆绑后,地震可能会发生并导致重试;-)
回答第一个问题:你应该改变什么吗?就以上这些。只要您确定在提交捆绑包之前提交了结果,以这种方式批处理请求的做法就可以发挥作用。
您可能会发现这样做会导致管道变慢,因为
@FinishBundle
比@Setup
发生得更频繁。要跨捆绑包批量请求,您需要使用状态和计时器的较低级别功能。我在 https://beam.apache.org/blog/2017/08/28/timely-processing.html 编写了您的用例的人为版本。我对这对你有何作用很感兴趣。
可能只是当您的管道中存在持久的洗牌时,您所期望的极低延迟(低毫秒范围)不可用。