springbatch
阅读原文时间:2023年07月15日阅读:4

springbatch

  • job:作业,是批处理中的核心概念,是batch操作的基础单元,每个job由多个step组成

  • step:步骤,任务完成的节点

  • 每个job由JobBuilderFactory创建,每个step由StepBuilderFactory创建

  • 示例:

    ``

    /**
     * Minimal Spring Batch configuration: declares a job named "helloJob"
     * whose only step is a tasklet that prints a greeting.
     */
    @Configuration
    @EnableBatchProcessing // enable batch processing
    public class JobConfiguration {

        /** Factory that creates job objects. */
        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        /** Factory that creates step objects. */
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /** Builds the "helloJob" job, which starts with {@link #helloStep1()}. */
        @Bean
        public Job helloJob() {
            Step firstStep = helloStep1();
            return jobBuilderFactory.get("helloJob")
                    .start(firstStep)
                    .build();
        }

        /** Builds the "helloStep1" step: prints one line and reports that it is finished. */
        @Bean
        public Step helloStep1() {
            return stepBuilderFactory.get("helloStep1")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("hello world");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    }

    ``

  • JdbcPagingItemReader

      . 设置数据源
      . 设置查询条件
      . 将查询结果映射成实体类
  • MySqlPagingQueryProvider // mysql分页查询支持器

  • 代码示例:

        // Paging reader that loads User rows from the configured data source.
        JdbcPagingItemReader<User> jdbcPagingItemReader = new JdbcPagingItemReader<>();
        jdbcPagingItemReader.setDataSource(dataSource);
        // Number of rows fetched from the database at a time.
        // NOTE(review): the page size is configured separately via setPageSize - confirm the intended value.
        jdbcPagingItemReader.setFetchSize(3);

        // Query definition: which columns to select and from which table.
        MySqlPagingQueryProvider pagingQueryProvider = new MySqlPagingQueryProvider();
        pagingQueryProvider.setSelectClause("id,name,password");
        pagingQueryProvider.setFromClause("user");

        // Sort keys drive the paging, so together they must identify a row uniquely;
        // otherwise rows can be skipped or repeated between pages. "password" alone is
        // not unique, so "id" is added as a tie-breaker (assumes id is the primary key -
        // confirm against the table schema). An insertion-ordered map keeps "password"
        // as the primary sort column.
        Map<String, Order> orderMap = new java.util.LinkedHashMap<>();
        orderMap.put("password", Order.DESCENDING);
        orderMap.put("id", Order.ASCENDING);
        pagingQueryProvider.setSortKeys(orderMap);
        jdbcPagingItemReader.setQueryProvider(pagingQueryProvider);

        // Row mapper: columns are read by position, in the order of the select clause.
        jdbcPagingItemReader.setRowMapper((resultSet, rowNum) -> {
            User user = new User();
            user.setId(resultSet.getInt(1));
            user.setName(resultSet.getString(2));
            user.setPassword(resultSet.getString(3));
            return user;
        });
        return jdbcPagingItemReader;
  • FlatFileItemReader

  • DelimitedLineTokenizer //行分词器

  • DefaultLineMapper //行映射器,将一行数据映射成对应的实体

  • defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);

  • flatFileItemReader.setLineMapper(defaultLineMapper);

  • 代码示例:

        // Flat-file reader for /data/hospital.txt on the classpath.
        FlatFileItemReader<Hospital> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("utf-8");
        flatFileItemReader.setLinesToSkip(1); // skip the first line
        flatFileItemReader.setResource(new ClassPathResource("/data/hospital.txt"));

        // Tokenizer: splits each line into named fields (no delimiter is set here,
        // so the tokenizer's default applies).
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("id", "org_name", "org_type", "addr", "allow_no", "cert_dept",
                "start_valid_date", "end_invalid_date");

        // Line mapper: tokenizes a line, then maps the resulting field set to a Hospital.
        DefaultLineMapper<Hospital> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(new FieldSetMapper<Hospital>() {
            @Override
            public Hospital mapFieldSet(FieldSet fieldSet) throws BindException {
                Hospital hospital = new Hospital();
                hospital.setId(fieldSet.readInt("id"));
                hospital.setAddr(fieldSet.readString("addr"));
                hospital.setAllowNo(fieldSet.readString("allow_no"));
                hospital.setCertDept(fieldSet.readString("cert_dept"));
                hospital.setEndValidDate(fieldSet.readString("end_invalid_date"));
                hospital.setOrgName(fieldSet.readString("org_name"));
                hospital.setOrgType(fieldSet.readString("org_type"));
                hospital.setStartValidDate(fieldSet.readString("start_valid_date"));
                return hospital;
            }
        });
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);

        return flatFileItemReader;
  • StaxEventItemReader xml读取器

  • pom引入jar包

    ``

      <!-- XStream: library that converts XML to objects.
           NOTE(review): 1.4.11.1 is an old release; check the XStream security advisories before reusing this version. -->
      <dependency>
          <groupId>com.thoughtworks.xstream</groupId>
          <artifactId>xstream</artifactId>
          <version>1.4.11.1</version>
      </dependency>
    
      <!-- Spring OXM: Spring's object/XML mapping module.
           NOTE(review): keep this version aligned with the Spring version used by the rest of the project. -->
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-oxm</artifactId>
          <version>5.0.8.RELEASE</version>
      </dependency>

    ``

  • xstream实现xml转对象

  • spring-oxm提供Marshaller/Unmarshaller抽象,其中XStreamMarshaller基于xstream实现xml与对象的相互转换

  • 代码示例:

    ``

          @Bean
          public ItemReader<User> xmlItemReader() {
          StaxEventItemReader&lt;User&gt; staxEventItemReader = new StaxEventItemReader&lt;&gt;();
          staxEventItemReader.setFragmentRootElementName("user");//设置根标签
          staxEventItemReader.setResource(new ClassPathResource("/data/user.xml"));//设置文件路径
    
          //将xml转成实体对象
          XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
          Map&lt;String,Class&gt; alias = new HashMap&lt;&gt;();
          alias.put("user",User.class);//key 为根标签,class是key标签下的所有标签映射到的对象
          xStreamMarshaller.setAliases(alias);
    
          staxEventItemReader.setUnmarshaller(xStreamMarshaller);
    
          return staxEventItemReader;
      }</code></pre>

    ``

  • MultiResourceItemReader

  • 多文件读取,其实就是一个一个文件读取

  • MultiResourceItemReader需要设置文件读取代理,待读取文件资源

  • 代码示例:

    ``

    // Input files for multi-file reading, selected by the classpath pattern below.
    @Value("classpath:/data/file*.txt")

    private Resource[] resources;

    /**
     * Builds a reader that processes several files in turn. Each file in
     * {@code resources} is read through the delegate flat-file reader.
     *
     * @return the configured multi-resource reader
     */
    @Bean
    public MultiResourceItemReader<Hospital> multiFileItemReader() {
        MultiResourceItemReader<Hospital> multiResourceItemReader = new MultiResourceItemReader<>();
        // Files are read one at a time, so a single-file reader must be supplied as delegate.
        multiResourceItemReader.setDelegate(flatFileItemReader());
        multiResourceItemReader.setResources(resources);
        return multiResourceItemReader;
    }

    /**
     * Builds the single-file reader: splits each comma-delimited line into
     * named fields and maps them to a {@code Hospital}. No resource is set here.
     *
     * @return the configured flat-file reader
     */
    @Bean
    public FlatFileItemReader<Hospital> flatFileItemReader() {
        FlatFileItemReader<Hospital> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("utf-8");

        // Tokenizer: comma-separated columns, named in file order.
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setDelimiter(",");
        delimitedLineTokenizer.setNames("id", "org_name", "org_type", "addr", "allow_no", "cert_dept",
                "start_valid_date", "end_invalid_date");

        // Line mapper: tokenizes a line, then maps the resulting field set to a Hospital.
        DefaultLineMapper<Hospital> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(new FieldSetMapper<Hospital>() {
            @Override
            public Hospital mapFieldSet(FieldSet fieldSet) throws BindException {
                Hospital hospital = new Hospital();
                hospital.setStartValidDate(fieldSet.readString("start_valid_date"));
                hospital.setOrgType(fieldSet.readString("org_type"));
                hospital.setOrgName(fieldSet.readString("org_name"));
                hospital.setEndValidDate(fieldSet.readString("end_invalid_date"));
                hospital.setCertDept(fieldSet.readString("cert_dept"));
                hospital.setAllowNo(fieldSet.readString("allow_no"));
                hospital.setAddr(fieldSet.readString("addr"));
                hospital.setId(fieldSet.readInt("id"));
                return hospital;
            }
        });

        flatFileItemReader.setLineMapper(defaultLineMapper);

        return flatFileItemReader;
    }

    ``

  • JdbcBatchItemWriter

  • 代码示例:

        /**
         * Builds a JDBC batch writer that inserts each {@code Hospital} into the
         * {@code hospital} table using a named-parameter INSERT statement.
         *
         * @return the configured JDBC batch writer
         */
        @Bean
        public ItemWriter<Hospital> jdbcBatchItemWriter() {
            final String insertSql =
                    "INSERT INTO `hospital` (`id`, `org_name`, `org_type`, `addr`, `allow_no`, `cert_dept`, `start_valid_date`, `end_invalid_date`) "
                            + "VALUES (:id, :orgName, :orgType, :addr, :allowNo, :certDept, :startValidDate, :endValidDate)";

            JdbcBatchItemWriter<Hospital> writer = new JdbcBatchItemWriter<>();
            // Maps the item's property values onto the SQL parameters.
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql(insertSql);
            writer.setDataSource(dataSource);
            return writer;
        }
  • FlatFileItemWriter

  • 代码示例:

        @Bean
        public ItemWriter<? super Hospital> flatFileItemWriter() {
        FlatFileItemWriter&lt;Hospital&gt; flatFileItemWriter = new FlatFileItemWriter&lt;&gt;();
        flatFileItemWriter.setEncoding("utf-8");
        //设置存放数据的文件路径
        flatFileItemWriter.setResource(new FileSystemResource("f:/hospital_generate.txt"));
    
        flatFileItemWriter.setLineAggregator(new LineAggregator&lt;Hospital&gt;() {
            ObjectMapper objectMapper = new ObjectMapper();
            @Override
            public String aggregate(Hospital item) {
                String result = null;
                try {
                    result = objectMapper.writeValueAsString(item);//将对象转json字符串
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
    
                return result;
            }
        });
        return flatFileItemWriter;
    
    }</code></pre></li>
  • StaxEventItemWriter

  • XStreamMarshaller - xml标签元素与实体对象映射

  • 代码示例

        @Bean
        public ItemWriter<? super Hospital> xmlFileItemWriter() {
            StaxEventItemWriter<Hospital> staxEventItemWriter = new StaxEventItemWriter<>();
            staxEventItemWriter.setEncoding("utf-8");
            staxEventItemWriter.setResource(new FileSystemResource("f:/hospital.xml"));
            staxEventItemWriter.setRootTagName("hospitals");
        //XML标签与实体对象映射
        Map&lt;String,Class&lt;Hospital&gt;&gt; alias = new HashMap&lt;&gt;();
        alias.put("hospital",Hospital.class);
    
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        xStreamMarshaller.setAliases(alias);
    
        staxEventItemWriter.setMarshaller(xStreamMarshaller);
    
        return staxEventItemWriter;
    
    }</code></pre></li>