我有一个小宠物项目,它是一个用 fastpic 编写并使用 postgresql 和 sqlalchemyORM 的 api。最近我遇到了 Celery 这样的东西,并决定将它添加到我的项目中,但我遇到了一个问题,我有点不明白我的项目最好应用它的原则是什么。
简要介绍一下我的项目
这是一个用于在公司内部创建公司和员工的 Restful Api。该 API 具有 4 个角色的逻辑。我在 jwt 令牌的帮助下检查用户具有什么角色,并将会话数据存储在萝卜中。
这是我项目的树
├───alembic
├───auth
├───celery
├───crud_routs_func
│ ├───accounts_db_func
│ └───dash_board_func
├───db
│ ├───backup_folder
├───logger
├───rabbitmq
│ └───mnesia
├───routes
└───utils
在 main.py 中,我添加了 routes 文件夹中的所有路线。
def main():
app.include_router(ManipulationEmployeesController.create_router())
app.include_router(AccountController.create_router())
app.include_router(AuthenticateController.create_router())
app.include_router(CompanyController.create_router())
app.include_router(SellingPointsController.create_router())
app.include_router(InviteEmployeeController.create_router())
app.include_router(CriteriesController.create_router())
app.include_router(DashboardController.create_router())
uvicorn.run(app, host="0.0.0.0", port=8000, ws_ping_interval=0)
if __name__ == "__main__":
t1 = threading.Thread(target=create_backup_job, args=())
t1.start()
main()
我拥有的每个路由器看起来都是这样的,并且由于某个类的功能访问数据库而接收数据。
class CompanyController(Controller):
prefix = "/api/company"
tags = ["company"]
@post("/create")
@check_access(["owner"])
async def create(request: Request, item: schemas.Company):
user = _T.user_items(request)
CompanyDBFunc.create_company(ur_name=item.ur_name,true_name=item.true_name,
inn=item.inn, ur_address=item.ur_address,
true_address=item.true_address, phone=item.phone,
email=item.email, owner_id=user["id"])
response = JSONResponse(status_code=201, content={"message":"You have successfully created a company!"})
return response
@put("/update")
@check_access(["owner"])
async def update(request: Request, item: schemas.Company, company_id):
user = _T.user_items(request)
CompanyDBFunc.update_company(company_id, user['id'], ur_name=item.ur_name, true_name=item.true_address, inn=item.inn, ur_address= item.ur_address,
true_address=item.true_address, phone=item.phone, email=item.email)
response = JSONResponse(status_code=200, content={"message":"Company data has been successfully updated."})
return response
这是关于任何类对我来说访问数据库的样子
class CompanyDBFunc(DbConnect):
@classmethod
@Logger.log
def create_company(self, ur_name, true_name, inn, ur_address, true_address, phone, email, owner_id):
new_company = models.Company(ur_name=ur_name, true_name= true_name, inn=inn, ur_address=ur_address, true_address=true_address, phone=phone, email=email, owner_id=owner_id)
self.session_work.add(new_company)
self.session_work.commit()
self.session_work.close()
@classmethod
@Logger.log
def get_company_ids_by_owner_id(self, owner_id):
company_ids = self.session_work.query(models.Company.id).filter_by(owner_id=owner_id).all()
company_ids = [c[0] for c in company_ids]
self.session_work.close()
return company_ids
如何在此类应用程序中正确使用芹菜以及应按什么逻辑创建队列进行处理?
提前谢谢大家!