我们有一个批处理作业,用于加载数百万条带有多个地址的 Employee 数据;当我使用面向块(chunk)的处理方式时,有几行数据无法加载。
请提出解决方案。这是Spring Batch代码
/**
 * Spring Batch configuration for the "MyBatisJob" job.
 *
 * <p>The job has a single chunk-oriented step that reads {@link Employee} items through a
 * MyBatis cursor, converts each one to a JSON string with {@link DataProcessor}, and writes
 * the strings to rolling output files.
 */
@Configuration
public class EmployeeJobMyBatis {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EmployeeDataSourceConfig datasourceConfig;
    EmployeeRowMapper rowMapper = null;
    private static final Logger LOG = LogManager.getLogger(EmployeeJobMyBatis.class);

    /**
     * Builds the job, which runs the single employee step and uses a
     * {@link RunIdIncrementer} so each launch gets a new run id.
     *
     * @return the configured job
     * @throws Exception if the step or any of its collaborators cannot be created
     */
    @Bean
    @Qualifier("MyBatisJob")
    public Job mybatisJob() throws Exception {
        return this.jobBuilderFactory.get("MyBatisJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    /**
     * Builds the chunk-oriented step (commit interval of 5 items) wiring reader, processor
     * and writer together.
     *
     * @return the configured step
     * @throws Exception if the reader cannot be created
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("EmployeeDataReadStep")
                .<Employee, String>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Creates the MyBatis session factory from the configured data source and the
     * {@code employee.xml} mapper, with {@link ExecutorType#BATCH} as default executor type.
     *
     * @return the session factory
     * @throws Exception if the mapper resource cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean ss = new SqlSessionFactoryBean();
        ss.setDataSource(datasourceConfig.getDataSource());
        ss.setMapperLocations(resourcePatternResolver.getResources("employee.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);
        ss.setConfiguration(configuration);
        return ss.getObject();
    }

    /**
     * Creates the cursor-based reader that streams {@link Employee} items from the
     * {@code employeeData} mapped statement.
     *
     * @return the item reader
     * @throws Exception if the session factory cannot be created
     */
    @Bean
    public MyBatisCursorItemReader<Employee> reader() throws Exception {
        MyBatisCursorItemReader<Employee> reader = new MyBatisCursorItemReader<Employee>();
        reader.setSqlSessionFactory(sqlSessionFactory());
        // Statement ids are case-sensitive: this must match the <select id="employeeData">
        // declared in the mapper XML, otherwise the statement cannot be resolved.
        reader.setQueryId("employeeData");
        return reader;
    }

    /**
     * Creates the processor that turns an {@link Employee} into its JSON representation.
     *
     * @return the item processor
     */
    @Bean
    public DataProcessor processor() {
        return new DataProcessor();
    }

    /**
     * Creates the writer. Output starts from {@code C:/data/Employee.json} and rolls over to a
     * new resource after 2,500,000 items; each item is written by a delegate
     * {@link FlatFileItemWriter} using {@link MyDelimitedLineAggregator}.
     *
     * @return the item writer
     */
    @Bean
    public MultiResourceItemWriter<String> writer() {
        MultiResourceItemWriter<String> writer = new MultiResourceItemWriter<String>();
        writer.setResource(new FileSystemResource("C:/data/Employee.json"));
        writer.setItemCountLimitPerResource(2500000);
        FlatFileItemWriter<String> fileWriter = new FlatFileItemWriter<String>();
        fileWriter.setLineAggregator(new MyDelimitedLineAggregator());
        writer.setDelegate(fileWriter);
        return writer;
    }
}
/**
 * Converts an {@link Employee} into its JSON string representation.
 *
 * <p>Employees without an id are filtered out by returning {@code null}, which tells
 * Spring Batch to skip the item instead of passing it to the writer.
 */
public class DataProcessor implements ItemProcessor<Employee, String> {
    private static final Gson gson = new GsonBuilder().create();

    /**
     * Serializes the given employee to JSON.
     *
     * @param employee the item read by the step
     * @return the JSON string, or {@code null} to filter the item when the employee itself
     *         or its id is missing
     * @throws Exception declared by the {@link ItemProcessor} contract
     */
    @Override
    public String process(Employee employee) throws Exception {
        // Employee exposes its identifier via getEmplId(); there is no getId() accessor.
        // A null employee is filtered too, rather than being written as the literal "null".
        if (employee == null || employee.getEmplId() == null) {
            return null;
        }
        return gson.toJson(employee);
    }
}
/**
 * Line aggregator for items that are already fully formatted JSON strings: the item is
 * written as-is, without any delimiter handling.
 *
 * <p>The aggregator is stateless. It previously remembered the last non-null line in a
 * field, so a {@code null} input re-emitted the previous item's line and the instance was
 * unsafe to share between threads.
 */
public class MyDelimitedLineAggregator extends DelimitedLineAggregator<String> {

    /**
     * Returns the line to write for the given item.
     *
     * @param jsonstr the pre-formatted JSON line, may be {@code null}
     * @return {@code jsonstr} unchanged, or an empty string when it is {@code null}
     */
    @Override
    public String aggregate(String jsonstr) {
        return jsonstr != null ? jsonstr : "";
    }
}
/**
 * Employee domain object: an employee id plus the wrapper holding the employee's addresses.
 */
public class Employee {
    String emplId;
    Addresses addressList;

    /**
     * @return the employee id, or {@code null} if it has not been set
     */
    public String getEmplId() {
        return emplId;
    }

    /**
     * Sets the employee id.
     *
     * @param value the employee id (a {@code String}, matching the {@code emplId} field;
     *              the former {@code Addresses} parameter type could not be assigned to it)
     */
    public void setEmplId(String value) {
        this.emplId = value;
    }

    /**
     * @return the address wrapper, or {@code null} if it has not been set
     */
    public Addresses getAddressList() {
        return addressList;
    }

    /**
     * Sets the address wrapper.
     *
     * @param value the addresses belonging to this employee
     */
    public void setAddressList(Addresses value) {
        this.addressList = value;
    }
}
/**
 * Wrapper around a list of {@link Address} entries.
 */
public class Addresses {
    List<Address> addresses;

    /**
     * Returns the backing list, creating an empty one on first access so the result is
     * never {@code null}.
     *
     * @return the live, mutable list of addresses
     */
    public List<Address> getAddresses() {
        if (this.addresses != null) {
            return this.addresses;
        }
        this.addresses = new ArrayList<Address>();
        return this.addresses;
    }
}
public class Address{
String addressLineOne;
String city;
String country;
public String getAddressLineOne(){
return addressLineOne;
}
public void setAddressLineOne(String value) {
this.addressLineOne = value;
}
public String getCity(){
return city;
}
public void setCity(String value) {
this.city = value;
}
public String getCountry(){
return country;
}
public void setCountry(String value) {
this.country = value;
}
}
这里是MyBatis Mapper xml
Employee.xml
<!--
  Result map for Employee.
  The emplId column is the identity of an Employee. The address columns of each joined row
  are mapped to an Address and collected into the nested property addressList.addresses.
-->
<resultMap id="EmployeeMap" type="Employee">
<id column="emplId" property="emplId"/>
<!-- One Address element is built from the addressLineOne, city and country columns. -->
<collection property="addressList.addresses"
javaType="list" ofType="Address">
<result column="addressLineOne" property="addressLineOne"/>
<result column="city" property="city"/>
<result column="country" property="country"/>
</collection>
</resultMap>
<!--
  Returns one row per employee/address pair, mapped through EmployeeMap.
  The rows are ordered by employee id and the statement is flagged resultOrdered="true"
  so that all rows of one employee are contiguous. This lets a streaming (cursor) read
  finish one Employee, with its complete address list, before the next one starts.
-->
<select id="employeeData" resultMap="EmployeeMap" resultOrdered="true">select * from employee e left join address a on a.emplId = e.emplId order by e.emplId</select>
面向块(chunk-oriented)的步骤会逐行读取数据,并将每一行映射为一个域对象(基本上是一对一的映射)。而在您的场景中,数据是一对多的关系,因此您的步骤配置无法直接使用。您需要按如下方式实现驱动查询模式(driving query pattern):