diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java index 8c0587b..c833878 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java @@ -4,13 +4,10 @@ import io.reactivex.Flowable; import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.schedulers.Schedulers; -import io.reflectoring.reactive.batch.MessageHandler.Result; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; public class ReactiveBatchProcessor { @@ -18,6 +15,8 @@ public class ReactiveBatchProcessor { private final int threads; + private final int threadPoolQueueSize; + private final MessageHandler messageHandler; private final MessageSource messageSource; @@ -25,15 +24,17 @@ public class ReactiveBatchProcessor { public ReactiveBatchProcessor( MessageSource messageSource, MessageHandler messageHandler, - int threads) { + int threads, + int threadPoolQueueSize) { this.messageSource = messageSource; this.threads = threads; this.messageHandler = messageHandler; + this.threadPoolQueueSize = threadPoolQueueSize; } public void start() { - Scheduler scheduler = threadPoolScheduler(threads, 10); + Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize); messageSource.getMessageBatches() .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) @@ -42,35 +43,7 @@ public class ReactiveBatchProcessor { .flatMapSingle(m -> Single.defer(() -> Single.just(m) .map(messageHandler::handleMessage)) .subscribeOn(scheduler)) - .subscribeWith(subscriber()); - } - - private Subscriber subscriber() { - return new Subscriber<>() { - private Subscription subscription; - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(threads); - logger.log("subscribed"); - } - - @Override - public void onNext(Result message) { - subscription.request(threads); - } - - @Override - public void onError(Throwable t) { - logger.log("error"); - } - - @Override - public void onComplete() { - logger.log("completed"); - } - }; + .subscribeWith(new SimpleSubscriber<>(threads, 1)); } private Scheduler threadPoolScheduler(int poolSize, int queueSize) { @@ -79,7 +52,8 @@ public class ReactiveBatchProcessor { poolSize, 0L, TimeUnit.SECONDS, - new LinkedBlockingDeque<>(queueSize) + new LinkedBlockingDeque<>(queueSize), + new WaitForCapacityPolicy() )); } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java new file mode 100644 index 0000000..dd6ad61 --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java @@ -0,0 +1,55 @@ +package io.reflectoring.reactive.batch; + +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ReactiveBatchProcessorV1 { + + private final static Logger logger = new Logger(); + + private final int threads; + + private final int threadPoolQueueSize; + + private final MessageHandler messageHandler; + + private final MessageSource messageSource; + + public ReactiveBatchProcessorV1( + MessageSource messageSource, + MessageHandler messageHandler, + int threads, + int threadPoolQueueSize) { + this.messageSource = messageSource; + this.threads = threads; + this.messageHandler = messageHandler; + this.threadPoolQueueSize = threadPoolQueueSize; + } + + public void start() { + messageSource.getMessageBatches() + .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnNext(batch -> logger.log(batch.toString())) + .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) + .flatMapSingle(m -> Single.just(messageHandler.handleMessage(m)) + .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize))) + .subscribeWith(new SimpleSubscriber<>(threads, 1)); + } + + private Scheduler threadPoolScheduler(int poolSize, int queueSize) { + return Schedulers.from(new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(queueSize) + )); + } + +} diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java new file mode 100644 index 0000000..e70298c --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java @@ -0,0 +1,55 @@ +package io.reflectoring.reactive.batch; + +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ReactiveBatchProcessorV2 { + + private final static Logger logger = new Logger(); + + private final int threads; + + private final int threadPoolQueueSize; + + private final MessageHandler messageHandler; + + private final MessageSource messageSource; + + public ReactiveBatchProcessorV2( + MessageSource messageSource, + MessageHandler messageHandler, + int threads, + int threadPoolQueueSize) { + this.messageSource = messageSource; + this.threads = threads; + this.messageHandler = messageHandler; + this.threadPoolQueueSize = threadPoolQueueSize; + } + + public void start() { + messageSource.getMessageBatches() + .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnNext(batch -> logger.log(batch.toString())) + .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) + .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m))) + .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize))) + .subscribeWith(new SimpleSubscriber<>(threads, 1)); + } + + private Scheduler threadPoolScheduler(int poolSize, int queueSize) { + return Schedulers.from(new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(queueSize) + )); + } + +} diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java new file mode 100644 index 0000000..fc81c47 --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java @@ -0,0 +1,57 @@ +package io.reflectoring.reactive.batch; + +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ReactiveBatchProcessorV3 { + + private final static Logger logger = new Logger(); + + private final int threads; + + private final int threadPoolQueueSize; + + private final MessageHandler messageHandler; + + private final MessageSource messageSource; + + public ReactiveBatchProcessorV3( + MessageSource messageSource, + MessageHandler messageHandler, + int threads, + int threadPoolQueueSize) { + this.messageSource = messageSource; + this.threads = threads; + this.messageHandler = messageHandler; + this.threadPoolQueueSize = threadPoolQueueSize; + } + + public void start() { + Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize); + + messageSource.getMessageBatches() + .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnNext(batch -> logger.log(batch.toString())) + .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) + .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m))) + .subscribeOn(scheduler)) + .subscribeWith(new SimpleSubscriber<>(threads, 1)); + } + + private Scheduler threadPoolScheduler(int poolSize, int queueSize) { + return Schedulers.from(new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(queueSize) + )); + } + +} diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java b/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java new file mode 100644 index 0000000..30c0848 --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java @@ -0,0 +1,42 @@ +package io.reflectoring.reactive.batch; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class SimpleSubscriber implements Subscriber { + + private final Logger logger = new Logger(); + + private final int initialFetchCount; + private final int onNextFetchCount; + + private Subscription subscription; + + public SimpleSubscriber(int initialFetchCount, int onNextFetchCount) { + this.initialFetchCount = initialFetchCount; + this.onNextFetchCount = onNextFetchCount; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(initialFetchCount); + logger.log("subscribed"); + } + + @Override + public void onNext(T message) { + subscription.request(onNextFetchCount); + } + + @Override + public void onError(Throwable t) { + logger.log("error"); + } + + @Override + public void onComplete() { + logger.log("completed"); + } + +} diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java b/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java similarity index 85% rename from reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java rename to reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java index 8f9429e..2a783f5 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java @@ -4,7 +4,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -public class RetryRejectedExecutionHandler implements RejectedExecutionHandler { +public class WaitForCapacityPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { diff --git a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java b/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java index f859868..f38dbd5 100644 --- a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java +++ b/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java @@ -5,6 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.reflectoring.reactive.batch.MessageSource; import io.reflectoring.reactive.batch.ReactiveBatchProcessor; +import io.reflectoring.reactive.batch.ReactiveBatchProcessorV1; +import io.reflectoring.reactive.batch.ReactiveBatchProcessorV2; +import io.reflectoring.reactive.batch.ReactiveBatchProcessorV3; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -13,9 +16,10 @@ public class ReactiveBatchProcessorTest { @Test public void allMessagesAreProcessedOnMultipleThreads() { - int batches = 3; - int batchSize = 4; - int threads = 5; + int batches = 10; + int batchSize = 3; + int threads = 2; + int threadPoolQueueSize = 10; MessageSource messageSource = new TestMessageSource(batches, batchSize); TestMessageHandler messageHandler = new TestMessageHandler(); @@ -23,8 +27,8 @@ public class ReactiveBatchProcessorTest { ReactiveBatchProcessor processor = new ReactiveBatchProcessor( messageSource, messageHandler, - threads - ); + threads, + threadPoolQueueSize); processor.start();