diff --git a/core-java-modules/core-java-streams-3/pom.xml b/core-java-modules/core-java-streams-3/pom.xml index 659f1937f2..01b83f229a 100644 --- a/core-java-modules/core-java-streams-3/pom.xml +++ b/core-java-modules/core-java-streams-3/pom.xml @@ -27,6 +27,17 @@ ${lombok.version} provided + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + test + org.assertj @@ -44,11 +55,30 @@ true + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + + + 1.18.20 3.6.1 + 1.29 \ No newline at end of file diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/BenchmarkRunner.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/BenchmarkRunner.java new file mode 100644 index 0000000000..461d728ad0 --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/BenchmarkRunner.java @@ -0,0 +1,9 @@ +package com.baeldung.streams.parallel; + +public class BenchmarkRunner { + + public static void main(String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/DifferentSourceSplitting.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/DifferentSourceSplitting.java new file mode 100644 index 0000000000..9ad569df30 --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/DifferentSourceSplitting.java @@ -0,0 +1,54 @@ +package com.baeldung.streams.parallel; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class DifferentSourceSplitting { + + private static final List arrayListOfNumbers = new ArrayList<>(); + private static final List linkedListOfNumbers = new LinkedList<>(); + + static { + IntStream.rangeClosed(1, 1_000_000).forEach(i -> { + arrayListOfNumbers.add(i); + linkedListOfNumbers.add(i); + }); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void differentSourceArrayListSequential() { + arrayListOfNumbers.stream().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void differentSourceArrayListParallel() { + arrayListOfNumbers.parallelStream().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void differentSourceLinkedListSequential() { + linkedListOfNumbers.stream().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void differentSourceLinkedListParallel() { + linkedListOfNumbers.parallelStream().reduce(0, Integer::sum); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MemoryLocalityCosts.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MemoryLocalityCosts.java new file mode 100644 index 0000000000..bc5cbf491b --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MemoryLocalityCosts.java @@ -0,0 +1,52 @@ +package com.baeldung.streams.parallel; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class MemoryLocalityCosts { + + private static final int[] intArray = new int[1_000_000]; + private static final Integer[] integerArray = new Integer[1_000_000]; + + static { + IntStream.rangeClosed(1, 1_000_000).forEach(i -> { + intArray[i-1] = i; + integerArray[i-1] = i; + }); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void localityIntArraySequential() { + Arrays.stream(intArray).reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void localityIntArrayParallel() { + Arrays.stream(intArray).parallel().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void localityIntegerArraySequential() { + Arrays.stream(integerArray).reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void localityIntegerArrayParallel() { + Arrays.stream(integerArray).parallel().reduce(0, Integer::sum); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MergingCosts.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MergingCosts.java new file mode 100644 index 0000000000..a9919dbe72 --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/MergingCosts.java @@ -0,0 +1,52 @@ +package com.baeldung.streams.parallel; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class MergingCosts { + + private static final List arrayListOfNumbers = new ArrayList<>(); + + static { + IntStream.rangeClosed(1, 1_000_000).forEach(i -> { + arrayListOfNumbers.add(i); + }); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void mergingCostsSumSequential() { + arrayListOfNumbers.stream().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void mergingCostsSumParallel() { + arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void mergingCostsGroupingSequential() { + arrayListOfNumbers.stream().collect(Collectors.toSet()); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void mergingCostsGroupingParallel() { + arrayListOfNumbers.stream().parallel().collect(Collectors.toSet()); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/ParallelStream.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/ParallelStream.java new file mode 100644 index 0000000000..f236f418e8 --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/ParallelStream.java @@ -0,0 +1,15 @@ +package com.baeldung.streams.parallel; + +import java.util.Arrays; +import java.util.List; + +public class ParallelStream { + + public static void main(String[] args) { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + listOfNumbers.parallelStream().forEach(number -> + System.out.println(number + " " + Thread.currentThread().getName()) + ); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SequentialStream.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SequentialStream.java new file mode 100644 index 0000000000..01379130fa --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SequentialStream.java @@ -0,0 +1,15 @@ +package com.baeldung.streams.parallel; + +import java.util.Arrays; +import java.util.List; + +public class SequentialStream { + + public static void main(String[] args) { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + listOfNumbers.stream().forEach(number -> + System.out.println(number + " " + Thread.currentThread().getName()) + ); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SplittingCosts.java b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SplittingCosts.java new file mode 100644 index 0000000000..d1e878df1f --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/main/java/com/baeldung/streams/parallel/SplittingCosts.java @@ -0,0 +1,27 @@ +package com.baeldung.streams.parallel; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; + +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class SplittingCosts { + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void sourceSplittingIntStreamSequential() { + IntStream.rangeClosed(1, 100).reduce(0, Integer::sum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public static void sourceSplittingIntStreamParallel() { + IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum); + } + +} diff --git a/core-java-modules/core-java-streams-3/src/test/java/com/baeldung/streams/parallel/ForkJoinUnitTest.java b/core-java-modules/core-java-streams-3/src/test/java/com/baeldung/streams/parallel/ForkJoinUnitTest.java new file mode 100644 index 0000000000..f9aab8ed6c --- /dev/null +++ b/core-java-modules/core-java-streams-3/src/test/java/com/baeldung/streams/parallel/ForkJoinUnitTest.java @@ -0,0 +1,46 @@ +package com.baeldung.streams.parallel; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; + +import static org.assertj.core.api.Assertions.assertThat; + +class ForkJoinUnitTest { + + @Test + void givenSequentialStreamOfNumbers_whenReducingSumWithIdentityFive_thenResultIsCorrect() { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + int sum = listOfNumbers.stream().reduce(5, Integer::sum); + assertThat(sum).isEqualTo(15); + } + + @Test + void givenParallelStreamOfNumbers_whenReducingSumWithIdentityFive_thenResultIsNotCorrect() { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum); + assertThat(sum).isNotEqualTo(15); + } + + @Test + void givenParallelStreamOfNumbers_whenReducingSumWithIdentityZero_thenResultIsCorrect() { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5; + assertThat(sum).isEqualTo(15); + } + + @Test + public void givenParallelStreamOfNumbers_whenUsingCustomThreadPool_thenResultIsCorrect() + throws InterruptedException, ExecutionException { + List listOfNumbers = Arrays.asList(1, 2, 3, 4); + ForkJoinPool customThreadPool = new ForkJoinPool(4); + int sum = customThreadPool.submit( + () -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get(); + customThreadPool.shutdown(); + assertThat(sum).isEqualTo(10); + } + +}