我有一个flask应用程序,该应用程序接收请求并尝试通过给定的URL截屏,这是通过asyncio函数完成的。
我所做的是,
import asyncio
from pyppeteer import launch
from flask import Flask
import base64
from flask import Blueprint, jsonify, request
import jwt
async def main():
browser = await launch(headless=True)
page = await browser.newPage()
await page.goto(target)
await page.screenshot({'path': '/tmp/screen.png', 'fullPage': True})
await browser.close()
app = Flask(__name__)
@app.route('/heatMapDbConfigSave', methods=['POST'])
def notify():
token, target,id = map(
request.form.get, ('token', 'target','id'))
asyncio.get_event_loop().run_until_complete(main(target))
if __name__ == '__main__':
app.run(host='localhost', port=5002, debug=True)
我面临的问题是,出现错误RuntimeError:线程'Thread-2'中没有当前事件循环。。我已经用Google搜索并浏览了以前的文章。没有人提供帮助,也没有提出明确的解决方案。
解决此问题的解决方案是什么?
提前感谢!
您可以尝试以下类似方法
from flask import Blueprint
import asyncio
health_check = Blueprint('health_check', __name__)
async def first():
await asyncio.sleep(20)
return 'first'
async def second():
await asyncio.sleep(10)
return 'second'
async def third():
await asyncio.sleep(10)
return 'third'
def ordinary_generator():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for future in asyncio.as_completed([first(), second(), third()]):
print('reached')
yield loop.run_until_complete(future)
@health_check.route('', methods=['GET'])
def healthcheck():
"""
Retrieves the health of the service.
"""
for element in ordinary_generator():
print(element)
return "Health check passed"