由于Spring Batch中的块大小(chunk size)而导致记录丢失

问题描述 投票:0回答:1

我们有一个批处理作业,用于加载数百万条Employee数据,每个Employee可能有多个地址;当我使用块(chunk)处理时,有几行记录没有被加载。

For example, if I use a chunk size of 5, we lose the 6th record, which is associated with the employee in the 5th row (refer to the image).

请提出解决方案。这是Spring Batch代码

    /**
     * Spring Batch configuration for the "MyBatisJob" job.
     *
     * <p>The job has a single chunk-oriented step that reads {@code Employee} objects through a
     * MyBatis cursor, converts each one to a string with {@link DataProcessor}, and writes the
     * strings to files via a {@link MultiResourceItemWriter}.
     *
     * <p>NOTE(review): the reader hands items to the step one at a time and the step processes
     * them in chunks of 5. If the mapped statement uses a one-to-many collection mapping, confirm
     * that every {@code Employee} is fully populated before the reader returns it.
     */
    @Configuration
public class EmployeeJobMyBatis {

    private static final Logger LOG = LogManager.getLogger(EmployeeJobMyBatis.class);

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EmployeeDataSourceConfig datasourceConfig;

    EmployeeRowMapper rowMapper = null;

    /**
     * Defines the job, which runs the single step returned by {@link #step()}.
     *
     * @return the configured job named "MyBatisJob"
     * @throws Exception if the step or any of its collaborators cannot be created
     */
    @Bean
    @Qualifier("MyBatisJob")
    public Job mybatisJob() throws Exception {
        return this.jobBuilderFactory.get("MyBatisJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    /**
     * Defines the chunk-oriented step: read {@code Employee}, process to {@code String}, write.
     *
     * @return the step named "EmployeeDataReadStep", committing every 5 items
     * @throws Exception if the reader (and its SqlSessionFactory) cannot be created
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("EmployeeDataReadStep")
                .<Employee, String>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Builds the MyBatis session factory from the injected data source and the mapper XML.
     *
     * @return a session factory whose default executor type is {@code BATCH}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);

        SqlSessionFactoryBean ss = new SqlSessionFactoryBean();
        ss.setDataSource(datasourceConfig.getDataSource());
        ss.setMapperLocations(resourcePatternResolver.getResources("employee.xml"));
        ss.setConfiguration(configuration);
        return ss.getObject();
    }

    /**
     * Creates the cursor-based reader that streams {@code Employee} objects from the database.
     *
     * @return the reader bound to the {@code employeeData} mapped statement
     * @throws Exception if the session factory cannot be created
     */
    @Bean
    public MyBatisCursorItemReader<Employee> reader() throws Exception {
        MyBatisCursorItemReader<Employee> reader = new MyBatisCursorItemReader<Employee>();
        reader.setSqlSessionFactory(sqlSessionFactory());
        // Statement ids are case-sensitive; this must match the <select id="employeeData"> in the mapper XML.
        reader.setQueryId("employeeData");
        return reader;
    }

    /**
     * Creates the item processor that turns an {@code Employee} into its string form.
     *
     * @return a new {@link DataProcessor}
     */
    @Bean
    public DataProcessor processor() {
        return new DataProcessor();
    }

    /**
     * Creates the writer. Output rolls over to a new resource after 2,500,000 items; each
     * resource is written by a {@link FlatFileItemWriter} using {@link MyDelimitedLineAggregator}.
     *
     * @return the configured multi-resource writer based on {@code C:/data/Employee.json}
     */
    @Bean
    public MultiResourceItemWriter<String> writer() {
        FlatFileItemWriter<String> fileWriter = new FlatFileItemWriter<String>();
        fileWriter.setLineAggregator(new MyDelimitedLineAggregator());

        MultiResourceItemWriter<String> writer = new MultiResourceItemWriter<String>();
        writer.setResource(new FileSystemResource("C:/data/Employee.json"));
        writer.setItemCountLimitPerResource(2500000);
        writer.setDelegate(fileWriter);
        return writer;
    }

}


 /**
  * Item processor that serializes an {@code Employee} to a JSON string using Gson.
  */
 public class DataProcessor implements ItemProcessor<Employee, String> {

    private static final Gson GSON = new GsonBuilder().create();

    /**
     * Converts the given employee to JSON.
     *
     * @param employee the item read by the step; may be {@code null}
     * @return the JSON representation, or {@code null} when the employee is {@code null} or has
     *         no employee id
     * @throws Exception declared by the {@code ItemProcessor} contract
     */
    @Override
    public String process(Employee employee) throws Exception {
        // A missing employee must not be serialized: Gson would render it as the literal "null".
        if (employee == null || employee.getEmplId() == null) {
            return null;
        }
        return GSON.toJson(employee);
    }

}

/**
 * Line aggregator that passes each incoming string through unchanged instead of joining
 * fields with a delimiter.
 *
 * <p>The aggregator keeps no state between calls, so one input can never leak into the line
 * produced for another.
 */
public class MyDelimitedLineAggregator extends DelimitedLineAggregator<String> {

    /**
     * Returns the line to write for the given item.
     *
     * @param jsonstr the already-formatted line; may be {@code null}
     * @return {@code jsonstr} itself, or an empty string when it is {@code null}
     */
    @Override
    public String aggregate(String jsonstr) {
        return jsonstr != null ? jsonstr : "";
    }
}



/**
 * Employee domain object: an employee id plus the wrapper holding that employee's addresses.
 */
public class Employee {

    String emplId;
    Addresses addressList;

    /**
     * @return the employee id, or {@code null} if it has not been set
     */
    public String getEmplId() {
        return emplId;
    }

    /**
     * Sets the employee id.
     *
     * @param value the employee id (a {@code String}, matching the {@code emplId} field)
     */
    public void setEmplId(String value) {
        this.emplId = value;
    }

    /**
     * @return the address wrapper, or {@code null} if it has not been set
     */
    public Addresses getAddressList() {
        return addressList;
    }

    /**
     * Sets the address wrapper.
     *
     * @param value the wrapper holding this employee's addresses
     */
    public void setAddressList(Addresses value) {
        this.addressList = value;
    }
}
/**
 * Holder for a list of {@code Address} entries.
 */
public class Addresses {

    List<Address> addresses;

    /**
     * Returns the backing list, creating an empty {@code ArrayList} on first access.
     * The returned list is the live internal list, not a copy.
     *
     * @return the address list, never {@code null}
     */
    public List<Address> getAddresses() {
        if (this.addresses != null) {
            return this.addresses;
        }
        this.addresses = new ArrayList<Address>();
        return this.addresses;
    }
}
 public class Address{
   String addressLineOne;
   String city;
   String country;

   public String getAddressLineOne(){
       return addressLineOne;
   }

   public void setAddressLineOne(String value) {
       this.addressLineOne = value;
   }

   public String getCity(){
       return city;
   }

   public void setCity(String value) {
       this.city = value;
   }
   public String getCountry(){
       return country;
   }

   public void setCountry(String value) {
       this.country = value;
   }
}

这里是MyBatis Mapper xml

Employee.xml

<!-- Maps one Employee (identified by emplId) together with its joined address rows. -->
<resultMap id="EmployeeMap" type="Employee">
  <id column="emplId" property="emplId"/>
  <collection property="addressList.addresses"
      javaType="list" ofType="Address">
    <result column="addressLineOne" property="addressLineOne"/>
    <result column="city" property="city"/>
    <result column="country" property="country"/>
  </collection>
</resultMap>
<!--
  Joined rows for one employee must be contiguous, and MyBatis must be told so with
  resultOrdered="true". When this statement is read through a cursor without that flag,
  a parent object is handed to the caller as soon as its first row is seen, and child rows
  that arrive afterwards are attached too late. With resultOrdered="true" a parent is
  released only once a row for a different parent appears. The ORDER BY makes the
  "grouped together" assumption actually hold.
-->
<select id="employeeData" resultMap="EmployeeMap" resultOrdered="true">
  select * from employee e left join address a on a.emplId = e.emplId
  order by e.emplId
</select>
java spring-batch mybatis chunks
1个回答
0
投票

面向块的步骤逐行读取数据,并将每一行映射为一个域对象(基本上是一对一的映射)。而您的情况是一对多的关系,因此您的步骤配置无法直接满足需求。您需要按如下方式实现驱动查询模式(driving query pattern):

  • 让读取器(reader)只读取员工详细信息,不包含地址
  • 使用项目处理器(item processor)为当前雇员获取地址
© www.soinside.com 2019 - 2024. All rights reserved.