Spring 批处理分区器:如何将 reader 类扩展为 bean 并在配置类中引用它

问题描述 投票:0回答:1

这是我的配置类,

@Bean
  @StepScope
    public JdbcPagingItemReader<DailyTransactionByUserId> pagingItemReader(
            @Value("#{stepExecutionContext['fromRow']}") Integer fromRow,
            @Value("#{stepExecutionContext['toRow']}") Integer toRow) throws Exception
    {  
      //Logic using fromRow and toRow
    }

    @Bean
        public DialyTransactionByUserIdPartitioner partitioner() 
        {
          DialyTransactionByUserIdPartitioner columnRangePartitioner = new DialyTransactionByUserIdPartitioner();
            return columnRangePartitioner;
        }
      @Bean
        public ItemWriter<DailyTransactionByUserId> dialyTransactionByUserIdItemWriter()
        {
            ItemWriter<DailyTransactionByUserId> itemWriter = appContext.getBean(DailyTransactionByUserWriter.class);
            return itemWriter;
        }
        @Bean
        public ItemProcessor<DailyTransactionByUserId, DailyTransactionByUserId> dialyTransactionByUserIdProcessor() {
            ItemProcessor<DailyTransactionByUserId, DailyTransactionByUserId> itemProcessor = appContext.getBean(DailyTransactionByUserProcessor.class);
            return itemProcessor;
        }
      //Master
        @Bean
        public Step dialyTransactionstep1() 
        {
            return new StepBuilder("FERA_DAILY_TRANSACTION_BY_USERID_API_CALL_STEP", jobRepository)
                    .partitioner(slaveStep().getName(), partitioner())
                    .step(slaveStep())
                    .gridSize(feraDailyTransactionByUserIdBatchProperties.getDailyTransactionByUserIdBatchStep1PartitionGridSize())
                    .taskExecutor(dailyTransactionByUserTaskExecutor())
                    .build();
        }
        
        // slave step
        @Bean
        public Step slaveStep() 
        {
            
            try {
                return new StepBuilder("slaveStep", jobRepository)
                        .<DailyTransactionByUserId, DailyTransactionByUserId>chunk(feraDailyTransactionByUserIdBatchProperties.getDailyTransactionByUserIdBatchStep1ReaderPageSize(),transactionManager)
                        .taskExecutor(dailyTransactionByUserTaskExecutor())
                        .reader(pagingItemReader(null,null))
                        .processor(dialyTransactionByUserIdProcessor())
                        .writer(dialyTransactionByUserIdItemWriter())
                        .build();
            } catch (Exception ex) {
                throw new RuntimeException("Error creating slave step: " + ex.getMessage());
            }
        }

这里编写器和处理器被读取为bean并注入到步骤中。现在如何让读者也以同样的方式?如果 reader 也作为 bean 注入,那么应该用什么来代替“.reader(pagingItemReader(null,null))”?

spring-boot spring-batch batch-processing spring-batch-tasklet spring-batch-job-monitoring
1个回答
0
投票

您可以将 bean 作为参数注入到方法中,这实际上是推荐的方法。

类似这样的:

@Bean
@StepScope
public JdbcPagingItemReader<DailyTransactionByUserId> pagingItemReader(
    @Value("#{stepExecutionContext['fromRow']}") Integer fromRow,
    @Value("#{stepExecutionContext['toRow']}") Integer toRow) throws Exception
{  
  //Logic using fromRow and toRow
}

@Bean
public DialyTransactionByUserIdPartitioner partitioner() 
{
    return new DialyTransactionByUserIdPartitioner();
}

//Master
@Bean
public Step dialyTransactionstep1(
    Step slaveStep, 
    DialyTransactionByUserIdPartitioner partitioner,
    TaskExecutor dailyTransactionByUserTaskExecutor) 
{
    return new StepBuilder("FERA_DAILY_TRANSACTION_BY_USERID_API_CALL_STEP", jobRepository)
                    .partitioner(slaveStep.getName(), partitioner)
                    .step(slaveStep)
                    .gridSize(feraDailyTransactionByUserIdBatchProperties.getDailyTransactionByUserIdBatchStep1PartitionGridSize())
                    .taskExecutor(dailyTransactionByUserTaskExecutor)
                    .build();
}
        
// slave step
@Bean
public Step slaveStep(
    JdbcPagingItemReader<DailyTransactionByUserId> reader
    DailyTransactionByUserProcessor processor, 
    DailyTransactionByUserWriter writer,
    TaskExecutor dailyTransactionByUserTaskExecutor) 
{
    return new StepBuilder("slaveStep", jobRepository)
                .<DailyTransactionByUserId, DailyTransactionByUserId>chunk(feraDailyTransactionByUserIdBatchProperties.getDailyTransactionByUserIdBatchStep1ReaderPageSize(),transactionManager)
                .taskExecutor(dailyTransactionByUserTaskExecutor)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
}

您现在还可以删除处理器/编写器的

@Bean
方法,因为它们已经是 bean。当您开始使用
ApplicationContext.getBean
方法做
@Bean
之类的事情时,您需要重新考虑您的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.