当使用Lenses MQTT源连接器和汇合的kafka时,消息流断断续续。

问题描述 投票:1回答:1

我正在尝试使用Lenses MQTT源连接器[。https:/docs.lens.ioconnectorssourcemqtt.html] 与conluent kafka v5.4。

以下是我的MQTT源连接器属性文件。

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=false
key.converter.schemas.enable=false
connect.mqtt.timeout=1000
value.converter.schemas.enable=false
name=kmd-source-4
connect.mqtt.kcql=INSERT INTO kafka-source-topic-2   SELECT * FROM ctt/+/+/location WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://ip:1883
connect.mqtt.converter.throw.on.error=true
connect.mqtt.username=username
connect.mqtt.password=password
errors.log.include.messages=true
errors.log.enable=true

我从基于UI的MQTT客户端MQTT fx发布消息到MQTT主题'ctt++location',并将这些消息订阅到kafka主题'kafka-source-topic-2'上.我使用Rabbit MQ作为我的MQTT经纪人,我的汇流平台和RabbitMQ在不同的虚拟机上。我认为使用RabbitMQ broker代替Mosquitto MQTT应该没有问题。无论何时何地,只要我从MQTT fx发布,我都能成功地在MQTT fx中看到订阅后的消息。我也设置了confleunt MongoDB源连接器,并且它无缝地工作。

但我的问题是--在MQTT主题上发布的消息在映射的kafka主题上是断断续续的。可能是什么原因?我在kafka连接日志中没有看到任何错误信息。是否有任何关于MQTT broker的连接相关属性需要我在MQTT源属性文件中指定?Rabbit MQ broker中有没有什么属性是一定要包含的?有没有人使用过Lenses MQTT源和汇的连接器,并想对它们提出什么建议?

apache-kafka rabbitmq mqtt confluent-platform
1个回答
0
投票

你的connect.mqtt.timeout只有1秒吗!? 断断续续的消息给我的感觉是,你的连接器超时了,必须重新建立连接,而当它忙着这样做的时候,MQTT消息进来了,但没有传到连接器上,因为它在那个实例中没有订阅到broker。 试着把超时时间增加到60000(1分钟),看看会发生什么。您是否有任何需要它超时的原因? RabbitMQ 可以处理长时间保持开放且无流量的连接。

© www.soinside.com 2019 - 2024. All rights reserved.