有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。
有人可以澄清一下 CachingClientConnectionFactory 的工作原理吗? 这是我的预期: 当我创建一个具有 100 个 TCP 连接池的 CachingClientConnectionFactory 时,我认为我可以
如何在 Spring Integration 中从 MessageHandlingException 中删除 PII?
我正在尝试从日志中删除 PII。有没有办法告诉 spring 集成不要在异常中包含有效负载和标头。 公共消息 hasErrorAndPii(消息我...
如何摆脱警告:`java.lang.RuntimeException:No beanFactory`
我正在做一个项目。我的目标是使用版本 6.3.2 设计 Spring IntegrationFlow。从 Sftp 服务器读取并拆分和转换接收到的 XML 文件。 我有两个简单的 SftpI...
我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 <int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter" kafka-template="kafkaTemplate" header-mapper="kafkaHeaderMapper" auto-startup="true" topic-expression="headers['topic']" partition-id-expression="headers['partition']" sync="true"> <int-kafka:request-handler-advice-chain> <ref bean="requestHandlerAdvice"/> <ref bean="retryAdvice"/> </int-kafka:request-handler-advice-chain> </int-kafka:outbound-channel-adapter> <bean id="requestHandlerAdvice" class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> <property name="trapException" value="true"/> <property name="onSuccessExpression" ref="success"/> <property name="successChannelName" value="successChannel"/> <property name="onFailureExpression" ref="failure"/> <property name="failureChannelName" value="failureChannel"/> </bean> 谢谢 KafkaProducerMessageHandler 带有如下选项: /** * Set the success channel. * @param sendSuccessChannel the Success channel. */ public void setSendSuccessChannel(MessageChannel sendSuccessChannel) { 制作成功后最终使用的是: processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel()); 并且这样做: if (metadataChannel != null) { KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel, getMessageBuilderFactory() .fromMessage(message) .setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata()) .build()); } 注意 KafkaHeaders.RECORD_METADATA,您可以通过 RecordMetadata 对象访问偏移量。 查看相应的 XML 属性: <xsd:attribute name="send-success-channel" type="xsd:string"> <xsd:annotation> <xsd:documentation><![CDATA[ Specifies the channel to which message with a payload of type 'org.apache.kafka.clients.producer.RecordMetadata' will be sent after a successful send. ]]></xsd:documentation> <xsd:appinfo> <tool:annotation kind="ref"> <tool:expected-type type="org.springframework.messaging.MessageChannel" /> </tool:annotation> </xsd:appinfo> </xsd:annotation> </xsd:attribute>
我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 <int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter" kafka-template="kafkaTemplate" header-mapper="kafkaHeaderMapper" auto-startup="true" topic-expression="headers['topic']" partition-id-expression="headers['partition']" sync="true"> <int-kafka:request-handler-advice-chain> <ref bean="requestHandlerAdvice"/> <ref bean="retryAdvice"/> </int-kafka:request-handler-advice-chain> </int-kafka:outbound-channel-adapter> <bean id="requestHandlerAdvice" class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> <property name="trapException" value="true"/> <property name="onSuccessExpression" ref="success"/> <property name="successChannelName" value="successChannel"/> <property name="onFailureExpression" ref="failure"/> <property name="failureChannelName" value="failureChannel"/> </bean> 谢谢 KafkaProducerMessageHandler 带有如下选项: /** * Set the success channel. * @param sendSuccessChannel the Success channel. */ public void setSendSuccessChannel(MessageChannel sendSuccessChannel) { 制作成功后最终使用的是: processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel()); 并且这样做: if (metadataChannel != null) { KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel, getMessageBuilderFactory() .fromMessage(message) .setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata()) .build()); } 注意 KafkaHeaders.RECORD_METADATA,您可以通过 RecordMetadata 对象访问偏移量。 查看相应的 XML 属性: <xsd:attribute name="send-success-channel" type="xsd:string"> <xsd:annotation> <xsd:documentation><![CDATA[ Specifies the channel to which message with a payload of type 'org.apache.kafka.clients.producer.RecordMetadata' will be sent after a successful send. ]]></xsd:documentation> <xsd:appinfo> <tool:annotation kind="ref"> <tool:expected-type type="org.springframework.messaging.MessageChannel" /> </tool:annotation> </xsd:appinfo> </xsd:annotation> </xsd:attribute>
Exchange 服务器在未明确选择收件箱的情况下不接受存储命令
我有一个 ImapMailReceiver 实现来接收来自远程 IMAP 服务器的电子邮件,它与我们迄今为止测试的任何 IMAP 服务器都能正常工作。尽管如此,我们的 Exchange (2016) 服务器之一...
拆分和过滤后如何检测IntegrationFlow处理结束?
在Spring Integration中,创建IntegrationFlow时,当事件启动流时,我有多个操作,包括拆分器和过滤器。我想检测所有消息何时分裂......
在处理程序中引发 ImmediateRequeueAmqpException 会导致 ConditionalRejectingErrorHandler 记录错误堆栈跟踪
在我的 Spring 集成流程中,我抛出一个 ImmediateRequeueAmqpException 来获取重新排队的消息以供另一个实例处理该消息,但我得到了由
我正在动态注册多个IntegrationFlow: 私有 IntegrationFlowContext 上下文; context.registration(flow1()).register(); context.registration(flow1()).register(); 集成流程...
使用 MockIntegrationContext 检查出站 FTP 适配器的负载
我有一个 Spring 集成流程,如下所示(简化) @豆 公共标准集成流集成流(){ 返回 IntegrationFlows.from(inputChannel()) .
Spring 集成:多线程 HTTP 请求聚合仅回复释放组的最后一个请求,其他请求超时
我正在尝试聚合多个http请求负载并一起处理它们。所以,我创建了一个 Spring Boot 应用程序,其中包含: RestController:我期望获得有效负载的地方。 一个
Spring 集成:Poller.fixedDealy 在多线程应用程序中未按预期工作
我已经使用 Spring Integration 创建了一个应用程序。它从数据库读取数据并将行转换为 XML 格式并发送给 kafka。 我已经使用 Executor 创建多线程进行转换...
如何解决 SPRING TCP SSL No subject Alternative Names Present
我正在尝试通过 SSL 使用 TCP 出站网关连接到服务器。我正在使用 ClientConnectionFactory。我实现了这个 我还使用自定义的 TcpNetSSLSupport,我相信......
Spring Integration DSL:在 .routeToRecipients() 之后,我可以将子流的输出路由回主流吗?
我有以下代码,我在 mainFlow 中分支到recipientFlows,然后通过直接通道 PERSIST_TO_DB_CH 在单独的 persistToDbFlow 中处理这些recipientFlows 的结果...
使用publishSubscribeChannel,在Spring集成中,如果一个订阅失败,另一个订阅还会运行吗?
让我们假设“throwsException”处理程序抛出一个异常。 通常,这将进入错误通道。 但是,第二次订阅还会继续吗? 消息会被看到吗...
我正在尝试为我的 spring 集成流程编写集成测试。我想用 MockRestServiceServer 记录传出请求(使用 http:outbound-gateway)并将其匹配到 Rest 服务...
为什么我可以通过telnet连接,但无法从远程客户端建立连接?
我正在编写一个 Spring Boot 应用程序,它充当两个外部客户端(例如 serviceA 和 serviceB)之间的中间人。通信将通过套接字连接,即 tcp/ip 贯穿......
如何访问Spring Integration InboundChannelAdapter中的请求路径
我有一个 Spring Integration DSL Flow,我正在尝试访问 HTTP 请求路径。可能的路径看起来像这样 /import:导入所有内容 /import/case1:仅导入 A 部分 /导入/c...
问题 我们有一个具有以下消息驱动通道适配器配置的遗留应用程序 问题 我们有一个具有以下消息驱动通道适配器配置的旧应用程序 <jms:message-driven-channel-adapter id="processRequest" destination="requestFromLoader" connection-factory="connectionFactory" max-concurrent-consumers="30" message-converter="xmlMarshalConverter" channel="jmsInChannel" error-channel="errorChannel"/> 使用此配置max-concurrent-consumers =“30”我们预计有30个并行消费者(processRequest-container-1到30)来消费消息,但在测试过程中我们发现每个消费者依次消费10条消息,Eg - processRequest-container-1 处理消息 1 到 10,然后创建 processRequest-container-2 并处理 11 到 20 等等.. 如果只有 2 个请求,这会导致第二条消息等待,即使还有 29 个其他消费者。 分辨率 我们添加了 max-messages-per-task="1",这似乎产生了正确的结果,但我们仍在使用大量消息进行测试。 <jms:message-driven-channel-adapter id="processRequest" destination="requestFromLoader" connection-factory="connectionFactory" max-concurrent-consumers="30" max-messages-per-task="1" message-converter="xmlMarshalConverter" channel="jmsInChannel" error-channel="errorChannel"/> 问题 我们通知日志中的消费者名称(processRequest-container-1)将达到2000或3000(processRequest-container-2546),并且不限于processRequest-container-1到processRequest-container-30,但在第一种情况下,没有 max-messages-per-task="1" 它没有超出 processRequest-container-30,这正常吗?或者我们需要做些什么吗? 在spring文档中max-messages-per-task =“1”被定义为每条消息要执行的任务,什么是任务以及最大并行消费者如何受此影响? 添加 max-messages-per-task="1" 正确还是我们遗漏/忽略了其他内容? 任何建议或评论都很好。预先感谢。 这个问题更多的是关于 Spring JMS,而不是 Spring Integration。 最好阅读用于上述 DefaultMessageListenerContainer 的 <jms:message-driven-channel-adapter> 的源代码和 Javadoc。 有一条有趣的评论可能可以解释您的行为: /** * Specify the maximum number of concurrent consumers to create. Default is 1. * <p>If this setting is higher than "concurrentConsumers", the listener container * will dynamically schedule surplus consumers at runtime, provided that enough * incoming messages are encountered. Once the load goes down again, the number of * consumers will be reduced to the standard level ("concurrentConsumers") again. * <p>Raising the number of concurrent consumers is recommendable in order * to scale the consumption of messages coming in from a queue. However, * note that any ordering guarantees are lost once multiple consumers are * registered. In general, stick with 1 consumer for low-volume queues. * <p><b>Do not raise the number of concurrent consumers for a topic, * unless vendor-specific setup measures clearly allow for it.</b> * With regular setup, this would lead to concurrent consumption * of the same message, which is hardly ever desirable. * <p><b>This setting can be modified at runtime, for example through JMX.</b> * @see #setConcurrentConsumers */ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 因此,由于您不使用 concurrent-consumers="30" 来代替,因此您获得最多 30 个动态实例,并且它们的 id 确实增加了,这是正常的。 如果不指定max-messages-per-task="1",逻辑是这样的: if (this.taskExecutor instanceof SchedulingTaskExecutor ste && ste.prefersShortLivedTasks()) { if (this.maxMessagesPerTask == Integer.MIN_VALUE) { // TaskExecutor indicated a preference for short-lived tasks. According to // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case // unless the user specified a custom value. this.maxMessagesPerTask = 10; } } 或者你的taskExecutor不是那个SchedulingTaskExecutor(这确实是默认情况下),因此你的所有消息都由同一个消费者实例处理。 一些文档也在这里:https://docs.spring.io/spring-framework/reference/integration/jms/using.html#jms-mdp
Spring Integration 如何根据消息动态设置本地目录/远程目录以发送到远程 sftp 或从中获取文件
我目前正在尝试对一个使用 Spring 集成从 SFTP 服务器发送和接收文件的项目进行更改。 更改包括添加其他 SFTP 服务器,选择正确的服务器...