diff --git a/spring-batch/src/main/java/org/baeldung/batchscheduler/SpringBatchScheduler.java b/spring-batch/src/main/java/org/baeldung/batchscheduler/SpringBatchScheduler.java new file mode 100644 index 0000000000..edb9b7cfa5 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batchscheduler/SpringBatchScheduler.java @@ -0,0 +1,172 @@ +package org.baeldung.batchscheduler; + +import java.util.Date; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.baeldung.batchscheduler.model.Book; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.ScheduledMethodRunnable; + +@Configuration +@EnableBatchProcessing +@EnableScheduling +public class SpringBatchScheduler { + + private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class); + + private AtomicBoolean enabled = new AtomicBoolean(true); + + private Date currentLaunchDate; + + private final Map> scheduledTasks = new IdentityHashMap<>(); + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Scheduled(fixedRate = 2000) + public void launchJob() throws Exception { + Date date = new Date(); + logger.debug("scheduler starts at " + date); + if (enabled.get()) { + currentLaunchDate = date; + JobExecution jobExecution = jobLauncher().run(job(), new JobParametersBuilder().addDate("launchDate", currentLaunchDate) + .toJobParameters()); + logger.debug("Batch job ends with status as " + jobExecution.getStatus()); + } + logger.debug("scheduler ends "); + } + + public Date getCurrentLaunchDate() { + return currentLaunchDate; + } + + public void stop() { + enabled.set(false); + } + + public void start() { + enabled.set(true); + } + + @Bean + public TaskScheduler poolScheduler() { + return new CustomTaskScheduler(); + } + + private class CustomTaskScheduler extends ThreadPoolTaskScheduler { + + private static final long serialVersionUID = -7142624085505040603L; + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) { + ScheduledFuture future = super.scheduleAtFixedRate(task, period); + + ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task; + scheduledTasks.put(runnable.getTarget(), future); + + return future; + } + + } + + public void cancelFutureSchedulerTasks() { + scheduledTasks.forEach((k, v) -> { + if (k instanceof SpringBatchScheduler) { + v.cancel(false); + } + }); + } + + @Bean + public Job job() { + return jobBuilderFactory.get("job") + .start(readBooks()) + .build(); + } + + @Bean + public JobLauncher jobLauncher() throws Exception { + SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); + jobLauncher.setJobRepository(jobRepository()); + jobLauncher.afterPropertiesSet(); + return jobLauncher; + } + + @Bean + public JobRepository jobRepository() throws Exception { + MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); + factory.setTransactionManager(new ResourcelessTransactionManager()); + return (JobRepository) factory.getObject(); + } + + @Bean + protected Step readBooks() { + return stepBuilderFactory.get("readBooks") + . chunk(2) + .reader(reader()) + .writer(writer()) + .build(); + } + + @Bean + public FlatFileItemReader reader() { + return new FlatFileItemReaderBuilder().name("bookItemReader") + .resource(new ClassPathResource("books.csv")) + .delimited() + .names(new String[] { "id", "name" }) + .fieldSetMapper(new BeanWrapperFieldSetMapper() { + { + setTargetType(Book.class); + } + }) + .build(); + } + + @Bean + public ItemWriter writer() { + return new ItemWriter() { + + @Override + public void write(List items) throws Exception { + logger.debug("writer..." + items.size()); + for (Book item : items) { + logger.debug(item.toString()); + } + + } + }; + } + +} diff --git a/spring-batch/src/main/java/org/baeldung/batchscheduler/model/Book.java b/spring-batch/src/main/java/org/baeldung/batchscheduler/model/Book.java new file mode 100644 index 0000000000..f992bde20e --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batchscheduler/model/Book.java @@ -0,0 +1,35 @@ +package org.baeldung.batchscheduler.model; + +public class Book { + private int id; + private String name; + + public Book() {} + + public Book(int id, String name) { + super(); + this.id = id; + this.name = name; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String toString() { + return "Book [id=" + id + ", name=" + name + "]"; + } + +} diff --git a/spring-batch/src/main/resources/books.csv b/spring-batch/src/main/resources/books.csv new file mode 100644 index 0000000000..af68e986a2 --- /dev/null +++ b/spring-batch/src/main/resources/books.csv @@ -0,0 +1,4 @@ +1,SHARP OBJECTS (MOVIE TIE-IN): A NOVEL +2,ARTEMIS: A NOVEL +3,HER PRETTY FACE +4,ALL WE EVER WANTED \ No newline at end of file diff --git a/spring-batch/src/main/resources/logback.xml b/spring-batch/src/main/resources/logback.xml index b110d1c226..0313fb5008 100644 --- a/spring-batch/src/main/resources/logback.xml +++ b/spring-batch/src/main/resources/logback.xml @@ -12,6 +12,10 @@ additivity="false"> + + + + diff --git a/spring-batch/src/test/java/org/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java b/spring-batch/src/test/java/org/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java new file mode 100644 index 0000000000..ef825b900f --- /dev/null +++ b/spring-batch/src/test/java/org/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java @@ -0,0 +1,60 @@ +package org.baeldung.batchscheduler; + +import java.util.Date; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = SpringBatchScheduler.class) +public class SpringBatchSchedulerIntegrationTest { + + private static final int TIMER = 3000; + + @Autowired + private ApplicationContext context; + + @Test + public void stopJobsWhenSchedulerDisabled() throws Exception { + Thread.sleep(TIMER); + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + schedulerBean.stop(); + Thread.sleep(TIMER); + Date lastLaunchDate = schedulerBean.getCurrentLaunchDate(); + Thread.sleep(TIMER); + Assert.assertEquals(lastLaunchDate, schedulerBean.getCurrentLaunchDate()); + } + + @Test + public void stopJobSchedulerWhenSchedulerDestroyed() throws Exception { + Thread.sleep(TIMER); + ScheduledAnnotationBeanPostProcessor bean = context.getBean(ScheduledAnnotationBeanPostProcessor.class); + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + bean.postProcessBeforeDestruction(schedulerBean, "SpringBatchScheduler"); + Thread.sleep(TIMER); + Date lastLaunchTime = schedulerBean.getCurrentLaunchDate(); + Thread.sleep(TIMER); + Assert.assertEquals(lastLaunchTime, schedulerBean.getCurrentLaunchDate()); + + } + + @Test + public void stopJobSchedulerWhenFutureTasksCancelled() throws Exception { + Thread.sleep(TIMER); + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + schedulerBean.cancelFutureSchedulerTasks(); + Thread.sleep(TIMER); + Date lastLaunchTime = schedulerBean.getCurrentLaunchDate(); + Thread.sleep(TIMER); + Assert.assertEquals(lastLaunchTime, schedulerBean.getCurrentLaunchDate()); + + } + +}