diff --git a/aws/aws-hello-world/gradlew b/aws/aws-hello-world/gradlew old mode 100644 new mode 100755 diff --git a/aws/aws-hello-world/gradlew.bat b/aws/aws-hello-world/gradlew.bat old mode 100644 new mode 100755 diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java index 66920f7..eceb1fc 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java @@ -1,34 +1,36 @@ package io.reflectoring.reactive.batch; import io.reactivex.Single; + import java.util.concurrent.atomic.AtomicInteger; public class MessageHandler { - private final AtomicInteger processedMessages = new AtomicInteger(); + private final AtomicInteger processedMessages = new AtomicInteger(); - private Logger logger = new Logger(); + private Logger logger = new Logger(); - enum Result { - SUCCESS, - FAILURE - } - - public Single handleMessage(Message message){ - sleep(500); - logger.log(String.format("processed message %s", message)); - return Single.just(Result.SUCCESS); - } - - private void sleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new RuntimeException(e); + enum Result { + SUCCESS, + FAILURE } - } - public AtomicInteger getProcessedMessages() { - return processedMessages; - } + public Result handleMessage(Message message) { + logger.log(String.format("handling message %s", message)); + sleep(500); + this.processedMessages.getAndAdd(1); + return Result.SUCCESS; + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public Integer getProcessedMessages() { + return processedMessages.get(); + } } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java index 319e5ce..4b3e059 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java @@ -5,90 +5,95 @@ import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.schedulers.Schedulers; import io.reflectoring.reactive.batch.MessageHandler.Result; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; public class ReactiveBatchProcessor { - private final static Logger logger = new Logger(); + private final static Logger logger = new Logger(); - private final int MESSAGE_BATCHES = 10; + private final int MESSAGE_BATCHES = 10; - private final int BATCH_SIZE = 3; + private final int BATCH_SIZE = 3; - private final int THREADS = 4; + private final int THREADS = 4; - private final MessageHandler messageHandler = new MessageHandler(); + private final MessageHandler messageHandler = new MessageHandler(); - public void start() { - retrieveMessageBatches() - .doOnNext(batch -> logger.log(batch.toString())) - .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) - .flatMapSingle(message -> messageHandler.handleMessage(message) - .subscribeOn(threadPoolScheduler(THREADS, 10))) - .subscribeWith(subscriber()); - } + public void start() { - private Subscriber subscriber() { - return new Subscriber<>() { - private Subscription subscription; + Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10); - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(THREADS); - logger.log("subscribed"); - } - - @Override - public void onNext(Result message) { - subscription.request(THREADS); - } - - @Override - public void onError(Throwable t) { - logger.log("error"); - } - - @Override - public void onComplete() { - logger.log("completed"); - } - }; - } - - private Scheduler threadPoolScheduler(int poolSize, int queueSize) { - return Schedulers.from(new ThreadPoolExecutor( - poolSize, - poolSize, - 0L, - TimeUnit.SECONDS, - new LinkedBlockingDeque<>(queueSize) - )); - } - - public boolean allMessagesProcessed() { - return this.messageHandler.getProcessedMessages().get() == MESSAGE_BATCHES * BATCH_SIZE; - } - - private Flowable retrieveMessageBatches() { - return Flowable.range(1, MESSAGE_BATCHES) - .map(this::messageBatch); - } - - private MessageBatch messageBatch(int batchNumber) { - List messages = new ArrayList<>(); - for (int i = 1; i <= BATCH_SIZE; i++) { - messages.add(new Message(String.format("%d-%d", batchNumber, i))); + retrieveMessageBatches() + .doOnNext(batch -> logger.log(batch.toString())) + .flatMap(batch -> Flowable.fromIterable(batch.getMessages())) + .flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message))) + .doOnSuccess(result -> logger.log("message handled")) + .subscribeOn(threadPoolScheduler)) + .subscribeWith(subscriber()); + } + + private Subscriber subscriber() { + return new Subscriber<>() { + private Subscription subscription; + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(THREADS); + logger.log("subscribed"); + } + + @Override + public void onNext(Result message) { + subscription.request(THREADS); + } + + @Override + public void onError(Throwable t) { + logger.log("error"); + } + + @Override + public void onComplete() { + logger.log("completed"); + } + }; + } + + private Scheduler threadPoolScheduler(int poolSize, int queueSize) { + return Schedulers.from(new ThreadPoolExecutor( + poolSize, + poolSize, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(queueSize), + new RetryRejectedExecutionHandler() + )); + } + + public boolean allMessagesProcessed() { + return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE; + } + + private Flowable retrieveMessageBatches() { + return Flowable.range(1, MESSAGE_BATCHES) + .map(this::messageBatch); + } + + private MessageBatch messageBatch(int batchNumber) { + List messages = new ArrayList<>(); + for (int i = 1; i <= BATCH_SIZE; i++) { + messages.add(new Message(String.format("%d-%d", batchNumber, i))); + } + return new MessageBatch(messages); } - return new MessageBatch(messages); - } } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java b/reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java new file mode 100644 index 0000000..8f9429e --- /dev/null +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/RetryRejectedExecutionHandler.java @@ -0,0 +1,18 @@ +package io.reflectoring.reactive.batch; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +public class RetryRejectedExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { + try { + threadPoolExecutor.getQueue().put(runnable); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + +}