rxjava : flowable, observable

This commit is contained in:
haerong22
2022-03-18 16:41:39 +09:00
parent 5f22036461
commit 548d6636b9
7 changed files with 120 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
package _01_basic;
package _01_start;
import io.reactivex.rxjava3.core.Flowable;

View File

@@ -1,4 +1,4 @@
package _01_basic;
package _01_start;
import io.reactivex.rxjava3.core.Flowable;

View File

@@ -1,4 +1,4 @@
package _01_basic;
package _01_start;
import io.reactivex.rxjava3.processors.PublishProcessor;

View File

@@ -1,4 +1,4 @@
package _01_basic;
package _01_start;
import io.reactivex.rxjava3.core.Flowable;

View File

@@ -0,0 +1,53 @@
package _03_basic;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class FlowableEx {
public static void main(String[] args) {
Flowable<String> flowable = Flowable.create(emitter -> {
String[] data = {"Hello, World!", "Hello, RxJava!"};
for (String d : data) {
if (emitter.isCancelled()) {
return;
}
emitter.onNext(d);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(1L); // 데이터 개수 요청
}
@Override
public void onNext(String s) {
System.out.println(s);
this.subscription.request(1L); // 다음에 받을 데이터 개수 요청
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}

View File

@@ -0,0 +1,52 @@
package _03_basic;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class ObservableEx {
public static void main(String[] args) {
/*
observable 은 배압이 없기 때문에 데이터가 생성 될 때마다 바로 통지된다.
*/
Observable<String> observable = Observable.create(emitter -> {
String[] data = {"Hello, World!", "Hello, RxJava!"};
for (String d : data) {
if (emitter.isDisposed()) {
return;
}
emitter.onNext(d);
}
emitter.onComplete();
});
observable.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 구독 중 구독을 해지하려면 전달받은 Disposable 을 내부에 보관(dispose 메소드 호출)
}
@Override
public void onNext(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}

View File

@@ -0,0 +1,11 @@
### Flowable, Observable
- Flowable : 배압 기능이 있다. 대량 데이터 처리, 네트워크 처리, I/O 처리
- Observable : 배압 기능이 없다. 소량 데이터 처리
### 통지 규칙
- null을 통지하면 안된다.
- 데이터 통지는 Optional이다.
- 완료나 에러 통지를 한 뒤에는 다른 통지를 해서는 안 된다.
- 통지를 할 때는 1건씩 순차적으로 통지해야 하며 동시에 통지하면 안 된다.