JAVA-14288 Rename spring-5-reactive-modules to spring-reactive-modules (#12659)

* JAVA-14288 Rename spring-5-reactive-modules to spring-reactive-modules

* JAVA-14288 Remove failing module

* JAVA-14288 Revert commenting spring-cloud-openfeign-2 module
This commit is contained in:
anuragkumawat
2022-09-02 21:50:42 +05:30
committed by GitHub
parent 18f456b179
commit d429f0f064
303 changed files with 12 additions and 12 deletions

View File

@@ -0,0 +1,16 @@
## Spring Reactive
This module contains articles describing reactive processing in Spring.
## Relevant articles:
- [Intro To Reactor Core](https://www.baeldung.com/reactor-core)
- [Debugging Reactive Streams in Java](https://www.baeldung.com/spring-debugging-reactive-streams)
- [Guide to Spring 5 WebFlux](https://www.baeldung.com/spring-webflux)
- [Introduction to the Functional Web Framework in Spring 5](https://www.baeldung.com/spring-5-functional-web)
- [Spring 5 WebClient](https://www.baeldung.com/spring-5-webclient)
- [Spring WebClient vs. RestTemplate](https://www.baeldung.com/spring-webclient-resttemplate)
- [Spring WebClient Requests with Parameters](https://www.baeldung.com/webflux-webclient-parameters)
- [Handling Errors in Spring WebFlux](https://www.baeldung.com/spring-webflux-errors)
- [Spring Security 5 for Reactive Applications](https://www.baeldung.com/spring-security-5-reactive)
- [Concurrency in Spring WebFlux](https://www.baeldung.com/spring-webflux-concurrency)

View File

@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-reactive</artifactId>
<parent>
<groupId>com.baeldung.spring.reactive</groupId>
<artifactId>spring-reactive-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>${reactor-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>integration-lite-first</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<logback.configurationFile>${project.basedir}/src/test/resources/logback-test.xml</logback.configurationFile>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>integration-lite-second</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<logback.configurationFile>${project.basedir}/src/test/resources/logback-test.xml</logback.configurationFile>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<properties>
<reactor.version>3.4.16</reactor.version>
<reactor-kafka.version>1.3.10</reactor-kafka.version>
<rxjava.version>2.2.21</rxjava.version>
</properties>
</project>

View File

@@ -0,0 +1,17 @@
package com.baeldung.reactive.concurrency;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Please note we assume Mongo and Kafka are running in the local machine and on default configuration.
* Additionally, if you want to experiment with Tomcat/Jetty instead of Netty, just uncomment the lines in pom.xml and rebuild.
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

View File

@@ -0,0 +1,127 @@
package com.baeldung.reactive.concurrency;
import io.reactivex.Observable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/")
public class Controller {
@Autowired
private PersonRepository personRepository;
private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");
private Logger logger = LoggerFactory.getLogger(Controller.class);
@GetMapping("/threads/webflux")
public Flux<String> getThreadsWebflux() {
return Flux.fromIterable(getThreads());
}
@GetMapping("/threads/webclient")
public Flux<String> getThreadsWebClient() {
WebClient.create("http://localhost:8080/index")
.get()
.retrieve()
.bodyToMono(String.class)
.subscribeOn(scheduler)
.publishOn(scheduler)
.doOnNext(s -> logger.info("Response: {}", s))
.subscribe();
return Flux.fromIterable(getThreads());
}
@GetMapping("/threads/rxjava")
public Observable<String> getIndexRxJava() {
Observable.fromIterable(Arrays.asList("Hello", "World"))
.map(s -> s.toUpperCase())
.observeOn(io.reactivex.schedulers.Schedulers.trampoline())
.doOnNext(s -> logger.info("String: {}", s))
.subscribe();
return Observable.fromIterable(getThreads());
}
@GetMapping("/threads/mongodb")
public Flux<String> getIndexMongo() {
personRepository.findAll()
.doOnNext(p -> logger.info("Person: {}", p))
.subscribe();
return Flux.fromIterable(getThreads());
}
@GetMapping("/threads/reactor-kafka")
public Flux<String> getIndexKafka() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux)
.subscribe();
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.subscribe(r -> {
logger.info("Received message: {}", r.value());
r.receiverOffset()
.acknowledge();
});
return Flux.fromIterable(getThreads());
}
@GetMapping("/index")
public Mono<String> getIndex() {
return Mono.just("Hello world!");
}
private List<String> getThreads() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
.collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,27 @@
package com.baeldung.reactive.concurrency;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class Person {
@Id
String id;
public Person(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override
public String toString() {
return "Person{" + "id='" + id + '\'' + '}';
}
}

View File

@@ -0,0 +1,6 @@
package com.baeldung.reactive.concurrency;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
}

View File

@@ -0,0 +1,33 @@
package com.baeldung.reactive.debugging.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import reactor.core.publisher.Hooks;
import java.util.Collections;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
@EnableScheduling
public class ConsumerDebuggingApplication {
public static void main(String[] args) {
Hooks.onOperatorDebug();
SpringApplication app = new SpringApplication(ConsumerDebuggingApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
app.run(args);
}
@Bean
public SecurityWebFilterChain debuggingConsumerSpringSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
http.csrf().disable();
return http.build();
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.reactive.debugging.consumer.controllers;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Hooks;
@RestController
public class ReactiveConfigsToggleRestController {
@GetMapping("/debug-hook-on")
public String setReactiveDebugOn() {
Hooks.onOperatorDebug();
return "DEBUG HOOK ON";
}
@GetMapping("/debug-hook-off")
public String setReactiveDebugOff() {
Hooks.resetOnOperatorDebug();
return "DEBUG HOOK OFF";
}
}

View File

@@ -0,0 +1,154 @@
package com.baeldung.reactive.debugging.consumer.cronjobs;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import com.baeldung.reactive.debugging.consumer.model.FooDto;
import com.baeldung.reactive.debugging.consumer.service.FooService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
@Component
public class CronJobs {
final Logger logger = LoggerFactory.getLogger(CronJobs.class);
final WebClient client = WebClient.create("http://localhost:8081");
@Autowired
private FooService service;
@Scheduled(fixedRate = 10000)
public void consumeInfiniteFlux() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
int random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) {
case 0:
logger.info("process 1 with approach 1");
service.processFoo(fluxFoo);
break;
case 1:
logger.info("process 1 with approach 1 EH");
service.processUsingApproachOneWithErrorHandling(fluxFoo);
break;
default:
logger.info("process 1 with approach 2");
service.processFooInAnotherScenario(fluxFoo);
break;
}
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFlux2() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
int random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) {
case 0:
logger.info("process 2 with approach 1");
service.processFoo(fluxFoo);
break;
case 1:
logger.info("process 2 with approach 1 EH");
service.processUsingApproachOneWithErrorHandling(fluxFoo);
break;
default:
logger.info("process 2 with approach 2");
service.processFooInAnotherScenario(fluxFoo);
break;
}
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFlux3() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 3 with approach 3");
service.processUsingApproachThree(fluxFoo);
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithCheckpoint4() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 4 with approach 4");
service.processUsingApproachFourWithCheckpoint(fluxFoo);
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWitParallelScheduler() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 5-parallel with approach 5-parallel");
service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo);
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithSingleSchedulers() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 5-single with approach 5-single");
service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo);
}
}

View File

@@ -0,0 +1,31 @@
package com.baeldung.reactive.debugging.consumer.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadLocalRandom;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Foo {
private Integer id;
private String formattedName;
private Integer quantity;
public Foo(FooDto dto) {
this.id = randomId() == 0 ? null : dto.getId();
this.formattedName = dto.getName();
this.quantity = randomQuantity();
}
private static int randomId() {
return ThreadLocalRandom.current().nextInt(0, 100);
}
private static int randomQuantity() {
return ThreadLocalRandom.current().nextInt(0, 10);
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.reactive.debugging.consumer.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FooDto {
private Integer id;
private String name;
}

View File

@@ -0,0 +1,40 @@
package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import java.util.concurrent.ThreadLocalRandom;
public class FooNameHelper {
public static Foo concatAndSubstringFooName(Foo foo) {
Foo concat = concatFooName(foo);
return substringFooName(concat);
}
public static Foo concatFooName(Foo foo) {
int random = ThreadLocalRandom.current()
.nextInt(0, 80);
String processedName = (random != 0)
? foo.getFormattedName()
: foo.getFormattedName() + "-bael";
foo.setFormattedName(processedName);
return foo;
}
public static Foo substringFooName(Foo foo) {
int random = ThreadLocalRandom.current()
.nextInt(0, 100);
String processedName = (random == 0)
? foo.getFormattedName().substring(10, 15)
: foo.getFormattedName().substring(0, 5);
foo.setFormattedName(processedName);
return foo;
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import java.util.concurrent.ThreadLocalRandom;
public class FooQuantityHelper {
public static Foo processFooReducingQuantity(Foo foo) {
int random = ThreadLocalRandom.current().nextInt(0, 90);
int result = (random == 0) ? 0 : foo.getQuantity() + 2;
foo.setQuantity(result);
return divideFooQuantity(foo);
}
public static Foo divideFooQuantity(Foo foo) {
Integer result = (int) Math.round(5.0 / foo.getQuantity());
foo.setQuantity(result);
return foo;
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FooReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(FooReporter.class);
public static Foo reportResult(Foo foo, String approach) {
if (foo.getId() == null) {
throw new IllegalArgumentException("Null id is not valid!");
}
LOGGER.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'",
approach, foo.getId(), foo.getFormattedName(), foo.getQuantity());
return foo;
}
public static Foo reportResult(Foo input) {
return reportResult(input, "default");
}
}

View File

@@ -0,0 +1,113 @@
package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult;
@Component
public class FooService {
private static final Logger LOGGER = LoggerFactory.getLogger(FooService.class);
public void processFoo(Flux<Foo> flux) {
flux.map(FooNameHelper::concatFooName)
.map(FooNameHelper::substringFooName)
.log()
.map(FooReporter::reportResult)
.doOnError(error -> LOGGER.error("The following error happened on processFoo method!", error))
.subscribe();
}
public void processFooInAnotherScenario(Flux<Foo> flux) {
flux.map(FooNameHelper::substringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.subscribe();
}
public void processUsingApproachOneWithErrorHandling(Flux<Foo> flux) {
LOGGER.info("starting approach one w error handling!");
flux.map(FooNameHelper::concatAndSubstringFooName)
.map(FooNameHelper::concatAndSubstringFooName)
.map(FooNameHelper::substringFooName)
.map(FooQuantityHelper::processFooReducingQuantity)
.map(FooQuantityHelper::processFooReducingQuantity)
.map(FooQuantityHelper::processFooReducingQuantity)
.map(FooReporter::reportResult)
.doOnError(error -> LOGGER.error("Approach 1 with Error Handling failed!", error))
.subscribe();
}
public void processUsingApproachThree(Flux<Foo> flux) {
LOGGER.info("starting approach three!");
flux.map(FooNameHelper::concatAndSubstringFooName)
.map(foo -> reportResult(foo, "THREE"))
.doOnError(error -> LOGGER.error("Approach 3 failed!", error))
.subscribe();
}
public void processUsingApproachFourWithCheckpoint(Flux<Foo> flux) {
LOGGER.info("starting approach four!");
flux.map(FooNameHelper::concatAndSubstringFooName)
.checkpoint("CHECKPOINT 1")
.map(FooNameHelper::concatAndSubstringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.checkpoint("CHECKPOINT 2", true)
.map(foo -> reportResult(foo, "FOUR"))
.map(FooNameHelper::concatAndSubstringFooName)
.doOnError(error -> LOGGER.error("Approach 4 failed!", error))
.subscribe();
}
public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) {
LOGGER.info("starting approach four!");
flux.map(FooNameHelper::concatAndSubstringFooName)
.checkpoint("CHECKPOINT 1", true)
.map(FooNameHelper::concatAndSubstringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.map(foo -> reportResult(foo, "FOUR"))
.map(FooNameHelper::concatAndSubstringFooName)
.doOnError(error -> LOGGER.error("Approach 4-2 failed!", error))
.subscribe();
}
public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux<Foo> flux) {
LOGGER.info("starting approach five-parallel!");
flux.map(FooNameHelper::concatAndSubstringFooName)
.publishOn(Schedulers.newParallel("five-parallel-foo"))
.log()
.map(FooNameHelper::concatAndSubstringFooName)
.map(foo -> reportResult(foo, "FIVE-PARALLEL"))
.publishOn(Schedulers.newSingle("five-parallel-bar"))
.map(FooNameHelper::concatAndSubstringFooName)
.doOnError(error -> LOGGER.error("Approach 5-parallel failed!", error))
.subscribeOn(Schedulers.newParallel("five-parallel-starter"))
.subscribe();
}
public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux<Foo> flux) {
LOGGER.info("starting approach five-single!");
flux.log()
.subscribeOn(Schedulers.newSingle("five-single-starter"))
.map(FooNameHelper::concatAndSubstringFooName)
.publishOn(Schedulers.newSingle("five-single-foo"))
.map(FooNameHelper::concatAndSubstringFooName)
.map(FooQuantityHelper::divideFooQuantity)
.map(foo -> reportResult(foo, "FIVE-SINGLE"))
.publishOn(Schedulers.newSingle("five-single-bar"))
.map(FooNameHelper::concatAndSubstringFooName)
.doOnError(error -> LOGGER.error("Approach 5-single failed!", error))
.subscribe();
}
}

View File

@@ -0,0 +1,30 @@
package com.baeldung.reactive.debugging.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.reactive.config.EnableWebFlux;
import java.util.Collections;
@EnableWebFlux
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
public class ServerDebuggingApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ServerDebuggingApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
app.run(args);
}
@Bean
public SecurityWebFilterChain debuggingServerSpringSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}

View File

@@ -0,0 +1,46 @@
package com.baeldung.reactive.debugging.server.handlers;
import com.baeldung.reactive.debugging.server.model.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
@Component
public class ServerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
public Mono<ServerResponse> useHandler(final ServerRequest request) {
// there are chances that something goes wrong here...
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(getFlux(), Foo.class);
}
public Mono<ServerResponse> useHandlerFinite(final ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.range(0, 50)
.map(sequence -> new Foo(new Long(sequence), "theFooNameNumber" + sequence)
), Foo.class);
}
private static Flux<Foo> getFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> {
LOGGER.info("retrieving Foo. Sequence: {}", sequence);
if (ThreadLocalRandom.current().nextInt(0, 50) == 1) {
throw new RuntimeException("There was an error retrieving the Foo!");
}
return new Foo(sequence, "name" + sequence);
});
}
}

View File

@@ -0,0 +1,13 @@
package com.baeldung.reactive.debugging.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Foo {
private Long id;
private String name;
}

View File

@@ -0,0 +1,21 @@
package com.baeldung.reactive.debugging.server.routers;
import com.baeldung.reactive.debugging.server.handlers.ServerHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration
public class ServerRouter {
@Bean
public RouterFunction<ServerResponse> responseRoute(@Autowired ServerHandler handler) {
return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler)
.andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite);
}
}

View File

@@ -0,0 +1,18 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration;
import org.springframework.boot.autoconfigure.security.reactive.ReactiveUserDetailsServiceAutoConfiguration;
@SpringBootApplication(exclude = {
MongoReactiveAutoConfiguration.class,
ReactiveSecurityAutoConfiguration.class,
ReactiveUserDetailsServiceAutoConfiguration.class })
public class ErrorHandlingApplication {
public static void main(String[] args) {
SpringApplication.run(ErrorHandlingApplication.class, args);
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import java.util.Map;
@Component
public class GlobalErrorAttributes extends DefaultErrorAttributes {
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
Map<String, Object> map = super.getErrorAttributes(request, options);
map.put("status", HttpStatus.BAD_REQUEST);
map.put("message", "please provide a name");
return map;
}
}

View File

@@ -0,0 +1,48 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.util.Map;
@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
public GlobalErrorWebExceptionHandler(GlobalErrorAttributes g, ApplicationContext applicationContext,
ServerCodecConfigurer serverCodecConfigurer) {
super(g, new WebProperties.Resources(), applicationContext);
super.setMessageWriters(serverCodecConfigurer.getWriters());
super.setMessageReaders(serverCodecConfigurer.getReaders());
}
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(final ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
private Mono<ServerResponse> renderErrorResponse(final ServerRequest request) {
final Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
return ServerResponse.status(HttpStatus.BAD_REQUEST)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(errorPropertiesMap));
}
}

View File

@@ -0,0 +1,67 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component
public class Handler {
public Mono<ServerResponse> handleWithErrorReturn(ServerRequest request) {
return sayHello(request)
.onErrorReturn("Hello, Stranger")
.flatMap(s -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(s));
}
public Mono<ServerResponse> handleWithErrorResumeAndDynamicFallback(ServerRequest request) {
return sayHello(request)
.flatMap(s -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(s))
.onErrorResume(e -> (Mono.just("Hi, I looked around for your name but found: " + e.getMessage()))
.flatMap(s -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(s)));
}
public Mono<ServerResponse> handleWithErrorResumeAndFallbackMethod(ServerRequest request) {
return sayHello(request)
.flatMap(s -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(s))
.onErrorResume(e -> sayHelloFallback()
.flatMap(s -> ServerResponse.ok()
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(s)));
}
public Mono<ServerResponse> handleWithErrorResumeAndCustomException(ServerRequest request) {
return ServerResponse.ok()
.body(sayHello(request)
.onErrorResume(e -> Mono.error(new NameRequiredException(
HttpStatus.BAD_REQUEST,
"please provide a name", e))), String.class);
}
public Mono<ServerResponse> handleWithGlobalErrorHandler(ServerRequest request) {
return ServerResponse.ok()
.body(sayHello(request), String.class);
}
private Mono<String> sayHello(ServerRequest request) {
try {
return Mono.just("Hello, " + request.queryParam("name").get());
} catch (Exception e) {
return Mono.error(e);
}
}
private Mono<String> sayHelloFallback() {
return Mono.just("Hello, Stranger");
}
}

View File

@@ -0,0 +1,11 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ResponseStatusException;
public class NameRequiredException extends ResponseStatusException {
public NameRequiredException(HttpStatus status, String message, Throwable e) {
super(status, message, e);
}
}

View File

@@ -0,0 +1,26 @@
package com.baeldung.reactive.errorhandling;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.TEXT_PLAIN;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
@Component
public class Router {
@Bean
public RouterFunction<ServerResponse> routes(Handler handler) {
return RouterFunctions
.route(GET("/api/endpoint1").and(accept(TEXT_PLAIN)), handler::handleWithErrorReturn)
.andRoute(GET("/api/endpoint2").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndFallbackMethod)
.andRoute(GET("/api/endpoint3").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndDynamicFallback)
.andRoute(GET("/api/endpoint4").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndCustomException)
.andRoute(GET("/api/endpoint5").and(accept(TEXT_PLAIN)), handler::handleWithGlobalErrorHandler);
}
}

View File

@@ -0,0 +1,37 @@
package com.baeldung.reactive.security;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.security.Principal;
@RestController
public class GreetingController {
private final GreetingService greetingService;
public GreetingController(GreetingService greetingService) {
this.greetingService = greetingService;
}
@GetMapping("/")
public Mono<String> greet(Mono<Principal> principal) {
return principal
.map(Principal::getName)
.map(name -> String.format("Hello, %s", name));
}
@GetMapping("/admin")
public Mono<String> greetAdmin(Mono<Principal> principal) {
return principal
.map(Principal::getName)
.map(name -> String.format("Admin access: %s", name));
}
@GetMapping("/greetingService")
public Mono<String> greetingService() {
return greetingService.greet();
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.reactive.security;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class GreetingService {
@PreAuthorize("hasRole('ADMIN')")
public Mono<String> greet() {
return Mono.just("Hello from service!");
}
}

View File

@@ -0,0 +1,52 @@
package com.baeldung.reactive.security;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http.authorizeExchange()
.pathMatchers("/admin").hasAuthority("ROLE_ADMIN")
.anyExchange().authenticated()
.and()
.formLogin()
.and()
.csrf().disable()
.build();
}
@Bean
public MapReactiveUserDetailsService userDetailsService() {
UserDetails user = User
.withUsername("user")
.password(passwordEncoder().encode("password"))
.roles("USER")
.build();
UserDetails admin = User
.withUsername("admin")
.password(passwordEncoder().encode("password"))
.roles("ADMIN")
.build();
return new MapReactiveUserDetailsService(user, admin);
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
}

View File

@@ -0,0 +1,34 @@
package com.baeldung.reactive.security;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
@ComponentScan(basePackages = {"com.baeldung.reactive.security"})
@EnableWebFlux
public class SpringSecurity5Application {
public static void main(String[] args) {
try (AnnotationConfigApplicationContext context =
new AnnotationConfigApplicationContext(SpringSecurity5Application.class)) {
context.getBean(DisposableServer.class).onDispose().block();
}
}
@Bean
public DisposableServer disposableServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context)
.build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(8083);
return httpServer.handle(adapter).bindNow();
}
}

View File

@@ -0,0 +1,14 @@
package com.baeldung.reactive.webclient;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Foo {
private String name;
}

View File

@@ -0,0 +1,13 @@
package com.baeldung.reactive.webclient;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Tweet {
private String text;
private String username;
}

View File

@@ -0,0 +1,20 @@
package com.baeldung.reactive.webclient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
@RestController
public class TweetsSlowServiceController {
@GetMapping("/slow-service-tweets")
private List<Tweet> getAllTweets() throws Exception {
Thread.sleep(2000L); // delay
return Arrays.asList(
new Tweet("RestTemplate rules", "@user1"),
new Tweet("WebClient is better", "@user2"),
new Tweet("OK, both are useful", "@user1"));
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.reactive.webclient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
public class WebClientApplication {
public static void main(String[] args) {
SpringApplication.run(WebClientApplication.class, args);
}
@Bean
public SecurityWebFilterChain filterChain(ServerHttpSecurity http) {
http.csrf().disable()
.authorizeExchange()
.anyExchange().permitAll();
return http.build();
}
}

View File

@@ -0,0 +1,46 @@
package com.baeldung.reactive.webclient;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RestController
public class WebClientController {
@ResponseStatus(HttpStatus.OK)
@GetMapping("/resource")
public Map<String, String> getResource() {
Map<String, String> response = new HashMap<>();
response.put("field", "value");
return response;
}
@PostMapping("/resource")
public Mono<String> postStringResource(@RequestBody Mono<String> bodyString) {
return bodyString.map(body -> "processed-" + body);
}
@PostMapping("/resource-override")
public Mono<String> postStringResourceOverride(@RequestBody Mono<String> bodyString) {
return bodyString.map(body -> "override-processed-" + body);
}
@PostMapping("/resource-foo")
public Mono<String> postFooResource(@RequestBody Mono<Foo> bodyFoo) {
return bodyFoo.map(foo -> "processedFoo-" + foo.getName());
}
@PostMapping(value = "/resource-multipart", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public String handleFormUpload(@RequestPart("key1") String value1, @RequestPart("key2") String value2) {
return "processed-" + value1 + '-' + value2;
}
}

View File

@@ -0,0 +1,60 @@
package com.baeldung.reactive.webclient;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.List;
@Slf4j
@RestController
public class WebController {
private static final int DEFAULT_PORT = 8080;
@Setter
private int serverPort = DEFAULT_PORT;
@GetMapping("/tweets-blocking")
public List<Tweet> getTweetsBlocking() {
log.info("Starting BLOCKING Controller!");
final String uri = getSlowServiceUri();
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<List<Tweet>> response = restTemplate.exchange(
uri, HttpMethod.GET, null,
new ParameterizedTypeReference<List<Tweet>>(){});
List<Tweet> result = response.getBody();
result.forEach(tweet -> log.info(tweet.toString()));
log.info("Exiting BLOCKING Controller!");
return result;
}
@GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> getTweetsNonBlocking() {
log.info("Starting NON-BLOCKING Controller!");
Flux<Tweet> tweetFlux = WebClient.create()
.get()
.uri(getSlowServiceUri())
.retrieve()
.bodyToFlux(Tweet.class);
tweetFlux.subscribe(tweet -> log.info(tweet.toString()));
log.info("Exiting NON-BLOCKING Controller!");
return tweetFlux;
}
private String getSlowServiceUri() {
return "http://localhost:" + serverPort + "/slow-service-tweets";
}
}

View File

@@ -0,0 +1,13 @@
package com.baeldung.reactive.webclientrequests;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
public class SpringWebClientRequestsApp {
public static void main(String[] args) {
SpringApplication.run(SpringWebClientRequestsApp.class, args);
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.reactive.webflux;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Employee {
private String id;
private String name;
}

View File

@@ -0,0 +1,44 @@
package com.baeldung.reactive.webflux;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Repository
public class EmployeeRepository {
private static final Map<String, Employee> EMPLOYEE_DATA;
static {
EMPLOYEE_DATA = new HashMap<>();
EMPLOYEE_DATA.put("1", new Employee("1", "Employee 1"));
EMPLOYEE_DATA.put("2", new Employee("2", "Employee 2"));
EMPLOYEE_DATA.put("3", new Employee("3", "Employee 3"));
EMPLOYEE_DATA.put("4", new Employee("4", "Employee 4"));
EMPLOYEE_DATA.put("5", new Employee("5", "Employee 5"));
EMPLOYEE_DATA.put("6", new Employee("6", "Employee 6"));
EMPLOYEE_DATA.put("7", new Employee("7", "Employee 7"));
EMPLOYEE_DATA.put("8", new Employee("8", "Employee 8"));
EMPLOYEE_DATA.put("9", new Employee("9", "Employee 9"));
EMPLOYEE_DATA.put("10", new Employee("10", "Employee 10"));
}
public Mono<Employee> findEmployeeById(String id) {
return Mono.just(EMPLOYEE_DATA.get(id));
}
public Flux<Employee> findAllEmployees() {
return Flux.fromIterable(EMPLOYEE_DATA.values());
}
public Mono<Employee> updateEmployee(Employee employee) {
Employee existingEmployee = EMPLOYEE_DATA.get(employee.getId());
if (existingEmployee != null) {
existingEmployee.setName(employee.getName());
}
return Mono.just(existingEmployee);
}
}

View File

@@ -0,0 +1,39 @@
package com.baeldung.reactive.webflux.annotation;
import com.baeldung.reactive.webflux.Employee;
import com.baeldung.reactive.webflux.EmployeeRepository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/employees")
public class EmployeeController {
private final EmployeeRepository employeeRepository;
public EmployeeController(EmployeeRepository employeeRepository) {
this.employeeRepository = employeeRepository;
}
@GetMapping("/{id}")
private Mono<Employee> getEmployeeById(@PathVariable String id) {
return employeeRepository.findEmployeeById(id);
}
@GetMapping
private Flux<Employee> getAllEmployees() {
return employeeRepository.findAllEmployees();
}
@PostMapping("/update")
private Mono<Employee> updateEmployee(@RequestBody Employee employee) {
return employeeRepository.updateEmployee(employee);
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.reactive.webflux.annotation;
import com.baeldung.reactive.webflux.EmployeeRepository;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
public class EmployeeSpringApplication {
@Bean
EmployeeRepository employeeRepository() {
return new EmployeeRepository();
}
public static void main(String[] args) {
SpringApplication.run(EmployeeSpringApplication.class, args);
EmployeeWebClient employeeWebClient = new EmployeeWebClient();
employeeWebClient.consume();
}
}

View File

@@ -0,0 +1,32 @@
package com.baeldung.reactive.webflux.annotation;
import com.baeldung.reactive.webflux.Employee;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class EmployeeWebClient {
private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeWebClient.class);
WebClient client = WebClient.create("http://localhost:8080");
public void consume() {
Mono<Employee> employeeMono = client.get()
.uri("/employees/{id}", "1")
.retrieve()
.bodyToMono(Employee.class);
employeeMono.subscribe(employee -> LOGGER.info("Employee: {}", employee));
Flux<Employee> employeeFlux = client.get()
.uri("/employees")
.retrieve()
.bodyToFlux(Employee.class);
employeeFlux.subscribe(employee -> LOGGER.info("Employee: {}", employee));
}
}

View File

@@ -0,0 +1,43 @@
package com.baeldung.reactive.webflux.annotation;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
@EnableWebFluxSecurity
public class EmployeeWebSecurityConfig {
@Bean
public MapReactiveUserDetailsService userDetailsService() {
UserDetails user = User
.withUsername("admin")
.password(passwordEncoder().encode("password"))
.roles("ADMIN")
.build();
return new MapReactiveUserDetailsService(user);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http
.csrf().disable()
.authorizeExchange()
.pathMatchers(HttpMethod.POST, "/employees/update").hasRole("ADMIN")
.pathMatchers("/**").permitAll()
.and()
.httpBasic();
return http.build();
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
}

View File

@@ -0,0 +1,63 @@
package com.baeldung.reactive.webflux.functional;
import com.baeldung.reactive.webflux.Employee;
import com.baeldung.reactive.webflux.EmployeeRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.BodyExtractors.toMono;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class EmployeeFunctionalConfig {
@Bean
EmployeeRepository employeeRepository() {
return new EmployeeRepository();
}
@Bean
RouterFunction<ServerResponse> getAllEmployeesRoute() {
return route(GET("/employees"), req -> ok().body(employeeRepository().findAllEmployees(), Employee.class));
}
@Bean
RouterFunction<ServerResponse> getEmployeeByIdRoute() {
return route(GET("/employees/{id}"), req -> ok().body(employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class));
}
@Bean
RouterFunction<ServerResponse> updateEmployeeRoute() {
return route(POST("/employees/update"), req -> req.body(toMono(Employee.class))
.doOnNext(employeeRepository()::updateEmployee)
.then(ok().build()));
}
@Bean
RouterFunction<ServerResponse> composedRoutes() {
return route(GET("/employees"), req -> ok().body(employeeRepository().findAllEmployees(), Employee.class))
.and(route(GET("/employees/{id}"), req -> ok().body(employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class)))
.and(route(POST("/employees/update"), req -> req.body(toMono(Employee.class))
.doOnNext(employeeRepository()::updateEmployee)
.then(ok().build())));
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.csrf()
.disable()
.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}

View File

@@ -0,0 +1,14 @@
package com.baeldung.reactive.webflux.functional;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
public class EmployeeSpringFunctionalApplication {
public static void main(String[] args) {
SpringApplication.run(EmployeeSpringFunctionalApplication.class, args);
}
}

View File

@@ -0,0 +1,63 @@
package com.baeldung.reactive.debugging.consumer;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.baeldung.reactive.debugging.consumer.model.Foo;
import com.baeldung.reactive.debugging.consumer.service.FooService;
import com.baeldung.reactive.debugging.consumer.utils.ListAppender;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
class ConsumerFooServiceIntegrationTest {
FooService service = new FooService();
@BeforeEach
void clearLogList() {
Hooks.onOperatorDebug();
ListAppender.clearEventList();
}
@Test
void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() {
Foo one = new Foo(1, "nameverylong", 8);
Foo two = new Foo(null, "nameverylong", 4);
Flux<Foo> flux = Flux.just(one, two);
service.processFoo(flux);
Collection<String> allLoggedEntries = ListAppender.getEvents()
.stream()
.map(ILoggingEvent::getFormattedMessage)
.collect(Collectors.toList());
Collection<String> allSuppressedEntries = ListAppender.getEvents()
.stream()
.map(ILoggingEvent::getThrowableProxy)
.flatMap(t -> Optional.ofNullable(t)
.map(IThrowableProxy::getSuppressed)
.map(Arrays::stream)
.orElse(Stream.empty()))
.map(IThrowableProxy::getClassName)
.collect(Collectors.toList());
assertThat(allLoggedEntries)
.anyMatch(entry -> entry.contains("The following error happened on processFoo method!"))
.anyMatch(entry -> entry.contains("| onSubscribe"))
.anyMatch(entry -> entry.contains("| cancel()"));
assertThat(allSuppressedEntries)
.anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException"));
}
}

View File

@@ -0,0 +1,50 @@
package com.baeldung.reactive.debugging.consumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec;
/**
* In order to run this live test, start the following classes:
* - com.baeldung.reactive.debugging.server.ServerDebuggingApplication
* - com.baeldung.reactive.debugging.consumer.ConsumerDebuggingApplication
*/
class ConsumerFooServiceLiveTest {
private static final String BASE_URL = "http://localhost:8082";
private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on";
private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off";
private static WebTestClient client;
@BeforeAll
static void setup() {
client = WebTestClient.bindToServer()
.baseUrl(BASE_URL)
.build();
}
@Test
void whenRequestingDebugHookOn_thenObtainExpectedMessage() {
ResponseSpec response = client.get()
.uri(DEBUG_HOOK_ON)
.exchange();
response.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("DEBUG HOOK ON");
}
@Test
void whenRequestingDebugHookOff_thenObtainExpectedMessage() {
ResponseSpec response = client.get()
.uri(DEBUG_HOOK_OFF)
.exchange();
response.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("DEBUG HOOK OFF");
}
}

View File

@@ -0,0 +1,25 @@
package com.baeldung.reactive.debugging.consumer.utils;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import java.util.ArrayList;
import java.util.List;
public class ListAppender extends AppenderBase<ILoggingEvent> {
private static final List<ILoggingEvent> EVENTS = new ArrayList<>();
@Override
protected void append(ILoggingEvent eventObject) {
EVENTS.add(eventObject);
}
public static List<ILoggingEvent> getEvents() {
return EVENTS;
}
public static void clearEventList() {
EVENTS.clear();
}
}

View File

@@ -0,0 +1,124 @@
package com.baeldung.reactive.errorhandling;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.http.MediaType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.web.reactive.server.WebTestClient;
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@DirtiesContext
@AutoConfigureWebTestClient(timeout = "10000")
class ErrorHandlingIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
void givenErrorReturn_whenUsernamePresent_thenOk() {
webTestClient.get()
.uri("/api/endpoint1?name={username}", "Tony")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Tony");
}
@Test
void givenErrorReturn_whenNoUsername_thenOk() {
webTestClient.get()
.uri("/api/endpoint1")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Stranger");
}
@Test
void givenResumeFallback_whenUsernamePresent_thenOk() {
webTestClient.get()
.uri("/api/endpoint2?name={username}", "Tony")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Tony");
}
@Test
void givenResumeFallback_whenNoUsername_thenOk() {
webTestClient.get()
.uri("/api/endpoint2")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Stranger");
}
@Test
void givenResumeDynamicValue_whenUsernamePresent_thenOk() {
webTestClient.get()
.uri("/api/endpoint3?name={username}", "Tony")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Tony");
}
@Test
void givenResumeDynamicValue_whenNoUsername_thenOk() {
webTestClient.get()
.uri("/api/endpoint3")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hi, I looked around for your name but found: No value present");
}
@Test
void givenResumeRethrow_whenUsernamePresent_thenOk() {
webTestClient.get()
.uri("/api/endpoint4?name={username}", "Tony")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Tony");
}
@Test
void givenResumeRethrow_whenNoUsername_thenOk() {
webTestClient.get()
.uri("/api/endpoint4")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectStatus().isBadRequest()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody().jsonPath("$.message").isEqualTo("please provide a name");
}
@Test
void givenGlobalErrorHandling_whenUsernamePresent_thenOk() {
webTestClient.get()
.uri("/api/endpoint5?name={username}", "Tony")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectBody(String.class).isEqualTo("Hello, Tony");
}
@Test
void givenGlobalErrorHandling_whenNoUsername_thenOk() {
webTestClient.get()
.uri("/api/endpoint5")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectStatus().isBadRequest()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody().jsonPath("$.message").isEqualTo("please provide a name");
}
}

View File

@@ -0,0 +1,124 @@
package com.baeldung.reactive.introduction;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
class ReactorIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ReactorIntegrationTest.class);
@Test
void givenFlux_whenSubscribing_thenStream() {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> {
LOGGER.debug("{}:{}", i, Thread.currentThread());
return i * 2;
})
.subscribe(elements::add);
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
void givenFlux_whenZipping_thenCombine() {
List<String> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
}
@Test
void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(final Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(final Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(final Throwable t) {
}
@Override
public void onComplete() {
}
});
assertThat(elements).containsExactly(1, 2, 3, 4);
}
@Test
void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException {
List<String> threadNames = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> Thread.currentThread().getName())
.subscribeOn(Schedulers.parallel())
.subscribe(threadNames::add);
Thread.sleep(1000);
assertThat(threadNames).isNotEmpty();
assertThat(threadNames).hasSize(4);
}
@Test
void givenConnectableFlux_whenConnected_thenShouldStream() {
List<Integer> elements = new ArrayList<>();
final ConnectableFlux<Integer> publish = Flux.just(1, 2, 3, 4).publish();
publish.subscribe(elements::add);
assertThat(elements).isEmpty();
publish.connect();
assertThat(elements).containsExactly(1, 2, 3, 4);
}
}

View File

@@ -0,0 +1,43 @@
package com.baeldung.reactive.security;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.web.reactive.server.WebTestClient;
@SpringBootTest(classes = SpringSecurity5Application.class)
class SecurityIntegrationTest {
@Autowired
private ApplicationContext context;
private WebTestClient webTestClient;
@BeforeEach
void setup() {
webTestClient = WebTestClient.bindToApplicationContext(context)
.configureClient()
.build();
}
@Test
void whenNoCredentials_thenRedirectToLogin() {
webTestClient.get()
.uri("/")
.exchange()
.expectStatus().is3xxRedirection();
}
@Test
@WithMockUser
void whenHasCredentials_thenSeesGreeting() {
webTestClient.get()
.uri("/")
.exchange()
.expectStatus().isOk()
.expectBody(String.class).isEqualTo("Hello, user");
}
}

View File

@@ -0,0 +1,12 @@
package com.baeldung.reactive.webclient;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = WebClientApplication.class)
class SpringContextTest {
@Test
void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@@ -0,0 +1,342 @@
package com.baeldung.reactive.webclient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.CodecException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(classes = WebClientApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class WebClientIntegrationTest {
private static final String BODY_VALUE = "bodyValue";
private static final ParameterizedTypeReference<Map<String, String>> MAP_RESPONSE_REF = new ParameterizedTypeReference<Map<String, String>>() {
};
@LocalServerPort
private int port;
@Test
void givenDifferentWebClientCreationMethods_whenUsed_thenObtainExpectedResponse() {
WebClient client1 = WebClient.builder().clientConnector(httpConnector()).build();
StepVerifier.create(retrieveResponse(client1.post()
.uri("http://localhost:" + port + "/resource")))
.expectNext("processed-bodyValue")
.verifyComplete();
WebClient client2 = WebClient.builder().baseUrl("http://localhost:" + port)
.clientConnector(httpConnector()).build();
StepVerifier.create(retrieveResponse(client2))
.expectNext("processed-bodyValue")
.verifyComplete();
WebClient client3 = WebClient.builder()
.baseUrl("http://localhost:" + port)
.clientConnector(httpConnector())
.defaultCookie("cookieKey", "cookieValue")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080"))
.build();
StepVerifier.create(retrieveResponse(client3))
.expectNext("processed-bodyValue")
.verifyComplete();
}
@Test
void givenDifferentMethodSpecifications_whenUsed_thenObtainExpectedResponse() {
RequestBodyUriSpec uriSpecPost1 = createDefaultClient().method(HttpMethod.POST);
StepVerifier.create(retrieveResponse(uriSpecPost1))
.expectNext("processed-bodyValue")
.verifyComplete();
RequestBodyUriSpec uriSpecPost2 = createDefaultClient().post();
StepVerifier.create(retrieveResponse(uriSpecPost2))
.expectNext("processed-bodyValue")
.verifyComplete();
RequestHeadersUriSpec<?> requestGet = createDefaultClient().get();
StepVerifier.create(retrieveGetResponse(requestGet))
.expectNextMatches(nextMap -> nextMap.get("field")
.equals("value"))
.verifyComplete();
}
@Test
void givenDifferentUriSpecifications_whenUsed_thenObtainExpectedResponse() {
RequestBodySpec bodySpecUsingString = createDefaultPostRequest().uri("/resource");
StepVerifier.create(retrieveResponse(bodySpecUsingString))
.expectNext("processed-bodyValue")
.verifyComplete();
RequestBodySpec bodySpecUsingUriBuilder = createDefaultPostRequest().uri(
uriBuilder -> uriBuilder.pathSegment("resource")
.build());
StepVerifier.create(retrieveResponse(bodySpecUsingUriBuilder))
.expectNext("processed-bodyValue")
.verifyComplete();
RequestBodySpec bodySpecusingURI = createDefaultPostRequest().uri(
URI.create("http://localhost:" + port + "/resource"));
StepVerifier.create(retrieveResponse(bodySpecusingURI))
.expectNext("processed-bodyValue")
.verifyComplete();
}
@Test
void givenOverriddenUriSpecifications_whenUsed_thenObtainExpectedResponse() {
RequestBodySpec bodySpecOverriddenBaseUri = createDefaultPostRequest()
.uri(URI.create("http://localhost:" + port + "/resource-override"));
StepVerifier.create(retrieveResponse(bodySpecOverriddenBaseUri))
.expectNext("override-processed-bodyValue")
.verifyComplete();
RequestBodySpec bodySpecOverriddenBaseUri2 = WebClient.builder()
.clientConnector(httpConnector())
.baseUrl("http://localhost:" + port)
.build()
.post()
.uri(URI.create("http://localhost:" + port + "/resource-override"));
StepVerifier.create(retrieveResponse(bodySpecOverriddenBaseUri2))
.expectNext("override-processed-bodyValue")
.verifyComplete();
}
@Test
void givenDifferentBodySpecifications_whenUsed_thenObtainExpectedResponse() {
// request body specifications
RequestHeadersSpec<?> headersSpecPost1 = createDefaultPostResourceRequest().body(
BodyInserters.fromPublisher(Mono.just(BODY_VALUE), String.class));
RequestHeadersSpec<?> headersSpecPost2 = createDefaultPostResourceRequest().body(
BodyInserters.fromValue(BODY_VALUE));
RequestHeadersSpec<?> headersSpecPost3 = createDefaultPostResourceRequest().bodyValue(BODY_VALUE);
RequestHeadersSpec<?> headersSpecFooPost = createDefaultPostRequest().uri("/resource-foo")
.body(Mono.just(new Foo("fooName")), Foo.class);
BodyInserter<Object, ReactiveHttpOutputMessage> inserterPlainObject = BodyInserters.fromValue(new Object());
RequestHeadersSpec<?> headersSpecPlainObject = createDefaultPostResourceRequest().body(inserterPlainObject);
// request body specifications - using other inserter method (multipart request)
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.add("key1", "multipartValue1");
map.add("key2", "multipartValue2");
BodyInserter<MultiValueMap<String, Object>, ClientHttpRequest> inserterMultipart = BodyInserters.fromMultipartData(
map);
RequestHeadersSpec<?> headersSpecInserterMultipart = createDefaultPostRequest().uri("/resource-multipart")
.body(inserterMultipart);
// response assertions
StepVerifier.create(retrieveResponse(headersSpecPost1))
.expectNext("processed-bodyValue")
.verifyComplete();
StepVerifier.create(retrieveResponse(headersSpecPost2))
.expectNext("processed-bodyValue")
.verifyComplete();
StepVerifier.create(retrieveResponse(headersSpecPost3))
.expectNext("processed-bodyValue")
.verifyComplete();
StepVerifier.create(retrieveResponse(headersSpecFooPost))
.expectNext("processedFoo-fooName")
.verifyComplete();
StepVerifier.create(retrieveResponse(headersSpecInserterMultipart))
.expectNext("processed-multipartValue1-multipartValue2")
.verifyComplete();
// assert error plain `new Object()` as request body
StepVerifier.create(retrieveResponse(headersSpecPlainObject))
.expectError(CodecException.class)
.verify();
// assert response for request without body
Mono<Map<String, String>> responsePostWithNoBody = createDefaultPostResourceRequest().exchangeToMono(
responseHandler -> {
assertThat(responseHandler.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
return responseHandler.bodyToMono(MAP_RESPONSE_REF);
});
StepVerifier.create(responsePostWithNoBody)
.expectNextMatches(nextMap -> nextMap.get("error")
.equals("Bad Request"))
.verifyComplete();
}
@Test
void givenPostSpecifications_whenHeadersAdded_thenObtainExpectedResponse() {
// request header specification
RequestHeadersSpec<?> headersSpecInserterStringWithHeaders = createDefaultPostResourceRequestResponse().header(
HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML)
.acceptCharset(StandardCharsets.UTF_8)
.ifNoneMatch("*")
.ifModifiedSince(ZonedDateTime.now());
// response assertions
StepVerifier.create(retrieveResponse(headersSpecInserterStringWithHeaders))
.expectNext("processed-bodyValue")
.verifyComplete();
}
@Test
void givenDifferentResponseSpecifications_whenUsed_thenObtainExpectedResponse() {
ResponseSpec responseSpecPostString = createDefaultPostResourceRequestResponse().retrieve();
Mono<String> responsePostString = responseSpecPostString.bodyToMono(String.class);
Mono<String> responsePostString2 = createDefaultPostResourceRequestResponse().exchangeToMono(response -> {
if (response.statusCode() == HttpStatus.OK) {
return response.bodyToMono(String.class);
} else if (response.statusCode().is4xxClientError()) {
return Mono.just("Error response");
} else {
return response.createException()
.flatMap(Mono::error);
}
});
Mono<String> responsePostNoBody = createDefaultPostResourceRequest().exchangeToMono(response -> {
if (response.statusCode() == HttpStatus.OK) {
return response.bodyToMono(String.class);
} else if (response.statusCode().is4xxClientError()) {
return Mono.just("Error response");
} else {
return response.createException()
.flatMap(Mono::error);
}
});
Mono<Map<String, String>> responseGet = createDefaultClient().get()
.uri("/resource")
.retrieve()
.bodyToMono(MAP_RESPONSE_REF);
// response assertions
StepVerifier.create(responsePostString)
.expectNext("processed-bodyValue")
.verifyComplete();
StepVerifier.create(responsePostString2)
.expectNext("processed-bodyValue")
.verifyComplete();
StepVerifier.create(responsePostNoBody)
.expectNext("Error response")
.verifyComplete();
StepVerifier.create(responseGet)
.expectNextMatches(nextMap -> nextMap.get("field")
.equals("value"))
.verifyComplete();
}
@Test
void givenWebClientWithTimeoutConfigurations_whenRequestUsingWronglyConfiguredPublisher_thenObtainTimeout() {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.responseTimeout(Duration.ofMillis(1000))
.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(1000, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(1000, TimeUnit.MILLISECONDS)));
WebClient timeoutClient = WebClient.builder()
.baseUrl("http://localhost:" + port)
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
RequestHeadersSpec<?> neverendingMonoBodyRequest = timeoutClient.post()
.uri("/resource")
.body(Mono.never(), String.class);
StepVerifier.create(neverendingMonoBodyRequest.retrieve()
.bodyToMono(String.class))
.expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass())
&& ReadTimeoutException.class.isAssignableFrom(ex.getCause().getClass()))
.verify();
}
// helper methods to create default instances
private WebClient createDefaultClient() {
return WebClient.builder()
.baseUrl("http://localhost:" + port)
.clientConnector(httpConnector())
.build();
}
private static ReactorClientHttpConnector httpConnector() {
HttpClient httpClient = HttpClient
.create()
.wiretap(true);
return new ReactorClientHttpConnector(httpClient);
}
private RequestBodyUriSpec createDefaultPostRequest() {
return createDefaultClient().post();
}
private RequestBodySpec createDefaultPostResourceRequest() {
return createDefaultPostRequest().uri("/resource");
}
private RequestHeadersSpec<?> createDefaultPostResourceRequestResponse() {
return createDefaultPostResourceRequest().bodyValue(BODY_VALUE);
}
// helper methods to retrieve a response based on different steps of the process (specs)
private Mono<String> retrieveResponse(WebClient client) {
return client.post()
.uri("/resource")
.bodyValue(BODY_VALUE)
.retrieve()
.bodyToMono(String.class);
}
private Mono<String> retrieveResponse(RequestBodyUriSpec spec) {
return spec.uri("/resource")
.bodyValue(BODY_VALUE)
.retrieve()
.bodyToMono(String.class);
}
private Mono<Map<String, String>> retrieveGetResponse(RequestHeadersUriSpec<?> spec) {
return spec.uri("/resource")
.retrieve()
.bodyToMono(MAP_RESPONSE_REF);
}
private Mono<String> retrieveResponse(RequestBodySpec spec) {
return spec.bodyValue(BODY_VALUE)
.retrieve()
.bodyToMono(String.class);
}
private Mono<String> retrieveResponse(RequestHeadersSpec<?> spec) {
return spec.retrieve()
.bodyToMono(String.class);
}
}

View File

@@ -0,0 +1,49 @@
package com.baeldung.reactive.webclient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.web.reactive.server.WebTestClient;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_CLASS;
@DirtiesContext(classMode = BEFORE_CLASS)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = WebClientApplication.class)
class WebControllerIntegrationTest {
@LocalServerPort
private int randomServerPort;
@Autowired
private WebTestClient testClient;
@Autowired
private WebController webController;
@BeforeEach
void setup() {
webController.setServerPort(randomServerPort);
}
@Test
void whenEndpointWithBlockingClientIsCalled_thenThreeTweetsAreReceived() {
testClient.get()
.uri("/tweets-blocking")
.exchange()
.expectStatus().isOk()
.expectBodyList(Tweet.class).hasSize(3);
}
@Test
void whenEndpointWithNonBlockingClientIsCalled_thenThreeTweetsAreReceived() {
testClient.get()
.uri("/tweets-non-blocking")
.exchange()
.expectStatus().isOk()
.expectBodyList(Tweet.class).hasSize(3);
}
}

View File

@@ -0,0 +1,87 @@
package com.baeldung.reactive.webclient;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.ApplicationContext;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebHandler;
import reactor.core.publisher.Mono;
@SpringBootTest(classes = WebClientApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class WebTestClientIntegrationTest {
@LocalServerPort
private int port;
@Autowired
private ApplicationContext context;
@Autowired
private WebClientController controller;
@Test
void whenBindToWebHandler_thenRequestProcessed() {
WebHandler webHandler = exchange -> Mono.empty();
WebTestClient.bindToWebHandler(webHandler)
.build()
.get()
.exchange()
.expectBody().isEmpty();
}
@Test
void whenBindToRouter_thenRequestProcessed() {
RouterFunction<ServerResponse> routerFunction = RouterFunctions.route(
RequestPredicates.GET("/resource"),
request -> ServerResponse.ok().build()
);
WebTestClient.bindToRouterFunction(routerFunction)
.build()
.get().uri("/resource")
.exchange()
.expectStatus().isOk()
.expectBody().isEmpty();
}
@Test
@WithMockUser
void whenBindToServer_thenRequestProcessed() {
WebTestClient.bindToServer()
.baseUrl("http://localhost:" + port).build()
.get().uri("/resource")
.exchange()
.expectStatus().isOk()
.expectBody().jsonPath("field").isEqualTo("value");
}
@Test
@WithMockUser
void whenBindToApplicationContext_thenRequestProcessed() {
WebTestClient.bindToApplicationContext(context)
.build()
.get().uri("/resource")
.exchange()
.expectStatus().isOk()
.expectBody().jsonPath("field").isEqualTo("value");
}
@Test
void whenBindToController_thenRequestProcessed() {
WebTestClient.bindToController(controller)
.build()
.get().uri("/resource")
.exchange()
.expectStatus().isOk()
.expectBody().jsonPath("field").isEqualTo("value");
}
}

View File

@@ -0,0 +1,191 @@
package com.baeldung.reactive.webclientrequests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@WebFluxTest
class WebClientRequestsWithParametersUnitTest {
private static final String BASE_URL = "https://example.com";
private WebClient webClient;
@Captor
private ArgumentCaptor<ClientRequest> argumentCaptor;
@Mock
private ExchangeFunction exchangeFunction;
@BeforeEach
void init() {
ClientResponse mockResponse = mock(ClientResponse.class);
when(mockResponse.bodyToMono(String.class)).thenReturn(Mono.just("test"));
when(exchangeFunction.exchange(argumentCaptor.capture())).thenReturn(Mono.just(mockResponse));
webClient = WebClient
.builder()
.baseUrl(BASE_URL)
.exchangeFunction(exchangeFunction)
.build();
}
@Test
void whenCallSimpleURI_thenURIMatched() {
webClient.get()
.uri("/products")
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products");
}
@Test
void whenCallSinglePathSegmentUri_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/{id}")
.build(2))
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/2");
}
@Test
void whenCallMultiplePathSegmentsUri_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/{id}/attributes/{attributeId}")
.build(2, 13))
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/2/attributes/13");
}
@Test
void whenCallSingleQueryParams_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("name", "AndroidPhone")
.queryParam("color", "black")
.queryParam("deliveryDate", "13/04/2019")
.build())
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019");
}
@Test
void whenCallSingleQueryParamsPlaceholders_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("name", "{title}")
.queryParam("color", "{authorId}")
.queryParam("deliveryDate", "{date}")
.build("AndroidPhone", "black", "13/04/2019"))
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019");
}
@Test
void whenCallArrayQueryParamsBrackets_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("tag[]", "Snapdragon", "NFC")
.build())
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?tag%5B%5D=Snapdragon&tag%5B%5D=NFC");
}
@Test
void whenCallArrayQueryParams_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("category", "Phones", "Tablets")
.build())
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?category=Phones&category=Tablets");
}
@Test
void whenCallArrayQueryParamsComma_thenURIMatched() {
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("category", String.join(",", "Phones", "Tablets"))
.build())
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?category=Phones,Tablets");
}
@Test
void whenUriComponentEncoding_thenQueryParamsNotEscaped() {
DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory(BASE_URL);
factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.URI_COMPONENT);
webClient = WebClient
.builder()
.uriBuilderFactory(factory)
.baseUrl(BASE_URL)
.exchangeFunction(exchangeFunction)
.build();
webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/")
.queryParam("name", "AndroidPhone")
.queryParam("color", "black")
.queryParam("deliveryDate", "13/04/2019")
.build())
.retrieve()
.bodyToMono(String.class)
.block();
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019");
}
private void verifyCalledUrl(String relativeUrl) {
ClientRequest request = argumentCaptor.getValue();
assertEquals(String.format("%s%s", BASE_URL, relativeUrl), request.url().toString());
verify(exchangeFunction).exchange(request);
verifyNoMoreInteractions(exchangeFunction);
}
}

View File

@@ -0,0 +1,93 @@
package com.baeldung.reactive.webflux.annotation;
import com.baeldung.reactive.webflux.Employee;
import com.baeldung.reactive.webflux.EmployeeRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = EmployeeSpringApplication.class)
class EmployeeControllerIntegrationTest {
@Autowired
private WebTestClient testClient;
@MockBean
private EmployeeRepository employeeRepository;
@Test
void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() {
Employee employee = new Employee("1", "Employee 1 Name");
given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee));
testClient.get()
.uri("/employees/1")
.exchange()
.expectStatus().isOk()
.expectBody(Employee.class).isEqualTo(employee);
}
@Test
void whenGetAllEmployees_thenCorrectEmployees() {
List<Employee> employeeList = Arrays.asList(
new Employee("1", "Employee 1 Name"),
new Employee("2", "Employee 2 Name"),
new Employee("3", "Employee 3 Name")
);
Flux<Employee> employeeFlux = Flux.fromIterable(employeeList);
given(employeeRepository.findAllEmployees()).willReturn(employeeFlux);
testClient.get()
.uri("/employees")
.exchange()
.expectStatus().isOk()
.expectBodyList(Employee.class).isEqualTo(employeeList);
}
@Test
@WithMockUser(username = "admin", roles = { "ADMIN" })
void givenValidUser_whenUpdateEmployee_thenEmployeeUpdated() {
Employee employee = new Employee("10", "Employee 10 Updated");
given(employeeRepository.updateEmployee(employee)).willReturn(Mono.just(employee));
testClient.post()
.uri("/employees/update")
.body(Mono.just(employee), Employee.class)
.exchange()
.expectStatus().isOk()
.expectBody(Employee.class).isEqualTo(employee);
verify(employeeRepository).updateEmployee(employee);
}
@Test
@WithMockUser
void givenInvalidUser_whenUpdateEmployee_thenForbidden() {
Employee employee = new Employee("10", "Employee 10 Updated");
testClient.post()
.uri("/employees/update")
.body(Mono.just(employee), Employee.class)
.exchange()
.expectStatus().isForbidden();
verifyNoInteractions(employeeRepository);
}
}

View File

@@ -0,0 +1,82 @@
package com.baeldung.reactive.webflux.functional;
import com.baeldung.reactive.webflux.Employee;
import com.baeldung.reactive.webflux.EmployeeRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = EmployeeSpringFunctionalApplication.class)
class EmployeeSpringFunctionalIntegrationTest {
@Autowired
private EmployeeFunctionalConfig config;
@MockBean
private EmployeeRepository employeeRepository;
@Test
void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() {
WebTestClient client = WebTestClient.bindToRouterFunction(config.getEmployeeByIdRoute())
.build();
Employee employee = new Employee("1", "Employee 1");
given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee));
client.get()
.uri("/employees/1")
.exchange()
.expectStatus()
.isOk()
.expectBody(Employee.class)
.isEqualTo(employee);
}
@Test
void whenGetAllEmployees_thenCorrectEmployees() {
WebTestClient client = WebTestClient.bindToRouterFunction(config.getAllEmployeesRoute())
.build();
List<Employee> employees = Arrays.asList(new Employee("1", "Employee 1"), new Employee("2", "Employee 2"));
Flux<Employee> employeeFlux = Flux.fromIterable(employees);
given(employeeRepository.findAllEmployees()).willReturn(employeeFlux);
client.get()
.uri("/employees")
.exchange()
.expectStatus()
.isOk()
.expectBodyList(Employee.class)
.isEqualTo(employees);
}
@Test
void whenUpdateEmployee_thenEmployeeUpdated() {
WebTestClient client = WebTestClient.bindToRouterFunction(config.updateEmployeeRoute())
.build();
Employee employee = new Employee("1", "Employee 1 Updated");
client.post()
.uri("/employees/update")
.body(Mono.just(employee), Employee.class)
.exchange()
.expectStatus()
.isOk();
verify(employeeRepository).updateEmployee(employee);
}
}

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include
resource="org/springframework/boot/logging/logback/base.xml" />
<appender name="LISTAPPENDER"
class="com.baeldung.reactive.debugging.consumer.utils.ListAppender">
</appender>
<logger
name="com.baeldung.reactive.debugging.consumer.service.FooService">
<appender-ref ref="LISTAPPENDER" />
</logger>
<logger name="reactor.netty.http.client" level="INFO"/>
<root level="info">
<appender-ref ref="CONSOLE" />
<appender-ref ref="LISTAPPENDER" />
</root>
</configuration>