如何在 Apache Beam 步骤中并行化 HTTP 请求?

问题描述 投票:0回答:2

我有一个在 Google Dataflow 上运行的 Apache Beam 管道,其工作相当简单:

  • 它从 Pub/Sub 读取单个 JSON 对象
  • 解析它们
  • 并通过 HTTP 将它们发送到某个 API

这个 API 要求我批量发送 75 个项目。所以我构建了一个

DoFn
,它将事件累积在列表中,并在收到 75 个事件后通过此 API 发布它们。这结果太慢了,所以我想改为使用线程池在不同线程中执行这些 HTTP 请求。

我现在的实现如下所示:

private class WriteFn : DoFn<TheEvent, Void>() {
  @Transient var api: TheApi

  @Transient var currentBatch: MutableList<TheEvent>

  @Transient var executor: ExecutorService

  @Setup
  fun setup() {
    api = buildApi()
    executor = Executors.newCachedThreadPool()
  }

  @StartBundle
  fun startBundle() {
    currentBatch = mutableListOf()
  }

  @ProcessElement
  fun processElement(processContext: ProcessContext) {
    val record = processContext.element()

    currentBatch.add(record)

    if (currentBatch.size >= 75) {
      flush()
    }
  }

  private fun flush() {
    val payloadTrack = currentBatch.toList()
    executor.submit {
      api.sendToApi(payloadTrack)
    }
    currentBatch.clear()
  }

  @FinishBundle
  fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
      flush()
    }
  }

  @Teardown
  fun teardown() {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}

从数据发送到 API 的意义上来说,这似乎工作得“很好”。但我不知道这是否是正确的方法,而且我感觉这非常慢。

我认为速度慢的原因是,在负载测试时(通过向 Pub/Sub 发送几百万个事件),管道将这些消息转发到 API(其响应时间为低于 8 毫秒)比我的笔记本电脑将它们输入 Pub/Sub 还要快。

我的实现有什么问题吗?这是我应该做的吗?

另外...我是否需要等待所有请求在我的

@FinishBundle
方法中完成(即通过获取执行者返回的 future 并等待它们)?

google-cloud-dataflow apache-beam
2个回答
4
投票

您这里有两个相互关联的问题:

  1. 您做得对吗/您需要改变什么吗?
  2. 需要在
    @FinishBundle
    等候吗?

第二个答案:是的。但实际上你需要更彻底地冲洗,这一点将会变得清楚。

一旦您的

@FinishBundle
方法成功,Beam runner 将假定捆绑包已成功完成。但您的
@FinishBundle
仅发送请求 - 它并不能确保它们已成功。因此,如果请求随后失败,您可能会丢失数据。您的
@FinishBundle
方法实际上应该阻塞并等待
TheApi
的成功确认。顺便说一句,上述所有内容都应该是幂等的,因为完成捆绑后,地震可能会发生并导致重试;-)

回答第一个问题:你应该改变什么吗?就以上这些。只要您确定在提交捆绑包之前提交了结果,以这种方式批处理请求的做法就可以发挥作用。

您可能会发现这样做会导致管道变慢,因为

@FinishBundle
@Setup
发生得更频繁。要跨捆绑包批量请求,您需要使用状态和计时器的较低级别功能。我在 https://beam.apache.org/blog/2017/08/28/timely-processing.html 编写了您的用例的人为版本。我对这对你有何作用很感兴趣。

可能只是当您的管道中存在持久的洗牌时,您所期望的极低延迟(低毫秒范围)不可用。


0
投票

很晚才回答:

这个API要求我批量发送75个项目。

查看 PTransforms:

分组批量

WebAPI

© www.soinside.com 2019 - 2024. All rights reserved.