rxjava : backpressure

This commit is contained in:
haerong22
2022-03-18 16:03:46 +09:00
parent 09f76d51d6
commit 5f22036461
6 changed files with 114 additions and 0 deletions

View File

@@ -1,3 +1,5 @@
package _01_basic;
import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.TimeUnit;

View File

@@ -1,3 +1,5 @@
package _01_basic;
import io.reactivex.rxjava3.core.Flowable;
public class ColdPublisherEx {

View File

@@ -1,3 +1,5 @@
package _01_basic;
import io.reactivex.rxjava3.processors.PublishProcessor;
public class HotPublisherEx {

View File

@@ -1,3 +1,5 @@
package _01_basic;
import io.reactivex.rxjava3.core.Flowable;
public class OperatorEx {

View File

@@ -0,0 +1,76 @@
package _02_backpressure;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class BackPressureStrategyEx {
/*
BUFFER : 모든 데이터 버퍼링
DROP : 새로 생성한 데이터 삭제
LATEST : 버퍼가 비워질 때 까지 통지 데이터 대기
ERROR : 버퍼 크기 초과시 MissingBackpressureException
NONE : 특정 처리 수행 X
*/
public static void main(String[] args) throws InterruptedException {
// BUFFER 전략
// bufferStrategy();
// DROP 전략
// dropStrategy();
// LATEST 전략
latestStrategy();
Thread.sleep(3000L);
}
private static void latestStrategy() {
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println("interval = " + data))
.onBackpressureLatest()
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
Thread.sleep(1000L);
System.out.println("data = " + data);
},
error -> System.out.println("error = " + error)
);
}
private static void dropStrategy() {
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println("interval = " + data))
.onBackpressureDrop(drop -> System.out.println("drop = " + drop))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
Thread.sleep(1000L);
System.out.println("data = " + data);
},
error -> System.out.println("error = " + error)
);
}
private static void bufferStrategy() {
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println("interval = " + data))
.onBackpressureBuffer(
2,
() -> System.out.println("overflow"),
BackpressureOverflowStrategy.DROP_LATEST // ERROR, DROP_LATEST, DROP_OLDEST
)
.doOnNext(data -> System.out.println("buffer = " + data))
.observeOn(Schedulers.computation(), false, 1)
.subscribe(
data -> {
Thread.sleep(1000L);
System.out.println("data = " + data);
},
error -> System.out.println("error = " + error)
);
}
}

View File

@@ -0,0 +1,30 @@
package _02_backpressure;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class MissingBackpressureExceptionEx {
public static void main(String[] args) throws InterruptedException {
/*
실행시 통지 속도보다 처리 속도가 느리기 때문에 MissingBackpressureException 발생
기본 버퍼가 128 이므로 128이상의 통지가 오면 예외발생
*/
Flowable.interval(1L, TimeUnit.MILLISECONDS) // 1ms 마다 숫자 통지
.doOnNext(data -> System.out.println("data = " + data)) // 통지된 데이터 출력
.observeOn(Schedulers.computation())
.subscribe(
data -> {
System.out.println("처리 대기중");
Thread.sleep(1000L); // 1초의 지연
System.out.println("data = " + data);
},
error -> System.out.println("error = " + error),
() -> System.out.println("completed")
);
Thread.sleep(2000L);
}
}