celery和rabbitmq按什么原则来分配队列才是正确的?

问题描述 投票:0回答:0

我有一个小宠物项目,它是一个用 fastpic 编写并使用 postgresql 和 sqlalchemyORM 的 api。最近我遇到了 Celery 这样的东西,并决定将它添加到我的项目中,但我遇到了一个问题,我有点不明白我的项目最好应用它的原则是什么。

简要介绍一下我的项目

这是一个用于在公司内部创建公司和员工的 Restful Api。该 API 具有 4 个角色的逻辑。我在 jwt 令牌的帮助下检查用户具有什么角色,并将会话数据存储在萝卜中。

这是我项目的树

├───alembic
├───auth
├───celery
├───crud_routs_func
│   ├───accounts_db_func
│   └───dash_board_func
├───db
│   ├───backup_folder
├───logger
├───rabbitmq
│   └───mnesia
├───routes
└───utils

在 main.py 中,我添加了 routes 文件夹中的所有路线。

def main():
    app.include_router(ManipulationEmployeesController.create_router())
    app.include_router(AccountController.create_router())
    app.include_router(AuthenticateController.create_router())
    app.include_router(CompanyController.create_router())
    app.include_router(SellingPointsController.create_router())
    app.include_router(InviteEmployeeController.create_router())
    app.include_router(CriteriesController.create_router())
    app.include_router(DashboardController.create_router())
    uvicorn.run(app, host="0.0.0.0", port=8000, ws_ping_interval=0)

if __name__ == "__main__":
    t1 = threading.Thread(target=create_backup_job, args=())
    t1.start()
    main()

我拥有的每个路由器看起来都是这样的,并且由于某个类的功能访问数据库而接收数据。

class CompanyController(Controller):
    prefix = "/api/company"
    tags = ["company"]

    @post("/create")
    @check_access(["owner"])
    async def create(request: Request, item: schemas.Company):
        user = _T.user_items(request)
        CompanyDBFunc.create_company(ur_name=item.ur_name,true_name=item.true_name, 
                                    inn=item.inn, ur_address=item.ur_address,
                                    true_address=item.true_address, phone=item.phone, 
                                    email=item.email, owner_id=user["id"])
        response = JSONResponse(status_code=201, content={"message":"You have successfully created a company!"})
        return response

    @put("/update")
    @check_access(["owner"])
    async def update(request: Request, item: schemas.Company, company_id):
        user = _T.user_items(request)
        CompanyDBFunc.update_company(company_id, user['id'], ur_name=item.ur_name, true_name=item.true_address, inn=item.inn, ur_address= item.ur_address, 
                            true_address=item.true_address, phone=item.phone, email=item.email)
        response = JSONResponse(status_code=200, content={"message":"Company data has been successfully updated."})
        return response

这是关于任何类对我来说访问数据库的样子

class CompanyDBFunc(DbConnect):

    @classmethod    
    @Logger.log
    def create_company(self, ur_name, true_name, inn, ur_address, true_address, phone, email, owner_id):
        new_company = models.Company(ur_name=ur_name, true_name= true_name, inn=inn, ur_address=ur_address, true_address=true_address, phone=phone, email=email, owner_id=owner_id)
        self.session_work.add(new_company)
        self.session_work.commit()
        self.session_work.close()

    @classmethod
    @Logger.log
    def get_company_ids_by_owner_id(self, owner_id):
        company_ids = self.session_work.query(models.Company.id).filter_by(owner_id=owner_id).all()
        company_ids = [c[0] for c in company_ids]
        self.session_work.close()
        return company_ids

如何在此类应用程序中正确使用芹菜以及应按什么逻辑创建队列进行处理?

提前谢谢大家!

python rabbitmq celery fastapi
© www.soinside.com 2019 - 2024. All rights reserved.