Changes related to Apache Spark GraphX article
This commit is contained in:
@@ -40,6 +40,12 @@
|
||||
<version>${org.apache.spark.spark-mllib.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-graphx_2.12</artifactId>
|
||||
<version>${org.apache.spark.spark-mllib.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.baeldung.graphx;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.function.VoidFunction;
|
||||
import org.apache.spark.graphx.Graph;
|
||||
import org.apache.spark.graphx.VertexRDD;
|
||||
import org.apache.spark.graphx.lib.PageRank;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
public class GraphAlgorithms {
|
||||
public static Map<Long, User> USERS = new HashMap<>();
|
||||
|
||||
public static void main(String[] args) {
|
||||
Logger.getLogger("org").setLevel(Level.OFF);
|
||||
|
||||
GraphLoader loader = new GraphLoader();
|
||||
Graph<User, Relationship> graph = loader.mapUserRelationship();
|
||||
|
||||
Graph<Object, Object> pageRank = PageRank.run(graph, 20, 0.0001, GraphLoader.USER_TAG,
|
||||
GraphLoader.RELATIONSHIP_TAG);
|
||||
|
||||
VertexRDD<Object> usersRDD = pageRank.vertices();
|
||||
|
||||
System.out.println("---- PageRank: ");
|
||||
System.out.println("- Users Ranked ");
|
||||
usersRDD.toJavaRDD()
|
||||
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
|
||||
|
||||
System.out.println("---- Connected Components: ");
|
||||
Graph<Object, Relationship> connectedComponents = graph.ops().connectedComponents();
|
||||
|
||||
connectedComponents.vertices().toJavaRDD()
|
||||
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
|
||||
|
||||
System.out.println("---- Triangle Count: ");
|
||||
Graph<Object, Relationship> triangleCount = graph.ops().triangleCount();
|
||||
|
||||
triangleCount.vertices().toJavaRDD()
|
||||
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
package com.baeldung.graphx;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.graphx.Graph;
|
||||
import org.apache.spark.storage.StorageLevel;
|
||||
|
||||
import scala.Function1;
|
||||
import scala.Function2;
|
||||
import scala.Predef;
|
||||
import scala.reflect.ClassTag;
|
||||
import scala.reflect.ClassTag$;
|
||||
|
||||
public class GraphLoader {
|
||||
|
||||
public static Map<Long, User> USERS = new HashMap<>();
|
||||
public static ClassTag<Relationship> RELATIONSHIP_TAG = ClassTag$.MODULE$.apply(Relationship.class);
|
||||
public static ClassTag<User> USER_TAG = ClassTag$.MODULE$.apply(User.class);
|
||||
|
||||
public JavaSparkContext getSparkContext() {
|
||||
SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]");
|
||||
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
|
||||
return javaSparkContext;
|
||||
}
|
||||
|
||||
public Graph<User, Relationship> mapUserRelationship() {
|
||||
JavaSparkContext javaSparkContext = getSparkContext();
|
||||
|
||||
List<Edge<String>> edges = getEdges();
|
||||
|
||||
JavaRDD<Edge<String>> edgeJavaRDD = javaSparkContext.parallelize(edges);
|
||||
|
||||
ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class);
|
||||
|
||||
Graph<String, String> graph = Graph.fromEdges(edgeJavaRDD.rdd(), "Following", StorageLevel.MEMORY_ONLY(),
|
||||
StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
|
||||
|
||||
Graph<String, Relationship> relationshipGraph = graph.mapEdges(new MapRelationship(), RELATIONSHIP_TAG);
|
||||
Predef.$eq$colon$eq<String, User> eq = null;
|
||||
|
||||
return relationshipGraph.mapVertices(new MapUser(), USER_TAG, eq);
|
||||
}
|
||||
|
||||
public List<Edge<String>> getEdges() {
|
||||
List<Edge<String>> edges = new ArrayList<>();
|
||||
edges.add(new Edge<>(1L, 2L, "Friend"));
|
||||
edges.add(new Edge<>(1L, 4L, "Following"));
|
||||
edges.add(new Edge<>(2L, 4L, "Friend"));
|
||||
edges.add(new Edge<>(3L, 1L, "Relative"));
|
||||
edges.add(new Edge<>(3L, 4L, "Relative"));
|
||||
|
||||
return edges;
|
||||
}
|
||||
|
||||
public Map<Long, User> getUsers() {
|
||||
if (USERS.isEmpty()) {
|
||||
loadUsers();
|
||||
}
|
||||
|
||||
return USERS;
|
||||
}
|
||||
|
||||
private void loadUsers() {
|
||||
User john = new User(1L, "John");
|
||||
User martin = new User(2L, "Martin");
|
||||
User peter = new User(3L, "Peter");
|
||||
User alicia = new User(4L, "Alicia");
|
||||
|
||||
USERS.put(1L, john);
|
||||
USERS.put(2L, martin);
|
||||
USERS.put(3L, peter);
|
||||
USERS.put(4L, alicia);
|
||||
}
|
||||
|
||||
private static class MapRelationship implements Function1<Edge<String>, Relationship>, Serializable {
|
||||
|
||||
@Override
|
||||
public Relationship apply(Edge<String> edge) {
|
||||
return new Relationship(edge.attr, new GraphLoader().getUsers().get(edge.srcId()), USERS.get(edge.dstId()));
|
||||
}
|
||||
}
|
||||
|
||||
private static class MapUser implements Function2<Object, String, User>, Serializable {
|
||||
@Override
|
||||
public User apply(Object id, String name) {
|
||||
return new GraphLoader().getUsers().get((Long) id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.baeldung.graphx;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.function.VoidFunction;
|
||||
import org.apache.spark.graphx.Edge;
|
||||
import org.apache.spark.graphx.Graph;
|
||||
import org.apache.spark.graphx.VertexRDD;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
public class GraphOperations {
|
||||
public static Map<Long, User> USERS = new HashMap<>();
|
||||
|
||||
public static void main(String[] args) {
|
||||
Logger.getLogger("org").setLevel(Level.OFF);
|
||||
GraphOperations operations = new GraphOperations();
|
||||
operations.doOperations();
|
||||
}
|
||||
|
||||
private void doOperations() {
|
||||
GraphLoader loader = new GraphLoader();
|
||||
Graph<User, Relationship> userGraph = loader.mapUserRelationship();
|
||||
|
||||
System.out.println("Mapped Users: ");
|
||||
userGraph.vertices().toJavaRDD().foreach((VoidFunction<Tuple2<Object, User>>) tuple -> System.out
|
||||
.println("id: " + tuple._1 + " name: " + tuple._2));
|
||||
|
||||
System.out.println("Mapped Relationships: ");
|
||||
userGraph.edges().toJavaRDD()
|
||||
.foreach((VoidFunction<Edge<Relationship>>) edge -> System.out.println(edge.attr().toString()));
|
||||
|
||||
VertexRDD<Object> degreesVerticesRDD = userGraph.ops().degrees();
|
||||
VertexRDD<Object> inDegreesVerticesRDD = userGraph.ops().inDegrees();
|
||||
VertexRDD<Object> outDegreesVerticesRDD = userGraph.ops().outDegrees();
|
||||
|
||||
System.out.println("degrees: ");
|
||||
degreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
|
||||
.println("id: " + tuple._1 + " count: " + tuple._2));
|
||||
|
||||
System.out.println("inDegrees: ");
|
||||
inDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
|
||||
.println("id: " + tuple._1 + " count: " + tuple._2));
|
||||
|
||||
System.out.println("outDegrees: ");
|
||||
outDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
|
||||
.println("id: " + tuple._1 + " count: " + tuple._2));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package com.baeldung.graphx;
|
||||
|
||||
public class Relationship {
|
||||
private String type;
|
||||
private User source;
|
||||
private User target;
|
||||
|
||||
public Relationship(String type, User source, User target) {
|
||||
this.type = type;
|
||||
this.source = source;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public User getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public User getTarget() {
|
||||
return target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getSource().toString() + " -- " + getType() + " --> " + getTarget().toString();
|
||||
}
|
||||
}
|
||||
25
apache-spark/src/main/java/com/baeldung/graphx/User.java
Normal file
25
apache-spark/src/main/java/com/baeldung/graphx/User.java
Normal file
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.graphx;
|
||||
|
||||
/**
 * Immutable vertex payload: a user identified by id with a display name.
 */
public class User {

    private final Long id;
    private final String name;

    /**
     * @param id   vertex id of the user
     * @param name display name of the user
     */
    public User(long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "(" + id + "-" + name + ")";
    }
}
|
||||
Reference in New Issue
Block a user