所以我的场景是我收到一条执行作业的消息。该作业有一个 sourceId。现在,一次应运行具有一种类型的 sourceId 的一项作业,其他作业应排队。当一个作业启动时,它应该再次将自己的工作分解为多个小执行。对于每个小执行,我需要更新数据库以了解作业的多少部分已完成。一旦整个作业结束,我需要更新作业完成的数据库。如果出现错误,我需要将数据库中的作业标记为失败。 这是我想出的一个大概的草图。如果我遗漏了一些东西,你们能帮我吗?我应该如何进行数据库更新。是否有任何 Java 语言功能可以让我的生活变得轻松。 我还需要在 quarkus 应用程序中执行此操作。
class PrimaryWorker{
ConcurrentHashMap<String,ArrayBlockingQueue> staging;
public void submit(){
staging.checkIfEntryExistForSource()
if(yes){
getTheEntry()
createSecondaryWorkerWithQueue();
pushTheJobToQueue();
}else{
createEntryForDataSource()
createSecondaryWorkerWithQueue();
pushTheJobToQueue();
}
}
}
class BusinessJobWorker{
ArrayBlockingQueue input;
ArrayBlockingQueue commonOutput;
public SecondaryWorker(input,commonOutput){
}
public void run(){
BusinessJob br = input.poll();
ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
if(canBeParallel()){
queue = new ArrayBlockingQueue(1);
}
do{
Batch batch = getEntities(br,batchSize,pageNumber);
workerQueue.push(batch);
}while(batch.hasNext);
updateDatabase();
}
public void updateDatabase(){
}
public boolean canBeParallel(){
}
}
class BatchWorker{
ArrayBlockingQueue commonInput;
ArrayBlockingQueue output;
public BatchWorker(input){
}
}
我仍然没有找到正确的方法来做到这一点,我希望获得关于如何完成多线程任务的专家意见。
你看过《叛变》吗?这是 Quarkus 中用于反应式编程的库,请参阅此链接:
https://smallrye.io/smallrye-mutiny/latest/tutorials/getting-mutiny/
无论如何,我还没有测试下面的代码,但它应该是这样的:
假设您有一个带有 MyJobEntity 的数据库(并且我在本示例中使用 Mongo):
public class MyJobEntity extends ReactivePanacheMongoEntity {
public State state;
public List<Execution> executions;
}
然后您可以按如下方式处理作业:
public Uni<Void> processAllJobs() {
return MyJobEntity.<MyJobEntity>streamAll()
.call(job -> processJob(job))
.call(job -> setJobState(job, "FINISHED"))
.invoke(job -> Log.infof("Job(id=%s, state=COMPLETED) finished successfully", job.id))
.onItem().ignoreAsUni();
}
public Uni<Void> processJob(MyJobEntity job) {
return Multi.createFrom().iterable(job.executions)
.onItem().transformToUniAndConcatenate(execution -> executePartOfJob(execution)
.onFailure()
.invoke(() -> new JobFailedException("Job(id=%s, executionId=%s) could not be completed".formatted(jobId, execution.id)))
)
.onItem().ignoreAsUni()
.onFailure(JobFailedException.class).call(() -> setJobState(job, "FAILED"));
}
此代码将并行处理每个作业,执行部分将按顺序处理。如果执行完成,作业的状态将更改为“COMPETED”之类的状态。否则,它将记录错误并将状态设置为 FAILED。