我想为下面这种场景创建一个回调函数,但遇到了困难:Kafka 消费者监听新消息,并把它们写入数据库:
/**
 * Starts the consumer loop for the "create-client" topic.
 *
 * Delegates to [consumerCommands] with a fixed broker address, consumer group and
 * polling configuration.
 *
 * @param service currently not used by this function; kept so existing callers keep compiling.
 */
suspend fun consumerClient(service: ClientService) {
    // consumerCommands returns Unit, so there is nothing worth binding to a local.
    // Named arguments keep the six positional values readable at the call site.
    consumerCommands(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    )
}
/**
 * Subscribes to [topic] and processes incoming records in an endless poll loop.
 *
 * For every polled record, the "message" node of the record value is converted to a
 * [Client] and inserted through a newly created [ClientService]. The loop has no exit
 * condition, so the function never returns normally; if it is left by an exception,
 * the consumer is closed before the exception propagates.
 *
 * @param topic topic to subscribe to.
 * @param bootstrapServers forwarded to the `consumer` factory.
 * @param group forwarded to the `consumer` factory.
 * @param autoCommit forwarded to the `consumer` factory.
 * @param offsetBehaviour forwarded to the `consumer` factory.
 * @param pollMax forwarded to the `consumer` factory.
 */
suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    // `use` guarantees close() runs when the loop is left by an exception, so the
    // consumer's resources are not leaked.
    consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax).use { kafkaConsumer ->
        kafkaConsumer.subscribe(mutableListOf(topic))
        while (true) {
            // An empty batch simply yields no iterations, so no explicit count check is needed.
            for (record in kafkaConsumer.poll(Duration.ofMillis(1000))) {
                val entity = treeToValue(record.value().get("message"), Client::class.java) as Client
                // NOTE(review): a new ClientService is created per record, as in the original;
                // confirm whether a single shared instance was intended.
                ClientService().insert(entity)
            }
        }
    }
}
上面的代码运行良好。但我想把它改写得更通用一些,如下所示:
/**
 * Callback contract for handling values of type [T].
 *
 * Note that [execute] does not receive a `T` value directly: its only parameter is
 * itself a function from `T` to `Unit`.
 */
interface KafkaConsumer<T> {
fun execute(callback: (T) -> Unit)
}
/**
 * Generic variant of the poll loop: subscribes to [topic] and calls `callback.execute`
 * once for every polled record.
 *
 * NOTE(review): the declared return type is never produced — the `while (true)` loop
 * below contains no `break` or `return`.
 *
 * @param topic topic to subscribe to.
 * @param bootstrapServers forwarded to the `consumer` factory.
 * @param group forwarded to the `consumer` factory.
 * @param autoCommit forwarded to the `consumer` factory.
 * @param offsetBehaviour forwarded to the `consumer` factory.
 * @param pollMax forwarded to the `consumer` factory.
 * @param callback handler whose `execute` is called for each record.
 */
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(1000))
if (records.count() > 0) {
records.forEach {
// Converts the "message" node using Any::class.java and then casts to T. The cast is
// unchecked, so presumably the value is not guaranteed to be a T at runtime — TODO confirm.
val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
coroutineScope {
// NOTE(review): the lambda passed here ignores its own argument and merely evaluates
// to `entity`; the entity is not handed to the callback as input.
callback.execute { entity }
}
}
}
}
}
/**
 * Attempts to start the generic consumer loop for the "create-client" topic, inserting
 * each received [Client] through a newly created [ClientService].
 *
 * NOTE(review): the `service` parameter is not used in this function.
 */
suspend fun consumerClient(service: ClientService) {
// NOTE(review): the last argument is a lambda, while consumerCommand declares that
// parameter as KafkaConsumer<T>.
val messages = consumerCommand<Client>(
"create-client", "localhost:9092", "consumer-client", false,
OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
)
}
但是它不起作用。有人可以帮忙吗?
您在预期类型为 KafkaConsumer 的位置传入了一个 lambda { client: Client -> ... }。
要这样传参,参数需要声明为函数类型。您的 KafkaConsumer 相当于 ((T) -> Unit) -> Unit,
但我怀疑这是个失误,您实际想要的是:
// Suggested signature: `callback` is a plain function type, so a lambda can be passed
// directly at the call site.
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
// The `...` below presumably stands for the unchanged consumer setup and poll loop.
...
coroutineScope {
// The entity is passed to the callback as its argument.
callback(entity)
}
}
}
}
}
甚至可以声明为 suspend (T) -> Unit。
旁注:consumerClient 函数没有使用它的 service 参数,而是新建了一个 ClientService;这是有意为之吗?