From 2201a8d9d99759d56b787bcc997fe5c3bcc627ef Mon Sep 17 00:00:00 2001 From: myluckagain Date: Sat, 7 Jul 2018 16:00:37 +0500 Subject: [PATCH] bael-1874 (#4574) * bael-1874 * bael-1874 fix * bael-1874 fix1 --- .../rxjava/MultipleSubscribersColdObs.java | 59 +++++++ .../rxjava/MultipleSubscribersHotObs.java | 152 ++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java create mode 100644 rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java diff --git a/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java b/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java new file mode 100644 index 0000000000..1d3e1b3f8a --- /dev/null +++ b/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs.java @@ -0,0 +1,59 @@ +package com.baeldung.rxjava; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import rx.Observable; +import rx.Subscription; +import rx.observables.ConnectableObservable; +import rx.subscriptions.Subscriptions; + +public class MultipleSubscribersColdObs { + private static final Logger LOGGER = LoggerFactory.getLogger(MultipleSubscribersColdObs.class); + + public static void main(String[] args) throws InterruptedException { + defaultBehaviour(); + // subscribeBeforeConnect(); + + } + + private static void defaultBehaviour() { + Observable obs = getObservable(); + + LOGGER.info("Subscribing"); + Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i)); + Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i)); + + s1.unsubscribe(); + s2.unsubscribe(); + } + + private static void subscribeBeforeConnect() throws InterruptedException { + ConnectableObservable obs = getObservable().publish(); + + LOGGER.info("Subscribing"); + obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i)); + obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i)); + Thread.sleep(1000); + LOGGER.info("Connecting"); + Subscription s = obs.connect(); + s.unsubscribe(); + + } + + private static Observable getObservable() { + return Observable.create(subscriber -> { + subscriber.onNext(gettingValue(1)); + subscriber.onNext(gettingValue(2)); + + subscriber.add(Subscriptions.create(() -> { + LOGGER.info("Clear resources"); + })); + }); + } + + private static Integer gettingValue(int i) { + LOGGER.info("Getting " + i); + return i; + } +} diff --git a/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java b/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java new file mode 100644 index 0000000000..686c230003 --- /dev/null +++ b/rxjava/src/main/java/com/baeldung/rxjava/MultipleSubscribersHotObs.java @@ -0,0 +1,152 @@ +package com.baeldung.rxjava; + +import java.awt.Color; +import java.awt.Dimension; +import java.awt.event.MouseAdapter; +import java.awt.event.MouseEvent; +import java.awt.event.MouseListener; +import java.lang.reflect.InvocationTargetException; + +import javax.swing.JFrame; +import rx.Observable; +import rx.Subscription; +import rx.observables.ConnectableObservable; +import rx.subscriptions.Subscriptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MultipleSubscribersHotObs { + private static final Logger LOGGER = LoggerFactory.getLogger(MultipleSubscribersHotObs.class); + private static JFrame frame; + + public static void main(String[] args) throws InterruptedException, InvocationTargetException { + + javax.swing.SwingUtilities.invokeAndWait(new Runnable() { + public void run() { + createAndShowGUI(); + } + }); + + defaultBehaviour(); + // subscribeBeforeConnect(); + // connectBeforeSubscribe(); + // autoConnectAndSubscribe(); + // refCountAndSubscribe(); + } + + private static void createAndShowGUI() { + frame = new JFrame("Hot Observable Demo"); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.getContentPane().setBackground(Color.GRAY); + frame.setPreferredSize(new Dimension(500, 500)); + frame.pack(); + frame.setVisible(true); + } + + public static void defaultBehaviour() throws InterruptedException { + Observable obs = getObservable(); + + LOGGER.info("subscribing #1"); + Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); + + Thread.sleep(1000); + LOGGER.info("subscribing #2"); + Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("unsubscribe#1"); + subscription1.unsubscribe(); + Thread.sleep(1000); + LOGGER.info("unsubscribe#2"); + subscription2.unsubscribe(); + } + + public static void subscribeBeforeConnect() throws InterruptedException { + + ConnectableObservable obs = getObservable().publish(); + + LOGGER.info("subscribing #1"); + Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("subscribing #2"); + Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("connecting:"); + Subscription s = obs.connect(); + Thread.sleep(1000); + LOGGER.info("unsubscribe connected"); + s.unsubscribe(); + + } + + public static void connectBeforeSubscribe() throws InterruptedException { + + ConnectableObservable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish(); + LOGGER.info("connecting:"); + Subscription s = obs.connect(); + Thread.sleep(1000); + LOGGER.info("subscribing #1"); + obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("subscribing #2"); + obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); + Thread.sleep(1000); + s.unsubscribe(); + + } + + public static void autoConnectAndSubscribe() throws InterruptedException { + Observable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect(); + + LOGGER.info("autoconnect()"); + Thread.sleep(1000); + LOGGER.info("subscribing #1"); + Subscription s1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("subscribing #2"); + Subscription s2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); + + Thread.sleep(1000); + LOGGER.info("unsubscribe 1"); + s1.unsubscribe(); + Thread.sleep(1000); + LOGGER.info("unsubscribe 2"); + s2.unsubscribe(); + } + + public static void refCountAndSubscribe() throws InterruptedException { + Observable obs = getObservable().doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount(); + + LOGGER.info("refcount()"); + Thread.sleep(1000); + LOGGER.info("subscribing #1"); + Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); + Thread.sleep(1000); + LOGGER.info("subscribing #2"); + Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); + + Thread.sleep(1000); + LOGGER.info("unsubscribe#1"); + subscription1.unsubscribe(); + Thread.sleep(1000); + LOGGER.info("unsubscribe#2"); + subscription2.unsubscribe(); + + } + + private static Observable getObservable() { + return Observable.create(subscriber -> { + frame.addMouseListener(new MouseAdapter() { + @Override + public void mouseClicked(MouseEvent e) { + subscriber.onNext(e.getX()); + } + }); + subscriber.add(Subscriptions.create(() -> { + LOGGER.info("Clear resources"); + for (MouseListener listener : frame.getListeners(MouseListener.class)) { + frame.removeMouseListener(listener); + } + })); + }); + } +}