我构建了一个小的Web抓取函数,以从Web上获取一些数据并将其填充到我的数据库中,效果很好。现在,我想使用芹菜定期任务每20秒定期触发此功能。我浏览了文档,一切似乎都已准备好进行开发(使用redis作为代理)。
这是我的定期触发功能所在的project / stocksapp中的tasks.py
文件:
# Celery imports
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger
from datetime import timedelta
logger = get_task_logger(__name__)
# periodic functions
@periodic_task(
run_every=(timedelta(seconds=20)),
name="getStocksDataDax",
ignore_result=True
)
def getStocksDataDax():
print("fired")
现在,当我启动工作程序时,该函数似乎仅被触发一次(填充数据库)。但是在那之后,该功能不再被触发,尽管控制台建议:
C:\Users\Jonas\Desktop\CFD\CFD>celery -A CFD beat -l info
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-05-15 23:06:29
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2020-05-15 23:06:29,990: INFO/MainProcess] beat: Starting...
[2020-05-15 23:06:30,024: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:06:50,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:10,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:30,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:50,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:10,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:30,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:50,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
project / project / celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CFD.settings')
app = Celery('CFD',
broker='redis://localhost:6379/0',
backend='amqp://',
include=['CFD.tasks'])
app.conf.broker_transport_options = {'visibility_timeout': 3600}
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
该函数本身总共运行大约一秒钟。
在此设置中,基本上每当使工作人员/芹菜每20秒触发一次功能时,该问题在哪里?
celery -A CFD beat -l info
仅启动芹菜节拍过程。您应该有一个单独的Celery worker进程-在另一个终端中运行类似celery -A CFD worker -c 8 -O fair -l info
的内容。