我有一个 Kafka 生产者,它从两个大文件中读取数据并以具有相同结构的 JSON 格式发送它们:
def create_sample_json(row_id, data_file): return {'row_id':int(row_id), 'row_data': data_file}
生产者将每个文件分成小块,并从每个块创建 JSON 格式,最后在 for 循环中发送它们。
发送这两个文件的过程通过多线程同时发生。
我想从这些流(s1.row_id == s2.row_id)中进行join,并最终在我的生产者在 Kafka 上发送数据时进行一些流处理。因为生产者从多个来源生成大量数据,我迫不及待地要把它们全部消耗掉,而且必须同时完成。
我不确定 Table API 是否是一个好方法,但这是迄今为止我的 pyflink 代码:
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = 'localhost:9092'
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///flink_jar/kafka-clients-3.3.2.jar")
env.add_jars("file:///flink_jar/flink-connector-kafka-1.16.1.jar")
env.add_jars("file:///flink_jar/flink-sql-connector-kafka-1.16.1.jar")
settings = EnvironmentSettings.new_instance() \
.in_streaming_mode() \
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
t1 = f"""
CREATE TEMPORARY TABLE table1(
row_id INT,
row_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'datatopic',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t2 = f"""
CREATE TEMPORARY TABLE table2(
row_id INT,
row_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'datatopic',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
p1 = t_env.execute_sql(t1)
p2 = t_env.execute_sql(t2)
//请告诉我下一步应该做什么:
// 问题:
// 1) 我是否需要单独使用我的消费者类中的数据,然后将它们插入到这些表中,或者将从我们在这里实现的内容中消费数据(因为我传递了连接器、主题、bootstartap.servers 的名称)等...)?
// 2)如果是这样:
2.1) 如何在 Python 中从这些流中进行连接?
2.2)当我的 Producer 会发送数千条消息时,如何阻止之前的数据?我想确保不要进行重复查询。
// 3) 如果没有,我该怎么办?
非常感谢。
// 1) 我是否需要单独使用我的消费者类中的数据,然后将它们插入到这些表中,或者将从我们在这里实现的内容中消费数据(因为我传递了连接器、主题、bootstartap.servers 的名称)等...)?
后一个,数据将由我们实现的“kafka”表连接器消耗。并且您需要定义一个Sink表作为您插入的目标,Sink表可以是一个带有您想要输出的主题的kafka连接器表。
2.1) 如何在 Python 中从这些流中进行连接?
您可以编写 SQL 连接 table1 和 table2,然后在 Python 中插入到您的接收器表中
2.2)当我的 Producer 会发送数千条消息时,如何阻止之前的数据?我想确保不要进行重复查询。
您可以在“加入”之前或“插入”之前过滤这些消息,在您的情况下,“WHERE”子句就足够了