diff --git a/RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java b/RxJava/ReactiveStream/src/main/java/_01_start/AsyncEx.java similarity index 98% rename from RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java rename to RxJava/ReactiveStream/src/main/java/_01_start/AsyncEx.java index ce69a248..bec81aea 100644 --- a/RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_start/AsyncEx.java @@ -1,4 +1,4 @@ -package _01_basic; +package _01_start; import io.reactivex.rxjava3.core.Flowable; diff --git a/RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java b/RxJava/ReactiveStream/src/main/java/_01_start/ColdPublisherEx.java similarity index 95% rename from RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java rename to RxJava/ReactiveStream/src/main/java/_01_start/ColdPublisherEx.java index 30128433..85b6d22e 100644 --- a/RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_start/ColdPublisherEx.java @@ -1,4 +1,4 @@ -package _01_basic; +package _01_start; import io.reactivex.rxjava3.core.Flowable; diff --git a/RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java b/RxJava/ReactiveStream/src/main/java/_01_start/HotPublisherEx.java similarity index 96% rename from RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java rename to RxJava/ReactiveStream/src/main/java/_01_start/HotPublisherEx.java index b4b4ff64..e478a047 100644 --- a/RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_start/HotPublisherEx.java @@ -1,4 +1,4 @@ -package _01_basic; +package _01_start; import io.reactivex.rxjava3.processors.PublishProcessor; diff --git a/RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java b/RxJava/ReactiveStream/src/main/java/_01_start/OperatorEx.java similarity index 95% rename from RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java rename to RxJava/ReactiveStream/src/main/java/_01_start/OperatorEx.java index 7e1aacff..1a971134 100644 --- a/RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_start/OperatorEx.java @@ -1,4 +1,4 @@ -package _01_basic; +package _01_start; import io.reactivex.rxjava3.core.Flowable; diff --git a/RxJava/ReactiveStream/src/main/java/_03_basic/FlowableEx.java b/RxJava/ReactiveStream/src/main/java/_03_basic/FlowableEx.java new file mode 100644 index 00000000..e59889d5 --- /dev/null +++ b/RxJava/ReactiveStream/src/main/java/_03_basic/FlowableEx.java @@ -0,0 +1,53 @@ +package _03_basic; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class FlowableEx { + + public static void main(String[] args) { + + Flowable flowable = Flowable.create(emitter -> { + String[] data = {"Hello, World!", "Hello, RxJava!"}; + + for (String d : data) { + if (emitter.isCancelled()) { + return; + } + emitter.onNext(d); + } + emitter.onComplete(); + }, BackpressureStrategy.BUFFER); + + flowable.observeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + this.subscription.request(1L); // 데이터 개수 요청 + } + + @Override + public void onNext(String s) { + System.out.println(s); + this.subscription.request(1L); // 다음에 받을 데이터 개수 요청 + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + System.out.println("complete"); + } + }); + } +} diff --git a/RxJava/ReactiveStream/src/main/java/_03_basic/ObservableEx.java b/RxJava/ReactiveStream/src/main/java/_03_basic/ObservableEx.java new file mode 100644 index 00000000..8c15c6ab --- /dev/null +++ b/RxJava/ReactiveStream/src/main/java/_03_basic/ObservableEx.java @@ -0,0 +1,52 @@ +package _03_basic; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; + +public class ObservableEx { + + public static void main(String[] args) { + + /* + observable 은 배압이 없기 때문에 데이터가 생성 될 때마다 바로 통지된다. + */ + Observable observable = Observable.create(emitter -> { + String[] data = {"Hello, World!", "Hello, RxJava!"}; + + for (String d : data) { + if (emitter.isDisposed()) { + return; + } + emitter.onNext(d); + } + + emitter.onComplete(); + }); + + observable.observeOn(Schedulers.computation()) + .subscribe(new Observer() { + @Override + public void onSubscribe(@NonNull Disposable d) { + // 구독 중 구독을 해지하려면 전달받은 Disposable 을 내부에 보관(dispose 메소드 호출) + } + + @Override + public void onNext(@NonNull String s) { + System.out.println(s); + } + + @Override + public void onError(@NonNull Throwable e) { + e.printStackTrace(); + } + + @Override + public void onComplete() { + System.out.println("complete"); + } + }); + } +} diff --git a/RxJava/ReactiveStream/src/main/java/_03_basic/basic.md b/RxJava/ReactiveStream/src/main/java/_03_basic/basic.md new file mode 100644 index 00000000..a55168da --- /dev/null +++ b/RxJava/ReactiveStream/src/main/java/_03_basic/basic.md @@ -0,0 +1,11 @@ +### Flowable, Observable + +- Flowable : 배압 기능이 있다. 대량 데이터 처리, 네트워크 처리, I/O 처리 +- Observable : 배압 기능이 없다. 소량 데이터 처리 + +### 통지 규칙 + +- null을 통지하면 안된다. +- 데이터 통지는 Optional이다. +- 완료나 에러 통지를 한 뒤에는 다른 통지를 해서는 안 된다. +- 통지를 할 때는 1건씩 순차적으로 통지해야 하며 동시에 통지하면 안 된다. \ No newline at end of file