带有 Confluence.Kafka 的 Azure 事件中心。我可以生产但不能消费

问题描述 投票:0回答:1

我正在解决一个我无法理解的问题。 通过这样的配置,我可以成功生成一条消息;

 var config = new ProducerConfig
 {
     BootstrapServers = bootstrapServers,
     SecurityProtocol = SecurityProtocol.SaslSsl,
     SaslMechanism = SaslMechanism.Plain,
     SaslUsername = saslUsername,
     SaslPassword = saslPassword
 };

 using (var producer = new ProducerBuilder<Null, string>(config).Build())
 {
     Console.WriteLine("Enter a message to send to the topic (or 'exit' to quit):");
     string message;
     while ((message = Console.ReadLine()) != "exit")
     {
         try
         {
             var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
             Console.WriteLine($"Message '{message}' sent to topic '{topic}' at offset {result.Offset}");
         }
         catch (ProduceException<Null, string> e)
         {
             Console.WriteLine($"Delivery failed: {e.Error.Reason}");
         }
     }
 }

它可以正常工作,没有任何问题。我可以转到我的 Azure 门户并查看我写的每条消息。

然后我对消费者(另一个像这样的控制台应用程序)做同样的事情:

   var config = new ConsumerConfig
   {
       BootstrapServers = bootstrapServers,
       SecurityProtocol = SecurityProtocol.SaslSsl,
       SaslMechanism = SaslMechanism.Plain,
       SaslUsername = saslUsername,
       SaslPassword = saslPassword,
       GroupId = consumerGroup,
       SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None, // Disable endpoint identification
       //SessionTimeoutMs = 6000,  // To reduce disconnection frequency
       //MaxPollIntervalMs = 10000,
       //SocketKeepaliveEnable = true,
       Debug = "all", // Enables detailed logs
       AutoOffsetReset = AutoOffsetReset.Latest,
       EnableAutoCommit = true,
       SocketKeepaliveEnable = true
   };

...
    consumer.Subscribe(topic);

    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);
                string logMessage = $"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.";

从我这样做的那一刻起

consumer.Consume(cts.Token),
我没有收到任何消息。 我深入研究了日志,我看到的唯一问题(如果它们是问题的话)是这样的:

[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: broker is down: re-query

我猜问题出在“阅读”中的某个地方,但我不明白是什么,我尝试了所有可能的组合来看看发生了什么,但没有运气。

对于接下来要尝试什么有什么想法或建议吗?

c# apache-kafka confluent-kafka-dotnet
1个回答
0
投票

我找到了一个运行良好的“解决方法”,我希望它可以帮助其他人。我使用Python版本测试了Confluence.Kafka库,对比日志后,我注意到Python版本使用client.software.version 2.5.3,而.NET版本使用2.6.0。

由于两者可能都是同一核心库的包装器,因此我在 .NET 项目中切换到 2.5.3 NuGet 包,一切都开始完美运行!

2.6.0 版本似乎引入了一些更改(可能与 TLS 相关),导致与事件中心的通信失败。我打算开个bug。

我希望这可以帮助别人

© www.soinside.com 2019 - 2024. All rights reserved.