JAVA-11499 move rxjava* modules to new rxjava-modules (#12342)

* JAVA-11499 move rxjava* modules to new rxjava-modules

* JAVA-11499 addressed PR comments
This commit is contained in:
Keerthi
2022-06-19 01:51:25 +10:00
committed by GitHub
parent 467d2d5b3a
commit eb99a50559
64 changed files with 79 additions and 111 deletions

View File

@@ -0,0 +1,16 @@
## RxJava
This module contains articles about RxJava.
### Relevant articles:
- [Dealing with Backpressure with RxJava](https://www.baeldung.com/rxjava-backpressure)
- [How to Test RxJava?](https://www.baeldung.com/rxjava-testing)
- [Introduction to RxJava](https://www.baeldung.com/rx-java)
- [Schedulers in RxJava](https://www.baeldung.com/rxjava-schedulers)
- [Difference Between Flatmap and Switchmap in RxJava](https://www.baeldung.com/rxjava-flatmap-switchmap)
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
- More articles: [[next -->]](/rxjava-2)

View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-core</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rxjava-core</name>
<parent>
<groupId>com.baeldung.rxjava-modules</groupId>
<artifactId>rxjava-modules</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
</project>

View File

@@ -0,0 +1,43 @@
package com.baeldung.rxjava;
import rx.Observable;
import java.util.List;
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void compute(List<Integer> v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void compute(Observable<Integer> v) {
try {
v.forEach(System.out::println);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void compute(Long v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.rxjava;
import rx.Observable;
import rx.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;
public class ConnectableObservableImpl {
public static void main(String[] args) throws InterruptedException {
ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(System.out::println);
System.out.println("Connect");
connectable.connect();
Thread.sleep(500);
System.out.println("Sleep");
}
}

View File

@@ -0,0 +1,81 @@
package com.baeldung.rxjava;
import rx.Observable;
import rx.observables.BlockingObservable;
import java.util.Arrays;
import java.util.List;
public class ObservableImpl {
private static Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
private static String[] letters = {"a", "b", "c", "d", "e", "f", "g", "h", "i"};
private static String[] titles = {"title"};
private static List<String> titleList = Arrays.asList(titles);
static Observable<String> getTitle() {
return Observable.from(titleList);
}
public static void main(String[] args) {
System.out.println("-------Just-----------");
Observable<String> observable = Observable.just("Hello");
observable.subscribe(
System.out::println, //onNext
Throwable::printStackTrace, //onError
() -> System.out.println("onCompleted") //onCompleted
);
BlockingObservable<String> blockingObservable = observable.toBlocking();
System.out.println();
System.out.println("-------Map-----------");
Observable.from(letters)
.map(String::toUpperCase)
.subscribe(System.out::print);
System.out.println();
System.out.println("-------FlatMap-----------");
Observable.just("book1", "book2")
.flatMap(s -> getTitle())
.subscribe(System.out::print);
System.out.println();
System.out.println("--------Scan----------");
Observable.from(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(System.out::println);
System.out.println();
System.out.println("------GroubBy------------");
Observable.from(numbers)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe((group) -> group.subscribe((number) -> {
System.out.println(group.getKey() + " : " + number);
}));
System.out.println();
System.out.println("-------Filter-----------");
Observable.from(numbers)
.filter(i -> (i % 2 == 1))
.subscribe(System.out::println);
System.out.println("------DefaultIfEmpty------------");
Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(System.out::println);
System.out.println("------DefaultIfEmpty-2-----------");
Observable.from(letters)
.defaultIfEmpty("Observable is empty")
.first()
.subscribe(System.out::println);
System.out.println("-------TakeWhile-----------");
Observable.from(numbers)
.takeWhile(i -> i < 5)
.subscribe(System.out::println);
}
}

View File

@@ -0,0 +1,30 @@
package com.baeldung.rxjava;
import rx.Observable;
public class ResourceManagement {
public static void main(String[] args) {
Observable<Character> values = Observable.using(
() -> {
String resource = "MyResource";
System.out.println("Leased: " + resource);
return resource;
},
r -> Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
}),
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
System.out::println,
System.out::println
);
}
}

View File

@@ -0,0 +1,18 @@
package com.baeldung.rxjava;
import rx.Observable;
import rx.Single;
public class SingleImpl {
public static void main(String[] args) {
Single<String> single = Observable.just("Hello")
.toSingle()
.doOnSuccess(System.out::print)
.doOnError(e -> {
throw new RuntimeException(e.getMessage());
});
single.subscribe();
}
}

View File

@@ -0,0 +1,73 @@
package com.baeldung.rxjava;
import rx.Observer;
import rx.subjects.PublishSubject;
public class SubjectImpl {
static Integer subscriber1 = 0;
static Integer subscriber2 = 0;
private static Integer subjectMethod() {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(getSecondObserver());
subject.onNext(4);
subject.onCompleted();
return subscriber1 + subscriber2;
}
static Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber1 += value;
System.out.println("Subscriber1: " + value);
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber1 completed");
}
};
}
static Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber2 += value;
System.out.println("Subscriber2: " + value);
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber2 completed");
}
};
}
public static void main(String[] args) throws InterruptedException {
System.out.println(subjectMethod());
}
}

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,112 @@
package com.baeldung.rxjava;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.observers.DisposableCompletableObserver;
import org.junit.Before;
import org.junit.Test;
public class CompletableUnitTest {
Completable first;
Completable second;
Completable error;
Throwable throwable = new RuntimeException();
@Before
public void setUpCompletables() {
first = Completable.fromSingle(Single.just(1));
second = Completable.fromRunnable(() -> {});
error = Single.error(throwable)
.ignoreElement();
}
@Test
public void whenCompletableConstructed_thenCompletedSuccessfully() {
Completable completed = Completable.complete();
completed.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
System.out.println("Completed!");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
});
Flowable<String> flowable = Flowable.just("request received", "user logged in");
Completable flowableCompletable = Completable.fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
.ignoreElement();
completed.andThen(flowableCompletable)
.andThen(singleCompletable)
.test()
.assertComplete();
}
@Test
public void whenCombiningCompletables_thenCompletedSuccessfully() {
first.andThen(second)
.test()
.assertComplete();
}
@Test
public void whenCombinedWithError_thenCompletedWithError() {
first.andThen(second)
.andThen(error)
.test()
.assertError(throwable);
}
@Test
public void whenCombinedWithNever_thenDoesNotComplete() {
first.andThen(second)
.andThen(Completable.never())
.test()
.assertNotComplete();
}
@Test
public void whenMergedCompletables_thenCompletedSuccessfully() {
Completable.mergeArray(first, second)
.test()
.assertComplete();
}
@Test
public void whenMergedWithError_thenCompletedWithError() {
Completable.mergeArray(first, second, error)
.test()
.assertError(throwable);
}
@Test
public void whenFlatMaped_thenCompletedSuccessfully() {
Completable allElementsCompletable = Flowable.just("request received", "user logged in")
.flatMapCompletable(message -> Completable
.fromRunnable(() -> System.out.println(message))
);
allElementsCompletable
.test()
.assertComplete();
}
@Test
public void whenAmbWithNever_thenCompletedSuccessfully() {
Completable.ambArray(first, Completable.never(), second)
.test()
.assertComplete();
}
@Test
public void whenAmbWithError_thenCompletedWithError() {
Completable.ambArray(error, first, second)
.test()
.assertError(throwable);
}
}

View File

@@ -0,0 +1,27 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.Observable;
import rx.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;
import static com.jayway.awaitility.Awaitility.await;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
public class ConnectableObservableIntegrationTest {
@Test
public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException {
String[] result = {""};
ConnectableObservable<Long> connectable
= Observable.interval(500, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01"));
connectable.connect();
await()
.until(() -> assertTrue(result[0].equals("01")));
}
}

View File

@@ -0,0 +1,40 @@
package com.baeldung.rxjava;
import org.junit.Test;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
public class MaybeUnitTest {
@Test
public void whenEmitsSingleValue_thenItIsObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.firstElement();
maybe.map(x -> x + 7)
.filter(x -> x > 0)
.test()
.assertResult(8)
.assertComplete();
}
@Test
public void whenEmitsNoValue_thenSignalsCompletionAndNoValueObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.skip(5)
.firstElement();
maybe.test()
.assertComplete()
.assertNoValues();
}
@Test
public void whenThrowsError_thenErrorIsRaised() {
Maybe<Integer> maybe = Flowable.<Integer> error(new Exception("msg"))
.firstElement();
maybe.test()
.assertErrorMessage("msg");
}
}

View File

@@ -0,0 +1,144 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.Observable;
import static com.baeldung.rxjava.ObservableImpl.getTitle;
import static junit.framework.Assert.assertTrue;
public class ObservableUnitTest {
private String result = "";
@Test
public void givenString_whenJustAndSubscribe_thenEmitsSingleItem() {
Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
assertTrue(result.equals("Hello"));
}
@Test
public void givenArray_whenFromAndSubscribe_thenEmitsItems() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i,
Throwable::printStackTrace,
() -> result += "_Complete"
);
assertTrue(result.equals("abcdefg_Complete"));
}
@Test
public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
String blockingObservable = observable.toBlocking().first();
observable.subscribe(
i -> result += i,
Throwable::printStackTrace,
() -> result += "_Completed"
);
assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable));
}
@Test
public void givenArray_whenMapAndSubscribe_thenReturnCapitalLetters() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable.from(letters)
.map(String::toUpperCase)
.subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));
}
@Test
public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() {
Observable.just("book1", "book2")
.flatMap(s -> getTitle())
.subscribe(l -> result += l);
assertTrue(result.equals("titletitle"));
}
@Test
public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() {
String[] letters = {"a", "b", "c"};
Observable.from(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));
}
@Test
public void givenArrayOfNumbers_whenGroupBy_thenCreateTwoGroupsBasedOnParity() {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
String[] EVEN = {""};
String[] ODD = {""};
Observable.from(numbers)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe(group ->
group.subscribe((number) -> {
if (group.getKey().equals("EVEN")) {
EVEN[0] += number;
} else {
ODD[0] += number;
}
})
);
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));
}
@Test
public void givenArrayOfNumbers_whenFilter_thenGetAllOddNumbers() {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Observable.from(numbers)
.filter(i -> (i % 2 == 1))
.subscribe(i -> result += i);
assertTrue(result.equals("13579"));
}
@Test
public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() {
Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(s -> result += s);
assertTrue(result.equals("Observable is empty"));
}
@Test
public void givenObservableFromArray_whenDefaultIfEmptyAndFirst_thenGetFirstLetterFromArray() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable.from(letters)
.defaultIfEmpty("Observable is empty")
.first()
.subscribe(s -> result += s);
assertTrue(result.equals("a"));
}
@Test
public void givenObservableFromArray_whenTakeWhile_thenGetSumOfNumbersFromCondition() {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
final Integer[] sum = {0};
Observable.from(numbers)
.takeWhile(i -> i < 5)
.subscribe(s -> sum[0] += s);
assertTrue(sum[0] == 10);
}
}

View File

@@ -0,0 +1,30 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.Observable;
import static junit.framework.Assert.assertTrue;
public class ResourceManagementUnitTest {
@Test
public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException {
String[] result = {""};
Observable<Character> values = Observable.using(
() -> "MyResource",
r -> Observable.create(o -> {
for (Character c : r.toCharArray())
o.onNext(c);
o.onCompleted();
}),
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result[0] += v,
e -> result[0] += e
);
assertTrue(result[0].equals("MyResource"));
}
}

View File

@@ -0,0 +1,125 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.BackpressureOverflow;
import rx.Observable;
import rx.exceptions.MissingBackpressureException;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertTrue;
public class RxJavaBackpressureLongRunningUnitTest {
@Test
public void givenColdObservable_shouldNotThrowException() {
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
// when
Observable.range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(testSubscriber);
// then
testSubscriber.awaitTerminalEvent();
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() {
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation()).subscribe(testSubscriber);
// when
IntStream.range(0, 1_000_000).forEach(source::onNext);
// then
testSubscriber.awaitTerminalEvent();
testSubscriber.assertError(MissingBackpressureException.class);
}
@Test
public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() {
// given
TestSubscriber<Observable<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.create();
// when
source.window(500).observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream.range(0, 1_000).forEach(source::onNext);
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() {
// given
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.create();
// when
source.buffer(1024).observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream.range(0, 1_000).forEach(source::onNext);
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() {
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.create();
// when
source.sample(100, TimeUnit.MILLISECONDS)
// .throttleFirst(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream.range(0, 1_000).forEach(source::onNext);
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenOnBackpressureBufferDefined_shouldNotThrowException() {
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
// when
Observable.range(1, 1_000_000).onBackpressureBuffer(16, () -> {
}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(testSubscriber);
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() {
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
// when
Observable.range(1, 1_000_000).onBackpressureDrop().observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
}

View File

@@ -0,0 +1,84 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksManualTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
hookCalled = false;
initHookCalled = false;
RxJavaPlugins.reset();
}
}

View File

@@ -0,0 +1,244 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksUnitTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenObservable_whenError_shouldExecuteTheHook() {
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException())
.subscribe();
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1))
.test();
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10)
.parallel();
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
}
@Test
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
initHookCalled = false;
hookCalled = false;
RxJavaPlugins.reset();
}
}

View File

@@ -0,0 +1,104 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
public class RxJavaUnitTest {
@Test
public void givenObservable_whenZip_shouldAssertBlockingInASameThread() {
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE), (string, index) -> index + "-" + string);
// when
observable.subscribe(results::add);
// then
assertThat(results, notNullValue());
assertThat(results, hasSize(5));
assertThat(results, hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));
}
@Test
public void givenObservable_whenZip_shouldAssertOnTestSubscriber() {
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string));
// when
observable.subscribe(subscriber);
// then
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(subscriber.getOnNextEvents(), hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));
}
@Test
public void givenTestObserver_whenExceptionWasThrowsOnObservable_observerShouldGetError() {
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string))
.concatWith(Observable.error(new RuntimeException("error in Observable")));
// when
observable.subscribe(subscriber);
// then
subscriber.assertError(RuntimeException.class);
subscriber.assertNotCompleted();
}
@Test
public void givenObservableThatEmitsEventPerSecond_whenUseAdvanceByTime_shouldEmitEventPerSecond() {
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);
Observable<String> observable = Observable.from(letters).zipWith(tick, (string, index) -> index + "-" + string);
observable.subscribeOn(scheduler).subscribe(subscriber);
// expect
subscriber.assertNoValues();
subscriber.assertNotCompleted();
// when
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
// then
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
// when
scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C", "3-D", "4-E"));
}
}

View File

@@ -0,0 +1,247 @@
package com.baeldung.rxjava;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Ignore;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class SchedulersLiveTest {
private String result = "";
private String result1 = "";
private String result2 = "";
@Test
public void givenScheduledWorker_whenScheduleAnAction_thenResultAction() throws InterruptedException {
System.out.println("scheduling");
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
assertTrue(result.equals("action"));
}
@Test
public void givenScheduledWorker_whenUnsubscribeOnWorker_thenResultFirstAction() throws InterruptedException {
System.out.println("canceling");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += "First_Action";
worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
await()
.until(() -> assertTrue(result.equals("First_Action")));
}
@Ignore //it's not safe, not every time is running correctly
@Test
public void givenWorker_whenScheduledOnNewThread_thenResultIsBoundToNewThread() throws InterruptedException {
System.out.println("newThread_1");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
await()
.until(() -> assertTrue(result.equals("RxNewThreadScheduler-1_Start_End_worker_")));
}
@Test
public void givenObservable_whenObserveOnNewThread_thenRunOnDifferentThreadEachTime() throws InterruptedException {
System.out.println("newThread_2");
Observable.just("Hello")
.observeOn(Schedulers.newThread())
.doOnNext(s ->
result2 += Thread.currentThread().getName()
)
.observeOn(Schedulers.newThread())
.subscribe(s ->
result1 += Thread.currentThread().getName()
);
await()
.until(() -> {
assertTrue(result1.equals("RxNewThreadScheduler-1"));
assertTrue(result2.equals("RxNewThreadScheduler-2"));
});
}
@Test
public void givenWorker_whenScheduledOnImmediate_thenResultIsBoundToThread() throws InterruptedException {
System.out.println("immediate_1");
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_");
result += "_End";
});
await()
.until(() -> assertTrue(result.equals("main_Start_worker__End")));
}
@Test
public void givenObservable_whenImmediateScheduled_thenExecuteOnMainThread() throws InterruptedException {
System.out.println("immediate_2");
Observable.just("Hello")
.subscribeOn(Schedulers.immediate())
.subscribe(s ->
result += Thread.currentThread().getName()
);
await()
.until(() -> assertTrue(result.equals("main")));
}
@Test
public void givenObservable_whenTrampolineScheduled_thenExecuteOnMainThread() throws InterruptedException {
System.out.println("trampoline_1");
Observable.just(2, 4, 6, 8)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
await()
.until(() -> assertTrue(result.equals("246813579")));
}
@Test
public void givenWorker_whenScheduledOnTrampoline_thenComposeResultAsBlocking() throws InterruptedException {
System.out.println("trampoline_2");
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "Start";
worker.schedule(() -> {
result += "_middleStart";
worker.schedule(() ->
result += "_worker_"
);
result += "_middleEnd";
});
result += "_mainEnd";
});
await()
.until(() -> assertTrue(result.equals("mainStart_mainEnd_middleStart_middleEnd_worker_")));
}
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern)
.build();
}
@Test
public void givenExecutors_whenSchedulerFromCreatedExecutors_thenReturnElementsOnEacheThread() throws InterruptedException {
System.out.println("from");
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Alfa");
subscriber.onNext("Beta");
subscriber.onCompleted();
});
observable
.subscribeOn(schedulerA)
.subscribeOn(schedulerB)
.subscribe(
x -> result += Thread.currentThread().getName() + x + "_",
Throwable::printStackTrace,
() -> result += "_Completed"
);
await()
.until(() -> assertTrue(result.equals("Sched-A-0Alfa_Sched-A-0Beta__Completed")));
}
@Test
public void givenObservable_whenIoScheduling_thenReturnThreadName() throws InterruptedException {
System.out.println("io");
Observable.just("io")
.subscribeOn(Schedulers.io())
.subscribe(i -> result += Thread.currentThread().getName());
await()
.until(() -> assertTrue(result.equals("RxIoScheduler-2")));
}
@Test
@Ignore
public void givenObservable_whenComputationScheduling_thenReturnThreadName() throws InterruptedException {
System.out.println("computation");
Observable.just("computation")
.subscribeOn(Schedulers.computation())
.subscribe(i -> result += Thread.currentThread().getName());
await()
.until(() -> assertTrue(result.equals("RxComputationScheduler-1")));
}
@Test
public void givenLetters_whenTestScheduling_thenReturnValuesControllingAdvanceTime() throws InterruptedException {
List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);
Observable.from(letters)
.zipWith(tick, (string, index) -> index + "-" + string)
.subscribeOn(scheduler)
.subscribe(subscriber);
subscriber.assertNoValues();
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));
}
@Test
public void givenLetters_whenDelay_thenReturne() throws InterruptedException {
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
.delay(1, TimeUnit.SECONDS, schedulerA)
.subscribe(i -> result += Thread.currentThread().getName() + i + " ");
await()
.until(() -> assertTrue(result.equals("Sched1-A Sched1-B ")));
}
}

View File

@@ -0,0 +1,23 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.Observable;
import rx.Single;
import static junit.framework.Assert.assertTrue;
public class SingleUnitTest {
@Test
public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException {
String[] result = {""};
Single<String> single = Observable.just("Hello")
.toSingle()
.doOnSuccess(i -> result[0] += i)
.doOnError(error -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe();
assertTrue(result[0].equals("Hello"));
}
}

View File

@@ -0,0 +1,25 @@
package com.baeldung.rxjava;
import org.junit.Test;
import rx.subjects.PublishSubject;
import static junit.framework.Assert.assertTrue;
public class SubjectUnitTest {
@Test
public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubject_thenSubscriberBeginsToAdd() {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(SubjectImpl.getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(SubjectImpl.getSecondObserver());
subject.onNext(4);
subject.onCompleted();
assertTrue(SubjectImpl.subscriber1 + SubjectImpl.subscriber2 == 14);
}
}

View File

@@ -0,0 +1,138 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
public class ExceptionHandlingIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
private Exception UNKNOWN_EXCEPTION = new Exception("unknown exception");
@Test
public void givenSubscriberAndError_whenHandleOnErrorReturn_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResume_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeItem_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> {
return Observable.just(throwable.getMessage(), "nextValue");
})
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
@Test
public void givenSubscriberAndError_whenChangeStateOnError_thenErrorThrown() {
TestObserver<String> testObserver = new TestObserver<>();
final AtomicBoolean state = new AtomicBoolean(false);
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("state should be changed", state.get());
}
@Test
public void givenSubscriberAndError_whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndException_whenHandleOnException_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}
@Test
public void givenSubscriberAndError_whenHandleOnException_thenNotResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}
}

View File

@@ -0,0 +1,146 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class OnErrorRetryIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
@Test
public void givenSubscriberAndError_whenRetryOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call twice", atomicCounter.get() == 2);
}
@Test
public void givenSubscriberAndError_whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
Exception noretryException = new Exception("don't retry");
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.<String>error(noretryException))
.subscribe(testObserver);
testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenCompleted() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get() == 0);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenResubscribed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get() == 1);
}
@Test
public void givenSubscriberAndError_whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
long before = System.currentTimeMillis();
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> {
System.out.println("retried " + integer + " times");
return Observable.timer(integer, TimeUnit.SECONDS);
}))
.blockingSubscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before) / 1000;
assertTrue("6 seconds should elapse", secondsElapsed == 6);
}
}