Moved the code to spring-5-reactive-2
This commit is contained in:
12
spring-5-reactive-2/.gitignore
vendored
Normal file
12
spring-5-reactive-2/.gitignore
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
#folders#
|
||||
.idea
|
||||
/target
|
||||
/neoDb*
|
||||
/data
|
||||
/src/main/webapp/WEB-INF/classes
|
||||
*/META-INF/*
|
||||
|
||||
# Packaged files #
|
||||
*.jar
|
||||
*.war
|
||||
*.ear
|
||||
1
spring-5-reactive-2/README.md
Normal file
1
spring-5-reactive-2/README.md
Normal file
@@ -0,0 +1 @@
|
||||
## Spring 5 Reactive Project
|
||||
61
spring-5-reactive-2/pom.xml
Normal file
61
spring-5-reactive-2/pom.xml
Normal file
@@ -0,0 +1,61 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>spring-5-reactive-2</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>spring-5-reactive-2</name>
|
||||
<packaging>jar</packaging>
|
||||
<description>spring 5 sample project about new features</description>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-boot-2</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-boot-2</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>${lombok.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.github.tomakehurst</groupId>
|
||||
<artifactId>wiremock-jre8</artifactId>
|
||||
<version>${wiremock.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<mainClass>com.baeldung.webclient.WebClientApplication</mainClass>
|
||||
<layout>JAR</layout>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<wiremock.version>2.24.0</wiremock.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.webclient;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Tweet {
|
||||
private String text;
|
||||
private String username;
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.webclient;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
public class TweetsSlowServiceController {
|
||||
|
||||
@GetMapping("/slow-service-tweets")
|
||||
private List<Tweet> getAllTweets() throws Exception {
|
||||
Thread.sleep(2000L); // delay
|
||||
return Arrays.asList(
|
||||
new Tweet("RestTemplate rules", "@user1"),
|
||||
new Tweet("WebClient is better", "@user2"),
|
||||
new Tweet("OK, both are useful", "@user1"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.baeldung.webclient;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class WebClientApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(WebClientApplication.class, args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package com.baeldung.webclient;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
public class WebController {
|
||||
|
||||
private static final int DEFAULT_PORT = 8080;
|
||||
|
||||
@Setter
|
||||
private int serverPort = DEFAULT_PORT;
|
||||
|
||||
@GetMapping("/tweets-blocking")
|
||||
public List<Tweet> getTweetsBlocking() {
|
||||
log.info("Starting BLOCKING Controller!");
|
||||
final String uri = getSlowServiceUri();
|
||||
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
ResponseEntity<List<Tweet>> response = restTemplate.exchange(
|
||||
uri, HttpMethod.GET, null,
|
||||
new ParameterizedTypeReference<List<Tweet>>(){});
|
||||
|
||||
List<Tweet> result = response.getBody();
|
||||
result.forEach(tweet -> log.info(tweet.toString()));
|
||||
log.info("Exiting BLOCKING Controller!");
|
||||
return result;
|
||||
}
|
||||
|
||||
@GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
public Flux<Tweet> getTweetsNonBlocking() {
|
||||
log.info("Starting NON-BLOCKING Controller!");
|
||||
Flux<Tweet> tweetFlux = WebClient.create()
|
||||
.get()
|
||||
.uri(getSlowServiceUri())
|
||||
.retrieve()
|
||||
.bodyToFlux(Tweet.class);
|
||||
|
||||
tweetFlux.subscribe(tweet -> log.info(tweet.toString()));
|
||||
log.info("Exiting NON-BLOCKING Controller!");
|
||||
return tweetFlux;
|
||||
}
|
||||
|
||||
private String getSlowServiceUri() {
|
||||
return "http://localhost:" + serverPort + "/slow-service-tweets";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.baeldung.webclient.filter;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.web.reactive.function.client.ClientRequest;
|
||||
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
|
||||
|
||||
public class WebClientFilters {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(WebClientFilters.class);
|
||||
|
||||
public static ExchangeFilterFunction demoFilter() {
|
||||
ExchangeFilterFunction filterFunction = (clientRequest, nextFilter) -> {
|
||||
LOG.info("WebClient fitler executed");
|
||||
return nextFilter.exchange(clientRequest);
|
||||
};
|
||||
return filterFunction;
|
||||
}
|
||||
|
||||
public static ExchangeFilterFunction countingFilter(AtomicInteger getCounter) {
|
||||
ExchangeFilterFunction countingFilter = (clientRequest, nextFilter) -> {
|
||||
HttpMethod httpMethod = clientRequest.method();
|
||||
if (httpMethod == HttpMethod.GET) {
|
||||
getCounter.incrementAndGet();
|
||||
}
|
||||
return nextFilter.exchange(clientRequest);
|
||||
};
|
||||
return countingFilter;
|
||||
}
|
||||
|
||||
public static ExchangeFilterFunction urlModifyingFilter(String version) {
|
||||
ExchangeFilterFunction urlModifyingFilter = (clientRequest, nextFilter) -> {
|
||||
String oldUrl = clientRequest.url()
|
||||
.toString();
|
||||
URI newUrl = URI.create(oldUrl + "/" + version);
|
||||
ClientRequest filteredRequest = ClientRequest.from(clientRequest)
|
||||
.url(newUrl)
|
||||
.build();
|
||||
return nextFilter.exchange(filteredRequest);
|
||||
};
|
||||
return urlModifyingFilter;
|
||||
}
|
||||
|
||||
public static ExchangeFilterFunction loggingFilter(PrintStream printStream) {
|
||||
ExchangeFilterFunction loggingFilter = (clientRequest, nextFilter) -> {
|
||||
printStream.print("Sending request " + clientRequest.method() + " " + clientRequest.url());
|
||||
return nextFilter.exchange(clientRequest);
|
||||
};
|
||||
return loggingFilter;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.baeldung.webclient;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.test.web.reactive.server.WebTestClient;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = WebClientApplication.class)
|
||||
public class WebControllerIntegrationTest {
|
||||
|
||||
@LocalServerPort
|
||||
int randomServerPort;
|
||||
|
||||
@Autowired
|
||||
private WebTestClient testClient;
|
||||
|
||||
@Autowired
|
||||
private WebController webController;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
webController.setServerPort(randomServerPort);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenEndpointWithBlockingClientIsCalled_thenThreeTweetsAreReceived() {
|
||||
testClient.get()
|
||||
.uri("/tweets-blocking")
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk()
|
||||
.expectBodyList(Tweet.class)
|
||||
.hasSize(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenEndpointWithNonBlockingClientIsCalled_thenThreeTweetsAreReceived() {
|
||||
testClient.get()
|
||||
.uri("/tweets-non-blocking")
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk()
|
||||
.expectBodyList(Tweet.class)
|
||||
.hasSize(3);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,145 @@
|
||||
package com.baeldung.webclient.filter;
|
||||
|
||||
import static com.baeldung.webclient.filter.WebClientFilters.countingFilter;
|
||||
import static com.baeldung.webclient.filter.WebClientFilters.loggingFilter;
|
||||
import static com.baeldung.webclient.filter.WebClientFilters.urlModifyingFilter;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.containing;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.get;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.post;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
|
||||
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import com.github.tomakehurst.wiremock.junit.WireMockRule;
|
||||
|
||||
public class FilteredWebClientUnitTest {
|
||||
|
||||
private static final String PATH = "/filter/test";
|
||||
|
||||
@Rule
|
||||
public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort()
|
||||
.dynamicHttpsPort());
|
||||
|
||||
@Test
|
||||
public void whenNoUrlModifyingFilter_thenPathUnchanged() {
|
||||
stubFor(get(urlPathEqualTo(PATH)).willReturn(aResponse().withStatus(200)
|
||||
.withBody("done")));
|
||||
|
||||
WebClient webClient = WebClient.create();
|
||||
String actual = sendGetRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("done");
|
||||
verify(getRequestedFor(urlPathEqualTo(PATH)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUrlModifyingFilter_thenPathModified() {
|
||||
stubFor(get(urlPathEqualTo(PATH + "/1.0")).willReturn(aResponse().withStatus(200)
|
||||
.withBody("done")));
|
||||
|
||||
WebClient webClient = WebClient.builder()
|
||||
.filter(urlModifyingFilter("1.0"))
|
||||
.build();
|
||||
String actual = sendGetRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("done");
|
||||
verify(getRequestedFor(urlPathEqualTo(PATH + "/1.0")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenCountingFilter_whenGet_thenIncreaseCounter() {
|
||||
stubFor(get(urlPathEqualTo(PATH)).willReturn(aResponse().withStatus(200)
|
||||
.withBody("done")));
|
||||
AtomicInteger counter = new AtomicInteger(10);
|
||||
|
||||
WebClient webClient = WebClient.builder()
|
||||
.filter(countingFilter(counter))
|
||||
.build();
|
||||
String actual = sendGetRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("done");
|
||||
assertThat(counter.get()).isEqualTo(11);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenCountingFilter_whenPost_thenDoNotIncreaseCounter() {
|
||||
stubFor(post(urlPathEqualTo(PATH)).willReturn(aResponse().withStatus(200)
|
||||
.withBody("done")));
|
||||
AtomicInteger counter = new AtomicInteger(10);
|
||||
|
||||
WebClient webClient = WebClient.builder()
|
||||
.filter(countingFilter(counter))
|
||||
.build();
|
||||
String actual = sendPostRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("done");
|
||||
assertThat(counter.get()).isEqualTo(10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoggingFilter() throws IOException {
|
||||
stubFor(get(urlPathEqualTo(PATH)).willReturn(aResponse().withStatus(200)
|
||||
.withBody("done")));
|
||||
|
||||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos);) {
|
||||
WebClient webClient = WebClient.builder()
|
||||
.filter(loggingFilter(ps))
|
||||
.build();
|
||||
String actual = sendGetRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("done");
|
||||
assertThat(baos.toString()).isEqualTo("Sending request GET " + getUrl());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicAuthFilter() {
|
||||
stubFor(get(urlPathEqualTo(PATH)).willReturn(aResponse().withStatus(200)
|
||||
.withBody("authorized")));
|
||||
|
||||
WebClient webClient = WebClient.builder()
|
||||
.filter(ExchangeFilterFunctions.basicAuthentication("user", "password"))
|
||||
.build();
|
||||
String actual = sendGetRequest(webClient);
|
||||
|
||||
assertThat(actual).isEqualTo("authorized");
|
||||
verify(getRequestedFor(urlPathEqualTo(PATH)).withHeader("Authorization", containing("Basic")));
|
||||
}
|
||||
|
||||
private String sendGetRequest(WebClient webClient) {
|
||||
return webClient.get()
|
||||
.uri(getUrl())
|
||||
.retrieve()
|
||||
.bodyToMono(String.class)
|
||||
.block();
|
||||
}
|
||||
|
||||
private String sendPostRequest(WebClient webClient) {
|
||||
return webClient.post()
|
||||
.uri(URI.create(getUrl()))
|
||||
.retrieve()
|
||||
.bodyToMono(String.class)
|
||||
.block();
|
||||
}
|
||||
|
||||
private String getUrl() {
|
||||
return "http://localhost:" + wireMockRule.port() + PATH;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user