diff --git a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java index 30357e7a27..33f94a9c6f 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/RxJavaBackpressureTest.java @@ -10,6 +10,7 @@ import rx.subjects.PublishSubject; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import static org.junit.Assert.assertTrue; @@ -21,13 +22,16 @@ public class RxJavaBackpressureTest { TestSubscriber testSubscriber = new TestSubscriber<>(); //when - Observable.range(1, 1_000_000) - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + Observable + .range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); //then testSubscriber.awaitTerminalEvent(); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } @@ -35,16 +39,16 @@ public class RxJavaBackpressureTest { public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() { //given TestSubscriber testSubscriber = new TestSubscriber<>(); - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); - source.observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + source + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); //when - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - - } + IntStream + .range(0, 1_000_000) + .forEach(source::onNext); //then testSubscriber.awaitTerminalEvent(); @@ -55,20 +59,23 @@ public class RxJavaBackpressureTest { public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() { //given TestSubscriber> testSubscriber = new TestSubscriber<>(); - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); //when - source.window(500) - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + source + .window(500) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); - for (int i = 0; i < 1_000; i++) { - source.onNext(i); - } + IntStream + .range(0, 1_000) + .forEach(source::onNext); //then testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } @@ -76,43 +83,49 @@ public class RxJavaBackpressureTest { public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() { //given TestSubscriber> testSubscriber = new TestSubscriber<>(); - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); //when - source.buffer(1024) - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + source + .buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + IntStream + .range(0, 1_000) + .forEach(source::onNext); - for (int i = 0; i < 1_000; i++) { - source.onNext(i); - } //then testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } - @Test public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() { //given TestSubscriber testSubscriber = new TestSubscriber<>(); - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); //when source.sample(100, TimeUnit.MILLISECONDS) -// .throttleFirst(100, TimeUnit.MILLISECONDS) - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + // .throttleFirst(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); + + IntStream + .range(0, 1_000) + .forEach(source::onNext); - for (int i = 0; i < 1_000; i++) { - source.onNext(i); - } //then testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } @@ -122,34 +135,37 @@ public class RxJavaBackpressureTest { TestSubscriber testSubscriber = new TestSubscriber<>(); //when - Observable.range(1, 1_000_000) - .onBackpressureBuffer(16, () -> { - }, - BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + Observable + .range(1, 1_000_000) + .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); //then testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } - @Test public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() { //given TestSubscriber testSubscriber = new TestSubscriber<>(); //when - Observable.range(1, 1_000_000) - .onBackpressureDrop() - .observeOn(Schedulers.computation()) - .subscribe(testSubscriber); + Observable + .range(1, 1_000_000) + .onBackpressureDrop() + .observeOn(Schedulers.computation()) + .subscribe(testSubscriber); //then testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); - assertTrue(testSubscriber.getOnErrorEvents().size() == 0); + assertTrue(testSubscriber + .getOnErrorEvents() + .size() == 0); } }