我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置:
我有一个 Kafka 主题,我正在从中使用 JSON 消息。 我正在使用 PySpark 在结构化流作业中处理这些消息。 我在时间戳字段上定义了水印和窗口,以根据 1 分钟的窗口持续时间聚合数据。 我的聚合逻辑计算每个窗口内每个 gameId 的不同客户端的大致数量。 我正在尝试将这些聚合数据写入 MongoDB。 这是我的代码的相关部分:
schema = StructType([
StructField("gameType", StringType(), True),
StructField("clientId", StringType(), True),
StructField("gameId", StringType(), True),
StructField("timestamp", TimestampType(), True),
])
kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test_topic").load()
json_df = kafka_df.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), schema).alias("data")).select("data.*")
json_df_with_watermark = json_df.withWatermark("timestamp", "1 minute")
window_duration = "1 minute"
clients_per_gameId = json_df_with_watermark.groupBy(window("timestamp", window_duration), "gameId").agg(
approx_count_distinct("clientId").alias("client_count")
)
def write_to_mongo(df, epoch_id):
df = df.drop("window")
df.write \
.format("mongodb") \
.mode("append") \
.option("spark.mongodb.connection.uri", mongodb_uri) \
.option("spark.mongodb.database", "pyspark_test") \
.option("spark.mongodb.collection", "clients_per_gameId") \
.save()
query = clients_per_gameId.writeStream \
.foreachBatch(write_to_mongo) \
.option("checkpointLocation", checkpoint_dir) \
.start()
query.awaitTermination()
我面临的问题是,虽然原始 JSON 数据 (json_df_with_watermark) 已成功写入 MongoDB 集合,但聚合数据 (clients_per_gameId) 并未成功写入。我已确保聚合逻辑正确,并且在流作业执行期间没有报告错误。
任何关于为什么聚合数据没有写入 MongoDB 的见解或建议将不胜感激。
谢谢!
我通过在 writeStream 查询中显式将输出模式设置为“更新”或“追加”解决了该问题 像这样,这解决了问题:
query = clients_per_gameId.writeStream \
.foreachBatch(write_to_mongo) \
.option("checkpointLocation", checkpoint_dir) \
.outputMode("update") \
.start()