如何将 JSON 字符串序列化为 JSON 对象,以在合流云 SQS 连接器中进行模式注册表验证?

问题描述 投票:0回答:1

我正在尝试做的事情:

  • 将数据从我的应用程序(Ruby on Rails)发送到 SQS。
  • 在 Clonfluence Cloud 中创建 SQS Source Connector 以从队列中提取数据。

我有一个模式注册表,我希望在保存之前检查我的消息。 它可以与设置为 JSON_SR 的输出记录值格式一起使用

我的问题是从 SQS 拉取时的消息格式。我只能将字符串作为消息正文发送到 SQS,但我需要 JSON。目前,body 字段中的所有消息都只是 JSON 消息的字符串表示形式。

我发送的数据,序列化为JSON字符串,因为SQS无法接受messageBody中的JSON

{
      "username": "name",
      "property": "test prop",
      "created_utc_date": "2024-04-18T08:00:00Z"
}

从 SQS 中提取值时填充了很多字段,但我们可以使用 SMT 来提取我的消息所在的“正文”。提取的消息只是一个 JSON 字符串

"{\"username\":\"name\",\"property\":\"test prop\",\"created_utc_date\":\"2024-04-18T08:00:00Z\"}" 
由于出现错误,因此无法保存。

“连接器无法向架构注册表注册新架构,因为它与同一主题的现有架构不兼容。”

我查看了很多可能的解决方案、SMT、转换器,但我没有设法让任何东西发挥作用,有没有一种方法可以在某个时候以某种方式反序列化该数据,以便连接器可以摄取它?

我唯一发现的是这个https://www.confluence.io/hub/redhatinsights/expandjsonsmt/但是该插件无法添加到Confluence Cloud。

ruby-on-rails amazon-sqs confluent-cloud
1个回答
0
投票

所以我没能做到这一点。然而,我也做了类似的事情。 由于我找不到使用连接器以正确的格式将所需的消息发送到 Kafka 中正确主题的方法,因此我放弃了它并将其替换为 AWS Lambda 函数。 AWS Lambda 现在由 SQS 触发。使用 lambda,我可以像将消息发送到 Kafka 之前一样格式化、序列化和更改消息。

如果其他人想尝试使用这种解决方案,这里是我的 Lambda 代码

const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

exports.handler = async function (event, context) {
    const eventRecords = event.Records;
    await Promise.all(eventRecords.map(async (record) => {
        const message = JSON.parse(record.body);
        const topic = message.topic;
        await publishToKafka(message, topic);
    }));
}

publishToKafka = async function (message, kafkaTopic) {
    const kafkaBrokers = [process.env.KAFKA_DEV_CLUSTER];
    const schemaRegistryUrl = process.env.SCHEMA_REGISTRY_URL;

    const registry = new SchemaRegistry({
        host: schemaRegistryUrl,
        auth: {
          username: process.env.SCHEMA_REGISTRY_DEV_API_KEY,
          password: process.env.SCHEMA_REGISTRY_DEV_API_SECRET,
        },
      })
    // Just the way we name schemas
    const schemaName = kafkaTopic + '-value';
    const ssl = true;

    const kafka = new Kafka({
        brokers: kafkaBrokers,
        ssl,
        sasl: {
            mechanism: 'plain',
            username: process.env.KAFKA_DEV_API_KEY,
            password: process.env.KAFKA_DEV_API_SECRET,
        },
        securityProtocol: 'SASL_SSL'
    });

    const producer = kafka.producer();
    await producer.connect();

    try {
        const id = await registry.getLatestSchemaId(schemaName);
        const encodedPayload = await registry.encode(id, message)

        await producer.send({
            topic: kafkaTopic,
            messages: [{ value: encodedPayload }]
        });

        console.log('Message sent to Kafka');
        return 'Message sent to Kafka';
    } catch (error) {
        console.error('Error sending message to Kafka:', error);
        return 'Error sending message to Kafka';
    } finally {
        await producer.disconnect();
    }
};

所以最终我的工作流程非常简单,将消息发送到 SQS,然后 lambda 函数处理这些消息并将它们发送到 kafka。

© www.soinside.com 2019 - 2024. All rights reserved.