commiting spring reactor (#922)

* commiting spring reactor

* updating exception handling
This commit is contained in:
Abhinab Kanrar
2016-12-28 12:41:13 +05:30
committed by Grzegorz Piwowarek
parent b1d2595b65
commit 20bbeb3e65
8 changed files with 247 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
package com.baeldung;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import com.baeldung.consumer.NotificationConsumer;
import reactor.Environment;
import reactor.bus.EventBus;
import static reactor.bus.selector.Selectors.$;
@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Bean
Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}

View File

@@ -0,0 +1,28 @@
package com.baeldung.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baeldung.doman.NotificationData;
import com.baeldung.service.NotificationService;
import reactor.bus.Event;
import reactor.fn.Consumer;
@Service
public class NotificationConsumer implements Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotofication(notificationData);
} catch (InterruptedException e) {}
}
}

View File

@@ -0,0 +1,37 @@
package com.baeldung.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import com.baeldung.doman.NotificationData;
import reactor.bus.Event;
import reactor.bus.EventBus;
@Controller
public class NotificationController {
@Autowired
private EventBus eventBus;
@RequestMapping(value = "/startNotification/{param}", method = RequestMethod.GET)
public void startNotification(@PathVariable("param") String param) {
int notificationSize = Integer.parseInt(param);
for(int i = 0; i < notificationSize; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer",Event.wrap(data));
System.out.println("Notification " +i +": notification task submitted successfully");
}
}
}

View File

@@ -0,0 +1,35 @@
package com.baeldung.doman;
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
}

View File

@@ -0,0 +1,9 @@
package com.baeldung.service;
import com.baeldung.doman.NotificationData;
public interface NotificationService {
public void initiateNotofication(NotificationData notificationData) throws InterruptedException;
}

View File

@@ -0,0 +1,21 @@
package com.baeldung.service.impl;
import org.springframework.stereotype.Service;
import com.baeldung.doman.NotificationData;
import com.baeldung.service.NotificationService;
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotofication(NotificationData notificationData) throws InterruptedException {
System.out.println("Notification service started for Notification ID: " +notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for Notification ID: " +notificationData.getId());
}
}