如何在python烧瓶应用程序内调用asyncio函数

问题描述 投票:1回答:1

我有一个flask应用程序,该应用程序接收请求并尝试通过给定的URL截屏,这是通过asyncio函数完成的。

我所做的是,

import asyncio
from pyppeteer import launch
from flask import Flask
import base64
from flask import Blueprint, jsonify, request
import jwt
async def main():
    browser = await launch(headless=True)
    page = await browser.newPage()
    await page.goto(target)
    await page.screenshot({'path': '/tmp/screen.png', 'fullPage': True})
    await browser.close()
app = Flask(__name__)
@app.route('/heatMapDbConfigSave', methods=['POST'])
def notify():
    token, target,id = map(
            request.form.get, ('token', 'target','id'))
    asyncio.get_event_loop().run_until_complete(main(target))

if __name__ == '__main__':
    app.run(host='localhost', port=5002, debug=True)

我面临的问题是,出现错误RuntimeError:线程'Thread-2'中没有当前事件循环。。我已经用Google搜索并浏览了以前的文章。没有人提供帮助,也没有提出明确的解决方案。

解决此问题的解决方案是什么?

提前感谢!

python python-3.x flask python-multithreading
1个回答
0
投票

您可以尝试以下类似方法

from flask import Blueprint
import asyncio

health_check = Blueprint('health_check', __name__)


async def first():
    await asyncio.sleep(20)
    return 'first'

async def second():
    await asyncio.sleep(10)
    return 'second'

async def third():
    await asyncio.sleep(10)
    return 'third'

def ordinary_generator():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    for future in asyncio.as_completed([first(), second(), third()]):
        print('reached')
        yield loop.run_until_complete(future)

@health_check.route('', methods=['GET'])
def healthcheck():
    """
    Retrieves the health of the service.
    """
    for element in ordinary_generator():
        print(element)
    return "Health check passed"
© www.soinside.com 2019 - 2024. All rights reserved.