我已将 databricks 集群升级到 10.4 LTS 到 12.2 LTS,并且我们使用集群的方式发生了重大变化。
对于某些上下文,我们在将连接到 Databricks 集群的 Azure 机器学习 VM 上部署 python 代码。
我们有一个在算法之间共享的集群。这样所有的python运行都可以访问同一个集群并水平扩展。它对于多个用户很有用,还可以降低我们的成本。
为了进一步降低成本,我们决定将
terminated after X minutes of inactivity
选项设置为 10 分钟。但我们的一些算法需要 Spark 会话保持活动时间超过 10 分钟。大多数时候,我们将 DF 写入临时表或文件中,然后加载它。但有时我们最好保持会话打开以保持 DF 的活力。
为此,我们每 9 分钟向集群创建一个线程
ping
。
if keep_alive:
# Keep Spark alive in a separate thread
self._keep_spark_alive_thread = threading.Thread(
target=self._keep_spark_alive,
daemon=True,
)
self._keep_spark_alive_thread.start()
def _keep_spark_alive(self):
"""
Keeps the Spark session alive by running a dummy job and sleeping for a specified interval.
This method runs an infinite loop that periodically executes a dummy job on the Spark session
to prevent it from being terminated due to inactivity. It sleeps for a specified interval between
each execution of the dummy job.
Returns:
None
"""
while True:
# Run a dummy job to keep Spark alive
try:
logging.debug("Keeping Spark alive...")
self._spark.sql(f"SELECT '{self.project_name}'").collect()
# Sleep for 9 minutes
# time.sleep(9 * 60)
time.sleep(20) # 20 seconds for debug
except Exception as e:
logging.debug(f"Error keeping Spark alive: {e}")
在 10.4 LTS 集群中使用此线程,发送 Spark sql 并使集群保持活动状态。 但在 12.2 LTS 中,发送了 Spark sql,但它忽略了
activity
,如果 10 分钟内没有其他查询/操作,集群就会关闭。
我尝试将睡眠时间降低到 20 秒,结果发生了。每隔 20 秒我就可以在 Spark 日志中看到该活动,但 10 分钟后,它开始关闭。 10分20秒,由于线程的原因,它再次启动。但由于集群重新启动,我失去了 Spark 会话。
我的调试日志:
Keeping Spark alive...
Keeping Spark alive...
24/09/30 13:54:05 WARN SparkServiceRPCClient: The cluster seems to be down. A
24/09/30 13:54:06 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:16 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:26 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:37 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:47 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:57 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
Error keeping Spark alive: requirement failed: Result for RPC Some(87d40863-d
Keeping Spark alive...
Keeping Spark alive...
你知道为什么
spark.sql(f"SELECT '{self.project_name}'").collect()
不能作为一项活动吗?新版本的 pyspark 是否知道查询是相同的,因此保存在缓存中?
是的,您的查询可能会被缓存并且不被视为活动,因为您每次都运行相同的查询,不需要再次重新计算。
此外,自动终止火花作业检查。如果10分钟内没有spark作业,它会自动终止。
因此,在工作流程中创建一个简单的笔记本作业并每 9 分钟触发一次。
databricks_instance = "https://axyz..syx10.azuredatabricks.net"
access_token = "dapib....."
job_id = "142342612795035"
url = f"{databricks_instance}/api/2.1/jobs/run-now"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
payload = {
"job_id": job_id
}
def _keep_spark_alive():
while True:
try:
logger.info("Keeping Spark alive...")
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
print("Job triggered successfully.")
print("Response:", response.json())
else:
print("Failed to trigger job.")
print("Status Code:", response.status_code)
print("Response:", response.text)
time.sleep(60*3)
except Exception as e:
logging.info(f"Error keeping Spark alive: {e}")
输出:
这里,每 3 分钟就会触发一次作业并保持集群处于活动状态。
或者,您可以使用 Command Execution API
在集群中每 9 分钟运行一次 python 或 sql 命令