Spring Batch MultiLineItemReader with MultiResourcePartitioner

Niraj Sonawane

I have a file containing multi-line data like the sample below. A DataID line (prefixed with ">") marks the start of a new record: one record is the ID line concatenated with the lines below it, up to the start of the next record.

    >DataID1
    Line1asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    Line2asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    Line3asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    >DataID2
    DataID2asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    >DataID3
    DataID3asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm

I was able to implement this using SingleItemPeekableItemReader and it's working fine.

I am now trying to implement partitioning, as we need to process multiple files. I am not sure how the partitioner passes file info to my custom reader, or how to make my SingleItemPeekableItemReader thread safe, as it is not working correctly.

I need some inputs, as I am stuck at this point.

Java config:

    @Bean
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resources);
        partitioner.partition(10);
        return partitioner;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setQueueCapacity(8);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    @Qualifier("masterStep")
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner("step1", partitioner())
                .step(step1())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public MultiResourceItemReader<FieldSet> multiResourceItemReader() {
        log.info("Total number of files to be processed: {}", resources.length);
        report.setFileCount(resources.length);
        MultiResourceItemReader<FieldSet> resourceItemReader = new MultiResourceItemReader<>();
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(reader());
        return resourceItemReader;
    }

    @Bean
    public FlatFileItemReader<FieldSet> reader() {
        FlatFileItemReader<FieldSet> reader = new FlatFileItemReaderBuilder<FieldSet>()
                .name("fileReader")
                .lineTokenizer(orderFileTokenizer())
                .fieldSetMapper(new FastFieldSetMapper())
                .recordSeparatorPolicy(new BlankLineRecordSeparatorPolicy())
                .build();
        reader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        return reader;
    }

    @Bean
    public SingleItemPeekableItemReader<FieldSet> readerPeek() {
        SingleItemPeekableItemReader<FieldSet> reader = new SingleItemPeekableItemReader<>();
        reader.setDelegate(multiResourceItemReader());
        return reader;
    }

    @Bean
    public MultiLineFastaItemReader itemReader() {
        MultiLineFastaItemReader itemReader = new MultiLineFastaItemReader(multiResourceItemReader());
        itemReader.setSingalPeekable(readerPeek());     
        return itemReader;
    }

    @Bean
    public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
        PatternMatchingCompositeLineTokenizer tokenizer = new PatternMatchingCompositeLineTokenizer();
        Map<String, LineTokenizer> tokenizers = new HashMap<>(2);
        tokenizers.put(">*", head());
        tokenizers.put("*", tail());
        tokenizer.setTokenizers(tokenizers);
        return tokenizer;
    }

    public DelimitedLineTokenizer head() {
        DelimitedLineTokenizer token = new DelimitedLineTokenizer();
        token.setNames("sequenceIdentifier");
        token.setDelimiter(" ");
        token.setStrict(false);
        return token;
    }

    public DelimitedLineTokenizer tail() {
        DelimitedLineTokenizer token = new DelimitedLineTokenizer();
        token.setNames("sequences");
        token.setDelimiter(" ");
        return token;
    }

    @Bean
    public FastReportWriter writer() {
        return new FastReportWriter();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(masterStep())
                //.flow(step1)
                .next(step2())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Fasta, Fasta>chunk(5000)
                .reader(itemReader())
                .processor(new FastaIteamProcessor())
                //.processor(new PassThroughItemProcessor<>())
                .writer(writer())
                .build();
    }

public class MultiLineFastaItemReader implements ItemReader<Fasta>, ItemStream {

    private static final Logger log = LoggerFactory.getLogger(MultiLineFastaItemReader.class);
    private SingleItemPeekableItemReader<FieldSet> singalPeekable;

    AtomicInteger iteamCounter = new AtomicInteger(0);

    ConcurrentHashMap<String, AtomicInteger> fileNameAndCounterMap = new ConcurrentHashMap<>();

    @Autowired
    private SequenceFastaReport sequenceFastaReport;

    private MultiResourceItemReader<FieldSet> resourceItemReader;

    public MultiLineFastaItemReader(MultiResourceItemReader<FieldSet> multiResourceItemReader) {

        this.resourceItemReader = multiResourceItemReader;
    }

    public SingleItemPeekableItemReader<FieldSet> getSingalPeekable() {
        return singalPeekable;
    }

    public void setSingalPeekable(SingleItemPeekableItemReader<FieldSet> singalPeekable) {
        this.singalPeekable = singalPeekable;

    }

    @Override
    public Fasta read() throws Exception {
        FieldSet item = singalPeekable.read();
        if (item == null) {
            return null;
        }
        Fasta fastaObject = new Fasta();

        log.info("ID {} fileName {}", item.readString(0), resourceItemReader.getCurrentResource());
        fastaObject.setSequenceIdentifier(item.readString(0)
                .toUpperCase());
        fastaObject.setFileName(resourceItemReader.getCurrentResource()
                .getFilename());

        if (!fileNameAndCounterMap.containsKey(fastaObject.getFileName())) {
            fileNameAndCounterMap.put(fastaObject.getFileName(), new AtomicInteger(0));

        }

        while (true) {

            FieldSet possibleRelatedObject = singalPeekable.peek();
            if (possibleRelatedObject == null) {
                if (fastaObject.getSequenceIdentifier()
                        .length() < 1)
                    throw new InvalidParameterException("Somwthing Wrong in file");
                sequenceFastaReport.addToReport(fileNameAndCounterMap.get(fastaObject.getFileName())
                        .incrementAndGet(), fastaObject.getSequences());
                return fastaObject;
            }

            if (possibleRelatedObject.readString(0)
                    .startsWith(">")) {
                if (fastaObject.getSequenceIdentifier()
                        .length() < 1)
                    throw new InvalidParameterException("Somwthing Wrong in file");

                sequenceFastaReport.addToReport(fileNameAndCounterMap.get(fastaObject.getFileName())
                        .incrementAndGet(), fastaObject.getSequences());

                return fastaObject;
            }
            String data = fastaObject.getSequences()
                    .toUpperCase();
            fastaObject.setSequences(data + singalPeekable.read()
                    .readString(0)
                    .toUpperCase());

        }

    }

    @Override
    public void close() {
        this.singalPeekable.close();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        this.singalPeekable.open(executionContext);

    }

    @Override
    public void update(ExecutionContext executionContext) {

        this.singalPeekable.update(executionContext);
    }

}
Mahmoud Ben Hassine

I am not sure how the partitioner passes file info to my custom reader

The partitioner will create partition meta-data in step execution contexts, and your reader should read that meta-data from there. In your example, you don't need to call partition on the partitioner; Spring Batch will do that for you. What you need instead is to set the partition key name on the partitioner, for example:

    @Bean
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resources);
        partitioner.setKeyName("file");
        return partitioner;
    }
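
For illustration, with three input files the partitioner would create one execution context per file, along these lines (partition names follow the partitioner's partition0, partition1, ... naming scheme; the file URLs here are made up):

    partition0 -> { file = "file:/data/input1.fasta" }
    partition1 -> { file = "file:/data/input2.fasta" }
    partition2 -> { file = "file:/data/input3.fasta" }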

This will create one partition per file, each storing the file's URL under the key "file", which you can then read in your reader from the step execution context:

    @Bean
    @StepScope
    public FlatFileItemReader reader(@Value("#{stepExecutionContext['file']}") String file) {
        // define your reader
    }

Note that the reader must be step-scoped to use this feature. More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/step.html#late-binding
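
To make this concrete, here is a minimal sketch of that skeleton filled in with the beans from the question. It is only an assumption-laden example, not the definitive fix: it reuses orderFileTokenizer(), FastFieldSetMapper, BlankLineRecordSeparatorPolicy, and gzipBufferedReaderFactory from the question, and relies on Spring converting the stored URL string into a Resource:

    @Bean
    @StepScope
    public FlatFileItemReader<FieldSet> reader(@Value("#{stepExecutionContext['file']}") Resource file) {
        // Each partition injects exactly one file: Spring converts the URL
        // string stored by MultiResourcePartitioner into a Resource.
        FlatFileItemReader<FieldSet> reader = new FlatFileItemReaderBuilder<FieldSet>()
                .name("fileReader")
                .resource(file)
                .lineTokenizer(orderFileTokenizer())
                .fieldSetMapper(new FastFieldSetMapper())
                .recordSeparatorPolicy(new BlankLineRecordSeparatorPolicy())
                .build();
        reader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        return reader;
    }

With this wiring each partition gets its own reader instance, so the MultiResourceItemReader indirection is no longer needed. The SingleItemPeekableItemReader and the custom MultiLineFastaItemReader would have to be step-scoped as well, so that every partition gets its own instances of them; that is also what addresses the thread-safety concern from the question.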
