From fc41dd02beb147e656c39601b9c005fdcbf9050c Mon Sep 17 00:00:00 2001 From: Krzysiek Date: Wed, 24 Nov 2021 22:52:41 +0100 Subject: [PATCH] JAVA-8362: Copy batchscheduler code to spring-batch-2 --- spring-batch-2/pom.xml | 9 +- .../batchscheduler/SpringBatchScheduler.java | 190 ++++++++++++++++++ .../SpringBatchSchedulerApplication.java | 14 ++ .../baeldung/batchscheduler/model/Book.java | 35 ++++ .../src/main/resources/application.properties | 7 + spring-batch-2/src/main/resources/books.csv | 4 + .../SpringBatchSchedulerIntegrationTest.java | 69 +++++++ spring-batch/repository.sqlite | Bin 73728 -> 73728 bytes 8 files changed, 326 insertions(+), 2 deletions(-) create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java create mode 100644 spring-batch-2/src/main/resources/books.csv create mode 100644 spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java diff --git a/spring-batch-2/pom.xml b/spring-batch-2/pom.xml index c429c272bd..77779a0fcf 100644 --- a/spring-batch-2/pom.xml +++ b/spring-batch-2/pom.xml @@ -24,7 +24,6 @@ org.hsqldb hsqldb - ${hsqldb.version} runtime @@ -44,11 +43,17 @@ ${spring.batch.version} test + + org.awaitility + awaitility + ${awaitility.version} + test + 4.3.0 - 2.5.1 + 3.1.1 \ No newline at end of file diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java new file mode 100644 index 0000000000..bfaa044376 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchScheduler.java @@ -0,0 +1,190 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batchscheduler.model.Book; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.ScheduledMethodRunnable; + +import javax.sql.DataSource; +import java.util.Date; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +@EnableBatchProcessing +@EnableScheduling +public class SpringBatchScheduler { + + private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class); + + private AtomicBoolean enabled = new AtomicBoolean(true); + + private AtomicInteger batchRunCounter = new AtomicInteger(0); + + private final Map> scheduledTasks = new IdentityHashMap<>(); + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Autowired + private JobLauncher jobLauncher; + + @Scheduled(fixedRate = 2000) + public void launchJob() throws Exception { + Date date = new Date(); + logger.debug("scheduler starts at " + date); + if (enabled.get()) { + JobExecution jobExecution = jobLauncher.run(job(), new JobParametersBuilder().addDate("launchDate", date) + .toJobParameters()); + batchRunCounter.incrementAndGet(); + logger.debug("Batch job ends with status as " + jobExecution.getStatus()); + } + logger.debug("scheduler ends "); + } + + public void stop() { + enabled.set(false); + } + + public void start() { + enabled.set(true); + } + + @Bean + public TaskScheduler poolScheduler() { + return new CustomTaskScheduler(); + } + + private class CustomTaskScheduler extends ThreadPoolTaskScheduler { + + private static final long serialVersionUID = -7142624085505040603L; + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) { + ScheduledFuture future = super.scheduleAtFixedRate(task, period); + + ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task; + scheduledTasks.put(runnable.getTarget(), future); + + return future; + } + + } + + public void cancelFutureSchedulerTasks() { + scheduledTasks.forEach((k, v) -> { + if (k instanceof SpringBatchScheduler) { + v.cancel(false); + } + }); + } + + @Bean + public Job job() { + return jobBuilderFactory.get("job") + .start(readBooks()) + .build(); + } + +// @Bean +// public JobLauncher jobLauncher() throws Exception { +// SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); +// jobLauncher.setJobRepository(jobRepository()); +// jobLauncher.afterPropertiesSet(); +// return jobLauncher; +// } + +// @Bean +// public JobRepository jobRepository() throws Exception { +// JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); +// factory.setDataSource(dataSource()); +// factory.setDatabaseType("HSQL"); +// factory.setTransactionManager(new ResourcelessTransactionManager()); +// return factory.getObject(); +// } + +// @Bean +// public DataSource dataSource() { +// DriverManagerDataSource dataSource = new DriverManagerDataSource(); +// dataSource.setDriverClassName("org.hsqldb.jdbc.JDBCDriver"); +// dataSource.setUrl("jdbc:hsqldb:mem:testdb"); +// dataSource.setUsername("sa"); +// dataSource.setPassword(""); +// return dataSource; +// } + + @Bean + protected Step readBooks() { + return stepBuilderFactory.get("readBooks") + . chunk(2) + .reader(reader()) + .writer(writer()) + .build(); + } + + @Bean + public FlatFileItemReader reader() { + return new FlatFileItemReaderBuilder().name("bookItemReader") + .resource(new ClassPathResource("books.csv")) + .delimited() + .names(new String[] { "id", "name" }) + .fieldSetMapper(new BeanWrapperFieldSetMapper() { + { + setTargetType(Book.class); + } + }) + .build(); + } + + @Bean + public ItemWriter writer() { + return new ItemWriter() { + + @Override + public void write(List items) throws Exception { + logger.debug("writer..." + items.size()); + for (Book item : items) { + logger.debug(item.toString()); + } + + } + }; + } + + public AtomicInteger getBatchRunCounter() { + return batchRunCounter; + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java new file mode 100644 index 0000000000..5b89163777 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/SpringBatchSchedulerApplication.java @@ -0,0 +1,14 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batch.SpringBootBatchProcessingApplication; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringBatchSchedulerApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBatchSchedulerApplication.class, args); + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java new file mode 100644 index 0000000000..7deedeb63e --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchscheduler/model/Book.java @@ -0,0 +1,35 @@ +package com.baeldung.batchscheduler.model; + +public class Book { + private int id; + private String name; + + public Book() {} + + public Book(int id, String name) { + super(); + this.id = id; + this.name = name; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String toString() { + return "Book [id=" + id + ", name=" + name + "]"; + } + +} diff --git a/spring-batch-2/src/main/resources/application.properties b/spring-batch-2/src/main/resources/application.properties index 0b8c56d3f8..5b34d9e93b 100644 --- a/spring-batch-2/src/main/resources/application.properties +++ b/spring-batch-2/src/main/resources/application.properties @@ -1 +1,8 @@ +spring.datasource.driver-class-name=org.hsqldb.jdbc.JDBCDriver +spring.datasource.url=jdbc:hsqldb:mem:testdb;DB_CLOSE_DELAY=-1;hsqldb.tx=mvcc +spring.datasource.username=sa +spring.datasource.password= + +spring.batch.jdbc.initialize-schema=always + file.input=coffee-list.csv \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/books.csv b/spring-batch-2/src/main/resources/books.csv new file mode 100644 index 0000000000..af68e986a2 --- /dev/null +++ b/spring-batch-2/src/main/resources/books.csv @@ -0,0 +1,4 @@ +1,SHARP OBJECTS (MOVIE TIE-IN): A NOVEL +2,ARTEMIS: A NOVEL +3,HER PRETTY FACE +4,ALL WE EVER WANTED \ No newline at end of file diff --git a/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java b/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java new file mode 100644 index 0000000000..64a414fdbf --- /dev/null +++ b/spring-batch-2/src/test/java/com/baeldung/batchscheduler/SpringBatchSchedulerIntegrationTest.java @@ -0,0 +1,69 @@ +package com.baeldung.batchscheduler; + +import com.baeldung.batchscheduler.SpringBatchScheduler; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.*; + +@SpringBatchTest +@SpringBootTest +@DirtiesContext +@PropertySource("classpath:application.properties") +@RunWith(SpringRunner.class) +public class SpringBatchSchedulerIntegrationTest { + + @Autowired + private ApplicationContext context; + + @Test + public void stopJobsWhenSchedulerDisabled() throws Exception { + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + schedulerBean.stop(); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + @Test + public void stopJobSchedulerWhenSchedulerDestroyed() throws Exception { + ScheduledAnnotationBeanPostProcessor bean = context.getBean(ScheduledAnnotationBeanPostProcessor.class); + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + bean.postProcessBeforeDestruction(schedulerBean, "SpringBatchScheduler"); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + @Test + public void stopJobSchedulerWhenFutureTasksCancelled() throws Exception { + SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class); + await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get())); + schedulerBean.cancelFutureSchedulerTasks(); + await().atLeast(3, SECONDS); + + Assert.assertEquals(2, schedulerBean.getBatchRunCounter() + .get()); + } + + +} diff --git a/spring-batch/repository.sqlite b/spring-batch/repository.sqlite index a2b87ffa00f315b4084437ce2f5f639d47469027..b6a954554c9659b0f486c8af45bd69b710e61a0f 100644 GIT binary patch delta 852 zcma)3O-NKx6n;PVzBlvkyZ0)Z|IOq`F%XLL{~|P<@r9W)n$1E>{F&hm{hGm4WLT4% zHZIgx=qjb8MG%y4Aw-mdixwh6n-&HMErihAr4rG56Ana++I;8ZeBU|eo^vw`AhQ6* z%GurViU_;=s&AH-A-u)VW%`JAQ#aWmD`b|WNtl%2Eu%7T6`yw_$k+;^m#!rOAzyze zl=S%gwf;oHn@DMj=2tYYrumMgf&sr<+IM~tgEfw&Gd?}xzq`X0o zlBl(t&Z?+>rsk-Tc8swE;XQ^7lb>{jRN*~?9NNGgZcq?q|6?rLH~wp?WVF^)=L@Jr z-~+6_Di#X30`hfPs%HQYa53P0tiH|{2&kOrk6X036cus2hYdHzBh9@nZQV>rYym|++sE-`e6Zqp6= znZBV@G)>!WvbZ5Gncbd=oD1o1r(K*&vVtK`Z5E5NS&cbnZU*E%1q%%&5yo83WU}{|$?DKd zsB`l+!2UpmHZaHt0$hd@uoF7K7w{Ymngcq)Jq5`O%R&g9P7KYxtPUrSWHD-C%WkAC u+TNY4L5N~4 z(N#zzO0cxhn}alh+F8gV1i`|>gK%6Uq&e_Je<3E zh9$TJZ*U)Hu@j!bhhxZrs}jAPVD(ByL`TN^+|#9U@uH!_X#(r;hCbZE33lTGmLRRe z{h-o4-A!D!aC6Y`ZmkfvW6T_VmtKKVZbzs&V0b^xW-i-IbL78_L}fT&_8IQ@Olop| z`lCBPmHjd^o=r{2?@1ceIG^AHR&W=WF{^P7Jb|xqKS23&LrtwVD-*bG+;P6y9Sw6b zFG^O`it<@jR*B#ivo?lpMmATxtHZcAw&p6iT8}0~fCOZKG!jCja0otQ9e?8^o?{7f z_(?dr`^$2+Gy46(2a<13E-I(=6!HFE*oq$gLH53&-Y$4DB&Z