如果中间有一个带有轮询器的
QueueChannel
,我将面临正确设置跟踪传播的问题:(
Flow 由 HTTP 调用发起,触发 Spring 集成的
MessagingGateway
:
@RestController
@RequiredArgsConstructor
class FooController {
private final InitiateSftpImportGateway gateway;
@ResponseStatus(HttpStatus.NO_CONTENT)
@PostMapping("/bar")
void runImportAt(@RequestParam String path) {
gateway.runImportAt(path);
}
@MessagingGateway(defaultRequestChannel = "sftpListingChannel", errorChannel = "errorChannel")
interface InitiateSftpImportGateway {
@Gateway
void runImportAt(String path);
}
}
通道定义如下:
@Bean
MessageChannel sftpListingChannel(final ObservationRegistry observationRegistry) {
final var directChannel = new DirectChannel();
directChannel.registerObservationRegistry(observationRegistry);
return directChannel;
}
@Bean
MessageChannel sftpFetchingChannel(final ObservationRegistry observationRegistry) {
final var queueChannel = new QueueChannel();
queueChannel.registerObservationRegistry(observationRegistry);
return queueChannel;
}
以及实际流程中的片段 - 基于触发器,
sftpListingChannel
在 SFTP 上提取文件:
@Bean
@ServiceActivator(inputChannel = "sftpListingChannel", outputChannel = "fileSplittingChannel")
MessageHandler listFiles(final CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
final var outboundGateway = new SftpOutboundGateway(
cachingSessionFactory,
AbstractRemoteFileOutboundGateway.Command.LS.getCommand(),
"payload"
);
outboundGateway.setOption(AbstractRemoteFileOutboundGateway.Option.RECURSIVE);
return outboundGateway;
}
并将它们分开:
@Splitter(inputChannel = "fileSplittingChannel", outputChannel = "sftpFetchingChannel")
List<FileInfo<SftpClient.DirEntry>> splitByFile(@Payload final List<FileInfo<SftpClient.DirEntry>> fileInfo) {
return fileInfo;
}
对于每个文件 - 我们下载它:
@Bean
@ServiceActivator(
inputChannel = "sftpFetchingChannel",
outputChannel = "...",
poller = @Poller(value = "transactionalPoller")
)
MessageHandler fetchFiles(CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
final var outboundGateway = new SftpOutboundGateway(
cachingSessionFactory,
AbstractRemoteFileOutboundGateway.Command.GET.getCommand(),
Constants.Expressions.FILE_LOCATION_FROM_HEADER
);
outboundGateway.setOption(
AbstractRemoteFileOutboundGateway.Option.STREAM,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP
);
return outboundGateway;
}
它使用自定义轮询器来注入事务建议:
@Bean
PollerMetadata transactionalPoller(final TransactionManager transactionManager) {
final var pollerMetadata = new PollerMetadata();
pollerMetadata.setAdviceChain(
List.of(new TransactionInterceptorBuilder().transactionManager(transactionManager).build())
);
return pollerMetadata;
}
轮询器(?)终止初始跟踪并初始化一个新的跟踪 - 这很麻烦,因为我想跟踪整体流程并仅中继跨度 ID 以深入研究特定的文件子进程。
我最关心的是
errorChannel
- 如果拆分后的某些子操作抛出错误,则会在 errorChannel
上处理,但使用新的跟踪 ID :(
请建议我是否/如何调整我的配置以使跟踪传播更平滑。
免责声明:
get
(甚至是适配器),而不是普通的 ls
- 上面的代码只是更大流程的一个小子集记录一下:
你确实错过了观察传播。
QueueChannel
在另一个线程上被消耗,并且不知道生产者线程中的跟踪。因此,您需要配置提到的 ObservationPropagationChannelInterceptor
,以通过存储在该通道中的消息将跟踪从生产者传播到消费者。