Reactive Batch Processing Example

This commit is contained in:
Tom Hombergs
2020-01-31 19:48:06 +11:00
parent 5fe5deca66
commit ec2bdc9327
11 changed files with 185 additions and 112 deletions

View File

@@ -30,6 +30,7 @@ build_gradle_module() {
chmod +x gradlew
build_gradle_module "reactive"
build_gradle_module "junit/assumptions"
build_gradle_module "logging"
build_gradle_module "pact/pact-feign-consumer"

View File

@@ -15,9 +15,9 @@ repositories {
}
dependencies {
compile 'io.reactivex.rxjava2:rxjava:2.2.17'
testCompile 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
testCompile 'org.awaitility:awaitility:3.0.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.17'
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
testImplementation 'org.awaitility:awaitility:3.0.0'
}
test {

Binary file not shown.

View File

@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip

6
reactive/gradlew vendored
View File

@@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn ( ) {
warn () {
echo "$*"
}
die ( ) {
die () {
echo
echo "$*"
echo
@@ -155,7 +155,7 @@ if $cygwin ; then
fi
# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}

View File

@@ -1,36 +1,12 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Single;
public interface MessageHandler {
import java.util.concurrent.atomic.AtomicInteger;
enum Result {
SUCCESS,
FAILURE
}
public class MessageHandler {
Result handleMessage(Message message);
private final AtomicInteger processedMessages = new AtomicInteger();
private Logger logger = new Logger();
enum Result {
SUCCESS,
FAILURE
}
public Result handleMessage(Message message) {
logger.log(String.format("handling message %s", message));
sleep(500);
this.processedMessages.getAndAdd(1);
return Result.SUCCESS;
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public Integer getProcessedMessages() {
return processedMessages.get();
}
}

View File

@@ -0,0 +1,9 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
public interface MessageSource {
Flowable<MessageBatch> getMessageBatches();
}

View File

@@ -5,95 +5,82 @@ import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reflectoring.reactive.batch.MessageHandler.Result;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class ReactiveBatchProcessor {
private final static Logger logger = new Logger();
private final static Logger logger = new Logger();
private final int MESSAGE_BATCHES = 10;
private final int threads;
private final int BATCH_SIZE = 3;
private final MessageHandler messageHandler;
private final int THREADS = 4;
private final MessageSource messageSource;
private final MessageHandler messageHandler = new MessageHandler();
public ReactiveBatchProcessor(
MessageSource messageSource,
MessageHandler messageHandler,
int threads) {
this.messageSource = messageSource;
this.threads = threads;
this.messageHandler = messageHandler;
}
public void start() {
public void start() {
Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10);
Scheduler scheduler = threadPoolScheduler(threads, 10);
retrieveMessageBatches()
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
.doOnSuccess(result -> logger.log("message handled"))
.subscribeOn(threadPoolScheduler))
.subscribeWith(subscriber());
}
messageSource.getMessageBatches()
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
.flatMapSingle(m -> Single.defer(() -> Single.just(m)
.map(messageHandler::handleMessage))
.subscribeOn(scheduler))
.subscribeWith(subscriber());
}
private Subscriber<MessageHandler.Result> subscriber() {
return new Subscriber<>() {
private Subscription subscription;
private Subscriber<MessageHandler.Result> subscriber() {
return new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(THREADS);
logger.log("subscribed");
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(threads);
logger.log("subscribed");
}
@Override
public void onNext(Result message) {
subscription.request(THREADS);
}
@Override
public void onNext(Result message) {
subscription.request(threads);
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onError(Throwable t) {
logger.log("error");
}
@Override
public void onComplete() {
logger.log("completed");
}
};
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize),
new RetryRejectedExecutionHandler()
));
}
public boolean allMessagesProcessed() {
return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE;
}
private Flowable<MessageBatch> retrieveMessageBatches() {
return Flowable.range(1, MESSAGE_BATCHES)
.map(this::messageBatch);
}
private MessageBatch messageBatch(int batchNumber) {
List<Message> messages = new ArrayList<>();
for (int i = 1; i <= BATCH_SIZE; i++) {
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
}
return new MessageBatch(messages);
}
@Override
public void onComplete() {
logger.log("completed");
}
};
}
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
return Schedulers.from(new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(queueSize)
));
}
}

View File

@@ -1,8 +1,9 @@
package io.reflectoring;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.reflectoring.reactive.batch.MessageSource;
import io.reflectoring.reactive.batch.ReactiveBatchProcessor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@@ -10,16 +11,29 @@ import org.junit.jupiter.api.Test;
public class ReactiveBatchProcessorTest {
@Test
public void test() {
ReactiveBatchProcessor processor = new ReactiveBatchProcessor();
public void allMessagesAreProcessedOnMultipleThreads() {
int batches = 3;
int batchSize = 4;
int threads = 5;
MessageSource messageSource = new TestMessageSource(batches, batchSize);
TestMessageHandler messageHandler = new TestMessageHandler();
ReactiveBatchProcessor processor = new ReactiveBatchProcessor(
messageSource,
messageHandler,
threads
);
processor.start();
await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(processor.allMessagesProcessed()));
.untilAsserted(() -> assertEquals(batches * batchSize, messageHandler.getProcessedMessages()));
assertEquals(threads, messageHandler.threadNames().size());
}
}

View File

@@ -0,0 +1,43 @@
package io.reflectoring;
import io.reflectoring.reactive.batch.Logger;
import io.reflectoring.reactive.batch.Message;
import io.reflectoring.reactive.batch.MessageHandler;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class TestMessageHandler implements MessageHandler {
private final AtomicInteger processedMessages = new AtomicInteger();
private final AtomicReference<Set<String>> threadNames = new AtomicReference<>(new HashSet<>());
private Logger logger = new Logger();
@Override
public Result handleMessage(Message message) {
sleep(500);
logger.log(String.format("processed message %s", message));
threadNames.get().add(Thread.currentThread().getName());
processedMessages.addAndGet(1);
return Result.SUCCESS;
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public int getProcessedMessages() {
return processedMessages.get();
}
public Set<String> threadNames() {
return threadNames.get();
}
}

View File

@@ -0,0 +1,43 @@
package io.reflectoring;
import io.reactivex.Flowable;
import io.reflectoring.reactive.batch.Message;
import io.reflectoring.reactive.batch.MessageBatch;
import io.reflectoring.reactive.batch.MessageSource;
import java.util.ArrayList;
import java.util.List;
public class TestMessageSource implements MessageSource {
private final int batches;
private final int batchSize;
/**
* Constructor.
*
* @param batches the number of message batches to produce.
* @param batchSize the number of messages per batch.
*/
public TestMessageSource(int batches, int batchSize) {
this.batches = batches;
this.batchSize = batchSize;
}
/**
* Generates a limited number of {@link MessageBatch}es.
*/
public Flowable<MessageBatch> getMessageBatches() {
return Flowable.range(1, batches)
.map(this::messageBatch);
}
private MessageBatch messageBatch(int batchNumber) {
List<Message> messages = new ArrayList<>();
for (int i = 1; i <= batchSize; i++) {
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
}
return new MessageBatch(messages);
}
}