我正在尝试运行以下代码:
CREATE TABLE IF NOT EXISTS some_source_table
(
myField1 VARCHAR,
myField2 VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'demo',
'properties.bootstrap.servers' = '***',
'properties.group.id' = 'some-id-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
'properties.ssl.endpoint.identification.algorithm' = 'https'
);
CREATE TABLE IF NOT EXISTS some_sink_table
(
myField1 VARCHAR,
myField2 VARCHAR,
PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'demosink',
'properties.bootstrap.servers' = '***',
'key.format' = 'json',
'value.format' = 'json',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
'properties.ssl.endpoint.identification.algorithm' = 'https'
);
INSERT INTO some_sink_table SELECT * FROM some_source_table;
我可以看到我正在 Confluence 平台中消费数据,但是没有数据生成到 demosink 主题。
我尝试向here给出的示例添加依赖项。
我尝试添加以下依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
我使用Flink版本1.19.1。我是否缺少某些依赖项或者我的某些依赖项不兼容?我还尝试复制 flink-sql-connector-kafka 和 将 flink-clients .jar 文件放入 dockerfile 内的 lib 文件夹中。