diff --git a/spring-batch-2/pom.xml b/spring-batch-2/pom.xml index c429c272bd..77779a0fcf 100644 --- a/spring-batch-2/pom.xml +++ b/spring-batch-2/pom.xml @@ -24,7 +24,6 @@ org.hsqldb hsqldb - ${hsqldb.version} runtime @@ -44,11 +43,17 @@ ${spring.batch.version} test + + org.awaitility + awaitility + ${awaitility.version} + test + 4.3.0 - 2.5.1 + 3.1.1 \ No newline at end of file diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java new file mode 100644 index 0000000000..bfaa044376 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java @@ -0,0 +1,190 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batchscheduler.model.Book; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.ScheduledMethodRunnable; + +import javax.sql.DataSource; +import java.util.Date; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +@EnableBatchProcessing +@EnableScheduling +public class SpringBatchScheduler { + + private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class); + + private AtomicBoolean enabled = new AtomicBoolean(true); + + private AtomicInteger batchRunCounter = new AtomicInteger(0); + + private final Map> scheduledTasks = new IdentityHashMap<>(); + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Autowired + private JobLauncher jobLauncher; + + @Scheduled(fixedRate = 2000) + public void launchJob() throws Exception { + Date date = new Date(); + logger.debug("scheduler starts at " + date); + if (enabled.get()) { + JobExecution jobExecution = jobLauncher.run(job(), new JobParametersBuilder().addDate("launchDate", date) + .toJobParameters()); + batchRunCounter.incrementAndGet(); + logger.debug("Batch job ends with status as " + jobExecution.getStatus()); + } + logger.debug("scheduler ends "); + } + + public void stop() { + enabled.set(false); + } + + public void start() { + enabled.set(true); + } + + @Bean + public TaskScheduler poolScheduler() { + return new CustomTaskScheduler(); + } + + private class CustomTaskScheduler extends ThreadPoolTaskScheduler { + + private static final long serialVersionUID = -7142624085505040603L; + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) { + ScheduledFuture future = super.scheduleAtFixedRate(task, period); + + ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task; + scheduledTasks.put(runnable.getTarget(), future); + + return future; + } + + } + + public void cancelFutureSchedulerTasks() { + scheduledTasks.forEach((k, v) -> { + if (k instanceof SpringBatchScheduler) { + v.cancel(false); + } + }); + } + + @Bean + public Job job() { + return jobBuilderFactory.get("job") + .start(readBooks()) + .build(); + } + +// @Bean +// public JobLauncher jobLauncher() throws Exception { +// SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); +// jobLauncher.setJobRepository(jobRepository()); +// jobLauncher.afterPropertiesSet(); +// return jobLauncher; +// } + +// @Bean +// public JobRepository jobRepository() throws Exception { +// JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); +// factory.setDataSource(dataSource()); +// factory.setDatabaseType("HSQL"); +// factory.setTransactionManager(new ResourcelessTransactionManager()); +// return factory.getObject(); +// } + +// @Bean +// public DataSource dataSource() { +// DriverManagerDataSource dataSource = new DriverManagerDataSource(); +// dataSource.setDriverClassName("org.hsqldb.jdbc.JDBCDriver"); +// dataSource.setUrl("jdbc:hsqldb:mem:testdb"); +// dataSource.setUsername("sa"); +// dataSource.setPassword(""); +// return dataSource; +// } + + @Bean + protected Step readBooks() { + return stepBuilderFactory.get("readBooks") + . chunk(2) + .reader(reader()) + .writer(writer()) + .build(); + } + + @Bean + public FlatFileItemReader reader() { + return new FlatFileItemReaderBuilder().name("bookItemReader") + .resource(new ClassPathResource("books.csv")) + .delimited() + .names(new String[] { "id", "name" }) + .fieldSetMapper(new BeanWrapperFieldSetMapper() { + { + setTargetType(Book.class); + } + }) + .build(); + } + + @Bean + public ItemWriter writer() { + return new ItemWriter() { + + @Override + public void write(List items) throws Exception { + logger.debug("writer..." + items.size()); + for (Book item : items) { + logger.debug(item.toString()); + } + + } + }; + } + + public AtomicInteger getBatchRunCounter() { + return batchRunCounter; + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java new file mode 100644 index 0000000000..5b89163777 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java @@ -0,0 +1,14 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batch.SpringBootBatchProcessingApplication; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringBatchSchedulerApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBatchSchedulerApplication.class, args); + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java new file mode 100644 index 0000000000..7deedeb63e --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java @@ -0,0 +1,35 @@ +package com.baeldung.batchscheduler.model; + +public class Book { + private int id; + private String name; + + public Book() {} + + public Book(int id, String name) { + super(); + this.id = id; + this.name = name; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String toString() { + return "Book [id=" + id + ", name=" + name + "]"; + } + +} diff --git a/spring-batch-2/src/main/resources/application.properties b/spring-batch-2/src/main/resources/application.properties index 0b8c56d3f8..5b34d9e93b 100644 --- a/spring-batch-2/src/main/resources/application.properties +++ b/spring-batch-2/src/main/resources/application.properties @@ -1 +1,8 @@ +spring.datasource.driver-class-name=org.hsqldb.jdbc.JDBCDriver +spring.datasource.url=jdbc:hsqldb:mem:testdb;DB_CLOSE_DELAY=-1;hsqldb.tx=mvcc +spring.datasource.username=sa +spring.datasource.password= + +spring.batch.jdbc.initialize-schema=always + file.input=coffee-list.csv \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/books.csv b/spring-batch-2/src/main/resources/books.csv new file mode 100644 index 0000000000..af68e986a2 --- /dev/null +++ b/spring-batch-2/src/main/resources/books.csv @@ -0,0 +1,4 @@ +1,SHARP OBJECTS (MOVIE TIE-IN): A NOVEL +2,ARTEMIS: A NOVEL +3,HER PRETTY FACE +4,ALL WE EVER WANTED \ No newline at end of file diff --git a/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java b/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java new file mode 100644 index 0000000000..64a414fdbf --- /dev/null +++ b/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java @@ -0,0 +1,69 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batchscheduler.SpringBatchScheduler; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.*; + +@SpringBatchTest +@SpringBootTest +@DirtiesContext +@PropertySource("classpath:application.properties") +@RunWith(SpringRunner.class) +public class SpringBatchSchedulerIntegrationTest { + + @Autowired + private ApplicationContext context; + + @Test + public void stopJobsWhenSchedulerDisabled() throws Exception { + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + schedulerBean.stop(); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + @Test + public void stopJobSchedulerWhenSchedulerDestroyed() throws Exception { + ScheduledAnnotationBeanPostProcessor bean = context.getBean(ScheduledAnnotationBeanPostProcessor.class); + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + bean.postProcessBeforeDestruction(schedulerBean, "SpringBatchScheduler"); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + @Test + public void stopJobSchedulerWhenFutureTasksCancelled() throws Exception { + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + schedulerBean.cancelFutureSchedulerTasks(); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + +} diff --git a/spring-batch/repository.sqlite b/spring-batch/repository.sqlite index a2b87ffa00..b6a954554c 100644 Binary files a/spring-batch/repository.sqlite and b/spring-batch/repository.sqlite differ