#BAEL-16633 split Core Java 9 module - improved features

This commit is contained in:
Alessio Stalla
2019-10-29 15:30:01 +01:00
parent 83b5897387
commit fabf0993f6
24 changed files with 95 additions and 37 deletions

View File

@@ -0,0 +1,23 @@
package com.baeldung.java9.language;
public interface PrivateInterface {
private static String staticPrivate() {
return "static private";
}
private String instancePrivate() {
return "instance private";
}
public default void check() {
String result = staticPrivate();
if (!result.equals("static private"))
throw new AssertionError("Incorrect result for static private interface method");
PrivateInterface pvt = new PrivateInterface() {
};
result = pvt.instancePrivate();
if (!result.equals("instance private"))
throw new AssertionError("Incorrect result for instance private interface method");
}
}

View File

@@ -0,0 +1,82 @@
package com.baeldung.java9.reactive;
import java.util.ArrayList;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class BaeldungBatchSubscriberImpl<T> implements Subscriber<String> {
private Subscription subscription;
private boolean completed = false;
private int counter;
private ArrayList<String> buffer;
public static final int BUFFER_SIZE = 5;
public BaeldungBatchSubscriberImpl() {
buffer = new ArrayList<String>();
}
public boolean isCompleted() {
return completed;
}
public void setCompleted(boolean completed) {
this.completed = completed;
}
public int getCounter() {
return counter;
}
public void setCounter(int counter) {
this.counter = counter;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(BUFFER_SIZE);
}
@Override
public void onNext(String item) {
buffer.add(item);
// if buffer is full, process the items.
if (buffer.size() >= BUFFER_SIZE) {
processBuffer();
}
//request more items.
subscription.request(1);
}
private void processBuffer() {
if (buffer.isEmpty())
return;
// Process all items in the buffer. Here, we just print it and sleep for 1 second.
System.out.print("Processed items: ");
buffer.stream()
.forEach(item -> {
System.out.print(" " + item);
});
System.out.println();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
counter = counter + buffer.size();
buffer.clear();
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
completed = true;
// process any remaining items in buffer before
processBuffer();
subscription.cancel();
}
}

View File

@@ -0,0 +1,55 @@
package com.baeldung.java9.reactive;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class BaeldungSubscriberImpl<T> implements Subscriber<String> {
private Subscription subscription;
private boolean completed = false;
private int counter;
public boolean isCompleted() {
return completed;
}
public void setCompleted(boolean completed) {
this.completed = completed;
}
public int getCounter() {
return counter;
}
public void setCounter(int counter) {
this.counter = counter;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
counter++;
System.out.println("Processed item : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
completed = true;
subscription.cancel();
}
}

View File

@@ -0,0 +1,75 @@
package com.baeldung.java9.reactive;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Stopwatch;
public class BaeldungBatchSubscriberImplIntegrationTest {
private static final int ITEM_SIZE = 10;
private SubmissionPublisher<String> publisher;
private BaeldungBatchSubscriberImpl<String> subscriber;
@Before
public void initialize() {
this.publisher = new SubmissionPublisher<String>(ForkJoinPool.commonPool(), 6);
this.subscriber = new BaeldungBatchSubscriberImpl<String>();
publisher.subscribe(subscriber);
}
@Rule
public Stopwatch stopwatch = new Stopwatch() {
};
@Test
public void testReactiveStreamCount() {
IntStream.range(0, ITEM_SIZE)
.forEach(item -> publisher.submit(item + ""));
publisher.close();
do {
// wait for subscribers to complete all processing.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (!subscriber.isCompleted());
int count = subscriber.getCounter();
assertEquals(ITEM_SIZE, count);
}
@Test
public void testReactiveStreamTime() {
IntStream.range(0, ITEM_SIZE)
.forEach(item -> publisher.submit(item + ""));
publisher.close();
do {
// wait for subscribers to complete all processing.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (!subscriber.isCompleted());
// The runtime in seconds should be equal to the number of items in each batch.
assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= (ITEM_SIZE / subscriber.BUFFER_SIZE));
}
}

View File

@@ -0,0 +1,100 @@
package com.baeldung.java9.reactive;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Stopwatch;
public class BaeldungSubscriberImplIntegrationTest {
private static final int ITEM_SIZE = 10;
private SubmissionPublisher<String> publisher;
private BaeldungSubscriberImpl<String> subscriber;
@Before
public void initialize() {
// create Publisher with max buffer capacity 3.
this.publisher = new SubmissionPublisher<String>(ForkJoinPool.commonPool(), 3);
this.subscriber = new BaeldungSubscriberImpl<String>();
publisher.subscribe(subscriber);
}
@Rule
public Stopwatch stopwatch = new Stopwatch() {
};
@Test
public void testReactiveStreamCount() {
IntStream.range(0, ITEM_SIZE)
.forEach(item -> publisher.submit(item + ""));
publisher.close();
do {
// wait for subscribers to complete all processing.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (!subscriber.isCompleted());
int count = subscriber.getCounter();
assertEquals(ITEM_SIZE, count);
}
@Test
public void testReactiveStreamTime() {
IntStream.range(0, ITEM_SIZE)
.forEach(item -> publisher.submit(item + ""));
publisher.close();
do {
// wait for subscribers to complete all processing.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (!subscriber.isCompleted());
// The runtime in seconds should be equal to the number of items.
assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= ITEM_SIZE);
}
@Test
public void testReactiveStreamOffer() {
IntStream.range(0, ITEM_SIZE)
.forEach(item -> publisher.offer(item + "", (subscriber, string) -> {
// Returning false means this item will be dropped (no retry), if blocked.
return false;
}));
publisher.close();
do {
// wait for subscribers to complete all processing.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (!subscriber.isCompleted());
int count = subscriber.getCounter();
// Because 10 items were offered and the buffer capacity was 3, few items will not be processed.
assertTrue(ITEM_SIZE > count);
}
}

View File

@@ -0,0 +1,16 @@
package com.baeldung.java9.stackwalker;
import org.junit.Test;
public class StackWalkerDemoUnitTest {
@Test
public void giveStalkWalker_whenWalkingTheStack_thenShowStackFrames() {
new StackWalkerDemo().methodOne();
}
@Test
public void giveStalkWalker_whenInvokingFindCaller_thenFindCallingClass() {
new StackWalkerDemo().findCaller();
}
}

View File

@@ -0,0 +1,73 @@
package com.baeldung.java9.streams.reactive;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import static org.assertj.core.api.Java6Assertions.assertThat;
public class ReactiveStreamsTest {
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(6);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS).until(
() -> assertThat(subscriber.consumedElements).containsExactlyElementsOf(items)
);
}
@Test
public void givenPublisher_whenSubscribeAndTransformElements_thenShouldConsumeAllElements() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor = new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>(3);
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
//when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS).until(
() -> assertThat(subscriber.consumedElements).containsExactlyElementsOf(expectedResult)
);
}
@Test
public void givenPublisher_whenRequestForOnlyOneElement_thenShouldConsumeOnlyThatOne() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS).until(
() -> assertThat(subscriber.consumedElements).containsExactlyElementsOf(expected)
);
}
}