Databricks 自定义保持活动集群

问题描述 投票:0回答:1

我已将 databricks 集群升级到 10.4 LTS 到 12.2 LTS,并且我们使用集群的方式发生了重大变化。

对于某些上下文,我们在将连接到 Databricks 集群的 Azure 机器学习 VM 上部署 python 代码。

我们有一个在算法之间共享的集群。这样所有的python运行都可以访问同一个集群并水平扩展。它对于多个用户很有用,还可以降低我们的成本。

为了进一步降低成本,我们决定将

terminated after X minutes of inactivity
选项设置为 10 分钟。但我们的一些算法需要 Spark 会话保持活动时间超过 10 分钟。大多数时候,我们将 DF 写入临时表或文件中,然后加载它。但有时我们最好保持会话打开以保持 DF 的活力。

为此,我们每 9 分钟向集群创建一个线程

ping

    if keep_alive:
        # Keep Spark alive in a separate thread
        self._keep_spark_alive_thread = threading.Thread(
            target=self._keep_spark_alive,
            daemon=True,
        )
        self._keep_spark_alive_thread.start()
    def _keep_spark_alive(self):
        """
        Keeps the Spark session alive by running a dummy job and sleeping for a specified interval.

        This method runs an infinite loop that periodically executes a dummy job on the Spark session
        to prevent it from being terminated due to inactivity. It sleeps for a specified interval between
        each execution of the dummy job.

        Returns:
            None
        """
        while True:
            # Run a dummy job to keep Spark alive
            try:
                logging.debug("Keeping Spark alive...")
                self._spark.sql(f"SELECT '{self.project_name}'").collect()

                # Sleep for 9 minutes
                # time.sleep(9 * 60)
                time.sleep(20) # 20 seconds for debug
            except Exception as e:
                logging.debug(f"Error keeping Spark alive: {e}")

在 10.4 LTS 集群中使用此线程,发送 Spark sql 并使集群保持活动状态。 但在 12.2 LTS 中,发送了 Spark sql,但它忽略了

activity
,如果 10 分钟内没有其他查询/操作,集群就会关闭。

我尝试将睡眠时间降低到 20 秒,结果发生了。每隔 20 秒我就可以在 Spark 日志中看到该活动,但 10 分钟后,它开始关闭。 10分20秒,由于线程的原因,它再次启动。但由于集群重新启动,我失去了 Spark 会话。

我的调试日志:

Keeping Spark alive...
Keeping Spark alive...
24/09/30 13:54:05 WARN SparkServiceRPCClient: The cluster seems to be down. A
24/09/30 13:54:06 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:16 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:26 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:37 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:47 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
24/09/30 13:54:57 WARN SparkServiceRPCClient: Cluster xxxx-xxxxxx-xxxxxxxx in
Error keeping Spark alive: requirement failed: Result for RPC Some(87d40863-d
Keeping Spark alive...
Keeping Spark alive...

你知道为什么

spark.sql(f"SELECT '{self.project_name}'").collect()
不能作为一项活动吗?新版本的 pyspark 是否知道查询是相同的,因此保存在缓存中?

python pyspark databricks azure-databricks databricks-connect
1个回答
0
投票

是的,您的查询可能会被缓存并且不被视为活动,因为您每次都运行相同的查询,不需要再次重新计算。

此外,自动终止火花作业检查。如果10分钟内没有spark作业,它会自动终止。

因此,在工作流程中创建一个简单的笔记本作业并每 9 分钟触发一次。

databricks_instance = "https://axyz..syx10.azuredatabricks.net"
access_token = "dapib....."
job_id = "142342612795035"


url = f"{databricks_instance}/api/2.1/jobs/run-now"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}
payload = {
    "job_id": job_id
}


def _keep_spark_alive():
    
    while True:
        try:
            logger.info("Keeping Spark alive...")
            response = requests.post(url, headers=headers, data=json.dumps(payload))
            if response.status_code == 200:
                print("Job triggered successfully.")
                print("Response:", response.json())
            else:
                print("Failed to trigger job.")
                print("Status Code:", response.status_code)
                print("Response:", response.text)
            time.sleep(60*3)
        except Exception as e:
            logging.info(f"Error keeping Spark alive: {e}")

输出:

enter image description here

这里,每 3 分钟就会触发一次作业并保持集群处于活动状态。

或者,您可以使用 Command Execution API

在集群中每 9 分钟运行一次 python 或 sql 命令
© www.soinside.com 2019 - 2024. All rights reserved.