spring-integration 相关问题

有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。

TCP 缓存连接未按预期工作

有人可以澄清一下 CachingClientConnectionFactory 的工作原理吗? 这是我的预期: 当我创建一个具有 100 个 TCP 连接池的 CachingClientConnectionFactory 时,我认为我可以

回答 1 投票 0

如何在 Spring Integration 中从 MessageHandlingException 中删除 PII?

我正在尝试从日志中删除 PII。有没有办法告诉 spring 集成不要在异常中包含有效负载和标头。 公共消息 hasErrorAndPii(消息我...

回答 1 投票 0

如何摆脱警告:`java.lang.RuntimeException:No beanFactory`

我正在做一个项目。我的目标是使用版本 6.3.2 设计 Spring IntegrationFlow。从 Sftp 服务器读取并拆分和转换接收到的 XML 文件。 我有两个简单的 SftpI...

回答 1 投票 0

如何使用kafka出站通道传递消息后读取偏移量

我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 <int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter" kafka-template="kafkaTemplate" header-mapper="kafkaHeaderMapper" auto-startup="true" topic-expression="headers['topic']" partition-id-expression="headers['partition']" sync="true"> <int-kafka:request-handler-advice-chain> <ref bean="requestHandlerAdvice"/> <ref bean="retryAdvice"/> </int-kafka:request-handler-advice-chain> </int-kafka:outbound-channel-adapter> <bean id="requestHandlerAdvice" class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> <property name="trapException" value="true"/> <property name="onSuccessExpression" ref="success"/> <property name="successChannelName" value="successChannel"/> <property name="onFailureExpression" ref="failure"/> <property name="failureChannelName" value="failureChannel"/> </bean> 谢谢 KafkaProducerMessageHandler 带有如下选项: /** * Set the success channel. * @param sendSuccessChannel the Success channel. */ public void setSendSuccessChannel(MessageChannel sendSuccessChannel) { 制作成功后最终使用的是: processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel()); 并且这样做: if (metadataChannel != null) { KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel, getMessageBuilderFactory() .fromMessage(message) .setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata()) .build()); } 注意 KafkaHeaders.RECORD_METADATA,您可以通过 RecordMetadata 对象访问偏移量。 查看相应的 XML 属性: <xsd:attribute name="send-success-channel" type="xsd:string"> <xsd:annotation> <xsd:documentation><![CDATA[ Specifies the channel to which message with a payload of type 'org.apache.kafka.clients.producer.RecordMetadata' will be sent after a successful send. ]]></xsd:documentation> <xsd:appinfo> <tool:annotation kind="ref"> <tool:expected-type type="org.springframework.messaging.MessageChannel" /> </tool:annotation> </xsd:appinfo> </xsd:annotation> </xsd:attribute>

回答 1 投票 0

如何通过kafka出站通道发送消息后读取偏移量

我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 我正在使用kafka输出通道适配器发送消息。有没有办法在发送消息后获取偏移量,下面是我的代码 <int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter" kafka-template="kafkaTemplate" header-mapper="kafkaHeaderMapper" auto-startup="true" topic-expression="headers['topic']" partition-id-expression="headers['partition']" sync="true"> <int-kafka:request-handler-advice-chain> <ref bean="requestHandlerAdvice"/> <ref bean="retryAdvice"/> </int-kafka:request-handler-advice-chain> </int-kafka:outbound-channel-adapter> <bean id="requestHandlerAdvice" class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> <property name="trapException" value="true"/> <property name="onSuccessExpression" ref="success"/> <property name="successChannelName" value="successChannel"/> <property name="onFailureExpression" ref="failure"/> <property name="failureChannelName" value="failureChannel"/> </bean> 谢谢 KafkaProducerMessageHandler 带有如下选项: /** * Set the success channel. * @param sendSuccessChannel the Success channel. */ public void setSendSuccessChannel(MessageChannel sendSuccessChannel) { 制作成功后最终使用的是: processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel()); 并且这样做: if (metadataChannel != null) { KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel, getMessageBuilderFactory() .fromMessage(message) .setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata()) .build()); } 注意 KafkaHeaders.RECORD_METADATA,您可以通过 RecordMetadata 对象访问偏移量。 查看相应的 XML 属性: <xsd:attribute name="send-success-channel" type="xsd:string"> <xsd:annotation> <xsd:documentation><![CDATA[ Specifies the channel to which message with a payload of type 'org.apache.kafka.clients.producer.RecordMetadata' will be sent after a successful send. ]]></xsd:documentation> <xsd:appinfo> <tool:annotation kind="ref"> <tool:expected-type type="org.springframework.messaging.MessageChannel" /> </tool:annotation> </xsd:appinfo> </xsd:annotation> </xsd:attribute>

回答 1 投票 0

Exchange 服务器在未明确选择收件箱的情况下不接受存储命令

我有一个 ImapMailReceiver 实现来接收来自远程 IMAP 服务器的电子邮件,它与我们迄今为止测试的任何 IMAP 服务器都能正常工作。尽管如此,我们的 Exchange (2016) 服务器之一...

回答 1 投票 0

拆分和过滤后如何检测IntegrationFlow处理结束?

在Spring Integration中,创建IntegrationFlow时,当事件启动流时,我有多个操作,包括拆分器和过滤器。我想检测所有消息何时分裂......

回答 1 投票 0

在处理程序中引发 ImmediateRequeueAmqpException 会导致 ConditionalRejectingErrorHandler 记录错误堆栈跟踪

在我的 Spring 集成流程中,我抛出一个 ImmediateRequeueAmqpException 来获取重新排队的消息以供另一个实例处理该消息,但我得到了由

回答 1 投票 0

动态注册多个流并并行运行

我正在动态注册多个IntegrationFlow: 私有 IntegrationFlowContext 上下文; context.registration(flow1()).register(); context.registration(flow1()).register(); 集成流程...

回答 1 投票 0

使用 MockIntegrationContext 检查出站 FTP 适配器的负载

我有一个 Spring 集成流程,如下所示(简化) @豆 公共标准集成流集成流(){ 返回 IntegrationFlows.from(inputChannel()) .

回答 1 投票 0

Spring 集成:多线程 HTTP 请求聚合仅回复释放组的最后一个请求,其他请求超时

我正在尝试聚合多个http请求负载并一起处理它们。所以,我创建了一个 Spring Boot 应用程序,其中包含: RestController:我期望获得有效负载的地方。 一个

回答 1 投票 0

Spring 集成:Poller.fixedDealy 在多线程应用程序中未按预期工作

我已经使用 Spring Integration 创建了一个应用程序。它从数据库读取数据并将行转换为 XML 格式并发送给 kafka。 我已经使用 Executor 创建多线程进行转换...

回答 1 投票 0

如何解决 SPRING TCP SSL No subject Alternative Names Present

我正在尝试通过 SSL 使用 TCP 出站网关连接到服务器。我正在使用 ClientConnectionFactory。我实现了这个 我还使用自定义的 TcpNetSSLSupport,我相信......

回答 1 投票 0

Spring Integration DSL:在 .routeToRecipients() 之后,我可以将子流的输出路由回主流吗?

我有以下代码,我在 mainFlow 中分支到recipientFlows,然后通过直接通道 PERSIST_TO_DB_CH 在单独的 persistToDbFlow 中处理这些recipientFlows 的结果...

回答 2 投票 0

使用publishSubscribeChannel,在Spring集成中,如果一个订阅失败,另一个订阅还会运行吗?

让我们假设“throwsException”处理程序抛出一个异常。 通常,这将进入错误通道。 但是,第二次订阅还会继续吗? 消息会被看到吗...

回答 1 投票 0

MockRestServiceServer 未正确验证请求

我正在尝试为我的 spring 集成流程编写集成测试。我想用 MockRestServiceServer 记录传出请求(使用 http:outbound-gateway)并将其匹配到 Rest 服务...

回答 4 投票 0

为什么我可以通过telnet连接,但无法从远程客户端建立连接?

我正在编写一个 Spring Boot 应用程序,它充当两个外部客户端(例如 serviceA 和 serviceB)之间的中间人。通信将通过套接字连接,即 tcp/ip 贯穿......

回答 1 投票 0

如何访问Spring Integration InboundChannelAdapter中的请求路径

我有一个 Spring Integration DSL Flow,我正在尝试访问 HTTP 请求路径。可能的路径看起来像这样 /import:导入所有内容 /import/case1:仅导入 A 部分 /导入/c...

回答 1 投票 0

消息驱动通道适配器 - 消息并发处理

问题 我们有一个具有以下消息驱动通道适配器配置的遗留应用程序 问题 我们有一个具有以下消息驱动通道适配器配置的旧应用程序 <jms:message-driven-channel-adapter id="processRequest" destination="requestFromLoader" connection-factory="connectionFactory" max-concurrent-consumers="30" message-converter="xmlMarshalConverter" channel="jmsInChannel" error-channel="errorChannel"/> 使用此配置max-concurrent-consumers =“30”我们预计有30个并行消费者(processRequest-container-1到30)来消费消息,但在测试过程中我们发现每个消费者依次消费10条消息,Eg - processRequest-container-1 处理消息 1 到 10,然后创建 processRequest-container-2 并处理 11 到 20 等等.. 如果只有 2 个请求,这会导致第二条消息等待,即使还有 29 个其他消费者。 分辨率 我们添加了 max-messages-per-task="1",这似乎产生了正确的结果,但我们仍在使用大量消息进行测试。 <jms:message-driven-channel-adapter id="processRequest" destination="requestFromLoader" connection-factory="connectionFactory" max-concurrent-consumers="30" max-messages-per-task="1" message-converter="xmlMarshalConverter" channel="jmsInChannel" error-channel="errorChannel"/> 问题 我们通知日志中的消费者名称(processRequest-container-1)将达到2000或3000(processRequest-container-2546),并且不限于processRequest-container-1到processRequest-container-30,但在第一种情况下,没有 max-messages-per-task="1" 它没有超出 processRequest-container-30,这正常吗?或者我们需要做些什么吗? 在spring文档中max-messages-per-task =“1”被定义为每条消息要执行的任务,什么是任务以及最大并行消费者如何受此影响? 添加 max-messages-per-task="1" 正确还是我们遗漏/忽略了其他内容? 任何建议或评论都很好。预先感谢。 这个问题更多的是关于 Spring JMS,而不是 Spring Integration。 最好阅读用于上述 DefaultMessageListenerContainer 的 <jms:message-driven-channel-adapter> 的源代码和 Javadoc。 有一条有趣的评论可能可以解释您的行为: /** * Specify the maximum number of concurrent consumers to create. Default is 1. * <p>If this setting is higher than "concurrentConsumers", the listener container * will dynamically schedule surplus consumers at runtime, provided that enough * incoming messages are encountered. Once the load goes down again, the number of * consumers will be reduced to the standard level ("concurrentConsumers") again. * <p>Raising the number of concurrent consumers is recommendable in order * to scale the consumption of messages coming in from a queue. However, * note that any ordering guarantees are lost once multiple consumers are * registered. In general, stick with 1 consumer for low-volume queues. * <p><b>Do not raise the number of concurrent consumers for a topic, * unless vendor-specific setup measures clearly allow for it.</b> * With regular setup, this would lead to concurrent consumption * of the same message, which is hardly ever desirable. * <p><b>This setting can be modified at runtime, for example through JMX.</b> * @see #setConcurrentConsumers */ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 因此,由于您不使用 concurrent-consumers="30" 来代替,因此您获得最多 30 个动态实例,并且它们的 id 确实增加了,这是正常的。 如果不指定max-messages-per-task="1",逻辑是这样的: if (this.taskExecutor instanceof SchedulingTaskExecutor ste && ste.prefersShortLivedTasks()) { if (this.maxMessagesPerTask == Integer.MIN_VALUE) { // TaskExecutor indicated a preference for short-lived tasks. According to // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case // unless the user specified a custom value. this.maxMessagesPerTask = 10; } } 或者你的taskExecutor不是那个SchedulingTaskExecutor(这确实是默认情况下),因此你的所有消息都由同一个消费者实例处理。 一些文档也在这里:https://docs.spring.io/spring-framework/reference/integration/jms/using.html#jms-mdp

回答 1 投票 0

Spring Integration 如何根据消息动态设置本地目录/远程目录以发送到远程 sftp 或从中获取文件

我目前正在尝试对一个使用 Spring 集成从 SFTP 服务器发送和接收文件的项目进行更改。 更改包括添加其他 SFTP 服务器,选择正确的服务器...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.