我正在尝试使用 Celery 从学校大学网站异步抓取课程注册信息。最终,我想构建一些东西,如果特定课程中有空位,用户会收到通知。 RabbitMQ 是我的消息代理。我正在使用 Django,在运行我的服务器时遇到错误。
我不知道我错过了什么,但这是我的第一个项目,使用 celery 之类的东西来异步查询数据。如果有人可以告诉我我缺少什么,我将不胜感激。下面是我的 RabbitMQ 仪表板的屏幕截图,我相信它正在运行并使用我的项目进行配置。
设置
CELERY_BROKER_URL = 'amqp://localhost:5672' # RabbitMQ broker URL
CELERY_RESULT_BACKEND = 'amqp://localhost:5672'
views.py
from django.shortcuts import render
from bs4 import BeautifulSoup
import requests
from .forms import CourseRegistrationForm
from .tasks import scrape_course_data
def main(request):
form = CourseRegistrationForm()
if request.method == 'POST':
form = CourseRegistrationForm(request.POST)
if form.is_valid():
term = form.cleaned_data['semester']
subj = form.cleaned_data['subject']
crse = form.cleaned_data['course_number']
classes_result = scrape_course_data.delay(term, subj, crse)
classes = classes_result.get() # Get the actual result
print(classes)
context = {
'form': form,
'classes':classes
}
return render(request, 'classes.html', context)
else:
form = CourseRegistrationForm()
context = {
'form': form,
}
return render(request, 'main.html', context)
任务.py
from celery import shared_task, Celery
from bs4 import BeautifulSoup
import requests
@shared_task
def scrape_course_data(term, subj, crse):
print('SOUP')
url=f'this is the url i am scraping from. no problems here'
link=requests.get(url).text
content=BeautifulSoup(link, 'lxml')
body=content.body
table=body.find(class_='datadisplaytable')
tble=table.contents[5:]
classes=[]
for row in tble:
if 'bs4.element.Tag' in str(type(row)):
course=row.find_all('td')
if len(course)>1:
if course[2].text==subj:
clas={}
clas['subj']=course[2].text
clas['crse']=course[3].text
if course[0].text == 'C':
clas['availability']='Full'
else:
clas['availability']='Available'
clas['title']=course[6].text
clas['id']=course[1].text
clas['seats']=course[12].text
clas['seats_avail']=course[13].text
classes.append(clas)
return classes
使用 RabbitMQ 通过 Celery 进行消息传递和结果:
配置:
CELERY_BROKER_URL = 'pyamqp://guest:guest@localhost//'
CELERY_RESULT_BACKEND = 'rpc://'
获取任务结果:
result = some_celery_task.delay(some_arg)
result.result
管理结果存储:
CELERY_TASK_RESULT_EXPIRES = 3600 # deletes results after 1 hour
@task(ignore_result=True)
def some_task():
...
注意:监控RabbitMQ。使用它作为结果后端可能会导致存储许多消息(结果),因此要进行相应的管理和监控。
通过此配置,您可以利用 RabbitMQ 在 Celery 中进行消息代理和结果存储。始终关注 RabbitMQ 的队列以确保最佳性能。