我正在尝试实现 Spring Cloud Stream 应用程序以:
我尝试了一些不同的方法,但我一定误解了一些事情。
我尝试使用一个简单的应用程序对这种情况进行建模,该应用程序运行完美(使用 Spring Initializr 和那些相关更改生成):
pom.xml
<properties>
<java.version>21</java.version>
<spring-cloud.version>2023.0.4</spring-cloud.version>
<spring-function.version>5.0.1</spring-function.version>
</properties>
<!-- ... -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Java代码
@Configuration
public class CloudFunctionConfiguration {
private final StreamBridge streamBridge;
public CloudFunctionConfiguration(final StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@Bean
Supplier<String> source() {
return () -> "a,1,b,2,3,c";
}
@Bean
Consumer<String> splitter() {
return string -> {
Arrays.asList(string.split(",")).forEach(s -> {
final var message = MessageBuilder
.withPayload("{\"attribute\": \"".concat(s).concat("\"}"))
.setHeader("some-header", "some-content")
.build();
streamBridge.send("output", message);
});
};
}
}
应用程序属性
spring.application.name=sftp-demo
logging.level.root=DEBUG
logging.file.name=app.log
spring.cloud.function.definition=source|splitter
spring.cloud.stream.output-bindings=output
spring.cloud.stream.bindings.output.producer.required-groups=app
spring.cloud.stream.bindings.output.destination=ex.docs
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.consumer.declareExchange=false
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
这按预期工作;每次调用轮询的供应商时都会发布所有消息。
但是当我更改 SFTP 供应商功能的简单
String
供应商时,事情就停止了。
我生成了一个包含三个记录的示例 GZipped 文件,然后进行了以下更改:
pom.xml(已添加)
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-sftp-supplier</artifactId>
<version>${spring-function.version}</version><!-- 5.0.1 -->
</dependency>
Java代码(新版本)
@Configuration
public class CloudFunctionConfiguration {
protected static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfiguration.class);
private final GzipMessageParser parser;
private final StreamBridge streamBridge;
public CloudFunctionConfiguration(
final GzipMessageParser parser,
final StreamBridge streamBridge) {
this.parser = parser;
this.streamBridge = streamBridge;
}
@Bean
Consumer<Message<byte[]>> messageParser() {
return gzipFile -> {
parser.parse(gzipFile).forEach(message -> {
LOGGER.info("SENDING MESSAGE {}", message);
streamBridge.send("output", message);
});
};
}
}
application.properties(更改)[编辑:修复了复制/粘贴错误]
# changed
spring.cloud.function.definition=sftpSupplier|messageParser
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.consumer.declareExchange=false
# added
spring.cloud.config.enabled=false
file.consumer.mode=contents
sftp.supplier.delay-when-empty=10s
sftp.supplier.filename-regex=^some-prefix.*?GZ$
sftp.supplier.remote-dir=upload
sftp.supplier.stream=true
sftp.supplier.factory.host=localhost
sftp.supplier.factory.port=2222
sftp.supplier.factory.username=sftpuser
sftp.supplier.factory.password=sftppwd
sftp.supplier.factory.private-key=file:///opt/keys/ssh_host_rsa_key
sftp.supplier.factory.allow-unknown-keys=true
实际上比这更复杂一点(我已经配置了数据库和元数据存储),但我将省略这些额外的配置(因为它们似乎工作正常)。
当我运行这个应用程序时,所有消息都会发送到 RabbitMQ,但我收到一些错误。这是日志中的内容。
sftpSupplier
传递到 messageParser
函数时,Spring Cloud Stream 会执行此操作)(...) DEBUG 781757 --- [sftp-demo] [boundedElastic-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}] to: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}]
我看到了
LOGGER.info("SENDING MESSAGE {}", message);
条目,然后是 preSend
钩子日志、Publishing message (...)
日志和 postSend
钩子日志。
一切看起来都不错,但后来我看到了这个:
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function sftpSupplier|messageParser
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Poll resulted in Message: GenericMessage [payload=MonoMap, headers={id=e531d10a-8edc-88fd-fe07-3bba0b950a48, timestamp=1733261637927}]
(...)
(...) DEBUG 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}], headers={id=cb9ce0d7-bac6-0ba2-6e0e-7ac40ad989a1, timestamp=1733263993106}] for original GenericMessage [payload=MonoMap, headers={id=364cff1e-6120-f005-ee63-a34865009323, timestamp=1733263993106}]
(...) ERROR 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}]
(...)
Caused by: java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: reactor.core.publisher.MonoMap
也就是说,Spring Cloud Stream 正在轮询函数定义并自动将其绑定到 RabbitMQ Binder。由于轮询结果是一条带有 Mono 负载的消息(我不知道为什么),绑定器尝试转换消息并发送它 - 然后失败。
奇怪的是,我的第一个工作示例也进行了此轮询,但这就是结果:
DEBUG 759414 --- [demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
我认为这就是它起作用的原因。
然后我还尝试了功能批量生产者方法(通过创建
Function<Message<String>, List<Message<String>>>
),但我无法让应用程序单独发送消息; RabbitMQ 仅收到一条序列化的消息 List
。
看起来我不应该这样做,但是通过阅读文档,我得到的印象是我可以(并且应该?)编写函数(使用我自己的 Spring Cloud Functions)来轻松构建集成应用程序。
那么我如何构建这样一个应用程序呢?我应该使用 Spring Integration 来完成它,还是可以在代码或配置中做一些事情?
因此,这是对您的困境的某种答案:https://github.com/spring-cloud/spring-functions-catalog/pull/108.
老实说,即使是我在弄清楚如何使所有内容协同工作时也遇到了一些障碍。
使用该示例,您可以将简单的
fileSupplier
替换为您的 sftpSupplier
要求等等。
我没有尝试使用
StreamBridge
,但无论如何这是可能的,尽管它会有点脱离函数组合的目的。