From 93bfc63f153d622c61cb186cd71fd07b896586ce Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Sun, 29 Jul 2018 22:56:16 -0400 Subject: [PATCH 01/13] Create readme.md --- jersey-client-rx/readme.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 jersey-client-rx/readme.md diff --git a/jersey-client-rx/readme.md b/jersey-client-rx/readme.md new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/jersey-client-rx/readme.md @@ -0,0 +1 @@ + From c195c5dc508d733f0c4e2a3921ad69d1e0d1eb06 Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Sun, 29 Jul 2018 22:56:58 -0400 Subject: [PATCH 02/13] Add files via upload --- jersey-client-rx/pom.xml | 30 +++ .../samples/jerseyrx/ClientOrchestration.java | 203 ++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 jersey-client-rx/pom.xml create mode 100644 jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml new file mode 100644 index 0000000000..3857c16730 --- /dev/null +++ b/jersey-client-rx/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + com.baeldung.samples + jersey-client-rx + 1.0 + jar + + + org.glassfish.jersey.core + jersey-client + 2.27 + + + org.glassfish.jersey.ext.rx + jersey-rx-client-rxjava + 2.27 + + + org.glassfish.jersey.ext.rx + jersey-rx-client-rxjava2 + 2.27 + + + + UTF-8 + 1.8 + 1.8 + + \ No newline at end of file diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java new file mode 100644 index 0000000000..4adf5e50b8 --- /dev/null +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -0,0 +1,203 @@ +package com.baeldung.samples.jerseyrx; + +import io.reactivex.Flowable; +import io.reactivex.disposables.Disposable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider; +import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker; +import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider; +import rx.Observable; + +/** + * + * @author baeldung + */ +public class ClientOrchestration { + + Client client = ClientBuilder.newClient(); + WebTarget userIdService = client.target("http:localhost:8080/serviceA/id?limit=10"); + WebTarget nameService = client.target("http:localhost:8080/serviceA/{empId}/name"); + WebTarget hashService = client.target("http:localhost:8080/serviceA/{comboIDandName}/address"); + + Logger logger = Logger.getLogger("ClientOrchestrator"); + + public static void main(String[] args) { + ClientOrchestration orchestrator = new ClientOrchestration(); + + orchestrator.callBackOrchestrate(); + orchestrator.rxOrchestrate(); + orchestrator.observableJavaOrchestrate(); + orchestrator.flowableJavaOrchestrate(); + + } + + public void callBackOrchestrate() { + logger.info("Orchestrating with the pyramid of doom"); + userIdService.request() + .async() + .get(new InvocationCallback>() { + @Override + public void completed(List empIds) { + CountDownLatch completionTracker = new CountDownLatch(empIds.size()); //used to keep track of the progress of the subsequent calls + empIds.forEach((id) -> { + //for each employee ID, get the name + nameService.resolveTemplate("empId", id).request() + .async() + .get(new InvocationCallback() { + @Override + public void completed(String response) { + completionTracker.countDown(); + hashService.request().async().get(new InvocationCallback() { + @Override + public void completed(String response) { + logger.log(Level.INFO, "The hash output {0}", response); + } + + @Override + public void failed(Throwable throwable) { + completionTracker.countDown(); + logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage()); + } + }); + } + @Override + public void failed(Throwable throwable) { + completionTracker.countDown(); + logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage()); + } + }); + }); + + try { + if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds + logger.warning("Some requests didn't complete within the timeout"); + } + } catch (InterruptedException ex) { + Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); + } + + } + + @Override + public void failed(Throwable throwable) { + //implement callback + } + }); + } + + public void rxOrchestrate() { + logger.info("Orchestrating with a CompletionStage"); + CompletionStage> userIdStage = userIdService.request() + .rx() + .get(new GenericType>() { + }) + .exceptionally((Throwable throwable) -> { + logger.warning("An error has occurred"); + return null; + }); + + CompletionStage>> completedNameStage = userIdStage.thenApplyAsync(list -> list.stream().map((Long id) -> { + CompletionStage nameStage = nameService.resolveTemplate("empId", id).request().rx().get(String.class); + }).collect(Collectors.toList())); + + userIdStage.thenAcceptAsync(listOfIds -> { + listOfIds.stream().map((Long id) -> { + CompletableFuture completable = nameService.resolveTemplate("empId", id) + .request() + .rx() + .get(String.class) + .toCompletableFuture(); + + completable.thenAccept((String userName) -> { + hashService.resolveTemplate("comboIDandName", userName + id) + .request() + .rx() + .get(String.class) + .toCompletableFuture() + .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)) + .exceptionally((Throwable throwable) -> { + logger.log(Level.WARNING, "Hash computation failed for {0}", id); + return null; + }); + + }); + + }); + }); + + } + + public void observableJavaOrchestrate() { + + logger.info("Orchestrating with Observables"); + + client.register(RxObservableInvokerProvider.class); + + Observable> userIdObservable = userIdService.request() + .rx(RxObservableInvoker.class) + .get(new GenericType>() { + }); + + userIdObservable.subscribe((List listOfIds) -> { + Observable.from(listOfIds).map(id + -> nameService.resolveTemplate("empId", id) + .request() + .rx(RxObservableInvoker.class) + .get(String.class) + .asObservable() //gotten the name for the given empId + .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .request() + .rx(RxObservableInvoker.class) + .get(String.class) + .asObservable() //gotten the hash value for empId+username + .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + }); + + } + + public void flowableJavaOrchestrate() { + + logger.info("Orchestrating with Flowable"); + + client.register(RxFlowableInvokerProvider.class); + + Flowable> userIdObservable = userIdService.request() + .rx(RxFlowableInvoker.class) + .get(new GenericType>() { + }); + + Disposable subscribe = userIdObservable.subscribe((List listOfIds) -> { + Observable.from(listOfIds).map(id + -> nameService.resolveTemplate("empId", id) + .request() + .rx(RxObservableInvoker.class) + .get(String.class) + .asObservable() //gotten the name for the given empId + .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .request() + .rx(RxObservableInvoker.class) + .get(String.class) + .asObservable() //gotten the hash value for empId+username + .doOnError(throwable -> logger.warning("An error has occurred in the hashing request step " + throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + }); + + } + +} From 075ba76de1d6688a1cf8374551ded910d1fba2c1 Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Sun, 29 Jul 2018 23:36:33 -0400 Subject: [PATCH 03/13] Add files via upload --- jersey-client-rx/pom.xml | 5 ++++ .../samples/jerseyrx/ClientOrchestration.java | 27 +++++++------------ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml index 3857c16730..cc5f28c938 100644 --- a/jersey-client-rx/pom.xml +++ b/jersey-client-rx/pom.xml @@ -6,6 +6,11 @@ 1.0 jar + + org.glassfish.jersey.inject + jersey-hk2 + 2.27 + org.glassfish.jersey.core jersey-client diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index 4adf5e50b8..d3e6986b39 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -28,9 +28,9 @@ import rx.Observable; public class ClientOrchestration { Client client = ClientBuilder.newClient(); - WebTarget userIdService = client.target("http:localhost:8080/serviceA/id?limit=10"); - WebTarget nameService = client.target("http:localhost:8080/serviceA/{empId}/name"); - WebTarget hashService = client.target("http:localhost:8080/serviceA/{comboIDandName}/address"); + WebTarget userIdService = client.target("http://localhost:8080/serviceA/id?limit=10"); + WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); + WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/address"); Logger logger = Logger.getLogger("ClientOrchestrator"); @@ -73,6 +73,7 @@ public class ClientOrchestration { } }); } + @Override public void failed(Throwable throwable) { completionTracker.countDown(); @@ -80,7 +81,7 @@ public class ClientOrchestration { } }); }); - + try { if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds logger.warning("Some requests didn't complete within the timeout"); @@ -88,7 +89,7 @@ public class ClientOrchestration { } catch (InterruptedException ex) { Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); } - + } @Override @@ -109,12 +110,8 @@ public class ClientOrchestration { return null; }); - CompletionStage>> completedNameStage = userIdStage.thenApplyAsync(list -> list.stream().map((Long id) -> { - CompletionStage nameStage = nameService.resolveTemplate("empId", id).request().rx().get(String.class); - }).collect(Collectors.toList())); - userIdStage.thenAcceptAsync(listOfIds -> { - listOfIds.stream().map((Long id) -> { + listOfIds.stream().forEach((Long id) -> { CompletableFuture completable = nameService.resolveTemplate("empId", id) .request() .rx() @@ -143,10 +140,7 @@ public class ClientOrchestration { public void observableJavaOrchestrate() { logger.info("Orchestrating with Observables"); - - client.register(RxObservableInvokerProvider.class); - - Observable> userIdObservable = userIdService.request() + Observable> userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() .rx(RxObservableInvoker.class) .get(new GenericType>() { }); @@ -173,10 +167,7 @@ public class ClientOrchestration { public void flowableJavaOrchestrate() { logger.info("Orchestrating with Flowable"); - - client.register(RxFlowableInvokerProvider.class); - - Flowable> userIdObservable = userIdService.request() + Flowable> userIdObservable = userIdService.register(RxFlowableInvokerProvider.class).request() .rx(RxFlowableInvoker.class) .get(new GenericType>() { }); From 03b611d494ef912add25e68a59d8f4dfd2f26028 Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Sun, 29 Jul 2018 23:38:53 -0400 Subject: [PATCH 04/13] Add files via upload --- .../java/com/baeldung/samples/jerseyrx/ClientOrchestration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index d3e6986b39..2b5c6bf965 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -30,7 +30,7 @@ public class ClientOrchestration { Client client = ClientBuilder.newClient(); WebTarget userIdService = client.target("http://localhost:8080/serviceA/id?limit=10"); WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); - WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/address"); + WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); Logger logger = Logger.getLogger("ClientOrchestrator"); From 11c97cff08c49c727654b285d3985a1b6c8f2c7c Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Sun, 29 Jul 2018 23:42:17 -0400 Subject: [PATCH 05/13] Update readme.md --- jersey-client-rx/readme.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/jersey-client-rx/readme.md b/jersey-client-rx/readme.md index 8b13789179..d1bc4e950b 100644 --- a/jersey-client-rx/readme.md +++ b/jersey-client-rx/readme.md @@ -1 +1,3 @@ +# Fluent, Reactive Jersey Client Orchestration # +### Sample code demonstrating the options for asynchronous, reactive RESTful service consumption with JAX-RS ### From af4ddaeb34ab56215c5437fddba22e0c5eb27ef9 Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Mon, 6 Aug 2018 21:20:02 -0400 Subject: [PATCH 06/13] Add files via upload --- jersey-client-rx/pom.xml | 21 +++++ .../samples/jerseyrx/ClientOrchestration.java | 94 +++++++++++-------- .../samples/jerseyrx/EmployeeDTO.java | 21 +++++ .../jerseyrx/ClientOrchestrationTest.java | 67 +++++++++++++ 4 files changed, 162 insertions(+), 41 deletions(-) create mode 100644 jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java create mode 100644 jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml index cc5f28c938..fb7494fab1 100644 --- a/jersey-client-rx/pom.xml +++ b/jersey-client-rx/pom.xml @@ -26,6 +26,27 @@ jersey-rx-client-rxjava2 2.27 + + com.github.tomakehurst + wiremock + 1.58 + test + + + org.junit.vintage + junit-vintage-engine + 5.2.0 + + + org.glassfish.jersey.media + jersey-media-json-jackson + 2.22 + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + 2.4.1 + UTF-8 diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index 2b5c6bf965..5c145ca5d9 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -11,12 +11,12 @@ import java.util.logging.Level; import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider; import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; import java.util.logging.Logger; -import java.util.stream.Collectors; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker; import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider; import rx.Observable; @@ -28,7 +28,7 @@ import rx.Observable; public class ClientOrchestration { Client client = ClientBuilder.newClient(); - WebTarget userIdService = client.target("http://localhost:8080/serviceA/id?limit=10"); + WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); @@ -46,30 +46,33 @@ public class ClientOrchestration { public void callBackOrchestrate() { logger.info("Orchestrating with the pyramid of doom"); - userIdService.request() + userIdService.request().accept(MediaType.APPLICATION_JSON) .async() - .get(new InvocationCallback>() { + .get(new InvocationCallback() { @Override - public void completed(List empIds) { + public void completed(EmployeeDTO empIdList) { + logger.info("[InvocationCallback] Got all the IDs " + empIdList.getEmpIds()); + List empIds = empIdList.getEmpIds(); CountDownLatch completionTracker = new CountDownLatch(empIds.size()); //used to keep track of the progress of the subsequent calls empIds.forEach((id) -> { //for each employee ID, get the name nameService.resolveTemplate("empId", id).request() .async() .get(new InvocationCallback() { + @Override public void completed(String response) { completionTracker.countDown(); - hashService.request().async().get(new InvocationCallback() { + hashService.resolveTemplate("comboIDandName", response + id).request().async().get(new InvocationCallback() { @Override public void completed(String response) { - logger.log(Level.INFO, "The hash output {0}", response); + logger.log(Level.INFO, "[InvocationCallback] The hash output {0}", response); } @Override public void failed(Throwable throwable) { completionTracker.countDown(); - logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage()); + logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the hashing request step {0}", throwable.getMessage()); } }); } @@ -77,14 +80,14 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { completionTracker.countDown(); - logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage()); + logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the username request step {0}", throwable.getMessage()); } }); }); try { if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds - logger.warning("Some requests didn't complete within the timeout"); + logger.warning("[InvocationCallback] Some requests didn't complete within the timeout"); } } catch (InterruptedException ex) { Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); @@ -94,24 +97,25 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { - //implement callback + logger.warning("Couldn't get the list of IDs"); } }); } public void rxOrchestrate() { logger.info("Orchestrating with a CompletionStage"); - CompletionStage> userIdStage = userIdService.request() + CompletionStage userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON) .rx() - .get(new GenericType>() { + .get(new GenericType() { }) .exceptionally((Throwable throwable) -> { - logger.warning("An error has occurred"); + logger.warning("[CompletionStage] An error has occurred"); return null; }); - userIdStage.thenAcceptAsync(listOfIds -> { - listOfIds.stream().forEach((Long id) -> { + userIdStage.thenAcceptAsync(empIdDto -> { + logger.info("[CompletionStage] Got all the IDs " + empIdDto.getEmpIds()); + empIdDto.getEmpIds().stream().forEach((Long id) -> { CompletableFuture completable = nameService.resolveTemplate("empId", id) .request() .rx() @@ -124,9 +128,9 @@ public class ClientOrchestration { .rx() .get(String.class) .toCompletableFuture() - .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)) + .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "[CompletionFuture] The hash output {0}", hashValue)) .exceptionally((Throwable throwable) -> { - logger.log(Level.WARNING, "Hash computation failed for {0}", id); + logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id); return null; }); @@ -140,53 +144,61 @@ public class ClientOrchestration { public void observableJavaOrchestrate() { logger.info("Orchestrating with Observables"); - Observable> userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() + Observable userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() + .accept(MediaType.APPLICATION_JSON) .rx(RxObservableInvoker.class) - .get(new GenericType>() { + .get(new GenericType() { }); - userIdObservable.subscribe((List listOfIds) -> { - Observable.from(listOfIds).map(id - -> nameService.resolveTemplate("empId", id) + userIdObservable.subscribe((EmployeeDTO empIdList) -> { + logger.info("[Observable] Got all the IDs " + empIdList.getEmpIds()); + Observable.from(empIdList.getEmpIds()).subscribe(id + -> nameService.register(RxObservableInvokerProvider.class) + .resolveTemplate("empId", id) .request() .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) - .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .doOnError(throwable -> logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService + .register(RxObservableInvokerProvider.class) + .resolveTemplate("comboIDandName", userName + id) .request() .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the hash value for empId+username - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage())) - .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + .doOnError(throwable -> logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue)))); }); } + public void flowableJavaOrchestrate() { logger.info("Orchestrating with Flowable"); - Flowable> userIdObservable = userIdService.register(RxFlowableInvokerProvider.class).request() + Flowable userIdObservable = userIdService.register(RxFlowableInvokerProvider.class) + .request() .rx(RxFlowableInvoker.class) - .get(new GenericType>() { + .get(new GenericType() { }); - Disposable subscribe = userIdObservable.subscribe((List listOfIds) -> { + Disposable subscribe = userIdObservable.subscribe((EmployeeDTO dto) -> { + List listOfIds = dto.getEmpIds(); Observable.from(listOfIds).map(id - -> nameService.resolveTemplate("empId", id) + -> nameService.register(RxFlowableInvokerProvider.class) + .resolveTemplate("empId", id) .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) - .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .rx(RxFlowableInvoker.class) + .get(String.class) //gotten the name for the given empId + .doOnError(throwable -> logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService.register(RxFlowableInvokerProvider.class) + .resolveTemplate("comboIDandName", userName + id) .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the hash value for empId+username - .doOnError(throwable -> logger.warning("An error has occurred in the hashing request step " + throwable.getMessage())) - .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + .rx(RxFlowableInvoker.class) + .get(String.class) //gotten the hash value for empId+username + .doOnError(throwable -> logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue)))); }); } diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java new file mode 100644 index 0000000000..ab3cfb54a2 --- /dev/null +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java @@ -0,0 +1,21 @@ +package com.baeldung.samples.jerseyrx; + +import java.util.List; + +/** + * + * @author SIGINT-X + */ +public class EmployeeDTO { + + private List empIds; + + public List getEmpIds() { + return empIds; + } + + public void setEmpIds(List empIds) { + this.empIds = empIds; + } + +} diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java new file mode 100644 index 0000000000..4286e192c0 --- /dev/null +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java @@ -0,0 +1,67 @@ +package com.baeldung.samples.jerseyrx; + +import com.github.tomakehurst.wiremock.WireMockServer; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * + * @author baeldung + */ +public class ClientOrchestrationTest { + + ClientOrchestration orchestrator = new ClientOrchestration(); + + String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}"; + + String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"}; + + String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"}; + + WireMockServer wireMockServer = new WireMockServer(); + + @Before + public void setup() { + wireMockServer.start(); + configureFor("localhost", 8080); + stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json"))); + + stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1]))); + stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2]))); + stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3]))); + stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4]))); + stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5]))); + stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6]))); + + stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0]))); + stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1]))); + stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2]))); + stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3]))); + stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4]))); + stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5]))); + + } + + @Test + public void hits() { + + orchestrator.callBackOrchestrate(); + orchestrator.rxOrchestrate(); + orchestrator.observableJavaOrchestrate(); + orchestrator.flowableJavaOrchestrate(); + + } + + @After + public void tearDown() { + + } + +} From ac9ef8d6d0bad031f9ccebf0c765bcddd204d84a Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Tue, 7 Aug 2018 00:16:23 -0400 Subject: [PATCH 07/13] Add files via upload --- .../samples/jerseyrx/ClientOrchestration.java | 33 +++++++++++++++---- .../jerseyrx/ClientOrchestrationTest.java | 32 +++++++++--------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index 5c145ca5d9..6e184876cb 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -2,6 +2,7 @@ package com.baeldung.samples.jerseyrx; import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -31,6 +32,7 @@ public class ClientOrchestration { WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); + LinkedList failures = new LinkedList<>(); Logger logger = Logger.getLogger("ClientOrchestrator"); @@ -72,6 +74,7 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { completionTracker.countDown(); + failures.add(throwable); logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the hashing request step {0}", throwable.getMessage()); } }); @@ -80,6 +83,7 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { completionTracker.countDown(); + failures.add(throwable); logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the username request step {0}", throwable.getMessage()); } }); @@ -90,6 +94,7 @@ public class ClientOrchestration { logger.warning("[InvocationCallback] Some requests didn't complete within the timeout"); } } catch (InterruptedException ex) { + failures.add(ex); Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); } @@ -97,6 +102,7 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { + failures.add(throwable); logger.warning("Couldn't get the list of IDs"); } }); @@ -108,7 +114,8 @@ public class ClientOrchestration { .rx() .get(new GenericType() { }) - .exceptionally((Throwable throwable) -> { + .exceptionally((throwable) -> { + failures.add(throwable); logger.warning("[CompletionStage] An error has occurred"); return null; }); @@ -129,7 +136,8 @@ public class ClientOrchestration { .get(String.class) .toCompletableFuture() .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "[CompletionFuture] The hash output {0}", hashValue)) - .exceptionally((Throwable throwable) -> { + .exceptionally((throwable) -> { + failures.add(throwable); logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id); return null; }); @@ -159,7 +167,10 @@ public class ClientOrchestration { .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage())) + .doOnError((throwable) -> { + failures.add(throwable); + logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage()); + }) .subscribe(userName -> hashService .register(RxObservableInvokerProvider.class) .resolveTemplate("comboIDandName", userName + id) @@ -167,13 +178,15 @@ public class ClientOrchestration { .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the hash value for empId+username - .doOnError(throwable -> logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage())) + .doOnError((throwable) -> { + failures.add(throwable); + logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage()); + }) .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue)))); }); } - public void flowableJavaOrchestrate() { logger.info("Orchestrating with Flowable"); @@ -191,13 +204,19 @@ public class ClientOrchestration { .request() .rx(RxFlowableInvoker.class) .get(String.class) //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage())) + .doOnError((throwable) -> { + failures.add(throwable); + logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage()); + }) .subscribe(userName -> hashService.register(RxFlowableInvokerProvider.class) .resolveTemplate("comboIDandName", userName + id) .request() .rx(RxFlowableInvoker.class) .get(String.class) //gotten the hash value for empId+username - .doOnError(throwable -> logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage())) + .doOnError((throwable) -> { + failures.add(throwable); + logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage()); + }) .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue)))); }); diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java index 4286e192c0..2158f29a61 100644 --- a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java @@ -7,6 +7,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import junit.framework.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -16,52 +17,53 @@ import org.junit.Test; * @author baeldung */ public class ClientOrchestrationTest { - + ClientOrchestration orchestrator = new ClientOrchestration(); - + String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}"; - + String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"}; - + String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"}; - + WireMockServer wireMockServer = new WireMockServer(); - + @Before public void setup() { wireMockServer.start(); configureFor("localhost", 8080); stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json"))); - + stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1]))); stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2]))); stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3]))); stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4]))); stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5]))); stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6]))); - + stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0]))); stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1]))); stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2]))); stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3]))); stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4]))); stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5]))); - + } - + @Test public void hits() { - + orchestrator.callBackOrchestrate(); orchestrator.rxOrchestrate(); orchestrator.observableJavaOrchestrate(); orchestrator.flowableJavaOrchestrate(); - + + Assert.assertTrue(orchestrator.failures.isEmpty()); } - + @After public void tearDown() { - + } - + } From 2b53d299854bbe70c518e84041b456966cd7400d Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Tue, 7 Aug 2018 00:17:36 -0400 Subject: [PATCH 08/13] Update EmployeeDTO.java --- .../main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java index ab3cfb54a2..a161448bb7 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java @@ -4,7 +4,7 @@ import java.util.List; /** * - * @author SIGINT-X + * @author baeldung */ public class EmployeeDTO { From 2b229332b966a006d0f2edb3b7d165a0510a08ff Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Fri, 10 Aug 2018 08:24:46 -0400 Subject: [PATCH 09/13] Add files via upload --- jersey-client-rx/pom.xml | 2 +- .../samples/jerseyrx/ClientOrchestration.java | 82 +++++++++---------- .../samples/jerseyrx/EmployeeDTO.java | 33 ++++++++ .../jerseyrx/ClientOrchestrationTest.java | 63 ++++++++------ 4 files changed, 107 insertions(+), 73 deletions(-) diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml index fb7494fab1..4e35be31b4 100644 --- a/jersey-client-rx/pom.xml +++ b/jersey-client-rx/pom.xml @@ -40,7 +40,7 @@ org.glassfish.jersey.media jersey-media-json-jackson - 2.22 + 2.25 com.fasterxml.jackson.jaxrs diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index 6e184876cb..f2687c8c8d 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -1,7 +1,6 @@ package com.baeldung.samples.jerseyrx; import io.reactivex.Flowable; -import io.reactivex.disposables.Disposable; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -27,28 +26,22 @@ import rx.Observable; * @author baeldung */ public class ClientOrchestration { - + Client client = ClientBuilder.newClient(); + WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); + + LinkedList failures = new LinkedList<>(); - + Logger logger = Logger.getLogger("ClientOrchestrator"); - - public static void main(String[] args) { - ClientOrchestration orchestrator = new ClientOrchestration(); - - orchestrator.callBackOrchestrate(); - orchestrator.rxOrchestrate(); - orchestrator.observableJavaOrchestrate(); - orchestrator.flowableJavaOrchestrate(); - - } - + public void callBackOrchestrate() { logger.info("Orchestrating with the pyramid of doom"); - userIdService.request().accept(MediaType.APPLICATION_JSON) + userIdService.request() + .accept(MediaType.APPLICATION_JSON) .async() .get(new InvocationCallback() { @Override @@ -61,7 +54,7 @@ public class ClientOrchestration { nameService.resolveTemplate("empId", id).request() .async() .get(new InvocationCallback() { - + @Override public void completed(String response) { completionTracker.countDown(); @@ -70,7 +63,7 @@ public class ClientOrchestration { public void completed(String response) { logger.log(Level.INFO, "[InvocationCallback] The hash output {0}", response); } - + @Override public void failed(Throwable throwable) { completionTracker.countDown(); @@ -79,7 +72,7 @@ public class ClientOrchestration { } }); } - + @Override public void failed(Throwable throwable) { completionTracker.countDown(); @@ -88,7 +81,7 @@ public class ClientOrchestration { } }); }); - + try { if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds logger.warning("[InvocationCallback] Some requests didn't complete within the timeout"); @@ -97,9 +90,9 @@ public class ClientOrchestration { failures.add(ex); Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); } - + } - + @Override public void failed(Throwable throwable) { failures.add(throwable); @@ -107,7 +100,7 @@ public class ClientOrchestration { } }); } - + public void rxOrchestrate() { logger.info("Orchestrating with a CompletionStage"); CompletionStage userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON) @@ -119,7 +112,7 @@ public class ClientOrchestration { logger.warning("[CompletionStage] An error has occurred"); return null; }); - + userIdStage.thenAcceptAsync(empIdDto -> { logger.info("[CompletionStage] Got all the IDs " + empIdDto.getEmpIds()); empIdDto.getEmpIds().stream().forEach((Long id) -> { @@ -128,7 +121,7 @@ public class ClientOrchestration { .rx() .get(String.class) .toCompletableFuture(); - + completable.thenAccept((String userName) -> { hashService.resolveTemplate("comboIDandName", userName + id) .request() @@ -141,24 +134,24 @@ public class ClientOrchestration { logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id); return null; }); - + }); - + }); }); - + } - + public void observableJavaOrchestrate() { - + logger.info("Orchestrating with Observables"); - Observable userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() + Observable observableUserIdService = userIdService.register(RxObservableInvokerProvider.class).request() .accept(MediaType.APPLICATION_JSON) .rx(RxObservableInvoker.class) .get(new GenericType() { - }); - - userIdObservable.subscribe((EmployeeDTO empIdList) -> { + }).asObservable(); + + observableUserIdService.subscribe((EmployeeDTO empIdList) -> { logger.info("[Observable] Got all the IDs " + empIdList.getEmpIds()); Observable.from(empIdList.getEmpIds()).subscribe(id -> nameService.register(RxObservableInvokerProvider.class) @@ -171,8 +164,7 @@ public class ClientOrchestration { failures.add(throwable); logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage()); }) - .subscribe(userName -> hashService - .register(RxObservableInvokerProvider.class) + .subscribe(userName -> hashService.register(RxObservableInvokerProvider.class) .resolveTemplate("comboIDandName", userName + id) .request() .rx(RxObservableInvoker.class) @@ -184,21 +176,21 @@ public class ClientOrchestration { }) .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue)))); }); - + } - + public void flowableJavaOrchestrate() { - - logger.info("Orchestrating with Flowable"); - Flowable userIdObservable = userIdService.register(RxFlowableInvokerProvider.class) + + Flowable userIdFlowable = userIdService.register(RxFlowableInvokerProvider.class) .request() .rx(RxFlowableInvoker.class) .get(new GenericType() { }); - - Disposable subscribe = userIdObservable.subscribe((EmployeeDTO dto) -> { + + userIdFlowable.subscribe((EmployeeDTO dto) -> { + logger.info("Orchestrating with Flowable"); List listOfIds = dto.getEmpIds(); - Observable.from(listOfIds).map(id + Flowable.just(listOfIds).subscribe(id -> nameService.register(RxFlowableInvokerProvider.class) .resolveTemplate("empId", id) .request() @@ -219,7 +211,7 @@ public class ClientOrchestration { }) .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue)))); }); - + } - + } diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java index a161448bb7..3a818f979e 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java @@ -1,6 +1,7 @@ package com.baeldung.samples.jerseyrx; import java.util.List; +import java.util.Objects; /** * @@ -18,4 +19,36 @@ public class EmployeeDTO { this.empIds = empIds; } + @Override + public int hashCode() { + int hash = 5; + hash = 59 * hash + Objects.hashCode(this.empIds); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final EmployeeDTO other = (EmployeeDTO) obj; + if (!Objects.equals(this.empIds, other.empIds)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "EmployeeDTO{" + "empIds=" + empIds + '}'; + } + + + } diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java index 2158f29a61..6df0e1c110 100644 --- a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java @@ -1,15 +1,18 @@ package com.baeldung.samples.jerseyrx; -import com.github.tomakehurst.wiremock.WireMockServer; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import junit.framework.Assert; -import org.junit.After; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import java.util.LinkedList; +import java.util.logging.Logger; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import static junit.framework.Assert.assertTrue; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; /** @@ -17,53 +20,59 @@ import org.junit.Test; * @author baeldung */ public class ClientOrchestrationTest { - + + Client client = ClientBuilder.newClient(); + + WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); + WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); + WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); + + LinkedList failures = new LinkedList<>(); + + Logger logger = Logger.getLogger("ClientOrchestrator"); + ClientOrchestration orchestrator = new ClientOrchestration(); - + String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}"; - + String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"}; - + String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"}; - - WireMockServer wireMockServer = new WireMockServer(); - + + @Rule + public WireMockRule wireMockServer = new WireMockRule(); + @Before public void setup() { - wireMockServer.start(); - configureFor("localhost", 8080); + stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json"))); - + stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1]))); stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2]))); stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3]))); stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4]))); stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5]))); stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6]))); - + stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0]))); stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1]))); stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2]))); stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3]))); stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4]))); stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5]))); - + } - + @Test public void hits() { - + orchestrator.callBackOrchestrate(); orchestrator.rxOrchestrate(); orchestrator.observableJavaOrchestrate(); orchestrator.flowableJavaOrchestrate(); - - Assert.assertTrue(orchestrator.failures.isEmpty()); + + assertTrue(orchestrator.failures.isEmpty()); } - - @After - public void tearDown() { - - } - + + } From 623e29cecb279a0a8300a447fbe9f42d610fb5c4 Mon Sep 17 00:00:00 2001 From: Tom Hombergs Date: Wed, 15 Aug 2018 23:10:18 +0200 Subject: [PATCH 10/13] fixed async timing and 404 bug, refactored code --- jersey-client-rx/pom.xml | 11 + .../samples/jerseyrx/ClientOrchestration.java | 217 ---------------- .../samples/jerseyrx/EmployeeDTO.java | 54 ---- .../ClientOrchestrationIntegrationTest.java | 244 ++++++++++++++++++ .../jerseyrx/ClientOrchestrationTest.java | 78 ------ pom.xml | 1 + 6 files changed, 256 insertions(+), 349 deletions(-) delete mode 100644 jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java delete mode 100644 jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java create mode 100644 jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java delete mode 100644 jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml index 4e35be31b4..24894869a5 100644 --- a/jersey-client-rx/pom.xml +++ b/jersey-client-rx/pom.xml @@ -47,6 +47,17 @@ jackson-jaxrs-json-provider 2.4.1 + + org.slf4j + slf4j-jdk14 + 1.7.25 + + + org.assertj + assertj-core + 3.10.0 + test + UTF-8 diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java deleted file mode 100644 index f2687c8c8d..0000000000 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ /dev/null @@ -1,217 +0,0 @@ -package com.baeldung.samples.jerseyrx; - -import io.reactivex.Flowable; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider; -import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; -import java.util.logging.Logger; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.InvocationCallback; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.GenericType; -import javax.ws.rs.core.MediaType; -import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker; -import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider; -import rx.Observable; - -/** - * - * @author baeldung - */ -public class ClientOrchestration { - - Client client = ClientBuilder.newClient(); - - WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); - WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); - WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); - - - LinkedList failures = new LinkedList<>(); - - Logger logger = Logger.getLogger("ClientOrchestrator"); - - public void callBackOrchestrate() { - logger.info("Orchestrating with the pyramid of doom"); - userIdService.request() - .accept(MediaType.APPLICATION_JSON) - .async() - .get(new InvocationCallback() { - @Override - public void completed(EmployeeDTO empIdList) { - logger.info("[InvocationCallback] Got all the IDs " + empIdList.getEmpIds()); - List empIds = empIdList.getEmpIds(); - CountDownLatch completionTracker = new CountDownLatch(empIds.size()); //used to keep track of the progress of the subsequent calls - empIds.forEach((id) -> { - //for each employee ID, get the name - nameService.resolveTemplate("empId", id).request() - .async() - .get(new InvocationCallback() { - - @Override - public void completed(String response) { - completionTracker.countDown(); - hashService.resolveTemplate("comboIDandName", response + id).request().async().get(new InvocationCallback() { - @Override - public void completed(String response) { - logger.log(Level.INFO, "[InvocationCallback] The hash output {0}", response); - } - - @Override - public void failed(Throwable throwable) { - completionTracker.countDown(); - failures.add(throwable); - logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the hashing request step {0}", throwable.getMessage()); - } - }); - } - - @Override - public void failed(Throwable throwable) { - completionTracker.countDown(); - failures.add(throwable); - logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the username request step {0}", throwable.getMessage()); - } - }); - }); - - try { - if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds - logger.warning("[InvocationCallback] Some requests didn't complete within the timeout"); - } - } catch (InterruptedException ex) { - failures.add(ex); - Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); - } - - } - - @Override - public void failed(Throwable throwable) { - failures.add(throwable); - logger.warning("Couldn't get the list of IDs"); - } - }); - } - - public void rxOrchestrate() { - logger.info("Orchestrating with a CompletionStage"); - CompletionStage userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON) - .rx() - .get(new GenericType() { - }) - .exceptionally((throwable) -> { - failures.add(throwable); - logger.warning("[CompletionStage] An error has occurred"); - return null; - }); - - userIdStage.thenAcceptAsync(empIdDto -> { - logger.info("[CompletionStage] Got all the IDs " + empIdDto.getEmpIds()); - empIdDto.getEmpIds().stream().forEach((Long id) -> { - CompletableFuture completable = nameService.resolveTemplate("empId", id) - .request() - .rx() - .get(String.class) - .toCompletableFuture(); - - completable.thenAccept((String userName) -> { - hashService.resolveTemplate("comboIDandName", userName + id) - .request() - .rx() - .get(String.class) - .toCompletableFuture() - .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "[CompletionFuture] The hash output {0}", hashValue)) - .exceptionally((throwable) -> { - failures.add(throwable); - logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id); - return null; - }); - - }); - - }); - }); - - } - - public void observableJavaOrchestrate() { - - logger.info("Orchestrating with Observables"); - Observable observableUserIdService = userIdService.register(RxObservableInvokerProvider.class).request() - .accept(MediaType.APPLICATION_JSON) - .rx(RxObservableInvoker.class) - .get(new GenericType() { - }).asObservable(); - - observableUserIdService.subscribe((EmployeeDTO empIdList) -> { - logger.info("[Observable] Got all the IDs " + empIdList.getEmpIds()); - Observable.from(empIdList.getEmpIds()).subscribe(id - -> nameService.register(RxObservableInvokerProvider.class) - .resolveTemplate("empId", id) - .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the name for the given empId - .doOnError((throwable) -> { - failures.add(throwable); - logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage()); - }) - .subscribe(userName -> hashService.register(RxObservableInvokerProvider.class) - .resolveTemplate("comboIDandName", userName + id) - .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the hash value for empId+username - .doOnError((throwable) -> { - failures.add(throwable); - logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage()); - }) - .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue)))); - }); - - } - - public void flowableJavaOrchestrate() { - - Flowable userIdFlowable = userIdService.register(RxFlowableInvokerProvider.class) - .request() - .rx(RxFlowableInvoker.class) - .get(new GenericType() { - }); - - userIdFlowable.subscribe((EmployeeDTO dto) -> { - logger.info("Orchestrating with Flowable"); - List listOfIds = dto.getEmpIds(); - Flowable.just(listOfIds).subscribe(id - -> nameService.register(RxFlowableInvokerProvider.class) - .resolveTemplate("empId", id) - .request() - .rx(RxFlowableInvoker.class) - .get(String.class) //gotten the name for the given empId - .doOnError((throwable) -> { - failures.add(throwable); - logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage()); - }) - .subscribe(userName -> hashService.register(RxFlowableInvokerProvider.class) - .resolveTemplate("comboIDandName", userName + id) - .request() - .rx(RxFlowableInvoker.class) - .get(String.class) //gotten the hash value for empId+username - .doOnError((throwable) -> { - failures.add(throwable); - logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage()); - }) - .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue)))); - }); - - } - -} diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java deleted file mode 100644 index 3a818f979e..0000000000 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.baeldung.samples.jerseyrx; - -import java.util.List; -import java.util.Objects; - -/** - * - * @author baeldung - */ -public class EmployeeDTO { - - private List empIds; - - public List getEmpIds() { - return empIds; - } - - public void setEmpIds(List empIds) { - this.empIds = empIds; - } - - @Override - public int hashCode() { - int hash = 5; - hash = 59 * hash + Objects.hashCode(this.empIds); - return hash; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final EmployeeDTO other = (EmployeeDTO) obj; - if (!Objects.equals(this.empIds, other.empIds)) { - return false; - } - return true; - } - - @Override - public String toString() { - return "EmployeeDTO{" + "empIds=" + empIds + '}'; - } - - - -} diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java new file mode 100644 index 0000000000..10cdab7c7a --- /dev/null +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java @@ -0,0 +1,244 @@ +package com.baeldung.samples.jerseyrx; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.assertj.core.api.Assertions.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; + +import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; +import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider; +import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker; +import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.tomakehurst.wiremock.junit.WireMockRule; + +import io.reactivex.Flowable; +import rx.Observable; + +/** + * + * @author baeldung + */ +public class ClientOrchestrationIntegrationTest { + + private Client client = ClientBuilder.newClient(); + + private WebTarget userIdService = client.target("http://localhost:8080/id-service/ids"); + private WebTarget nameService = client.target("http://localhost:8080/name-service/users/{userId}/name"); + private WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}"); + + private Logger logger = LoggerFactory.getLogger(ClientOrchestrationIntegrationTest.class); + + private String expectedUserIds = "[1,2,3,4,5,6]"; + + private List expectedNames = Arrays.asList("n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"); + + private List expectedHashValues = Arrays.asList("roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"); + + @Rule + public WireMockRule wireMockServer = new WireMockRule(); + + @Before + public void setup() { + + stubFor(get(urlEqualTo("/id-service/ids")).willReturn(aResponse().withBody(expectedUserIds).withHeader("Content-Type", "application/json"))); + + stubFor(get(urlEqualTo("/name-service/users/1/name")).willReturn(aResponse().withBody(expectedNames.get(1)))); + stubFor(get(urlEqualTo("/name-service/users/2/name")).willReturn(aResponse().withBody(expectedNames.get(2)))); + stubFor(get(urlEqualTo("/name-service/users/3/name")).willReturn(aResponse().withBody(expectedNames.get(3)))); + stubFor(get(urlEqualTo("/name-service/users/4/name")).willReturn(aResponse().withBody(expectedNames.get(4)))); + stubFor(get(urlEqualTo("/name-service/users/5/name")).willReturn(aResponse().withBody(expectedNames.get(5)))); + stubFor(get(urlEqualTo("/name-service/users/6/name")).willReturn(aResponse().withBody(expectedNames.get(6)))); + + stubFor(get(urlEqualTo("/hash-service/Thor1")).willReturn(aResponse().withBody(expectedHashValues.get(0)))); + stubFor(get(urlEqualTo("/hash-service/Hulk2")).willReturn(aResponse().withBody(expectedHashValues.get(1)))); + stubFor(get(urlEqualTo("/hash-service/BlackWidow3")).willReturn(aResponse().withBody(expectedHashValues.get(2)))); + stubFor(get(urlEqualTo("/hash-service/BlackPanther4")).willReturn(aResponse().withBody(expectedHashValues.get(3)))); + stubFor(get(urlEqualTo("/hash-service/TheTick5")).willReturn(aResponse().withBody(expectedHashValues.get(4)))); + stubFor(get(urlEqualTo("/hash-service/Hawkeye6")).willReturn(aResponse().withBody(expectedHashValues.get(5)))); + + } + + @Test + public void callBackOrchestrate() throws InterruptedException { + List receivedHashValues = new ArrayList<>(); + + userIdService.request().accept(MediaType.APPLICATION_JSON).async().get(new InvocationCallback>() { + @Override + public void completed(List employeeIds) { + logger.info("[CallbackExample] id-service result: {}", employeeIds); + CountDownLatch completionTracker = new CountDownLatch(employeeIds.size()); // used to keep track of the progress of the subsequent calls + employeeIds.forEach((id) -> { + // for each employee ID, get the name + nameService.resolveTemplate("userId", id).request().async().get(new InvocationCallback() { + + @Override + public void completed(String response) { + logger.info("[CallbackExample] name-service result: {}", response); + + completionTracker.countDown(); + hashService.resolveTemplate("rawValue", response + id).request().async().get(new InvocationCallback() { + @Override + public void completed(String response) { + logger.info("[CallbackExample] hash-service result: {}", response); + receivedHashValues.add(response); + } + + @Override + public void failed(Throwable throwable) { + completionTracker.countDown(); + logger.warn("[CallbackExample] An error has occurred in the hashing request step!", throwable); + } + }); + } + + @Override + public void failed(Throwable throwable) { + completionTracker.countDown(); + logger.warn("[CallbackExample] An error has occurred in the username request step!", throwable); + } + }); + }); + + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } + + } + + @Override + public void failed(Throwable throwable) { + logger.warn("[CallbackExample] An error has occurred in the userId request step!", throwable); + } + }); + + // wait for async calls to complete + Thread.sleep(1000); + + assertThat(receivedHashValues).containsAll(expectedHashValues); + } + + @Test + public void rxOrchestrate() throws InterruptedException { + List receivedHashValues = new ArrayList<>(); + + CompletionStage> userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON).rx().get(new GenericType>() { + }).exceptionally((throwable) -> { + logger.warn("[CompletionStageExample] An error has occurred"); + return null; + }); + + userIdStage.thenAcceptAsync(employeeIds -> { + logger.info("[CompletionStageExample] id-service result: {}", employeeIds); + employeeIds.forEach((Long id) -> { + CompletableFuture completable = nameService.resolveTemplate("userId", id).request().rx().get(String.class).toCompletableFuture(); + + completable.thenAccept((String userName) -> { + logger.info("[CompletionStageExample] name-service result: {}", userName); + hashService.resolveTemplate("rawValue", userName + id).request().rx().get(String.class).toCompletableFuture().thenAcceptAsync(hashValue -> { + logger.info("[CompletionStageExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + }).exceptionally((throwable) -> { + logger.warn("[CompletionStageExample] Hash computation failed for {}", id); + return null; + }); + + }); + + }); + }); + + // wait for async calls to complete + Thread.sleep(1000); + + assertThat(receivedHashValues).containsAll(expectedHashValues); + } + + @Test + public void observableJavaOrchestrate() throws InterruptedException { + List receivedHashValues = new ArrayList<>(); + + Observable> observableUserIdService = userIdService.register(RxObservableInvokerProvider.class).request().accept(MediaType.APPLICATION_JSON).rx(RxObservableInvoker.class).get(new GenericType>() { + }).asObservable(); + + observableUserIdService.subscribe((List employeeIds) -> { + logger.info("[ObservableExample] id-service result: {}", employeeIds); + Observable.from(employeeIds).subscribe(id -> nameService.register(RxObservableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxObservableInvoker.class).get(String.class).asObservable() // gotten the name for the given + // userId + .doOnError((throwable) -> { + logger.warn("[ObservableExample] An error has occurred in the username request step {}", throwable.getMessage()); + }).subscribe(userName -> { + logger.info("[ObservableExample] name-service result: {}", userName); + hashService.register(RxObservableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxObservableInvoker.class).get(String.class).asObservable() // gotten the hash value for + // userId+username + .doOnError((throwable) -> { + logger.warn("[ObservableExample] An error has occurred in the hashing request step {}", throwable.getMessage()); + }).subscribe(hashValue -> { + logger.info("[ObservableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + }); + })); + }); + + // wait for async calls to complete + Thread.sleep(1000); + + assertThat(receivedHashValues).containsAll(expectedHashValues); + } + + @Test + public void flowableJavaOrchestrate() throws InterruptedException { + List receivedHashValues = new ArrayList<>(); + + Flowable> userIdFlowable = userIdService.register(RxFlowableInvokerProvider.class).request().rx(RxFlowableInvoker.class).get(new GenericType>() { + }); + + userIdFlowable.subscribe((List employeeIds) -> { + logger.info("[FlowableExample] id-service result: {}", employeeIds); + Flowable.fromIterable(employeeIds).subscribe(id -> { + nameService.register(RxFlowableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the name for the given userId + .doOnError((throwable) -> { + logger.warn("[FlowableExample] An error has occurred in the username request step {}", throwable.getMessage()); + }).subscribe(userName -> { + logger.info("[FlowableExample] name-service result: {}", userName); + hashService.register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username + .doOnError((throwable) -> { + logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable); + }).subscribe(hashValue -> { + logger.info("[FlowableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + }); + }); + }); + }); + + // wait for async calls to complete + Thread.sleep(1000); + + assertThat(receivedHashValues).containsAll(expectedHashValues); + } + +} diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java deleted file mode 100644 index 6df0e1c110..0000000000 --- a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.baeldung.samples.jerseyrx; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import com.github.tomakehurst.wiremock.junit.WireMockRule; -import java.util.LinkedList; -import java.util.logging.Logger; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import static junit.framework.Assert.assertTrue; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * - * @author baeldung - */ -public class ClientOrchestrationTest { - - Client client = ClientBuilder.newClient(); - - WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); - WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); - WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); - - LinkedList failures = new LinkedList<>(); - - Logger logger = Logger.getLogger("ClientOrchestrator"); - - ClientOrchestration orchestrator = new ClientOrchestration(); - - String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}"; - - String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"}; - - String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"}; - - @Rule - public WireMockRule wireMockServer = new WireMockRule(); - - @Before - public void setup() { - - stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json"))); - - stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1]))); - stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2]))); - stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3]))); - stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4]))); - stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5]))); - stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6]))); - - stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0]))); - stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1]))); - stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2]))); - stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3]))); - stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4]))); - stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5]))); - - } - - @Test - public void hits() { - - orchestrator.callBackOrchestrate(); - orchestrator.rxOrchestrate(); - orchestrator.observableJavaOrchestrate(); - orchestrator.flowableJavaOrchestrate(); - - assertTrue(orchestrator.failures.isEmpty()); - } - - -} diff --git a/pom.xml b/pom.xml index 4a25459fcb..87278ee3d0 100644 --- a/pom.xml +++ b/pom.xml @@ -188,6 +188,7 @@ spring-integration spring-jenkins-pipeline spring-jersey + jersey-client-rx jmeter spring-jms spring-jooq From fac564c4d5b3727a4da9cc1966cea3c34104debe Mon Sep 17 00:00:00 2001 From: Tom Hombergs Date: Wed, 15 Aug 2018 23:23:18 +0200 Subject: [PATCH 11/13] added jersey-client-rx to parent POM --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index db3bef7fda..0bed4ead26 100644 --- a/pom.xml +++ b/pom.xml @@ -576,6 +576,7 @@ spring-security-thymeleaf persistence-modules/java-jdbi jersey + jersey-client-rx java-spi performance-tests twilio @@ -1106,6 +1107,7 @@ spring-security-thymeleaf persistence-modules/java-jdbi jersey + jersey-client-rx java-spi performance-tests twilio From e6abd9d474023f339d02f72f55bb4f536da564aa Mon Sep 17 00:00:00 2001 From: Tom Hombergs Date: Thu, 16 Aug 2018 08:27:17 +0200 Subject: [PATCH 12/13] added CountDownLatch to all examples --- .../ClientOrchestrationIntegrationTest.java | 88 ++++++++++++------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java index 10cdab7c7a..88a8d67a7d 100644 --- a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java @@ -81,11 +81,12 @@ public class ClientOrchestrationIntegrationTest { public void callBackOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + userIdService.request().accept(MediaType.APPLICATION_JSON).async().get(new InvocationCallback>() { @Override public void completed(List employeeIds) { logger.info("[CallbackExample] id-service result: {}", employeeIds); - CountDownLatch completionTracker = new CountDownLatch(employeeIds.size()); // used to keep track of the progress of the subsequent calls employeeIds.forEach((id) -> { // for each employee ID, get the name nameService.resolveTemplate("userId", id).request().async().get(new InvocationCallback() { @@ -94,17 +95,16 @@ public class ClientOrchestrationIntegrationTest { public void completed(String response) { logger.info("[CallbackExample] name-service result: {}", response); - completionTracker.countDown(); hashService.resolveTemplate("rawValue", response + id).request().async().get(new InvocationCallback() { @Override public void completed(String response) { logger.info("[CallbackExample] hash-service result: {}", response); receivedHashValues.add(response); + completionTracker.countDown(); } @Override public void failed(Throwable throwable) { - completionTracker.countDown(); logger.warn("[CallbackExample] An error has occurred in the hashing request step!", throwable); } }); @@ -112,21 +112,11 @@ public class ClientOrchestrationIntegrationTest { @Override public void failed(Throwable throwable) { - completionTracker.countDown(); logger.warn("[CallbackExample] An error has occurred in the username request step!", throwable); } }); }); - try { - // wait for inner requests to complete in 10 seconds - if (!completionTracker.await(10, TimeUnit.SECONDS)) { - logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); - } - } catch (InterruptedException e) { - logger.error("Interrupted!", e); - } - } @Override @@ -136,7 +126,14 @@ public class ClientOrchestrationIntegrationTest { }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -145,6 +142,8 @@ public class ClientOrchestrationIntegrationTest { public void rxOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + CompletionStage> userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON).rx().get(new GenericType>() { }).exceptionally((throwable) -> { logger.warn("[CompletionStageExample] An error has occurred"); @@ -161,8 +160,10 @@ public class ClientOrchestrationIntegrationTest { hashService.resolveTemplate("rawValue", userName + id).request().rx().get(String.class).toCompletableFuture().thenAcceptAsync(hashValue -> { logger.info("[CompletionStageExample] hash-service result: {}", hashValue); receivedHashValues.add(hashValue); + completionTracker.countDown(); }).exceptionally((throwable) -> { logger.warn("[CompletionStageExample] Hash computation failed for {}", id); + completionTracker.countDown(); return null; }); @@ -172,7 +173,14 @@ public class ClientOrchestrationIntegrationTest { }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -181,13 +189,15 @@ public class ClientOrchestrationIntegrationTest { public void observableJavaOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + Observable> observableUserIdService = userIdService.register(RxObservableInvokerProvider.class).request().accept(MediaType.APPLICATION_JSON).rx(RxObservableInvoker.class).get(new GenericType>() { }).asObservable(); observableUserIdService.subscribe((List employeeIds) -> { logger.info("[ObservableExample] id-service result: {}", employeeIds); Observable.from(employeeIds).subscribe(id -> nameService.register(RxObservableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxObservableInvoker.class).get(String.class).asObservable() // gotten the name for the given - // userId + // userId .doOnError((throwable) -> { logger.warn("[ObservableExample] An error has occurred in the username request step {}", throwable.getMessage()); }).subscribe(userName -> { @@ -197,14 +207,22 @@ public class ClientOrchestrationIntegrationTest { .doOnError((throwable) -> { logger.warn("[ObservableExample] An error has occurred in the hashing request step {}", throwable.getMessage()); }).subscribe(hashValue -> { - logger.info("[ObservableExample] hash-service result: {}", hashValue); - receivedHashValues.add(hashValue); - }); + logger.info("[ObservableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + completionTracker.countDown(); + }); })); }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -213,6 +231,8 @@ public class ClientOrchestrationIntegrationTest { public void flowableJavaOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + Flowable> userIdFlowable = userIdService.register(RxFlowableInvokerProvider.class).request().rx(RxFlowableInvoker.class).get(new GenericType>() { }); @@ -223,20 +243,28 @@ public class ClientOrchestrationIntegrationTest { .doOnError((throwable) -> { logger.warn("[FlowableExample] An error has occurred in the username request step {}", throwable.getMessage()); }).subscribe(userName -> { - logger.info("[FlowableExample] name-service result: {}", userName); - hashService.register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username - .doOnError((throwable) -> { - logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable); - }).subscribe(hashValue -> { - logger.info("[FlowableExample] hash-service result: {}", hashValue); - receivedHashValues.add(hashValue); - }); - }); + logger.info("[FlowableExample] name-service result: {}", userName); + hashService.register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username + .doOnError((throwable) -> { + logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable); + }).subscribe(hashValue -> { + logger.info("[FlowableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + completionTracker.countDown(); + }); + }); }); }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } From 565a11620b93ad9d0b7b0c09f30a282008cfc879 Mon Sep 17 00:00:00 2001 From: xamcross Date: Mon, 27 Aug 2018 00:19:42 +0300 Subject: [PATCH 13/13] BAEL-2070 (#5064) * BAEL-2070 UnsatisfiedDependencyException example app * [BAEL-8456] - Moved Java Date articles into a new module - 'java-dates' * BAEL-2070 Refactoring; Replaced field injection with constructor injection * fix package, fix get random node * update neo4j * fix formatting * [BAEL-8456] - Moved more articles into 'java-dates' module * BAEL-2070 Small indentation fix --- .../exception/app/CustomConfiguration.java | 13 +++++++++++++ .../exception/app/PurchaseDeptService.java | 13 +++++++++++++ .../exception/repository/DressRepository.java | 9 +++++++++ .../exception/repository/InventoryRepository.java | 7 +++++++ .../exception/repository/ShoeRepository.java | 7 +++++++ 5 files changed, 49 insertions(+) create mode 100644 spring-core/src/main/java/com/baeldung/dependency/exception/app/CustomConfiguration.java create mode 100644 spring-core/src/main/java/com/baeldung/dependency/exception/app/PurchaseDeptService.java create mode 100644 spring-core/src/main/java/com/baeldung/dependency/exception/repository/DressRepository.java create mode 100644 spring-core/src/main/java/com/baeldung/dependency/exception/repository/InventoryRepository.java create mode 100644 spring-core/src/main/java/com/baeldung/dependency/exception/repository/ShoeRepository.java diff --git a/spring-core/src/main/java/com/baeldung/dependency/exception/app/CustomConfiguration.java b/spring-core/src/main/java/com/baeldung/dependency/exception/app/CustomConfiguration.java new file mode 100644 index 0000000000..4366cb617a --- /dev/null +++ b/spring-core/src/main/java/com/baeldung/dependency/exception/app/CustomConfiguration.java @@ -0,0 +1,13 @@ +package com.baeldung.dependency.exception.app; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +@SpringBootApplication +@ComponentScan(basePackages = "com.baeldung.dependency.exception") +public class CustomConfiguration { + public static void main(String[] args) { + SpringApplication.run(CustomConfiguration.class, args); + } +} diff --git a/spring-core/src/main/java/com/baeldung/dependency/exception/app/PurchaseDeptService.java b/spring-core/src/main/java/com/baeldung/dependency/exception/app/PurchaseDeptService.java new file mode 100644 index 0000000000..1e6fad63aa --- /dev/null +++ b/spring-core/src/main/java/com/baeldung/dependency/exception/app/PurchaseDeptService.java @@ -0,0 +1,13 @@ +package com.baeldung.dependency.exception.app; + +import com.baeldung.dependency.exception.repository.InventoryRepository; +import org.springframework.stereotype.Service; + +@Service +public class PurchaseDeptService { + private InventoryRepository repository; + + public PurchaseDeptService(InventoryRepository repository) { + this.repository = repository; + } +} \ No newline at end of file diff --git a/spring-core/src/main/java/com/baeldung/dependency/exception/repository/DressRepository.java b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/DressRepository.java new file mode 100644 index 0000000000..4a6c836143 --- /dev/null +++ b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/DressRepository.java @@ -0,0 +1,9 @@ +package com.baeldung.dependency.exception.repository; + +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Repository; + +@Primary +@Repository +public class DressRepository implements InventoryRepository { +} diff --git a/spring-core/src/main/java/com/baeldung/dependency/exception/repository/InventoryRepository.java b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/InventoryRepository.java new file mode 100644 index 0000000000..ccb2ad9c32 --- /dev/null +++ b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/InventoryRepository.java @@ -0,0 +1,7 @@ +package com.baeldung.dependency.exception.repository; + +import org.springframework.stereotype.Repository; + +@Repository +public interface InventoryRepository { +} diff --git a/spring-core/src/main/java/com/baeldung/dependency/exception/repository/ShoeRepository.java b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/ShoeRepository.java new file mode 100644 index 0000000000..60495914cd --- /dev/null +++ b/spring-core/src/main/java/com/baeldung/dependency/exception/repository/ShoeRepository.java @@ -0,0 +1,7 @@ +package com.baeldung.dependency.exception.repository; + +import org.springframework.stereotype.Repository; + +@Repository +public class ShoeRepository implements InventoryRepository { +}