如何在 Celery shared_task 中运行异步 GraphQL schema.execute()?

问题描述 投票:0回答:0

我将 Strawberry 与 Django 一起用于 GraphQL,我创建了一个 shared_task(),它应该在 Schema 上执行此查询,但我收到此错误:

GraphQLError("'coroutine' 对象没有属性 'items'")

query_total = """
        query MyQuery($filters: Filter!) {
            something(filters: $filters) {
                items {
                    id
                }
                total
            }
        }
    """

@shared_task()
def my_function(filters, request):

    context = CustomContext(request)

    loop = asyncio.get_event_loop()
    total_result = loop.run_until_complete(schema.execute(query_total, variable_values={"filters": filters}, context_value=context))

我这样调用函数:

my_function.delay(...)

我已经尝试过使用 async-await、async_to_sync 和包装函数。

python django graphql celery strawberry-graphql
© www.soinside.com 2019 - 2024. All rights reserved.