31-SpringBoot——核心-企业级开发-SpringBatch

凌云 关注

收藏于 : 2018-11-13 21:04   被转藏 : 1   

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/DERRANTCM/article/details/77890370

Spring Boot核心-企业级开发-Spring Batch


【博文目录>>>】


【项目源码>>>】


什么是Spring Batch


Spring Batch 是用来处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定处理后输出成指定的形式。

Spring Batch 主要组成


SpringBatch 主要由以下几部分组成:

以上Spring Batch 的主要组成部分只需注册成Spring 的Bean 即可。若想、开启批处理的支持还需在配置类上使用@EnableBatchProcessing。

代码实现

scheme.sql

                     CREATE   TABLE   IF   NOT   EXISTS  PERSON (
  id  INT   NOT   NULL  AUTO_INCREMENT  PRIMARY   KEY  ,
  name    NVARCHAR(  20  ),
  age  INT  ,
  nation  NVARCHAR(  20  ),
  address NVARCHAR(  20  )
);   

people.cvs

                   汪某某,11,汉族,合肥
张某某,12,汉族,上海
李某某,13,非汉族,武汉
刘某,14,非汉族,南京
欧阳某某,115,汉族,北京  

application.properties

                   spring  .datasource   .driverClassName  =  com   .mysql   .jdbc   .Driver  spring  .datasource   .url  =jdbc\:mysql\://localhost\:  3306  /springboot
spring  .datasource   .username  =root
spring  .datasource   .password  =  123456  spring  .jpa   .hibernate   .ddl  -auto=update
logging  .level   .org   .springframework   .web  =DEBUG  # true开启自动批量事物  spring  .batch   .job   .enabled  =false  
                    package  com.example.spring.boot.domain;  import  javax.validation.constraints.Size;  /**
 * Author: 王俊超
 * Date: 2017-07-20 07:44
 * All Rights Reserved !!!
 */   public    class   Person  {   @Size  (max =  4  , min =  2  )  //1   private  String name;  private   int  age;  private  String nation;  private  String address;  public  String  getName  () {  return  name;
    }  public   void   setName  (String name) {  this  .name = name;
    }  public   int   getAge  () {  return  age;
    }  public   void   setAge  (  int  age) {  this  .age = age;
    }  public  String  getNation  () {  return  nation;
    }  public   void   setNation  (String nation) {  this  .nation = nation;
    }  public  String  getAddress  () {  return  address;
    }  public   void   setAddress  (String address) {  this  .address = address;
    }
}  
                   package  com   .example   .spring   .boot   .controller   ;  import org  .springframework   .batch   .core   .Job   ;  import org  .springframework   .batch   .core   .JobParameters   ;  import org  .springframework   .batch   .core   .JobParametersBuilder   ;  import org  .springframework   .batch   .core   .launch   .JobLauncher   ;  import org  .springframework   .beans   .factory   .annotation   .Autowired   ;  import org  .springframework   .web   .bind   .annotation   .RequestMapping   ;  import org  .springframework   .web   .bind   .annotation   .RestController   ;   /**
 * Author: 王俊超
 * Date: 2017-07-20 08:00
 * All Rights Reserved !!!
 */  @RestController
public class DemoController {

    @Autowired
    JobLauncher jobLauncher  ;  @Autowired
    Job importJob  ;  public JobParameters jobParameters  ;  @RequestMapping(  "/read"  )
    public String imp(String fileName) throws Exception {

        String path = fileName +  ".csv"   ;  jobParameters = new JobParametersBuilder()  .addLong  (  "time"  , System  .currentTimeMillis  ())  .addString  (  "input.file.name"  , path)  .toJobParameters  ()  ;  jobLauncher  .run  (importJob, jobParameters)  ;  return  "ok"   ;  }

}  
                   package  com   .example   .spring   .boot   .batch   ;  import  com   .example   .spring   .boot   .domain   .Person   ;  import org  .springframework   .batch   .core   .Job   ;  import org  .springframework   .batch   .core   .Step   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .EnableBatchProcessing   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .JobBuilderFactory   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .StepBuilderFactory   ;  import org  .springframework   .batch   .core   .launch   .support   .RunIdIncrementer   ;  import org  .springframework   .batch   .core   .launch   .support   .SimpleJobLauncher   ;  import org  .springframework   .batch   .core   .repository   .JobRepository   ;  import org  .springframework   .batch   .core   .repository   .support   .JobRepositoryFactoryBean   ;  import org  .springframework   .batch   .item   .ItemProcessor   ;  import org  .springframework   .batch   .item   .ItemReader   ;  import org  .springframework   .batch   .item   .ItemWriter   ;  import org  .springframework   .batch   .item   .database   .BeanPropertyItemSqlParameterSourceProvider   ;  import org  .springframework   .batch   .item   .database   .JdbcBatchItemWriter   ;  import org  .springframework   .batch   .item   .file   .FlatFileItemReader   ;  import org  .springframework   .batch   .item   .file   .mapping   .BeanWrapperFieldSetMapper   ;  import org  .springframework   .batch   .item   .file   .mapping   .DefaultLineMapper   ;  import org  .springframework   .batch   .item   .file   .transform   .DelimitedLineTokenizer   ;  import org  .springframework   .batch   .item   .validator   .Validator   ;  import org  .springframework   .context   .annotation   .Bean   ;  import org  .springframework   .context   .annotation   .Configuration   ;  import org  .springframework   .core   .io   .ClassPathResource   ;  import org  .springframework   .transaction   .PlatformTransactionManager   ;  import javax  .sql   .DataSource   ;   /**
 * Author: 王俊超
 * Date: 2017-07-20 07:45
 * All Rights Reserved !!!
 */  //@Configuration
//@EnableBatchProcessing
public class CsvBatchConfig {
    @Bean
    public ItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>()  ;  reader  .setResource  (new ClassPathResource(  "people.csv"  ))  ;  reader  .setLineMapper  (new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[]{  "name"  ,  "age"  ,  "nation"  ,  "address"  })  ;  }})  ;  setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person  .class  )  ;  }})  ;  }})  ;  return reader  ;  }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor()  ; //1  processor  .setValidator  (csvBeanValidator())  ; //2  return processor  ;  }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {//  1  JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>()  ; //2  writer  .setItemSqlParameterSourceProvider  (new BeanPropertyItemSqlParameterSourceProvider<Person>())  ;  String sql =  "insert into person "  +  "(name,age,nation,address) "  +  "values(:name, :age, :nation,:address)"   ;  writer  .setSql  (sql)  ; //3  writer  .setDataSource  (dataSource)  ;  return writer  ;  }


    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean()  ;  jobRepositoryFactoryBean  .setDataSource  (dataSource)  ;  jobRepositoryFactoryBean  .setTransactionManager  (transactionManager)  ;  jobRepositoryFactoryBean  .setDatabaseType  (  "mysql"  )  ;  return jobRepositoryFactoryBean  .getObject  ()  ;  }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher()  ;  jobLauncher  .setJobRepository  (jobRepository(dataSource, transactionManager))  ;  return jobLauncher  ;  }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs  .get  (  "importJob"  )  .incrementer  (new RunIdIncrementer())  .flow  (s1) //  1   .end  ()  .listener  (csvJobListener()) //  2   .build  ()  ;  }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory  .get  (  "step1"  )
                .<Person, Person>chunk(  65000  ) //  1   .reader  (reader) //  2   .processor  (processor) //  3   .writer  (writer) //  4   .build  ()  ;  }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener()  ;  }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<>()  ;  }


}  
                   package  com   .example   .spring   .boot   .batch   ;  import org  .springframework   .batch   .item   .validator   .ValidationException   ;  import org  .springframework   .batch   .item   .validator   .Validator   ;  import org  .springframework   .beans   .factory   .InitializingBean   ;  import javax  .validation   .ConstraintViolation   ;  import javax  .validation   .Validation   ;  import javax  .validation   .ValidatorFactory   ;  import java  .util   .Set   ;   /**
 * Author: 王俊超
 * Date: 2017-07-20 07:53
 * All Rights Reserved !!!
 */  public class CsvBeanValidator <T> implements Validator<T>,InitializingBean {
    private javax  .validation   .Validator  validator  ;  @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation  .buildDefaultValidatorFactory  ()  ;  validator = validatorFactory  .usingContext  ()  .getValidator  ()  ;  }

    @Override
    public void validate(T value) throws ValidationException {  Set  <ConstraintViolation<T>> constraintViolations = validator  .validate  (value)  ;  if (constraintViolations  .size  () >  0  ) {
            StringBuilder message = new StringBuilder()  ;  for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message  .append  (constraintViolation  .getMessage  ())  .append  (  "\n"  )  ;  }
            throw new ValidationException(message  .toString  ())  ;  }
    }

}  
                    package  com.example.spring.boot.batch;  import  com.example.spring.boot.domain.Person;  import  org.springframework.batch.item.validator.ValidatingItemProcessor;  import  org.springframework.batch.item.validator.ValidationException;  /**
 * Author: 王俊超
 * Date: 2017-07-20 07:50
 * All Rights Reserved !!!
 */  public   class   CsvItemProcessor   extends   ValidatingItemProcessor  <  Person  > {   @Override  public Person process(Person item)  throws  ValidationException {  super  .process(item);  if  (item.getNation().equals(  "汉族"  )) {
            item.setNation(  "01"  );
        }  else  {
            item.setNation(  "02"  );
        }  return  item;
    }
}  
                    package  com.example.spring.boot.batch;  import  org.springframework.batch.core.JobExecution;  import  org.springframework.batch.core.JobExecutionListener;  /**
 * Author: 王俊超
 * Date: 2017-07-20 07:52
 * All Rights Reserved !!!
 */   public    class   CsvJobListener   implements   JobExecutionListener  {   private   long  startTime;  private   long  endTime;  @Override   public   void   beforeJob  (JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println(  "任务处理开始"  );
    }  @Override   public   void   afterJob  (JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println(  "任务处理结束"  );
        System.out.println(  "耗时:"  + (endTime - startTime) +  "ms"  );
    }
}  
                   package  com   .example   .spring   .boot   .batch   ;  import  com   .example   .spring   .boot   .domain   .Person   ;  import org  .springframework   .batch   .core   .Job   ;  import org  .springframework   .batch   .core   .Step   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .EnableBatchProcessing   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .JobBuilderFactory   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .StepBuilderFactory   ;  import org  .springframework   .batch   .core   .configuration   .annotation   .StepScope   ;  import org  .springframework   .batch   .core   .launch   .support   .RunIdIncrementer   ;  import org  .springframework   .batch   .core   .launch   .support   .SimpleJobLauncher   ;  import org  .springframework   .batch   .core   .repository   .JobRepository   ;  import org  .springframework   .batch   .core   .repository   .support   .JobRepositoryFactoryBean   ;  import org  .springframework   .batch   .item   .ItemProcessor   ;  import org  .springframework   .batch   .item   .ItemReader   ;  import org  .springframework   .batch   .item   .ItemWriter   ;  import org  .springframework   .batch   .item   .database   .BeanPropertyItemSqlParameterSourceProvider   ;  import org  .springframework   .batch   .item   .database   .JdbcBatchItemWriter   ;  import org  .springframework   .batch   .item   .file   .FlatFileItemReader   ;  import org  .springframework   .batch   .item   .file   .mapping   .BeanWrapperFieldSetMapper   ;  import org  .springframework   .batch   .item   .file   .mapping   .DefaultLineMapper   ;  import org  .springframework   .batch   .item   .file   .transform   .DelimitedLineTokenizer   ;  import org  .springframework   .batch   .item   .validator   .Validator   ;  import org  .springframework   .beans   .factory   .annotation   .Value   ;  import org  .springframework   .context   .annotation   .Bean   ;  import org  .springframework   .context   .annotation   .Configuration   ;  import org  .springframework   .core   .io   .ClassPathResource   ;  import org  .springframework   .transaction   .PlatformTransactionManager   ;  import javax  .sql   .DataSource   ;   /**
 * Author: 王俊超
 * Date: 2017-07-20 07:58
 * All Rights Reserved !!!
 */  @Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {

    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value(  "#{jobParameters['input.file.name']}"  ) String pathToFile) throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>()  ; //1  reader  .setResource  (new ClassPathResource(pathToFile))  ; //2  reader  .setLineMapper  (new DefaultLineMapper<Person>() {{ //  3  setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] {  "name"  ,  "age"  ,  "nation"  ,  "address"  })  ;  }})  ;  setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person  .class  )  ;  }})  ;  }})  ;  return reader  ;  }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor()  ; //1  processor  .setValidator  (csvBeanValidator())  ; //2  return processor  ;  }



    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {//  1  JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>()  ; //2  writer  .setItemSqlParameterSourceProvider  (new BeanPropertyItemSqlParameterSourceProvider<Person>())  ;  String sql =  "insert into person "  +  "(name,age,nation,address) "  +  "values(:name, :age, :nation,:address)"   ;  writer  .setSql  (sql)  ; //3  writer  .setDataSource  (dataSource)  ;  return writer  ;  }

    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean()  ;  jobRepositoryFactoryBean  .setDataSource  (dataSource)  ;  jobRepositoryFactoryBean  .setTransactionManager  (transactionManager)  ;  jobRepositoryFactoryBean  .setDatabaseType  (  "mysql"  )  ;  return jobRepositoryFactoryBean  .getObject  ()  ;  }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher()  ;  jobLauncher  .setJobRepository  (jobRepository(dataSource, transactionManager))  ;  return jobLauncher  ;  }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs  .get  (  "importJob"  )  .incrementer  (new RunIdIncrementer())  .flow  (s1) //  1   .end  ()  .listener  (csvJobListener()) //  2   .build  ()  ;  }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person,Person> processor) {
        return stepBuilderFactory  .get  (  "step1"  )
                .<Person, Person>chunk(  65000  ) //  1   .reader  (reader) //  2   .processor  (processor) //  3   .writer  (writer) //  4   .build  ()  ;  }



    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener()  ;  }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>()  ;  }
}  
 阅读文章全部内容  
点击查看
文章点评
相关文章
凌云 关注

文章收藏:9259

TA的最新收藏