我正在解决一个我无法理解的问题。 通过这样的配置,我可以成功生成一条消息;
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
Console.WriteLine("Enter a message to send to the topic (or 'exit' to quit):");
string message;
while ((message = Console.ReadLine()) != "exit")
{
try
{
var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
Console.WriteLine($"Message '{message}' sent to topic '{topic}' at offset {result.Offset}");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
它可以正常工作,没有任何问题。我可以转到我的 Azure 门户并查看我写的每条消息。
然后我对消费者(另一个像这样的控制台应用程序)做同样的事情:
var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword,
GroupId = consumerGroup,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None, // Disable endpoint identification
//SessionTimeoutMs = 6000, // To reduce disconnection frequency
//MaxPollIntervalMs = 10000,
//SocketKeepaliveEnable = true,
Debug = "all", // Enables detailed logs
AutoOffsetReset = AutoOffsetReset.Latest,
EnableAutoCommit = true,
SocketKeepaliveEnable = true
};
...
consumer.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
string logMessage = $"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.";
从我这样做的那一刻起
consumer.Consume(cts.Token),
我没有收到任何消息。
我深入研究了日志,我看到的唯一问题(如果它们是问题的话)是这样的:
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: broker is down: re-query
我猜问题出在“阅读”中的某个地方,但我不明白是什么,我尝试了所有可能的组合来看看发生了什么,但没有运气。
对于接下来要尝试什么有什么想法或建议吗?
我找到了一个运行良好的“解决方法”,我希望它可以帮助其他人。我使用Python版本测试了Confluence.Kafka库,对比日志后,我注意到Python版本使用client.software.version 2.5.3,而.NET版本使用2.6.0。
由于两者可能都是同一核心库的包装器,因此我在 .NET 项目中切换到 2.5.3 NuGet 包,一切都开始完美运行!
2.6.0 版本似乎引入了一些更改(可能与 TLS 相关),导致与事件中心的通信失败。我打算开个bug。
我希望这可以帮助别人