版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/DERRANTCM/article/details/77890370
Spring Boot核心-企业级开发-Spring Batch
【博文目录>>>】
【项目源码>>>】
什么是Spring Batch
Spring Batch 是用来处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定处理后输出成指定的形式。
Spring Batch 主要组成
SpringBatch 主要由以下几部分组成:
以上Spring Batch 的主要组成部分只需注册成Spring 的Bean 即可。若想、开启批处理的支持还需在配置类上使用@EnableBatchProcessing。
代码实现
scheme.sql
CREATE TABLE IF NOT EXISTS PERSON (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY ,
name NVARCHAR( 20 ),
age INT ,
nation NVARCHAR( 20 ),
address NVARCHAR( 20 )
);
people.cvs
汪某某,11,汉族,合肥
张某某,12,汉族,上海
李某某,13,非汉族,武汉
刘某,14,非汉族,南京
欧阳某某,115,汉族,北京
application.properties
spring .datasource .driverClassName = com .mysql .jdbc .Driver spring .datasource .url =jdbc\:mysql\://localhost\: 3306 /springboot
spring .datasource .username =root
spring .datasource .password = 123456 spring .jpa .hibernate .ddl -auto=update
logging .level .org .springframework .web =DEBUG # true开启自动批量事物 spring .batch .job .enabled =false
package com.example.spring.boot.domain; import javax.validation.constraints.Size; /**
* Author: 王俊超
* Date: 2017-07-20 07:44
* All Rights Reserved !!!
*/ public class Person { @Size (max = 4 , min = 2 ) //1 private String name; private int age; private String nation; private String address; public String getName () { return name;
} public void setName (String name) { this .name = name;
} public int getAge () { return age;
} public void setAge ( int age) { this .age = age;
} public String getNation () { return nation;
} public void setNation (String nation) { this .nation = nation;
} public String getAddress () { return address;
} public void setAddress (String address) { this .address = address;
}
}
package com .example .spring .boot .controller ; import org .springframework .batch .core .Job ; import org .springframework .batch .core .JobParameters ; import org .springframework .batch .core .JobParametersBuilder ; import org .springframework .batch .core .launch .JobLauncher ; import org .springframework .beans .factory .annotation .Autowired ; import org .springframework .web .bind .annotation .RequestMapping ; import org .springframework .web .bind .annotation .RestController ; /**
* Author: 王俊超
* Date: 2017-07-20 08:00
* All Rights Reserved !!!
*/ @RestController
public class DemoController {
@Autowired
JobLauncher jobLauncher ; @Autowired
Job importJob ; public JobParameters jobParameters ; @RequestMapping( "/read" )
public String imp(String fileName) throws Exception {
String path = fileName + ".csv" ; jobParameters = new JobParametersBuilder() .addLong ( "time" , System .currentTimeMillis ()) .addString ( "input.file.name" , path) .toJobParameters () ; jobLauncher .run (importJob, jobParameters) ; return "ok" ; }
}
package com .example .spring .boot .batch ; import com .example .spring .boot .domain .Person ; import org .springframework .batch .core .Job ; import org .springframework .batch .core .Step ; import org .springframework .batch .core .configuration .annotation .EnableBatchProcessing ; import org .springframework .batch .core .configuration .annotation .JobBuilderFactory ; import org .springframework .batch .core .configuration .annotation .StepBuilderFactory ; import org .springframework .batch .core .launch .support .RunIdIncrementer ; import org .springframework .batch .core .launch .support .SimpleJobLauncher ; import org .springframework .batch .core .repository .JobRepository ; import org .springframework .batch .core .repository .support .JobRepositoryFactoryBean ; import org .springframework .batch .item .ItemProcessor ; import org .springframework .batch .item .ItemReader ; import org .springframework .batch .item .ItemWriter ; import org .springframework .batch .item .database .BeanPropertyItemSqlParameterSourceProvider ; import org .springframework .batch .item .database .JdbcBatchItemWriter ; import org .springframework .batch .item .file .FlatFileItemReader ; import org .springframework .batch .item .file .mapping .BeanWrapperFieldSetMapper ; import org .springframework .batch .item .file .mapping .DefaultLineMapper ; import org .springframework .batch .item .file .transform .DelimitedLineTokenizer ; import org .springframework .batch .item .validator .Validator ; import org .springframework .context .annotation .Bean ; import org .springframework .context .annotation .Configuration ; import org .springframework .core .io .ClassPathResource ; import org .springframework .transaction .PlatformTransactionManager ; import javax .sql .DataSource ; /**
* Author: 王俊超
* Date: 2017-07-20 07:45
* All Rights Reserved !!!
*/ //@Configuration
//@EnableBatchProcessing
public class CsvBatchConfig {
@Bean
public ItemReader<Person> reader() throws Exception {
FlatFileItemReader<Person> reader = new FlatFileItemReader<>() ; reader .setResource (new ClassPathResource( "people.csv" )) ; reader .setLineMapper (new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[]{ "name" , "age" , "nation" , "address" }) ; }}) ; setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person .class ) ; }}) ; }}) ; return reader ; }
@Bean
public ItemProcessor<Person, Person> processor() {
CsvItemProcessor processor = new CsvItemProcessor() ; //1 processor .setValidator (csvBeanValidator()) ; //2 return processor ; }
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {// 1 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>() ; //2 writer .setItemSqlParameterSourceProvider (new BeanPropertyItemSqlParameterSourceProvider<Person>()) ; String sql = "insert into person " + "(name,age,nation,address) " + "values(:name, :age, :nation,:address)" ; writer .setSql (sql) ; //3 writer .setDataSource (dataSource) ; return writer ; }
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean() ; jobRepositoryFactoryBean .setDataSource (dataSource) ; jobRepositoryFactoryBean .setTransactionManager (transactionManager) ; jobRepositoryFactoryBean .setDatabaseType ( "mysql" ) ; return jobRepositoryFactoryBean .getObject () ; }
@Bean
public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ; jobLauncher .setJobRepository (jobRepository(dataSource, transactionManager)) ; return jobLauncher ; }
@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
return jobs .get ( "importJob" ) .incrementer (new RunIdIncrementer()) .flow (s1) // 1 .end () .listener (csvJobListener()) // 2 .build () ; }
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
ItemProcessor<Person, Person> processor) {
return stepBuilderFactory .get ( "step1" )
.<Person, Person>chunk( 65000 ) // 1 .reader (reader) // 2 .processor (processor) // 3 .writer (writer) // 4 .build () ; }
@Bean
public CsvJobListener csvJobListener() {
return new CsvJobListener() ; }
@Bean
public Validator<Person> csvBeanValidator() {
return new CsvBeanValidator<>() ; }
}
package com .example .spring .boot .batch ; import org .springframework .batch .item .validator .ValidationException ; import org .springframework .batch .item .validator .Validator ; import org .springframework .beans .factory .InitializingBean ; import javax .validation .ConstraintViolation ; import javax .validation .Validation ; import javax .validation .ValidatorFactory ; import java .util .Set ; /**
* Author: 王俊超
* Date: 2017-07-20 07:53
* All Rights Reserved !!!
*/ public class CsvBeanValidator <T> implements Validator<T>,InitializingBean {
private javax .validation .Validator validator ; @Override
public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory = Validation .buildDefaultValidatorFactory () ; validator = validatorFactory .usingContext () .getValidator () ; }
@Override
public void validate(T value) throws ValidationException { Set <ConstraintViolation<T>> constraintViolations = validator .validate (value) ; if (constraintViolations .size () > 0 ) {
StringBuilder message = new StringBuilder() ; for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message .append (constraintViolation .getMessage ()) .append ( "\n" ) ; }
throw new ValidationException(message .toString ()) ; }
}
}
package com.example.spring.boot.batch; import com.example.spring.boot.domain.Person; import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException; /**
* Author: 王俊超
* Date: 2017-07-20 07:50
* All Rights Reserved !!!
*/ public class CsvItemProcessor extends ValidatingItemProcessor < Person > { @Override public Person process(Person item) throws ValidationException { super .process(item); if (item.getNation().equals( "汉族" )) {
item.setNation( "01" );
} else {
item.setNation( "02" );
} return item;
}
}
package com.example.spring.boot.batch; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; /**
* Author: 王俊超
* Date: 2017-07-20 07:52
* All Rights Reserved !!!
*/ public class CsvJobListener implements JobExecutionListener { private long startTime; private long endTime; @Override public void beforeJob (JobExecution jobExecution) {
startTime = System.currentTimeMillis();
System.out.println( "任务处理开始" );
} @Override public void afterJob (JobExecution jobExecution) {
endTime = System.currentTimeMillis();
System.out.println( "任务处理结束" );
System.out.println( "耗时:" + (endTime - startTime) + "ms" );
}
}
package com .example .spring .boot .batch ; import com .example .spring .boot .domain .Person ; import org .springframework .batch .core .Job ; import org .springframework .batch .core .Step ; import org .springframework .batch .core .configuration .annotation .EnableBatchProcessing ; import org .springframework .batch .core .configuration .annotation .JobBuilderFactory ; import org .springframework .batch .core .configuration .annotation .StepBuilderFactory ; import org .springframework .batch .core .configuration .annotation .StepScope ; import org .springframework .batch .core .launch .support .RunIdIncrementer ; import org .springframework .batch .core .launch .support .SimpleJobLauncher ; import org .springframework .batch .core .repository .JobRepository ; import org .springframework .batch .core .repository .support .JobRepositoryFactoryBean ; import org .springframework .batch .item .ItemProcessor ; import org .springframework .batch .item .ItemReader ; import org .springframework .batch .item .ItemWriter ; import org .springframework .batch .item .database .BeanPropertyItemSqlParameterSourceProvider ; import org .springframework .batch .item .database .JdbcBatchItemWriter ; import org .springframework .batch .item .file .FlatFileItemReader ; import org .springframework .batch .item .file .mapping .BeanWrapperFieldSetMapper ; import org .springframework .batch .item .file .mapping .DefaultLineMapper ; import org .springframework .batch .item .file .transform .DelimitedLineTokenizer ; import org .springframework .batch .item .validator .Validator ; import org .springframework .beans .factory .annotation .Value ; import org .springframework .context .annotation .Bean ; import org .springframework .context .annotation .Configuration ; import org .springframework .core .io .ClassPathResource ; import org .springframework .transaction .PlatformTransactionManager ; import javax .sql .DataSource ; /**
* Author: 王俊超
* Date: 2017-07-20 07:58
* All Rights Reserved !!!
*/ @Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {
@Bean
@StepScope
public FlatFileItemReader<Person> reader(@Value( "#{jobParameters['input.file.name']}" ) String pathToFile) throws Exception {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>() ; //1 reader .setResource (new ClassPathResource(pathToFile)) ; //2 reader .setLineMapper (new DefaultLineMapper<Person>() {{ // 3 setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "name" , "age" , "nation" , "address" }) ; }}) ; setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person .class ) ; }}) ; }}) ; return reader ; }
@Bean
public ItemProcessor<Person, Person> processor() {
CsvItemProcessor processor = new CsvItemProcessor() ; //1 processor .setValidator (csvBeanValidator()) ; //2 return processor ; }
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {// 1 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>() ; //2 writer .setItemSqlParameterSourceProvider (new BeanPropertyItemSqlParameterSourceProvider<Person>()) ; String sql = "insert into person " + "(name,age,nation,address) " + "values(:name, :age, :nation,:address)" ; writer .setSql (sql) ; //3 writer .setDataSource (dataSource) ; return writer ; }
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean() ; jobRepositoryFactoryBean .setDataSource (dataSource) ; jobRepositoryFactoryBean .setTransactionManager (transactionManager) ; jobRepositoryFactoryBean .setDatabaseType ( "mysql" ) ; return jobRepositoryFactoryBean .getObject () ; }
@Bean
public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ; jobLauncher .setJobRepository (jobRepository(dataSource, transactionManager)) ; return jobLauncher ; }
@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
return jobs .get ( "importJob" ) .incrementer (new RunIdIncrementer()) .flow (s1) // 1 .end () .listener (csvJobListener()) // 2 .build () ; }
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
ItemProcessor<Person,Person> processor) {
return stepBuilderFactory .get ( "step1" )
.<Person, Person>chunk( 65000 ) // 1 .reader (reader) // 2 .processor (processor) // 3 .writer (writer) // 4 .build () ; }
@Bean
public CsvJobListener csvJobListener() {
return new CsvJobListener() ; }
@Bean
public Validator<Person> csvBeanValidator() {
return new CsvBeanValidator<Person>() ; }
}