diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index b5e97ee03d..fbb6e9ba5e 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -17,17 +17,17 @@ org.apache.spark - spark-core_2.11 + spark-core_2.12 ${org.apache.spark.spark-core.version} org.apache.spark - spark-sql_2.11 + spark-sql_2.12 ${org.apache.spark.spark-sql.version} org.apache.spark - spark-graphx_2.11 + spark-graphx_2.12 ${org.apache.spark.spark-graphx.version} @@ -37,22 +37,22 @@ org.apache.spark - spark-streaming_2.11 + spark-streaming_2.12 ${org.apache.spark.spark-streaming.version} org.apache.spark - spark-mllib_2.11 + spark-mllib_2.12 ${org.apache.spark.spark-mllib.version} org.apache.spark - spark-streaming-kafka-0-10_2.11 + spark-streaming-kafka-0-10_2.12 ${org.apache.spark.spark-streaming-kafka.version} com.datastax.spark - spark-cassandra-connector_2.11 + spark-cassandra-connector_2.12 ${com.datastax.spark.spark-cassandra-connector.version} @@ -97,17 +97,17 @@ - 2.4.8 - 2.4.8 - 2.4.8 - 2.4.8 - 2.4.8 + 3.3.2 + 3.3.2 + 3.3.2 + 3.3.2 + 3.3.2 0.8.1-spark3.0-s_2.12 - 2.4.8 - 2.5.2 + 3.3.2 + 3.3.0 1.6.0-M1 3.3.0 - 42.3.3 + 42.5.4 \ No newline at end of file diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java index efbe5f3851..15f70aae61 100644 --- a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java +++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java @@ -16,8 +16,11 @@ import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.Function3; import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.State; import org.apache.spark.streaming.StateSpec; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; @@ -74,7 +77,8 @@ public class WordCountingAppWithCheckpoint { JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) .reduceByKey((Function2) (i1, i2) -> i1 + i2); - JavaMapWithStateDStream> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> { + JavaMapWithStateDStream> cumulativeWordCounts = + wordCounts.mapWithState(StateSpec.function((Function3, State, Tuple2>) (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2<>(word, sum); state.update(sum); diff --git a/apache-spark/src/main/java/com/baeldung/dataframes/SparkDriver.java b/apache-spark/src/main/java/com/baeldung/dataframes/SparkDriver.java index adc25170a7..e5e988b5a8 100644 --- a/apache-spark/src/main/java/com/baeldung/dataframes/SparkDriver.java +++ b/apache-spark/src/main/java/com/baeldung/dataframes/SparkDriver.java @@ -9,6 +9,7 @@ public class SparkDriver implements Serializable { public static SparkSession getSparkSession() { return SparkSession.builder() .appName("Customer Aggregation pipeline") + .config("spark.sql.legacy.timeParserPolicy", "LEGACY") .master("local") .getOrCreate();