added Single.defer to fan out messages over multiple threads

This commit is contained in:
Tom Hombergs
2020-01-31 09:18:02 +11:00
parent 3fb809911a
commit 5fe5deca66
5 changed files with 115 additions and 90 deletions

View File

@@ -1,34 +1,36 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Single;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageHandler {
private final AtomicInteger processedMessages = new AtomicInteger();
private final AtomicInteger processedMessages = new AtomicInteger();
private Logger logger = new Logger();
private Logger logger = new Logger();
enum Result {
SUCCESS,
FAILURE
}
public Single<Result> handleMessage(Message message){
sleep(500);
logger.log(String.format("processed message %s", message));
return Single.just(Result.SUCCESS);
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
enum Result {
SUCCESS,
FAILURE
}
}
public AtomicInteger getProcessedMessages() {
return processedMessages;
}
public Result handleMessage(Message message) {
logger.log(String.format("handling message %s", message));
sleep(500);
this.processedMessages.getAndAdd(1);
return Result.SUCCESS;
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public Integer getProcessedMessages() {
return processedMessages.get();
}
}

View File

@@ -5,90 +5,95 @@ import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reflectoring.reactive.batch.MessageHandler.Result;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class ReactiveBatchProcessor {
private final static Logger logger = new Logger();
private final static Logger logger = new Logger();
private final int MESSAGE_BATCHES = 10;
private final int MESSAGE_BATCHES = 10;
private final int BATCH_SIZE = 3;
private final int BATCH_SIZE = 3;
private final int THREADS = 4;
private final int THREADS = 4;
private final MessageHandler messageHandler = new MessageHandler();
private final MessageHandler messageHandler = new MessageHandler();
public void start() {
retrieveMessageBatches()
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(message -> messageHandler.handleMessage(message)
.subscribeOn(threadPoolScheduler(THREADS, 10)))
.subscribeWith(subscriber());
}
public void start() {
private Subscriber<MessageHandler.Result> subscriber() {
return new Subscriber<>() {
private Subscription subscription;
Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10);
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(THREADS);
logger.log("subscribed");
}
@Override
public void onNext(Result message) {
subscription.request(THREADS);
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onComplete() {
logger.log("completed");
}
};
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
));
}
public boolean allMessagesProcessed() {
return this.messageHandler.getProcessedMessages().get() == MESSAGE_BATCHES * BATCH_SIZE;
}
private Flowable<MessageBatch> retrieveMessageBatches() {
return Flowable.range(1, MESSAGE_BATCHES)
.map(this::messageBatch);
}
private MessageBatch messageBatch(int batchNumber) {
List<Message> messages = new ArrayList<>();
for (int i = 1; i <= BATCH_SIZE; i++) {
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
retrieveMessageBatches()
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
.doOnSuccess(result -> logger.log("message handled"))
.subscribeOn(threadPoolScheduler))
.subscribeWith(subscriber());
}
private Subscriber<MessageHandler.Result> subscriber() {
return new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(THREADS);
logger.log("subscribed");
}
@Override
public void onNext(Result message) {
subscription.request(THREADS);
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onComplete() {
logger.log("completed");
}
};
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize),
new RetryRejectedExecutionHandler()
));
}
public boolean allMessagesProcessed() {
return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE;
}
private Flowable<MessageBatch> retrieveMessageBatches() {
return Flowable.range(1, MESSAGE_BATCHES)
.map(this::messageBatch);
}
private MessageBatch messageBatch(int batchNumber) {
List<Message> messages = new ArrayList<>();
for (int i = 1; i <= BATCH_SIZE; i++) {
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
}
return new MessageBatch(messages);
}
return new MessageBatch(messages);
}
}

View File

@@ -0,0 +1,18 @@
package io.reflectoring.reactive.batch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
try {
threadPoolExecutor.getQueue().put(runnable);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
}