我有以下代码
public string ManterRequisicoesComprasJob(string routingKey, string message)
{
try
{
var request = message.FromJson<HookOutDto>();
var keys = request.GetKeys();
var empresa = keys[0];
var sistema = keys[1].ToInteger();
var fornecedor = keys[2].ToInteger();
var negociacao = keys[3];
var periodo = keys[4].ToInteger();
return BatchJob.StartNew(batch =>
{
var jobId = batch.Enqueue<IHookTargetService>(x => x.Adicionar(null, request.Id, TargetHook.PROCUREMENT));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetGrupoRequisicoes(null, empresa, fornecedor, sistema, periodo, negociacao));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.ProcessarRequisicoes(null, empresa, fornecedor, request.Id));
},
request.GetJobDescription()
);
}
catch (Exception exception)
{
throw exception.Failin();
}
}
在 ProcessarRequisicoes 工作中,我正在继续代码
public async Task<RequisicaoCompraRequest> ProcessarRequisicoes(PerformContext context, string empresa, int fornecedor, long parentId)
{
try
{
var result = context.GetAntecedentResult<List<ReqFornecSistemaDto>>();
var jobId = context.BackgroundJob.Id;
context.SetJobParameter("PROCESSED", jobId);
var requisicaoJobs = new List<string>();
BatchJob.Attach(context.GetBatchId(), batch =>
{
foreach (var item in result)
batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto)); //duvida
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetCachedRequisicoes(null));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetIdFornecedorProcurement(null, empresa, fornecedor));
jobId = batch.ContinueJobWith<IMapperService>(jobId, x => x.MapFromAntecedentResult(null, typeof(Lar.Procurement.Client.RequisicaoCompra.RequestWrapper), null));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.Send(null));
batch.ContinueJobWith<IHookTargetService>(jobId, x => x.AtualizarFinalizado(null, parentId, TargetHook.PROCUREMENT));
});
return null;
}
catch (Exception exception)
{
throw exception.LimTargetException(context, TargetHook.PROCUREMENT);
}
}
我想处理所有 GetRequisicao 作业,并在它们全部成功完成后,我想转到下一个作业 GetCachedRequisicoes。如果我按照以下方式执行,我什至可以做到这一点,但问题是执行是连续的,一项接着一项。
foreach (var item in result)
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto));
但我想一次运行多个 GetRequisicao 作业,并且只有当它们全部完成时,才转到下一个 GetCachedRequisicoes 作业。有人可以帮助我吗?
要同时处理多个
GetRequisicao
作业,然后仅在所有 GetCachedRequisicoes
作业完成后才继续处理 GetRequisicao
作业,您可以在 Hangfire 中使用批处理延续的概念。 Hangfire 的批处理功能允许您将作业分组在一起,并定义在批处理中的所有作业完成后运行的延续。
以下是如何修改代码来实现此目的:
为
GetRequisicao
作业创建新批次:
GetRequisicao
作业链接到前一个作业,而是为这些作业创建一个新批次。这允许它们同时运行。BatchJob.StartNew
在现有批次中启动新批次。将
GetRequisicao
职位添加到新批次:
GetRequisicao
作业添加到这个新批次中。定义新批次的延续:
GetRequisicao
作业后,为将启动 GetCachedRequisicoes
作业的新批次定义延续。此延续仅在所有 GetRequisicao
作业完成后才会执行。这是一个基于您的代码的示例:
public async Task<RequisicaoCompraRequest> ProcessarRequisicoes(PerformContext context, string empresa, int fornecedor, long parentId)
{
try
{
var result = context.GetAntecedentResult<List<ReqFornecSistemaDto>>();
var jobId = context.BackgroundJob.Id;
context.SetJobParameter("PROCESSED", jobId);
// Start a new batch for GetRequisicao jobs
var getRequisicaoBatchId = BatchJob.StartNew(batch =>
{
foreach (var item in result)
{
batch.Enqueue<IRequisicaoCompraService>(x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto));
}
});
// Define a continuation for the new batch
BatchJob.ContinueBatchWith(getRequisicaoBatchId, batch =>
{
jobId = batch.Enqueue<IRequisicaoCompraService>(x => x.GetCachedRequisicoes(null));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetIdFornecedorProcurement(null, empresa, fornecedor));
jobId = batch.ContinueJobWith<IMapperService>(jobId, x => x.MapFromAntecedentResult(null, typeof(Lar.Procurement.Client.RequisicaoCompra.RequestWrapper), null));
jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.Send(null));
batch.ContinueJobWith<IHookTargetService>(jobId, x => x.AtualizarFinalizado(null, parentId, TargetHook.PROCUREMENT));
});
return null;
}
catch (Exception exception)
{
throw exception.LimTargetException(context, TargetHook.PROCUREMENT);
}
}
在此修订后的方法中:
GetRequisicao
作业都添加到单独的批次中并同时运行。GetCachedRequisicoes
作业和后续作业被设置为仅在所有 GetRequisicao
作业完成后才运行的延续。GetCachedRequisicoes
作业仅在所有 GetRequisicao
作业成功完成后启动,同时仍允许 GetRequisicao
作业同时运行。