如何停止另一个线程切换上下文

问题描述 投票:0回答:0

我正在开发 Micronaut 应用程序。在这个应用程序中,有一个像工作流引擎一样工作的主要计划作业,多个步骤处理大约 60 万条记录的数据。

随着这个计划的工作,我们有一个事件侦听器,它不断获取事件,在接收方,我们将事件有效负载保存在一个表中。

我们还在应用程序中维护一个健康检查端点,每 1 分钟调用一次并返回由后台作业准备的响应(每 2 分钟运行一次)。

HealthCheckStatus status;

@Get
@ExecuteOn(TaskExecutors.IO)
public HealthCheckStatus index() {
    log.debug("HealthCheck requested");
    return status;
}

@Scheduled(fixedDelay = "${scheduler.healthcheck.fixedDelay}", initialDelay = "${scheduler.healthcheck.initialDelay}")
public void refreshStatus() {
   log.debug("HealthCheck refresh requested...");
   status = // Here we are checking internal dependencies and populating this status
}

应用程序.yml:

micronaut:
  application:
    name: cbo-adaptor
  netty:
    event-loops:
      httpclientthreadpool:
        num-threads: 10
        prefer-native-transport: true
  executors:
    io:
      type: fixed
      nThreads: 50

最近我们发现工作流引擎的几个步骤出现了一些延迟。如果我检查日志,我可以看到有很多与收到的事件相关的日志和健康检查 b/w 两个日志文件,预计将立即一个接一个地执行。

{"@timestamp":"2023-03-18T03:31:29.948Z","logger_name":"com.org.implementation.JobService","level":"DEBUG","level_value":10000,"job":"1234","job_status":"WORKING","job_step":"ProcessingStep","origin":"JOB_TYPE1,"message":"start processing","thread_name":"scheduled-executor-thread-2"}
{"@timestamp":"2023-03-18T03:31:38.198Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-23","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:32:13.122Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-13","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:32:59.025Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck refresh requested...","thread_name":"scheduled-executor-thread-4","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:33:13.123Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-40","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:33:38.701Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-6","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:34:13.122Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-1","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:35:05.487Z","logger_name":"com.org.implementation.MyEventService","level":"DEBUG","level_value":10000,"message":"event received: MyDto(id=12313, type=A, timestamp=2023-03-18T03:35:05.487514, payload=Dto(field1=value1, field2=value2)","thread_name":"pool-4-thread-1"}
{"@timestamp":"2023-03-18T03:37:00.167Z","logger_name":"com.org.implementation.JobService","level":"DEBUG","level_value":10000,"job":"1234","job_status":"WORKING","job_step":"ProcessingStep","origin":"JOB_TYPE1","message":"completed processing","thread_name":"scheduled-executor-thread-2"}

这里是 ProcessingStep 的方法:

public void processData(UUID jobId) {
    log.debug("started Processing");
    repository.getItem(jobId, JobType.TYPE_1) //not records of TYPE_1 in db for this case
            .map(Item::getId)
            .map(this::processType1Data) //calling another api on itemId
            .blockingSubscribe();
    log.debug("completed processing");

}

在上述情况下,对于特定的作业 ID,我们在数据库中没有 TYPE_1 项目,所以理想情况下,两个日志应该在毫秒/秒内一个接一个地打印,但是根据日志

completed processing
在延迟后执行5-6 分钟和黑白日志与健康检查和事件接收的执行有关。

我相信这是由于上下文切换 b/w 线程但仍然没有太多深入的了解。

我希望我的工作能够快速完成而不受事件的影响。

multithreading threadpool micronaut executor context-switching
© www.soinside.com 2019 - 2024. All rights reserved.