从 PySpark 结构化流将聚合数据写入 MongoDB 的问题

问题描述 投票:0回答:1

我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置:

我有一个 Kafka 主题,我正在从中使用 JSON 消息。 我正在使用 PySpark 在结构化流作业中处理这些消息。 我在时间戳字段上定义了水印和窗口,以根据 1 分钟的窗口持续时间聚合数据。 我的聚合逻辑计算每个窗口内每个 gameId 的不同客户端的大致数量。 我正在尝试将这些聚合数据写入 MongoDB。 这是我的代码的相关部分:


schema = StructType([
    StructField("gameType", StringType(), True),
    StructField("clientId", StringType(), True),
    StructField("gameId", StringType(), True),
    StructField("timestamp", TimestampType(), True),
])

kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test_topic").load()

json_df = kafka_df.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), schema).alias("data")).select("data.*")

json_df_with_watermark = json_df.withWatermark("timestamp", "1 minute")

window_duration = "1 minute"

clients_per_gameId = json_df_with_watermark.groupBy(window("timestamp", window_duration), "gameId").agg(
    approx_count_distinct("clientId").alias("client_count")
)

def write_to_mongo(df, epoch_id):
    df = df.drop("window")
    df.write \
        .format("mongodb") \
        .mode("append") \
        .option("spark.mongodb.connection.uri", mongodb_uri) \
        .option("spark.mongodb.database", "pyspark_test") \
        .option("spark.mongodb.collection", "clients_per_gameId") \
        .save()

query = clients_per_gameId.writeStream \
    .foreachBatch(write_to_mongo) \
    .option("checkpointLocation", checkpoint_dir) \
    .start()

query.awaitTermination()


我面临的问题是,虽然原始 JSON 数据 (json_df_with_watermark) 已成功写入 MongoDB 集合,但聚合数据 (clients_per_gameId) 并未成功写入。我已确保聚合逻辑正确,并且在流作业执行期间没有报告错误。

任何关于为什么聚合数据没有写入 MongoDB 的见解或建议将不胜感激。

谢谢!

mongodb pyspark apache-kafka-streams spark-structured-streaming
1个回答
0
投票

我通过在 writeStream 查询中显式将输出模式设置为“更新”或“追加”解决了该问题 像这样,这解决了问题:

query = clients_per_gameId.writeStream \
.foreachBatch(write_to_mongo) \
.option("checkpointLocation", checkpoint_dir) \
.outputMode("update") \
.start()
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.