diff --git a/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java b/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java new file mode 100644 index 0000000000..81a24ea5a8 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java @@ -0,0 +1,33 @@ +package com.baeldung.transferqueue; + +import java.util.concurrent.TransferQueue; + +public class Consumer implements Runnable { + private final TransferQueue transferQueue; + private final String name; + private final int numberOfMessagesToConsume; + + public Consumer(TransferQueue transferQueue, String name, int numberOfMessagesToConsume) { + this.transferQueue = transferQueue; + this.name = name; + this.numberOfMessagesToConsume = numberOfMessagesToConsume; + } + + @Override + public void run() { + for (int i = 0; i < numberOfMessagesToConsume; i++) { + try { + System.out.println("Consumer: " + name + " is waiting to take element..."); + String element = transferQueue.take(); + longProcessing(element); + System.out.println("Consumer: " + name + " received element: " + element); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void longProcessing(String element) throws InterruptedException { + Thread.sleep(1_000); + } +} \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/transferqueue/Producer.java b/core-java/src/main/java/com/baeldung/transferqueue/Producer.java new file mode 100644 index 0000000000..0aa2e32866 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/transferqueue/Producer.java @@ -0,0 +1,33 @@ +package com.baeldung.transferqueue; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TransferQueue; + +public class Producer implements Runnable { + private final TransferQueue transferQueue; + private final String name; + private final Integer numberOfMessagesToProduce; + + public Producer(TransferQueue transferQueue, String name, Integer numberOfMessagesToProduce) { + this.transferQueue = transferQueue; + this.name = name; + this.numberOfMessagesToProduce = numberOfMessagesToProduce; + } + + @Override + public void run() { + for (int i = 0; i < numberOfMessagesToProduce; i++) { + try { + System.out.println("Producer: " + name + " is waiting to transfer..."); + boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS); + if (!added) { + System.out.println("can not add an element due to the timeout"); + } else { + System.out.println("Producer: " + name + " transferred element: A" + i); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java b/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java new file mode 100644 index 0000000000..056a9f9c33 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java @@ -0,0 +1,62 @@ +package com.baeldung.transferqueue; + +import org.junit.Test; + +import java.util.concurrent.*; + +public class TransferQueueTest { + + @Test + public void givenTransferQueue_whenUseMultipleConsumersAndMultipleProducers_thenShouldProcessAllMessages() throws InterruptedException { + //given + TransferQueue transferQueue = new LinkedTransferQueue<>(); + ExecutorService exService = Executors.newFixedThreadPool(3); + Producer producer1 = new Producer(transferQueue, "1", 3); + Producer producer2 = new Producer(transferQueue, "2", 3); + Consumer consumer1 = new Consumer(transferQueue, "1", 3); + Consumer consumer2 = new Consumer(transferQueue, "2", 3); + + //when + exService.execute(producer1); + exService.execute(producer2); + exService.execute(consumer1); + exService.execute(consumer2); + + //then + exService.awaitTermination(10_000, TimeUnit.MILLISECONDS); + exService.shutdown(); + } + + @Test + public void givenTransferQueue_whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() throws InterruptedException { + //given + TransferQueue transferQueue = new LinkedTransferQueue<>(); + ExecutorService exService = Executors.newFixedThreadPool(2); + Producer producer = new Producer(transferQueue, "1", 3); + Consumer consumer = new Consumer(transferQueue, "1", 3); + + //when + exService.execute(producer); + exService.execute(consumer); + + //then + exService.awaitTermination(5000, TimeUnit.MILLISECONDS); + exService.shutdown(); + } + + @Test + public void givenTransferQueue_whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() throws InterruptedException { + //given + TransferQueue transferQueue = new LinkedTransferQueue<>(); + ExecutorService exService = Executors.newFixedThreadPool(2); + Producer producer = new Producer(transferQueue, "1", 3); + + //when + exService.execute(producer); + + //then + exService.awaitTermination(5000, TimeUnit.MILLISECONDS); + exService.shutdown(); + } + +} \ No newline at end of file