Revert "BAEL-4134"

This commit is contained in:
Loredana Crusoveanu
2020-07-07 14:18:10 +03:00
committed by GitHub
parent dffa1f64e6
commit 485b4e3e99
2477 changed files with 9477 additions and 547819 deletions

View File

@@ -0,0 +1,23 @@
package com.baeldung.java9.currentmethod;
import org.junit.Test;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CurrentExecutingMethodUnitTest {
@Test
public void givenJava9_whenWalkingTheStack_thenFindMethod() {
StackWalker walker = StackWalker.getInstance();
Optional<String> methodName = walker.walk(frames -> frames
.findFirst()
.map(StackWalker.StackFrame::getMethodName)
);
assertTrue(methodName.isPresent());
assertEquals("givenJava9_whenWalkingTheStack_thenFindMethod", methodName.get());
}
}

View File

@@ -0,0 +1,71 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class FlowApiLiveVideo {
static class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
private long consumerDelay = 30;
public VideoPlayer(long consumerDelay) {
this.consumerDelay = consumerDelay;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
try {
Thread.sleep(consumerDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
static class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
ScheduledExecutorService executor = null;
public VideoStreamServer(int bufferSize) {
super(Executors.newSingleThreadExecutor(), bufferSize);
executor = Executors.newScheduledThreadPool(1);
}
void startStreaming(long produceDelay, Runnable onDrop) {
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " dropped because of back pressure"));
onDrop.run();
return true;
});
}, 0, produceDelay, TimeUnit.MILLISECONDS);
}
}
public static void streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError){
FlowApiLiveVideo.VideoStreamServer streamServer = new FlowApiLiveVideo.VideoStreamServer(bufferSize);
streamServer.subscribe(new FlowApiLiveVideo.VideoPlayer(consumeDelay));
streamServer.startStreaming(produceDelay, onError);
}
}

View File

@@ -0,0 +1,61 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import static org.awaitility.Awaitility.await;
public class LiveVideoFlowVsRxUnitTest {
private final static long SLOW_CONSUMER_DELAY = 30;
private final static long FAST_CONSUMER_DELAY = 1;
private final static long PRODUCER_DELAY = 1;
private final static int BUFFER_SIZE = 10;
private final static long AWAIT = 1000;
@Test
public void givenSlowVideoPlayer_whenSubscribedToFlowApiLiveVideo_thenExpectErrorOnBackPressure() {
AtomicLong errors = new AtomicLong();
FlowApiLiveVideo.streamLiveVideo(PRODUCER_DELAY, SLOW_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
await()
.atMost(AWAIT, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(errors.get() > 0));
}
@Test
public void givenFastVideoPlayer_whenSubscribedToFlowApiLiveVideo_thenExpectNoErrorOnBackPressure() throws InterruptedException {
AtomicLong errors = new AtomicLong();
FlowApiLiveVideo.streamLiveVideo(PRODUCER_DELAY, FAST_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
Thread.sleep(AWAIT);
Assertions.assertEquals(0, errors.get());
}
@Test
public void givenSlowVideoPlayer_whenSubscribedToRxJavaLiveVideo_thenExpectErrorOnBackPressure() {
AtomicLong errors = new AtomicLong();
RxJavaLiveVideo.streamLiveVideo(PRODUCER_DELAY, SLOW_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
await()
.atMost(AWAIT, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(errors.get() > 0));
}
@Test
public void givenFastVideoPlayer_whenSubscribedToRxJavaLiveVideo_thenExpectNoErrorOnBackPressure() throws InterruptedException {
AtomicLong errors = new AtomicLong();
RxJavaLiveVideo.streamLiveVideo(PRODUCER_DELAY, FAST_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
Thread.sleep(AWAIT);
Assertions.assertEquals(0, errors.get());
}
}

View File

@@ -0,0 +1,36 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
public class RxJavaLiveVideo {
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
return Flowable
.fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
sleep(produceDelay);
return new VideoFrame(videoFrame.getNumber() + 1);
}))
.subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true)
.onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
sleep(consumeDelay);
}, throwable -> {
onError.run();
});
}
private static void sleep(long i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,13 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
class VideoFrame {
private long number;
public VideoFrame(long number) {
this.number = number;
}
public long getNumber() {
return number;
}
}