diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java new file mode 100644 index 0000000000..e31838448b --- /dev/null +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/RxJavaHooksUnitTest.java @@ -0,0 +1,329 @@ +package com.baeldung.rxjava; + +import org.junit.Test; + +import io.reactivex.Completable; +import io.reactivex.Flowable; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; +import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.observables.ConnectableObservable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class RxJavaHooksUnitTest { + + @Test + public void givenCompletable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnCompletableAssembly(completable -> { + System.out.println("Assembling Completable"); + return completable; + }); + Completable.fromSingle(Single.just(1)); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenCompletable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> { + System.out.println("Subscribing to Completable"); + return observer; + }); + + Completable.fromSingle(Single.just(1)) + .test(true); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnObservableAssembly(observable -> { + System.out.println("Assembling Observable"); + return observable; + }); + + Observable.range(1, 10); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { + System.out.println("Suscribing to Observable"); + return observer; + }); + + Observable.range(1, 10) + .test(true); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> { + System.out.println("Assembling ConnectableObservable"); + return connectableObservable; + }); + + ConnectableObservable.range(1, 10) + .publish() + .connect(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenFlowable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnFlowableAssembly(flowable -> { + System.out.println("Assembling Flowable"); + return flowable; + }); + + Flowable.range(1, 10); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenFlowable_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> { + System.out.println("Suscribing to Flowable"); + return observer; + }); + + Flowable.range(1, 10) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> { + System.out.println("Assembling ConnectableFlowable"); + return connectableFlowable; + }); + + ConnectableFlowable.range(1, 10) + .publish() + .connect(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenParallel_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> { + System.out.println("Assembling ParallelFlowable"); + return parallelFlowable; + }); + + Flowable.range(1, 10) + .parallel(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenMaybe_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnMaybeAssembly(maybe -> { + System.out.println("Assembling Maybe"); + return maybe; + }); + + Maybe.just(1); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenMaybe_whenSubscribed_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> { + System.out.println("Suscribing to Maybe"); + return observer; + }); + + Maybe.just(1) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingle_whenAssembled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setOnSingleAssembly(single -> { + System.out.println("Assembling Single"); + return single; + }); + + Single.just(1); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingle_whenSubscribed_shouldExecuteTheHook() { + + try { + RxJavaPlugins.setOnSingleSubscribe((single, observer) -> { + System.out.println("Suscribing to Single"); + return observer; + }); + + Single.just(1) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setScheduleHandler((runnable) -> { + System.out.println("Executing Scheduler"); + return runnable; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> { + System.out.println("Initializing Computation Scheduler"); + return scheduler.call(); + }); + RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> { + System.out.println("Executing Computation Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.computation()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> { + System.out.println("Initializing IO Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setIoSchedulerHandler((scheduler) -> { + System.out.println("Executing IO Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.io()) + .test(); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() { + try { + RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> { + System.out.println("Initializing newThread Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> { + System.out.println("Executing newThread Scheduler"); + return scheduler; + }); + + Observable.range(1, 15) + .map(v -> v * 2) + .subscribeOn(Schedulers.newThread()) + .test(); + + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() { + try { + RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> { + System.out.println("Initializing Single Scheduler"); + return scheduler.call(); + }); + + RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> { + System.out.println("Executing Single Scheduler"); + return scheduler; + }); + + Observable.range(1, 10) + .map(v -> v * 2) + .subscribeOn(Schedulers.single()) + .test(); + + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void givenObservable_whenError_shouldExecuteTheHook() { + RxJavaPlugins.setErrorHandler(throwable -> { + System.out.println("Handling error" + throwable.getCause()); + }); + + Observable.error(new IllegalStateException()) + .subscribe(); + } +}