diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsTest.java b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsTest.java new file mode 100644 index 0000000000..be0f390b67 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaFilterOperatorsTest.java @@ -0,0 +1,202 @@ +package com.baeldung.rxjava.filters; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +public class RxJavaFilterOperatorsTest { + + @Test + public void givenRangeObservable_whenFilteringItems_thenOddItemsAreFiltered() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.filter(i -> i % 2 != 0); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(5); + subscriber.assertValues(1, 3, 5, 7, 9); + } + + @Test + public void givenRangeObservable_whenFilteringWithTake_thenOnlyFirstThreeItemsAreEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.take(3); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(3); + subscriber.assertValues(1, 2, 3); + } + + @Test + public void givenObservable_whenFilteringWithTakeWhile_thenItemsEmittedUntilConditionIsVerified() { + + Observable sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.takeWhile(i -> i < 4); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(3); + subscriber.assertValues(1, 2, 3); + } + + @Test + public void givenRangeObservable_whenFilteringWithTakeFirst_thenOnlyFirstItemIsEmitted() { + + Observable sourceObservable = Observable.just(1, 2, 3, 4, 5, 7, 6); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.takeFirst(x -> x > 5); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(7); + } + + @Test + public void givenRangeObservable_whenFilteringWithFirst_thenOnlyFirstThreeItemsAreEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.first(); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(1); + } + + @Test + public void givenEmptyObservable_whenFilteringWithFirstOrDefault_thenDefaultValue() { + + Observable sourceObservable = Observable.empty(); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.firstOrDefault(-1); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(-1); + } + + @Test + public void givenRangeObservable_whenFilteringWithTakeLast_thenLastThreeItemAreEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.takeLast(3); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(3); + subscriber.assertValues(8, 9, 10); + } + + @Test + public void givenRangeObservable_whenFilteringWithLast_thenOnlyLastItemIsEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.last(i -> i % 2 != 0); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(9); + } + + @Test + public void givenRangeObservable_whenFilteringWithLastAndDefault_thenOnlyDefaultIsEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(-1); + } + + @Test + public void givenObservable_whenTakingElementAt_thenItemAtSpecifiedIndexIsEmitted() { + + Observable sourceObservable = Observable.just(1, 2, 3, 5, 7, 11); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.elementAt(4); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(7); + } + + @Test + public void givenObservable_whenTakingElementAtOrDefault_thenDefaultIsReturned() { + + Observable sourceObservable = Observable.just(1, 2, 3, 5, 7, 11); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.elementAtOrDefault(7, -1); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(1); + subscriber.assertValue(-1); + } + + @Test + public void givenMixedTypeObservable_whenFilteringByType_thenOnlyNumbersAreEmitted() { + + Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.ofType(String.class); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(2); + subscriber.assertValues("two", "five"); + + } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsTest.java b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsTest.java new file mode 100644 index 0000000000..eca39b17bf --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaSkipOperatorsTest.java @@ -0,0 +1,105 @@ +package com.baeldung.rxjava.filters; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +public class RxJavaSkipOperatorsTest { + + @Test + public void givenRangeObservable_whenSkipping_thenFirstFourItemsAreSkipped() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.skip(4); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(6); + subscriber.assertValues(5, 6, 7, 8, 9, 10); + } + + @Test + public void givenObservable_whenSkippingWhile_thenFirstItemsAreSkipped() { + + Observable sourceObservable = Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.skipWhile(i -> i < 4); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(6); + subscriber.assertValues(4, 5, 4, 3, 2, 1); + } + + @Test + public void givenRangeObservable_whenSkippingLast_thenLastFiveItemsAreSkipped() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = sourceObservable.skipLast(5); + + filteredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(5); + subscriber.assertValues(1, 2, 3, 4, 5); + } + + @Test + public void givenObservable_whenFilteringDistinct_thenOnlyDistinctValuesAreEmitted() { + + Observable sourceObservable = Observable.just(1, 1, 2, 2, 1, 3, 3, 1); + TestSubscriber subscriber = new TestSubscriber(); + + Observable distinctObservable = sourceObservable.distinct(); + + distinctObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(3); + subscriber.assertValues(1, 2, 3); + } + + @Test + public void givenObservable_whenFilteringDistinctUntilChanged_thenOnlyDistinctConsecutiveItemsAreEmitted() { + + Observable sourceObservable = Observable.just(1, 1, 2, 2, 1, 3, 3, 1); + TestSubscriber subscriber = new TestSubscriber(); + + Observable distinctObservable = sourceObservable.distinctUntilChanged(); + + distinctObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(5); + subscriber.assertValues(1, 2, 1, 3, 1); + } + + @Test + public void givenRangeObservable_whenIgnoringElements_thenOnlyDistinctConsecutiveItemsAreEmitted() { + + Observable sourceObservable = Observable.range(1, 10); + TestSubscriber subscriber = new TestSubscriber(); + + Observable ignoredObservable = sourceObservable.ignoreElements(); + + ignoredObservable.subscribe(subscriber); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValueCount(0); + subscriber.assertNoValues(); + } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java new file mode 100644 index 0000000000..868f123402 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java @@ -0,0 +1,206 @@ +package com.baeldung.rxjava.filters; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Ignore; +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +@Ignore("Manual only") +public class RxJavaTimeFilteringOperatorsTest { + + @Test + public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable sampledObservable = + timedObservable.sample(Observable.interval(2500L, TimeUnit.MILLISECONDS)); + + sampledObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValues(3, 5, 6); + } + + @Test + public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValues(4, 6); + } + + @Test + public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = + timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValues(1, 6); + } + + @Test + public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValue(6); + } + + @Test + public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValue(6); + } + + @Test + public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertError(TimeoutException.class); + subscriber.assertValues(1); + } + + @Test + public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + + Observable delayedObservable = Observable.just(1) + .delay(3000, TimeUnit.MILLISECONDS); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable filteredObservable = timedObservable.skipUntil(delayedObservable); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValues(4, 5, 6); + } + + @Test + public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() throws InterruptedException { + + Observable timedObservable = + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS), + (item, time) -> item + ); + + TestSubscriber subscriber = new TestSubscriber(); + + Observable delayedObservable = Observable.just(1) + .delay(3000, TimeUnit.MILLISECONDS); + + Observable filteredObservable = timedObservable.takeUntil(delayedObservable); + + filteredObservable.subscribe(subscriber); + + Thread.sleep(7000); + + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertValues(1, 2, 3); + } +}