

GraphX: Pregel internals and tips for distributed graph processing

Posted on January 10th, 2017 in Apache Spark by Rohit Kumar

Spark GraphX is the graph processing framework provided by Spark to support distributed graph processing. GraphX is built on RDDs: a graph is a combination of a VertexRDD and an EdgeRDD, which represent the vertices and edges of the graph respectively. Apart from these two, there is an edge triplet RDD (an RDD of EdgeTriplet objects), which is materialized on the fly as and when needed. An edge in GraphX is always a directed edge from a source vertex to a destination vertex. GraphX employs many other optimizations to support efficient distributed graph processing, but they are out of scope for this blog. For the details of the internal optimizations, we refer readers to the GraphX paper.
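As a small illustration of these three views, the sketch below builds a toy graph and reads its triplets. It assumes spark-core and spark-graphx are on the classpath and that Spark runs in local mode; the object name, vertex labels, and edge weights are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object GraphViews {
  // Builds a toy graph (hypothetical data) and renders each triplet as text.
  def tripletStrings(sc: SparkContext): Seq[String] = {
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges: RDD[Edge[Double]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.5)))

    // graph.vertices is a VertexRDD[String]; graph.edges is an EdgeRDD[Double].
    val graph: Graph[String, Double] = Graph(vertices, edges)

    // graph.triplets joins every edge with both endpoint attributes on demand.
    // Mapping to strings before collect() avoids holding on to triplet objects.
    graph.triplets
      .map(t => s"${t.srcAttr} -[${t.attr}]-> ${t.dstAttr}")
      .collect()
      .toSeq
      .sorted
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("graph-views")
    val sc = new SparkContext(conf)
    try tripletStrings(sc).foreach(println)
    finally sc.stop()
  }
}
```

Each triplet carries the edge attribute together with the attributes of both endpoints, which is exactly the view the send message program described below works on.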

The Pregel API requires the user to provide three functions:

  1. Vertex Program: This program runs on the vertices of the VertexRDD of the graph. It takes the (already merged) incoming message as input and also has access to the vertex id and the current state of the vertex attribute. The output is the new state of the vertex, which replaces the vertex attribute.
  2. Send Message Program: This program runs on the required edges in the EdgeRDD. It takes the triplet view of an edge as input, with all the attributes materialized, and returns an iterator of (vertex id, message) pairs, so it can generate messages for the source vertex of the processed edge, the destination vertex, or both.
  3. Merge Message Program: This program takes two messages meant for the same vertex and combines them into one message. The output is the combined message. These messages result in the generation of the messageRDD, whose key is the vertex id and whose value is the combined message for that vertex.
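To make the three signatures concrete, here is a minimal sketch of the three functions for single-source shortest paths, where the vertex attribute is the best known distance and the edge attribute is the edge weight. The object name and the choice of Double for distances and messages are assumptions for the example; only the spark-graphx types are needed to compile it.

```scala
import org.apache.spark.graphx.{EdgeTriplet, VertexId}

object ShortestPathFunctions {
  // Vertex program: keep the smaller of the current distance and the
  // merged incoming message.
  def vertexProgram(id: VertexId, dist: Double, msg: Double): Double =
    math.min(dist, msg)

  // Send message program: if going through the source improves the
  // destination's distance, send the candidate distance to the destination.
  def sendMessage(t: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
    val candidate = t.srcAttr + t.attr
    if (candidate < t.dstAttr) Iterator((t.dstId, candidate)) else Iterator.empty
  }

  // Merge message program: of two candidate distances for the same vertex,
  // keep the shorter one.
  def mergeMessage(a: Double, b: Double): Double = math.min(a, b)
}
```

Returning an empty iterator from the send message program means "no message for this edge", which is what eventually lets the computation stop.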

Apart from the above user-defined functions, the Pregel API also requires the following three parameters:

  1. Initial message: The initial message to start the computation. This message is passed to all the vertices in the vertexRDD for the first iteration.
  2. Max iteration: The maximum number of super steps for the Pregel API.
  3. Edge direction: This is used to filter the edges on which the send message function will run. For example, if the edge direction is out, the send message function runs only on the edges going out from the vertices on which the vertex program ran in the previous super step.
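The functions and parameters above come together in a single call. The following is a sketch of single-source shortest paths on a three-edge toy graph, assuming spark-core and spark-graphx on the classpath and local mode; the object name, the edge data, and the limit of 20 iterations are arbitrary choices for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

object PregelShortestPaths {
  def shortestPaths(sc: SparkContext, sourceId: VertexId): Map[VertexId, Double] = {
    // Toy weighted edges (hypothetical data).
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
    val graph: Graph[Double, Double] = Graph.fromEdges(edges, defaultValue = 0.0)

    // Distance 0 at the source, infinity everywhere else.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val result = initialGraph.pregel(
      initialMsg = Double.PositiveInfinity,   // initial message
      maxIterations = 20,                     // max number of super steps
      activeDirection = EdgeDirection.Out)(   // edge direction
      // vertex program
      (id, dist, msg) => math.min(dist, msg),
      // send message program
      t => {
        if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
        else Iterator.empty
      },
      // merge message program
      (a, b) => math.min(a, b))

    result.vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("pregel-sssp")
    val sc = new SparkContext(conf)
    try shortestPaths(sc, sourceId = 1L).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

EdgeDirection.Out fits here because a vertex whose distance just changed only needs to notify the vertices its outgoing edges point to.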

The code for the Pregel API can be found here. The computation is divided into two parts:

  1. Running the vertex program on the vertexRDD: In the first iteration (super step 0) the vertex program runs on every vertex of the vertexRDD, and every vertex receives the complete initial message. In subsequent iterations (super step 1 onward) the vertex program runs only on the vertices for which messages have been generated. The first iteration is implemented with the mapVertices function provided by GraphX; later iterations join the vertexRDD with the messageRDD.
  2. Generating messages for the next iteration: After the vertex program has run on all the vertices in a super step, the send message program is executed on the edges that are incident on the vertices which ran the vertex program in the current super step, filtered by the edge direction provided. The triplet view is updated for this execution, which takes care of shipping the new vertex attributes to all the active edges. Note that all the attributes of the triplet, i.e. the source vertex, destination vertex, and edge attributes, are materialized here. After the edge computation is over, a reduce stage combines the messages using the user-defined merge message program. The whole step is executed using the mapReduceTriplets function in GraphX, which internally calls a modified version of the aggregateMessages function.
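The two parts can be sketched with public GraphX operations only. This is a simplified illustration, not the actual Pregel source: it uses aggregateMessages in place of the internal mapReduceTriplets, it runs the send message program on every edge instead of only the active ones (so the edge direction filter is omitted), and it never unpersists old RDDs. The object and method names are made up for the example.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx.{EdgeTriplet, Graph, VertexId, VertexRDD}

object SimplePregel {
  def run[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED], initialMsg: A, maxIterations: Int)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED] = {

    // Part 2: run sendMsg on the triplets and reduce the messages per vertex.
    // Assumes every message targets one of the two endpoints of its edge.
    def computeMessages(g: Graph[VD, ED]): VertexRDD[A] =
      g.aggregateMessages[A](
        ctx => sendMsg(ctx.toEdgeTriplet).foreach { case (id, msg) =>
          if (id == ctx.srcId) ctx.sendToSrc(msg) else ctx.sendToDst(msg)
        },
        mergeMsg)

    // Super step 0: every vertex receives the initial message.
    var g = graph.mapVertices((id, attr) => vprog(id, attr, initialMsg)).cache()
    var messages = computeMessages(g).cache()
    var activeMessages = messages.count()
    var i = 0

    while (activeMessages > 0 && i < maxIterations) {
      // Part 1: only vertices that have a message run the vertex program.
      g = g.joinVertices(messages)(vprog).cache()
      // Part 2: generate the messages for the next super step.
      messages = computeMessages(g).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
```

The loop ends when no messages are generated or the iteration limit is reached. Because this sketch has no active set, it only terminates early for send message programs that stop emitting once nothing changes, as the shortest paths example does.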

 

Below is a pictorial representation of all the Pregel API steps in GraphX. The new vertex attributes are shipped to the edges by materializing the edge triplet RDD.

  [Figure: Pregel API steps in GraphX]

A few notes for developers to ensure better performance and accuracy:

  • The vertex attributes in the vertexRDD can be complex Scala objects as long as they are serializable. Keep the vertex object as small as possible, because it is shipped to the edge partitions to update the triplet view during super steps.
  • The vertex attribute should be made only of immutable values; otherwise the vertex state will not be synced properly across super steps, resulting in wrong computation. This is a bug in the current GraphX (Issue Link).
  • Graph partitioning only partitions the edgeRDD, so the vertexRDD needs to be partitioned separately by the user if needed. In my experience, keeping the number of partitions the same for the vertexRDD and the edgeRDD gives better performance.
  • Choosing a suitable partitioning strategy instead of the default random hash partitioning gives good performance improvements. In my experience EdgePartition2D performs best most of the time, but this depends on the nature of the graph.
  • If the graph is evolving (new vertices or edges are being added), the graph currently needs to be recreated, so it is necessary to checkpoint it; otherwise, in case of node failure, the recovery time will be long.
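The partitioning and checkpointing notes can be sketched as follows. This is only an illustration under a few assumptions: spark-core and spark-graphx on the classpath, a recent Spark 2.x or later, and a writable checkpoint directory passed in by the caller. The object name, the toy data, and the parameter names are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
import org.apache.spark.rdd.RDD

object PartitionedGraph {
  def build(sc: SparkContext, checkpointDir: String, numPartitions: Int): Graph[Long, Int] = {
    // Checkpoints are written here; the path is supplied by the caller.
    sc.setCheckpointDir(checkpointDir)

    // Toy data. The vertex RDD is created with the same number of partitions
    // that the edges will be repartitioned to.
    val vertices: RDD[(VertexId, Long)] =
      sc.parallelize((1L to 4L).map(id => (id, id)), numPartitions)
    val edges: RDD[Edge[Int]] = sc.parallelize(
      Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1), Edge(4L, 1L, 1)),
      numPartitions)

    // partitionBy repartitions only the edges, here with EdgePartition2D.
    val graph = Graph(vertices, edges)
      .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions)
      .cache()

    // Mark the graph for checkpointing before running any job on it,
    // then force materialization so the checkpoint is actually written.
    graph.checkpoint()
    graph.vertices.count()
    graph.edges.count()
    graph
  }
}
```

After `build`, `graph.vertices.partitions.length` and `graph.edges.partitions.length` can be compared to confirm that both sides use the same number of partitions.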