Merge pull request #27 from KunwarVikas/stockpricenotificationscheduling

Stockpricenotificationscheduling
This commit is contained in:
javadevjournal
2020-07-25 15:55:59 -07:00
committed by GitHub
22 changed files with 116 additions and 319 deletions

View File

@@ -3,12 +3,14 @@ package com.javadevjournal.springbootbatch;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @Author - Kunwar Vikas
*/
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
public class SpringBootBatchBasicApplication {
public static void main(String[] args) {

View File

@@ -1,9 +1,10 @@
package com.javadevjournal.springbootbatch.config;
import com.javadevjournal.springbootbatch.listener.SPJobExecutionListener;
import com.javadevjournal.springbootbatch.listener.SPStepListener;
import com.javadevjournal.springbootbatch.model.Employee;
import com.javadevjournal.springbootbatch.step.EmployeeItemProcessor;
import com.javadevjournal.springbootbatch.listener.SpringBatchJobCompletionListener;
import com.javadevjournal.springbootbatch.listener.SpringBatchJobExecutionListener;
import com.javadevjournal.springbootbatch.listener.SpringBatchStepListener;
import com.javadevjournal.springbootbatch.model.StockInfo;
import com.javadevjournal.springbootbatch.step.StockInfoProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
@@ -18,11 +19,6 @@ import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.javadevjournal.springbootbatch.listener.SpringBatchJobCompletionListener;
import com.javadevjournal.springbootbatch.step.SBProcessor;
import com.javadevjournal.springbootbatch.step.SBReader;
import com.javadevjournal.springbootbatch.step.SBWriter;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
@@ -40,42 +36,52 @@ public class SpringBatchConfig {
@Bean
public Job processJob() {
return jobBuilderFactory.get("javadevjournaljob")
return jobBuilderFactory.get("stockpricesinfojob")
.incrementer(new RunIdIncrementer())
.listener(new SPJobExecutionListener())
.flow(orderStep1()).end().build();
.listener(new SpringBatchJobExecutionListener())
.flow(StockPricesInfoStep())
.end()
.build();
}
@Bean
public Step orderStep1() {
public Step StockPricesInfoStep() {
return stepBuilderFactory.get("step1")
.listener(new SPStepListener())
.<Employee, String>chunk(10).reader(reader())
.processor(processor()).writer(writer()).build();
.listener(new SpringBatchStepListener())
.<StockInfo, String>chunk(10)
.reader(reader())
.processor(stockInfoProcessor())
.writer(writer())
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.build();
}
@Bean
public FlatFileItemReader<Employee> reader() {
return new FlatFileItemReaderBuilder<Employee>()
.name("employeeItemReader")
.resource(new ClassPathResource("csv/employees.csv"))
.delimited().names(new String[] {"firstName", "lastName","department"})
.targetType(Employee.class).build();
public FlatFileItemReader<StockInfo> reader() {
return new FlatFileItemReaderBuilder<StockInfo>()
.name("stockInfoItemReader")
.resource(new ClassPathResource("csv/stockinfo.csv"))
.delimited()
.names(new String[] {"stockId", "stockName","stockPrice","yearlyHigh","yearlyLow","address","sector","market"})
.targetType(StockInfo.class)
.build();
}
@Bean
public EmployeeItemProcessor processor() {
return new EmployeeItemProcessor();
public StockInfoProcessor stockInfoProcessor(){
return new StockInfoProcessor();
}
@Bean
public FlatFileItemWriter<String> writer() {
return new FlatFileItemWriterBuilder<String>()
.name("greetingItemWriter")
.name("stockInfoItemWriter")
.resource(new FileSystemResource(
"target/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>()).build();
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
@Bean

View File

@@ -3,12 +3,19 @@ package com.javadevjournal.springbootbatch.controller;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* @Author - Kunwar Vikas
*/
@@ -19,13 +26,34 @@ public class SpringBatchJobController {
JobLauncher jobLauncher;
@Autowired
Job javadevjournaljob;
Job stockpricesinfojob;
/**
* Invoke the batch job on-demand using rest APIs.
* @return
* @throws Exception
*/
@GetMapping("/invokejob")
public String invokeBatchJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(javadevjournaljob, jobParameters);
jobLauncher.run(stockpricesinfojob, jobParameters);
return "Batch job has been invoked";
}
/**
* Schuled batch job every minute to load the stock prices and send notification
* @return
* @throws JobParametersInvalidException
* @throws JobExecutionAlreadyRunningException
* @throws JobRestartException
* @throws JobInstanceAlreadyCompleteException
*/
@Scheduled(fixedRate = 60000)
public String schedule() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(stockpricesinfojob, jobParameters);
return "Batch job has been invoked";
}
}

View File

@@ -1,30 +0,0 @@
package com.javadevjournal.springbootbatch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
* @Author - Kunwar Vikas
*/
public class SPChunkListener implements ChunkListener {
Logger logger = LoggerFactory.getLogger(SPChunkListener.class);
@Override
public void beforeChunk(ChunkContext chunkContext) {
logger.info("beforeChunk");
}
@Override
public void afterChunk(ChunkContext chunkContext) {
logger.info("afterChunk");
}
@Override
public void afterChunkError(ChunkContext chunkContext) {
logger.info("afterChunkError");
}
}

View File

@@ -1,29 +0,0 @@
package com.javadevjournal.springbootbatch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemProcessListener;
/**
* @Author - Kunwar Vikas
*/
public class SPItemProcessorListener implements ItemProcessListener<String, Number> {
Logger logger = LoggerFactory.getLogger(SPItemProcessorListener.class);
@Override
public void beforeProcess(String item) {
logger.info("beforeProcess");
}
@Override
public void afterProcess(String item, Number result) {
logger.info("afterProcess");
}
@Override
public void onProcessError(String item, Exception e) {
logger.info(" onProcessError");
}
}

View File

@@ -1,28 +0,0 @@
package com.javadevjournal.springbootbatch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemReadListener;
/**
* @Author - Kunwar Vikas
*/
public class SPItemReadListener implements ItemReadListener<String> {
Logger logger = LoggerFactory.getLogger(SPItemReadListener.class);
@Override
public void beforeRead() {
logger.info("Before reading an item");
}
@Override
public void afterRead(String item) {
logger.info("After reading an item: "+ item.toString());
}
@Override
public void onReadError(Exception ex) {
logger.error("Error occurred while reading an item!");
}
}

View File

@@ -1,28 +0,0 @@
package com.javadevjournal.springbootbatch.listener;
import com.javadevjournal.springbootbatch.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
/**
* @Author - Kunwar Vikas
*/
public class SPItemWriteListener implements ItemWriteListener<Number> {
Logger logger = LoggerFactory.getLogger(SPItemWriteListener.class);
public void beforeWrite(List items) {
logger.info("before write");
}
public void onWriteError(Exception exception, List items) {
logger.info("Error occurred when writing items!");
}
public void afterWrite(List items) {
logger.info("after write");
}
}

View File

@@ -1,28 +0,0 @@
package com.javadevjournal.springbootbatch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
/**
* @Author - Kunwar Vikas
*/
public class SPSkipListener implements SkipListener<String, Number> {
Logger logger = LoggerFactory.getLogger(SPSkipListener.class);
@Override
public void onSkipInRead(Throwable t) {
logger.info("onSkipInRead");
}
@Override
public void onSkipInWrite(Number item, Throwable t) {
logger.info("onSkipInWrite");
}
@Override
public void onSkipInProcess(String item, Throwable t) {
logger.info("onWriteError");
}
}

View File

@@ -9,9 +9,9 @@ import org.springframework.batch.core.JobExecutionListener;
/**
* @Author - Kunwar Vikas
*/
public class SPJobExecutionListener implements JobExecutionListener {
public class SpringBatchJobExecutionListener implements JobExecutionListener {
Logger logger = LoggerFactory.getLogger(SPJobExecutionListener.class);
Logger logger = LoggerFactory.getLogger(SpringBatchJobExecutionListener.class);
public void beforeJob(JobExecution jobExecution) {
logger.info("BEFORE BATCH JOB STARTS");

View File

@@ -1,6 +1,5 @@
package com.javadevjournal.springbootbatch.listener;
import com.javadevjournal.springbootbatch.step.SBWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
@@ -10,9 +9,9 @@ import org.springframework.batch.core.StepExecutionListener;
/**
* @Author - Kunwar Vikas
*/
public class SPStepListener implements StepExecutionListener {
public class SpringBatchStepListener implements StepExecutionListener {
Logger logger = LoggerFactory.getLogger(SPStepListener.class);
Logger logger = LoggerFactory.getLogger(SpringBatchStepListener.class);
@Override
public void beforeStep(StepExecution stepExecution) {

View File

@@ -1,19 +0,0 @@
package com.javadevjournal.springbootbatch.mapper;
import com.javadevjournal.springbootbatch.model.User;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
public class UserRowMapper implements RowMapper {
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setFirstName(rs.getString("firstName"));
user.setMiddleName(rs.getString("middleName"));
user.setLastName(rs.getString("lastName"));
user.setCity(rs.getString("city"));
user.setId(rs.getInt("id"));
return user;
}
}

View File

@@ -1,13 +0,0 @@
package com.javadevjournal.springbootbatch.model;
import lombok.Data;
@Data
public class Employee {
private String firstName;
private String lastName;
private String department;
public Employee() {
}
}

View File

@@ -0,0 +1,16 @@
package com.javadevjournal.springbootbatch.model;
import lombok.Data;
import java.util.List;
@Data
public class StockInfo {
private String stockId;
private String stockName;
private double stockPrice;
private double yearlyHigh;
private double yearlyLow;
private String address;
private String sector;
private String market;
}

View File

@@ -1,15 +0,0 @@
package com.javadevjournal.springbootbatch.model;
import lombok.Data;
/**
* Author Kunwar Vikas
*/
@Data
public class User {
private int id;
private String firstName;
private String middleName;
private String lastName;
private String city;
}

View File

@@ -1,22 +0,0 @@
package com.javadevjournal.springbootbatch.step;
import com.javadevjournal.springbootbatch.model.Employee;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class EmployeeItemProcessor
implements ItemProcessor<Employee, String> {
private static final Logger LOGGER =
LoggerFactory.getLogger(EmployeeItemProcessor.class);
@Override
public String process(Employee employee) throws Exception {
String greeting = "Hello " + employee.getFirstName() + " "
+ employee.getLastName() + " from " + employee.getDepartment()+"!";
LOGGER.info("converting '{}' into '{}'", employee, greeting);
return greeting;
}
}

View File

@@ -1,15 +0,0 @@
package com.javadevjournal.springbootbatch.step;
import org.springframework.batch.item.ItemProcessor;
/**
* @Author - Kunwar Vikas
*/
public class SBProcessor implements ItemProcessor<String, String> {
@Override
public String process(String data) throws Exception {
return data.toUpperCase();
}
}

View File

@@ -1,31 +0,0 @@
package com.javadevjournal.springbootbatch.step;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
/**
* @Author - Kunwar Vikas
*/
public class SBReader implements ItemReader<String> {
private String[] messages = { "javadevjournal.com",
"Welcome to Spring Batch Example",
"We use H2 Database for this example" };
private int count = 0;
@Override
public String read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (count < messages.length) {
return messages[count++];
} else {
count = 0;
}
return null;
}
}

View File

@@ -1,21 +0,0 @@
package com.javadevjournal.springbootbatch.step;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
/**
* @Author - Kunwar Vikas
*/
public class SBWriter implements ItemWriter<String> {
Logger logger = LoggerFactory.getLogger(SBWriter.class);
@Override
public void write(List<? extends String> messages) throws Exception {
for (String msg : messages) {
logger.info("Writing the data \n" + msg);
}
}
}

View File

@@ -0,0 +1,24 @@
package com.javadevjournal.springbootbatch.step;
import com.javadevjournal.springbootbatch.model.StockInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import java.util.Date;
public class StockInfoProcessor
implements ItemProcessor<StockInfo, String> {
private static final Logger LOGGER =
LoggerFactory.getLogger(StockInfoProcessor.class);
@Override
public String process(StockInfo stockInfo) throws Exception {
System.out.println("Hello");
String message = stockInfo.getStockName() + " is trading at "
+ stockInfo.getStockPrice() + " on " + stockInfo.getMarket()+" at "+ new Date().toString()+ "!";
LOGGER.info("printing '{}' to output file", message);
return message;
}
}

View File

@@ -10,4 +10,6 @@ log4j.rootCategory=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
log4j.category.org.springframework.beans.factory=DEBUG
log4j.category.org.springframework.beans.factory=DEBUG
spring.batch.job.enabled=false

View File

@@ -1,4 +0,0 @@
Jan, Sri, FCI
Chan, Sri, Powergrid
Vij, Ver, CDA
Sav, Sri, NTPC
1 Jan Sri FCI
2 Chan Sri Powergrid
3 Vij Ver CDA
4 Sav Sri NTPC

View File

@@ -0,0 +1,3 @@
1, Infy, 780.98, 1530.11, 453.44, Mumbai, Banking, BSE
1, TCS, 780.98, 1530.11, 453.44, Mumbai, Banking, BSE
1, Wipro, 780.98, 1530.11, 453.44, Mumbai, Banking, BSE
1 1 Infy 780.98 1530.11 453.44 Mumbai Banking BSE
2 1 TCS 780.98 1530.11 453.44 Mumbai Banking BSE
3 1 Wipro 780.98 1530.11 453.44 Mumbai Banking BSE