BAEL-3353: Asynchronous HTTP Programming with Play Framework (#8349)
* project creation and added ws library * BAEL-3353: pushing code for article * BAEL-3353: changing test names * BAEL-3353: removed unused imports * BAEL-3353: applied review changes and fixed unit test bug * BAEL-3353: added more examples to PR * BAEL-3353: updated test following feedback * BAEL-3353: add new unit test for large data download * BAEL-3353: fixed PR following feedback
This commit is contained in:
@@ -0,0 +1,232 @@
|
||||
package controllers;
|
||||
|
||||
import static java.time.temporal.ChronoUnit.SECONDS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static play.mvc.Http.Status.SERVICE_UNAVAILABLE;
|
||||
|
||||
import akka.Done;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.util.ByteString;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import play.Application;
|
||||
import play.inject.guice.GuiceApplicationBuilder;
|
||||
import play.libs.concurrent.Futures;
|
||||
import play.libs.ws.WSClient;
|
||||
import play.libs.ws.WSResponse;
|
||||
import play.libs.ws.ahc.AhcCurlRequestLogger;
|
||||
import play.mvc.Result;
|
||||
import play.mvc.Results;
|
||||
import play.test.WithServer;
|
||||
|
||||
public class HomeControllerTest extends WithServer {
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(HomeControllerTest.class);
|
||||
private String url;
|
||||
private int port;
|
||||
|
||||
@Override
|
||||
protected Application provideApplication() {
|
||||
return new GuiceApplicationBuilder().build();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
|
||||
if (optHttpsPort.isPresent()) {
|
||||
port = optHttpsPort.getAsInt();
|
||||
url = "https://localhost:" + port;
|
||||
} else {
|
||||
port = testServer.getRunningHttpPort()
|
||||
.getAsInt();
|
||||
url = "http://localhost:" + port;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenASingleGetRequestWhenResponseThenBlockWithCompletableAndLog()
|
||||
throws Exception {
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
WSResponse wsResponse = ws.url(url)
|
||||
.setRequestFilter(new AhcCurlRequestLogger())
|
||||
.addHeader("key", "value")
|
||||
.addQueryParameter("num", "" + 1)
|
||||
.get()
|
||||
.toCompletableFuture()
|
||||
.get();
|
||||
|
||||
log.debug("Thread#" + Thread.currentThread()
|
||||
.getId() + " Request complete: Response code = "
|
||||
+ wsResponse.getStatus()
|
||||
+ " | Response: " + wsResponse.getBody() + " | Current Time:"
|
||||
+ System.currentTimeMillis());
|
||||
assert (HttpStatus.SC_OK == wsResponse.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenASingleGetRequestWhenResponseThenLog() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
ws.url(url)
|
||||
.setRequestFilter(new AhcCurlRequestLogger())
|
||||
.addHeader("key", "value")
|
||||
.addQueryParameter("num", "" + 1)
|
||||
.get()
|
||||
.thenAccept(r -> {
|
||||
log.debug("Thread#" + Thread.currentThread()
|
||||
.getId() + " Request complete: Response code = "
|
||||
+ r.getStatus()
|
||||
+ " | Response: " + r.getBody() + " | Current Time:" + System.currentTimeMillis());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
log.debug(
|
||||
"Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
|
||||
latch.await(5, TimeUnit.SECONDS );
|
||||
assertEquals(0, latch.getCount());
|
||||
log.debug("All requests have been completed. Exiting test.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenASinglePostRequestWhenResponseThenLog() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
ws.url(url)
|
||||
.setContentType("application/x-www-form-urlencoded")
|
||||
.post("key1=value1&key2=value2")
|
||||
.thenAccept(r -> {
|
||||
log.debug("Thread#" + Thread.currentThread()
|
||||
.getId() + " Request complete: Response code = "
|
||||
+ r.getStatus()
|
||||
+ " | Response: " + r.getBody() + " | Current Time:" + System.currentTimeMillis());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
log.debug(
|
||||
"Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
|
||||
latch.await(5, TimeUnit.SECONDS );
|
||||
assertEquals(0, latch.getCount());
|
||||
log.debug("All requests have been completed. Exiting test.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultipleRequestsWhenResponseThenLog() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(100);
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
IntStream.range(0, 100)
|
||||
.parallel()
|
||||
.forEach(num ->
|
||||
ws.url(url)
|
||||
.setRequestFilter(new AhcCurlRequestLogger())
|
||||
.addHeader("key", "value")
|
||||
.addQueryParameter("num", "" + num)
|
||||
.get()
|
||||
.thenAccept(r -> {
|
||||
log.debug(
|
||||
"Thread#" + num + " Request complete: Response code = " + r.getStatus()
|
||||
+ " | Response: " + r.getBody() + " | Current Time:"
|
||||
+ System.currentTimeMillis());
|
||||
latch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
log.debug(
|
||||
"Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
|
||||
latch.await(5, TimeUnit.SECONDS );
|
||||
assertEquals(0, latch.getCount());
|
||||
log.debug("All requests have been completed. Exiting test.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLongResponseWhenTimeoutThenHandle() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
Futures futures = app.injector()
|
||||
.instanceOf(Futures.class);
|
||||
CompletionStage<Result> f = futures.timeout(
|
||||
ws.url(url)
|
||||
.setRequestTimeout(Duration.of(1, SECONDS))
|
||||
.get()
|
||||
.thenApply(result -> {
|
||||
try {
|
||||
Thread.sleep(2000L);
|
||||
return Results.ok();
|
||||
} catch (InterruptedException e) {
|
||||
return Results.status(
|
||||
SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}), 1L, TimeUnit.SECONDS
|
||||
);
|
||||
CompletionStage<Object> res = f.handleAsync((result, e) -> {
|
||||
if (e != null) {
|
||||
log.error("Exception thrown", e);
|
||||
latch.countDown();
|
||||
return e.getCause();
|
||||
} else {
|
||||
return result;
|
||||
}
|
||||
});
|
||||
res.thenAccept(result -> assertEquals(TimeoutException.class, result));
|
||||
|
||||
log.debug(
|
||||
"Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
|
||||
latch.await(5, TimeUnit.SECONDS );
|
||||
assertEquals(0, latch.getCount());
|
||||
log.debug("All requests have been completed. Exiting test.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultigigabyteResponseConsumeWithStreams() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
final ActorSystem system = ActorSystem.create();
|
||||
final ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||
final Path path = Files.createTempFile("tmp_", ".out");
|
||||
|
||||
WSClient ws = play.test.WSTestClient.newClient(port);
|
||||
log.info("Starting test server on url: " + url);
|
||||
ws.url(url)
|
||||
.stream()
|
||||
.thenAccept(
|
||||
response -> {
|
||||
try {
|
||||
OutputStream outputStream = java.nio.file.Files.newOutputStream(path);
|
||||
Sink<ByteString, CompletionStage<Done>> outputWriter =
|
||||
Sink.foreach(bytes -> {
|
||||
log.info("Reponse: " + bytes.utf8String());
|
||||
outputStream.write(bytes.toArray());
|
||||
});
|
||||
|
||||
response.getBodyAsSource()
|
||||
.runWith(outputWriter, materializer);
|
||||
|
||||
} catch (IOException e) {
|
||||
log.error("An error happened while opening the output stream", e);
|
||||
}
|
||||
})
|
||||
.whenComplete((value, error) -> latch.countDown());
|
||||
|
||||
log.debug(
|
||||
"Waiting for requests to be completed. Current Time: " + System.currentTimeMillis());
|
||||
latch.await(5, TimeUnit.SECONDS );
|
||||
assertEquals(0, latch.getCount());
|
||||
log.debug("All requests have been completed. Exiting test.");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user