Creating a callback function in Kotlin

Question    Votes: 0    Answers: 1

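I'm having trouble creating a callback function in the following scenario: a Kafka consumer listens for new messages and records them in a database: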

suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommands(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10
    )
}

suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                val entity = treeToValue(it.value().get("message"), Client::class.java) as Client
                ClientService().insert(entity)
            }
        }
    }
}

This works perfectly. But I'm trying to create something more generic, like this:

interface KafkaConsumer<T> {
    fun execute(callback: (T) -> Unit)
}

suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
                coroutineScope {
                    callback.execute { entity }
                }
            }
        }
    }
}


suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommand<Client>(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
    )
}

But it doesn't work. Can anyone help?

kotlin callback
1 Answer
0
votes

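You are trying to pass a lambda, { client: Client -> ... }, where a KafkaConsumer is expected. A Kotlin lambda is not automatically converted to a Kotlin interface, so you need to declare the parameter with a function type instead. Your KafkaConsumer is equivalent to ((T) -> Unit) -> Unit, that is, something that takes a callback rather than something that is one, but I suspect this is a mistake and what you actually want is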

suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    ...
                coroutineScope {
                    callback(entity)
                }
            }
        }
    }
}

or even suspend (T) -> Unit
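To make the difference concrete, here is a minimal sketch with Kafka left out entirely, so it can run on its own. Everything in it is a stand-in I made up for illustration: consumeAll, the decode parameter, the plain String records, and the simplified Client and ClientService are assumptions, not your actual classes or any Kafka API. It only shows a generic function taking a (T) -> Unit callback and a caller passing a lambda to it.

```kotlin
// Hypothetical stand-ins for the classes in the question (simplified).
data class Client(val name: String)

class ClientService {
    val saved = mutableListOf<Client>()
    fun insert(client: Client) {
        saved += client
    }
}

// Generic consumer loop: `decode` turns a raw record into a T,
// and `callback` is an ordinary function type that receives each T.
fun <T> consumeAll(
    records: List<String>,
    decode: (String) -> T,
    callback: (T) -> Unit
) {
    for (record in records) {
        callback(decode(record))
    }
}

fun main() {
    val service = ClientService()
    // The lambda is accepted directly because the parameter is a function type.
    consumeAll(listOf("alice", "bob"), ::Client) { client -> service.insert(client) }
    println(service.saved) // prints [Client(name=alice), Client(name=bob)]
}
```

Note that the callback is called with the entity, callback(entity), rather than being handed a lambda that returns it. If the parameter type were suspend (T) -> Unit instead, the same lambda would still be accepted at the call site, but consumeAll itself would have to be a suspend function in order to invoke it.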

Side note: the consumerClient function doesn't use its service parameter and creates a new ClientService instead; is that intentional?
