diff --git a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
index b11c93fb08..00113c5ff7 100644
--- a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
+++ b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
@@ -1,12 +1,16 @@
package com.baeldung.web;
-import com.baeldung.Constants;
+import java.time.LocalTime;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
+
+import com.baeldung.Constants;
@Controller
public class SseEmitterController {
@@ -29,4 +33,27 @@ public class SseEmitterController {
return emitter;
}
+ @GetMapping("/stream-sse-mvc")
+ public SseEmitter streamSseMvc() {
+ SseEmitter emitter = new SseEmitter();
+ ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
+
+ sseMvcExecutor.execute(() -> {
+ try {
+ for (int i = 0; true; i++) {
+ SseEventBuilder event = SseEmitter.event()
+ .data("SSE MVC - " + LocalTime.now()
+ .toString())
+ .id(String.valueOf(i))
+ .name("sse event - mvc");
+ emitter.send(event);
+ Thread.sleep(1000);
+ }
+ } catch (Exception ex) {
+ emitter.completeWithError(ex);
+ }
+ });
+ return emitter;
+ }
+
}
diff --git a/spring-5-reactive/pom.xml b/spring-5-reactive/pom.xml
index e81d3d8b79..5f455c3906 100644
--- a/spring-5-reactive/pom.xml
+++ b/spring-5-reactive/pom.xml
@@ -94,6 +94,12 @@
${project-reactor-test}
test
+
+ org.junit.platform
+ junit-platform-runner
+ ${junit.platform.version}
+ test
+
@@ -117,6 +123,7 @@
1.0
4.1
3.1.6.RELEASE
+ 1.2.0
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java
new file mode 100644
index 0000000000..3997607ef0
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java
@@ -0,0 +1,19 @@
+package com.baeldung.reactive.serversentevents.consumer;
+
+import java.util.Collections;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+@SpringBootApplication
+@EnableAsync
+public class ConsumerSSEApplication {
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(ConsumerSSEApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
+ app.run(args);
+ }
+
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java
new file mode 100644
index 0000000000..69a6bc396c
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java
@@ -0,0 +1,83 @@
+package com.baeldung.reactive.serversentevents.consumer.controller;
+
+import java.time.LocalTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/sse-consumer")
+public class ClientController {
+
+ private static Logger logger = LoggerFactory.getLogger(ClientController.class);
+ private WebClient client = WebClient.create("http://localhost:8081/sse-server");
+
+ @GetMapping("/launch-sse-client")
+ public String launchSSEFromSSEWebClient() {
+ consumeSSE();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @GetMapping("/launch-flux-client")
+ public String launchcFluxFromSSEWebClient() {
+ consumeFlux();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @GetMapping("/launch-sse-from-flux-endpoint-client")
+ public String launchFluxFromFluxWebClient() {
+ consumeSSEFromFluxEndpoint();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @Async
+ public void consumeSSE() {
+ ParameterizedTypeReference> type = new ParameterizedTypeReference>() {
+ };
+
+ Flux> eventStream = client.get()
+ .uri("/stream-sse")
+ .retrieve()
+ .bodyToFlux(type);
+
+ eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
+ () -> logger.info("Completed!!!"));
+ }
+
+ @Async
+ public void consumeFlux() {
+ Flux stringStream = client.get()
+ .uri("/stream-flux")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(String.class);
+
+ stringStream.subscribe(content -> logger.info("Current time: {} - Received content: {} ", LocalTime.now(), content), error -> logger.error("Error retrieving content: {}", error), () -> logger.info("Completed!!!"));
+ }
+
+ @Async
+ public void consumeSSEFromFluxEndpoint() {
+ ParameterizedTypeReference> type = new ParameterizedTypeReference>() {
+ };
+
+ Flux> eventStream = client.get()
+ .uri("/stream-flux")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(type);
+
+ eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
+ () -> logger.info("Completed!!!"));
+ }
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java
new file mode 100644
index 0000000000..2750e6616d
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java
@@ -0,0 +1,17 @@
+package com.baeldung.reactive.serversentevents.server;
+
+import java.util.Collections;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ServerSSEApplication {
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(ServerSSEApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
+ app.run(args);
+ }
+
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java
new file mode 100644
index 0000000000..1ad8e848cf
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java
@@ -0,0 +1,35 @@
+package com.baeldung.reactive.serversentevents.server.controllers;
+
+import java.time.Duration;
+import java.time.LocalTime;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Flux;
+
+@RestController
+@RequestMapping("/sse-server")
+public class ServerController {
+
+ @GetMapping("/stream-sse")
+ public Flux> streamEvents() {
+ return Flux.interval(Duration.ofSeconds(1))
+ .map(sequence -> ServerSentEvent. builder()
+ .id(String.valueOf(sequence))
+ .event("periodic-event")
+ .data("SSE - " + LocalTime.now()
+ .toString())
+ .build());
+ }
+
+ @GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux streamFlux() {
+ return Flux.interval(Duration.ofSeconds(1))
+ .map(sequence -> "Flux - " + LocalTime.now()
+ .toString());
+ }
+}
diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java
new file mode 100644
index 0000000000..53f4a3b1bb
--- /dev/null
+++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java
@@ -0,0 +1,49 @@
+package com.baeldung.reactive.serversentsevents;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@RunWith(JUnitPlatform.class)
+@SpringBootTest
+public class ServiceSentEventLiveTest {
+
+ private WebTestClient client = WebTestClient.bindToServer()
+ .baseUrl("http://localhost:8081/sse-server")
+ .build();
+
+ @Test
+ public void whenSSEEndpointIsCalled_thenEventStreamingBegins() {
+
+ Executable sseStreamingCall = () -> client.get()
+ .uri("/stream-sse")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectHeader()
+ .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
+ .expectBody(String.class);
+
+ Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
+ }
+
+ @Test
+ public void whenFluxEndpointIsCalled_thenEventStreamingBegins() {
+
+ Executable sseStreamingCall = () -> client.get()
+ .uri("/stream-flux")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectHeader()
+ .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
+ .expectBody(String.class);
+
+ Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
+ }
+}