From 5bdd92a5b9e5cdaeee75dbace4d40e57e599e93c Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Sat, 5 Oct 2019 23:11:55 +0200 Subject: [PATCH 1/7] Changes related to Apache Spark GraphX article --- apache-spark/pom.xml | 6 ++ .../com/baeldung/graphx/GraphAlgorithms.java | 46 +++++++++ .../java/com/baeldung/graphx/GraphLoader.java | 97 +++++++++++++++++++ .../com/baeldung/graphx/GraphOperations.java | 53 ++++++++++ .../com/baeldung/graphx/Relationship.java | 30 ++++++ .../main/java/com/baeldung/graphx/User.java | 25 +++++ 6 files changed, 257 insertions(+) create mode 100644 apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphx/Relationship.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphx/User.java diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 3df81e5aee..6f62f568ff 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -40,6 +40,12 @@ ${org.apache.spark.spark-mllib.version} provided + + org.apache.spark + spark-graphx_2.12 + ${org.apache.spark.spark-mllib.version} + provided + org.apache.spark spark-streaming-kafka-0-10_2.11 diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java new file mode 100644 index 0000000000..2c1474d0aa --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java @@ -0,0 +1,46 @@ +package com.baeldung.graphx; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; +import org.apache.spark.graphx.lib.PageRank; + +import 
scala.Tuple2; + +public class GraphAlgorithms { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) { + Logger.getLogger("org").setLevel(Level.OFF); + + GraphLoader loader = new GraphLoader(); + Graph graph = loader.mapUserRelationship(); + + Graph pageRank = PageRank.run(graph, 20, 0.0001, GraphLoader.USER_TAG, + GraphLoader.RELATIONSHIP_TAG); + + VertexRDD usersRDD = pageRank.vertices(); + + System.out.println("---- PageRank: "); + System.out.println("- Users Ranked "); + usersRDD.toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + + System.out.println("---- Connected Components: "); + Graph connectedComponents = graph.ops().connectedComponents(); + + connectedComponents.vertices().toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + + System.out.println("---- Triangle Count: "); + Graph triangleCount = graph.ops().triangleCount(); + + triangleCount.vertices().toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java new file mode 100644 index 0000000000..8024d2576b --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java @@ -0,0 +1,97 @@ +package com.baeldung.graphx; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.storage.StorageLevel; + +import scala.Function1; +import scala.Function2; +import scala.Predef; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; + +public class GraphLoader { + + public static Map USERS = new 
HashMap<>(); + public static ClassTag RELATIONSHIP_TAG = ClassTag$.MODULE$.apply(Relationship.class); + public static ClassTag USER_TAG = ClassTag$.MODULE$.apply(User.class); + + public JavaSparkContext getSparkContext() { + SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]"); + JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); + return javaSparkContext; + } + + public Graph mapUserRelationship() { + JavaSparkContext javaSparkContext = getSparkContext(); + + List> edges = getEdges(); + + JavaRDD> edgeJavaRDD = javaSparkContext.parallelize(edges); + + ClassTag stringTag = ClassTag$.MODULE$.apply(String.class); + + Graph graph = Graph.fromEdges(edgeJavaRDD.rdd(), "Following", StorageLevel.MEMORY_ONLY(), + StorageLevel.MEMORY_ONLY(), stringTag, stringTag); + + Graph relationshipGraph = graph.mapEdges(new MapRelationship(), RELATIONSHIP_TAG); + Predef.$eq$colon$eq eq = null; + + return relationshipGraph.mapVertices(new MapUser(), USER_TAG, eq); + } + + public List> getEdges() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, "Friend")); + edges.add(new Edge<>(1L, 4L, "Following")); + edges.add(new Edge<>(2L, 4L, "Friend")); + edges.add(new Edge<>(3L, 1L, "Relative")); + edges.add(new Edge<>(3L, 4L, "Relative")); + + return edges; + } + + public Map getUsers() { + if (USERS.isEmpty()) { + loadUsers(); + } + + return USERS; + } + + private void loadUsers() { + User john = new User(1L, "John"); + User martin = new User(2L, "Martin"); + User peter = new User(3L, "Peter"); + User alicia = new User(4L, "Alicia"); + + USERS.put(1L, john); + USERS.put(2L, martin); + USERS.put(3L, peter); + USERS.put(4L, alicia); + } + + private static class MapRelationship implements Function1, Relationship>, Serializable { + + @Override + public Relationship apply(Edge edge) { + return new Relationship(edge.attr, new GraphLoader().getUsers().get(edge.srcId()), USERS.get(edge.dstId())); + } + } + + private static class 
MapUser implements Function2, Serializable { + @Override + public User apply(Object id, String name) { + return new GraphLoader().getUsers().get((Long) id); + } + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java new file mode 100644 index 0000000000..defb51f46e --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java @@ -0,0 +1,53 @@ +package com.baeldung.graphx; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; + +import scala.Tuple2; + +public class GraphOperations { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) { + Logger.getLogger("org").setLevel(Level.OFF); + GraphOperations operations = new GraphOperations(); + operations.doOperations(); + } + + private void doOperations() { + GraphLoader loader = new GraphLoader(); + Graph userGraph = loader.mapUserRelationship(); + + System.out.println("Mapped Users: "); + userGraph.vertices().toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " name: " + tuple._2)); + + System.out.println("Mapped Relationships: "); + userGraph.edges().toJavaRDD() + .foreach((VoidFunction>) edge -> System.out.println(edge.attr().toString())); + + VertexRDD degreesVerticesRDD = userGraph.ops().degrees(); + VertexRDD inDegreesVerticesRDD = userGraph.ops().inDegrees(); + VertexRDD outDegreesVerticesRDD = userGraph.ops().outDegrees(); + + System.out.println("degrees: "); + degreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + + System.out.println("inDegrees: "); + 
inDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + + System.out.println("outDegrees: "); + outDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + } + +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java b/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java new file mode 100644 index 0000000000..2a022e98e8 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java @@ -0,0 +1,30 @@ +package com.baeldung.graphx; + +public class Relationship { + private String type; + private User source; + private User target; + + public Relationship(String type, User source, User target) { + this.type = type; + this.source = source; + this.target = target; + } + + public String getType() { + return type; + } + + public User getSource() { + return source; + } + + public User getTarget() { + return target; + } + + @Override + public String toString() { + return getSource().toString() + " -- " + getType() + " --> " + getTarget().toString(); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/User.java b/apache-spark/src/main/java/com/baeldung/graphx/User.java new file mode 100644 index 0000000000..98bf7768b0 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/User.java @@ -0,0 +1,25 @@ +package com.baeldung.graphx; + +public class User { + + private Long id; + private String name; + + public User(long id, String name) { + this.id = id; + this.name = name; + } + + public Long getId() { + return id; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "(" + id + "-" + name + ")"; + } +} From 93b517fab5be3a6df2c5886bf8722f033eec70e6 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Sun, 6 Oct 2019 12:22:55 +0200 Subject: [PATCH 2/7] BAEL-3086 - Fix spark graphx version --- 
apache-spark/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 6f62f568ff..5de616fa7f 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -43,7 +43,7 @@ org.apache.spark spark-graphx_2.12 - ${org.apache.spark.spark-mllib.version} + ${org.apache.spark.spark-graphx.version} provided @@ -99,6 +99,7 @@ 2.3.0 2.3.0 2.3.0 + 2.4.0 2.3.0 1.5.2 3.2 From 259bb1701996039714afbc6de0a434dccb4c1476 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Sun, 6 Oct 2019 21:57:54 +0200 Subject: [PATCH 3/7] Spark graphx moved to a new project due to an incompatibility of scala compiler --- apache-spark-graphx/pom.xml | 61 +++++++++++++++++++ .../com/baeldung/graphx/GraphAlgorithms.java | 0 .../java/com/baeldung/graphx/GraphLoader.java | 0 .../com/baeldung/graphx/GraphOperations.java | 0 .../com/baeldung/graphx/Relationship.java | 0 .../main/java/com/baeldung/graphx/User.java | 4 +- .../src/main/resources/logback.xml | 13 ++++ apache-spark/pom.xml | 7 --- 8 files changed, 77 insertions(+), 8 deletions(-) create mode 100644 apache-spark-graphx/pom.xml rename {apache-spark => apache-spark-graphx}/src/main/java/com/baeldung/graphx/GraphAlgorithms.java (100%) rename {apache-spark => apache-spark-graphx}/src/main/java/com/baeldung/graphx/GraphLoader.java (100%) rename {apache-spark => apache-spark-graphx}/src/main/java/com/baeldung/graphx/GraphOperations.java (100%) rename {apache-spark => apache-spark-graphx}/src/main/java/com/baeldung/graphx/Relationship.java (100%) rename {apache-spark => apache-spark-graphx}/src/main/java/com/baeldung/graphx/User.java (83%) create mode 100644 apache-spark-graphx/src/main/resources/logback.xml diff --git a/apache-spark-graphx/pom.xml b/apache-spark-graphx/pom.xml new file mode 100644 index 0000000000..cb6b65fb63 --- /dev/null +++ b/apache-spark-graphx/pom.xml @@ -0,0 +1,61 @@ + + 4.0.0 + apache-spark-graphx + 1.0-SNAPSHOT + apache-spark-graphx + jar + 
http://maven.apache.org + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.spark + spark-graphx_2.12 + ${org.apache.spark.spark-graphx.version} + provided + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + + + + + 2.4.4 + 3.2 + + + diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java similarity index 100% rename from apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java rename to apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java similarity index 100% rename from apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java rename to apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java similarity index 100% rename from apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java rename to apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java diff --git a/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java similarity index 100% rename from apache-spark/src/main/java/com/baeldung/graphx/Relationship.java rename to apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java diff --git a/apache-spark/src/main/java/com/baeldung/graphx/User.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java similarity index 83% rename from 
apache-spark/src/main/java/com/baeldung/graphx/User.java rename to apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java index 98bf7768b0..0ad9c09a6a 100644 --- a/apache-spark/src/main/java/com/baeldung/graphx/User.java +++ b/apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java @@ -1,6 +1,8 @@ package com.baeldung.graphx; -public class User { +import java.io.Serializable; + +public class User implements Serializable { private Long id; private String name; diff --git a/apache-spark-graphx/src/main/resources/logback.xml b/apache-spark-graphx/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/apache-spark-graphx/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 5de616fa7f..3df81e5aee 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -40,12 +40,6 @@ ${org.apache.spark.spark-mllib.version} provided - - org.apache.spark - spark-graphx_2.12 - ${org.apache.spark.spark-graphx.version} - provided - org.apache.spark spark-streaming-kafka-0-10_2.11 @@ -99,7 +93,6 @@ 2.3.0 2.3.0 2.3.0 - 2.4.0 2.3.0 1.5.2 3.2 From d18b5f3019acb9835f44d8f152c9763a501353e2 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Sun, 17 Nov 2019 17:29:00 +0100 Subject: [PATCH 4/7] BAEL-3086 - Rework to deal with GraphFrames Library --- apache-spark/pom.xml | 14 ++++ .../graphframes/GraphExperiments.java | 52 ++++++++++++++ .../com/baeldung/graphframes/GraphLoader.java | 72 +++++++++++++++++++ .../baeldung/graphframes/Relationship.java | 39 ++++++++++ .../java/com/baeldung/graphframes/User.java | 27 +++++++ 5 files changed, 204 insertions(+) create mode 100644 apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java create mode 100644 
apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java create mode 100644 apache-spark/src/main/java/com/baeldung/graphframes/User.java diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 3df81e5aee..8fcac06e05 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -28,6 +28,18 @@ ${org.apache.spark.spark-sql.version} provided + + org.apache.spark + spark-graphx_2.11 + ${org.apache.spark.spark-graphx.version} + provided + + + graphframes + graphframes + ${graphframes.version} + provided + org.apache.spark spark-streaming_2.11 @@ -92,6 +104,8 @@ 2.3.0 2.3.0 2.3.0 + 2.3.0 + 0.7.0-spark2.4-s_2.11 2.3.0 2.3.0 1.5.2 diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java new file mode 100644 index 0000000000..cb5b4c7977 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java @@ -0,0 +1,52 @@ +package com.baeldung.graphframes; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; +import org.graphframes.GraphFrame; +import scala.Tuple2; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class GraphExperiments { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) throws IOException { + Logger.getLogger("org").setLevel(Level.OFF); + GraphLoader loader = new GraphLoader(); + GraphFrame graph = loader.getGraphFrameUserRelationship(); + + GraphExperiments experiments = new GraphExperiments(); + experiments.doGraphFrameOperations(graph); + experiments.doGraphFrameAlgorithms(graph); + } + + private void doGraphFrameOperations(GraphFrame graph) { + graph.vertices().show(); + graph.edges().show(); + + graph.vertices().filter("name = 
'Martin'").show(); + + graph.filterEdges("type = 'Friend'") + .dropIsolatedVertices().vertices().show(); + + graph.degrees().show(); + graph.inDegrees().show(); + graph.outDegrees().show(); + } + + private void doGraphFrameAlgorithms(GraphFrame graph) { + + graph.pageRank().maxIter(20).resetProbability(0.0001).run().vertices().show(); + + graph.connectedComponents().run().show(); + + graph.triangleCount().run().show(); + } + +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java new file mode 100644 index 0000000000..084af4fac4 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java @@ -0,0 +1,72 @@ +package com.baeldung.graphframes; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.graphframes.GraphFrame; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +public class GraphLoader { + + public JavaSparkContext getSparkContext() throws IOException { + Path temp = Files.createTempDirectory("sparkGraphFrames"); + SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]"); + JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); + javaSparkContext.setCheckpointDir(temp.toString()); + return javaSparkContext; + } + + public GraphFrame getGraphFrameUserRelationship() throws IOException { + Path temp = Files.createTempDirectory("sparkGraphFrames"); + SparkSession session = SparkSession.builder() + .appName("SparkGraphFrameSample") + .config("spark.sql.warehouse.dir", temp.toString())//"/file:C:/temp" + .sparkContext(getSparkContext().sc()) + .master("local[*]") + .getOrCreate(); + List users = loadUsers(); + + Dataset userDataset = 
session.createDataFrame(users, User.class); + + List relationshipsList = getRelations(); + Dataset relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class); + + GraphFrame graphFrame = new GraphFrame(userDataset, relationshipDataset); + + return graphFrame; + } + + public List getRelations() { + List relationships = new ArrayList<>(); + relationships.add(new Relationship("Friend", "1", "2")); + relationships.add(new Relationship("Following", "1", "4")); + relationships.add(new Relationship("Friend", "2", "4")); + relationships.add(new Relationship("Relative", "3", "1")); + relationships.add(new Relationship("Relative", "3", "4")); + + return relationships; + } + + private List loadUsers() { + User john = new User(1L, "John"); + User martin = new User(2L, "Martin"); + User peter = new User(3L, "Peter"); + User alicia = new User(4L, "Alicia"); + + List users = new ArrayList<>(); + + users.add(new User(1L, "John")); + users.add(new User(2L, "Martin")); + users.add(new User(3L, "Peter")); + users.add(new User(4L, "Alicia")); + + return users; + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java b/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java new file mode 100644 index 0000000000..ce1780ea3f --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java @@ -0,0 +1,39 @@ +package com.baeldung.graphframes; + +import java.io.Serializable; +import java.util.UUID; + +public class Relationship implements Serializable { + private String type; + private String src; + private String dst; + private UUID id; + + public Relationship(String type, String src, String dst) { + this.type = type; + this.src = src; + this.dst = dst; + this.id = UUID.randomUUID(); + } + + public String getId() { + return id.toString(); + } + + public String getType() { + return type; + } + + public String getSrc() { + return src; + } + + public String getDst() { + return dst; + } + + 
@Override + public String toString() { + return getSrc() + " -- " + getType() + " --> " + getDst(); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/User.java b/apache-spark/src/main/java/com/baeldung/graphframes/User.java new file mode 100644 index 0000000000..475a9c13ad --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/User.java @@ -0,0 +1,27 @@ +package com.baeldung.graphframes; + +import java.io.Serializable; + +public class User implements Serializable { + + private Long id; + private String name; + + public User(long id, String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id.toString(); + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "struct<" + id + "," + name + ">"; + } +} From ffe56e07b2e4795cf9095148836b54c7a3659119 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Mon, 18 Nov 2019 00:22:52 +0100 Subject: [PATCH 5/7] Adjustment to have the spark repository --- apache-spark/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 8fcac06e05..af17113215 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -112,4 +112,10 @@ 3.2 + + + SparkPackagesRepo + http://dl.bintray.com/spark-packages/maven + + From a0011be9fcd0cf842e2c35fe3ceb48f910f153f6 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Fri, 22 Nov 2019 10:03:29 +0100 Subject: [PATCH 6/7] BAEL-3086 - Project moved to apache-spark project --- apache-spark-graphx/pom.xml | 61 ------------ .../com/baeldung/graphx/GraphAlgorithms.java | 46 --------- .../java/com/baeldung/graphx/GraphLoader.java | 97 ------------------- .../com/baeldung/graphx/GraphOperations.java | 53 ---------- .../com/baeldung/graphx/Relationship.java | 30 ------ .../main/java/com/baeldung/graphx/User.java | 27 ------ .../src/main/resources/logback.xml | 13 --- 7 files changed, 327 deletions(-) delete mode 100644 
apache-spark-graphx/pom.xml delete mode 100644 apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java delete mode 100644 apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java delete mode 100644 apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java delete mode 100644 apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java delete mode 100644 apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java delete mode 100644 apache-spark-graphx/src/main/resources/logback.xml diff --git a/apache-spark-graphx/pom.xml b/apache-spark-graphx/pom.xml deleted file mode 100644 index cb6b65fb63..0000000000 --- a/apache-spark-graphx/pom.xml +++ /dev/null @@ -1,61 +0,0 @@ - - 4.0.0 - apache-spark-graphx - 1.0-SNAPSHOT - apache-spark-graphx - jar - http://maven.apache.org - - - com.baeldung - parent-modules - 1.0.0-SNAPSHOT - - - - - org.apache.spark - spark-graphx_2.12 - ${org.apache.spark.spark-graphx.version} - provided - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin.version} - - ${java.version} - ${java.version} - - - - maven-assembly-plugin - - - package - - single - - - - - - jar-with-dependencies - - - - - - - - 2.4.4 - 3.2 - - - diff --git a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java deleted file mode 100644 index 2c1474d0aa..0000000000 --- a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphAlgorithms.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.baeldung.graphx; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.graphx.Graph; -import org.apache.spark.graphx.VertexRDD; -import org.apache.spark.graphx.lib.PageRank; - -import scala.Tuple2; - -public class GraphAlgorithms { - public static Map 
USERS = new HashMap<>(); - - public static void main(String[] args) { - Logger.getLogger("org").setLevel(Level.OFF); - - GraphLoader loader = new GraphLoader(); - Graph graph = loader.mapUserRelationship(); - - Graph pageRank = PageRank.run(graph, 20, 0.0001, GraphLoader.USER_TAG, - GraphLoader.RELATIONSHIP_TAG); - - VertexRDD usersRDD = pageRank.vertices(); - - System.out.println("---- PageRank: "); - System.out.println("- Users Ranked "); - usersRDD.toJavaRDD() - .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); - - System.out.println("---- Connected Components: "); - Graph connectedComponents = graph.ops().connectedComponents(); - - connectedComponents.vertices().toJavaRDD() - .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); - - System.out.println("---- Triangle Count: "); - Graph triangleCount = graph.ops().triangleCount(); - - triangleCount.vertices().toJavaRDD() - .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); - } -} diff --git a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java deleted file mode 100644 index 8024d2576b..0000000000 --- a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphLoader.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.baeldung.graphx; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.graphx.Edge; -import org.apache.spark.graphx.Graph; -import org.apache.spark.storage.StorageLevel; - -import scala.Function1; -import scala.Function2; -import scala.Predef; -import scala.reflect.ClassTag; -import scala.reflect.ClassTag$; - -public class GraphLoader { - - public static Map USERS = new HashMap<>(); - public static ClassTag 
RELATIONSHIP_TAG = ClassTag$.MODULE$.apply(Relationship.class); - public static ClassTag USER_TAG = ClassTag$.MODULE$.apply(User.class); - - public JavaSparkContext getSparkContext() { - SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]"); - JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); - return javaSparkContext; - } - - public Graph mapUserRelationship() { - JavaSparkContext javaSparkContext = getSparkContext(); - - List> edges = getEdges(); - - JavaRDD> edgeJavaRDD = javaSparkContext.parallelize(edges); - - ClassTag stringTag = ClassTag$.MODULE$.apply(String.class); - - Graph graph = Graph.fromEdges(edgeJavaRDD.rdd(), "Following", StorageLevel.MEMORY_ONLY(), - StorageLevel.MEMORY_ONLY(), stringTag, stringTag); - - Graph relationshipGraph = graph.mapEdges(new MapRelationship(), RELATIONSHIP_TAG); - Predef.$eq$colon$eq eq = null; - - return relationshipGraph.mapVertices(new MapUser(), USER_TAG, eq); - } - - public List> getEdges() { - List> edges = new ArrayList<>(); - edges.add(new Edge<>(1L, 2L, "Friend")); - edges.add(new Edge<>(1L, 4L, "Following")); - edges.add(new Edge<>(2L, 4L, "Friend")); - edges.add(new Edge<>(3L, 1L, "Relative")); - edges.add(new Edge<>(3L, 4L, "Relative")); - - return edges; - } - - public Map getUsers() { - if (USERS.isEmpty()) { - loadUsers(); - } - - return USERS; - } - - private void loadUsers() { - User john = new User(1L, "John"); - User martin = new User(2L, "Martin"); - User peter = new User(3L, "Peter"); - User alicia = new User(4L, "Alicia"); - - USERS.put(1L, john); - USERS.put(2L, martin); - USERS.put(3L, peter); - USERS.put(4L, alicia); - } - - private static class MapRelationship implements Function1, Relationship>, Serializable { - - @Override - public Relationship apply(Edge edge) { - return new Relationship(edge.attr, new GraphLoader().getUsers().get(edge.srcId()), USERS.get(edge.dstId())); - } - } - - private static class MapUser implements Function2, Serializable 
{ - @Override - public User apply(Object id, String name) { - return new GraphLoader().getUsers().get((Long) id); - } - } -} diff --git a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java deleted file mode 100644 index defb51f46e..0000000000 --- a/apache-spark-graphx/src/main/java/com/baeldung/graphx/GraphOperations.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.baeldung.graphx; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.graphx.Edge; -import org.apache.spark.graphx.Graph; -import org.apache.spark.graphx.VertexRDD; - -import scala.Tuple2; - -public class GraphOperations { - public static Map USERS = new HashMap<>(); - - public static void main(String[] args) { - Logger.getLogger("org").setLevel(Level.OFF); - GraphOperations operations = new GraphOperations(); - operations.doOperations(); - } - - private void doOperations() { - GraphLoader loader = new GraphLoader(); - Graph userGraph = loader.mapUserRelationship(); - - System.out.println("Mapped Users: "); - userGraph.vertices().toJavaRDD().foreach((VoidFunction>) tuple -> System.out - .println("id: " + tuple._1 + " name: " + tuple._2)); - - System.out.println("Mapped Relationships: "); - userGraph.edges().toJavaRDD() - .foreach((VoidFunction>) edge -> System.out.println(edge.attr().toString())); - - VertexRDD degreesVerticesRDD = userGraph.ops().degrees(); - VertexRDD inDegreesVerticesRDD = userGraph.ops().inDegrees(); - VertexRDD outDegreesVerticesRDD = userGraph.ops().outDegrees(); - - System.out.println("degrees: "); - degreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out - .println("id: " + tuple._1 + " count: " + tuple._2)); - - System.out.println("inDegrees: "); - inDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> 
System.out - .println("id: " + tuple._1 + " count: " + tuple._2)); - - System.out.println("outDegrees: "); - outDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out - .println("id: " + tuple._1 + " count: " + tuple._2)); - } - -} diff --git a/apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java deleted file mode 100644 index 2a022e98e8..0000000000 --- a/apache-spark-graphx/src/main/java/com/baeldung/graphx/Relationship.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.baeldung.graphx; - -public class Relationship { - private String type; - private User source; - private User target; - - public Relationship(String type, User source, User target) { - this.type = type; - this.source = source; - this.target = target; - } - - public String getType() { - return type; - } - - public User getSource() { - return source; - } - - public User getTarget() { - return target; - } - - @Override - public String toString() { - return getSource().toString() + " -- " + getType() + " --> " + getTarget().toString(); - } -} diff --git a/apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java b/apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java deleted file mode 100644 index 0ad9c09a6a..0000000000 --- a/apache-spark-graphx/src/main/java/com/baeldung/graphx/User.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.graphx; - -import java.io.Serializable; - -public class User implements Serializable { - - private Long id; - private String name; - - public User(long id, String name) { - this.id = id; - this.name = name; - } - - public Long getId() { - return id; - } - - public String getName() { - return name; - } - - @Override - public String toString() { - return "(" + id + "-" + name + ")"; - } -} diff --git a/apache-spark-graphx/src/main/resources/logback.xml b/apache-spark-graphx/src/main/resources/logback.xml deleted file mode 100644 index 
7d900d8ea8..0000000000 --- a/apache-spark-graphx/src/main/resources/logback.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - \ No newline at end of file From 4c50b3043b166076f61d209ccbbf2bfe39062167 Mon Sep 17 00:00:00 2001 From: Norberto Ritzmann Jr Date: Fri, 22 Nov 2019 10:21:10 +0100 Subject: [PATCH 7/7] BAEL-3086 - Clean up --- .../main/java/com/baeldung/graphframes/GraphExperiments.java | 2 +- .../src/main/java/com/baeldung/graphframes/GraphLoader.java | 2 +- apache-spark/src/main/java/com/baeldung/graphframes/User.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java index cb5b4c7977..30524a8c8b 100644 --- a/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java @@ -42,7 +42,7 @@ public class GraphExperiments { private void doGraphFrameAlgorithms(GraphFrame graph) { - graph.pageRank().maxIter(20).resetProbability(0.0001).run().vertices().show(); + graph.pageRank().maxIter(20).resetProbability(0.15).run().vertices().show(); graph.connectedComponents().run().show(); diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java index 084af4fac4..cad1fb4e26 100644 --- a/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java @@ -27,7 +27,7 @@ public class GraphLoader { Path temp = Files.createTempDirectory("sparkGraphFrames"); SparkSession session = SparkSession.builder() .appName("SparkGraphFrameSample") - .config("spark.sql.warehouse.dir", temp.toString())//"/file:C:/temp" + .config("spark.sql.warehouse.dir", temp.toString()) 
.sparkContext(getSparkContext().sc()) .master("local[*]") .getOrCreate(); diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/User.java b/apache-spark/src/main/java/com/baeldung/graphframes/User.java index 475a9c13ad..50022a1da1 100644 --- a/apache-spark/src/main/java/com/baeldung/graphframes/User.java +++ b/apache-spark/src/main/java/com/baeldung/graphframes/User.java @@ -22,6 +22,6 @@ public class User implements Serializable { @Override public String toString() { - return "struct<" + id + "," + name + ">"; + return "<" + id + "," + name + ">"; } }