spring batch in practice
table of content
- summary of meta-data table
- summary of spring batch architecture
- creation of parallel batch job
- thread pool
- creation of synchronous batch job
- job status
- job success
- job fail
- scheduling task
- transaction
- re-run strategy
- distributed deployment
- example with spring boot
summary of meta-data table
Meta-Data table design details refer to : spring docs spring docs
Use above link to find and digging into spring batch data module in database . In here , this article ,we will only summarize something must be aware for developer .
- The initial DDL scripts placed in the packages : org.springfreamework.batch.core of spring-batch-core.xx.jar
- The spring batch configuration file reside in spring-batch-core.xx.jar named as : batch-xx.properties . you can change the configuration optional according to this file optional
- optimistic locking stategy
- table structure below

the primary table are : batch_job_instance , batch_job_execution , batch_setp_excution make up the overall hierarchy of spring batch data module .
summary of architecture
refer to : spring batch docs Use this link for find and digging into design architecture of spring batch , in this sections , will walk you through the summary of spring batch .
The following diagram for overall hierarchy

creation of parallel batch job
Go and find the details of parallel job
example with spring boot
This example demonstrate how to upload data into database from excel .
First , configure spring to enable batch processing
@Configuration
@EnableBatchProcessing
@EnableScheduling
public class BatchConfiguration
@EnableBatchProcessingannotation on class to enable batch job@EnableSchedulingenable spring schedule component to schedule our batch job , beside launch batch job by web application online
beside above configuration , in application.properties , batch job global configuration are needed
spring.batch.table-prefix=BATCH_
spring.batch.initialize-schema=always
spring.batch.job.enabled=false
In addition , we should also enable spring transaction management for batch job transaction.
@SpringBootApplication(scanBasePackages = {"com.a**s.c**t"})
@EnableCaching
@EnableTransactionManagement
public class StarterWithSpringBoot {
Now , let to configure JobLauncher TaskExeuctor JobRepository Scheduled in BatchConfiguration class for global configuration.
Using JobLauncher to launcher our batch job
@Bean
public JobLauncher jobLauncher(JobRepository jobRepository,TaskExecutor taskExecutor) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
TaskExecutorwe usingThreadPoolTaskExecutorimplements ofTaskExecutorfor parallelism batch job executions
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setThreadNamePrefix("***-business");
taskExecutor.initialize();
return taskExecutor;
}
JobRepositorydatabase repository for batch persisence
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setTransactionManager(transactionManager);
JobRepository jobRepository = factoryBean.getObject();
return jobRepository;
}
We can use spring scheduler component to schedule batch job , for example,
@Scheduled(cron = "*/5 * * * * ?")
public void medicalCardDayEndTask() {
// logger.info("*******************spring batch scheduler running ***************");
}
Throughout About step ,global configuration complete , follow by job configuration
second , job configuration
Create a component JobConfiguration for job configuration
@Component
public class UploadFileJob
create a job in JobConfiguration
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private JobCompletionNotificationListener jobCompletionNotificationListener;
public void createUploadFileJob(String path) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
Job job = jobBuilderFactory.get("UploadJob")
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(uploadStep(path))
.next(dataSqlValidationStep())
.end()
.build();
runJob(job);
}
JobBuilderFactoryprivoded by Spring Batch , be used to build a jobJobCompletionNotificationListenera implements ofJobExecutionListenerSupport, listen to job exeuction events , like completed , failed and so on.
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
JobParameters jobParameters = jobExecution.getJobParameters();
Date time = jobParameters.getDate("time");
logger.info("************ time:take time:{}MS;start time:{}********", System.currentTimeMillis() - time.getTime(),time );
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
}
}
@Override
public void beforeJob(JobExecution jobExecution) {
super.beforeJob(jobExecution);
}
}
uploadStep(path)a method to create a step to upload data into database from excel .
@Autowired
private StreamerItemReaderBuilder streamerItemReaderBuilder ;
@Autowired
private FileUploadProcessor FileUploadProcessor;
@Autowired
FileUploadWriter<FileUploadPo> writer;
@Autowired
ExcelDataSupplementListener excelDataSupplementListener;
@Autowired
FileUploadValidatorListener FileUploadValidatorListener;
@Autowired
ValidationErrorEventListener errorEventListener;
private Step fileUploadStep(String path) {
return stepBuilderFactory.get("uploadFileStep1")
.<FileUploadExcel, FileUploadPo> chunk(1000)
.reader(streamerItemReaderBuilder.reader(path))
.processor(FileUploadProcessor)
.writer(writer)
.listener(excelDataSupplementListener)
.listener(FileUploadValidatorListener)
.listener(errorEventListener)
.build();
}
FileUploadExceljava bean used byItemReaderfor extract data from excel , one field in java bean take responsibility for one column in excel.FileUploadPoa persistence object used by repository to store data into databaseStreamerItemReaderBuilderusing to build aItemReader
@Component
public class StreamerItemReaderBuilder {
public StreamerItemReader<FileUploadExcel> build(String path) {
StreamerItemReader<FileUploadExcel> reader = new StreamerItemReader<FileUploadExcel>();
reader.setResource(new FileSystemResource(path));
reader.setLinesToSkip(2);
reader.setMaxNumberOfSheets(0);
reader.setRowMapper(rowMapper());
DefaultRowSetFactory rowSetFactory = new DefaultRowSetFactory();
RowNumberColumnNameExtractor columnNameExtractor = new RowNumberColumnNameExtractor();
columnNameExtractor.setRowNumberOfColumnNames(1);
rowSetFactory.setColumnNameExtractor(columnNameExtractor);
reader.setRowSetFactory(rowSetFactory);
return reader;
}
private RowMapper<FileUploadExcel> rowMapper() {
BeanWrapperRowMapper<FileUploadExcel> rowMapper = new BeanWrapperRowMapper<FileUploadExcel>();
rowMapper.setDistanceLimit(1);
rowMapper.setStrict(false);
rowMapper.setTargetType(FileUploadExcel.class);
ColumnGroup products = new ColumnGroup("products");
policyProducts.addColumnName("pl**o", "cover*ode", "status", "propo*A", "ne*mt");
rowMapper.addColumnGroup(products);
return rowMapper;
}
}
FileUploadProcessorusing for data transform fromFileUploadExceltoFileUploadPofor persistence
@Component
public class FileUploadProcessor implements ItemProcessor<FileUploadExcel,FileUploadPo> {
@Override
public FileUploadPo process(FileUploadExcel fileUploadExcel) throws Exception {
FileUploadPo po = new FileUploadPo();
IBeanUtils.copyProperties(fileUploadExcel,po);
return po;
}
}
writeris a instance ofFileUploadWriter<FileUploadPo>that is implments ofItemWriterbe using for data persistence
@Component
public class FileUploadWriter<T> implements ItemWriter<FileUploadPo> {
@Autowired
FileUploadDataPoRepository repository;
@Override
public void write(List<? extends FileUploadPo > items) throws Exception {
logger.info("upload writer:{}",items);
long time = System.currentTimeMillis();
repository.saveAll(items);
logger.info("using time:{}",System.currentTimeMillis() - time);
}
}
excelDataSupplementListeneris a instance of job event listener be used to supplement data after read before process.
@Component
public class ExcelDataSupplementListener {
static final SimpleDateFormat DATE_FORMAT =new SimpleDateFormat("MM-dd-yyyy");
@AfterRead
public void supplement(FileUploadExcel excel) {
if (excel.getDate() != null) {
Date date = new Date();
date.setTime(Long.valueOf(excel.getDate()));
excel.setDate(DATE_FORMAT.format(date));
}
}
}
FileUploadValidatorListeneris a instance of job event listener be used to validate data before process
@Component
public class FileUploadValidatorListener {
@Autowired
ApplicationContext applicationContext;
@BeforeProcess
public void verify(FileUploadExcel excelBean) {
doVerify(new ClientValidator(excelBean));
doVerify(new DateValidator(excelBean));
}
private void doVerify(Validator validator) {
if (!validator.verify())
handleErrorMessage(validator);
}
private void handleErrorMessage(Validator validator) {
ValidationErrorMessageEvent event = new ValidationErrorMessageEvent(this, validator.getErrorMessage());
applicationContext.publishEvent(event);
}
}
applicationContextused to publish a validation error event when validation errorValidationErrorMessageEventa validation error even implementedApplicationEventwithin spring
public class ValidationErrorMessageEvent extends ApplicationEvent {
List<ValidationMessageOfFileUpload> errorMessage = new LinkedList<>();
public List<ValidationMessageOfFileUpload> getErrorMessage() {
return errorMessage;
}
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
*/
public ValidationErrorMessageEvent(Object source,List<ValidationMessageOfFileUpload> errorMessage) {
super(source);
this.errorMessage = errorMessage;
}
}
ValidationMessageOfFileUploada java bean for erorr messageValidationErrorEventListenerthis event listener will listen toValidationErrorMessageEventaways
@Component
@Slf4j
public class ValidationErrorEventListener {
List<ValidationMessageOfFileUpload> errorMessages = new LinkedList<>();
@Autowired
FileUploadErrorMessagePoRepository repository;
@EventListener(classes = ValidationErrorMessageEvent.class)
public void handleErrorMessage(MfuValidationErrorMessageEvent event) {
errorMessages.addAll(event.getErrorMessage());
if (errorMessages.size() >= 1000) {
log.info("handle error message,the number of error message is :{}", errorMessages.size());
persist(errorMessages);
errorMessages.clear();
}
}
private void persist(List<ValidationMessageOfFileUpload> errorMessages) {
List<FileUploadErrorMessagePo> pos = errorMessages.stream().map(error -> createPo(error)).collect(Collectors.toList());
repository.saveAll(pos);
}
private FileUploadErrorMessagePo createPo(ValidationMessageOfFileUpload error) {
FileUploadErrorMessagePo po = new FileUploadErrorMessagePo();
po.setRowNumber(error.getRowNumber());
po.setMessage(error.getMessage());
//TODO other property should be supplemented
return po;
}
@AfterStep
public void handleErrorMessageLastBatch(StepExecution stepExecution) {
if (errorMessages.size() > 0) {
log.info("handle file upload error message last batch ,the number of last batch is {}:", errorMessages.size());
persist(errorMessages);
errorMessages.clear();
}
}
}
@AfterStepon methodhandleErrorMessageLastBatchto listen to the job event , so that system can handle last batch error message to persist .
dataSqlValidationStep()create validaton setp
@Autowired
SqlValidatorReaderBuilder sqlValidatorReaderBuilder;
@Autowired
SqlValidatorProcessor sqlValidatorProcessor;
@Autowired
SqlValidatorWriter sqlValidatorWriter;
private Step dataSqlValidationStep() {
return stepBuilderFactory.get("dataSqlValidationStep")
.<SqlValidator,SqlValidator>chunk(1)
.reader(sqlValidatorReaderBuilder.build())
.processor(sqlValidatorProcessor)
.writer(sqlValidatorWriter)
.build();
}
sqlValidatorReaderBuilderbuild aItemReaderto read item like this
@Component
public class SqlValidatorReaderBuilder {
@Autowired
ApplicationContext context;
public ListItemReader<SqlValidator> build() {
Map<String, SqlValidator> validators = context.getBeansOfType(SqlValidator.class);
ListItemReader<SqlValidator> reader = new ListItemReader<>(validators.values().stream().collect(Collectors.toList()));
return reader;
}
}
sqlValidatorProcessora validation processor like below,
@Component
public class SqlValidatorProcessor implements ItemProcessor<SqlValidator, SqlValidator> {
@Autowired
ApplicationContext context;
@Override
public SqlValidator process(SqlValidator item) throws Exception {
if (item != null) {
item.verify();
}
return item;
}
}
sqlValidatorWritera writer to persist data
@Component
public class SqlValidatorWriter implements ItemWriter<SqlValidator> {
@Autowired
UploadDataPoRepository repository;
@Override
public void write(List<? extends SqlValidator> items) throws Exception {
}
}
- spring JPA batch insert configuration
Configure batch insert in application.properties
hibernate.jdbc.batch_size=1000
hibernate.order_inserts=true
hibernate.order_updates=true
hibernate.jdbc.batch_versioned_data=true
spring.jpa.hibernate.jdbc.batch_size=1000
spring.jpa.hibernate.order_inserts=true
spring.jpa.hibernate.order_updates=true
spring.jpa.hibernate.jdbc.batch_versioned_data=true
spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data=true
Throughout above step by step , we build up a spring batch example. and walked you through the process of batch job