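I am developing a Micronaut application. It has a main scheduled job that works like a workflow engine, with multiple steps that process about 600k records.
Alongside this scheduled job, we have an event listener that continuously receives events; on the receiving side, we save each event payload to a table.
We also maintain a health check endpoint in the application. It is called every 1 minute and returns a response prepared by a background job (which runs every 2 minutes).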
HealthCheckStatus status;

@Get
@ExecuteOn(TaskExecutors.IO)
public HealthCheckStatus index() {
    log.debug("HealthCheck requested");
    return status;
}

@Scheduled(fixedDelay = "${scheduler.healthcheck.fixedDelay}", initialDelay = "${scheduler.healthcheck.initialDelay}")
public void refreshStatus() {
    log.debug("HealthCheck refresh requested...");
    status = // Here we are checking internal dependencies and populating this status
}
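To make the pattern above concrete outside Micronaut, here is a minimal plain-JDK sketch of a cached status that one background task refreshes and request threads only read. The class name `CachedHealthStatus`, the `String` status type, and the `"UP"` placeholder are assumptions for illustration, not the real controller. The sketch uses an `AtomicReference` so that a value written by the scheduler thread is reliably visible to the reader threads; a `volatile` field would serve the same purpose.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the controller: a background task refreshes the
// status, request threads only read the last published value.
class CachedHealthStatus {
    // AtomicReference gives safe publication between the refresher and the readers.
    private final AtomicReference<String> status = new AtomicReference<>("UNKNOWN");

    // Cheap read path: no dependency checks happen here.
    String current() {
        return status.get();
    }

    // Refresh path: the real application would check internal dependencies here.
    void refresh() {
        status.set("UP"); // placeholder result, an assumption for this sketch
    }

    public static void main(String[] args) throws InterruptedException {
        CachedHealthStatus health = new CachedHealthStatus();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Mirrors the question's setup: refresh every 2 minutes, starting immediately.
        scheduler.scheduleWithFixedDelay(health::refresh, 0, 2, TimeUnit.MINUTES);
        Thread.sleep(200); // give the first refresh a moment to run
        System.out.println(health.current());
        scheduler.shutdownNow();
    }
}
```

Because the read path does no work beyond returning the cached value, the health check requests themselves should be cheap regardless of how long a refresh takes.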
application.yml:
micronaut:
  application:
    name: cbo-adaptor
  netty:
    event-loops:
      httpclientthreadpool:
        num-threads: 10
        prefer-native-transport: true
  executors:
    io:
      type: fixed
      nThreads: 50
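Recently we have noticed delays in a few steps of the workflow engine. When I check the logs, I see many log entries related to received events and health checks between two log statements that are expected to execute immediately one after the other.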
{"@timestamp":"2023-03-18T03:31:29.948Z","logger_name":"com.org.implementation.JobService","level":"DEBUG","level_value":10000,"job":"1234","job_status":"WORKING","job_step":"ProcessingStep","origin":"JOB_TYPE1","message":"start processing","thread_name":"scheduled-executor-thread-2"}
{"@timestamp":"2023-03-18T03:31:38.198Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-23","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:32:13.122Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-13","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:32:59.025Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck refresh requested...","thread_name":"scheduled-executor-thread-4","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:33:13.123Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-40","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:33:38.701Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-6","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:34:13.122Z","logger_name":"com.org.endpoints.healthcheck.HealthCheckController","level":"DEBUG","level_value":10000,"message":"HealthCheck requested","thread_name":"io-executor-thread-1","v":"0.0.35.4-20230310.2"}
{"@timestamp":"2023-03-18T03:35:05.487Z","logger_name":"com.org.implementation.MyEventService","level":"DEBUG","level_value":10000,"message":"event received: MyDto(id=12313, type=A, timestamp=2023-03-18T03:35:05.487514, payload=Dto(field1=value1, field2=value2)","thread_name":"pool-4-thread-1"}
{"@timestamp":"2023-03-18T03:37:00.167Z","logger_name":"com.org.implementation.JobService","level":"DEBUG","level_value":10000,"job":"1234","job_status":"WORKING","job_step":"ProcessingStep","origin":"JOB_TYPE1","message":"completed processing","thread_name":"scheduled-executor-thread-2"}
Here is the method for ProcessingStep:
public void processData(UUID jobId) {
    log.debug("started Processing");
    repository.getItem(jobId, JobType.TYPE_1) // no records of TYPE_1 in the DB for this case
        .map(Item::getId)
        .map(this::processType1Data) // calls another API with the item ID
        .blockingSubscribe();
    log.debug("completed processing");
}
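In the case above, there are no TYPE_1 items in the database for this particular job ID, so ideally the two log statements should be printed one right after the other, within milliseconds or seconds. According to the logs, however, "completed processing" is logged after a delay of 5-6 minutes, and the log entries in between relate to health checks and received events.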
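Both log statements come from the same thread (`scheduled-executor-thread-2`), so one way to narrow this down is to look at what that thread is doing during the gap. The sketch below is a plain-JDK helper, with the hypothetical name `ThreadStackProbe`, that returns the state and stack of a live thread by name using `Thread.getAllStackTraces()`. It is only a diagnostic aid under the assumption that it can be called from inside the running application (for example from a temporary endpoint); a regular thread dump taken with `jstack` gives the same information.

```java
import java.util.Map;

// Hypothetical diagnostic helper: shows where a named thread currently is.
class ThreadStackProbe {
    // Returns the state and stack of the first live thread with this exact name,
    // or an empty string if no such thread exists.
    static String stackOf(String threadName) {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (thread.getName().equals(threadName)) {
                StringBuilder out = new StringBuilder(threadName)
                        .append(" state=").append(thread.getState()).append('\n');
                for (StackTraceElement frame : entry.getValue()) {
                    out.append("  at ").append(frame).append('\n');
                }
                return out.toString();
            }
        }
        return "";
    }

    public static void main(String[] args) {
        // Thread name taken from the logs in the question; defaults to the current thread.
        String name = args.length > 0 ? args[0] : Thread.currentThread().getName();
        System.out.print(stackOf(name));
    }
}
```

If the stack shows the thread parked inside `blockingSubscribe()` for the whole gap, the time is being spent waiting for the upstream `repository.getItem(...)` call to complete rather than in the code between the two log statements.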
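I believe this is caused by context switching between threads, but I don't have much deeper insight into it.
I would like my job to finish quickly without being affected by the events.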
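One isolation approach that could be tried is to run the job steps on their own thread pool, so that they never compete with event handling or health checks for worker threads. The following is a minimal plain-JDK sketch of that idea; the class name `DedicatedJobExecutor`, the `job-executor` prefix, and the pool size are assumptions, and it does not show how such a pool would be wired into Micronaut. It only helps if the delay really comes from thread contention, which the logs alone do not confirm.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a fixed pool reserved for job steps, with recognizable
// thread names so job work is easy to tell apart in the logs.
class DedicatedJobExecutor {
    static ExecutorService create(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true); // do not keep the JVM alive just for this pool
            return thread;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService jobPool = create("job-executor", 2);
        // Stand-in for a job step: report which thread ran it.
        Callable<String> step = () -> Thread.currentThread().getName();
        System.out.println(jobPool.submit(step).get()); // prints "job-executor-1"
        jobPool.shutdown();
    }
}
```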