我正在使用.net core 3.1并使用合流的kafka lib
using Confluent.Kafka;
而且我正在实施kafka系统,同时创建生产者和消费者。
我知道我可以轻松地执行类似的操作来创建消息并将消息发送到生产者中的主题:
using (var producer = new ProducerBuilder<long, string>(config).Build())
{
for(var i=0; i<1000; i++)
{
var deliveryReport = await producer.ProduceAsync(kafkaTopic, new Message<long, string> { Key
= i, Value = "lorem ipsum "+i });
}
}
效果很好。
但是如果我想代替一个对象怎么办?以下无效:
using (var producer = new ProducerBuilder<long, User>(config).Build())
{
for(var i=0; i<1000; i++)
{
var user = new User() { Id = i, Name = "random" };
var deliveryReport = await producer.ProduceAsync(kafkaTopic, new Message<long, User> { Key
= i, Value = user });
}
}
缺少什么?我听说有一种方法可以做类似的事情,但找不到。
在那种情况下,您必须序列化对象。据我所知,JSON有效负载超出了您一直在使用的库的范围。
因此,您可以使用Avro(但需要Schema Registry)。这是一个example:
using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Confluent.Kafka.Examples.AvroGeneric
{
class Program
{
static async Task Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
return;
}
string bootstrapServers = args[0];
string schemaRegistryUrl = args[1];
string topicName = args[2];
string groupName = "avro-generic-example-group";
// var s = (RecordSchema)RecordSchema.Parse(File.ReadAllText("my-schema.json"));
var s = (RecordSchema)RecordSchema.Parse(
@"{
""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
""type"": ""record"",
""name"": ""User"",
""fields"": [
{""name"": ""name"", ""type"": ""string""},
{""name"": ""favorite_number"", ""type"": [""int"", ""null""]},
{""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
]
}"
);
CancellationTokenSource cts = new CancellationTokenSource();
var consumeTask = Task.Run(() =>
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
using (var consumer =
new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// commit final offsets and leave the group.
consumer.Close();
}
}
});
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
.SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
int i = 0;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");
await producer
.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
}
cts.Cancel();
}
}
}