我正在尝试在 Lambda 函数内创建一个 Kafka Producer,并启用 Exactly-Once Delivery 支持以将消息推送到 MSK。
编辑:MSK IAM Auth 用于 Kafka 和客户端之间的安全协议
但是,即使(我认为)我已经正确设置了所有配置,Producer 仍然无法将消息写入 MSK。
生产者在调用 init_transactions() 时挂起,并循环输出以下调试消息:
%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)
我尝试更改代理的数量(从 2 到 4 - 不起作用),尝试设置 transaction.state.log.replication.factor、transaction.state.log.min.isr 的值,offsets.topic.replication.factor(即使将它们全部设置为 1 - 也没有帮助)。 此线程的建议也没有帮助Amazon MSK 默认配置和事务发布的问题
我的 AWS MSK 集群有以下配置和设置:
集群配置:
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000
生产者配置:
{
"client.id": "some_id",
"acks": "all",
"enable.idempotence": "true",
"transactional.id": "123",
}
附注如果我不使用 ack、enable.idempotence 和 transactional.id - Producer 工作正常,但它违背了首先提出此问题的目的。
UPD:在挖掘 MSK 经纪商的日志后,似乎没有建立连接 - 不知道为什么,特别是因为事务性和非事务性生产者的身份验证方法是相同的,而对于非事务性生产者,连接是建立得很好..
MSK 的 AWS IAM Auth 会以某种方式干扰调用 init_transactions(),因为不使用它而仅使用 PLAINTEXT 就可以正常工作。 不确定为什么这不能与 IAM Auth 一起使用,也许其他人可以提供建议。目前,此用例无法使用 AWS IAM Auth。