[BAEL-16658] Split rxjava (& rxjava-2) by subject
This commit is contained in:
12
rxjava-operators/README.md
Normal file
12
rxjava-operators/README.md
Normal file
@@ -0,0 +1,12 @@
|
||||
## RxJava Operators
|
||||
|
||||
This module contains articles about RxJava Operators
|
||||
|
||||
### Related Articles:
|
||||
|
||||
- [Mathematical and Aggregate Operators in RxJava](https://www.baeldung.com/rxjava-math)
|
||||
- [Observable Utility Operators in RxJava](https://www.baeldung.com/rxjava-observable-operators)
|
||||
- [Implementing Custom Operators in RxJava](https://www.baeldung.com/rxjava-custom-operators)
|
||||
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)
|
||||
|
||||
|
||||
59
rxjava-operators/pom.xml
Normal file
59
rxjava-operators/pom.xml
Normal file
@@ -0,0 +1,59 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>rxjava-operators</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<name>rxjava-operators</name>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-java</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-java</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.reactivex</groupId>
|
||||
<artifactId>rxjava</artifactId>
|
||||
<version>${rx.java.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.reactivex.rxjava2</groupId>
|
||||
<artifactId>rxjava</artifactId>
|
||||
<version>${rx.java2.version}</version>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
|
||||
<dependency>
|
||||
<groupId>com.github.akarnokd</groupId>
|
||||
<artifactId>rxjava2-extensions</artifactId>
|
||||
<version>${rxjava2.ext.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.reactivex</groupId>
|
||||
<artifactId>rxjava-math</artifactId>
|
||||
<version>${rx.java.math.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>${awaitility.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
|
||||
<rx.java2.version>2.2.2</rx.java2.version>
|
||||
<assertj.version>3.8.0</assertj.version>
|
||||
<rx.java.version>1.2.5</rx.java.version>
|
||||
<rx.java.math.version>1.0.0</rx.java.math.version>
|
||||
<awaitility.version>1.7.0</awaitility.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.baeldung.rxjava.operator;
|
||||
|
||||
import rx.Observable.Operator;
|
||||
import rx.Subscriber;
|
||||
|
||||
public class ToCleanString implements Operator<String, String> {
|
||||
|
||||
public static ToCleanString toCleanString() {
|
||||
return new ToCleanString();
|
||||
}
|
||||
|
||||
private ToCleanString() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
|
||||
return new Subscriber<String>(subscriber) {
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
subscriber.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
subscriber.onError(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(String item) {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
final String result = item.replaceAll("[^A-Za-z0-9]", "");
|
||||
subscriber.onNext(result);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.rxjava.operator;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.Observable.Transformer;
|
||||
|
||||
public class ToLength implements Transformer<String, Integer> {
|
||||
|
||||
public static ToLength toLength() {
|
||||
return new ToLength();
|
||||
}
|
||||
|
||||
private ToLength() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Observable<Integer> call(Observable<String> source) {
|
||||
return source.map(String::length);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
package com.baeldung.rxjava;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import hu.akarnokd.rxjava2.async.AsyncObservable;
|
||||
import io.reactivex.Observable;
|
||||
|
||||
public class AsyncAndSyncToObservableIntegrationTest {
|
||||
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
Callable<Integer> callable = () -> counter.incrementAndGet();
|
||||
|
||||
/* Method will execute every time it gets subscribed*/
|
||||
@Test
|
||||
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
|
||||
|
||||
Observable<Integer> source = Observable.fromCallable(callable);
|
||||
|
||||
for (int i = 1; i < 5; i++) {
|
||||
source.test()
|
||||
.awaitDone(5, TimeUnit.SECONDS)
|
||||
.assertResult(i);
|
||||
|
||||
assertEquals(i, counter.get());
|
||||
}
|
||||
}
|
||||
|
||||
/* Method will execute only once and cache its result.*/
|
||||
@Test
|
||||
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
|
||||
|
||||
Observable<Integer> source = AsyncObservable.start(callable);
|
||||
|
||||
for (int i = 1; i < 5; i++) {
|
||||
source.test()
|
||||
.awaitDone(5, TimeUnit.SECONDS)
|
||||
.assertResult(1);
|
||||
|
||||
assertEquals(1, counter.get());
|
||||
}
|
||||
}
|
||||
|
||||
/* Method will execute only once and cache its result.*/
|
||||
@Test
|
||||
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
|
||||
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
Future<Integer> future = executor.submit(callable);
|
||||
Observable<Integer> source = Observable.fromFuture(future);
|
||||
|
||||
for (int i = 1; i < 5; i++) {
|
||||
source.test()
|
||||
.awaitDone(5, TimeUnit.SECONDS)
|
||||
.assertResult(1);
|
||||
|
||||
assertEquals(1, counter.get());
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
/* Method will execute every time it gets subscribed*/
|
||||
@Test
|
||||
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
|
||||
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
|
||||
|
||||
for (int i = 1; i < 5; i++) {
|
||||
source.test()
|
||||
.awaitDone(5, TimeUnit.SECONDS)
|
||||
.assertResult(i);
|
||||
|
||||
assertEquals(i, counter.get());
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
/*Method will execute only once and cache its result.*/
|
||||
@Test
|
||||
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
|
||||
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
|
||||
ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
|
||||
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
|
||||
for (int i = 1; i < 4; i++) {
|
||||
source.test()
|
||||
.awaitDone(5, TimeUnit.SECONDS)
|
||||
.assertResult(1, 2, 3);
|
||||
}
|
||||
|
||||
exec.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.Observable.Operator;
|
||||
import rx.Observable.Transformer;
|
||||
import rx.Subscriber;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static com.baeldung.rxjava.operator.ToCleanString.toCleanString;
|
||||
import static com.baeldung.rxjava.operator.ToLength.toLength;
|
||||
import static org.hamcrest.Matchers.hasItems;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class RxJavaCustomOperatorUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenUseCleanStringOperator_thenSuccess() {
|
||||
final List<String> list = Arrays.asList("john_1", "tom-3");
|
||||
final List<String> results = new ArrayList<>();
|
||||
|
||||
final Observable<String> observable = Observable.from(list)
|
||||
.lift(toCleanString());
|
||||
|
||||
// when
|
||||
observable.subscribe(results::add);
|
||||
|
||||
// then
|
||||
assertThat(results, notNullValue());
|
||||
assertThat(results, hasSize(2));
|
||||
assertThat(results, hasItems("john1", "tom3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseToLengthOperator_thenSuccess() {
|
||||
final List<String> list = Arrays.asList("john", "tom");
|
||||
final List<Integer> results = new ArrayList<>();
|
||||
|
||||
final Observable<Integer> observable = Observable.from(list)
|
||||
.compose(toLength());
|
||||
|
||||
// when
|
||||
observable.subscribe(results::add);
|
||||
|
||||
// then
|
||||
assertThat(results, notNullValue());
|
||||
assertThat(results, hasSize(2));
|
||||
assertThat(results, hasItems(4, 3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseFunctionOperator_thenSuccess() {
|
||||
final Operator<String, String> cleanStringFn = subscriber -> new Subscriber<String>(subscriber) {
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
subscriber.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
subscriber.onError(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(String str) {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
final String result = str.replaceAll("[^A-Za-z0-9]", "");
|
||||
subscriber.onNext(result);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final List<String> results = new ArrayList<>();
|
||||
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
|
||||
.lift(cleanStringFn)
|
||||
.subscribe(results::add);
|
||||
|
||||
assertThat(results, notNullValue());
|
||||
assertThat(results, hasSize(2));
|
||||
assertThat(results, hasItems("apple", "orange"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseFunctionTransformer_thenSuccess() {
|
||||
final Transformer<String, Integer> toLengthFn = source -> source.map(String::length);
|
||||
|
||||
final List<Integer> results = new ArrayList<>();
|
||||
Observable.from(Arrays.asList("apple", "orange"))
|
||||
.compose(toLengthFn)
|
||||
.subscribe(results::add);
|
||||
|
||||
assertThat(results, notNullValue());
|
||||
assertThat(results, hasSize(2));
|
||||
assertThat(results, hasItems(5, 6));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,270 @@
|
||||
package com.baeldung.rxjava;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import rx.Observable;
|
||||
import rx.Observer;
|
||||
import rx.exceptions.OnErrorNotImplementedException;
|
||||
import rx.schedulers.Schedulers;
|
||||
import rx.schedulers.Timestamped;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.jayway.awaitility.Awaitility.await;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class UtilityOperatorsIntegrationTest {
|
||||
|
||||
private int emittedTotal = 0;
|
||||
private int receivedTotal = 0;
|
||||
private String result = "";
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenObserveOnAfterOnNext_thenEmitsEventsOnComputeScheduler() throws InterruptedException {
|
||||
|
||||
Observable.range(1, 5)
|
||||
.map(i -> i * 100)
|
||||
.doOnNext(i -> {
|
||||
emittedTotal += i;
|
||||
System.out.println("Emitting " + i
|
||||
+ " on thread " + Thread.currentThread().getName());
|
||||
})
|
||||
.observeOn(Schedulers.computation())
|
||||
.map(i -> i * 10)
|
||||
.subscribe(i -> {
|
||||
receivedTotal += i;
|
||||
System.out.println("Received " + i + " on thread "
|
||||
+ Thread.currentThread().getName());
|
||||
});
|
||||
|
||||
await().until(() -> {
|
||||
assertTrue(emittedTotal == 1500);
|
||||
assertTrue(receivedTotal == 15000);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenObserveOnBeforeOnNext_thenEmitsEventsOnComputeScheduler() throws InterruptedException {
|
||||
|
||||
Observable.range(1, 5)
|
||||
.map(i -> i * 100)
|
||||
.observeOn(Schedulers.computation())
|
||||
.doOnNext(i -> {
|
||||
emittedTotal += i;
|
||||
System.out.println("Emitting " + i
|
||||
+ " on thread " + Thread.currentThread().getName());
|
||||
})
|
||||
.map(i -> i * 10)
|
||||
.subscribe(i -> {
|
||||
receivedTotal += i;
|
||||
System.out.println("Received " + i + " on thread "
|
||||
+ Thread.currentThread().getName());
|
||||
});
|
||||
|
||||
await().until(() -> {
|
||||
assertTrue(emittedTotal == 1500);
|
||||
assertTrue(receivedTotal == 15000);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenSubscribeOn_thenEmitsEventsOnComputeScheduler() throws InterruptedException {
|
||||
|
||||
Observable.range(1, 5)
|
||||
.map(i -> i * 100)
|
||||
.doOnNext(i -> {
|
||||
emittedTotal += i;
|
||||
System.out.println("Emitting " + i
|
||||
+ " on thread " + Thread.currentThread().getName());
|
||||
})
|
||||
.subscribeOn(Schedulers.computation())
|
||||
.map(i -> i * 10)
|
||||
.subscribe(i -> {
|
||||
receivedTotal += i;
|
||||
System.out.println("Received " + i + " on thread "
|
||||
+ Thread.currentThread().getName());
|
||||
});
|
||||
|
||||
await().until(() -> {
|
||||
assertTrue(emittedTotal == 1500);
|
||||
assertTrue(receivedTotal == 15000);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableWithOneEvent_whenSingle_thenEmitEvent() {
|
||||
|
||||
Observable.range(1, 1)
|
||||
.single()
|
||||
.subscribe(i -> receivedTotal += i);
|
||||
assertTrue(receivedTotal == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableWithNoEvents_whenSingle_thenThrowException() {
|
||||
|
||||
Observable.range(1, 3)
|
||||
.single()
|
||||
.onErrorReturn(e -> receivedTotal += 10)
|
||||
.subscribe();
|
||||
assertTrue(receivedTotal == 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableWihNoEvents_whenSingleOrDefault_thenDefaultMessage() {
|
||||
|
||||
Observable.empty()
|
||||
.singleOrDefault("Default")
|
||||
.subscribe(i -> result += i);
|
||||
assertTrue(result.equals("Default"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableWithManyEvents_whenSingleOrDefault_thenThrowException() {
|
||||
|
||||
Observable.range(1, 3)
|
||||
.singleOrDefault(5)
|
||||
.onErrorReturn(e -> receivedTotal += 10)
|
||||
.subscribe();
|
||||
assertTrue(receivedTotal == 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenDoOnNextAndDoOnCompleted_thenSumAllEventsAndShowMessage() {
|
||||
|
||||
Observable.range(1, 10)
|
||||
.doOnNext(r -> receivedTotal += r)
|
||||
.doOnCompleted(() -> result = "Completed")
|
||||
.subscribe();
|
||||
assertTrue(receivedTotal == 55);
|
||||
assertTrue(result.equals("Completed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenDoOnEachAndDoOnSubscribe_thenSumAllValuesAndShowMessage() {
|
||||
|
||||
Observable.range(1, 10)
|
||||
.doOnEach(new Observer<Integer>() {
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
System.out.println("Complete");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Integer value) {
|
||||
receivedTotal += value;
|
||||
}
|
||||
})
|
||||
.doOnSubscribe(() -> result = "Subscribed")
|
||||
.subscribe();
|
||||
assertTrue(receivedTotal == 55);
|
||||
assertTrue(result.equals("Subscribed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenDoOnErrorDoOnTerminateAndDoAfterTerminate_thenShowErrorTerminateAndAfterTerminateMessages() {
|
||||
|
||||
thrown.expect(OnErrorNotImplementedException.class);
|
||||
Observable.empty()
|
||||
.single()
|
||||
.doOnError(throwable -> {
|
||||
throw new RuntimeException("error");
|
||||
})
|
||||
.doOnTerminate(() -> result += "doOnTerminate")
|
||||
.doAfterTerminate(() -> result += "_doAfterTerminate")
|
||||
.subscribe();
|
||||
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenTimestamp_thenEventsShouldAppearTimestamped() {
|
||||
|
||||
Observable.range(1, 10)
|
||||
.timestamp()
|
||||
.map(o -> result = o.getClass().toString())
|
||||
.last()
|
||||
.subscribe();
|
||||
assertTrue(result.equals("class rx.schedulers.Timestamped"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservables_whenDelay_thenEventsStartAppearAfterATime() throws InterruptedException {
|
||||
|
||||
Observable<Timestamped<Long>> source = Observable.interval(1, TimeUnit.SECONDS)
|
||||
.take(5)
|
||||
.timestamp();
|
||||
|
||||
Observable<Timestamped<Long>> delay = source.delaySubscription(2, TimeUnit.SECONDS);
|
||||
|
||||
source.<Long>subscribe(
|
||||
value -> System.out.println("source :" + value),
|
||||
t -> System.out.println("source error"),
|
||||
() -> System.out.println("source completed"));
|
||||
|
||||
delay.subscribe(
|
||||
value -> System.out.println("delay : " + value),
|
||||
t -> System.out.println("delay error"),
|
||||
() -> System.out.println("delay completed"));
|
||||
//Thread.sleep(8000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenRepeat_thenSumNumbersThreeTimes() {
|
||||
|
||||
Observable.range(1, 3)
|
||||
.repeat(3)
|
||||
.subscribe(i -> receivedTotal += i);
|
||||
assertTrue(receivedTotal == 18);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsing_thenReturnCreatedResource() {
|
||||
|
||||
Observable<Character> values = Observable.using(
|
||||
() -> "resource",
|
||||
r -> Observable.create(o -> {
|
||||
for (Character c : r.toCharArray()) {
|
||||
o.onNext(c);
|
||||
}
|
||||
o.onCompleted();
|
||||
}),
|
||||
r -> System.out.println("Disposed: " + r)
|
||||
);
|
||||
values.subscribe(
|
||||
v -> result += v,
|
||||
e -> result += e
|
||||
);
|
||||
assertTrue(result.equals("resource"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservableCached_whenSubscribesWith2Actions_thenEmitsCachedValues() {
|
||||
|
||||
Observable<Integer> source =
|
||||
Observable.<Integer>create(subscriber -> {
|
||||
System.out.println("Create");
|
||||
subscriber.onNext(receivedTotal += 5);
|
||||
subscriber.onCompleted();
|
||||
}
|
||||
).cache();
|
||||
source.subscribe(i -> {
|
||||
System.out.println("element 1");
|
||||
receivedTotal += 1;
|
||||
});
|
||||
source.subscribe(i -> {
|
||||
System.out.println("element 2");
|
||||
receivedTotal += 2;
|
||||
});
|
||||
assertTrue(receivedTotal == 8);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,221 @@
|
||||
package com.baeldung.rxjava.operators;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.observers.TestSubscriber;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RxAggregateOperatorsUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenTwoObservable_whenConcatenatingThem_thenSuccessfull() {
|
||||
// given
|
||||
List<Integer> listOne = Arrays.asList(1, 2, 3, 4);
|
||||
Observable<Integer> observableOne = Observable.from(listOne);
|
||||
|
||||
List<Integer> listTwo = Arrays.asList(5, 6, 7, 8);
|
||||
Observable<Integer> observableTwo = Observable.from(listTwo);
|
||||
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<Integer> concatObservable = observableOne.concatWith(observableTwo);
|
||||
|
||||
concatObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(8);
|
||||
subscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenCounting_thenObtainingNumberOfElements() {
|
||||
// given
|
||||
List<String> lettersList = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
|
||||
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<Integer> sourceObservable = Observable.from(lettersList)
|
||||
.count();
|
||||
sourceObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(7);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenReducing_thenObtainingInvertedConcatenatedString() {
|
||||
// given
|
||||
List<String> list = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
|
||||
|
||||
TestSubscriber<String> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<String> reduceObservable = Observable.from(list)
|
||||
.reduce((letter1, letter2) -> letter2 + letter1);
|
||||
reduceObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue("GFEDCBA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenCollecting_thenObtainingASet() {
|
||||
// given
|
||||
List<String> list = Arrays.asList("A", "B", "C", "B", "B", "A", "D");
|
||||
|
||||
TestSubscriber<HashSet> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<HashSet<String>> reduceListObservable = Observable.from(list)
|
||||
.collect(HashSet::new, HashSet::add);
|
||||
reduceListObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValues(new HashSet<>(list));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsingToList_thenObtainedAList() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 5);
|
||||
TestSubscriber<List> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<List<Integer>> listObservable = sourceObservable.toList();
|
||||
listObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(Arrays.asList(1, 2, 3, 4, 5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsingToSortedList_thenObtainedASortedList() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(10, 5);
|
||||
TestSubscriber<List> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<List<Integer>> listObservable = sourceObservable.toSortedList();
|
||||
listObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(Arrays.asList(10, 11, 12, 13, 14));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsingToSortedListWithComparator_thenObtainedAnInverseSortedList() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(10, 5);
|
||||
TestSubscriber<List> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<List<Integer>> listObservable = sourceObservable.toSortedList((int1, int2) -> int2 - int1);
|
||||
listObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(Arrays.asList(14, 13, 12, 11, 10));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsingToMap_thenObtainedAMap() {
|
||||
// given
|
||||
Observable<Book> bookObservable = Observable
|
||||
.just(
|
||||
new Book("The North Water", 2016),
|
||||
new Book("Origin", 2017),
|
||||
new Book("Sleeping Beauties", 2017));
|
||||
TestSubscriber<Map> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable<Map<String, Integer>> mapObservable = bookObservable
|
||||
.toMap(Book::getTitle, Book::getYear, HashMap::new);
|
||||
|
||||
mapObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(new HashMap() {
|
||||
{
|
||||
put("The North Water", 2016);
|
||||
put("Origin", 2017);
|
||||
put("Sleeping Beauties", 2017);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenUsingToMultiMap_thenObtainedAMultiMap() {
|
||||
// given
|
||||
Observable<Book> bookObservable = Observable
|
||||
.just(
|
||||
new Book("The North Water", 2016),
|
||||
new Book("Origin", 2017),
|
||||
new Book("Sleeping Beauties", 2017));
|
||||
TestSubscriber<Map> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
Observable multiMapObservable = bookObservable
|
||||
.toMultimap(Book::getYear, Book::getTitle, () -> new HashMap<>(), (key) -> new ArrayList<>());
|
||||
|
||||
multiMapObservable.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(new HashMap() {
|
||||
{
|
||||
put(2016, Arrays.asList("The North Water"));
|
||||
put(2017, Arrays.asList("Origin", "Sleeping Beauties"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class Book {
|
||||
private String title;
|
||||
private Integer year;
|
||||
|
||||
public Book(String title, Integer year) {
|
||||
this.title = title;
|
||||
this.year = year;
|
||||
}
|
||||
|
||||
public String getTitle() {
|
||||
return title;
|
||||
}
|
||||
|
||||
public Integer getYear() {
|
||||
return year;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.baeldung.rxjava.operators;
|
||||
|
||||
import io.reactivex.Observable;
|
||||
import io.reactivex.schedulers.TestScheduler;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.hasItems;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class RxFlatmapAndSwitchmapUnitTest {
|
||||
@Test
|
||||
public void givenObservable_whenFlatmap_shouldAssertAllItemsReturned() {
|
||||
//given
|
||||
List<String> actualOutput = new ArrayList<>();
|
||||
final TestScheduler scheduler = new TestScheduler();
|
||||
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
|
||||
|
||||
//when
|
||||
Observable.fromIterable(keywordToSearch)
|
||||
.flatMap(s -> Observable
|
||||
.just(s + " FirstResult", s + " SecondResult")
|
||||
.delay(10, TimeUnit.SECONDS, scheduler))
|
||||
.toList()
|
||||
.doOnSuccess(s -> actualOutput.addAll(s))
|
||||
.subscribe();
|
||||
|
||||
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
|
||||
|
||||
//then
|
||||
assertThat(actualOutput, hasItems("b FirstResult", "b SecondResult",
|
||||
"boo FirstResult", "boo SecondResult",
|
||||
"bo FirstResult", "bo SecondResult",
|
||||
"book FirstResult", "book SecondResult",
|
||||
"books FirstResult", "books SecondResult"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenObservable_whenSwitchmap_shouldAssertLatestItemReturned() {
|
||||
//given
|
||||
List<String> actualOutput = new ArrayList<>();
|
||||
final TestScheduler scheduler = new TestScheduler();
|
||||
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
|
||||
|
||||
//when
|
||||
Observable.fromIterable(keywordToSearch)
|
||||
.switchMap(s -> Observable
|
||||
.just(s + " FirstResult", s + " SecondResult")
|
||||
.delay(10, TimeUnit.SECONDS, scheduler))
|
||||
.toList()
|
||||
.doOnSuccess(s -> actualOutput.addAll(s))
|
||||
.subscribe();
|
||||
|
||||
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
|
||||
|
||||
//then
|
||||
assertEquals(2, actualOutput.size());
|
||||
assertThat(actualOutput, hasItems("books FirstResult", "books SecondResult"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,139 @@
|
||||
package com.baeldung.rxjava.operators;
|
||||
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
import rx.observables.MathObservable;
|
||||
import rx.observers.TestSubscriber;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
public class RxMathematicalOperatorsUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenRangeNumericObservable_whenCalculatingAverage_ThenSuccessfull() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 20);
|
||||
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
MathObservable.averageInteger(sourceObservable)
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeNumericObservable_whenCalculatingSum_ThenSuccessfull() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 20);
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
MathObservable.sumInteger(sourceObservable)
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(210);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeNumericObservable_whenCalculatingMax_ThenSuccessfullObtainingMaxValue() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 20);
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
MathObservable.max(sourceObservable)
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRangeNumericObservable_whenCalculatingMin_ThenSuccessfullObtainingMinValue() {
|
||||
// given
|
||||
Observable<Integer> sourceObservable = Observable.range(1, 20);
|
||||
TestSubscriber<Integer> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
MathObservable.min(sourceObservable)
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenItemObservable_whenCalculatingMaxWithComparator_ThenSuccessfullObtainingMaxItem() {
|
||||
// given
|
||||
Item five = new Item(5);
|
||||
List<Item> list = Arrays.asList(new Item(1), new Item(2), new Item(3), new Item(4), five);
|
||||
Observable<Item> itemObservable = Observable.from(list);
|
||||
|
||||
TestSubscriber<Item> subscriber = TestSubscriber.create();
|
||||
|
||||
// when
|
||||
MathObservable.from(itemObservable)
|
||||
.max(Comparator.comparing(Item::getId))
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(five);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenItemObservable_whenCalculatingMinWithComparator_ThenSuccessfullObtainingMinItem() {
|
||||
// given
|
||||
Item one = new Item(1);
|
||||
List<Item> list = Arrays.asList(one, new Item(2), new Item(3), new Item(4), new Item(5));
|
||||
TestSubscriber<Item> subscriber = TestSubscriber.create();
|
||||
Observable<Item> itemObservable = Observable.from(list);
|
||||
|
||||
// when
|
||||
MathObservable.from(itemObservable)
|
||||
.min(Comparator.comparing(Item::getId))
|
||||
.subscribe(subscriber);
|
||||
|
||||
// then
|
||||
subscriber.assertCompleted();
|
||||
subscriber.assertNoErrors();
|
||||
subscriber.assertValueCount(1);
|
||||
subscriber.assertValue(one);
|
||||
|
||||
}
|
||||
|
||||
class Item {
|
||||
private Integer id;
|
||||
|
||||
public Item(Integer id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public Integer getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user