diff --git a/rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java b/rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java index 693608d116..72eab2fdfd 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/combine/ObservableCombineUnitTest.java @@ -4,14 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import rx.Observable; @@ -19,50 +13,18 @@ import rx.observers.TestSubscriber; public class ObservableCombineUnitTest { - private static ExecutorService executor; - - @BeforeClass - public static void setupClass() { - executor = Executors.newFixedThreadPool(10); - } - - @AfterClass - public static void tearDownClass() { - executor.shutdown(); - } - @Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() { - List results = new ArrayList<>(); + TestSubscriber testSubscriber = new TestSubscriber<>(); //@formatter:off Observable.merge( Observable.from(new String[] {"Hello", "World"}), Observable.from(new String[]{ "I love", "RxJava"}) - ).subscribe(data -> { - results.add(data); - }); + ).subscribe(testSubscriber); //@formatter:on - assertThat(results).isNotEmpty(); - assertThat(results.size()).isEqualTo(4); - assertThat(results).contains("Hello", "World", "I love", "RxJava"); - } - - @Test - public void givenAnObservable_whenStartWith_thenPrependEmittedResults() { - StringBuffer buffer = new StringBuffer(); - - //@formatter:off - Observable - .from(new String[] { "RxJava", "Observables" }) - .startWith("Buzzwords of Reactive Programming") - .subscribe(data -> { - buffer.append(data).append(" "); - }); - //@formatter:on - - assertThat(buffer.toString().trim()).isEqualTo("Buzzwords of Reactive Programming RxJava Observables"); + testSubscriber.assertValues("Hello", "World", "I love", "RxJava"); } @Test @@ -73,11 +35,7 @@ public class ObservableCombineUnitTest { Observable.zip( Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Heirarchy"}), - (str1, str2) -> { - return str1 + " " + str2; - }).subscribe(zipped -> { - zippedStrings.add(zipped); - }); + (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add); //formatter:on assertThat(zippedStrings).isNotEmpty(); @@ -87,35 +45,31 @@ public class ObservableCombineUnitTest { @Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() { - Future f1 = executor.submit(createCallable("Hello")); - Future f2 = executor.submit(createCallable("World")); - Future f3 = executor.submit(createCallable(null)); - Future f4 = executor.submit(createCallable("RxJava")); - TestSubscriber testSubscriber = new TestSubscriber<>(); - //@formatter:off Observable.mergeDelayError( - Observable.from(f1), - Observable.from(f2), - Observable.from(f3), - Observable.from(f4) + Observable.from(new String[] { "hello", "world" }), + Observable.error(new RuntimeException("Some exception")), + Observable.from(new String[] { "rxjava" }) ).subscribe(testSubscriber); - //@formatter:on - + testSubscriber.assertValues("hello", "world", "rxjava"); - testSubscriber.assertError(ExecutionException.class); + testSubscriber.assertError(RuntimeException.class);; } - private Callable createCallable(final String data) { - return new Callable() { - @Override - public String call() throws Exception { - if (data == null) { - throw new IllegalArgumentException("Data should not be null."); - } - return data.toLowerCase(); - } - }; + @Test + public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() { + TestSubscriber testSubscriber = new TestSubscriber<>(); + + Observable data = Observable.just("one", "two", "three", "four", "five"); + Observable interval = Observable.interval(1L, TimeUnit.SECONDS); + + Observable + .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)) + .toBlocking().subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + testSubscriber.assertValueCount(5); + testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five"); } }