JAVA-2109: Move Guide to the Java TransferQueue to the core-java-concurrency-collections-2
This commit is contained in:
@@ -10,7 +10,7 @@ This module contains articles about concurrent Java collections
|
||||
- [Custom Thread Pools In Java 8 Parallel Streams](http://www.baeldung.com/java-8-parallel-streams-custom-threadpool)
|
||||
- [Guide to DelayQueue](http://www.baeldung.com/java-delay-queue)
|
||||
- [A Guide to Java SynchronousQueue](http://www.baeldung.com/java-synchronous-queue)
|
||||
- [Guide to the Java TransferQueue](http://www.baeldung.com/java-transfer-queue)
|
||||
- [Guide to the ConcurrentSkipListMap](http://www.baeldung.com/java-concurrent-skip-list-map)
|
||||
- [Guide to CopyOnWriteArrayList](http://www.baeldung.com/java-copy-on-write-arraylist)
|
||||
- [LinkedBlockingQueue vs ConcurrentLinkedQueue](https://www.baeldung.com/java-queue-linkedblocking-concurrentlinked)
|
||||
- [[Next -->]](/core-java-modules/core-java-concurrency-collections-2)
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TransferQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Consumer implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
|
||||
|
||||
private final TransferQueue<String> transferQueue;
|
||||
private final String name;
|
||||
final int numberOfMessagesToConsume;
|
||||
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
|
||||
|
||||
Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
|
||||
this.transferQueue = transferQueue;
|
||||
this.name = name;
|
||||
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfMessagesToConsume; i++) {
|
||||
try {
|
||||
LOG.debug("Consumer: " + name + " is waiting to take element...");
|
||||
String element = transferQueue.take();
|
||||
longProcessing(element);
|
||||
LOG.debug("Consumer: " + name + " received element: " + element);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void longProcessing(String element) throws InterruptedException {
|
||||
numberOfConsumedMessages.incrementAndGet();
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TransferQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Producer implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
|
||||
|
||||
private final TransferQueue<String> transferQueue;
|
||||
private final String name;
|
||||
final Integer numberOfMessagesToProduce;
|
||||
final AtomicInteger numberOfProducedMessages = new AtomicInteger();
|
||||
|
||||
Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
|
||||
this.transferQueue = transferQueue;
|
||||
this.name = name;
|
||||
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfMessagesToProduce; i++) {
|
||||
try {
|
||||
LOG.debug("Producer: " + name + " is waiting to transfer...");
|
||||
boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
|
||||
if (added) {
|
||||
numberOfProducedMessages.incrementAndGet();
|
||||
LOG.debug("Producer: " + name + " transferred element: A" + i);
|
||||
} else {
|
||||
LOG.debug("can not add an element due to the timeout");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class TransferQueueIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void whenMultipleConsumersAndProducers_thenProcessAllMessages() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(3);
|
||||
Producer producer1 = new Producer(transferQueue, "1", 3);
|
||||
Producer producer2 = new Producer(transferQueue, "2", 3);
|
||||
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
|
||||
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer1);
|
||||
exService.execute(producer2);
|
||||
exService.execute(consumer1);
|
||||
exService.execute(consumer2);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
|
||||
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(2);
|
||||
Producer producer = new Producer(transferQueue, "1", 3);
|
||||
Consumer consumer = new Consumer(transferQueue, "1", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer);
|
||||
exService.execute(consumer);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
|
||||
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(2);
|
||||
Producer producer = new Producer(transferQueue, "1", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user