diff --git a/build-all.sh b/build-all.sh index f126a41..0009f7d 100644 --- a/build-all.sh +++ b/build-all.sh @@ -30,6 +30,7 @@ build_gradle_module() { chmod +x gradlew +build_gradle_module "reactive" build_gradle_module "junit/assumptions" build_gradle_module "logging" build_gradle_module "pact/pact-feign-consumer" diff --git a/reactive/build.gradle b/reactive/build.gradle index 305bb7a..2779c4c 100644 --- a/reactive/build.gradle +++ b/reactive/build.gradle @@ -15,9 +15,9 @@ repositories { } dependencies { - compile 'io.reactivex.rxjava2:rxjava:2.2.17' - testCompile 'org.junit.jupiter:junit-jupiter-engine:5.0.1' - testCompile 'org.awaitility:awaitility:3.0.0' + implementation 'io.reactivex.rxjava2:rxjava:2.2.17' + testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1' + testImplementation 'org.awaitility:awaitility:3.0.0' } test { diff --git a/reactive/gradle/wrapper/gradle-wrapper.jar b/reactive/gradle/wrapper/gradle-wrapper.jar index 1a958be..29953ea 100644 Binary files a/reactive/gradle/wrapper/gradle-wrapper.jar and b/reactive/gradle/wrapper/gradle-wrapper.jar differ diff --git a/reactive/gradle/wrapper/gradle-wrapper.properties b/reactive/gradle/wrapper/gradle-wrapper.properties index 4c46317..9492014 100644 --- a/reactive/gradle/wrapper/gradle-wrapper.properties +++ b/reactive/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip diff --git a/reactive/gradlew b/reactive/gradlew index 4453cce..cccdd3d 100644 --- a/reactive/gradlew +++ b/reactive/gradlew @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -155,7 +155,7 @@ if $cygwin ; then fi # Escape application args -save ( ) { +save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java index eceb1fc..664477b 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java @@ -1,36 +1,12 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Single; +public interface MessageHandler { -import java.util.concurrent.atomic.AtomicInteger; + enum Result { + SUCCESS, + FAILURE + } -public class MessageHandler { + Result handleMessage(Message message); - private final AtomicInteger processedMessages = new AtomicInteger(); - - private Logger logger = new Logger(); - - enum Result { - SUCCESS, - FAILURE - } - - public Result handleMessage(Message message) { - logger.log(String.format("handling message %s", message)); - sleep(500); - this.processedMessages.getAndAdd(1); - return Result.SUCCESS; - } - - private void sleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - public Integer getProcessedMessages() { - return processedMessages.get(); - } } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java new file mode 100644 index 0000000..ddbefcb --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java @@ -0,0 +1,9 @@ +package io.reflectoring.reactive.batch; + +import io.reactivex.Flowable; + +public interface MessageSource { + + Flowable getMessageBatches(); + +} diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java index 4b3e059..8c0587b 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java @@ -5,95 +5,82 @@ import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.schedulers.Schedulers; import io.reflectoring.reactive.batch.MessageHandler.Result; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; public class ReactiveBatchProcessor { - private final static Logger logger = new Logger(); + private final static Logger logger = new Logger(); - private final int MESSAGE_BATCHES = 10; + private final int threads; - private final int BATCH_SIZE = 3; + private final MessageHandler messageHandler; - private final int THREADS = 4; + private final MessageSource messageSource; - private final MessageHandler messageHandler = new MessageHandler(); + public ReactiveBatchProcessor( + MessageSource messageSource, + MessageHandler messageHandler, + int threads) { + this.messageSource = messageSource; + this.threads = threads; + this.messageHandler = messageHandler; + } - public void start() { + public void start() { - Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10); + Scheduler scheduler = threadPoolScheduler(threads, 10); - retrieveMessageBatches() - .doOnNext(batch -> logger.log(batch.toString())) - .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) - .flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message))) - .doOnSuccess(result -> logger.log("message handled")) - .subscribeOn(threadPoolScheduler)) - .subscribeWith(subscriber()); - } + messageSource.getMessageBatches() + .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnNext(batch -> logger.log(batch.toString())) + .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) + .flatMapSingle(m -> Single.defer(() -> Single.just(m) + .map(messageHandler::handleMessage)) + .subscribeOn(scheduler)) + .subscribeWith(subscriber()); + } - private Subscriber subscriber() { - return new Subscriber<>() { - private Subscription subscription; + private Subscriber subscriber() { + return new Subscriber<>() { + private Subscription subscription; - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(THREADS); - logger.log("subscribed"); - } + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(threads); + logger.log("subscribed"); + } - @Override - public void onNext(Result message) { - subscription.request(THREADS); - } + @Override + public void onNext(Result message) { + subscription.request(threads); + } - @Override - public void onError(Throwable t) { - logger.log("error"); - } + @Override + public void onError(Throwable t) { + logger.log("error"); + } - @Override - public void onComplete() { - logger.log("completed"); - } - }; - } - - private Scheduler threadPoolScheduler(int poolSize, int queueSize) { - return Schedulers.from(new ThreadPoolExecutor( - poolSize, - poolSize, - 0L, - TimeUnit.SECONDS, - new LinkedBlockingDeque<>(queueSize), - new RetryRejectedExecutionHandler() - )); - } - - public boolean allMessagesProcessed() { - return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE; - } - - private Flowable retrieveMessageBatches() { - return Flowable.range(1, MESSAGE_BATCHES) - .map(this::messageBatch); - } - - private MessageBatch messageBatch(int batchNumber) { - List messages = new ArrayList<>(); - for (int i = 1; i <= BATCH_SIZE; i++) { - messages.add(new Message(String.format("%d-%d", batchNumber, i))); - } - return new MessageBatch(messages); - } + @Override + public void onComplete() { + logger.log("completed"); + } + }; + } + private Scheduler threadPoolScheduler(int poolSize, int queueSize) { + return Schedulers.from(new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(queueSize) + )); + } } diff --git a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java b/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java index 189ea7d..f859868 100644 --- a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java +++ b/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java @@ -1,8 +1,9 @@ package io.reflectoring; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import io.reflectoring.reactive.batch.MessageSource; import io.reflectoring.reactive.batch.ReactiveBatchProcessor; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -10,16 +11,29 @@ import org.junit.jupiter.api.Test; public class ReactiveBatchProcessorTest { @Test - public void test() { - ReactiveBatchProcessor processor = new ReactiveBatchProcessor(); + public void allMessagesAreProcessedOnMultipleThreads() { + + int batches = 3; + int batchSize = 4; + int threads = 5; + + MessageSource messageSource = new TestMessageSource(batches, batchSize); + TestMessageHandler messageHandler = new TestMessageHandler(); + + ReactiveBatchProcessor processor = new ReactiveBatchProcessor( + messageSource, + messageHandler, + threads + ); processor.start(); await() .atMost(10, TimeUnit.SECONDS) .pollInterval(1, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(processor.allMessagesProcessed())); + .untilAsserted(() -> assertEquals(batches * batchSize, messageHandler.getProcessedMessages())); + assertEquals(threads, messageHandler.threadNames().size()); } } diff --git a/reactive/src/test/java/io/reflectoring/TestMessageHandler.java b/reactive/src/test/java/io/reflectoring/TestMessageHandler.java new file mode 100644 index 0000000..876216d --- /dev/null +++ b/reactive/src/test/java/io/reflectoring/TestMessageHandler.java @@ -0,0 +1,43 @@ +package io.reflectoring; + +import io.reflectoring.reactive.batch.Logger; +import io.reflectoring.reactive.batch.Message; +import io.reflectoring.reactive.batch.MessageHandler; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class TestMessageHandler implements MessageHandler { + + private final AtomicInteger processedMessages = new AtomicInteger(); + + private final AtomicReference> threadNames = new AtomicReference<>(new HashSet<>()); + + private Logger logger = new Logger(); + + @Override + public Result handleMessage(Message message) { + sleep(500); + logger.log(String.format("processed message %s", message)); + threadNames.get().add(Thread.currentThread().getName()); + processedMessages.addAndGet(1); + return Result.SUCCESS; + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public int getProcessedMessages() { + return processedMessages.get(); + } + + public Set threadNames() { + return threadNames.get(); + } +} diff --git a/reactive/src/test/java/io/reflectoring/TestMessageSource.java b/reactive/src/test/java/io/reflectoring/TestMessageSource.java new file mode 100644 index 0000000..5ea417b --- /dev/null +++ b/reactive/src/test/java/io/reflectoring/TestMessageSource.java @@ -0,0 +1,43 @@ +package io.reflectoring; + +import io.reactivex.Flowable; +import io.reflectoring.reactive.batch.Message; +import io.reflectoring.reactive.batch.MessageBatch; +import io.reflectoring.reactive.batch.MessageSource; +import java.util.ArrayList; +import java.util.List; + +public class TestMessageSource implements MessageSource { + + private final int batches; + + private final int batchSize; + + /** + * Constructor. + * + * @param batches the number of message batches to produce. + * @param batchSize the number of messages per batch. + */ + public TestMessageSource(int batches, int batchSize) { + this.batches = batches; + this.batchSize = batchSize; + } + + /** + * Generates a limited number of {@link MessageBatch}es. + */ + public Flowable getMessageBatches() { + return Flowable.range(1, batches) + .map(this::messageBatch); + } + + private MessageBatch messageBatch(int batchNumber) { + List messages = new ArrayList<>(); + for (int i = 1; i <= batchSize; i++) { + messages.add(new Message(String.format("%d-%d", batchNumber, i))); + } + return new MessageBatch(messages); + } + +}