reduces visibility

This commit is contained in:
thombergs
2020-02-03 06:37:25 +11:00
parent b6d167f4ae
commit 5b76cbac62
15 changed files with 40 additions and 48 deletions

View File

@@ -15,7 +15,7 @@ repositories {
}
dependencies {
implementation 'io.reactivex.rxjava2:rxjava:2.2.17'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0-RC9'
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
testImplementation 'org.awaitility:awaitility:3.0.0'
}

View File

@@ -1,8 +1,8 @@
package io.reflectoring.reactive.batch;
public class Logger {
class Logger {
public void log(String string) {
void log(String string) {
System.out.println(String.format("%s %s: %s", System.currentTimeMillis(), Thread.currentThread().getName(), string));
}

View File

@@ -1,10 +1,10 @@
package io.reflectoring.reactive.batch;
public class Message {
class Message {
private final String content;
public Message(String content) {
Message(String content) {
this.content = content;
}

View File

@@ -3,11 +3,11 @@ package io.reflectoring.reactive.batch;
import java.util.Collections;
import java.util.List;
public class MessageBatch {
class MessageBatch {
private final List<Message> messages;
public MessageBatch(List<Message> messages) {
MessageBatch(List<Message> messages) {
this.messages = messages;
}

View File

@@ -1,6 +1,6 @@
package io.reflectoring.reactive.batch;
public interface MessageHandler {
interface MessageHandler {
enum Result {
SUCCESS,

View File

@@ -1,8 +1,8 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.rxjava3.core.Flowable;
public interface MessageSource {
interface MessageSource {
Flowable<MessageBatch> getMessageBatches();

View File

@@ -1,9 +1,9 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
@@ -21,7 +21,7 @@ public class ReactiveBatchProcessor {
private final MessageSource messageSource;
public ReactiveBatchProcessor(
ReactiveBatchProcessor(
MessageSource messageSource,
MessageHandler messageHandler,
int threads,
@@ -32,7 +32,7 @@ public class ReactiveBatchProcessor {
this.threadPoolQueueSize = threadPoolQueueSize;
}
public void start() {
void start() {
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);

View File

@@ -1,9 +1,9 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;

View File

@@ -1,9 +1,9 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;

View File

@@ -1,9 +1,9 @@
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;

View File

@@ -3,7 +3,7 @@ package io.reflectoring.reactive.batch;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SimpleSubscriber<T> implements Subscriber<T> {
class SimpleSubscriber<T> implements Subscriber<T> {
private final Logger logger = new Logger();
@@ -12,7 +12,7 @@ public class SimpleSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public SimpleSubscriber(int initialFetchCount, int onNextFetchCount) {
SimpleSubscriber(int initialFetchCount, int onNextFetchCount) {
this.initialFetchCount = initialFetchCount;
this.onNextFetchCount = onNextFetchCount;
}

View File

@@ -4,7 +4,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class WaitForCapacityPolicy implements RejectedExecutionHandler {
class WaitForCapacityPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

View File

@@ -1,20 +1,15 @@
package io.reflectoring;
package io.reflectoring.reactive.batch;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.reflectoring.reactive.batch.MessageSource;
import io.reflectoring.reactive.batch.ReactiveBatchProcessor;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV1;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV2;
import io.reflectoring.reactive.batch.ReactiveBatchProcessorV3;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
public class ReactiveBatchProcessorTest {
class ReactiveBatchProcessorTest {
@Test
public void allMessagesAreProcessedOnMultipleThreads() {
void allMessagesAreProcessedOnMultipleThreads() {
int batches = 10;
int batchSize = 3;

View File

@@ -1,14 +1,11 @@
package io.reflectoring;
package io.reflectoring.reactive.batch;
import io.reflectoring.reactive.batch.Logger;
import io.reflectoring.reactive.batch.Message;
import io.reflectoring.reactive.batch.MessageHandler;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class TestMessageHandler implements MessageHandler {
class TestMessageHandler implements MessageHandler {
private final AtomicInteger processedMessages = new AtomicInteger();

View File

@@ -1,13 +1,13 @@
package io.reflectoring;
package io.reflectoring.reactive.batch;
import io.reactivex.Flowable;
import io.reactivex.rxjava3.core.Flowable;
import io.reflectoring.reactive.batch.Message;
import io.reflectoring.reactive.batch.MessageBatch;
import io.reflectoring.reactive.batch.MessageSource;
import java.util.ArrayList;
import java.util.List;
public class TestMessageSource implements MessageSource {
class TestMessageSource implements MessageSource {
private final int batches;
@@ -19,7 +19,7 @@ public class TestMessageSource implements MessageSource {
* @param batches the number of message batches to produce.
* @param batchSize the number of messages per batch.
*/
public TestMessageSource(int batches, int batchSize) {
TestMessageSource(int batches, int batchSize) {
this.batches = batches;
this.batchSize = batchSize;
}