diff --git a/spring-reactive-modules/pom.xml b/spring-reactive-modules/pom.xml index ea236df1d8..595ae9e211 100644 --- a/spring-reactive-modules/pom.xml +++ b/spring-reactive-modules/pom.xml @@ -22,6 +22,7 @@ spring-5-reactive-2 spring-5-reactive-3 spring-5-reactive-client + spring-5-reactive-client-2 spring-5-reactive-oauth spring-5-reactive-security spring-reactive @@ -61,4 +62,4 @@ - \ No newline at end of file + diff --git a/spring-reactive-modules/spring-5-reactive-client-2/README.md b/spring-reactive-modules/spring-5-reactive-client-2/README.md new file mode 100644 index 0000000000..341271e5ad --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/README.md @@ -0,0 +1,9 @@ +## Spring REST Example Project + +This module contains articles about reactive Spring 5 WebClient + +### The Course +The "REST With Spring" Classes: http://bit.ly/restwithspring + +### Relevant Articles +- More articles: [[<-- prev]](../spring-5-reactive-client) diff --git a/spring-reactive-modules/spring-5-reactive-client-2/pom.xml b/spring-reactive-modules/spring-5-reactive-client-2/pom.xml new file mode 100644 index 0000000000..23921f3df6 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/pom.xml @@ -0,0 +1,146 @@ + + + 4.0.0 + spring-5-reactive-client-2 + spring-5-reactive-client-2 + jar + spring 5 sample project about new features + + + com.baeldung.spring.reactive + spring-reactive-modules + 1.0.0-SNAPSHOT + + + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-webflux + + + org.projectreactor + reactor-spring + ${reactor-spring.version} + + + javax.json.bind + javax.json.bind-api + + + io.github.resilience4j + resilience4j-reactor + ${resilience4j.version} + + + io.github.resilience4j + resilience4j-ratelimiter + ${resilience4j.version} + + + com.google.guava + guava + ${guava.version} + + + + org.springframework.boot + spring-boot-devtools + runtime + + + org.springframework + spring-test + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.mockito + mockito-junit-jupiter + test + + + io.projectreactor + reactor-test + test + + + org.eclipse.jetty + jetty-reactive-httpclient + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + integration-lite-first + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${project.basedir}/src/test/resources/logback-test.xml + + + + + + + + integration-lite-second + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${project.basedir}/src/test/resources/logback-test.xml + + + + + + + + + + 1.0.1.RELEASE + 1.1.6 + 3.2.10.RELEASE + 1.7.1 + + + \ No newline at end of file diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/LimitRequestsApp.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/LimitRequestsApp.java new file mode 100644 index 0000000000..682746f684 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/LimitRequestsApp.java @@ -0,0 +1,12 @@ +package com.baeldung.limitrequests; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class LimitRequestsApp { + + public static void main(String[] args) { + SpringApplication.run(LimitRequestsApp.class, args); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/DelayElements.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/DelayElements.java new file mode 100644 index 0000000000..36488edcf9 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/DelayElements.java @@ -0,0 +1,33 @@ +package com.baeldung.limitrequests.client; + +import java.time.Duration; + +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.client.utils.Client; +import com.baeldung.limitrequests.client.utils.RandomConsumer; + +import reactor.core.publisher.Flux; + +public class DelayElements { + + private DelayElements() { + } + + public static Flux fetch(WebClient client, int requests, int delay) { + String clientId = Client.id(requests, DelayElements.class, delay); + + return Flux.range(1, requests) + .log() + .delayElements(Duration.ofMillis(delay)) + .flatMap(i -> RandomConsumer.get(client, clientId)); + } + + public static void main(String[] args) { + String baseUrl = args[0]; + WebClient client = WebClient.create(baseUrl); + + fetch(client, 12, 1050).doOnNext(System.out::println) + .blockLast(); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/GuavaRateLimit.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/GuavaRateLimit.java new file mode 100644 index 0000000000..b0aa8d547f --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/GuavaRateLimit.java @@ -0,0 +1,37 @@ +package com.baeldung.limitrequests.client; + +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.client.utils.Client; +import com.baeldung.limitrequests.client.utils.RandomConsumer; +import com.google.common.util.concurrent.RateLimiter; + +import reactor.core.publisher.Flux; + +public class GuavaRateLimit { + + private GuavaRateLimit() { + } + + public static Flux fetch(WebClient client, int requests, double limit) { + String clientId = Client.id(requests, GuavaRateLimit.class, limit); + + RateLimiter limiter = RateLimiter.create(limit); + + return Flux.range(1, requests) + .log() + .flatMap(i -> { + limiter.acquire(); + + return RandomConsumer.get(client, clientId); + }); + } + + public static void main(String[] args) throws InterruptedException { + String baseUrl = args[0]; + WebClient client = WebClient.create(baseUrl); + + fetch(client, 20, 2).doOnNext(System.out::println) + .blockLast(); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/LimitConcurrency.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/LimitConcurrency.java new file mode 100644 index 0000000000..f95195b5f3 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/LimitConcurrency.java @@ -0,0 +1,30 @@ +package com.baeldung.limitrequests.client; + +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.client.utils.Client; +import com.baeldung.limitrequests.client.utils.RandomConsumer; + +import reactor.core.publisher.Flux; + +public class LimitConcurrency { + + private LimitConcurrency() { + } + + public static Flux fetch(WebClient client, int requests, int concurrency) { + String clientId = Client.id(requests, LimitConcurrency.class.getSimpleName(), concurrency); + + return Flux.range(1, requests) + .log() + .flatMap(i -> RandomConsumer.get(client, clientId), concurrency); + } + + public static void main(String[] args) throws InterruptedException { + String baseUrl = args[0]; + WebClient client = WebClient.create(baseUrl); + + fetch(client, 12, 5).doOnNext(System.out::println) + .blockLast(); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/Resilience4jRateLimit.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/Resilience4jRateLimit.java new file mode 100644 index 0000000000..d25c422d03 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/Resilience4jRateLimit.java @@ -0,0 +1,42 @@ +package com.baeldung.limitrequests.client; + +import java.time.Duration; + +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.client.utils.Client; +import com.baeldung.limitrequests.client.utils.RandomConsumer; + +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator; +import reactor.core.publisher.Flux; + +public class Resilience4jRateLimit { + + private Resilience4jRateLimit() { + } + + public static Flux fetch(WebClient client, int requests, int concurrency, int interval) { + RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom() + .limitRefreshPeriod(Duration.ofMillis(interval)) + .limitForPeriod(concurrency) + .timeoutDuration(Duration.ofMillis((long) interval * concurrency)) + .build()); + + String clientId = Client.id(requests, Resilience4jRateLimit.class, concurrency, interval); + + return Flux.range(1, requests) + .log() + .flatMap(i -> RandomConsumer. get(client, clientId) + .transformDeferred(RateLimiterOperator.of(limiter))); + } + + public static void main(String[] args) { + String baseUrl = args[0]; + WebClient client = WebClient.create(baseUrl); + + fetch(client, 10, 5, 2500).doOnNext(System.out::println) + .blockLast(); + } +} \ No newline at end of file diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/ZipWithInterval.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/ZipWithInterval.java new file mode 100644 index 0000000000..326154df22 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/ZipWithInterval.java @@ -0,0 +1,33 @@ +package com.baeldung.limitrequests.client; + +import java.time.Duration; + +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.client.utils.Client; +import com.baeldung.limitrequests.client.utils.RandomConsumer; + +import reactor.core.publisher.Flux; + +public class ZipWithInterval { + + private ZipWithInterval() { + } + + public static Flux fetch(WebClient client, int requests, int delay) { + String clientId = Client.id(requests, ZipWithInterval.class, delay); + + return Flux.range(1, requests) + .log() + .zipWith(Flux.interval(Duration.ofMillis(delay))) + .flatMap(i -> RandomConsumer.get(client, clientId)); + } + + public static void main(String[] args) { + String baseUrl = args[0]; + WebClient client = WebClient.create(baseUrl); + + fetch(client, 15, 1100).doOnNext(System.out::println) + .blockLast(); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/Client.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/Client.java new file mode 100644 index 0000000000..31cda42ce5 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/Client.java @@ -0,0 +1,16 @@ +package com.baeldung.limitrequests.client.utils; + +public interface Client { + + String SEPARATOR = ":"; + + static String id(Object... args) { + StringBuilder builder = new StringBuilder(); + for (Object object : args) { + builder.append(":") + .append(object.toString()); + } + return builder.toString() + .replaceFirst(SEPARATOR, ""); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/RandomConsumer.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/RandomConsumer.java new file mode 100644 index 0000000000..3deb8a162e --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/client/utils/RandomConsumer.java @@ -0,0 +1,19 @@ +package com.baeldung.limitrequests.client.utils; + +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.limitrequests.server.RandomController; + +import reactor.core.publisher.Mono; + +public interface RandomConsumer { + + static Mono get(WebClient client, String id) { + return client.get() + .header(RandomController.CLIENT_ID_HEADER, id) + .retrieve() + .bodyToMono(new ParameterizedTypeReference() { + }); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/Concurrency.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/Concurrency.java new file mode 100644 index 0000000000..277d0562b9 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/Concurrency.java @@ -0,0 +1,38 @@ +package com.baeldung.limitrequests.server; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Concurrency { + + public static final int MAX_CONCURRENT_REQUESTS = 5; + private static final Logger logger = LoggerFactory.getLogger(Concurrency.class); + private static final Map CONCURRENT_REQUESTS = new HashMap<>(); + + private Concurrency() { + } + + public static int protect(String clientId, IntSupplier supplier) throws InterruptedException { + AtomicInteger counter = CONCURRENT_REQUESTS.computeIfAbsent(clientId, k -> new AtomicInteger(0)); + + try { + int n = counter.incrementAndGet(); + if (n > MAX_CONCURRENT_REQUESTS) { + String message = String.format("%s - %d max concurrent requests reached [%d]. try again later", clientId, MAX_CONCURRENT_REQUESTS, n); + throw new UnsupportedOperationException(message); + } + + logger.info("{} - {}", clientId, n); + TimeUnit.SECONDS.sleep(2); + return supplier.getAsInt(); + } finally { + counter.decrementAndGet(); + } + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/RandomController.java b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/RandomController.java new file mode 100644 index 0000000000..9fc1c82ec8 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/java/com/baeldung/limitrequests/server/RandomController.java @@ -0,0 +1,21 @@ +package com.baeldung.limitrequests.server; + +import java.util.Random; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/random") +public class RandomController { + + public static final String CLIENT_ID_HEADER = "client-id"; + private static final Random RANDOM = new Random(); + + @GetMapping + Integer getRandom(@RequestHeader(CLIENT_ID_HEADER) String clientId) throws InterruptedException { + return Concurrency.protect(clientId, () -> RANDOM.nextInt(50)); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/application.properties b/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/application.properties new file mode 100644 index 0000000000..05033054b1 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/application.properties @@ -0,0 +1,5 @@ +logging.level.root=INFO + +server.port=8081 + +logging.level.reactor.netty.http.client.HttpClient=DEBUG \ No newline at end of file diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/logback.xml b/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/logback.xml new file mode 100644 index 0000000000..7072369b8d --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + # Pattern of log message for console appender + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/test/java/com/baeldung/limitrequests/RandomControllerLiveTest.java b/spring-reactive-modules/spring-5-reactive-client-2/src/test/java/com/baeldung/limitrequests/RandomControllerLiveTest.java new file mode 100644 index 0000000000..e8debd24d0 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/test/java/com/baeldung/limitrequests/RandomControllerLiveTest.java @@ -0,0 +1,113 @@ +package com.baeldung.limitrequests; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException.InternalServerError; + +import com.baeldung.limitrequests.client.DelayElements; +import com.baeldung.limitrequests.client.GuavaRateLimit; +import com.baeldung.limitrequests.client.LimitConcurrency; +import com.baeldung.limitrequests.client.Resilience4jRateLimit; +import com.baeldung.limitrequests.client.ZipWithInterval; +import com.baeldung.limitrequests.server.Concurrency; + +class RandomControllerLiveTest { + + private static final int MAX_CONCURRENT_REQUESTS = Concurrency.MAX_CONCURRENT_REQUESTS; + private static final int TOTAL_REQUESTS = 10; + + private WebClient client = WebClient.create("http://localhost:8081/random"); + + @Test + void givenEnoughDelay_whenZipWithInterval_thenNoExceptionThrown() { + int delay = MAX_CONCURRENT_REQUESTS * 100; + + assertDoesNotThrow(() -> { + ZipWithInterval.fetch(client, TOTAL_REQUESTS, delay) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenSmallDelay_whenZipWithInterval_thenExceptionThrown() { + int delay = 100; + + assertThrows(InternalServerError.class, () -> { + ZipWithInterval.fetch(client, TOTAL_REQUESTS, delay) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenEnoughDelay_whenDelayElements_thenNoExceptionThrown() { + int delay = MAX_CONCURRENT_REQUESTS * 100; + + assertDoesNotThrow(() -> { + DelayElements.fetch(client, TOTAL_REQUESTS, delay) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenSmallDelay_whenDelayElements_thenExceptionThrown() { + int delay = 100; + + assertThrows(InternalServerError.class, () -> { + DelayElements.fetch(client, TOTAL_REQUESTS, delay) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() { + int limit = MAX_CONCURRENT_REQUESTS; + + assertDoesNotThrow(() -> { + LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenLimitOutsideServerRange_whenLimitedConcurrency_thenExceptionThrown() { + int limit = MAX_CONCURRENT_REQUESTS + TOTAL_REQUESTS; + + assertThrows(InternalServerError.class, () -> { + LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenLongInterval_whenRateLimited_thenNoExceptionThrown() { + int interval = MAX_CONCURRENT_REQUESTS * 500; + + int limit = MAX_CONCURRENT_REQUESTS; + + assertDoesNotThrow(() -> { + Resilience4jRateLimit.fetch(client, TOTAL_REQUESTS, limit, interval) + .doOnNext(System.out::println) + .blockLast(); + }); + } + + @Test + void givenShortLimit_whenUsingGuavaRateLimiter_thenNoExceptionThrown() { + double limit = MAX_CONCURRENT_REQUESTS / 2; + + assertDoesNotThrow(() -> { + GuavaRateLimit.fetch(client, TOTAL_REQUESTS, limit) + .doOnNext(System.out::println) + .blockLast(); + }); + } +} diff --git a/spring-reactive-modules/spring-5-reactive-client-2/src/test/resources/logback-test.xml b/spring-reactive-modules/spring-5-reactive-client-2/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..42cb0865c5 --- /dev/null +++ b/spring-reactive-modules/spring-5-reactive-client-2/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + # Pattern of log message for console appender + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + \ No newline at end of file diff --git a/spring-reactive-modules/spring-5-reactive-client/README.md b/spring-reactive-modules/spring-5-reactive-client/README.md index 5a93e0b13e..f1793070b3 100644 --- a/spring-reactive-modules/spring-5-reactive-client/README.md +++ b/spring-reactive-modules/spring-5-reactive-client/README.md @@ -13,3 +13,4 @@ The "REST With Spring" Classes: http://bit.ly/restwithspring - [Get List of JSON Objects with WebClient](https://www.baeldung.com/spring-webclient-json-list) - [Upload a File with WebClient](https://www.baeldung.com/spring-webclient-upload-file) - [How to Get Response Body When Testing the Status Code in WebFlux WebClient](https://www.baeldung.com/spring-webclient-get-response-body) +- More articles: [[next -->]](../spring-5-reactive-client-2)