有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。
尝试使用 Spring Integration 和 MySQL 实现发件箱模式时出现“超出锁定等待超时”
我正在尝试使用 Spring Integration 实现发件箱模式。我配置了以下 bean: @配置 公共类 SpringIntegrationTestApplicationConfiguration { 私人静态
我正在尝试使用以下代码创建一个SSE Spring Cloud Stream供应商: @豆 fun sseSupplier(sseSupplierPub:发布者>):供应商 我正在尝试使用以下代码创建 SSE Spring Cloud Stream 供应商: @Bean fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> { return Supplier { Flux.from(sseSupplierPub) } } @Bean fun sseSupplierPub( sseSupplierProperties: sseSupplierProperties ): Publisher<Message<Any>> { return IntegrationFlow.from(channelsConfiguration.trigger()) .enrichHeaders { it.header(CONTENT_TYPE, "application/json") it.header(ACCEPT, "text/event-stream") } .handle(sseMessageHandlerSpec()) .log() .toReactivePublisher(true) } fun webClient(): WebClient { val client = HttpClient.create(provider) .keepAlive(true) val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository( ClientRegistration .withRegistrationId("clientId") .tokenUri("https://openid-connect/token") .clientId("clientId") .clientSecret("clientSecret") .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS) .build()) val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo) val authorizedClientManager = AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService) val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager) oauthFilter.setDefaultClientRegistrationId("clientId") return WebClient.builder() .filter(oauthFilter) .clientConnector(ReactorClientHttpConnector(client)) .exchangeStrategies(strategies) .build() } fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec { return WebFlux.outboundGateway( "https://api/src", webClient()) .httpMethod(HttpMethod.POST) .replyPayloadToFlux(true) .expectedResponseType(typeReference<ServerSentEvent<String>>()) } @Bean fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner { return CommandLineRunner { _ -> val message = MessageBuilder.withPayload("{}").build() logger.info { "Trigger SSE supplier" } channelsConfiguration.trigger().send(message) } } 我能够获取授权令牌并成功连接到https://api/src,但除了此消息之外我无法获得任何响应 { "scanAvailable": true, "prefetch": -1 } 当我连接时。收到上述消息后,我将不会再收到任何消息。可能是什么问题? 我设法接收数据,但我需要订阅响应中的body(请参阅下面更新的代码)。我现在面临的问题是如何将DataBuffer映射到ServerSentEvent。 @Bean fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> { return Supplier { Flux.from(sseSupplierPub) } } @Bean fun sseSupplierPub( sseSupplierProperties: sseSupplierProperties ): Publisher<Message<Any>> { return IntegrationFlow.from(channelsConfiguration.trigger()) .enrichHeaders { it.header(CONTENT_TYPE, "application/json") it.header(ACCEPT, "text/event-stream") } .handle(sseMessageHandlerSpec()) .channel(channelsConfiguration.sseMessage()) .log() .toReactivePublisher(true) } fun webClient(): WebClient { val client = HttpClient.create(provider) .keepAlive(true) val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository( ClientRegistration .withRegistrationId("clientId") .tokenUri("https://openid-connect/token") .clientId("clientId") .clientSecret("clientSecret") .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS) .build()) val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo) val authorizedClientManager = AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService) val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager) oauthFilter.setDefaultClientRegistrationId("clientId") return WebClient.builder() .filter(oauthFilter) .clientConnector(ReactorClientHttpConnector(client)) .exchangeStrategies(strategies) .build() } fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec { return WebFlux.outboundGateway( "https://api/src", webClient()) .httpMethod(HttpMethod.POST) .bodyExtractor { inputMessage, _ -> inputMessage.body.subscribe { val bytes = ByteArray(it.readableByteCount()) it.read(bytes) DataBufferUtils.release(it) val output = String(bytes) val message = MessageBuilder.withPayload(output).build() logger.debug { "SSE message payload => ${message.payload}" } channelsConfiguration.sseMessage().send(message) } } } @Bean fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner { return CommandLineRunner { _ -> val message = MessageBuilder.withPayload("{}").build() logger.info { "Trigger SSE supplier" } channelsConfiguration.trigger().send(message) } }
我们想通过使用 Spring 集成来实现 Outbox 模式。 从这个例子开始,我们想出了这个更简单的解决方案: 受保护的 IntegrationFlowDefinition buildFlow() {
通过 Spring Integration Sftp 跟踪多个文件夹以下载文件
我尝试更改应用程序以跟踪多个文件夹并从 Sftp 获取文件。 它构建在最新的 Spring 堆栈、Spring Boot 3.2.0、Spring Integration 6.2.0 等之上。 关注官方
实现用于从文件服务器归档文件的 AWS lambda 应用程序的方法
我想寻求有关如何实施AWS Lambda应用程序(在我们团队的AWS账户中)来执行文件归档任务的建议。 客观的 要存档 Windows 服务器上托管的文件...
有哪些好的资源可以帮助您开始 Spring 集成。 我找到的都是 2012 年的书籍,并且 Spring 文档没有为初学者提供足够的解释。
Spring Integration SFTP fileExistsMode进行mv操作
为什么mv操作时不能使用FileExistsMode? FAIL 是 mv 操作的默认值吗? AbstractRemoteFileOutboundGateway.Command.MV .fileExistsMode(FileExistsMode.REPLACE) 为什么不...
我已经尝试弄清楚在 Spring 框架内通过 TCP 实现自定义应用程序协议的正确方法是什么。你们能就这个话题跟我谈谈吗? 我...
在 SpringBootTest 带注释的测试中运行代码时,Spring 集成流程不会启动
我有一个集成流程,由消息网关接口调用启动 @MessagingGateway(defaultRequestChannel = "aChn") 接口A { 有趣的民意调查(c:C) } // 踢代码...
在Java应用程序中使用Databricks数据库数据的最佳方式是什么?
我需要检索存储在Databricks平台中的数据。我可以看到它可以使用 Databricks-SDK 以及 Databricks API 路线来实现,但没有在任何地方看到获得...
在 Spring Integration 中,处理程序和拦截器看起来基本上实现了相同的目标。甚至还有一些“受骗”的实现,例如 MessageTransformingChannelInterceptor 和
Spring Integration sftp 新文件不会轮询,直到消息处理程序遍历本地缓存的所有文件列表
@Bean 公共 SessionFactory sftpSessionFactory() { DefaultSftpSessionFactory 工厂 = new DefaultSftpSessionFactory(true); 工厂.setHost(sftpHost);...
使用 SFTP 出站通道适配器通过 SFTP 将数据流式传输到远程文件服务器
在我的 Spring Boot 应用程序中,我必须从数据库读取大量数据,将其转换为 CSV 文件并将其上传到 SFTP 服务器。由于文件可能很大,我无法读取内存中的整个文件,然后你...
我想知道如何编写 Spring 测试来断言由“SourcePollingChannelAdapter”触发的逻辑链。 我想到了什么: 使用 Thread.sleep() 这对于测试来说确实是个坏主意 有
使用 MappingJackson2MessageConverter 处理已删除的 MQTT 主题
在自定义的IntegrationFlow中,我使用MappingJackson2MessageConverter进行订阅,以从JSON生成相应的POJO。到目前为止,这工作得很好。 然而,一些主题被保留...
在应用程序上运行高负载时,我们会遇到极高的 CPU(15 个内核,100%)。配置文件显示 SpEL 使用了 20% 以上,其中主要由 ServiceActivatingHandler 使用。
我想配置重试建议。 这行得通吗? IntegrationFlows.from("inputChannel") .transform(theTranformer , "theTransfomerMethod" , e -> e.handleMessageAdvice(new
首先,我列出了一个工作正常的目录中的文件。列出后,我想将文件移动到另一个也可以工作的目录。然而,保留原汁原味是行不通的...
同一个 StompSessionManager 实例可以用于不同的目的地吗?
我有一个带有 StompInboundChannelAdapter 的 Spring 集成流程,用于侦听来自一个目标的传入消息,还有另一个带有 StompMessageHandler 的集成流程,用于将消息发送到
spring-Integration - http 入站网关响应和异步处理 - 任务执行器线程仅处理一条消息而不是所有消息
说明: 我们有一个 Spring 集成流程,我们想要配置一个端点,分割负载,返回给消费者,即使用 http 代码 202,其他项目必须继续...