如何在Python Web应用程序中实现周期性执行的函数?

问题描述 投票:0回答:1

如果帐户被暂停超过7天,就我而言,必须将其删除。目前,我有三个想法:

    1. 再创建一个cronjob文件夹,并编写一个脚本,使用cronjob定期执行这个Python文件。 (可以使用
      base.py
      中的MongoDB连接方法来分享吗,还是需要单独写?)
    1. 实现一个API来删除
      authentication.py
      中的帐户,并设置一个cronjob来定期调用该API。
    1. user.py
      中编写功能并使用
      if __name__ == "__main__":
      调用该函数。

      (您可以查看下面的功能

      delete_suspended_accounts
      。)

实现此功能的最佳实践是什么?或者您还有其他建议吗?谢谢你。

更多信息可以查看我的代码结构。

  1. authentication.py
    中实现账号API,如注册、重置密码、账号暂停、账号重新激活、账号屏蔽等。
  2. base.py
    中实现MongoDB连接。
  3. user.py
    中实现账户API的数据库逻辑操作。
.
└── fastapi-backend
    ├── assets
    ├── static
    ├── src
    │   ├── api
    │   │   ├── api_router.py
    │   │   └── authentication.py
    │   ├── db
    │   │   ├── base.py
    │   │   └── user.py
    │   ├── models
    │   └── services
    └── utils

基础.py

from pymongo import MongoClient
import urllib
from fastapi import Depends
from src.core.config import setting
from typing import Callable, Type

class Base:
    def __init__(self, connection: MongoClient) -> None:
        self._conn = connection.demo

    @property
    def connection(self):
        return self._conn


def _get_mongo_client():
    return MongoClient(setting.MONGO_URL, maxPoolSize=None)


def get_repository(repo_type: Type[Base]) -> Callable:
    async def _get_repo(conn: MongoClient = Depends(_get_mongo_client)):
        yield repo_type(conn)
    return _get_repo

用户.py

class UserRepository(Base):

    def __init__(self, conn):
        super().__init__(conn)

    def suspend_user(self, user_id: str):
        the_user = self.connection.user.find_one({'_id': ObjectId(user_id)})
        if the_user and the_user['status'] == Status.ACTIVATED.value:
            self.connection.user.update(the_user, {"$set": {'status': Status.SUSPENDED.value,
                                                            'suspended_date': datetime.datetime.now()}})
        else:
            raise HTTPException(status_code=400, detail=u'Not Activated')

    def delete_suspended_accounts(self):
        query = {"status": 'Status.SUSPENDED.value', "suspended_date": {"$lt": datetime.datetime.utcnow() - timedelta(days=7)}}
        result = self.connection.user.delete_many(query)
        return result.deleted_count

身份验证.py

@authentication.post('/suspend')
async def suspend_account(request: Request, token: str,
                 user_repository: UserRepository = Depends(get_repository(UserRepository))):
    try:
        # decode access token
        access_token = jwt.decode(token, key=setting.SECERT_KEY)
        # suspend user
        user_id = access_token['sub']
        user_repository.suspend_user(user_id=user_id)
    except DecodeError:
        return {'message': 'decode error'}
    except ExpiredSignatureError:
        return {'message': 'expired signature error'}
    return {'message': 'ok'}

我尝试了第一种方法,如下,编写一个脚本,然后用cronjob执行它,成功了。然而,我的技术领导建议我用

user.py
来写。但由于 FastAPI 使用依赖注入(Depends)来连接 MongoDB,我应该如何编写它才能获得最佳方法?或者您还有其他推荐的方法吗?谢谢你。

from pymongo import MongoClient
from datetime import datetime, timedelta

client = MongoClient("MONGO_URL")
db = client['demo']
collection = db['user']
def delete_suspended_accounts():
    query = {"status": 3, "suspended_date": {"$lt": datetime.utcnow() - timedelta(days=7)}}
    result = collection.delete_many(query)
    print(f"Deleted {result.deleted_count} suspended accounts.")
if __name__ == "__main__":
    delete_suspended_accounts()
python mongodb cron fastapi
1个回答
0
投票

我创建了另一个名为 cronjob 的文件夹,并将

delete_account.py
放入其中。您只需浏览下面的代码即可。

.
└── fastapi-backend
    ├── assets
    ├── static
    ├── cronjob
    │   └── delete_account.py
    ├── src
    │   ├── api
    │   │   ├── api_rounter.py
    │   │   └── authentication.py
    │   ├── db
    │   │   ├── base.py
    │   │   └── user.py
    │   ├── models
    │   └── services
    └── utils
import sys
sys.path.insert(0, '../')

from src.db.users import UserRepository
from src.db.base import _get_mongo_client

conn = _get_mongo_client()

if __name__ == "__main__":
    user_repo = UserRepository(conn)
    counts = user_repo.delete_suspended_accounts()
    print(f"Deleted {counts} suspended accounts.")

我不知道使用 import sys 是否是最佳实践,但它似乎有效。

© www.soinside.com 2019 - 2024. All rights reserved.