diff --git a/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateEx.java b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateEx.java new file mode 100644 index 00000000..1263f6a6 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateEx.java @@ -0,0 +1,63 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableEmitter; +import io.reactivex.rxjava3.core.FlowableOnSubscribe; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class HelloRxJavaFlowableCreateEx { + public static void main(String[] args) throws InterruptedException { + Flowable flowable = + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter emitter) throws Exception { + String[] datas = {"Hello", "RxJava!"}; + for(String data : datas) { + // 구독이 해지되면 처리 중단 + if (emitter.isCancelled()) + return; + + // 데이터 통지 + emitter.onNext(data); + } + + // 데이터 통지 완료를 알린다 + emitter.onComplete(); + } + }, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정. + + flowable.observeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + // 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체 + private Subscription subscription; + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + this.subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(String data) { + Logger.log(LogType.ON_NEXT, data); + } + + @Override + public void onError(Throwable error) { + Logger.log(LogType.ON_ERROR, error); + } + + @Override + public void onComplete() { + Logger.log(LogType.ON_COMPLETE); + } + }); + + Thread.sleep(500L); + } +} \ No newline at end of file diff --git a/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateLambdaEx.java b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateLambdaEx.java new file mode 100644 index 00000000..8ca14eac --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaFlowableCreateLambdaEx.java @@ -0,0 +1,38 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class HelloRxJavaFlowableCreateLambdaEx { + public static void main(String[] args) throws InterruptedException { + Flowable flowable = + Flowable.create(emitter -> { + String[] datas = {"Hello", "RxJava!"}; + for(String data : datas) { + // 구독이 해지되면 처리 중단 + if (emitter.isCancelled()) + return; + + // 데이터 발행 + emitter.onNext(data); + } + + // 데이터 발행 완료를 알린다 + emitter.onComplete(); + }, BackpressureStrategy.BUFFER); + + flowable.observeOn(Schedulers.computation()) + .subscribe( + data -> Logger.log(LogType.ON_NEXT, data), + error -> Logger.log(LogType.ON_ERROR, error), + () -> Logger.log(LogType.ON_COMPLETE) + ); + + Thread.sleep(500L); + } +} \ No newline at end of file diff --git a/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateEx.java b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateEx.java new file mode 100644 index 00000000..8f385783 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateEx.java @@ -0,0 +1,54 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.ObservableEmitter; +import io.reactivex.rxjava3.core.ObservableOnSubscribe; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; + +public class HelloRxJavaObservableCreateEx { + public static void main(String[] args) throws InterruptedException { + Observable observable = + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter emitter) throws Exception { + String[] datas = {"Hello", "RxJava!"}; + for (String data : datas) { + if (emitter.isDisposed()) + return; + + emitter.onNext(data); + } + emitter.onComplete(); + } + }); + + observable.observeOn(Schedulers.computation()) + .subscribe(new Observer<>() { + @Override + public void onSubscribe(Disposable disposable) { + // 아무 처리도 하지 않음. + } + + @Override + public void onNext(String data) { + Logger.log(LogType.ON_NEXT, data); + } + + @Override + public void onError(Throwable error) { + Logger.log(LogType.ON_ERROR, error); + } + + @Override + public void onComplete() { + Logger.log(LogType.ON_COMPLETE); + } + }); + + Thread.sleep(500L); + } +} \ No newline at end of file diff --git a/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateLambdaEx.java b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateLambdaEx.java new file mode 100644 index 00000000..d84ebaa1 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/HelloRxJavaObservableCreateLambdaEx.java @@ -0,0 +1,33 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; + +public class HelloRxJavaObservableCreateLambdaEx { + public static void main(String[] args) throws InterruptedException { + Observable observable = + Observable.create(emitter -> { + String[] datas = {"Hello", "RxJava!"}; + for (String data : datas) { + if (emitter.isDisposed()) + return; + + emitter.onNext(data); + } + emitter.onComplete(); + }); + + observable.observeOn(Schedulers.computation()) + .subscribe( + data -> Logger.log(LogType.ON_NEXT, data), + error -> Logger.log(LogType.ON_NEXT, error), + () -> Logger.log(LogType.DO_ON_COMPLETE) + ); + + Thread.sleep(500L); + } +} \ No newline at end of file