有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。
我有一个 Spring Boot Web 应用程序,它会在 120 秒超时后正常关闭。 最近,我们向同一应用程序添加了 Spring 集成流程,以便能够处理某些文件...
我有一个 websocket 端点,允许客户端订阅与特定对话相关的消息。消息保存在 MongoDb 集合中,因为有多个实例......
如何在解析SFTP服务器上的所有文件时停止IntegrationFlow,并在第二天使用计划功能重新运行它
我有一个 StandardIntegrationFlow,它从 SFTP 服务器上的一堆文件夹中读取一些文件,进行过滤,然后处理通过过滤器的文件。我的文件结构是这样的:/...
我有一个 Spring Integration 流程,它从数据流中检索数据,对其进行处理,并将结果保存在关系数据库中。流程运行得很好。我想停止/恢复 f...
如果 ResolvableType 已在标头中,则会发生 InvalidDefinitionException
我们在项目中使用了spring集成和spring kafka。 从 Spring-boot 3.1.7 更新到 3.2.1 后,我们在消息序列化期间遇到异常。 给定流程: @豆 综合...
SftpOutboundGateway MGet 在运行时更改远程目录
我想尝试使用 MGet 命令从多个远程目录中提取文件,并将其存储在单独的本地目录中。但文件没有被下载,也没有抛出错误......
Spring Integration 和 JMS:从外部客户端接收消息
在我们的系统中,外部客户端将消息放入 JMS 队列中。 要求我们的 Spring Integration 应用程序从这些队列中获取消息并处理它们。 我最初的尝试...
我在网上看到的大多数资源都是通过以下方式声明MessageChannels: @豆 消息通道 myChannel() { 返回新的 MessageChannels.direct().get(); } 看来 .get() 方法...
我有一个 Spring 集成流程,其中我目前通过使用 Thread.sleep() 引入了手动条件延迟 我知道这是低效的,现在想重构整个......
MongoDB 入站适配器更新表达式和事务同步 DSL 示例
与 5 年前的这个问题类似,我试图定义一个 mongodb 源,在轮询时将记录标记为“正在处理”。当流程完成后,我想将记录标记为“完成”。很多都有
Spring Integration SMB 在 Linux(CentOS 7) 上无效
我使用Spring Integration SMB将本地文件上传到Windows共享文件夹,在本地windows环境和idea中可以正常运行,但是当我将应用程序打成jar包时......
Dispatcher 在使用 spring-integation-mail 时没有订阅者
我们有一个多租户应用程序,它允许用户按需创建 IMAP 邮件获取器(也称为邮件接收器)并单独处理它们。为此,我们决定使用:spring-integration-mail (v ...
如何在 HttpRequestExecutingMessageHandler 中记录超时错误
我希望能够在使用HttpRequestExecutingMessageHandler时记录超时错误异常。我做了以下事情: @豆 公共 HttpRequestExecutingMessageHandler
如果使用 ImapMailReceiver 将 autoCloseFolder 设置为 false,则手动关闭文件夹
阅读 spring-integration-mail 我尝试加载电子邮件的内容,包括附件。结果,我显然设置了autoCloseFolder=false。现在,文档提到 ClosableResource 应该是你...
在为依赖 Kafka 作为消息源的 Spring 集成应用程序编写集成测试时,我经常遇到在两个测试用例之间创建干净的测试状态的问题,因为......
即使启用了一次,Google Cloud PubSub 也会在确认截止日期之前重新发送消息
我有一个 PubSub 订阅,启用了一次性传送和指数重试。指数重试的最小退避时间为 10 秒,最大退避时间为 600 秒。 ack 截止时间设置为 60 秒...
如果中间有一个带有轮询器的 QueueChannel,我将面临正确设置跟踪传播的问题:( 流程由 HTTP 调用发起,从 spring 触发 MessagingGateway
如何在Spring Integration中自定义MqttSubscription?
我在 Spring Integration 中使用 org.eclipse.paho.mqttv5.client 并尝试在 mqtt 中设置无本地选项,如下所示: @豆 公共 MessageProducer 入站(ClientManager 我在 Spring Integration 中使用 org.eclipse.paho.mqttv5.client 并尝试在 mqtt 中设置 no local 选项,如下所示: @Bean public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) { Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter( clientManager, "test" ); adapter.setCompletionTimeout(5000); adapter.setQos(2); adapter.connectComplete(true); adapter.setOutputChannel(mqttInputChannel()); return adapter; } 但是Mqttv5PahoMessageDrivenChannelAdapter没有办法设置MqttSubscription(有mqtt的no-local的配置) 在Mqttv5PahoMessageDrivenChannelAdapter类中,它有一个方法subscribe: private void subscribe() { var clientManager = getClientManager(); if (clientManager != null && this.mqttClient == null) { this.mqttClient = clientManager.getClient(); } String[] topics = getTopic(); ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); this.topicLock.lock(); try { if (topics.length == 0) { return; } int[] requestedQos = getQos(); MqttSubscription[] subscriptions = IntStream.range(0, topics.length) .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])) .toArray(MqttSubscription[]::new); IMqttMessageListener listener = this::messageArrived; IMqttMessageListener[] listeners = IntStream.range(0, topics.length) .mapToObj(t -> listener) .toArray(IMqttMessageListener[]::new); this.mqttClient.subscribe(subscriptions, null, null, listeners, null) .waitForCompletion(getCompletionTimeout()); String message = "Connected and subscribed to " + Arrays.toString(topics); logger.debug(message); if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); } } catch (MqttException ex) { if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); } logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); } finally { this.topicLock.unlock(); } } 但它仅使用参数 MqttSubscription 和 topic 创建 qos: MqttSubscription[] subscriptions = IntStream.range(0, topics.length).mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])).toArray(MqttSubscription[]::new); 这是我们在引入 MQTT v5 支持时错过的东西。 看起来我们必须引入类似基于 MqttSubscription 的构造函数之类的东西,作为普通 topic 及其 qos 的替代选项。这样您就可以对每个订阅进行细粒度配置。 请提出 GH 问题,我们将在下一个 Spring Integration 版本中解决该问题。 作为解决方法,我只能建议直接使用 Paho API。自定义 MessageProducerSupport impl 可用于将其与项目中集成流程的其余部分连接起来。
通过 int-jdbc:inbound-channel-adapter 使用选择查询的时间更新查询
我想要执行的是使用链接到某个数据源的 int-jdbc:inbound-channel-adapter 来消费消息,然后更新将保留的consumer_table(或偏移表),用于 sp...
spring-integration 5.5.18 消息网关超时
我使用 @MessagingGateway 是为了在 spring-mvc @Controller 和 spring-integration 之间架起桥梁。 @MessagingGateway 声明如下所示: 包a.b.c; 导入org.springframework。