在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中一个队列中的数据,并将其写入具有一个分区的Kafka主题。在使用了来自MQ的数据之后,消息有效负载被拆分为3000个较小的消息,这些消息存储在字符串序列中。然后,使用KafkaProducer将这3000条消息中的每条消息发送到Kafka(2.x版)。
您将如何发送这3000条消息?
我既不能增加IBM MQ中的队列数(不受我的控制),也不能增加主题中的分区数(消息的顺序是必需的,编写自定义分区程序将影响该主题的太多使用者)。
生产者设置当前为:
但是优化它们可能只是一个问题,而不是我当前问题的一部分。
当前正在执行
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
val recordMetadata = future.get()
}
对我来说,这似乎不是最优雅,最有效的方法。是否有编程方式来提高吞吐量?
感谢答案使我指向正确的方向,因此我仔细研究了不同的Producer方法。 《 Kafka-权威指南》一书列出了这些方法:
一劳永逸]我们会向服务器发送一条消息,并不关心消息是否成功到达。大多数时候,它会成功到达,因为Kafka的可用性很高,并且生产者将重试自动发送消息。但是,使用此方法会丢失一些消息。
同步发送
我们发送一条消息,send()方法返回一个Future对象,然后使用get()等待将来,看看send()是否成功。异步发送
我们使用回调函数调用send()方法,该函数会在其回调时触发收到Kafka经纪人的回复现在我的代码看起来像这样(省去了错误处理和Callback类的定义):
val asyncProducer = new KafkaProducer[String, String](someProperties) for (msg <- messages) { val record = new ProducerRecord[String, String](someTopic, someKey, msg) asyncProducer.send(record, new compareProducerCallback) } asyncProducer.flush()
我已经比较了10000条非常小的消息的所有方法。这是我的测量结果:
即发即弃:173683464ns
同步发送:29195039875ns
异步发送:44153826ns
老实说,通过选择正确的属性(batch.size,linger.ms等),可能有更多的潜力来优化所有属性。
在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中一个队列中的数据,并将其写入具有一个分区的Kafka主题。从MQ消耗数据后,消息有效负载就会得到...
我能看到您的代码运行缓慢的最大原因是,您在等待每个发送将来。