Flink 用户定义的Sink Connector 无法将数据序列化为json 格式

问题描述 投票:0回答:1

我正在开发用户定义的 Flink MQTT 连接器。

https://github.com/yinjilong/StoneForests-flink-mqtt-connector

但是,当我尝试以

json
格式写入消息时遇到序列化问题。

在课堂上

public class MqttSinkFunction<T> extends RichSinkFunction<T> 
我按如下方式重写
invoke
方法,


    @Override
    public void invoke(T event, Context context) {
        if (log.isDebugEnabled()) {
            log.debug("sink invoke...");
            log.debug("message is {}", event);
        }

        try {
            byte[] payload = this.serializer.serialize(event);
        } catch (Exception e){
            log.error("can not serialize event{} at {}",event,e);
        }

        try{
            byte[]  payload = this.serializer.serialize(event);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(this.qos);
            String[] topics = this.topics.split(",");
            for (String topic : topics) {
                if (log.isDebugEnabled()) {
                    log.debug("send message:[{}] to topic topic:[{}].  Exception.", message, topic);
                }
                    this.client.publish(topic, message);
            }
        } catch(Exception e){
                log.error("Cannot sink MQTT event {} at {}",event ,hostUrl,e);
        }

    }


我使用以下 SQL 来编写消息,

$ sql-client.sh

CREATE TABLE sink(
     id INT,
     name STRING
) WITH(
  'connector' = 'mqtt',
  'hostUrl' = 'tcp://localhost:1883',
  'username' = '',
  'password' = '',
  'sinkTopics' = 'test/mytopic',
  'format' = 'json'
 );

INSERT INTO sink (id,name) VALUES(1,'Jeen');

看来来自

event(RowData)
的序列化失败了。 事件显示为
+I(1,Jeen)

我认为当我使用 JSON 格式时,内部序列化可以自动从给定的 sql 表模式推断出 JSON 模式。但它失败并抛出异常。

请帮帮我。谢谢你。

serialization apache-flink connector sink
1个回答
0
投票

NPE 导致

event(RowData)
序列化失败,因为
org.apache.flink.formats.json.JsonRowDataSerializationSchema#mapper
在您的
com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#serializer
实例中为 null。
您可以通过向
com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#open
函数添加一行来修复它。

this.serializer.open(null);
© www.soinside.com 2019 - 2024. All rights reserved.