minor formatting work

This commit is contained in:
eugenp
2017-02-11 14:00:52 +02:00
parent 67fcde1009
commit 29c0fd8371
12 changed files with 99 additions and 204 deletions

View File

@@ -18,154 +18,113 @@ public class RxJavaBackpressureTest {
@Test
public void givenColdObservable_shouldNotThrowException() {
//given
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable
.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// when
Observable.range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(testSubscriber);
//then
// then
testSubscriber.awaitTerminalEvent();
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenBackpressureNotDefined_shouldTrowException() {
//given
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
source
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
source.observeOn(Schedulers.computation()).subscribe(testSubscriber);
//when
IntStream
.range(0, 1_000_000)
.forEach(source::onNext);
// when
IntStream.range(0, 1_000_000).forEach(source::onNext);
//then
// then
testSubscriber.awaitTerminalEvent();
testSubscriber.assertError(MissingBackpressureException.class);
}
@Test
public void givenHotObservable_whenWindowIsDefined_shouldNotThrowException() {
//given
// given
TestSubscriber<Observable<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
source
.window(500)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// when
source.window(500).observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream
.range(0, 1_000)
.forEach(source::onNext);
IntStream.range(0, 1_000).forEach(source::onNext);
//then
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenBufferIsDefined_shouldNotThrowException() {
//given
// given
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
source
.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// when
source.buffer(1024).observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream
.range(0, 1_000)
.forEach(source::onNext);
IntStream.range(0, 1_000).forEach(source::onNext);
//then
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenSkippingOperationIsDefined_shouldNotThrowException() {
//given
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
PublishSubject<Integer> source = PublishSubject.<Integer> create();
//when
// when
source.sample(100, TimeUnit.MILLISECONDS)
// .throttleFirst(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// .throttleFirst(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation()).subscribe(testSubscriber);
IntStream
.range(0, 1_000)
.forEach(source::onNext);
IntStream.range(0, 1_000).forEach(source::onNext);
//then
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenOnBackpressureBufferDefined_shouldNotThrowException() {
//given
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable
.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// when
Observable.range(1, 1_000_000).onBackpressureBuffer(16, () -> {
}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(testSubscriber);
//then
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
@Test
public void givenHotObservable_whenOnBackpressureDropDefined_shouldNotThrowException() {
//given
// given
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
//when
Observable
.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(testSubscriber);
// when
Observable.range(1, 1_000_000).onBackpressureDrop().observeOn(Schedulers.computation()).subscribe(testSubscriber);
//then
// then
testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertTrue(testSubscriber
.getOnErrorEvents()
.size() == 0);
assertTrue(testSubscriber.getOnErrorEvents().size() == 0);
}
}

View File

@@ -13,22 +13,18 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
public class RxJavaTesting {
@Test
public void givenObservable_whenZip_shouldAssertBlockingInASameThread() {
//given
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> index + "-" + string);
Observable<String> observable = Observable.from(letters).zipWith(Observable.range(1, Integer.MAX_VALUE), (string, index) -> index + "-" + string);
//when
// when
observable.subscribe(results::add);
//then
// then
assertThat(results, notNullValue());
assertThat(results, hasSize(5));
assertThat(results, hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));
@@ -36,19 +32,16 @@ public class RxJavaTesting {
@Test
public void givenObservable_whenZip_shouldAssertOnTestSubscriber() {
//given
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable
.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
((string, index) -> index + "-" + string));
Observable<String> observable = Observable.from(letters).zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string));
//when
// when
observable.subscribe(subscriber);
//then
// then
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
@@ -57,52 +50,45 @@ public class RxJavaTesting {
@Test
public void givenTestObserver_whenExceptionWasThrowsOnObservable_observerShouldGetError() {
//given
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(letters).zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string)).concatWith(Observable.error(new RuntimeException("error in Observable")));
Observable<String> observable = Observable
.from(letters)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
((string, index) -> index + "-" + string))
.concatWith(Observable.error(new RuntimeException("error in Observable")));
//when
// when
observable.subscribe(subscriber);
//then
// then
subscriber.assertError(RuntimeException.class);
subscriber.assertNotCompleted();
}
@Test
public void givenObservableThatEmitsEventPerSecond_whenUseAdvanceByTime_shouldEmitEventPerSecond() {
//given
// given
List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);
Observable<String> observable = Observable.from(letters)
.zipWith(tick, (string, index) -> index + "-" + string);
Observable<String> observable = Observable.from(letters).zipWith(tick, (string, index) -> index + "-" + string);
observable.subscribeOn(scheduler)
.subscribe(subscriber);
observable.subscribeOn(scheduler).subscribe(subscriber);
//expect
// expect
subscriber.assertNoValues();
subscriber.assertNotCompleted();
//when
// when
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
//then
// then
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
//when
// when
scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
@@ -110,4 +96,3 @@ public class RxJavaTesting {
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C", "3-D", "4-E"));
}
}