我正在尝试执行一项并行处理多个订单的任务。我使用了一个和弦,并将和弦id发送回前端定期检查任务进度,但是我发送的id实际上是获取我嵌套和弦的父任务的结果?我不太确定出了什么问题。
这是我的父任务和 Django 端点
@shared_task
def bulk_create_shipping_labels_task(skus, carrier, service, measurements, user_id):
start_time = time.time()
logger.info(f"Starting bulk shipping label creation for SKUs: {skus}")
orders = Order.objects.filter(
Q(items__sku__sku__in=skus) & Q(order_status='pending_fulfillment')
).distinct().values_list('order_id', flat=True)
# Create a chord of tasks
header = [process_single_order.s(order_id, user_id, carrier, service, measurements) for order_id in orders]
callback = collect_bulk_results.s()
# Execute the chord
chord_result = chord(header)(callback)
print('chord_result', chord_result)
end_time = time.time()
total_time = end_time - start_time
logger.info(f"Bulk shipping labels processing started. Total setup time: {total_time:.2f} seconds")
chord_info = {
'id': chord_result.id,
'state': chord_result.state,
'ready': chord_result.ready(),
'successful': chord_result.successful(),
'failed': chord_result.failed(),
}
return {
'message': 'Bulk shipping labels processing started',
'task_id': chord_result.id,
'chord_info': chord_info
}
@api_view(['GET'])
def check_bulk_shipping_status(request, task_id):
logger.info(f"Checking status of bulk shipping task: {task_id}")
try:
task_result = AsyncResult(task_id)
if task_result.ready():
result = task_result.get()
print('result', result)
return Response({
'status': 'completed',
'message': 'All tasks have finished processing',
})
else:
return Response({
'status': 'processing',
'message': 'Tasks are still processing'
})
except Exception as e:
return Response({
'status': 'error',
'error': str(e)
}, status=500)
我尝试使用组来代替并在那里调用组ID,但它总是说该组不存在。
没关系,我正在将父任务 ID 而不是和弦发送到端点