我正在开发用户定义的 Flink MQTT 连接器。
https://github.com/yinjilong/StoneForests-flink-mqtt-connector
但是,当我尝试以
json
格式写入消息时遇到序列化问题。
在课堂上
public class MqttSinkFunction<T> extends RichSinkFunction<T>
我按如下方式重写 invoke
方法,
@Override
public void invoke(T event, Context context) {
if (log.isDebugEnabled()) {
log.debug("sink invoke...");
log.debug("message is {}", event);
}
try {
byte[] payload = this.serializer.serialize(event);
} catch (Exception e){
log.error("can not serialize event{} at {}",event,e);
}
try{
byte[] payload = this.serializer.serialize(event);
MqttMessage message = new MqttMessage(payload);
message.setQos(this.qos);
String[] topics = this.topics.split(",");
for (String topic : topics) {
if (log.isDebugEnabled()) {
log.debug("send message:[{}] to topic topic:[{}]. Exception.", message, topic);
}
this.client.publish(topic, message);
}
} catch(Exception e){
log.error("Cannot sink MQTT event {} at {}",event ,hostUrl,e);
}
}
我使用以下 SQL 来编写消息,
$ sql-client.sh
CREATE TABLE sink(
id INT,
name STRING
) WITH(
'connector' = 'mqtt',
'hostUrl' = 'tcp://localhost:1883',
'username' = '',
'password' = '',
'sinkTopics' = 'test/mytopic',
'format' = 'json'
);
INSERT INTO sink (id,name) VALUES(1,'Jeen');
看来来自
event(RowData)
的序列化失败了。
事件显示为
+I(1,Jeen)
我认为当我使用 JSON 格式时,内部序列化可以自动从给定的 sql 表模式推断出 JSON 模式。但它失败并抛出异常。
请帮帮我。谢谢你。
NPE 导致
event(RowData)
序列化失败,因为 org.apache.flink.formats.json.JsonRowDataSerializationSchema#mapper
在您的 com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#serializer
实例中为 null。com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#open
函数添加一行来修复它。
this.serializer.open(null);