我正在运行一个应用程序,该应用程序充当通过 RabbitMQ 和 Kafka 发送消息的消费者。它不产生任何东西。当应用程序运行时,我注意到通过 Kafka 接收到的消息也被推送到 RabbitMQ 中。这可以避免吗?我可以配置 MassTransit,使 Kafka 消息不推送到 RabbitMQ 吗?
这是我当前的设置
if (!options.RabbitMq.Enabled)
return;
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumer<RabbitMqConsumer, RabbitMqConsumerDefinition>();
busConfigurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
busFactoryConfigurator.Host(new Uri(options.RabbitMq.Endpoint));
busFactoryConfigurator.ConfigureEndpoints(context);
});
if (!options.Kafka.Enabled)
return;
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = options.Kafka.SchemaRegistryUrl
};
var schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
busConfigurator.AddSingleton<ISchemaRegistryClient>(schemaRegistryClient);
busConfigurator.AddRider(rider =>
{
rider.AddConsumer<KafkaMyMessageConsumer>();
rider.UsingKafka((context, kafkaConfigurator) =>
{
kafkaConfigurator.ClientId = options.Kafka.ClientId;
kafkaConfigurator.Host(options.Kafka.Endpoint, configureHost =>
{
// ssl, sasl configuration
});
kafkaConfigurator.TopicEndpoint<string, MyMessage>(options.Kafka.Topic, options.Kafka.GroupId, endpointConfigurator =>
{
endpointConfigurator.CreateIfMissing(t => t.NumPartitions = 1);
endpointConfigurator.SetValueDeserializer(new ProtobufDeserializer<MyMessage>().AsSyncOverAsync());
endpointConfigurator.AutoOffsetReset = AutoOffsetReset.Earliest;
endpointConfigurator.ConfigureConsumer<KafkaMyMessageConsumer>(context);
});
});
});
编辑: 为什么我对 RabbitMQ 和 Kafka 感到困惑,我看到以下日志消息
2023-12-20 23:19:51 [22:19:51 DBG] Endpoint Ready: rabbitmq://my-rabbitmq/kafka/my-kafka-topic/my-service
2023-12-20 23:19:51 [22:19:51 INF] Bus started: rabbitmq://my-rabbitmq/
2023-12-20 23:19:51 [22:19:51 DBG] Partition: my-kafka-topic [[0]] with -1001 was assigned to: my-service-e470f4c3-4581-4a7d-8787-591389c2150b
2023-12-20 23:19:51 [22:19:51 DBG] RECEIVE rabbitmq://my-rabbitmq/kafka/my-kafka-topic null Foo.V1.MyMessage Bar.KafkaMyMessageConsumer(00:00:00.0014020)
我发现 RabbitMQ 以某种方式与 Kafka-Topic 相关联;但是 RabbitMQ 上没有为 Kafka-Topic 创建交换
我的“修复”是使用
AddMassTransit
两次,使用 IKafkaBus
和 IRabbitMqBus
并为 Kafka-MassTransit-Integration 调用 busConfigurator.UsingInMemory();
(这会产生类似的日志消息,但使用 loopback
uri 方案)
在同一总线实例上从 Kafka 和 RabbitMQ 进行消费工作完全正常,并且 Kafka 和 RabbitMQ 之间没有交互或消息交换。
我不确定你的建议是什么,但你的假设是不正确的。乘客根本不乘坐公交车。