如果对流数据帧数据进行分组,在 Spark 结构化流中是否可以在单独的单个微批次中处理每个组?像这样的东西:
dfs = ...
dfs.groupBy(...).writestream...foreachbatch()
谢谢!
您无法在微批次中单独运行分组数据。如果您希望根据分组列将数据放在单独的文件夹中,可以使用下面的代码。
def batch_oper(df, epoch_id):
df.groupby(F.col("country")).count() \
.write.format("csv").mode("append") \
.partitionBy("country") \
.save("base_path_of_the_location")
df.writeStream \
.outputMode("append") \
.foreachBatch(batch_oper) \
.option("checkpointLocation", "base_path_of_the_location/checkpoint") \
.start()
或
def batch_oper(df, epoch_id):
df.write.format("csv").partitionBy("country").save("dbfs:/part_csvs1/")
streaming_df.groupBy(F.col("country")).count().writeStream.outputMode("complete").foreachBatch(batch_oper).option("checkpointLocation", "dbfs:/part_csvs/checkpoint").start()
输出:
在这里,它为每个国家/地区创建单独的文件夹并存储数据。
如果您想对分组数据执行自定义操作,PySpark 不支持用户定义的聚合函数,但您可以按照this解决方案中的方法进行操作。