From c6b3fddc3104166266d1813810e74d5d236da412 Mon Sep 17 00:00:00 2001 From: haerong22 Date: Fri, 21 Apr 2023 00:46:39 +0900 Subject: [PATCH] #36 rxjava: backpressure --- .../example/ex04/BackpressureBufferEx01.java | 35 +++++++++++++++++++ .../example/ex04/BackpressureBufferEx02.java | 35 +++++++++++++++++++ .../org/example/ex04/BackpressureDropEx.java | 29 +++++++++++++++ .../example/ex04/BackpressureLatestEx.java | 29 +++++++++++++++ .../ex04/MissingBackpressureExceptionEx.java | 29 +++++++++++++++ 5 files changed, 157 insertions(+) create mode 100644 RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx01.java create mode 100644 RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx02.java create mode 100644 RxJava/practice/src/main/java/org/example/ex04/BackpressureDropEx.java create mode 100644 RxJava/practice/src/main/java/org/example/ex04/BackpressureLatestEx.java create mode 100644 RxJava/practice/src/main/java/org/example/ex04/MissingBackpressureExceptionEx.java diff --git a/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx01.java b/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx01.java new file mode 100644 index 00000000..3a8ad273 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx01.java @@ -0,0 +1,35 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.example.utils.TimeUtil; + +import java.util.concurrent.TimeUnit; + +public class BackpressureBufferEx01 { + + public static void main(String[] args) { + System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted()); + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> Logger.log("#interval doOnNext()", data)) + .onBackpressureBuffer( + 2, + () -> Logger.log("overflow!"), + BackpressureOverflowStrategy.DROP_LATEST + ) + .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data)) + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + TimeUtil.sleep(1000L); + Logger.log(LogType.ON_NEXT, data); + }, + error -> Logger.log(LogType.ON_ERROR, error) + ); + + TimeUtil.sleep(2800L); + } +} diff --git a/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx02.java b/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx02.java new file mode 100644 index 00000000..cdefd988 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/BackpressureBufferEx02.java @@ -0,0 +1,35 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.example.utils.TimeUtil; + +import java.util.concurrent.TimeUnit; + +public class BackpressureBufferEx02 { + + public static void main(String[] args) { + System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted()); + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> Logger.log("#interval doOnNext()", data)) + .onBackpressureBuffer( + 2, + () -> Logger.log("overflow!"), + BackpressureOverflowStrategy.DROP_OLDEST + ) + .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data)) + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + TimeUtil.sleep(1000L); + Logger.log(LogType.ON_NEXT, data); + }, + error -> Logger.log(LogType.ON_ERROR, error) + ); + + TimeUtil.sleep(2800L); + } +} diff --git a/RxJava/practice/src/main/java/org/example/ex04/BackpressureDropEx.java b/RxJava/practice/src/main/java/org/example/ex04/BackpressureDropEx.java new file mode 100644 index 00000000..7d53a833 --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/BackpressureDropEx.java @@ -0,0 +1,29 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.example.utils.TimeUtil; + +import java.util.concurrent.TimeUnit; + +public class BackpressureDropEx { + + public static void main(String[] args) { + System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted()); + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> Logger.log("#interval doOnNext()", data)) + .onBackpressureDrop(dropData -> Logger.log(LogType.PRINT, dropData + " Drop!")) + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + TimeUtil.sleep(1000L); + Logger.log(LogType.ON_NEXT, data); + }, + error -> Logger.log(LogType.ON_ERROR, error) + ); + + TimeUtil.sleep(5500L); + } +} diff --git a/RxJava/practice/src/main/java/org/example/ex04/BackpressureLatestEx.java b/RxJava/practice/src/main/java/org/example/ex04/BackpressureLatestEx.java new file mode 100644 index 00000000..ace1af5b --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/BackpressureLatestEx.java @@ -0,0 +1,29 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.example.utils.TimeUtil; + +import java.util.concurrent.TimeUnit; + +public class BackpressureLatestEx { + + public static void main(String[] args) { + System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted()); + Flowable.interval(300L, TimeUnit.MILLISECONDS) + .doOnNext(data -> Logger.log("#interval doOnNext()", data)) + .onBackpressureLatest() + .observeOn(Schedulers.computation(), false, 1) + .subscribe( + data -> { + TimeUtil.sleep(1000L); + Logger.log(LogType.ON_NEXT, data); + }, + error -> Logger.log(LogType.ON_ERROR, error) + ); + + TimeUtil.sleep(5500L); + } +} diff --git a/RxJava/practice/src/main/java/org/example/ex04/MissingBackpressureExceptionEx.java b/RxJava/practice/src/main/java/org/example/ex04/MissingBackpressureExceptionEx.java new file mode 100644 index 00000000..70cf587c --- /dev/null +++ b/RxJava/practice/src/main/java/org/example/ex04/MissingBackpressureExceptionEx.java @@ -0,0 +1,29 @@ +package org.example.ex04; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import org.example.utils.LogType; +import org.example.utils.Logger; +import org.example.utils.TimeUtil; + +import java.util.concurrent.TimeUnit; + +public class MissingBackpressureExceptionEx { + + public static void main(String[] args) throws InterruptedException { + Flowable.interval(1L, TimeUnit.MILLISECONDS) + .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data)) + .observeOn(Schedulers.computation()) + .subscribe( + data -> { + Logger.log(LogType.PRINT, "# 소비자 처리 대기 중.."); + TimeUtil.sleep(1000L); + Logger.log(LogType.ON_NEXT, data); + }, + error -> Logger.log(LogType.ON_ERROR, error), + () -> Logger.log(LogType.ON_COMPLETE) + ); + + Thread.sleep(2000L); + } +}