diff --git a/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java index 8788b894aa..005487dae8 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java @@ -9,7 +9,8 @@ public class ConnectableObservableImpl { public static void main(String[] args) throws InterruptedException { - ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); + ConnectableObservable connectable + = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(System.out::println); System.out.println("Connect"); diff --git a/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java index f8f5b81883..9ab0d0e1e6 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java @@ -3,12 +3,22 @@ package com.baelding.rxjava; import rx.Observable; import rx.observables.BlockingObservable; +import java.util.Arrays; +import java.util.List; + public class ObservableImpl { - public static void main(String[] args) { + static Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; - String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + static String[] letters = {"a", "b", "c", "d", "e", "f", "g", "h", "i"}; + static String[] titles = {"title"}; + public static List titleList = Arrays.asList(titles); + + public static Observable getTitle() { + return Observable.from(titleList); + } + + public static void main(String[] args) { System.out.println("-------Just-----------"); Observable observable = Observable.just("Hello"); @@ -28,14 +38,9 @@ public class ObservableImpl { System.out.println(); System.out.println("-------FlatMap-----------"); - Observable.from(letters) - .flatMap((letter) -> { - String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; - return Observable.from(returnStrings); - }) - .subscribe( - System.out::print - ); + Observable.just("book1", "book2") + .flatMap(s -> getTitle()) + .subscribe(System.out::print); System.out.println(); System.out.println("--------Scan----------"); @@ -55,16 +60,12 @@ public class ObservableImpl { System.out.println("-------Filter-----------"); Observable.from(numbers) .filter(i -> (i % 2 == 1)) - .subscribe( - System.out::println - ); + .subscribe(System.out::println); System.out.println("------DefaultIfEmpty------------"); Observable.empty() .defaultIfEmpty("Observable is empty") - .subscribe( - System.out::println - ); + .subscribe(System.out::println); System.out.println("------DefaultIfEmpty-2-----------"); Observable.from(letters) diff --git a/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java index 244c291f00..aac9b4454a 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java +++ b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java @@ -1,69 +1,73 @@ package com.baelding.rxjava; -import rx.Observable; -import rx.schedulers.Schedulers; +import rx.Observer; import rx.subjects.PublishSubject; public class SubjectImpl { - public static final String[] subscriber1 = {""}; - public static final String[] subscriber2 = {""}; + public static Integer subscriber1 = 0; + public static Integer subscriber2 = 0; - public static String subjectMethod() throws InterruptedException { + public static Integer subjectMethod() { + PublishSubject subject = PublishSubject.create(); - String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; - Long signal = 500L; - PublishSubject subject; + subject.subscribe(getFirstObserver()); - synchronized (signal) { - subject = PublishSubject.create(); - subject.subscribe( - (letter) -> { - subscriber1[0] += letter; - System.out.println("Subscriber 1: " + subscriber1[0]); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - if (letter.equals("c")) { - synchronized (signal) { - signal.notify(); - } - } - } - ); - } + subject.onNext(1); + subject.onNext(2); + subject.onNext(3); - Observable.from(letters) - .subscribeOn(Schedulers.computation()) - .subscribe( - subject::onNext, - subject::onError, - () -> { - System.out.println("Subscriber 1 completed "); - subject.onCompleted(); - synchronized (signal) { - signal.notify(); - } + subject.subscribe(getSecondObserver()); + + subject.onNext(4); + subject.onCompleted(); + return subscriber1 + subscriber2; + } + + + public static Observer getFirstObserver() { + return new Observer() { + + @Override + public void onNext(Integer value) { + subscriber1 += value; + System.out.println("Subscriber1: " + value); } - ); - synchronized (signal) { - signal.wait(); - subject.subscribe( - (letter) -> { - subscriber2[0] += letter; - System.out.println("Subscriber 2: " + subscriber2[0]); - }, - subject::onError, - () -> System.out.println("Subscriber 2 completed ") - ); - } + @Override + public void onError(Throwable e) { + System.out.println("error"); + } - synchronized (signal) { - signal.wait(); - return subscriber1[0] + subscriber2[0]; - } + @Override + public void onCompleted() { + System.out.println("Subscriber1 completed"); + } + }; + } + + public static Observer getSecondObserver() { + return new Observer() { + + @Override + public void onNext(Integer value) { + subscriber2 += value; + System.out.println("Subscriber2: " + value); + } + + @Override + public void onError(Throwable e) { + System.out.println("error"); + } + + @Override + public void onCompleted() { + System.out.println("Subscriber2 completed"); + } + }; + } + + public static void main(String[] args) throws InterruptedException { + System.out.println(subjectMethod()); } } diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java index 88669179a6..981875510d 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java @@ -13,15 +13,14 @@ public class ConnectableObservableTest { @Test public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException { - final String[] result = {""}; - ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); + String[] result = {""}; + ConnectableObservable connectable + = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); - assertFalse(result[0].equals("01")); connectable.connect(); - Thread.currentThread().sleep(500); - + Thread.sleep(500); assertTrue(result[0].equals("01")); } } diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java index a810abaa1e..08fccfb238 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java @@ -2,10 +2,8 @@ package com.baeldung.rxjava; import org.junit.Test; import rx.Observable; -import rx.observables.ConnectableObservable; - -import java.util.concurrent.TimeUnit; +import static com.baelding.rxjava.ObservableImpl.getTitle; import static junit.framework.Assert.assertTrue; public class ObservableTest { @@ -24,41 +22,23 @@ public class ObservableTest { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( - //onNext - (i) -> { - result += i; - }, - //onError - (t) -> { - t.printStackTrace(); - }, - //onCompleted - () -> { - result += "Complete"; - } + i -> result += i, + Throwable::printStackTrace, + () -> result += "_Complete" ); - assertTrue(result.equals("abcdefgComplete")); + assertTrue(result.equals("abcdefg_Complete")); } - @Test + @Test public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); String blockingObservable = observable.toBlocking().first(); observable.subscribe( - //onNext - (i) -> { - result += i; - }, - //onError - (t) -> { - t.printStackTrace(); - }, - //onCompleted - () -> { - result += "Complete"; - } + i -> result += i, + Throwable::printStackTrace, + () -> result += "_Completed" ); assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable)); } @@ -68,45 +48,31 @@ public class ObservableTest { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable.from(letters) - .map((letter) -> { - return letter.toUpperCase(); - }) - .subscribe((letter) -> { - result += letter; - }); + .map(String::toUpperCase) + .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG")); } @Test public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() { - String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; - Observable.from(letters) - .flatMap((letter) -> { - String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; - return Observable.from(returnStrings); - }) - .subscribe((letter) -> { - result += letter; - }); + Observable.just("book1", "book2") + .flatMap(s -> getTitle()) + .subscribe(l -> result += l); - assertTrue(result.equals("AaBbCcDdEeFfGg")); + assertTrue(result.equals("titletitle")); } @Test public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() { - String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + String[] letters = {"a", "b", "c"}; Observable.from(letters) - .scan(new StringBuilder(), (buffer, nextLetter) -> { - return buffer.append(nextLetter); - }) - .subscribe((total) -> { - result = total.toString(); - }); + .scan(new StringBuilder(), StringBuilder::append) + .subscribe(total -> result += total.toString()); - assertTrue(result.equals("abcdefg")); + assertTrue(result.equals("aababc")); } @Test @@ -116,18 +82,16 @@ public class ObservableTest { String[] ODD = {""}; Observable.from(numbers) - .groupBy((i) -> { - return 0 == (i % 2) ? "EVEN" : "ODD"; - }) - .subscribe((group) -> { - group.subscribe((number) -> { - if (group.getKey().toString().equals("EVEN")) { - EVEN[0] += number; - } else { - ODD[0] += number; - } - }); - }); + .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") + .subscribe(group -> + group.subscribe((number) -> { + if (group.getKey().toString().equals("EVEN")) { + EVEN[0] += number; + } else { + ODD[0] += number; + } + }) + ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579")); @@ -138,12 +102,8 @@ public class ObservableTest { Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; Observable.from(numbers) - .filter((i) -> { - return (i % 2 == 1); - }) - .subscribe((i) -> { - result += i; - }); + .filter(i -> (i % 2 == 1)) + .subscribe(i -> result += i); assertTrue(result.equals("13579")); } @@ -152,10 +112,8 @@ public class ObservableTest { public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() { Observable.empty() - .defaultIfEmpty("Observable is empty") - .subscribe((s) -> { - result += s; - }); + .defaultIfEmpty("Observable is empty") + .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty")); } @@ -165,11 +123,9 @@ public class ObservableTest { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable.from(letters) - .defaultIfEmpty("Observable is empty") - .first() - .subscribe((s) -> { - result += s; - }); + .defaultIfEmpty("Observable is empty") + .first() + .subscribe(s -> result += s); assertTrue(result.equals("a")); } @@ -180,12 +136,8 @@ public class ObservableTest { final Integer[] sum = {0}; Observable.from(numbers) - .takeWhile((i) -> { - return i < 5; - }) - .subscribe((s) -> { - sum[0] += s; - }); + .takeWhile(i -> i < 5) + .subscribe(s -> sum[0] += s); assertTrue(sum[0] == 10); } diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java index 6cc01d8932..9c52af61d0 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java @@ -10,32 +10,25 @@ public class ResourceManagementTest { @Test public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException { - final String[] result = {""}; - + String[] result = {""}; Observable values = Observable.using( - //a factory function that creates a disposable resource - () -> { - String resource = "MyResource"; - return resource; - }, - //a factory function that creates an Observable - (resource) -> { - return Observable.create(o -> { - for (Character c : resource.toCharArray()) - o.onNext(c); - o.onCompleted(); - }); - }, - //a function that disposes of the resource - (resource) -> System.out.println("Disposed: " + resource) + () -> { + return "MyResource"; + }, + r -> { + return Observable.create(o -> { + for (Character c : r.toCharArray()) + o.onNext(c); + o.onCompleted(); + }); + }, + r -> System.out.println("Disposed: " + r) ); values.subscribe( - v -> result[0] += v, - e -> result[0] += e + v -> result[0] += v, + e -> result[0] += e ); - assertTrue(result[0].equals("MyResource")); - } } diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java b/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java index d447931b29..6d428d856b 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java @@ -10,19 +10,15 @@ public class SingleTest { @Test public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException { - final String[] result = {""}; - Single single = Observable.just("Hello").toSingle() - .doOnSuccess( - (i) -> { - result[0] += i; - }) - .doOnError( - (error) -> { - throw new RuntimeException(error.getMessage()); - }); + String[] result = {""}; + Single single = Observable.just("Hello") + .toSingle() + .doOnSuccess(i -> result[0] += i) + .doOnError(error -> { + throw new RuntimeException(error.getMessage()); + }); single.subscribe(); - assertTrue(result[0].equals("Hello")); } - } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java b/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java index 8ff81e7066..429a7fe231 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java @@ -2,16 +2,25 @@ package com.baeldung.rxjava; import com.baelding.rxjava.SubjectImpl; import org.junit.Test; +import rx.subjects.PublishSubject; import static junit.framework.Assert.assertTrue; public class SubjectTest { @Test - public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubjectAfterLetterC_thenSecondSubscriberBeginsToPrint() throws InterruptedException { - String result = SubjectImpl.subjectMethod(); - String subscribers = SubjectImpl.subscriber1[0] + SubjectImpl.subscriber2[0]; + public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubject_thenSubscriberBeginsToAdd(){ + PublishSubject subject = PublishSubject.create(); - assertTrue(subscribers.equals(result)); + subject.subscribe(SubjectImpl.getFirstObserver()); + subject.onNext(1); + subject.onNext(2); + subject.onNext(3); + + subject.subscribe(SubjectImpl.getSecondObserver()); + subject.onNext(4); + subject.onCompleted(); + + assertTrue(SubjectImpl.subscriber1 + SubjectImpl.subscriber2 == 14); } }