Inserting data into a Kafka topic with Flink SQL using the Kubernetes operator

Problem description · votes: 0 · answers: 1

I am trying to run the following Flink SQL:

CREATE TABLE IF NOT EXISTS some_source_table
(
    myField1     VARCHAR,
    myField2      VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'demo',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = 'some-id-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval'= '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);

CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1     VARCHAR,
    myField2      VARCHAR,
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);

INSERT INTO some_sink_table SELECT * FROM some_source_table;

I can see in the Confluent platform that data is being consumed, but no data is being produced to the demosink topic.

I tried adding the dependencies from the example given here.

I tried adding the following dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

I am using Flink version 1.19.1. Am I missing a dependency, or are some of my dependencies incompatible? I also tried copying the flink-sql-connector-kafka and flink-clients .jar files into the lib folder inside the Dockerfile.
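For reference, a minimal sketch of the "copy the jar into lib" approach, assuming the official flink:1.19.1 base image and a connector build matching Flink 1.19 (the 3.1.0-1.18 artifacts listed above were built against Flink 1.18; the exact jar name here is an assumption):

```dockerfile
# Sketch: extend the official Flink image and add the Kafka SQL connector.
# flink-sql-connector-kafka is the fat jar that bundles both the kafka and
# upsert-kafka SQL connectors together with the Kafka client classes, so a
# separate flink-connector-kafka jar is not needed in lib/.
FROM flink:1.19.1

# Version assumed here; use the connector release line built for your Flink version.
COPY flink-sql-connector-kafka-3.2.0-1.19.jar /opt/flink/lib/
```

Note that shading both flink-connector-kafka and flink-sql-connector-kafka into the job jar, or combining a shaded jar with a copy in lib/, can put duplicate connector classes on the classpath.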

apache-flink

1 Answer

0 votes

The problem was a duplicate of the

properties.sasl.jaas.config

property.