FastAPI's event loop and Scrapy's Twisted threads don't play well together


I have a Scrapy spider that is triggered from a REST API endpoint implemented in FastAPI. Scrapy supports launching a spider from a Python script, like this:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from tundra.spiders.spiderfool import SpiderfoolSpider

def run_spiderfool(start_page=1, number_pages=2):

    process = CrawlerProcess(get_project_settings())

    # Run the spider programmatically
    process.crawl(SpiderfoolSpider, start_page=start_page, number_pages=number_pages)
    process.start()

if __name__ == "__main__":
    run_spiderfool(1, 2)  # Example usage, include for each spider

This script, run_spider.py, sits in the folder that contains scrapy.cfg. Here, SpiderfoolSpider is the class implementing the spider, and start_page and number_pages are the arguments passed to it. The script works perfectly well when run on its own:

python3 run_spider.py

Now I integrate it into my REST API endpoint:

@app.get("/api/trigger/spiderfool")
def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    try:
        logger.info(f"Starting spider with start_page={start_page}, number_pages={number_pages}")
        run_spiderfool(start_page=start_page, number_pages=number_pages)

        return {"message": f"Success! Triggered spider to crawl Motley Fool's transcripts collection from page {start_page} for {number_pages} pages!"}
    except Exception as e:
        logger.error(f"Failure! Triggered no spider! Err: {str(e)}")
        return {"error": "Failure! Triggered no spider!", "details": str(e)}

The spider runs only the first time it is triggered from the endpoint and prints the success message. Every subsequent trigger of the endpoint fails and prints the failure message with an empty string for e. Looking at the logs, I see one error message during the first (successful) run:

"ValueError: signal only works in main thread of the main interpreter"

I believe this happens because Scrapy, or one of the libraries it uses (such as Twisted), relies on signals, which can only be registered in the main thread.
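
Given that diagnosis, one commonly suggested workaround (a minimal sketch on my part, not something from the original post) is to give Scrapy a process of its own, so that the Twisted reactor and its signal handlers live in that child process's main thread; it also sidesteps the fact that a reactor cannot be restarted within a single process:

from multiprocessing import Process

from run_spider import run_spiderfool  # assumes run_spider.py (shown above) is importable

def run_spiderfool_in_process(start_page=1, number_pages=2):
    # Run the existing run_spiderfool() in a fresh child process, whose main
    # thread is free to register signal handlers and start a new reactor.
    p = Process(target=run_spiderfool, args=(start_page, number_pages))
    p.start()
    p.join()  # wait for the spider to finish; drop this line to fire and forget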

What I have tried:

I installed Twisted's asyncio reactor by calling asyncioreactor.install() from twisted.internet.asyncioreactor in the FastAPI main module, and refactored the endpoint into an async method, like this:

@app.get("/api/trigger/spiderfool")
async def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    try:
        logger.info(f"Starting spider with start_page={start_page}, number_pages={number_pages}")
        asyncio.create_task(arun_spiderfool(start_page=start_page, number_pages=number_pages))
        return {"message": f"Success! Triggered spider to crawl Motley Fool's transcripts collection from page {start_page} for {number_pages} pages!"}
    except Exception as e:
        logger.error(f"Failure! Triggered no spider! Err: {str(e)}")
        return {"error": "Failure! Triggered no spider!", "details": str(e)}

run_spider.py now invokes the spider asynchronously, like this:

async def arun_spiderfool(start_page=1, number_pages=2):
    loop = asyncio.get_event_loop()
    # Schedule the blocking crawl on the default thread-pool executor
    # (note: the returned future is not awaited)
    loop.run_in_executor(None, run_spiderfool, start_page, number_pages)

def run_spiderfool(start_page=1, number_pages=2):

    process = CrawlerProcess(get_project_settings())

    # Run the spider programmatically
    process.crawl(SpiderfoolSpider, start_page=start_page, number_pages=number_pages)
    process.start()

This did not change the behaviour at all! I also tried other things, such as using nest_asyncio to patch Scrapy's thread onto the FastAPI event loop. None of that helped.
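
For completeness, Scrapy also exposes a documented TWISTED_REACTOR setting that asks Scrapy itself to install the asyncio-based reactor. A minimal sketch (my assumption; the post does not mention trying this) for the project's settings.py:

# settings.py: let Scrapy install Twisted's asyncio reactor instead of
# calling asyncioreactor.install() by hand; it has to take effect before
# anything imports twisted.internet.reactor.
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"

As far as I can tell this only selects which reactor is used; CrawlerProcess.start() still blocks and still tries to register signal handlers, so it does not by itself fix the main-thread problem described above.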

I finally went down the route of using the subprocess module:

import subprocess

def run_spiderfool(start_page=1, number_pages=2):
    subprocess.Popen(['scrapy', 'crawl', 'spiderfool',
                      '-a', f'start_page={start_page}',
                      '-a', f'number_pages={number_pages}'])

This solves the problem, but with the side effect that the endpoint can only launch the spider; it cannot track when the spider finishes.
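
Staying with subprocess, one small way to at least observe completion (a sketch with hypothetical names, not from the post) is to keep the Popen handle and poll it:

import subprocess

_jobs = {}  # hypothetical in-memory registry: pid -> Popen handle

def run_spiderfool(start_page=1, number_pages=2):
    proc = subprocess.Popen(['scrapy', 'crawl', 'spiderfool',
                             '-a', f'start_page={start_page}',
                             '-a', f'number_pages={number_pages}'])
    _jobs[proc.pid] = proc
    return proc.pid  # hand this back to the caller as a crude job id

def spider_status(pid):
    proc = _jobs.get(pid)
    if proc is None:
        return "unknown"
    # poll() returns None while the child is still running
    return "running" if proc.poll() is None else f"exited({proc.returncode})"

This keeps state only in the one API process and is not production-grade, but it does remove the pure fire-and-forget limitation.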

So, are FastAPI's event loop and Scrapy's Twisted-based threading simply incompatible?

1 Answer

I ended up taking @MatsLindh's suggestion and implemented the FastAPI endpoint so that it queues requests by triggering the Scrapy spider through Celery.

The Celery task looks like this:

from celery import Celery
import os
import subprocess

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

celery_app = Celery(
    "scraper",
    broker=redis_url,
    backend=redis_url
)

@celery_app.task
def run_spiderfool_task(start_page=1, number_pages=2):
    try:
        # Run the Scrapy spider with arguments
        command = [
            "scrapy", "crawl", "spiderfool",             # spider name
            "-a", f"start_page={start_page}",             # custom argument for start_page
            "-a", f"number_pages={number_pages}"          # custom argument for number_pages
        ]
        
        # Execute the command
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        
        # Wait for the process to finish
        stdout, stderr = process.communicate()

        # Check if there are any errors
        if process.returncode != 0:
            print(f"Error: {stderr.decode('utf-8')}")
        else:
            print(f"Spider completed successfully! Output:\n{stdout.decode('utf-8')}")
    
    except Exception as e:
        print(f"An error occurred: {e}")

Celery uses Redis as both the broker and the result backend.

The FastAPI endpoints look like this:

from celery.result import AsyncResult

@app.post("/trigger/spiderfool")
def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    spiderfool_task = run_spiderfool_task.delay(start_page, number_pages)
    return {"job_id": spiderfool_task.id, "message": "Started spider!"}

@app.get("/status/{job_id}")
def get_status(job_id: str):
    job_info = AsyncResult(job_id)
    return {
        "job_id": job_info.id,
        "job_status": job_info.status,
        "job_result": job_info.result
    }
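
A quick usage sketch of the two endpoints (assuming the service listens on localhost:8000 and the requests package is installed):

import requests

# Kick off a crawl and remember the job id returned by the trigger endpoint.
resp = requests.post(
    "http://localhost:8000/trigger/spiderfool",
    params={"start_page": 1, "number_pages": 2},
)
job_id = resp.json()["job_id"]

# Poll the status endpoint; with default Celery settings job_status stays
# PENDING until the task finishes, then becomes SUCCESS or FAILURE.
status = requests.get(f"http://localhost:8000/status/{job_id}").json()
print(status["job_status"], status["job_result"])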

I Docker-ized the application so that it consists of three services:

  1. FastAPI: hosts the FastAPI application that serves the REST endpoints.
  2. Celery worker: does the heavy lifting, queuing requests and executing the tasks on a single worker.
  3. Redis: acts as Celery's broker and result backend.

Now I can trigger the spider from FastAPI and monitor the job.

Thanks to @MatsLindh and @wRAR for the input.
