我正在Apache Kafka上做一些性能测试,以便与RabbitMQ和ActiveMQ等其他软件进行比较。我的想法是将它用于代理通信的消息系统。
我正在测试多个场景(一对一、广播和多对一),有不同数量的发布者和订阅者,因此负载也不同。即使在最低负载的情况下,10对代理发送500条消息,发送之间有1ms的延迟,我也遇到了非常高的延迟(平均约200ms)。如果我们增加到100对,这个数字就会上升到~1500ms。同样的事情发生在广播和多对一上。
我使用Windows的Kafka 2.12-2.5.0和zookeeper 3.6.1与C# .Net客户端Confluent.Kafka 1.4.2。根据我找到的一些帖子,我已经尝试了一些属性,比如LingerMs = 0。我的Kafka和zookeeper都有默认设置。
我做了一个简单的测试代码,但问题还是发生了。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaSetupAgain
{
class Program
{
static void Main(string[] args)
{
int numberOfMessages = 500;
int numberOfPublishers = 10;
int numberOfSubscribers = 10;
int timeOfRun = 30000;
List<MCVESubscriber> Subscribers = new List<MCVESubscriber>();
for (int i = 0; i < numberOfSubscribers; i++)
{
MCVESubscriber ZeroMqSubscriber = new MCVESubscriber();
new Thread(() =>
{
ZeroMqSubscriber.read(i.ToString());
}).Start();
Subscribers.Add(ZeroMqSubscriber);
}
Thread.Sleep(10000);//to make sure all subscribers started
for (int i = 0; i < numberOfPublishers; i++)
{
MCVEPublisher ZeroMqPublisherBroadcast = new MCVEPublisher();
new Thread(() =>
{
ZeroMqPublisherBroadcast.publish(numberOfMessages, i.ToString());
}).Start();
}
Thread.Sleep(timeOfRun);
foreach (MCVESubscriber Subscriber in Subscribers)
{
Subscriber.PrintMessages("file.csv");
}
}
public class MCVEPublisher
{
public void publish(int numberOfMessages, string topic)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
LingerMs = 0,
Acks = 0,
};
var producer = new ProducerBuilder<Null, string>(config).Build();
int success = 0;
int failure = 0;
Thread.Sleep(3500);
for (int i = 0; i < numberOfMessages; i++)
{
Thread.Sleep(1);
long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;
var t = producer.ProduceAsync(topic, new Message<Null, string> { Value = milliseconds.ToString() });
t.ContinueWith(task => {
if (task.IsFaulted)
{
failure++;
}
else
{
success++;
}
});
}
Console.WriteLine("Success: " + success + " Failure:" + failure);
}
}
public class MCVESubscriber
{
private List<string> prints = new List<string>();
public void read(string topic)
{
var config = new ConsumerConfig()
{
BootstrapServers = "localhost:9092",
EnableAutoCommit = false,
FetchErrorBackoffMs = 1,
};
var consumerConfig = new ConsumerConfig(config);
consumerConfig.GroupId = Guid.NewGuid().ToString();
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
consumerConfig.EnableAutoCommit = false;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(new[] { topic });
while (true)
{
var consumeResult = consumer.Consume();
long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;
prints.Add(consumeResult.Message.Value + ";" + milliseconds.ToString());
}
consumer.Close();
}
}
public void PrintMessages(string path)
{
Console.WriteLine("printing " + prints.Count);
File.AppendAllLines(path, prints);
}
}
}
}
有人知道问题出在哪里吗?我可以改变哪些配置来提高延迟?
谢谢。
Davide Costa
Kafka并不是真正为低延迟的消息分发而构建的,而是为了高可用性。 它可以被配置为具有更低的延迟,但你开始失去很多Kafka提供的优势。
下面说几个小技巧。
关于... KafkaProducer
方面,一般来说,你要等到有足够的消息可以发送时,才能更有效地批处理消息。 这就是 linger.ms
属性,你已经提到了。 通常情况下,这个属性被设置为50ms左右,所以把它设置为0,实际上就是告诉生产者,只要它收到数据,就会尽快发送数据。 这可能会让生产者更 "聊得来",但你可以保证它一拿到数据就会把数据发送到集群。
然而,一旦消息被 "生产 "到Kafka中,它就会等待,直到从下层得到经纪人成功接收消息的ACK。 这里有多种选择。
官方 acks
Kafka文档中的解释。
https:/kafka.apache.org25documentation.html#acks。
还有其他的设置需要考虑,比如kafka生产者压缩和broker压缩设置,这些设置可能会增加更多的延迟开销,但是如果你使用的是默认设置(没有生产者压缩和 producer
选项),在这些步骤中不应该有额外的延迟。
说了这么多,我建议你试着把 "压缩 "设置成 "延迟"。acks
选项为0,看看你的延迟有什么变化。 我猜你会得到更好的延迟。但是 同时也要明白,不能保证你的信息真的被正确接收和存储。 一个不稳定的网络、网络分区等,都可能导致你丢失数据。 对于你的用例来说,这可能是可以的,但只要确保你意识到这一点。