#36 rxjava: flowable, observable

This commit is contained in:
haerong22
2023-04-22 18:31:49 +09:00
parent c6b3fddc31
commit 001089a034
4 changed files with 188 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class HelloRxJavaFlowableCreateEx {
public static void main(String[] args) throws InterruptedException {
Flowable<String> flowable =
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas) {
// 구독이 해지되면 처리 중단
if (emitter.isCancelled())
return;
// 데이터 통지
emitter.onNext(data);
}
// 데이터 통지 완료를 알린다
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정.
flowable.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {
// 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String data) {
Logger.log(LogType.ON_NEXT, data);
}
@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}
@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});
Thread.sleep(500L);
}
}

View File

@@ -0,0 +1,38 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class HelloRxJavaFlowableCreateLambdaEx {
public static void main(String[] args) throws InterruptedException {
Flowable<String> flowable =
Flowable.create(emitter -> {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas) {
// 구독이 해지되면 처리 중단
if (emitter.isCancelled())
return;
// 데이터 발행
emitter.onNext(data);
}
// 데이터 발행 완료를 알린다
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.computation())
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
Thread.sleep(500L);
}
}

View File

@@ -0,0 +1,54 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
public class HelloRxJavaObservableCreateEx {
public static void main(String[] args) throws InterruptedException {
Observable<String> observable =
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String[] datas = {"Hello", "RxJava!"};
for (String data : datas) {
if (emitter.isDisposed())
return;
emitter.onNext(data);
}
emitter.onComplete();
}
});
observable.observeOn(Schedulers.computation())
.subscribe(new Observer<>() {
@Override
public void onSubscribe(Disposable disposable) {
// 아무 처리도 하지 않음.
}
@Override
public void onNext(String data) {
Logger.log(LogType.ON_NEXT, data);
}
@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}
@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});
Thread.sleep(500L);
}
}

View File

@@ -0,0 +1,33 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
public class HelloRxJavaObservableCreateLambdaEx {
public static void main(String[] args) throws InterruptedException {
Observable<String> observable =
Observable.create(emitter -> {
String[] datas = {"Hello", "RxJava!"};
for (String data : datas) {
if (emitter.isDisposed())
return;
emitter.onNext(data);
}
emitter.onComplete();
});
observable.observeOn(Schedulers.computation())
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_NEXT, error),
() -> Logger.log(LogType.DO_ON_COMPLETE)
);
Thread.sleep(500L);
}
}