diff --git a/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java new file mode 100644 index 0000000000..c5da7c1df5 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ConnectableObservableImpl.java @@ -0,0 +1,21 @@ +package com.baelding.rxjava; + +import rx.Observable; +import rx.observables.ConnectableObservable; + +import java.util.concurrent.TimeUnit; + +public class ConnectableObservableImpl { + + public static void main(String[] args) throws InterruptedException { + + ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); + connectable.subscribe(System.out::println); + + System.out.println("Connect"); + connectable.connect(); + + Thread.currentThread().sleep(500); + System.out.println("Sleep"); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java new file mode 100644 index 0000000000..6254d03491 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ObservableImpl.java @@ -0,0 +1,105 @@ +package com.baelding.rxjava; + +import rx.Observable; +import rx.observables.BlockingObservable; + +public class ObservableImpl { + + + public static void main(String[] args) { + + Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + + System.out.println("-------Just-----------"); + Observable observable = Observable.just("Hello"); + observable.subscribe( + //onNext + System.out::println, + //onError + Throwable::printStackTrace, + //onCompleted + () -> System.out.println("onCompleted") + ); + + BlockingObservable blockingObservable = observable.toBlocking(); + + System.out.println(); + System.out.println("-------Map-----------"); + Observable.from(letters) + .map((letter) -> { + return letter.toUpperCase(); + }) + .subscribe( + System.out::print + ); + + System.out.println(); + System.out.println("-------FlatMap-----------"); + Observable.from(letters) + .flatMap((letter) -> { + String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; + return Observable.from(returnStrings); + }) + .subscribe( + System.out::print + ); + + System.out.println(); + System.out.println("--------Scan----------"); + Observable.from(letters) + .scan(new StringBuilder(), (buffer, nextLetter) -> { + return buffer.append(nextLetter); + }) + .subscribe((total) -> { + System.out.println(total.toString()); + }); + + System.out.println(); + System.out.println("------GroubBy------------"); + Observable.from(numbers) + .groupBy((i) -> { + return 0 == (i % 2) ? "EVEN" : "ODD"; + }) + .subscribe((group) -> { + group.subscribe((number) -> { + System.out.println(group.getKey() + " : " + number); + }); + }); + + System.out.println(); + System.out.println("-------Filter-----------"); + Observable.from(numbers) + .filter((i) -> { + return (i % 2 == 1); + }) + .subscribe( + System.out::println + ); + + + System.out.println("------DefaultIfEmpty------------"); + Observable.empty() + .defaultIfEmpty("Observable is empty") + .subscribe( + System.out::println + ); + + + + System.out.println("------DefaultIfEmpty-2-----------"); + Observable.from(letters) + .defaultIfEmpty("Observable is empty") + .first() + .subscribe(System.out::println); + + System.out.println("-------TakeWhile-----------"); + Observable.from(numbers) + .takeWhile((i) -> { + return i < 5; + }) + .subscribe(System.out::println); + + + } +} \ No newline at end of file diff --git a/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java b/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java new file mode 100644 index 0000000000..9bea622447 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ResourceManagement.java @@ -0,0 +1,37 @@ +package com.baelding.rxjava; + + +import rx.Observable; + +public class ResourceManagement { + + + public static void main(String[] args) { + + Observable values = Observable.using( + //a factory function that creates a disposable resource + () -> { + String resource = "MyResource"; + System.out.println("Leased: " + resource); + return resource; + }, + //a factory function that creates an Observable + (resource) -> { + return Observable.create(o -> { + for (Character c : resource.toCharArray()) + o.onNext(c); + o.onCompleted(); + }); + }, + //a function that disposes of the resource + (resource) -> System.out.println("Disposed: " + resource) + ); + + values.subscribe( + v -> System.out.println(v), + e -> System.out.println(e) + ); + + } +} + diff --git a/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java b/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java new file mode 100644 index 0000000000..f6636999b6 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/SingleImpl.java @@ -0,0 +1,24 @@ +package com.baelding.rxjava; + +import rx.Observable; +import rx.Single; + +public class SingleImpl { + + public static void main(String[] args) { + + Single single = Observable.just("Hello") + .toSingle() + .doOnSuccess( + System.out::print + ) + .doOnError( + (error) -> { + throw new RuntimeException(error.getMessage()); + }); + single.subscribe(); + + + } + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java new file mode 100644 index 0000000000..e96f82a171 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/SubjectImpl.java @@ -0,0 +1,82 @@ +package com.baelding.rxjava; + +import rx.Observable; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + + +public class SubjectImpl { + + public static final String[] subscriber1 = {""}; + public static final String[] subscriber2 = {""}; + + public static String subjectMethod() throws InterruptedException { + + + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + Long signal = new Long(500L); + PublishSubject subject; + + synchronized (signal) { + subject = PublishSubject.create(); + subject.subscribe( + (letter) -> { + subscriber1[0] += letter; + System.out.println("Subscriber 1: " + subscriber1[0]); + try { + Thread.currentThread().sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (letter.equals("c")) { + synchronized (signal) { + signal.notify(); + } + } + } + ); + } + + Observable.from(letters) + .subscribeOn(Schedulers.computation()) + .subscribe( + (letter) -> { + subject.onNext(letter); + }, + (t) -> { + subject.onError(t); + }, + () -> { + System.out.println("Subscriber 1 completed "); + subject.onCompleted(); + synchronized (signal) { + signal.notify(); + } + + } + ); + + synchronized (signal) { + signal.wait(); + subject.subscribe( + (letter) -> { + subscriber2[0] += letter; + System.out.println("Subscriber 2: " + subscriber2[0]); + }, + (t) -> { + subject.onError(t); + }, + () -> { + System.out.println("Subscriber 2 completed "); + } + ); + } + + synchronized (signal) { + signal.wait(); + return subscriber1[0] + subscriber2[0]; + } + + } + +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java new file mode 100644 index 0000000000..88669179a6 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/ConnectableObservableTest.java @@ -0,0 +1,27 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.Observable; +import rx.observables.ConnectableObservable; + +import java.util.concurrent.TimeUnit; + +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; + +public class ConnectableObservableTest { + + @Test + public void givenConnectableObservable_whenConnect_thenGetMessage() throws InterruptedException { + final String[] result = {""}; + ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); + connectable.subscribe(i -> result[0] += i); + + assertFalse(result[0].equals("01")); + + connectable.connect(); + Thread.currentThread().sleep(500); + + assertTrue(result[0].equals("01")); + } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java new file mode 100644 index 0000000000..a810abaa1e --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/ObservableTest.java @@ -0,0 +1,193 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.Observable; +import rx.observables.ConnectableObservable; + +import java.util.concurrent.TimeUnit; + +import static junit.framework.Assert.assertTrue; + +public class ObservableTest { + + String result = ""; + + @Test + public void givenString_whenJustAndSubscribe_thenEmitsSingleItem() { + Observable observable = Observable.just("Hello"); + observable.subscribe(s -> result = s); + assertTrue(result.equals("Hello")); + } + + @Test + public void givenArray_whenFromAndSubscribe_thenEmitsItems() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + Observable observable = Observable.from(letters); + observable.subscribe( + //onNext + (i) -> { + result += i; + }, + //onError + (t) -> { + t.printStackTrace(); + }, + //onCompleted + () -> { + result += "Complete"; + } + ); + assertTrue(result.equals("abcdefgComplete")); + } + + @Test + public void givenArray_whenConvertsObservabletoBlockingObservable_thenReturnFirstElement() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + Observable observable = Observable.from(letters); + String blockingObservable = observable.toBlocking().first(); + + observable.subscribe( + //onNext + (i) -> { + result += i; + }, + //onError + (t) -> { + t.printStackTrace(); + }, + //onCompleted + () -> { + result += "Complete"; + } + ); + assertTrue(String.valueOf(result.charAt(0)).equals(blockingObservable)); + } + + @Test + public void givenArray_whenMapAndSubscribe_thenReturnCapitalLetters() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + + Observable.from(letters) + .map((letter) -> { + return letter.toUpperCase(); + }) + .subscribe((letter) -> { + result += letter; + }); + + assertTrue(result.equals("ABCDEFG")); + } + + @Test + public void givenArray_whenFlatMapAndSubscribe_thenReturnUpperAndLowerCaseLetters() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + + Observable.from(letters) + .flatMap((letter) -> { + String[] returnStrings = {letter.toUpperCase(), letter.toLowerCase()}; + return Observable.from(returnStrings); + }) + .subscribe((letter) -> { + result += letter; + }); + + assertTrue(result.equals("AaBbCcDdEeFfGg")); + } + + @Test + public void givenArray_whenScanAndSubscribe_thenReturnTheSumOfAllLetters() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + + Observable.from(letters) + .scan(new StringBuilder(), (buffer, nextLetter) -> { + return buffer.append(nextLetter); + }) + .subscribe((total) -> { + result = total.toString(); + }); + + assertTrue(result.equals("abcdefg")); + } + + @Test + public void givenArrayOfNumbers_whenGroupBy_thenCreateTwoGroupsBasedOnParity() { + Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + String[] EVEN = {""}; + String[] ODD = {""}; + + Observable.from(numbers) + .groupBy((i) -> { + return 0 == (i % 2) ? "EVEN" : "ODD"; + }) + .subscribe((group) -> { + group.subscribe((number) -> { + if (group.getKey().toString().equals("EVEN")) { + EVEN[0] += number; + } else { + ODD[0] += number; + } + }); + }); + + assertTrue(EVEN[0].equals("0246810")); + assertTrue(ODD[0].equals("13579")); + } + + @Test + public void givenArrayOfNumbers_whenFilter_thenGetAllOddNumbers() { + Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + + Observable.from(numbers) + .filter((i) -> { + return (i % 2 == 1); + }) + .subscribe((i) -> { + result += i; + }); + + assertTrue(result.equals("13579")); + } + + @Test + public void givenEmptyObservable_whenDefaultIfEmpty_thenGetDefaultMessage() { + + Observable.empty() + .defaultIfEmpty("Observable is empty") + .subscribe((s) -> { + result += s; + }); + + assertTrue(result.equals("Observable is empty")); + } + + @Test + public void givenObservableFromArray_whenDefaultIfEmptyAndFirst_thenGetFirstLetterFromArray() { + String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; + + Observable.from(letters) + .defaultIfEmpty("Observable is empty") + .first() + .subscribe((s) -> { + result += s; + }); + + assertTrue(result.equals("a")); + } + + @Test + public void givenObservableFromArray_whenTakeWhile_thenGetSumOfNumbersFromCondition() { + Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + final Integer[] sum = {0}; + + Observable.from(numbers) + .takeWhile((i) -> { + return i < 5; + }) + .subscribe((s) -> { + sum[0] += s; + }); + + assertTrue(sum[0] == 10); + } + +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java b/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java new file mode 100644 index 0000000000..6cc01d8932 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/ResourceManagementTest.java @@ -0,0 +1,41 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.Observable; + +import static junit.framework.Assert.assertTrue; + +public class ResourceManagementTest { + + @Test + public void givenResource_whenUsingOberservable_thenCreatePrintDisposeResource() throws InterruptedException { + + final String[] result = {""}; + + Observable values = Observable.using( + //a factory function that creates a disposable resource + () -> { + String resource = "MyResource"; + return resource; + }, + //a factory function that creates an Observable + (resource) -> { + return Observable.create(o -> { + for (Character c : resource.toCharArray()) + o.onNext(c); + o.onCompleted(); + }); + }, + //a function that disposes of the resource + (resource) -> System.out.println("Disposed: " + resource) + ); + + values.subscribe( + v -> result[0] += v, + e -> result[0] += e + ); + + assertTrue(result[0].equals("MyResource")); + + } +} diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java b/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java new file mode 100644 index 0000000000..d447931b29 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/SingleTest.java @@ -0,0 +1,28 @@ +package com.baeldung.rxjava; + +import org.junit.Test; +import rx.Observable; +import rx.Single; + +import static junit.framework.Assert.assertTrue; + +public class SingleTest { + + @Test + public void givenSingleObservable_whenSuccess_thenGetMessage() throws InterruptedException { + final String[] result = {""}; + Single single = Observable.just("Hello").toSingle() + .doOnSuccess( + (i) -> { + result[0] += i; + }) + .doOnError( + (error) -> { + throw new RuntimeException(error.getMessage()); + }); + single.subscribe(); + + assertTrue(result[0].equals("Hello")); + } + + } diff --git a/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java b/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java new file mode 100644 index 0000000000..8ff81e7066 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/SubjectTest.java @@ -0,0 +1,17 @@ +package com.baeldung.rxjava; + +import com.baelding.rxjava.SubjectImpl; +import org.junit.Test; + +import static junit.framework.Assert.assertTrue; + +public class SubjectTest { + + @Test + public void givenSubjectAndTwoSubscribers_whenSubscribeOnSubjectAfterLetterC_thenSecondSubscriberBeginsToPrint() throws InterruptedException { + String result = SubjectImpl.subjectMethod(); + String subscribers = SubjectImpl.subscriber1[0] + SubjectImpl.subscriber2[0]; + + assertTrue(subscribers.equals(result)); + } +}