#36 rxjava: backpressure

This commit is contained in:
haerong22
2023-04-21 00:46:39 +09:00
parent eeb0f86c6b
commit c6b3fddc31
5 changed files with 157 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
public class BackpressureBufferEx01 {
public static void main(String[] args) {
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#interval doOnNext()", data))
.onBackpressureBuffer(
2,
() -> Logger.log("overflow!"),
BackpressureOverflowStrategy.DROP_LATEST
)
.doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(2800L);
}
}

View File

@@ -0,0 +1,35 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
public class BackpressureBufferEx02 {
public static void main(String[] args) {
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#interval doOnNext()", data))
.onBackpressureBuffer(
2,
() -> Logger.log("overflow!"),
BackpressureOverflowStrategy.DROP_OLDEST
)
.doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(2800L);
}
}

View File

@@ -0,0 +1,29 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
public class BackpressureDropEx {
public static void main(String[] args) {
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#interval doOnNext()", data))
.onBackpressureDrop(dropData -> Logger.log(LogType.PRINT, dropData + " Drop!"))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(5500L);
}
}

View File

@@ -0,0 +1,29 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
public class BackpressureLatestEx {
public static void main(String[] args) {
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log("#interval doOnNext()", data))
.onBackpressureLatest()
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(5500L);
}
}

View File

@@ -0,0 +1,29 @@
package org.example.ex04;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
public class MissingBackpressureExceptionEx {
public static void main(String[] args) throws InterruptedException {
Flowable.interval(1L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
.observeOn(Schedulers.computation())
.subscribe(
data -> {
Logger.log(LogType.PRINT, "# 소비자 처리 대기 중..");
TimeUtil.sleep(1000L);
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
Thread.sleep(2000L);
}
}