diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 3df81e5aee..6f62f568ff 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -40,6 +40,12 @@ ${org.apache.spark.spark-mllib.version} provided + + org.apache.spark + spark-graphx_2.12 + ${org.apache.spark.spark-mllib.version} + provided + org.apache.spark spark-streaming-kafka-0-10_2.11 diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java new file mode 100644 index 0000000000..2c1474d0aa --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphAlgorithms.java @@ -0,0 +1,46 @@ +package com.baeldung.graphx; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; +import org.apache.spark.graphx.lib.PageRank; + +import scala.Tuple2; + +public class GraphAlgorithms { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) { + Logger.getLogger("org").setLevel(Level.OFF); + + GraphLoader loader = new GraphLoader(); + Graph graph = loader.mapUserRelationship(); + + Graph pageRank = PageRank.run(graph, 20, 0.0001, GraphLoader.USER_TAG, + GraphLoader.RELATIONSHIP_TAG); + + VertexRDD usersRDD = pageRank.vertices(); + + System.out.println("---- PageRank: "); + System.out.println("- Users Ranked "); + usersRDD.toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + + System.out.println("---- Connected Components: "); + Graph connectedComponents = graph.ops().connectedComponents(); + + connectedComponents.vertices().toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + + System.out.println("---- Triangle Count: "); + Graph triangleCount = graph.ops().triangleCount(); + + triangleCount.vertices().toJavaRDD() + .foreach((VoidFunction>) tuple -> System.out.println(tuple.toString())); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java new file mode 100644 index 0000000000..8024d2576b --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphLoader.java @@ -0,0 +1,97 @@ +package com.baeldung.graphx; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.storage.StorageLevel; + +import scala.Function1; +import scala.Function2; +import scala.Predef; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; + +public class GraphLoader { + + public static Map USERS = new HashMap<>(); + public static ClassTag RELATIONSHIP_TAG = ClassTag$.MODULE$.apply(Relationship.class); + public static ClassTag USER_TAG = ClassTag$.MODULE$.apply(User.class); + + public JavaSparkContext getSparkContext() { + SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]"); + JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); + return javaSparkContext; + } + + public Graph mapUserRelationship() { + JavaSparkContext javaSparkContext = getSparkContext(); + + List> edges = getEdges(); + + JavaRDD> edgeJavaRDD = javaSparkContext.parallelize(edges); + + ClassTag stringTag = ClassTag$.MODULE$.apply(String.class); + + Graph graph = Graph.fromEdges(edgeJavaRDD.rdd(), "Following", StorageLevel.MEMORY_ONLY(), + StorageLevel.MEMORY_ONLY(), stringTag, stringTag); + + Graph relationshipGraph = graph.mapEdges(new MapRelationship(), RELATIONSHIP_TAG); + Predef.$eq$colon$eq eq = null; + + return relationshipGraph.mapVertices(new MapUser(), USER_TAG, eq); + } + + public List> getEdges() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, "Friend")); + edges.add(new Edge<>(1L, 4L, "Following")); + edges.add(new Edge<>(2L, 4L, "Friend")); + edges.add(new Edge<>(3L, 1L, "Relative")); + edges.add(new Edge<>(3L, 4L, "Relative")); + + return edges; + } + + public Map getUsers() { + if (USERS.isEmpty()) { + loadUsers(); + } + + return USERS; + } + + private void loadUsers() { + User john = new User(1L, "John"); + User martin = new User(2L, "Martin"); + User peter = new User(3L, "Peter"); + User alicia = new User(4L, "Alicia"); + + USERS.put(1L, john); + USERS.put(2L, martin); + USERS.put(3L, peter); + USERS.put(4L, alicia); + } + + private static class MapRelationship implements Function1, Relationship>, Serializable { + + @Override + public Relationship apply(Edge edge) { + return new Relationship(edge.attr, new GraphLoader().getUsers().get(edge.srcId()), USERS.get(edge.dstId())); + } + } + + private static class MapUser implements Function2, Serializable { + @Override + public User apply(Object id, String name) { + return new GraphLoader().getUsers().get((Long) id); + } + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java b/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java new file mode 100644 index 0000000000..defb51f46e --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/GraphOperations.java @@ -0,0 +1,53 @@ +package com.baeldung.graphx; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; + +import scala.Tuple2; + +public class GraphOperations { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) { + Logger.getLogger("org").setLevel(Level.OFF); + GraphOperations operations = new GraphOperations(); + operations.doOperations(); + } + + private void doOperations() { + GraphLoader loader = new GraphLoader(); + Graph userGraph = loader.mapUserRelationship(); + + System.out.println("Mapped Users: "); + userGraph.vertices().toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " name: " + tuple._2)); + + System.out.println("Mapped Relationships: "); + userGraph.edges().toJavaRDD() + .foreach((VoidFunction>) edge -> System.out.println(edge.attr().toString())); + + VertexRDD degreesVerticesRDD = userGraph.ops().degrees(); + VertexRDD inDegreesVerticesRDD = userGraph.ops().inDegrees(); + VertexRDD outDegreesVerticesRDD = userGraph.ops().outDegrees(); + + System.out.println("degrees: "); + degreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + + System.out.println("inDegrees: "); + inDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + + System.out.println("outDegrees: "); + outDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction>) tuple -> System.out + .println("id: " + tuple._1 + " count: " + tuple._2)); + } + +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java b/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java new file mode 100644 index 0000000000..2a022e98e8 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/Relationship.java @@ -0,0 +1,30 @@ +package com.baeldung.graphx; + +public class Relationship { + private String type; + private User source; + private User target; + + public Relationship(String type, User source, User target) { + this.type = type; + this.source = source; + this.target = target; + } + + public String getType() { + return type; + } + + public User getSource() { + return source; + } + + public User getTarget() { + return target; + } + + @Override + public String toString() { + return getSource().toString() + " -- " + getType() + " --> " + getTarget().toString(); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphx/User.java b/apache-spark/src/main/java/com/baeldung/graphx/User.java new file mode 100644 index 0000000000..98bf7768b0 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphx/User.java @@ -0,0 +1,25 @@ +package com.baeldung.graphx; + +public class User { + + private Long id; + private String name; + + public User(long id, String name) { + this.id = id; + this.name = name; + } + + public Long getId() { + return id; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "(" + id + "-" + name + ")"; + } +}