I have a Scrapy spider that is triggered from a REST API endpoint implemented in FastAPI. Scrapy provides a way to launch a spider from a Python script, like this:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from tundra.spiders.spiderfool import SpiderfoolSpider


def run_spiderfool(start_page=1, number_pages=2):
    process = CrawlerProcess(get_project_settings())
    # Run the spider programmatically
    process.crawl(SpiderfoolSpider, start_page=start_page, number_pages=number_pages)
    process.start()


if __name__ == "__main__":
    run_spiderfool(1, 2)  # Example usage, include for each spider
This script, run_spider.py, sits in the folder that contains scrapy.cfg. Here, SpiderfoolSpider is the class implementing the spider, and start_page and number_pages are the arguments passed to it. The script works perfectly well on its own: python3 run_spider.py.
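For context, the spider only uses those two arguments to decide which listing pages to request. Below is a trimmed-down sketch of what SpiderfoolSpider looks like; the URL pattern and parsing logic are placeholders, not the real implementation.

import scrapy


class SpiderfoolSpider(scrapy.Spider):
    name = "spiderfool"

    def __init__(self, start_page=1, number_pages=2, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Arguments arrive as strings when passed with -a on the command line.
        self.start_page = int(start_page)
        self.number_pages = int(number_pages)

    def start_requests(self):
        # Placeholder URL; the real spider targets Motley Fool transcript listings.
        for page in range(self.start_page, self.start_page + self.number_pages):
            yield scrapy.Request(f"https://example.com/transcripts?page={page}")

    def parse(self, response):
        # Placeholder parsing logic.
        for href in response.css("a::attr(href)").getall():
            yield {"url": response.urljoin(href)}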
Now I integrate it into my REST API endpoint:
@app.get("/api/trigger/spiderfool")
def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    try:
        logger.info(f"Starting spider with start_page={start_page}, number_pages={number_pages}")
        run_spiderfool(start_page=start_page, number_pages=number_pages)
        return {"message": f"Success! Triggered spider to crawl Motley Fool's transcripts collection from page {start_page} for {number_pages} pages!"}
    except Exception as e:
        logger.error(f"Failure! Triggered no spider! Err: {str(e)}")
        return {"error": "Failure! Triggered no spider!", "details": str(e)}
The spider runs only the first time it is triggered from the endpoint and prints the success message. Every subsequent call to the endpoint fails and prints the failure message with an empty string for e. Looking at the logs, I see this error during the first (successful) run:

ValueError: signal only works in main thread of the main interpreter

I assume this is because Scrapy, or one of the libraries it uses (such as Twisted), relies on signals that only work in the main thread.
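The underlying restriction is plain Python rather than Scrapy: signal.signal() may only be called from the main thread, and FastAPI runs synchronous (def) endpoints in a worker threadpool, so CrawlerProcess.start(), which installs shutdown signal handlers, is not running on the main thread. A minimal repro without Scrapy:

import signal
import threading


def install_handler():
    # Calling signal.signal() outside the main thread raises
    # "ValueError: signal only works in main thread of the main interpreter".
    signal.signal(signal.SIGINT, signal.SIG_DFL)


t = threading.Thread(target=install_handler)
t.start()
t.join()  # the ValueError traceback is printed from inside the worker thread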
What I have tried:
I installed Twisted's asyncioreactor by calling asyncioreactor.install() in the FastAPI main module and refactored the endpoint into an async method, like this:
@app.get("/api/trigger/spiderfool")
async def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    try:
        logger.info(f"Starting spider with start_page={start_page}, number_pages={number_pages}")
        asyncio.create_task(arun_spiderfool(start_page=start_page, number_pages=number_pages))
        return {"message": f"Success! Triggered spider to crawl Motley Fool's transcripts collection from page {start_page} for {number_pages} pages!"}
    except Exception as e:
        logger.error(f"Failure! Triggered no spider! Err: {str(e)}")
        return {"error": "Failure! Triggered no spider!", "details": str(e)}
run_spider.py now calls the spider asynchronously, like this:
async def arun_spiderfool(start_page=1, number_pages=2):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, run_spiderfool, start_page, number_pages)


def run_spiderfool(start_page=1, number_pages=2):
    process = CrawlerProcess(get_project_settings())
    # Run the spider programmatically
    process.crawl(SpiderfoolSpider, start_page=start_page, number_pages=number_pages)
    process.start()
This did not change the behavior of the code! Among other things, I also tried using nest_asyncio to patch the Scrapy thread onto the FastAPI event loop. None of this helped.
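For completeness, the reactor and event-loop patching described above amounts to something like the following sketch at the top of the FastAPI main module (the exact placement is an assumption):

import asyncio

import nest_asyncio
from twisted.internet import asyncioreactor

# Install Twisted's asyncio-based reactor before the default reactor gets imported.
asyncioreactor.install(asyncio.get_event_loop())

# Allow the already-running FastAPI event loop to be re-entered.
nest_asyncio.apply()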
I finally went down the route of using the subprocess module.
import subprocess


def run_spiderfool(start_page=1, number_pages=2):
    subprocess.Popen(['scrapy', 'crawl', 'spiderfool',
                      '-a', f'start_page={start_page}',
                      '-a', f'number_pages={number_pages}'])
This solves the problem, but with the side effect that the endpoint can only launch the spider; it has no way of tracking when it finishes.
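A partial workaround would be to keep the Popen handles in memory and poll them from a status endpoint; a rough sketch follows (the jobs dict is purely illustrative and would not survive restarts or multiple workers):

import subprocess
import uuid

jobs = {}  # job_id -> Popen handle (process-local, for illustration only)


def run_spiderfool(start_page=1, number_pages=2):
    proc = subprocess.Popen(['scrapy', 'crawl', 'spiderfool',
                             '-a', f'start_page={start_page}',
                             '-a', f'number_pages={number_pages}'])
    job_id = str(uuid.uuid4())
    jobs[job_id] = proc
    return job_id


def spiderfool_status(job_id):
    returncode = jobs[job_id].poll()  # None while the spider is still running
    return {"running": returncode is None, "returncode": returncode}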
So, are FastAPI's event loop and Scrapy's Twisted-based threading simply incompatible?
I eventually took @MatsLindh's advice and implemented the FastAPI endpoint so that it uses Celery to queue the requests that trigger the Scrapy spider.
The Celery task looks like this:
from celery.app import Celery
import os
import subprocess

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

celery_app = Celery(
    "scraper",
    broker=redis_url,
    backend=redis_url
)


@celery_app.task
def run_spiderfool_task(start_page=1, number_pages=2):
    try:
        # Run the Scrapy spider with arguments
        command = [
            "scrapy", "crawl", "spiderfool",      # spider name
            "-a", f"start_page={start_page}",     # custom argument for start_page
            "-a", f"number_pages={number_pages}"  # custom argument for number_pages
        ]
        # Execute the command
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Wait for the process to finish
        stdout, stderr = process.communicate()
        # Check if there are any errors
        if process.returncode != 0:
            print(f"Error: {stderr.decode('utf-8')}")
        else:
            print(f"Spider completed successfully! Output:\n{stdout.decode('utf-8')}")
    except Exception as e:
        print(f"An error occurred: {e}")
Celery uses Redis as both the broker and the result backend.
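The worker is started separately, pointed at the module that defines celery_app (the module name tasks here is an assumption):

celery -A tasks worker --loglevel=info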
The FastAPI endpoints look like this:
from celery.result import AsyncResult  # used to look up task state by id


@app.post("/trigger/spiderfool")
def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    spiderfool_task = run_spiderfool_task.delay(start_page, number_pages)
    return {"job_id": spiderfool_task.id, "message": "Started spider!"}


@app.get("/status/{job_id}")
def get_status(job_id: str):
    job_info = AsyncResult(job_id)
    return {
        "job_id": job_info.id,
        "job_status": job_info.status,
        "job_result": job_info.result
    }
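Example usage (assuming the API is served locally on port 8000):

curl -X POST "http://localhost:8000/trigger/spiderfool?start_page=1&number_pages=2"
# -> {"job_id": "<job_id>", "message": "Started spider!"}

curl "http://localhost:8000/status/<job_id>"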
I Dockerized the application into three services: the FastAPI app, the Celery worker, and Redis.
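A docker-compose.yml sketch of that layout (service names, images, and module paths are assumptions):

version: "3.8"
services:
  redis:
    image: redis:7

  api:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 8000
    environment:
      - REDIS_URL=redis://redis:6379
    ports:
      - "8000:8000"
    depends_on:
      - redis

  worker:
    build: .
    command: celery -A tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis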
Now I can trigger the spider from FastAPI and monitor the jobs.
Thanks to @MatsLindh and @wRAR for their input.