[BAEL-8417] - Seperated rxjava and rxjava2 code into two different modules

This commit is contained in:
amit2103
2018-08-21 00:40:20 +05:30
parent 110a74e4b1
commit b2b4d22fc1
13 changed files with 68 additions and 17 deletions

6
rxjava-2/README.md Normal file
View File

@@ -0,0 +1,6 @@
## Relevant articles:
- [RxJava and Error Handling](http://www.baeldung.com/rxjava-error-handling)
- [RxJava 2 Flowable](http://www.baeldung.com/rxjava-2-flowable)
- [RxJava Maybe](http://www.baeldung.com/rxjava-maybe)
- [Introduction to RxRelay for RxJava](http://www.baeldung.com/rx-relay)

46
rxjava-2/pom.xml Normal file
View File

@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-2</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>${rxrelay.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.8.0</assertj.version>
<rx.java2.version>2.1.3</rx.java2.version>
<awaitility.version>1.7.0</awaitility.version>
<rxrelay.version>2.0.0</rxrelay.version>
</properties>
</project>

View File

@@ -0,0 +1,32 @@
package com.baeldung.rxjava;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class RandomRelay extends Relay<Integer> {
Random random = new Random();
List<Observer<? super Integer>> observers = new ArrayList<>();
@Override
public void accept(Integer integer) {
int observerIndex = random.nextInt(observers.size()) & Integer.MAX_VALUE;
observers.get(observerIndex).onNext(integer);
}
@Override
public boolean hasObservers() {
return observers.isEmpty();
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(() -> System.out.println("Disposed")));
}
}

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,93 @@
package com.baeldung.rxjava;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class FlowableIntegrationTest {
@Test public void whenFlowableIsCreated_thenItIsProperlyInitialized() {
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);
assertNotNull(integerFlowable);
}
@Test public void whenFlowableIsCreatedFromObservable_thenItIsProperlyInitialized() throws InterruptedException {
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable.toFlowable(BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable);
}
@Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException {
FlowableOnSubscribe<Integer> flowableOnSubscribe = flowableEmitter -> flowableEmitter.onNext(1);
Flowable<Integer> integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable);
}
@Test public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertEquals(testList, receivedInts);
}
@Test public void whenDropStrategyUsed_thenOnBackpressureDropped() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}
@Test public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test public void whenLatestStrategyUsed_thenTheLastElementReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}
}

View File

@@ -0,0 +1,40 @@
package com.baeldung.rxjava;
import org.junit.Test;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
public class MaybeUnitTest {
@Test
public void whenEmitsSingleValue_thenItIsObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.firstElement();
maybe.map(x -> x + 7)
.filter(x -> x > 0)
.test()
.assertResult(8)
.assertComplete();
}
@Test
public void whenEmitsNoValue_thenSignalsCompletionAndNoValueObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.skip(5)
.firstElement();
maybe.test()
.assertComplete()
.assertNoValues();
}
@Test
public void whenThrowsError_thenErrorIsRaised() {
Maybe<Integer> maybe = Flowable.<Integer> error(new Exception("msg"))
.firstElement();
maybe.test()
.assertErrorMessage("msg");
}
}

View File

@@ -0,0 +1,120 @@
package com.baeldung.rxjava;
import com.jakewharton.rxrelay2.BehaviorRelay;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.ReplayRelay;
import io.reactivex.internal.schedulers.SingleScheduler;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class RxRelayIntegrationTest {
@Test
public void whenObserverSubscribedToPublishRelay_thenItReceivesEmittedEvents () {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
//First Observer will receive all events
firstObserver.assertValues(5, 10, 15);
//Second Observer will receive only last event
secondObserver.assertValue(15);
}
@Test
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_thenItIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
@Test
public void whenObserverSubscribedToBehaviorRelay_thenItReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}
@Test
public void whenObserverSubscribedToBehaviorRelay_thenItReceivesEmittedEvents () {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}
@Test
public void whenObserverSubscribedToReplayRelay_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}
@Test
public void whenObserverSubscribedToReplayRelayWithLimitedSize_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}
@Test
public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents () throws InterruptedException {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, new SingleScheduler());
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
Thread.sleep(3000);
replayRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
@Test
public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent () {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
if(firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}
}

View File

@@ -0,0 +1,138 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
public class ExceptionHandlingIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
private Exception UNKNOWN_EXCEPTION = new Exception("unknown exception");
@Test
public void givenSubscriberAndError_whenHandleOnErrorReturn_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResume_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeItem_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> {
return Observable.just(throwable.getMessage(), "nextValue");
})
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
@Test
public void givenSubscriberAndError_whenChangeStateOnError_thenErrorThrown() {
TestObserver<String> testObserver = new TestObserver<>();
final AtomicBoolean state = new AtomicBoolean(false);
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("state should be changed", state.get());
}
@Test
public void givenSubscriberAndError_whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndException_whenHandleOnException_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}
@Test
public void givenSubscriberAndError_whenHandleOnException_thenNotResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}
}

View File

@@ -0,0 +1,146 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class OnErrorRetryIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
@Test
public void givenSubscriberAndError_whenRetryOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call twice", atomicCounter.get() == 2);
}
@Test
public void givenSubscriberAndError_whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
Exception noretryException = new Exception("don't retry");
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.<String>error(noretryException))
.subscribe(testObserver);
testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenCompleted() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get() == 0);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenResubscribed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get() == 1);
}
@Test
public void givenSubscriberAndError_whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
long before = System.currentTimeMillis();
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> {
System.out.println("retried " + integer + " times");
return Observable.timer(integer, TimeUnit.SECONDS);
}))
.blockingSubscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before) / 1000;
assertTrue("6 seconds should elapse", secondsElapsed == 6);
}
}

View File

@@ -0,0 +1,65 @@
package com.baeldung.rxjava.operators;
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class RxFlatmapAndSwitchmapUnitTest {
@Test
public void givenObservable_whenFlatmap_shouldAssertAllItemsReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.flatMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertThat(actualOutput, hasItems("b FirstResult", "b SecondResult",
"boo FirstResult", "boo SecondResult",
"bo FirstResult", "bo SecondResult",
"book FirstResult", "book SecondResult",
"books FirstResult", "books SecondResult"));
}
@Test
public void givenObservable_whenSwitchmap_shouldAssertLatestItemReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.switchMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertEquals(2, actualOutput.size());
assertThat(actualOutput, hasItems("books FirstResult", "books SecondResult"));
}
}