Simplified TestUtil, Fixed MongoDB update issue

This commit is contained in:
Chris Richardson
2016-08-31 13:02:44 -07:00
parent f4e070e7bd
commit 29d42fda9a
4 changed files with 47 additions and 49 deletions

View File

@@ -54,51 +54,26 @@ public class TestUtil {
}
}
public static <T> void eventually(final Producer<T> producer, final Verifier<T> verifier) {
final int n = 150;
Object possibleException = Observable.timer(0, 200, TimeUnit.MILLISECONDS).flatMap(new Func1<Long, Observable<Outcome<T>>>() {
@Override
public Observable<Outcome<T>> call(Long aLong) {
try {
return fromCompletableFuture(producer.produce()).map(new Func1<T, Outcome<T>>() {
@Override
public Outcome<T> call(T t) {
return new Success<T>(t);
}
});
} catch (Exception e) {
Outcome<T> value = new Failure<T>(e);
return Observable.just(value);
}
public static <T> void eventually(Producer<T> producer, Verifier<T> predicate) {
Throwable laste = null;
for (int i = 0; i < 30 ; i++) {
try {
T x = producer.produce().get(30, TimeUnit.SECONDS);
predicate.verify(x);
return;
} catch (Throwable t) {
laste = t;
}
}).map(new Func1<Outcome<T>, Throwable>() {
@Override
public Throwable call(Outcome<T> t) {
try {
if (t instanceof Success) {
verifier.verify(((Success<T>) t).value);
return null;
} else
return ((Failure<T>) t).t;
} catch (Throwable e) {
return e;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).take(n).zipWith(Observable.range(0, n), new Func2<Throwable, Integer, Tuple2<Throwable, Integer>>() {
@Override
public Tuple2<Throwable, Integer> call(Throwable e, Integer idx) {
return new Tuple2<Throwable, Integer>(e, idx);
}
}).skipWhile(new Func1<Tuple2<Throwable, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<Throwable, Integer> tuple2) {
return tuple2.first != null && tuple2.second < n - 1;
}
}).first().toBlocking().getIterator().next().first;
if (possibleException != null)
throw new RuntimeException((Throwable)possibleException);
}
if (laste != null)
throw new RuntimeException("Last exception was", laste);
else
throw new RuntimeException("predicate never satisfied");
}
private static <T> Observable<T> fromCompletableFuture(CompletableFuture<T> future) {