spring-batch 相关问题

Spring Batch是一个轻量级,全面的框架,旨在实现对企业系统日常运营至关重要的批处理应用程序的开发。此上下文中的批处理应用程序是指针对批量数据处理的自动离线系统。

在 Spring Batch 中加载和处理大量数据

我们有一个用例,使用以下连接资源代码将 100M 记录从共享对象存储桶加载到 Mongo DB HttpURLConnection httpConnection = null; 尝试 { http连接 = (

回答 1 投票 0

使用Spring批处理在数据库中导入CSV:RepositoryItemWriter没有使用JPA插入MySQL数据库

我有一个 CSV 文件,我必须将其导入到 mysql 数据库中。它的大小超过8Go。 为此,我开始研究 Spring Batch 来解决我的问题。我首先创建了一个 Spring Batch 应用程序: 我有一个 CSV 文件,我必须将其导入到 mysql 数据库中。它的大小超过8Go。 为此,我开始研究 Spring Batch 来解决我的问题。我首先创建了一个 Spring Batch 应用程序: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.4</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>fr.test.batch</groupId> <artifactId>mysql-import</artifactId> <version>0.0.1-SNAPSHOT</version> <name>mysql-import-batch</name> <description>mysql-import-batch</description> <properties> <java.version>17</java.version> <fasterxml.jackson.version>2.15.2</fasterxml.jackson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- Provides transitive vulnerable dependency maven:org.yaml:snakeyaml:1.33 --> <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.mysql</groupId> <artifactId>mysql-connector-j</artifactId> <scope>runtime</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml --> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 --> <dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-jsr310</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> 我的 CSV 文件有一整套列: Siren,nic,siret,statutDiffusionEtablissement,dateCreationEtablissement.. 并且 CSV 不包含任何主键。我也在使用 JPA,为此我创建了一个实体: @Entity @Table(name="establishments") @Data @AllArgsConstructor @NoArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) public class EstablishmentEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @JsonProperty(value="siren") @JsonAlias("siren") private String siren; @JsonProperty(value="icn") @JsonAlias("nic") private String icn; @JsonProperty(value="siret") @JsonAlias("siret") private String siret; ... } 我也有一个存储库: @Repository public interface EstablishmentRepository extends CrudRepository<EstablishmentEntity, Long> { } 现在我尝试配置一个批处理类: @Configuration @EnableBatchProcessing @EnableTransactionManagement public class InseeBatchConfiguration { private static final Logger log = LoggerFactory.getLogger(InseeBatchConfiguration.class); //Reader class Object @Bean public FlatFileItemReader<EstablishmentEntity> reader() { log.warn("reader() called"); return new FlatFileItemReaderBuilder<EstablishmentEntity>() .name("establishmentItemReader") .resource(new FileSystemResource("D:\\StockEtablissement_utf8.csv")) .delimited() .names("siren", "nic", "siret", "statutDiffusionEtablissement", "dateCreationEtablissement"...) .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{ setTargetType(EstablishmentEntity.class); }}) .build(); @Bean public EstablishmentItemProcessor processor() { log.warn("processor() called"); return new EstablishmentItemProcessor(); } //Writer class Object @Bean public RepositoryItemWriter<EstablishmentEntity> writer(EstablishmentRepository repository) { log.warn("writer() called"); return new RepositoryItemWriterBuilder<EstablishmentEntity>() .repository(repository) .methodName("save") .build(); } @Bean public Job importJob( JobRepository jobRepository, BatchListener listener, Step step1) { log.warn("importJob() called"); return new JobBuilder("import", jobRepository) .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1( JobRepository jobRepository, PlatformTransactionManager transactionManager, RepositoryItemWriter<EstablishmentEntity> writer) { log.warn("step() called"); return new StepBuilder("step1", jobRepository) .<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager) .reader(reader()) .processor(processor()) .writer(writer) .build(); } } 主要方法如下: public static void main(String[] args) { // SpringApplication.run(MysqlImportBatchApplication.class, args); System.exit(SpringApplication.exit(SpringApplication.run(MysqlImportBatchApplication.class, args))); } 我有以下应用程序属性: #mysql database connection spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/insee?createDatabaseIfNotExist=true&allowPublicKeyRetrieval=true&useSSL=false spring.datasource.username = root spring.datasource.password = root spring.jpa.generate-ddl=true #disabled job run at startup #----------ORM Details------------------- #To display SQL At console spring.jpa.show-sql=true #To Create tables spring.jpa.hibernate.ddl-auto=update #To Generate SQL queries spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL8Dialect #----------Spring Batch Properties---------- # By default it's true which means all the Spring batches will start executing automatically spring.batch.job.enabled=false # Tables for metadata created by Spring Boot (Always, Embedded, Never) spring.batch.jdbc.initialize-schema=ALWAYS 当我启动运行主方法的应用程序时,应用程序正确启动,如果不存在但没有写入数据,则创建表 根据 Mahmoud Ben Hassine 的回答,我更新了我的代码: 1。实现 JpaTransactionManager @Bean public JpaTransactionManager transactionManager() throws SQLException { JpaTransactionManager transactionManager = new JpaTransactionManager(); transactionManager.setEntityManagerFactory(entityManagerFactoryBean().getObject()); transactionManager.setDataSource(entityManagerFactoryBean().getDataSource()); return transactionManager; } @Bean(name = "entityManagerFactory") public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() throws SQLException { LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = new LocalContainerEntityManagerFactoryBean(); entityManagerFactoryBean.setJpaVendorAdapter(vendorAdaptor()); entityManagerFactoryBean.setDataSource(datasource()); entityManagerFactoryBean.setPersistenceProviderClass(HibernatePersistenceProvider.class); entityManagerFactoryBean.setPackagesToScan(ENTITYMANAGER_PACKAGES_TO_SCAN); entityManagerFactoryBean.setJpaProperties(jpaHibernateProperties()); entityManagerFactoryBean.setJpaDialect(new HibernateJpaDialect()); return entityManagerFactoryBean; } private HibernateJpaVendorAdapter vendorAdaptor() { HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter(); vendorAdapter.setShowSql(true); return vendorAdapter; } @Bean(name = "dataSource") @ConfigurationProperties(prefix = "spring.datasource") public DataSource datasource() { HikariConfig config = new HikariConfig(); config.setJdbcUrl(env.getProperty("spring.datasource.url")); config.setUsername(env.getProperty("spring.datasource.username")); ... config.setPassword(env.getProperty("spring.datasource.password")); config.setMinimumIdle(Integer.parseInt( Properties props = new Properties(); props.put( "spring.datasource.hikari.data-source-properties.cachePrepStmts", env.getProperty("spring.datasource.hikari.data-source-properties.cachePrepStmts")); ... config.setDataSourceProperties(props); log.warn("## datasource() called"); return new HikariDataSource(config); } private Properties jpaHibernateProperties() { Properties properties = new Properties(); ... properties.put(PROPERTY_NAME_HIBERNATE_SHOW_SQL, env.getProperty(PROPERTY_NAME_HIBERNATE_SHOW_SQL)); properties.put(PROPERTY_NAME_HIBERNATE_DIALECT, env.getProperty(PROPERTY_NAME_HIBERNATE_DIALECT)); //properties.put(AvailableSettings.JAKARTA_HBM2DDL_DATABASE_ACTION, "none"); return properties; } 2。我也更新了我的step1。 @Bean public Step step1( BatchReadListener listener, JobRepository jobRepository, JpaTransactionManager transactionManager, RepositoryItemWriter<EstablishmentEntity> writer) throws IOException { log.warn("step() called"); return new StepBuilder("step1", jobRepository) .<EstablishmentEntity, EstablishmentEntity> chunk(10, transactionManager) .reader(reader()) .processor(processor()) .writer(writer) .listener(listener) .build(); } 我还添加了一个监听器用于测试目的。 @Service public class BatchReadListener implements ItemReadListener<EstablishmentEntity> { @Override public void beforeRead() { // ItemReadListener.super.beforeRead(); System.out.println("Before reading ..."); } @Override public void afterRead(EstablishmentEntity item) { System.out.println("After reading ..."); System.out.println("## " + item.getSiret()); } @Override public void onReadError(Exception ex) { ex.printStackTrace(); // ItemReadListener.super.onReadError(ex); } } 启动应用程序时,数据仍未存储在数据库中。我确实有有关数据库连接的日志: HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0). HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0) HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0). HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0) HikariPool-1 - Fill pool skipped, pool has sufficient level or currently being filled (queueDepth=0). HikariPool-1 - Closing connection com.mysql.cj.jdbc.ConnectionImpl@61c9d64b: (connection has passed maxLifetime) RepositoryItemWriter基于JPA存储库,因此您需要确保步骤中自动装配的PlatformTransactionManager transactionManager是JpaTransactionManager类型,而不是DataSourceTransactionManager或JdbcTransactionManager(这似乎是默认类型)由 Spring Boot 自动配置)。 您可以将步骤签名更改为: @Bean public Step step1( JobRepository jobRepository, JpaTransactionManager transactionManager, RepositoryItemWriter<EstablishmentEntity> writer) { ... } 这将使错误变得明显,即如果上下文中没有定义 JpaTransactionManager bean,应用程序将无法启动。

回答 1 投票 0

Spring 批处理和 Kubernetes cron 作业

我正在尝试配置通过 Kubernetes cron 作业触发的 Spring 批处理作业。我配置了作业、分区和步骤。该作业可以通过 Kubernetes cron 作业触发。 弹簧靴

回答 1 投票 0

Spring Batch 表中的 ID 增加 20

Spring 批处理表中的 ID 字段增加 20。 对于所有春季批次表都是如此 如何将Spring Batch表中的ID一一增加? 示例表 BATCH_JOB_EXECUTION

回答 1 投票 0

避免在 Spring Batch 5 中创建元数据表

如何在 Spring Batch 5(Spring Boot 3)中禁用元数据表的创建。我在日志 org.springframework.jdbc.BadSqlGrammarException 中看到以下异常:PreparedStatementCallb...

回答 1 投票 0

了解 BATCH_STEP_EXECUTION 中的 COMMIT_COUNT

运行 Spring Batch 作业时,我在 BATCH_STEP_EXECUTION 表中看到了我不理解的 COMMIT_COUNT 数据。 这项工作相当简单。这是带有

回答 1 投票 0

运行 Kubernetes 调度的 Spring Batch

我有一个 Spring Batch 作业并将其部署在 Kunernetes 上。我还使用 Kubernetes cronjob 配置每天上午 10 点运行它。像这样的例子 api版本:batch/v1 种类:CronJob 元数据: 名称: 你好...

回答 1 投票 0

如何避免在步骤1完成之前调用步骤2的Reader构造函数?

Reader1 从 .csv 文件读取,Processor1 生成不同的输出,Writer1 将其存储在 RECORD 表中,同时还将文件存储在 FILE 表中,并以自动生成的 ID 作为主...

回答 1 投票 0

在 Spring 批处理作业中创建窥视阅读器实例时出现问题

我有大师步 @豆 公共步骤employeeMasterProfilePhoneStep()抛出UnexpectedInputException,ParseException { 返回stepBuilderFactory.get(Constant.MASTER_PHONE_STEP) .

回答 1 投票 0

spring batch RepositoryItemWriter不将数据写入数据库

@配置 公共类数据源配置{ @豆 @ConfigurationProperties(“spring.datasource”) 公共数据源appDataSource() { DataSourceBuilder 构建器 =

回答 1 投票 0

使用Mockito进行Sprng批量stepscope阅读器测试 - org.springframework.batch.item.ReaderNotOpenException:阅读器必须先打开才能读取

我正在测试 @stepscope JDBCCUrsorIteamReader 但出现以下错误。我在开始阅读之前打开阅读器,但它的阅读器仍然没有打开。我想模拟数据库调用。 我怎样才能

回答 1 投票 0

使用两个数据源的 Spring 批处理

我正在开发一个 Spring Batch 项目,我需要使用两个不同的数据源。我计划使用 RepositoryItemReader 和 RepositoryItemWriter 来读取和写入这些数据库。哈...

回答 1 投票 0

JdbcPagingItemReader:有关异步/同步行为的问题

我有一个关于 JdbcPagingItemReader 的问题: 当我们初始化 QueryProvider 时,建议使用 setSortKeys 方法和唯一键约束,以保证不会丢失数据

回答 1 投票 0

使用 Mockito 进行 Spring 批量作业测试

我是春季批量应用程序的新手。我正在使用 Junit 和 mockito 为 Spring 批处理作业编写测试用例。我尝试使用内存数据库编写测试用例,但我们正在使用一些特定于 Oracle 的

回答 1 投票 0

如果 bean 声明为 ItemWriter,Spring-Batch 不会打开 FlatFileItemWriter

我正在使用 Spring Batch 4.3.8,我尝试在配置中使用以下代码来定义 ItemWriter @豆 @StepScope 公共 ItemWriter itemWriter(MyFileConfig fileConfig)...

回答 1 投票 0

引起:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有类型为“javax.sql.DataSource”的合格 bean - Spring Batch Junit5

我试图为Spring Batch项目编写Junit,并指的是:https://docs.spring.io/spring-batch/docs/4.3.x/api/org/springframework/batch/test/context /SpringBatchTest.html,但是...

回答 1 投票 0

没有类型为“org.springframework.batch.core.configuration注释.JobBuilderFactory”的合格bean可用:在Junit中

我已经浏览了下面的几个链接,但没有开始工作,我正在使用 Spring Boot v2.7.1 和 Batch,并且有下面的代码并使用 junit 5。 https://docs.spring.io/spring-batch/docs/4.0.x/

回答 1 投票 0

Spring Batch / Postgres:错误:关系“batch_job_instance”不存在

我正在尝试配置 Spring Batch 以使用 PostGres DB。我已在 build.gradle.kts 文件中包含以下依赖项: 实现(“org.springframework.boot:spring-boot-starter-...

回答 7 投票 0

JdbcPagingItemReader:PagingQueryProvider在第二次迭代后添加参数值

我创建了一个扩展 JdbcPagingItemReader 的类。我里面有这个配置。 @PostConstruct 公共无效 init() 抛出异常 { setDataSource(dsSrv.getUserDataSource());...

回答 1 投票 0

用于工作流程数据迁移的 Spring 批处理

在执行数据迁移时,通常会涉及到需要从多个存储库中提取数据、转换数据然后保存到新存储库的工作流程。解决这个问题的一种方法...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.