diff --git a/RxJava/ReactiveStream/src/main/java/AsyncEx.java b/RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java similarity index 98% rename from RxJava/ReactiveStream/src/main/java/AsyncEx.java rename to RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java index 3d763d66..ce69a248 100644 --- a/RxJava/ReactiveStream/src/main/java/AsyncEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_basic/AsyncEx.java @@ -1,3 +1,5 @@ +package _01_basic; + import io.reactivex.rxjava3.core.Flowable; import java.util.concurrent.TimeUnit; diff --git a/RxJava/ReactiveStream/src/main/java/ColdPublisherEx.java b/RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java similarity index 95% rename from RxJava/ReactiveStream/src/main/java/ColdPublisherEx.java rename to RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java index 7f56a84b..30128433 100644 --- a/RxJava/ReactiveStream/src/main/java/ColdPublisherEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_basic/ColdPublisherEx.java @@ -1,3 +1,5 @@ +package _01_basic; + import io.reactivex.rxjava3.core.Flowable; public class ColdPublisherEx { diff --git a/RxJava/ReactiveStream/src/main/java/HotPublisherEx.java b/RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java similarity index 96% rename from RxJava/ReactiveStream/src/main/java/HotPublisherEx.java rename to RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java index 3d2dd4cd..b4b4ff64 100644 --- a/RxJava/ReactiveStream/src/main/java/HotPublisherEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_basic/HotPublisherEx.java @@ -1,3 +1,5 @@ +package _01_basic; + import io.reactivex.rxjava3.processors.PublishProcessor; public class HotPublisherEx { diff --git a/RxJava/ReactiveStream/src/main/java/OperatorEx.java b/RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java similarity index 95% rename from RxJava/ReactiveStream/src/main/java/OperatorEx.java rename to RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java index c5497502..7e1aacff 100644 --- a/RxJava/ReactiveStream/src/main/java/OperatorEx.java +++ b/RxJava/ReactiveStream/src/main/java/_01_basic/OperatorEx.java @@ -1,3 +1,5 @@ +package _01_basic; + import io.reactivex.rxjava3.core.Flowable; public class OperatorEx { diff --git a/RxJava/ReactiveStream/src/main/java/_02_backpressure/BackPressureStrategyEx.java b/RxJava/ReactiveStream/src/main/java/_02_backpressure/BackPressureStrategyEx.java new file mode 100644 index 00000000..def9cbcc --- /dev/null +++ b/RxJava/ReactiveStream/src/main/java/_02_backpressure/BackPressureStrategyEx.java @@ -0,0 +1,76 @@ +package _02_backpressure; + +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; + +import java.util.concurrent.TimeUnit; + +public class BackPressureStrategyEx { + /* + BUFFER : 모든 데이터 버퍼링 + DROP : 새로 생성한 데이터 삭제 + LATEST : 버퍼가 비워질 때 까지 통지 데이터 대기 + ERROR : 버퍼 크기 초과시 MissingBackpressureException + NONE : 특정 처리 수행 X + */ + public static void main(String[] args) throws InterruptedException { + // BUFFER 전략 +// bufferStrategy(); + + // DROP 전략 +// dropStrategy(); + + // LATEST 전략 + latestStrategy(); + + Thread.sleep(3000L); + } + + private static void latestStrategy() { + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> System.out.println("interval = " + data)) + .onBackpressureLatest() + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + Thread.sleep(1000L); + System.out.println("data = " + data); + }, + error -> System.out.println("error = " + error) + ); + } + + private static void dropStrategy() { + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> System.out.println("interval = " + data)) + .onBackpressureDrop(drop -> System.out.println("drop = " + drop)) + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + Thread.sleep(1000L); + System.out.println("data = " + data); + }, + error -> System.out.println("error = " + error) + ); + } + + private static void bufferStrategy() { + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> System.out.println("interval = " + data)) + .onBackpressureBuffer( + 2, + () -> System.out.println("overflow"), + BackpressureOverflowStrategy.DROP_LATEST // ERROR, DROP_LATEST, DROP_OLDEST + ) + .doOnNext(data -> System.out.println("buffer = " + data)) + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + Thread.sleep(1000L); + System.out.println("data = " + data); + }, + error -> System.out.println("error = " + error) + ); + } +} diff --git a/RxJava/ReactiveStream/src/main/java/_02_backpressure/MissingBackpressureExceptionEx.java b/RxJava/ReactiveStream/src/main/java/_02_backpressure/MissingBackpressureExceptionEx.java new file mode 100644 index 00000000..824e9ef4 --- /dev/null +++ b/RxJava/ReactiveStream/src/main/java/_02_backpressure/MissingBackpressureExceptionEx.java @@ -0,0 +1,30 @@ +package _02_backpressure; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; + +import java.util.concurrent.TimeUnit; + +public class MissingBackpressureExceptionEx { + + public static void main(String[] args) throws InterruptedException { + /* + 실행시 통지 속도보다 처리 속도가 느리기 때문에 MissingBackpressureException 발생 + 기본 버퍼가 128 이므로 128이상의 통지가 오면 예외발생 + */ + Flowable.interval(1L, TimeUnit.MILLISECONDS) // 1ms 마다 숫자 통지 + .doOnNext(data -> System.out.println("data = " + data)) // 통지된 데이터 출력 + .observeOn(Schedulers.computation()) + .subscribe( + data -> { + System.out.println("처리 대기중"); + Thread.sleep(1000L); // 1초의 지연 + System.out.println("data = " + data); + }, + error -> System.out.println("error = " + error), + () -> System.out.println("completed") + ); + + Thread.sleep(2000L); + } +}