如何在Future.compose链中的所有期货成功时收到通知?

问题描述 投票:0回答:1

我的应用程序(在内部调用其他REST服务的典型REST服务器)有两个主要类来执行引导过程。

Application.kt类应该配置vertx实例本身并注册某些模块(例如jackson kotlin集成):

class Application(
    private val profileSetting: String? = System.getenv("ACTIVE_PROFILES"),
    private val logger: Logger = LoggerFactory.getLogger(Application::class.java)!!
) {

    fun bootstrap() {
        val profiles = activeProfiles()
        val meterRegistry = configureMeters()
        val vertx = bootstrapVertx(meterRegistry)

        vertx.deployVerticle(ApplicationBootstrapVerticle(profiles)) { startup ->
            if (startup.succeeded()) {
                logger.info("Application startup finished")
            } else {
                logger.error("Application startup failed", startup.cause())
                vertx.close()
            }
        }
    }
}

此外,还有一个ApplicationBootstrapVerticle.kt类,它应该按照定义的顺序部署不同的Verticle。其中一些是顺序的,其中一些并行:

class ApplicationBootstrapVerticle(
    private val profiles: List<String>,
    private val logger: Logger = LoggerFactory.getLogger(ApplicationBootstrapVerticle::class.java)
) : AbstractVerticle() {

    override fun start(startFuture: Future<Void>) {
        initializeApplicationConfig().compose {
            logger.info("Application configuration initialized")
            initializeScheduledJobs()
        }.compose {
            logger.info("Scheduled jobs initialized")
            initializeRestEndpoints()
        }.compose {
            logger.info("Http server started")
            startFuture
        }.setHandler { ar ->
            if (ar.succeeded()) {
                startFuture.complete()
            } else {
                startFuture.fail(ar.cause())
            }
        }
    }

    private fun initializeApplicationConfig(): Future<String> {
        return Future.future<String>().also {
            vertx.deployVerticle(
                ApplicationConfigVerticle(profiles),
                it.completer()
            )
        }
    }

    private fun initializeScheduledJobs(): CompositeFuture {
        val stationsJob = Future.future<String>()
        val capabilitiesJob = Future.future<String>()

        return CompositeFuture.all(stationsJob, capabilitiesJob).also {
            vertx.deployVerticle(
                StationQualitiesVerticle(),
                stationsJob.completer()
            )
            vertx.deployVerticle(
                VideoCapabilitiesVerticle(),
                capabilitiesJob.completer()
            )
        }
    }

    private fun initializeRestEndpoints(): Future<String> {
        return Future.future<String>().also {
            vertx.deployVerticle(
                RestEndpointVerticle(dispatcherFactory = RouteDispatcherFactory(vertx)),
                it.completer()
            )
        }
    }
}

我不确定这是否是所谓的引导应用程序的方式,如果有的话。但更重要的是,我不确定我是否正确理解Future.compose机制。

应用程序成功启动,我看到所有想要的日志消息,除了

应用程序启动完成

信息。如果成功,也永远不会调用以下代码:

}.setHandler { ar ->
    if (ar.succeeded()) {
       startFuture.complete()
    } else {
       startFuture.fail(ar.cause())
    }
}

如果发生故障,例如当我的应用程序配置文件(yaml)因目标实体中存在未知字段而无法解析时,日志消息

应用启动失败

出现在日志中,并且还调用上面的代码。

我很好奇我的期货连锁店出了什么问题。我认为处理程序将在前一个期货成功或其中一个失败后调用,但我认为只有在成功的情况下调用它。

更新

我想缺少对startFuture.complete()的调用。通过调整start方法,它最终起作用:

override fun start(startFuture: Future<Void>) {
    initializeApplicationConfig().compose {
        logger.info("Application configuration initialized")
        initializeScheduledJobs()
    }.compose {
        logger.info("Scheduled jobs initialized")
        initializeRestEndpoints()
    }.compose {
        logger.info("Http server started")
        startFuture.complete()
        startFuture
    }.setHandler(
        startFuture.completer()
    )
}

我不确定,如果这是处理这个未来链的假设方式。

kotlin callback vert.x
1个回答
0
投票

对我有用的解决方案如下:

override fun start(startFuture: Future<Void>) {
    initializeApplicationConfig().compose {
        logger.info("Application configuration initialized")
        initializeScheduledJobs()
    }.compose {
        logger.info("Scheduled jobs initialized")
        initializeRestEndpoints()
    }.setHandler { ar ->
        if(ar.succeeded()) {
            logger.info("Http server started")
            startFuture.complete()
        } else {
            startFuture.fail(ar.cause())
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.