diff --git a/rabbitmq/README.md b/rabbitmq/README.md index 7fea2e85a0..3624635835 100644 --- a/rabbitmq/README.md +++ b/rabbitmq/README.md @@ -7,3 +7,4 @@ This module contains articles about RabbitMQ. - [Exchanges, Queues, and Bindings in RabbitMQ](https://www.baeldung.com/java-rabbitmq-exchanges-queues-bindings) - [Pub-Sub vs. Message Queues](https://www.baeldung.com/pub-sub-vs-message-queues) + diff --git a/rabbitmq/docker-compose.yaml b/rabbitmq/docker-compose.yaml new file mode 100644 index 0000000000..d3dd7a58c3 --- /dev/null +++ b/rabbitmq/docker-compose.yaml @@ -0,0 +1,14 @@ +version: '3.0' +services: + rabbitmq: + image: rabbitmq:3-management + environment: + - RABBITMQ_DEFAULT_USER=guest + - RABBITMQ_DEFAULT_PASS=guest + - RABBITMQ_VM_MEMORY_HIGH_WATERMARK_RELATIVE=0.8 + ports: + - "5672:5672" + - "15672:15672" + volumes: + - ./src/rabbitmq/20-mem.conf:/etc/rabbitmq/conf.d/20-mem.conf + diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index 8befd36ab6..1165f44d4a 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -13,6 +13,7 @@ 0.0.1-SNAPSHOT ../parent-boot-2 + @@ -30,7 +31,6 @@ com.rabbitmq amqp-client - ${amqp-client.version} org.springframework.boot @@ -40,10 +40,10 @@ org.springframework.boot spring-boot-starter-amqp + - 5.12.0 2020.0.3 diff --git a/rabbitmq/src/main/java/com/baeldung/benchmark/ConnectionPerChannelPublisher.java b/rabbitmq/src/main/java/com/baeldung/benchmark/ConnectionPerChannelPublisher.java new file mode 100644 index 0000000000..1692066bef --- /dev/null +++ b/rabbitmq/src/main/java/com/baeldung/benchmark/ConnectionPerChannelPublisher.java @@ -0,0 +1,97 @@ +package com.baeldung.benchmark; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.benchmark.Worker.WorkerResult; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class ConnectionPerChannelPublisher implements Callable { + + private static final Logger log = LoggerFactory.getLogger(ConnectionPerChannelPublisher.class); + private final ConnectionFactory factory; + private final int workerCount; + private final int iterations; + private final int payloadSize; + + ConnectionPerChannelPublisher(ConnectionFactory factory, int workerCount, int iterations, int payloadSize) { + this.factory = factory; + this.workerCount = workerCount; + this.iterations = iterations; + this.payloadSize = payloadSize; + } + + public static void main(String[] args) { + + if (args.length != 4) { + System.err.println("Usage: java " + ConnectionPerChannelPublisher.class.getName() + " <#channels> <#messages> "); + System.exit(1); + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(args[0]); + + int workerCount = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); + int payloadSize = Integer.parseInt(args[3]); + + // run the benchmark 10x and get the average throughput + LongSummaryStatistics summary = IntStream.range(0, 9) + .mapToObj(idx -> new ConnectionPerChannelPublisher(factory, workerCount, iterations, payloadSize)) + .map(p -> p.call()) + .collect(Collectors.summarizingLong((l) -> l)); + + log.info("[I66] workers={}, throughput={}", workerCount, (int)Math.floor(summary.getAverage())); + + } + + @Override + public Long call() { + try { + List workers = new ArrayList<>(); + CountDownLatch counter = new CountDownLatch(workerCount); + + for (int i = 0; i < workerCount; i++) { + Connection conn = factory.newConnection(); + workers.add(new Worker("queue_" + i, conn, iterations, counter, payloadSize)); + } + + ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true)); + long start = System.currentTimeMillis(); + log.info("[I61] Starting {} workers...", workers.size()); + executor.invokeAll(workers); + if (counter.await(5, TimeUnit.MINUTES)) { + long elapsed = System.currentTimeMillis() - start; + log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms, stats={}", workerCount, iterations, elapsed); + return throughput(workerCount, iterations, elapsed); + } else { + throw new RuntimeException("[E61] Timeout waiting workers to complete"); + } + + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private static long throughput(int workerCount, int iterations, long elapsed) { + return (iterations * workerCount * 1000) / elapsed; + } + +} diff --git a/rabbitmq/src/main/java/com/baeldung/benchmark/SharedConnectionPublisher.java b/rabbitmq/src/main/java/com/baeldung/benchmark/SharedConnectionPublisher.java new file mode 100644 index 0000000000..7b44ccb9ea --- /dev/null +++ b/rabbitmq/src/main/java/com/baeldung/benchmark/SharedConnectionPublisher.java @@ -0,0 +1,165 @@ +package com.baeldung.benchmark; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class SharedConnectionPublisher { + + private static final Logger log = LoggerFactory.getLogger(SharedConnectionPublisher.class); + + + public static void main(String[] args) { + + try { + + if ( args.length != 6) { + System.err.println("Usage: java " + SharedConnectionPublisher.class.getName() + " <#channels> <#messages> <#channels/connection> "); + System.exit(1); + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(args[0]); + + List workers = new ArrayList<>(); + + int workerCount = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); + int payloadSize = Integer.parseInt(args[3]); + int channelsPerConnection = Integer.parseInt(args[4]); + long extraWork = Long.parseLong(args[5]); + + log.info("[I35] Creating {} worker{}...", workerCount, (workerCount > 1)?"s":""); + + CountDownLatch counter = new CountDownLatch(workerCount); + + int connCount = (workerCount + channelsPerConnection-1)/channelsPerConnection; + List connections = new ArrayList<>(connCount); + for( int i =0 ; i< connCount; i++) { + log.info("[I59] Creating connection#{}", i); + connections.add(factory.newConnection()); + } + + for( int i = 0 ; i < workerCount ; i++ ) { + workers.add(new Worker("queue_" + i, connections.get(i % connCount), iterations, counter,payloadSize,extraWork)); + } + + ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true)); + long start = System.currentTimeMillis(); + log.info("[I61] Starting workers..."); + List> results = executor.invokeAll(workers); + + log.info("[I55] Waiting workers to complete..."); + if( counter.await(5, TimeUnit.MINUTES) ) { + long elapsed = System.currentTimeMillis() - start - (workerCount*iterations*extraWork); + log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms", + workerCount, + iterations, + elapsed); + + LongSummaryStatistics summary = results.stream() + .map(f -> safeGet(f)) + .map(r -> r.elapsed) + .collect(Collectors.summarizingLong((l) -> l)); + + log.info("[I74] stats={}", summary); + log.info("[I79] result: workers={}, throughput={}",workerCount,throughput(workerCount,iterations,elapsed)); + + } + else { + log.error("[E61] Timeout waiting workers to complete"); + } + + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + private static long throughput(int workerCount, int iterations, long elapsed) { + return (iterations*workerCount*1000)/elapsed; + } + + + private static T safeGet(Future f) { + try { + return f.get(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + private static class WorkerResult { + public final long elapsed; + WorkerResult(long elapsed) { + this.elapsed = elapsed; + } + } + + + private static class Worker implements Callable { + + private final Connection conn; + private final Channel channel; + private int iterations; + private final CountDownLatch counter; + private final String queue; + private final byte[] payload; + private long extraWork; + + Worker(String queue, Connection conn, int iterations, CountDownLatch counter,int payloadSize,long extraWork) throws IOException { + this.conn = conn; + this.iterations = iterations; + this.counter = counter; + this.queue = queue; + this.extraWork = extraWork; + + channel = conn.createChannel(); + channel.queueDeclare(queue, false, false, true, null); + + this.payload = new byte[payloadSize]; + new Random().nextBytes(payload); + + } + + @Override + public WorkerResult call() throws Exception { + + try { + long start = System.currentTimeMillis(); + for ( int i = 0 ; i < iterations ; i++ ) { + channel.basicPublish("", queue, null,payload); + Thread.sleep(extraWork); + } + + long elapsed = System.currentTimeMillis() - start - (extraWork*iterations); + channel.queueDelete(queue); + return new WorkerResult(elapsed); + } + finally { + counter.countDown(); + } + } + } +} diff --git a/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisher.java b/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisher.java new file mode 100644 index 0000000000..976a7e1990 --- /dev/null +++ b/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisher.java @@ -0,0 +1,116 @@ +package com.baeldung.benchmark; + +import java.util.ArrayList; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.benchmark.Worker.WorkerResult; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class SingleConnectionPublisher implements Callable { + + private static final Logger log = LoggerFactory.getLogger(SingleConnectionPublisher.class); + + private final ConnectionFactory factory; + private final int workerCount; + private final int iterations; + private final int payloadSize; + + SingleConnectionPublisher(ConnectionFactory factory, int workerCount, int iterations, int payloadSize) { + this.factory = factory; + this.workerCount = workerCount; + this.iterations = iterations; + this.payloadSize = payloadSize; + } + + public static void main(String[] args) { + + if ( args.length != 4) { + System.err.println("Usage: java " + SingleConnectionPublisher.class.getName() + " <#channels> <#messages> "); + System.exit(1); + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(args[0]); + + int workerCount = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); + int payloadSize = Integer.parseInt(args[3]); + + LongSummaryStatistics summary = IntStream.range(0, 9) + .mapToObj(idx -> new SingleConnectionPublisher(factory, workerCount, iterations, payloadSize)) + .map(p -> p.call()) + .collect(Collectors.summarizingLong((l) -> l)); + + log.info("[I66] workers={}, throughput={}", workerCount, (int)Math.floor(summary.getAverage())); + + } + + @Override + public Long call() { + + try { + + Connection connection = factory.newConnection(); + CountDownLatch counter = new CountDownLatch(workerCount); + List workers = new ArrayList<>(); + + for( int i = 0 ; i < workerCount ; i++ ) { + workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize)); + } + + ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true)); + long start = System.currentTimeMillis(); + log.info("[I61] Starting {} workers...", workers.size()); + List> results = executor.invokeAll(workers); + + if( counter.await(5, TimeUnit.MINUTES) ) { + long elapsed = System.currentTimeMillis() - start; + + LongSummaryStatistics summary = results.stream() + .map(f -> safeGet(f)) + .map(r -> r.elapsed) + .collect(Collectors.summarizingLong((l) -> l)); + + log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms, stats={}", + workerCount, + iterations, + elapsed, summary); + + return throughput(workerCount,iterations,elapsed); + } + else { + throw new RuntimeException("[E61] Timeout waiting workers to complete"); + } + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + private static T safeGet(Future f) { + try { + return f.get(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + private static long throughput(int workerCount, int iterations, long elapsed) { + return (iterations*workerCount*1000)/elapsed; + } +} diff --git a/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisherNio.java b/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisherNio.java new file mode 100644 index 0000000000..cd4fe28ce9 --- /dev/null +++ b/rabbitmq/src/main/java/com/baeldung/benchmark/SingleConnectionPublisherNio.java @@ -0,0 +1,151 @@ +package com.baeldung.benchmark; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class SingleConnectionPublisherNio { + + private static final Logger log = LoggerFactory.getLogger(SingleConnectionPublisherNio.class); + + + public static void main(String[] args) { + + try { + + if ( args.length != 4) { + System.err.println("Usage: java " + SingleConnectionPublisherNio.class.getName() + " <#channels> <#messages> "); + System.exit(1); + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(args[0]); + factory.useNio(); + Connection connection = factory.newConnection(); + + List workers = new ArrayList<>(); + + int workerCount = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); + int payloadSize = Integer.parseInt(args[3]); + + log.info("[I35] Creating {} worker{}...", workerCount, (workerCount > 1)?"s":""); + + CountDownLatch counter = new CountDownLatch(workerCount); + + for( int i = 0 ; i < workerCount ; i++ ) { + workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize)); + } + + ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true)); + long start = System.currentTimeMillis(); + log.info("[I61] Starting workers..."); + List> results = executor.invokeAll(workers); + + log.info("[I55] Waiting workers to complete..."); + if( counter.await(5, TimeUnit.MINUTES) ) { + long elapsed = System.currentTimeMillis() - start; + log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms", + workerCount, + iterations, + elapsed); + + LongSummaryStatistics summary = results.stream() + .map(f -> safeGet(f)) + .map(r -> r.elapsed) + .collect(Collectors.summarizingLong((l) -> l)); + + log.info("[I74] stats={}", summary); + + } + else { + log.error("[E61] Timeout waiting workers to complete"); + } + + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + + } + + private static T safeGet(Future f) { + try { + return f.get(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + private static class WorkerResult { + public final long elapsed; + WorkerResult(long elapsed) { + this.elapsed = elapsed; + } + } + + + private static class Worker implements Callable { + + private final Connection conn; + private final Channel channel; + private int iterations; + private final CountDownLatch counter; + private final String queue; + private final byte[] payload; + + Worker(String queue, Connection conn, int iterations, CountDownLatch counter,int payloadSize) throws IOException { + this.conn = conn; + this.iterations = iterations; + this.counter = counter; + this.queue = queue; + + channel = conn.createChannel(); + channel.queueDeclare(queue, false, false, true, null); + + this.payload = new byte[payloadSize]; + new Random().nextBytes(payload); + + } + + @Override + public WorkerResult call() throws Exception { + + try { + long start = System.currentTimeMillis(); + for ( int i = 0 ; i < iterations ; i++ ) { + channel.basicPublish("", queue, null,payload); + } + + long elapsed = System.currentTimeMillis() - start; + channel.queueDelete(queue); + return new WorkerResult(elapsed); + } + finally { + counter.countDown(); + } + } + + } +} diff --git a/rabbitmq/src/main/java/com/baeldung/benchmark/Worker.java b/rabbitmq/src/main/java/com/baeldung/benchmark/Worker.java new file mode 100644 index 0000000000..11584372b8 --- /dev/null +++ b/rabbitmq/src/main/java/com/baeldung/benchmark/Worker.java @@ -0,0 +1,59 @@ +package com.baeldung.benchmark; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +public class Worker implements Callable { + + private final Channel channel; + private int iterations; + private final CountDownLatch counter; + private final String queue; + private final byte[] payload; + + Worker(String queue, Connection conn, int iterations, CountDownLatch counter, int payloadSize) throws IOException { + this.iterations = iterations; + this.counter = counter; + this.queue = queue; + + channel = conn.createChannel(); + channel.queueDelete(queue); + channel.queueDeclare(queue, false, false, true, null); + + this.payload = new byte[payloadSize]; + new Random().nextBytes(payload); + + } + + @Override + public WorkerResult call() throws Exception { + + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < iterations; i++) { + channel.basicPublish("", queue, null, payload); + } + + long elapsed = System.currentTimeMillis() - start; + channel.queueDelete(queue); + return new WorkerResult(elapsed); + } finally { + counter.countDown(); + } + } + + + public static class WorkerResult { + public final long elapsed; + + WorkerResult(long elapsed) { + this.elapsed = elapsed; + } + } +} diff --git a/rabbitmq/src/rabbitmq/20-mem.conf b/rabbitmq/src/rabbitmq/20-mem.conf new file mode 100644 index 0000000000..4c73e866b7 --- /dev/null +++ b/rabbitmq/src/rabbitmq/20-mem.conf @@ -0,0 +1,2 @@ +# Memory configuration for Rabbit +vm_memory_high_watermark.relative = 0.8 diff --git a/rabbitmq/src/test/java/com/baeldung/benchmark/ConnectionPerChannelPublisherLiveTest.java b/rabbitmq/src/test/java/com/baeldung/benchmark/ConnectionPerChannelPublisherLiveTest.java new file mode 100644 index 0000000000..0309912e91 --- /dev/null +++ b/rabbitmq/src/test/java/com/baeldung/benchmark/ConnectionPerChannelPublisherLiveTest.java @@ -0,0 +1,18 @@ +package com.baeldung.benchmark; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +class ConnectionPerChannelPublisherLiveTest { + + @Test + void whenConnectionPerChannel_thenRunBenchmark() throws Exception { + // host, workerCount, iterations, payloadSize + Arrays.asList(1,5,10,20,50,100,150).stream() + .forEach(workers -> { + ConnectionPerChannelPublisher.main(new String[]{"192.168.99.100", Integer.toString(workers), "1000", "4096"}); + }); + } + +} diff --git a/rabbitmq/src/test/java/com/baeldung/benchmark/SingleConnectionPublisherLiveTest.java b/rabbitmq/src/test/java/com/baeldung/benchmark/SingleConnectionPublisherLiveTest.java new file mode 100644 index 0000000000..6c03f90fb0 --- /dev/null +++ b/rabbitmq/src/test/java/com/baeldung/benchmark/SingleConnectionPublisherLiveTest.java @@ -0,0 +1,18 @@ +package com.baeldung.benchmark; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +class SingleConnectionPublisherLiveTest { + + @Test + void whenSingleChannel_thenRunBenchmark() throws Exception { + // host, workerCount, iterations, payloadSize + Arrays.asList(1,5,10,20,50,100,150).stream() + .forEach(workers -> { + SingleConnectionPublisher.main(new String[]{"192.168.99.100", Integer.toString(workers), "1000", "4096"}); + }); + } + +} diff --git a/spring-security-modules/pom.xml b/spring-security-modules/pom.xml index 1e0ae825e1..bef6d148be 100644 --- a/spring-security-modules/pom.xml +++ b/spring-security-modules/pom.xml @@ -47,7 +47,7 @@ spring-security-web-thymeleaf spring-security-web-x509 spring-security-opa - spring-security-pkce + spring-security-pkce \ No newline at end of file