I'm developing a system with a microservices architecture that relies on several Azure services for continuous service-to-service communication. For context, here is a simplified overview of my setup (network-related components are left out for simplicity).
My goal is to build a mechanism that verifies correct connectivity between all the components. Initially, I considered a watchdog script that runs periodically (every 6 or 12 hours) and performs the connectivity checks. Here is a simplified version of the script:
from datetime import datetime
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class Watchdog:
    def __init__(self, check_frequency=43200):
        self.status_file_path = Path("/var/watchdog/status.txt")
        self.check_frequency = check_frequency  # seconds; 43200 = 12 hours

    def __call__(self):
        if not self.__is_check_needed():
            logger.info("No need to run watchdog")
            return
        logger.info("Running watchdog connectivity checks.")
        self.__run_checks()
        self.__update_status_file()

    def __is_check_needed(self) -> bool:
        if not self.status_file_path.exists():
            logger.info("Watchdog status file does not exist.")
            return True
        status_age = (datetime.now() - self.__modification_date(self.status_file_path)).total_seconds()
        logger.info(f"Watchdog status file was created {status_age} seconds ago")
        if status_age > self.check_frequency:
            logger.info(f"Watchdog status is too old ({status_age} seconds). We need to run it again.")
            return True
        return False

    def __run_checks(self):
        self.__check_connection_to_sql()
        self.__check_connection_to_blob()
        self.__check_connection_to_queue()
        # remaining checks go here

    def __update_status_file(self):
        # Touch the status file so its mtime records the last run
        self.status_file_path.touch()

    @staticmethod
    def __modification_date(file_path):
        return datetime.fromtimestamp(file_path.stat().st_mtime)
This approach works well for long-lived Azure Container Apps, but it is not a good fit for short-lived data-processing components because of the latency it could introduce. On top of that, I'm not sure how to run a dedicated script on KEDA workers, which makes this solution unusable in some scenarios.
Any insights or suggestions would be much appreciated, and I'm open to collaborating on or sharing this project if anyone is interested.
For the short-lived data-processing components, use a sidecar container in the same pod as the data-processing engine. The sidecar is responsible for the health checks, logging, and reporting status to the centralized monitoring system you configure.
When the data-processing engine container starts, the sidecar starts alongside it and monitors its lifecycle.
deployment.yml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-processing
  template:
    metadata:
      labels:
        app: data-processing
    spec:
      containers:
      - name: deploy
        image: deploy
        ports:
        - containerPort: 8080
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "curl http://localhost:8081/shutdown"]
      - name: my-app
        image: my-app
        ports:
        - containerPort: 8081
        command: ["python", "watchdog_script.py"]
my-app.yml:
apiVersion: batch/v1
kind: Job
metadata:
  name: my-app
spec:
  template:
    spec:
      containers:
      - name: my-appcon
        image: my-app
        command: ["python", "watchdog_script.py"]
      restartPolicy: Never
  backoffLimit: 4
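As for running the watchdog on KEDA workers: one option is to wrap the same Job template in a KEDA ScaledJob, so every scaled-out run executes the watchdog script alongside the processing work. Below is a sketch assuming an Azure Storage queue trigger; the queue name and the connection environment variable are placeholders, not values from the original setup:
keda-scaledjob.yml (hypothetical):
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: my-app-scaledjob
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: my-appcon
          image: my-app
          command: ["python", "watchdog_script.py"]
        restartPolicy: Never
    backoffLimit: 4
  triggers:
  - type: azure-queue
    metadata:
      queueName: my-queue                     # placeholder
      queueLength: "5"
      connectionFromEnv: AzureWebJobsStorage  # placeholder env var name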
watchdog_script.py:
import logging
from datetime import datetime
from pathlib import Path

import requests

logger = logging.getLogger(__name__)


class Watchdog:
    def __init__(self, check_frequency=43200):
        self.status_file_path = Path("/var/watchdog/status.txt")
        self.check_frequency = check_frequency  # seconds; 43200 = 12 hours

    def __call__(self):
        if not self.__is_check_needed():
            logger.info("No need to run watchdog")
            return
        logger.info("Running watchdog connectivity checks.")
        self.__run_checks()
        self.__update_status_file()

    def __is_check_needed(self) -> bool:
        if not self.status_file_path.exists():
            logger.info("Watchdog status file does not exist.")
            return True
        status_age = (datetime.now() - self.__modification_date(self.status_file_path)).total_seconds()
        logger.info(f"Watchdog status file was created {status_age} seconds ago")
        if status_age > self.check_frequency:
            logger.info(f"Watchdog status is too old ({status_age} seconds). We need to run it again.")
            return True
        return False

    def __run_checks(self):
        self.__check_connection_to_sql()
        self.__check_connection_to_blob()
        self.__check_connection_to_queue()
        self.__check_keda_scaled_components()

    def __check_connection_to_sql(self):
        # SQL check implementation
        pass

    def __check_connection_to_blob(self):
        # Blob Storage check implementation
        pass

    def __check_connection_to_queue(self):
        # Queue check implementation
        pass

    def __check_keda_scaled_components(self):
        # Check for KEDA scaled components; the URL is a placeholder
        response = requests.get("http://keda-metrics-endpoint/health", timeout=10)
        if response.status_code == 200:
            logger.info("KEDA scaled components are healthy")
        else:
            logger.error("KEDA scaled components health check failed")

    def __update_status_file(self):
        # Ensure the directory exists (e.g. on a fresh container filesystem),
        # then touch the status file so its mtime records the last run
        self.status_file_path.parent.mkdir(parents=True, exist_ok=True)
        self.status_file_path.touch()

    @staticmethod
    def __modification_date(file_path):
        return datetime.fromtimestamp(file_path.stat().st_mtime)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    watchdog = Watchdog()
    watchdog()
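As one example of filling in the stubbed checks, the Blob Storage check could look like the sketch below. It assumes the azure-storage-blob package is installed and that a connection string is available in an environment variable; both the env var name and the function name are assumptions, not part of the original setup:
import logging
import os

from azure.storage.blob import BlobServiceClient  # assumes azure-storage-blob is installed

logger = logging.getLogger(__name__)


def check_connection_to_blob():
    """Hypothetical body for Watchdog.__check_connection_to_blob."""
    try:
        client = BlobServiceClient.from_connection_string(
            os.environ["AZURE_STORAGE_CONNECTION_STRING"]  # assumed env var name
        )
        # Any authenticated round-trip proves connectivity; this one
        # raises on network or auth failure.
        client.get_service_properties()
        logger.info("Blob Storage connectivity check passed")
    except Exception:
        logger.exception("Blob Storage connectivity check failed")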