2021-02 spring-batch-demo updated

This commit is contained in:
stvort
2021-06-05 00:23:01 +04:00
parent b55c28ab04
commit f3207b8da1
2 changed files with 64 additions and 26 deletions
@@ -8,6 +8,7 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactor
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
@@ -20,13 +21,14 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.lang.NonNull;
import ru.otus.example.springbatch.model.Person;
import ru.otus.example.springbatch.service.CleanUpService;
import ru.otus.example.springbatch.service.HappyBirthdayService;
import java.util.List;
@SuppressWarnings("all")
@Configuration
public class JobConfig {
private static final int CHUNK_SIZE = 5;
@@ -42,6 +44,9 @@ public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private CleanUpService cleanUpService;
@StepScope
@Bean
public FlatFileItemReader<Person> reader(@Value("#{jobParameters['" + INPUT_FILE_NAME + "']}") String inputFileName) {
@@ -68,34 +73,47 @@ public class JobConfig {
@StepScope
@Bean
public ItemProcessor processor(HappyBirthdayService happyBirthdayService) {
return (ItemProcessor<Person, Person>) happyBirthdayService::doHappyBirthday;
public ItemProcessor<Person, Person> processor(HappyBirthdayService happyBirthdayService) {
return happyBirthdayService::doHappyBirthday;
}
@StepScope
@Bean
public FlatFileItemWriter writer(@Value("#{jobParameters['" + OUTPUT_FILE_NAME + "']}") String outputFileName) {
return new FlatFileItemWriterBuilder<>()
public FlatFileItemWriter<Person> writer(@Value("#{jobParameters['" + OUTPUT_FILE_NAME + "']}") String outputFileName) {
return new FlatFileItemWriterBuilder<Person>()
.name("personItemWriter")
.resource(new FileSystemResource(outputFileName))
.lineAggregator(new DelimitedLineAggregator<>())
.build();
}
@Bean
public Job importUserJob(Step step1) {
public MethodInvokingTaskletAdapter cleanUpTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(cleanUpService);
adapter.setTargetMethod("cleanUp");
return adapter;
}
@Bean
public Job importUserJob(Step transformPersonsStep, Step cleanUpStep) {
return jobBuilderFactory.get(IMPORT_USER_JOB_NAME)
.incrementer(new RunIdIncrementer())
.flow(step1)
.flow(transformPersonsStep)
.next(cleanUpStep)
.end()
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
public void beforeJob(@NonNull JobExecution jobExecution) {
logger.info("Начало job");
}
@Override
public void afterJob(JobExecution jobExecution) {
public void afterJob(@NonNull JobExecution jobExecution) {
logger.info("Конец job");
}
})
@@ -103,65 +121,73 @@ public class JobConfig {
}
@Bean
public Step step1(FlatFileItemWriter writer, ItemReader reader, ItemProcessor itemProcessor) {
public Step transformPersonsStep(ItemReader<Person> reader, FlatFileItemWriter<Person> writer,
ItemProcessor<Person, Person> itemProcessor) {
return stepBuilderFactory.get("step1")
.chunk(CHUNK_SIZE)
.<Person, Person>chunk(CHUNK_SIZE)
.reader(reader)
.processor(itemProcessor)
.writer(writer)
.listener(new ItemReadListener() {
.listener(new ItemReadListener<>() {
public void beforeRead() {
logger.info("Начало чтения");
}
public void afterRead(Object o) {
public void afterRead(@NonNull Person o) {
logger.info("Конец чтения");
}
public void onReadError(Exception e) {
public void onReadError(@NonNull Exception e) {
logger.info("Ошибка чтения");
}
})
.listener(new ItemWriteListener() {
public void beforeWrite(List list) {
.listener(new ItemWriteListener<>() {
public void beforeWrite(@NonNull List list) {
logger.info("Начало записи");
}
public void afterWrite(List list) {
public void afterWrite(@NonNull List list) {
logger.info("Конец записи");
}
public void onWriteError(Exception e, List list) {
public void onWriteError(@NonNull Exception e, @NonNull List list) {
logger.info("Ошибка записи");
}
})
.listener(new ItemProcessListener() {
public void beforeProcess(Object o) {
.listener(new ItemProcessListener<>() {
public void beforeProcess(Person o) {
logger.info("Начало обработки");
}
public void afterProcess(Object o, Object o2) {
public void afterProcess(@NonNull Person o, Person o2) {
logger.info("Конец обработки");
}
public void onProcessError(Object o, Exception e) {
logger.info("Ошбка обработки");
public void onProcessError(@NonNull Person o, @NonNull Exception e) {
logger.info("Ошибка обработки");
}
})
.listener(new ChunkListener() {
public void beforeChunk(ChunkContext chunkContext) {
public void beforeChunk(@NonNull ChunkContext chunkContext) {
logger.info("Начало пачки");
}
public void afterChunk(ChunkContext chunkContext) {
public void afterChunk(@NonNull ChunkContext chunkContext) {
logger.info("Конец пачки");
}
public void afterChunkError(ChunkContext chunkContext) {
public void afterChunkError(@NonNull ChunkContext chunkContext) {
logger.info("Ошибка пачки");
}
})
// .taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
@Bean
public Step cleanUpStep() {
return this.stepBuilderFactory.get("cleanUpStep")
.tasklet(cleanUpTasklet())
.build();
}
}
@@ -0,0 +1,12 @@
package ru.otus.example.springbatch.service;
import org.springframework.stereotype.Service;
@Service
public class CleanUpService {
public void cleanUp() throws Exception {
System.out.println("Выполняю завершающие мероприятия...");
Thread.sleep(1000);
System.out.println("Завершающие мероприятия закончены");
}
}