above代码用于读取循环中所有EventHub的数据,并采用联合来获取单个数据框架。之后,进行了一些简单的转换。最后,通过在不使用Spark CheckPointing的情况下使用偏移信息更新JSON文件,从而保存了偏移。
连续执行1.5小时后,有任何方法可以解决预防降解吗?
添加更多的EventHub作为源将导致执行时间增加,即使数据量很小。如何解决这个问题?代码等待5分钟以在存在任何连接问题时读取EventHub。如何将超时时间减少到几秒钟?i试图在集群环境变量配置中分配G1GC垃圾收集器,但没有改进。
persist()
unpersist
相似,您在pyspark中做到了。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
贝洛是示例代码,相应地对其进行更改。
ThreadPoolExecutor
here,from concurrent.futures import ThreadPoolExecutor
def read_eventhub(config):
return spark.read.format("eventhubs").options(**config).load().withColumn("eh_ky", f.lit(key))
with ThreadPoolExecutor() as executor:
dataframes = list(executor.map(lambda cfg: read_eventhub(*cfg), eventhub_configs))
df = reduce(lambda df1, df2: df1.union(df2), dataframes)
要减少超时设置
eventhub_configs
和receiverTimeout