JAVA-15787 Moved spring-rector and spring-webflux-amqp to spring-reactive-modules

This commit is contained in:
Dhawal Kapil
2022-11-24 20:32:23 +05:30
parent 7d3e1e13f5
commit c9b4ed6d27
25 changed files with 3 additions and 5 deletions

View File

@@ -27,6 +27,8 @@
<module>spring-5-reactive-security</module>
<module>spring-reactive</module>
<module>spring-reactive-exceptions</module>
<module>spring-reactor</module>
<module>spring-webflux-amqp</module>
</modules>
<build>
@@ -62,4 +64,4 @@
<properties>
</properties>
</project>
</project>

View File

@@ -0,0 +1,7 @@
## Spring Reactor
This module contains articles about Spring Reactor
## Relevant articles:
- [Introduction to Project Reactor Bus](https://www.baeldung.com/reactor-bus)

View File

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-reactor</artifactId>
<version>1.0-SNAPSHOT</version>
<name>spring-reactor</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
</dependencies>
<properties>
<spring-cloud-sleuth.version>2.0.2.RELEASE</spring-cloud-sleuth.version>
<reactor.version>2.0.8.RELEASE</reactor.version>
</properties>
</project>

View File

@@ -0,0 +1,20 @@
package com.baeldung.reactorbus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;
@Configuration
public class Config {
@Bean
public Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
}

View File

@@ -0,0 +1,29 @@
package com.baeldung.reactorbus;
import com.baeldung.reactorbus.consumer.NotificationConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.bus.EventBus;
import static reactor.bus.selector.Selectors.$;
@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args) {
SpringApplication.run(NotificationApplication.class, args);
}
}

View File

@@ -0,0 +1,29 @@
package com.baeldung.reactorbus.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baeldung.reactorbus.domain.NotificationData;
import com.baeldung.reactorbus.service.NotificationService;
import reactor.bus.Event;
import reactor.fn.Consumer;
@Service
public class NotificationConsumer implements Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotification(notificationData);
} catch (InterruptedException e) {
}
}
}

View File

@@ -0,0 +1,34 @@
package com.baeldung.reactorbus.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import com.baeldung.reactorbus.domain.NotificationData;
import reactor.bus.Event;
import reactor.bus.EventBus;
@RestController
public class NotificationController {
@Autowired
private EventBus eventBus;
@GetMapping("/startNotification/{param}")
public void startNotification(@PathVariable Integer param) {
for (int i = 0; i < param; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer", Event.wrap(data));
System.out.println("Notification " + i + ": notification task submitted successfully");
}
}
}

View File

@@ -0,0 +1,42 @@
package com.baeldung.reactorbus.domain;
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
}

View File

@@ -0,0 +1,9 @@
package com.baeldung.reactorbus.service;
import com.baeldung.reactorbus.domain.NotificationData;
public interface NotificationService {
void initiateNotification(NotificationData notificationData) throws InterruptedException;
}

View File

@@ -0,0 +1,21 @@
package com.baeldung.reactorbus.service.impl;
import org.springframework.stereotype.Service;
import com.baeldung.reactorbus.domain.NotificationData;
import com.baeldung.reactorbus.service.NotificationService;
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotification(NotificationData notificationData) throws InterruptedException {
System.out.println("Notification service started for Notification ID: " + notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for Notification ID: " + notificationData.getId());
}
}

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,17 @@
package com.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.baeldung.reactorbus.NotificationApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NotificationApplication.class)
public class SpringContextTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.reactorbus;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {
@LocalServerPort
private int port;
@Test
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
}
}

View File

@@ -0,0 +1,25 @@
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/build/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

View File

@@ -0,0 +1,7 @@
## Spring WebFlux AMQP
This module contains articles about Spring WebFlux with AMQP
### Relevant Articles:
- [Spring AMQP in Reactive Applications](https://www.baeldung.com/spring-amqp-reactive)

View File

@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung.spring</groupId>
<artifactId>spring-webflux-amqp</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>spring-webflux-amqp</name>
<packaging>jar</packaging>
<description>Spring WebFlux AMQP Sample</description>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<!-- <version>2.0.4.RELEASE</version> --><!-- works -->
<version>2.1.3.RELEASE</version> <!-- Works with workaround applied -->
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,304 @@
package com.baeldung.spring.amqp;
import java.time.Duration;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class AmqpReactiveController {
private static Logger log = LoggerFactory.getLogger(AmqpReactiveController.class);
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DestinationsConfig destinationsConfig;
@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;
@PostConstruct
public void setupQueueDestinations() {
log.info("[I48] Creating Destinations...");
destinationsConfig.getQueues()
.forEach((key, destination) -> {
log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey());
Exchange ex = ExchangeBuilder.directExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.durable(destination.getRoutingKey())
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
log.info("[I70] Binding successfully created.");
});
}
@PostConstruct
public void setupTopicDestinations() {
// For topic each consumer will have its own Queue, so no binding
destinationsConfig.getTopics()
.forEach((key, destination) -> {
log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange());
Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
log.info("[I107] Topic Exchange successfully created.");
});
}
@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details
final DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
.get(name);
if (d == null) {
// Destination not found.
return Mono.just(ResponseEntity.notFound()
.build());
}
return Mono.fromCallable(() -> {
log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey());
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted()
.build();
});
}
/**
* Receive messages for the given queue
* @param name
* @param errorHandler
* @return
*/
@GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(d.getRoutingKey());
Flux<String> f = Flux.<String> create(emitter -> {
log.info("[I168] Adding listener, queue={}", d.getRoutingKey());
mlc.setupMessageListener((MessageListener) m -> {
String qname = m.getMessageProperties()
.getConsumerQueue();
log.info("[I137] Message received, queue={}", qname);
if (emitter.isCancelled()) {
log.info("[I166] cancelled, queue={}", qname);
mlc.stop();
return;
}
String payload = new String(m.getBody());
emitter.next(payload);
log.info("[I176] Message sent to client, queue={}", qname);
});
emitter.onRequest(v -> {
log.info("[I171] Starting container, queue={}", d.getRoutingKey());
mlc.start();
});
emitter.onDispose(() -> {
log.info("[I176] onDispose: queue={}", d.getRoutingKey());
mlc.stop();
});
log.info("[I171] Container started, queue={}", d.getRoutingKey());
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> {
log.info("[I209] sending keepalive message...");
return "No news is good news";
})
.mergeWith(f);
}
/**
* send message to a given topic
* @param name
* @param payload
* @return
*/
@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details
final DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
.get(name);
if (d == null) {
// Destination not found.
return Mono.just(ResponseEntity.notFound()
.build());
}
return Mono.fromCallable(() -> {
log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey());
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted()
.build();
});
}
@GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
Queue topicQueue = createTopicQueue(d);
String qname = topicQueue.getName();
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
Flux<String> f = Flux.<String> create(emitter -> {
log.info("[I168] Adding listener, queue={}", qname);
mlc.setupMessageListener((MessageListener) m -> {
log.info("[I137] Message received, queue={}", qname);
if (emitter.isCancelled()) {
log.info("[I166] cancelled, queue={}", qname);
mlc.stop();
return;
}
String payload = new String(m.getBody());
emitter.next(payload);
log.info("[I176] Message sent to client, queue={}", qname);
});
emitter.onRequest(v -> {
log.info("[I171] Starting container, queue={}", qname);
mlc.start();
});
emitter.onDispose(() -> {
log.info("[I176] onDispose: queue={}", qname);
amqpAdmin.deleteQueue(qname);
mlc.stop();
});
log.info("[I171] Container started, queue={}", qname);
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> {
log.info("[I209] sending keepalive message...");
return "No news is good news";
})
.mergeWith(f);
}
private Queue createTopicQueue(DestinationsConfig.DestinationInfo destination) {
Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.nonDurable()
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
return q;
}
}

View File

@@ -0,0 +1,59 @@
package com.baeldung.spring.amqp;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("destinations")
public class DestinationsConfig {
private Map<String,DestinationInfo> queues = new HashMap<>();
private Map<String,DestinationInfo> topics = new HashMap<>();
public Map<String, DestinationInfo> getQueues() {
return queues;
}
public void setQueues(Map<String, DestinationInfo> queues) {
this.queues = queues;
}
public Map<String, DestinationInfo> getTopics() {
return topics;
}
public void setTopics(Map<String, DestinationInfo> topics) {
this.topics = topics;
}
// DestinationInfo stores the Exchange name and routing key used
// by our producers when posting messages
static class DestinationInfo {
private String exchange;
private String routingKey;
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
}
}

View File

@@ -0,0 +1,29 @@
package com.baeldung.spring.amqp;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageListenerContainerFactory {
@Autowired
private ConnectionFactory connectionFactory;
public MessageListenerContainerFactory() {
}
public MessageListenerContainer createMessageListenerContainer(String queueName) {
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
mlc.addQueueNames(queueName);
mlc.setAcknowledgeMode(AcknowledgeMode.AUTO);
return mlc;
}
}

View File

@@ -0,0 +1,36 @@
package com.baeldung.spring.amqp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.filter.reactive.HiddenHttpMethodFilter;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
public class SpringWebfluxAmqpApplication {
public static void main(String[] args) {
SpringApplication.run(SpringWebfluxAmqpApplication.class, args);
}
/**
* This is a workaround for https://github.com/spring-projects/spring-framework/issues/21094
* @return
*/
@Bean
public HiddenHttpMethodFilter hiddenHttpMethodFilter() {
return new HiddenHttpMethodFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange);
}
};
}
}

View File

@@ -0,0 +1,27 @@
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE
IBOV:
exchange: ibov
routing-key: IBOV
topics:
weather:
exchange: alerts
routing-key: WEATHER

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,25 @@
package com.baeldung;
import com.baeldung.spring.amqp.SpringWebfluxAmqpApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* This live test requires:
* rabbitmq instance running on the environment
*
* <br>
* To run rabbitmq using docker image:
* (e.g. `docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3`)
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringWebfluxAmqpApplication.class)
public class SpringContextLiveTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@@ -0,0 +1,26 @@
package com.baeldung.spring.amqp;
import org.junit.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
public class SpringWebfluxAmqpLiveTest {
@Test
public void whenSendingAMessageToQueue_thenAcceptedReturnCode() {
WebTestClient client = WebTestClient.bindToServer()
.baseUrl("http://localhost:8080")
.build();
client.post()
.uri("/queue/NYSE")
.syncBody("Test Message")
.exchange()
.expectStatus().isAccepted();
}
}