finished reactive example

This commit is contained in:
thombergs
2020-02-02 06:45:58 +11:00
parent ec2bdc9327
commit b6d167f4ae
7 changed files with 228 additions and 41 deletions

View File

@@ -4,13 +4,10 @@ import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reflectoring.reactive.batch.MessageHandler.Result;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class ReactiveBatchProcessor {
@@ -18,6 +15,8 @@ public class ReactiveBatchProcessor {
private final int threads;
private final int threadPoolQueueSize;
private final MessageHandler messageHandler;
private final MessageSource messageSource;
@@ -25,15 +24,17 @@ public class ReactiveBatchProcessor {
public ReactiveBatchProcessor(
MessageSource messageSource,
MessageHandler messageHandler,
int threads) {
int threads,
int threadPoolQueueSize) {
this.messageSource = messageSource;
this.threads = threads;
this.messageHandler = messageHandler;
this.threadPoolQueueSize = threadPoolQueueSize;
}
public void start() {
Scheduler scheduler = threadPoolScheduler(threads, 10);
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
messageSource.getMessageBatches()
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
@@ -42,35 +43,7 @@ public class ReactiveBatchProcessor {
.flatMapSingle(m -> Single.defer(() -> Single.just(m)
.map(messageHandler::handleMessage))
.subscribeOn(scheduler))
.subscribeWith(subscriber());
}
private Subscriber<MessageHandler.Result> subscriber() {
return new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(threads);
logger.log("subscribed");
}
@Override
public void onNext(Result message) {
subscription.request(threads);
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onComplete() {
logger.log("completed");
}
};
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
@@ -79,7 +52,8 @@ public class ReactiveBatchProcessor {
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
new LinkedBlockingDeque<>(queueSize),
new WaitForCapacityPolicy()
));
}

View File

@@ -0,0 +1,55 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ReactiveBatchProcessorV1 {
private final static Logger logger = new Logger();
private final int threads;
private final int threadPoolQueueSize;
private final MessageHandler messageHandler;
private final MessageSource messageSource;
public ReactiveBatchProcessorV1(
MessageSource messageSource,
MessageHandler messageHandler,
int threads,
int threadPoolQueueSize) {
this.messageSource = messageSource;
this.threads = threads;
this.messageHandler = messageHandler;
this.threadPoolQueueSize = threadPoolQueueSize;
}
public void start() {
messageSource.getMessageBatches()
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
));
}
}

View File

@@ -0,0 +1,55 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ReactiveBatchProcessorV2 {
private final static Logger logger = new Logger();
private final int threads;
private final int threadPoolQueueSize;
private final MessageHandler messageHandler;
private final MessageSource messageSource;
public ReactiveBatchProcessorV2(
MessageSource messageSource,
MessageHandler messageHandler,
int threads,
int threadPoolQueueSize) {
this.messageSource = messageSource;
this.threads = threads;
this.messageHandler = messageHandler;
this.threadPoolQueueSize = threadPoolQueueSize;
}
public void start() {
messageSource.getMessageBatches()
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
));
}
}

View File

@@ -0,0 +1,57 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ReactiveBatchProcessorV3 {
private final static Logger logger = new Logger();
private final int threads;
private final int threadPoolQueueSize;
private final MessageHandler messageHandler;
private final MessageSource messageSource;
public ReactiveBatchProcessorV3(
MessageSource messageSource,
MessageHandler messageHandler,
int threads,
int threadPoolQueueSize) {
this.messageSource = messageSource;
this.threads = threads;
this.messageHandler = messageHandler;
this.threadPoolQueueSize = threadPoolQueueSize;
}
public void start() {
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
messageSource.getMessageBatches()
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
.subscribeOn(scheduler))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
));
}
}

View File

@@ -0,0 +1,42 @@
package io.reflectoring.reactive.batch;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Logger logger = new Logger();
private final int initialFetchCount;
private final int onNextFetchCount;
private Subscription subscription;
public SimpleSubscriber(int initialFetchCount, int onNextFetchCount) {
this.initialFetchCount = initialFetchCount;
this.onNextFetchCount = onNextFetchCount;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(initialFetchCount);
logger.log("subscribed");
}
@Override
public void onNext(T message) {
subscription.request(onNextFetchCount);
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onComplete() {
logger.log("completed");
}
}

View File

@@ -4,7 +4,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
public class WaitForCapacityPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

View File

@@ -5,6 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import io.reflectoring.reactive.batch.MessageSource;
import io.reflectoring.reactive.batch.ReactiveBatchProcessor;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV1;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV2;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV3;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@@ -13,9 +16,10 @@ public class ReactiveBatchProcessorTest {
@Test
public void allMessagesAreProcessedOnMultipleThreads() {
int batches = 3;
int batchSize = 4;
int threads = 5;
int batches = 10;
int batchSize = 3;
int threads = 2;
int threadPoolQueueSize = 10;
MessageSource messageSource = new TestMessageSource(batches, batchSize);
TestMessageHandler messageHandler = new TestMessageHandler();
@@ -23,8 +27,8 @@ public class ReactiveBatchProcessorTest {
ReactiveBatchProcessor processor = new ReactiveBatchProcessor(
messageSource,
messageHandler,
threads
);
threads,
threadPoolQueueSize);
processor.start();