Kubernetes 设计应用程序以实现高度可扩展

问题描述 投票:0回答:2

我正在用 Python 构建一个 API 应用程序,大量用户将使用该应用程序来执行他们的作业请求。用户使用 API 端点提交提供输入值的作业请求。他们收到 JobID 作为响应,应用程序在后台运行作业。作业完成后,用户可以轮询并检索结果。

该应用程序的初始版本定义了固定数量的副本,并且支持多个请求的数量。但是,当我增加负载时,要么请求开始失败,要么 pod 下的应用程序(gunicorn 工作节点)开始失败。

我正在考虑为后台作业动态创建 Pod,这是我过去从未做过的。我认为这将有助于扩展应用程序负载,因为一旦输入作业完成,pod 就会死亡。但是,我不确定如何管理 Pod(如果 Pod 失败或 Pod 下的容器失败等)。此外,我还会受到可以创建的 pod 数量的限制。

如果有人构建了类似的需要大量请求的应用程序,可以分享他们的经验吗?如何设计这样的应用程序具有高度可扩展性和弹性?

kubernetes kubernetes-pod kubernetes-apiserver scalable
2个回答
0
投票

为后台作业使用专用工作队列。 在 Python 原生环境中,Celery 可以正常工作;如果您认为可能需要跨语言互操作,RabbitMQ 是一个流行的开源选项;还有其他选择。

同样,编写一个专用的工作进程来实际运行作业。

您的基本执行流程将如下所示:

  1. API 端点将作业添加到工作队列,但实际上本身并不执行任何工作。
  2. 工作人员执行工作队列中的任务,并将其结果写回数据库。
  3. 状态端点轮询数据库以返回结果。

在本地开发和测试它,根本不使用任何容器基础设施(也许使用 Docker Compose 启动数据库和作业队列基础设施,但实际上在容器外部编写 API 服务器和工作线程)。

当您要部署它时,您将需要两个 StatefulSet(用于数据库和 Redis/RabbitMQ/...)和两个 Deployment(用于 HTTP 服务器和工作线程)。

这样做的实际意义是能够为两个 Deployment 配置一个 HorizontalPodAutoscaler。 这可以让您根据某些指标自动设置

replicas:
;将应用程序中的指标与某些指标存储(通常是 Prometheus)连接回 HPA 基础设施是一项挑战。 现在,您可以设置 API 服务器的副本数与并发 HTTP 请求数成正比(在负载下扩展 HTTP 层),以及工作线程的副本数与队列长度成正比(如果存在则扩展工作线程)很多未完成的工作)。

请注意,这一切根本不使用 Kubernetes API,也不会尝试动态创建 Pod。 如上所述,该堆栈非常独立于任何容器运行时(您可以在普通 Docker 中运行它,或者根本不需要容器)。 权限方面以及完成后清理每个作业 Pod 方面也存在挑战。 我会避免这种方法。


0
投票

以防万一其他人看到这篇文章,我遇到的挑战是 David 提到的,以及如何自动扩展作为 Python RQ 工作人员的后端 Pod。我最终创建了一个新的容器/部署,它利用了 Python Schedule 模块,并每隔几分钟运行一次,检查几个队列上正在运行的作业,我想根据活动作业进行扩展和缩减。我在寻找有关如何执行此操作的任何信息时遇到了困难,因此开始尝试代码并测试不同的方法。请注意,您可以忽略代码中的过多日志记录。这只是我编写任何企业应用程序的经验,大量日志记录是帮助快速解决出现问题的最佳方法。这是我为解决此问题而创建的代码。

#! /usr/bin/env python

import os
import logging
import logging.handlers
import time
import schedule
import requests
from kubernetes import client, config
from redis import Redis
from rq import Queue

script_name = "auto-scaler"

if os.path.exists("/var/log/"):
    log_file = f"/var/log/{script_name}.log"
else:
    log_file = f"{script_name}.log"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(name)s-%(module)s-%(funcName)s-%(lineno)s] - %(levelname)s - %(message)s',
    handlers=[
        logging.handlers.RotatingFileHandler(log_file, maxBytes=5000000, backupCount=5),
        logging.StreamHandler()
    ]
)

redis_conn = Redis(host='automation-broker-redis', port=6380, db=0)
logging.info(f"Connected to the redis container with the default endpoint.")

q2_general = Queue('general', connection=redis_conn)
q3_initial = Queue('initial', connection=redis_conn)

def main():
    logging.info('Starting main method.')
        
    scale_deployment("automation-broker-worker-general", "automation-broker", q2_general, 5, 30)
    scale_deployment("automation-broker-worker-initial", "automation-broker", q3_initial, 10, 30)


def scale_deployment(deployment_name: str, namespace: str, queue: Queue, min_replicas: int, max_replicas: int):
    logging.info(f"Scaling deployment {deployment_name} in namespace {namespace}.")
    config.load_incluster_config()
    v1apps = client.AppsV1Api()

    ret = v1apps.read_namespaced_deployment(name=deployment_name, namespace=namespace)
    logging.info(f"Total replicas for {deployment_name} deployment: {ret.spec.replicas}")
    logging.info(f"Available replicas for {deployment_name} deployment: {ret.status.available_replicas}")
    total_replicas = ret.spec.replicas
    available_replicas = ret.status.available_replicas

    logging.info(f"Minimum replicas for {deployment_name} deployment: {min_replicas}")
    logging.info(f"Maximum replicas for {deployment_name} deployment: {max_replicas}")

    total_jobs = len(queue) + queue.started_job_registry.count + queue.scheduled_job_registry.count
    logging.info(f"Total jobs in queue for {deployment_name} deployment: {total_jobs}")

    if total_jobs > total_replicas and total_replicas < max_replicas:
        logging.info(f"Scaling up {deployment_name} deployment.")
        ret.spec.replicas = ret.spec.replicas + 1
        v1apps.patch_namespaced_deployment(name=deployment_name, namespace=namespace, body=ret)

    if total_jobs == 0 and ret.spec.replicas > min_replicas:
        logging.info(f"Scaling down {deployment_name} deployment.")
        ret.spec.replicas = ret.spec.replicas - 1
        v1apps.patch_namespaced_deployment(name=deployment_name, namespace=namespace, body=ret)

    logging.info(f"Completed scaling deployment {deployment_name} in namespace {namespace}.")

if __name__ == '__main__':
    schedule.every(2).minutes.do(main)

    logging.info("Starting scheduler service...")

    while True:
       schedule.run_pending()
       time.sleep(1)
© www.soinside.com 2019 - 2024. All rights reserved.