diff --git a/reactive/build.gradle b/reactive/build.gradle index 2779c4c..9c61496 100644 --- a/reactive/build.gradle +++ b/reactive/build.gradle @@ -15,7 +15,7 @@ repositories { } dependencies { - implementation 'io.reactivex.rxjava2:rxjava:2.2.17' + implementation 'io.reactivex.rxjava3:rxjava:3.0.0-RC9' testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1' testImplementation 'org.awaitility:awaitility:3.0.0' } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/Logger.java b/reactive/src/main/java/io/reflectoring/reactive/batch/Logger.java index cd89b3b..e39e729 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/Logger.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/Logger.java @@ -1,8 +1,8 @@ package io.reflectoring.reactive.batch; -public class Logger { +class Logger { - public void log(String string) { + void log(String string) { System.out.println(String.format("%s %s: %s", System.currentTimeMillis(), Thread.currentThread().getName(), string)); } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/Message.java b/reactive/src/main/java/io/reflectoring/reactive/batch/Message.java index 5dffd9d..a81eddd 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/Message.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/Message.java @@ -1,10 +1,10 @@ package io.reflectoring.reactive.batch; -public class Message { +class Message { private final String content; - public Message(String content) { + Message(String content) { this.content = content; } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageBatch.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageBatch.java index 3484192..dae86a7 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageBatch.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageBatch.java @@ -3,11 +3,11 @@ package io.reflectoring.reactive.batch; import java.util.Collections; import java.util.List; -public class MessageBatch { +class MessageBatch { private final List messages; - public MessageBatch(List messages) { + MessageBatch(List messages) { this.messages = messages; } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java index 664477b..381e623 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageHandler.java @@ -1,6 +1,6 @@ package io.reflectoring.reactive.batch; -public interface MessageHandler { +interface MessageHandler { enum Result { SUCCESS, diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java index ddbefcb..750ef72 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/MessageSource.java @@ -1,8 +1,8 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; -public interface MessageSource { +interface MessageSource { Flowable getMessageBatches(); diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java index c833878..86a8c9a 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java @@ -1,9 +1,9 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; @@ -21,7 +21,7 @@ public class ReactiveBatchProcessor { private final MessageSource messageSource; - public ReactiveBatchProcessor( + ReactiveBatchProcessor( MessageSource messageSource, MessageHandler messageHandler, int threads, @@ -32,7 +32,7 @@ public class ReactiveBatchProcessor { this.threadPoolQueueSize = threadPoolQueueSize; } - public void start() { + void start() { Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize); diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java index dd6ad61..f40fd01 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV1.java @@ -1,9 +1,9 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java index e70298c..f4c7633 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV2.java @@ -1,9 +1,9 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java index fc81c47..2a2d312 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorV3.java @@ -1,9 +1,9 @@ package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java b/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java index 30c0848..f260fb8 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/SimpleSubscriber.java @@ -3,7 +3,7 @@ package io.reflectoring.reactive.batch; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -public class SimpleSubscriber implements Subscriber { +class SimpleSubscriber implements Subscriber { private final Logger logger = new Logger(); @@ -12,7 +12,7 @@ public class SimpleSubscriber implements Subscriber { private Subscription subscription; - public SimpleSubscriber(int initialFetchCount, int onNextFetchCount) { + SimpleSubscriber(int initialFetchCount, int onNextFetchCount) { this.initialFetchCount = initialFetchCount; this.onNextFetchCount = onNextFetchCount; } diff --git a/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java b/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java index 2a783f5..69bda7b 100644 --- a/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java +++ b/reactive/src/main/java/io/reflectoring/reactive/batch/WaitForCapacityPolicy.java @@ -4,7 +4,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -public class WaitForCapacityPolicy implements RejectedExecutionHandler { +class WaitForCapacityPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { diff --git a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java b/reactive/src/test/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorTest.java similarity index 67% rename from reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java rename to reactive/src/test/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorTest.java index f38dbd5..10f97e3 100644 --- a/reactive/src/test/java/io/reflectoring/ReactiveBatchProcessorTest.java +++ b/reactive/src/test/java/io/reflectoring/reactive/batch/ReactiveBatchProcessorTest.java @@ -1,20 +1,15 @@ -package io.reflectoring; +package io.reflectoring.reactive.batch; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.reflectoring.reactive.batch.MessageSource; -import io.reflectoring.reactive.batch.ReactiveBatchProcessor; -import io.reflectoring.reactive.batch.ReactiveBatchProcessorV1; -import io.reflectoring.reactive.batch.ReactiveBatchProcessorV2; -import io.reflectoring.reactive.batch.ReactiveBatchProcessorV3; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -public class ReactiveBatchProcessorTest { +class ReactiveBatchProcessorTest { @Test - public void allMessagesAreProcessedOnMultipleThreads() { + void allMessagesAreProcessedOnMultipleThreads() { int batches = 10; int batchSize = 3; diff --git a/reactive/src/test/java/io/reflectoring/TestMessageHandler.java b/reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageHandler.java similarity index 80% rename from reactive/src/test/java/io/reflectoring/TestMessageHandler.java rename to reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageHandler.java index 876216d..2775742 100644 --- a/reactive/src/test/java/io/reflectoring/TestMessageHandler.java +++ b/reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageHandler.java @@ -1,14 +1,11 @@ -package io.reflectoring; +package io.reflectoring.reactive.batch; -import io.reflectoring.reactive.batch.Logger; -import io.reflectoring.reactive.batch.Message; -import io.reflectoring.reactive.batch.MessageHandler; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -public class TestMessageHandler implements MessageHandler { +class TestMessageHandler implements MessageHandler { private final AtomicInteger processedMessages = new AtomicInteger(); diff --git a/reactive/src/test/java/io/reflectoring/TestMessageSource.java b/reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageSource.java similarity index 83% rename from reactive/src/test/java/io/reflectoring/TestMessageSource.java rename to reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageSource.java index 5ea417b..f29a41a 100644 --- a/reactive/src/test/java/io/reflectoring/TestMessageSource.java +++ b/reactive/src/test/java/io/reflectoring/reactive/batch/TestMessageSource.java @@ -1,13 +1,13 @@ -package io.reflectoring; +package io.reflectoring.reactive.batch; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import io.reflectoring.reactive.batch.Message; import io.reflectoring.reactive.batch.MessageBatch; import io.reflectoring.reactive.batch.MessageSource; import java.util.ArrayList; import java.util.List; -public class TestMessageSource implements MessageSource { +class TestMessageSource implements MessageSource { private final int batches; @@ -19,7 +19,7 @@ public class TestMessageSource implements MessageSource { * @param batches the number of message batches to produce. * @param batchSize the number of messages per batch. */ - public TestMessageSource(int batches, int batchSize) { + TestMessageSource(int batches, int batchSize) { this.batches = batches; this.batchSize = batchSize; }