spring batch : parallel processing(read file)

This commit is contained in:
haerong22
2021-05-13 14:22:43 +09:00
parent 1a73fd9850
commit 76b761102e
6 changed files with 223 additions and 1 deletions

View File

@@ -0,0 +1,157 @@
package com.example.springbatch.application.job;
import com.example.springbatch.application.model.BoardModel;
import com.example.springbatch.domain.entity.Article;
import com.example.springbatch.domain.entity.Board;
import com.example.springbatch.utils.FileUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.StringUtils;
import java.io.File;
import java.net.MalformedURLException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
@Configuration
@Slf4j
public class CreateBoardJobConfig {
private static final int CHUNK_SIZE = 1000;
private static final int GRID_SIZE = 10;
private static final int POOL_SIZE = 5;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final JdbcTemplate demoJdbcTemplate;
public CreateBoardJobConfig(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
@Qualifier("demoJdbcTemplate") JdbcTemplate demoJdbcTemplate) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.demoJdbcTemplate = demoJdbcTemplate;
}
@Bean
public Job createBoardJob() throws MalformedURLException {
return jobBuilderFactory.get("createBoardJob")
.incrementer(new RunIdIncrementer())
.start(createBoardManager())
.build();
}
@Bean
public Step createBoardManager() throws MalformedURLException {
return stepBuilderFactory.get("createBoardManager")
.partitioner("createBoardPartitioner", createBoardPartitioner())
.partitionHandler(createBoardPartitionHandler())
.build();
}
@Bean
@StepScope
public Partitioner createBoardPartitioner() {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
Path path = Paths.get("/Users/bobby/Desktop/kim/study/data");
Resource[] resources = FileUtils.stream(path)
.filter(File::isFile)
.filter(file -> "csv".equals(StringUtils.getFilenameExtension(file.getPath())))
.map(FileSystemResource::new)
.toArray(Resource[]::new);
partitioner.setResources(resources);
partitioner.partition(GRID_SIZE);
return partitioner;
}
@Bean
public ThreadPoolTaskExecutor createBoardTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(POOL_SIZE);
taskExecutor.setMaxPoolSize(POOL_SIZE);
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.initialize();
return taskExecutor;
}
@Bean
public TaskExecutorPartitionHandler createBoardPartitionHandler() throws MalformedURLException {
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setStep(createBoardWorker());
partitionHandler.setGridSize(GRID_SIZE);
partitionHandler.setTaskExecutor(createBoardTaskExecutor());
return partitionHandler;
}
@Bean
public Step createBoardWorker() throws MalformedURLException {
return stepBuilderFactory.get("createBoardStep")
.<BoardModel, Board>chunk(1000)
.reader(createBoardReader(null))
.processor(createBoardProcessor())
.writer(createBoardWriterJDBC())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<BoardModel> createBoardReader(@Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
return new FlatFileItemReaderBuilder<BoardModel>()
.name("createBoardReader")
.resource(new UrlResource(fileName))
.delimited()
.names("title", "content")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>())
.targetType(BoardModel.class)
.build();
}
@Bean
public ItemProcessor<BoardModel, Board> createBoardProcessor() {
LocalDateTime now = LocalDateTime.now();
return boardModel -> Board.builder()
.title(boardModel.getTitle())
.content(boardModel.getContent())
.createdAt(now)
.build();
}
// JDBC
@Bean
public ItemWriter<Board> createBoardWriterJDBC() {
return boards -> demoJdbcTemplate.batchUpdate("insert into Board (title, content, createdAt) values (?, ?, ?)",
boards,
CHUNK_SIZE,
(ps, board) -> {
ps.setObject(1, board.getTitle());
ps.setObject(2, board.getContent());
ps.setObject(3, board.getCreatedAt());
});
}
}

View File

@@ -1,4 +1,4 @@
package com.example.springbatch.application.listener; package com.example.springbatch.application.job.listener;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@@ -0,0 +1,13 @@
package com.example.springbatch.application.model;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class BoardModel {
private String title;
private String content;
}

View File

@@ -0,0 +1,27 @@
package com.example.springbatch.domain.entity;
import lombok.*;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.time.LocalDateTime;
@Entity
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Board {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private long id;
private String title;
private String content;
private LocalDateTime createdAt;
}

View File

@@ -0,0 +1,9 @@
package com.example.springbatch.domain.repository;
import com.example.springbatch.domain.entity.Board;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BoardRepository extends JpaRepository<Board, Long> {
}

View File

@@ -0,0 +1,16 @@
package com.example.springbatch.utils;
import lombok.SneakyThrows;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
public class FileUtils {
@SneakyThrows
public static Stream<File> stream(Path path) {
return Files.list(path).map(Path::toFile);
}
}