使用pyspark批处理读取几个小时后,使用pyspark批处理读取挂起挂起的数据,重复执行

问题描述 投票:0回答:1

above代码用于读取循环中所有EventHub的数据,并采用联合来获取单个数据框架。之后,进行了一些简单的转换。最后,通过在不使用Spark CheckPointing的情况下使用偏移信息更新JSON文件,从而保存了偏移。

连续执行1.5小时后,有任何方法可以解决预防降解吗?

添加更多的EventHub作为源将导致执行时间增加,即使数据量很小。如何解决这个问题?
代码等待5分钟以在存在任何连接问题时读取EventHub。如何将超时时间减少到几秒钟?

i试图在集群环境变量配置中分配G1GC垃圾收集器,但没有改进。
  1. below是您可能可以尝试的方法。
  2. persist()
  3. 将数据存储在内存中的数据,如果您不明确,随着时间的流逝,记忆将增加使用情况。
  4. themere是样本Scala代码。
unpersist

相似,您在pyspark中做到了。
azure pyspark azure-databricks azure-eventhub
1个回答
0
投票
在阅读多个EventHub时循环循环,您可以使用

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(...).save(...) // location 1 batchDF.write.format(...).save(...) // location 2 batchDF.unpersist() }

  1. 贝洛是示例代码,相应地对其进行更改。
    
    ThreadPoolExecutor
    here,
  2. from concurrent.futures import ThreadPoolExecutor def read_eventhub(config): return spark.read.format("eventhubs").options(**config).load().withColumn("eh_ky", f.lit(key)) with ThreadPoolExecutor() as executor: dataframes = list(executor.map(lambda cfg: read_eventhub(*cfg), eventhub_configs)) df = reduce(lambda df1, df2: df1.union(df2), dataframes)
是您需要的列表EventHubs源。

要减少超时设置
eventhub_configs

receiverTimeout

配置,请检查此
    github文档
  1. 以获取更多信息。
        
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.