我正在用 Python 构建一个 API 应用程序,大量用户将使用该应用程序来执行他们的作业请求。用户使用 API 端点提交提供输入值的作业请求。他们收到 JobID 作为响应,应用程序在后台运行作业。作业完成后,用户可以轮询并检索结果。
该应用程序的初始版本定义了固定数量的副本,并且支持多个请求的数量。但是,当我增加负载时,要么请求开始失败,要么 pod 下的应用程序(gunicorn 工作节点)开始失败。
我正在考虑为后台作业动态创建 Pod,这是我过去从未做过的。我认为这将有助于扩展应用程序负载,因为一旦输入作业完成,pod 就会死亡。但是,我不确定如何管理 Pod(如果 Pod 失败或 Pod 下的容器失败等)。此外,我还会受到可以创建的 pod 数量的限制。
如果有人构建了类似的需要大量请求的应用程序,可以分享他们的经验吗?如何设计这样的应用程序具有高度可扩展性和弹性?
为后台作业使用专用工作队列。 在 Python 原生环境中,Celery 可以正常工作;如果您认为可能需要跨语言互操作,RabbitMQ 是一个流行的开源选项;还有其他选择。
同样,编写一个专用的工作进程来实际运行作业。
您的基本执行流程将如下所示:
在本地开发和测试它,根本不使用任何容器基础设施(也许使用 Docker Compose 启动数据库和作业队列基础设施,但实际上在容器外部编写 API 服务器和工作线程)。
当您要部署它时,您将需要两个 StatefulSet(用于数据库和 Redis/RabbitMQ/...)和两个 Deployment(用于 HTTP 服务器和工作线程)。
这样做的实际意义是能够为两个 Deployment 配置一个 HorizontalPodAutoscaler。 这可以让您根据某些指标自动设置
replicas:
;将应用程序中的指标与某些指标存储(通常是 Prometheus)连接回 HPA 基础设施是一项挑战。 现在,您可以设置 API 服务器的副本数与并发 HTTP 请求数成正比(在负载下扩展 HTTP 层),以及工作线程的副本数与队列长度成正比(如果存在则扩展工作线程)很多未完成的工作)。
请注意,这一切根本不使用 Kubernetes API,也不会尝试动态创建 Pod。 如上所述,该堆栈非常独立于任何容器运行时(您可以在普通 Docker 中运行它,或者根本不需要容器)。 权限方面以及完成后清理每个作业 Pod 方面也存在挑战。 我会避免这种方法。
以防万一其他人看到这篇文章,我遇到的挑战是 David 提到的,以及如何自动扩展作为 Python RQ 工作人员的后端 Pod。我最终创建了一个新的容器/部署,它利用了 Python Schedule 模块,并每隔几分钟运行一次,检查几个队列上正在运行的作业,我想根据活动作业进行扩展和缩减。我在寻找有关如何执行此操作的任何信息时遇到了困难,因此开始尝试代码并测试不同的方法。请注意,您可以忽略代码中的过多日志记录。这只是我编写任何企业应用程序的经验,大量日志记录是帮助快速解决出现问题的最佳方法。这是我为解决此问题而创建的代码。
#! /usr/bin/env python
import os
import logging
import logging.handlers
import time
import schedule
import requests
from kubernetes import client, config
from redis import Redis
from rq import Queue
script_name = "auto-scaler"
if os.path.exists("/var/log/"):
log_file = f"/var/log/{script_name}.log"
else:
log_file = f"{script_name}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - [%(name)s-%(module)s-%(funcName)s-%(lineno)s] - %(levelname)s - %(message)s',
handlers=[
logging.handlers.RotatingFileHandler(log_file, maxBytes=5000000, backupCount=5),
logging.StreamHandler()
]
)
redis_conn = Redis(host='automation-broker-redis', port=6380, db=0)
logging.info(f"Connected to the redis container with the default endpoint.")
q2_general = Queue('general', connection=redis_conn)
q3_initial = Queue('initial', connection=redis_conn)
def main():
logging.info('Starting main method.')
scale_deployment("automation-broker-worker-general", "automation-broker", q2_general, 5, 30)
scale_deployment("automation-broker-worker-initial", "automation-broker", q3_initial, 10, 30)
def scale_deployment(deployment_name: str, namespace: str, queue: Queue, min_replicas: int, max_replicas: int):
logging.info(f"Scaling deployment {deployment_name} in namespace {namespace}.")
config.load_incluster_config()
v1apps = client.AppsV1Api()
ret = v1apps.read_namespaced_deployment(name=deployment_name, namespace=namespace)
logging.info(f"Total replicas for {deployment_name} deployment: {ret.spec.replicas}")
logging.info(f"Available replicas for {deployment_name} deployment: {ret.status.available_replicas}")
total_replicas = ret.spec.replicas
available_replicas = ret.status.available_replicas
logging.info(f"Minimum replicas for {deployment_name} deployment: {min_replicas}")
logging.info(f"Maximum replicas for {deployment_name} deployment: {max_replicas}")
total_jobs = len(queue) + queue.started_job_registry.count + queue.scheduled_job_registry.count
logging.info(f"Total jobs in queue for {deployment_name} deployment: {total_jobs}")
if total_jobs > total_replicas and total_replicas < max_replicas:
logging.info(f"Scaling up {deployment_name} deployment.")
ret.spec.replicas = ret.spec.replicas + 1
v1apps.patch_namespaced_deployment(name=deployment_name, namespace=namespace, body=ret)
if total_jobs == 0 and ret.spec.replicas > min_replicas:
logging.info(f"Scaling down {deployment_name} deployment.")
ret.spec.replicas = ret.spec.replicas - 1
v1apps.patch_namespaced_deployment(name=deployment_name, namespace=namespace, body=ret)
logging.info(f"Completed scaling deployment {deployment_name} in namespace {namespace}.")
if __name__ == '__main__':
schedule.every(2).minutes.do(main)
logging.info("Starting scheduler service...")
while True:
schedule.run_pending()
time.sleep(1)