Celery 任务完成检查未检查正确的任务

问题描述 投票:0回答:1

我正在尝试执行一项并行处理多个订单的任务。我使用了一个和弦,并将和弦id发送回前端定期检查任务进度,但是我发送的id实际上是获取我嵌套和弦的父任务的结果?我不太确定出了什么问题。

这是我的父任务和 Django 端点

@shared_task
def bulk_create_shipping_labels_task(skus, carrier, service, measurements, user_id):
    start_time = time.time()
    logger.info(f"Starting bulk shipping label creation for SKUs: {skus}")
    
    orders = Order.objects.filter(
        Q(items__sku__sku__in=skus) & Q(order_status='pending_fulfillment')
    ).distinct().values_list('order_id', flat=True)

    # Create a chord of tasks
    header = [process_single_order.s(order_id, user_id, carrier, service, measurements) for order_id in orders]
    callback = collect_bulk_results.s()
    
    # Execute the chord
    chord_result = chord(header)(callback)

    print('chord_result', chord_result)
    
    end_time = time.time()
    total_time = end_time - start_time

    logger.info(f"Bulk shipping labels processing started. Total setup time: {total_time:.2f} seconds")

    chord_info = {
        'id': chord_result.id,
        'state': chord_result.state,
        'ready': chord_result.ready(),
        'successful': chord_result.successful(),
        'failed': chord_result.failed(),
    }

    return {
        'message': 'Bulk shipping labels processing started',
        'task_id': chord_result.id,
        'chord_info': chord_info
    }
@api_view(['GET'])
def check_bulk_shipping_status(request, task_id):
    logger.info(f"Checking status of bulk shipping task: {task_id}")
    try:
        task_result = AsyncResult(task_id)
        
        if task_result.ready():
            result = task_result.get()
            print('result', result)
            return Response({
                'status': 'completed',
                'message': 'All tasks have finished processing',
            })
        else:
            return Response({
                'status': 'processing',
                'message': 'Tasks are still processing'
            })
    except Exception as e:
        return Response({
            'status': 'error',
            'error': str(e)
        }, status=500)

我尝试使用组来代替并在那里调用组ID,但它总是说该组不存在。

python django django-celery celery-task
1个回答
0
投票

没关系,我正在将父任务 ID 而不是和弦发送到端点

© www.soinside.com 2019 - 2024. All rights reserved.