Spring Batch Parallel Processing Example

This article is a tutorial about parallel processing in Spring Batch. We will use Spring Boot to speed up our development process.

1. Introduction

Spring Batch is a lightweight, scalable and comprehensive batch framework for handling data at massive scale. It builds upon the Spring Framework to provide intuitive and easy configuration for executing batch applications. Spring Batch provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip and resource management.

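Spring Batch has a layered architecture consisting of three components:

- Application: contains all the batch jobs and custom code written by developers using Spring Batch.
- Batch Core: contains the core runtime classes needed to launch and control a batch job, such as JobLauncher, Job and Step implementations.
- Batch Infrastructure: contains the common readers, writers and services used both by application developers and by the core framework itself.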

Let us dive into parallel processing in Spring Batch with examples of parallel steps and partitioning.

2. Technologies Used

- Java 1.8
- Gradle
- Spring Boot 2.0.1 with Spring Batch
- HSQLDB

3. Spring Batch Project

Spring Boot provides more than 30 starters to ease dependency management for your project. The easiest way to generate a Spring Boot project is with the Spring Initializr tool at start.spring.io, adding the Spring Batch and HSQLDB dependencies used in this example.

A Gradle project will be generated. If you prefer Maven, select Maven instead of Gradle before generating the project. Import the project into your Java IDE.

3.1 Gradle File

We will look at the generated Gradle file for our project. It outlines the compile-time and runtime dependencies of our project.

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
    runtime('org.hsqldb:hsqldb')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}

4. Spring Batch Parallel Steps

We will first look at an example of running multiple flows in parallel within a single job. Here, the flows are independent of each other and execute concurrently. Below is the Java configuration that enables this parallel processing.

Spring Batch Parallel Flow Configuration

package com.jcg.springbatchparallel.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.net.MalformedURLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.IntStream;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

@Autowired
JobBuilderFactory jobBuilderFactory;

@Autowired
StepBuilderFactory stepBuilderFactory;

private TaskletStep taskletStep(String step) {
    return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> {
        IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token));
        return RepeatStatus.FINISHED;
    }).build();

}


@Bean
public Job parallelStepsJob() {

    // step1 runs first, on its own
    Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1")).build();

    Flow flowJob1 = new FlowBuilder<Flow>("flow1").start(taskletStep("step2")).build();
    Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step3")).build();
    Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step4")).build();

    // split() hands each flow to the task executor, so step2, step3 and step4 run concurrently;
    // SimpleAsyncTaskExecutor starts a new thread for every flow it receives
    Flow slaveFlow = new FlowBuilder<Flow>("slaveFlow")
            .split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build();

    return jobBuilderFactory.get("parallelFlowJob")
            .incrementer(new RunIdIncrementer())
            .start(masterFlow)
            .next(slaveFlow)
            .end()
            .build();

}

}

Let us run the code in our Java IDE and observe the results.

Spring Batch Sequential Step 1

Spring Batch Parallel Steps -> 2,3,4

This pattern suits cases where a set of flows depends on an initial step completing, after which they can run fully in parallel. The initial step can be a tasklet doing minimal processing to provide a baseline, while the slave flows execute the actual logic in parallel. Spring Batch waits for all the flows in slaveFlow to finish and then aggregates their results into a single exit status for the split.
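To make the control flow of parallelStepsJob concrete, here is a plain-Java analogy that uses no Spring Batch API: an initial step runs first, then the remaining steps are submitted to a thread pool, and the caller waits for all of them before combining their results. The Status enum, the run and tasklet helpers and the "most severe status wins" aggregation rule are illustrative assumptions, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ParallelFlowSketch {

    // Hypothetical exit statuses, ordered from least to most severe.
    public enum Status { COMPLETED, STOPPED, FAILED, UNKNOWN }

    // Runs the initial step, then all parallel steps concurrently, and returns
    // the most severe status seen (a stand-in for the split's aggregated status).
    public static Status run(Supplier<Status> initial, List<Supplier<Status>> parallel) {
        Status first = initial.get();
        if (first != Status.COMPLETED) {
            return first; // the parallel steps only start after the initial step completes
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parallel.size()));
        try {
            List<CompletableFuture<Status>> futures = new ArrayList<>();
            for (Supplier<Status> step : parallel) {
                futures.add(CompletableFuture.supplyAsync(step, pool));
            }
            // Block until every parallel step has finished.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            Status result = Status.COMPLETED;
            for (CompletableFuture<Status> future : futures) {
                Status status = future.join();
                if (status.compareTo(result) > 0) {
                    result = status;
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical tasklet: logs a few tokens with its thread name, then completes.
    public static Supplier<Status> tasklet(String step) {
        return () -> {
            for (int token = 1; token <= 3; token++) {
                System.out.println("Step:" + step + " token:" + token
                        + " thread:" + Thread.currentThread().getName());
            }
            return Status.COMPLETED;
        };
    }

    public static void main(String[] args) {
        Status status = run(tasklet("step1"),
                Arrays.asList(tasklet("step2"), tasklet("step3"), tasklet("step4")));
        System.out.println("Aggregated status: " + status);
    }
}
```

In the real job, Spring Batch handles the split and the aggregation itself; the sketch only shows why slaveFlow cannot finish before its slowest flow does.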

5. Spring Batch Partitioning

Another approach to parallel processing in Spring Batch is partitioning. Consider the example of a huge file. Multiple threads reading the same file do not guarantee better performance, since they still share a single I/O resource, and they may even degrade performance. In such cases, we split the single file into multiple files so that each file can be processed in its own thread. In our example, a single file person.txt containing 50 records has been split into 10 files of 5 records each. This can be achieved with the split command:

split -l 5 person.txt person

The above command creates files with names like personaa, personab and so on. We will then configure Spring Batch to process these files in parallel for faster execution. Below is the batch configuration.
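Before moving to the configuration, the effect of `split -l 5` can be sketched in plain Java to show how the 50 records end up in ten five-line files named personaa through personaj. The SplitSketch class, its helpers and the record format are hypothetical, and only the two-letter suffixes needed for small inputs like this one are modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitSketch {

    // Two-letter suffix in split's default style: 0 -> "aa", 1 -> "ab", 26 -> "ba".
    // Only the two-letter range is modelled, which is plenty for this example.
    public static String suffix(int index) {
        if (index < 0 || index >= 26 * 26) {
            throw new IllegalArgumentException("suffix index out of two-letter range: " + index);
        }
        return "" + (char) ('a' + index / 26) + (char) ('a' + index % 26);
    }

    // Groups the lines into files of at most linesPerFile lines, keyed by prefix + suffix,
    // preserving the original line order (like split -l linesPerFile).
    public static Map<String, List<String>> split(List<String> lines, int linesPerFile, String prefix) {
        if (linesPerFile <= 0) {
            throw new IllegalArgumentException("linesPerFile must be positive");
        }
        Map<String, List<String>> files = new LinkedHashMap<>();
        for (int start = 0; start < lines.size(); start += linesPerFile) {
            int end = Math.min(start + linesPerFile, lines.size());
            files.put(prefix + suffix(start / linesPerFile), new ArrayList<>(lines.subList(start, end)));
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> person = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            person.add("name" + i + ":Person " + i); // hypothetical key:value records
        }
        split(person, 5, "person").forEach((file, records) ->
                System.out.println(file + " -> " + records.size() + " records"));
    }
}
```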

Spring Batch Partitioning Configuration

@Bean
public Job partitioningJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .incrementer(new RunIdIncrementer())
            .flow(masterStep())
            .end()
            .build();
}

@Bean
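public Step masterStep() throws Exception {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep())
            .partitioner("partition", partitioner())
            // MultiResourcePartitioner creates one partition per matched file,
            // so gridSize does not limit the number of partitions here
            .gridSize(10)
            // SimpleAsyncTaskExecutor runs each partition of slaveStep on a new thread
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();
}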

@Bean
public Partitioner partitioner() throws Exception {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
    partitioner.setResources(resolver.getResources("file://persona*"));
    return partitioner;

}

@Bean
public Step slaveStep() throws Exception {
    return stepBuilderFactory.get("slaveStep")
            .<Map<String, String>, Map<String, String>>chunk(1)
            .reader(reader(null))
            .writer(writer())
            .build();
}

@Bean
@StepScope
public FlatFileItemReader<Map<String, String>> reader(@Value("#{stepExecutionContext['fileName']}") String file) throws MalformedURLException {
    FlatFileItemReader<Map<String, String>> reader = new FlatFileItemReader<>();
    reader.setResource(new UrlResource(file));

    DefaultLineMapper<Map<String, String>> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(":");
    tokenizer.setNames("key", "value");

    lineMapper.setFieldSetMapper((fieldSet) -> {
        Map<String, String> map = new LinkedHashMap<>();
        map.put(fieldSet.readString("key"), fieldSet.readString("value"));
        return map;
    });
    lineMapper.setLineTokenizer(tokenizer);
    reader.setLineMapper(lineMapper);

    return reader;
}

@Bean
public ItemWriter<Map<String, String>> writer() {
    return (items) -> items.forEach(item -> {
        item.entrySet().forEach(entry -> {
            logger.info("key->[" + entry.getKey() + "] Value ->[" + entry.getValue() + "]");
        });
    });
}
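The partitioner and the reader above cooperate through the step execution context: each partition context carries a fileName entry, which the step-scoped reader resolves through #{stepExecutionContext['fileName']}. The following plain-Java sketch models that hand-off under simplifying assumptions. It builds one context per resource, named partition0, partition1 and so on, roughly mirroring what MultiResourcePartitioner produces. PartitionContextSketch, its partition and mapLine methods and the sample file locations are hypothetical, and mapLine is only a rough stand-in for DelimitedLineTokenizer(":") combined with the lambda field set mapper.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionContextSketch {

    // One context per resource, named partition0, partition1, ..., each holding the
    // resource location under "fileName", the key the step-scoped reader looks up.
    public static Map<String, Map<String, String>> partition(List<String> resourceUrls) {
        Map<String, Map<String, String>> contexts = new LinkedHashMap<>();
        for (int i = 0; i < resourceUrls.size(); i++) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put("fileName", resourceUrls.get(i));
            contexts.put("partition" + i, context);
        }
        return contexts;
    }

    // Rough stand-in for DelimitedLineTokenizer(":") with names "key" and "value" plus the
    // field set mapper: "name:John" becomes {name=John}; any other token count is rejected.
    public static Map<String, String> mapLine(String line) {
        String[] tokens = line.split(":", -1);
        if (tokens.length != 2) {
            throw new IllegalArgumentException("expected 2 tokens but found " + tokens.length + ": " + line);
        }
        Map<String, String> item = new LinkedHashMap<>();
        item.put(tokens[0], tokens[1]);
        return item;
    }

    public static void main(String[] args) {
        // Hypothetical locations of two of the files produced by split.
        List<String> files = Arrays.asList("file:/tmp/personaa", "file:/tmp/personab");
        partition(files).forEach((name, context) -> System.out.println(name + " -> " + context));
        System.out.println(mapLine("name:John"));
    }
}
```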

Below is the output of running the parallelJob.

Spring Batch parallel job

The interleaved log lines show the partitions running concurrently on separate task executor threads, each logging the names of people from its own file.
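The interleaving can be reproduced without Spring Batch. The sketch below starts a new thread per partition, much as SimpleAsyncTaskExecutor does for each task it receives, lets every worker print its records, and waits for all workers before returning. ThreadPerPartitionSketch, the worker-N thread names and the sample records are assumptions made for illustration; the exact interleaving differs from run to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadPerPartitionSketch {

    // Starts a new thread for every partition, lets each one print its records,
    // waits for all of them, and returns the names of the threads that did the work.
    public static Set<String> run(List<List<String>> partitions) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            List<String> records = partitions.get(i);
            Thread worker = new Thread(() -> {
                String name = Thread.currentThread().getName();
                threadNames.add(name);
                for (String record : records) {
                    // Output from different workers may interleave.
                    System.out.println(name + " " + record);
                }
            }, "worker-" + i);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // like the master step, finish only after every partition is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for partitions", e);
            }
        }
        return threadNames;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            List<String> records = new ArrayList<>();
            for (int r = 1; r <= 5; r++) {
                records.add("key->[name] Value ->[Person " + (p * 5 + r) + "]"); // hypothetical data
            }
            partitions.add(records);
        }
        System.out.println("Worker threads used: " + run(partitions).size());
    }
}
```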

6. Summary

In this example, we demonstrated the parallel processing features of Spring Batch through two approaches. The first runs multiple independent flows in parallel within a single job, while partitioning parallelizes a single step by dividing its input into partitions that are processed concurrently. Partitioning is widely used in many applications, but each approach has its own use cases.


Rajagopal ParthaSarathi

Rajagopal works in the software industry, solving enterprise-scale problems for customers across geographies and specializing in distributed platforms. He holds a master's degree in computer science with a focus on cloud computing from Illinois Institute of Technology. His current interests include data science and distributed computing.