GraphX编程指南
阅读原文时间:2022年03月26日阅读:1

GraphX编程指南

概述

GraphX是Spark中用于图形和图形并行计算的新组件。在较高的层次上,GraphX通过引入新的Graph抽象来扩展Spark RDD:一个有向多图,其属性附加到每个顶点和边上。为了支持图计算,GraphX公开了一组基本的算子(例如,子图joinVertices和 aggregateMessages),以及所述的优化的变体预凝胶API。此外,GraphX包括越来越多的图形算法和 构建器集合,以简化图形分析任务。

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraphjoinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

开始

首先,首先需要将Spark和GraphX导入到项目中,如下所示:

import org.apache.spark._

import org.apache.spark.graphx._

// To make some of the examples work we will also need RDD

import org.apache.spark.rdd.RDD

如果不使用Spark Shell,需要一个SparkContext。要了解有关Spark入门的更多信息,参考《Spark快速入门指南》

属性图

属性曲线图是一个有向多重图与连接到每个顶点和边缘的用户定义的对象。有向多重图是有向图,其中潜在的多个平行边共享相同的源和目标顶点。支持平行边的能力,简化了在相同顶点之间可能存在多个关系(例如,同事和朋友)的建模场景。每个顶点都由_唯一的_64位长标识符(VertexId)进行键控。GraphX对顶点标识符没有施加任何排序约束。同样,边具有相应的源和目标顶点标识符。

在顶点(VD)和边(ED)类型上对属性图进行了参数化。这些是分别与每个顶点和边关联的对象的类型。

当顶点和边类型是原始数据类型(例如int,double等)时,GraphX可以优化表示形式,方法是将存储在专用数组中,从而减少了内存占用量。

在某些情况下,可能希望在同一图中具有具有不同属性类型的顶点。可以通过继承来实现。例如,要将用户和产品建模为二部图,可以执行以下算子:

class VertexProperty()

case class UserProperty(val name: String) extends VertexProperty

case class ProductProperty(val name: String, val price: Double) extends VertexProperty

// The graph might then have the type:

var graph: Graph[VertexProperty, String] = null

与RDD一样,属性图是不可变的,分布式的和容错的。图的值或结构的更改是通过生成具有所需更改的新图来完成的。原始图的实质部分(即不受影响的结构,属性和索引)在新图中被重用,从而降低了这种固有功能数据结构的成本。使用一系列顶点分区试探法在执行程序之间划分图。与RDD一样,发生故障时,可以在不同的计算机上重新创建图的每个分区。

从逻辑上讲,属性图对应于一对类型化集合(RDD),对每个顶点和边的属性进行编码。结果,图类包含访问图的顶点和边的成员:

class Graph[VD, ED] {

val vertices: VertexRDD[VD]

val edges: EdgeRDD[ED]

}

类VertexRDD[VD]和EdgeRDD[ED]延伸,且被优化的版本RDD[(VertexId, VD)]和RDD[Edge[ED]]分别。双方VertexRDD[VD]并EdgeRDD[ED]提供围绕图形计算,利用内部优化内置附加功能。将在有关顶点和边缘RDD的部分中更详细地讨论 VertexRDDVertexRDD和EdgeRDDEdgeRDD API,可以将简单地视为形式为RDD[(VertexId, VD)]和RDD[Edge[ED]]的RDD。

属性图示例

假设要构造一个由GraphX项目中的各个协作者组成的属性图。顶点属性可能包含用户名和职业。可以用描述协作者之间关系的字符串注释边缘:

结果图将具有类型签名:

val userGraph: Graph[(String, String), String]

有许多方法可以从原始文件,RDD甚至合成生成器构造属性图,有关这些图的详细信息,参见图构建器一节。可能最通用的方法是使用Graph对象。例如,以下代码从RDD的集合构造一个图形:

// Assume the SparkContext has already been constructed

val sc: SparkContext

// Create an RDD for the vertices

val users: RDD[(VertexId, (String, String))] =

sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),

(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges

val relationships: RDD[Edge[String]] =

sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),

Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case there are relationship with missing user

val defaultUser = ("John Doe", "Missing")

// Build the initial Graph

val graph = Graph(users, relationships, defaultUser)

在上面的示例中,使用了Edgecase类。边缘具有srcId和,分别 dstId对应于源和目标顶点标识符。另外,Edge类具有attr存储edge属性的成员。

可以分别使用graph.vertices 和graph.edges成员将图解构为相应的顶点和边缘视图。

val graph: Graph[(String, String), String] // Constructed from above

// Count all users which are postdocs

graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count

// Count all the edges where src > dst

graph.edges.filter(e => e.srcId > e.dstId).count

graph.vertices返回VertexRDD[(String, String)]扩展了的 RDD[(VertexId, (String, String))],使用scalacase表达式来解构元组。另一方面,graph.edges返回一个EdgeRDD包含Edge[String]对象。使用case类类型构造函数,如下所示:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了属性图的顶点和边缘视图外,GraphX还公开了一个三元组视图。三元组视图在逻辑上将顶点和边缘属性结合在一起,从而产生RDD[EdgeTriplet[VD, ED]]了EdgeTriplet该类的包含实例。可以在下面的SQL表达式来表示:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr

FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst

ON e.srcId = src.Id AND e.dstId = dst.Id

或以图形方式显示为:

EdgeTriplet类扩展Edge通过添加类srcAttr和 dstAttr分别包含源和目的地属性成员。可以使用图形的三元组视图来呈现描述用户之间关系的字符串集合。

val graph: Graph[(String, String), String] // Constructed from above

// Use the triplets view to create an RDD of facts.

val facts: RDD[String] =

graph.triplets.map(triplet =>

triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)

facts.collect.foreach(println(_))

图算子

正如RDD具有,,和等基本算子一样map,属性图也具有基本算子的集合,这些算子采用用户定义的函数,生成具有转换后的属性和结构的新图。定义了具有优化实现的核心算子,定义了表示为核心算子组成的便捷算子。但是,由于使用Scala隐式,算子可以作为的成员自动使用。例如,可以通过以下方法计算每个顶点(在中定义)的度数:filterreduceByKeyGraphGraphOpsGraphOpsGraphGraphOps

val graph: Graph[(String, String), String]

// Use the implicit GraphOps.inDegrees operator

val inDegrees: VertexRDD[Int] = graph.inDegrees

区分核心图形算子的原因,GraphOps是将来能够支持不同的图形表示。每个图形表示形式都必须提供核心算子的实现,重用中定义的许多有用的算子GraphOps

算子摘要列表

下面是在这两个定义的功能的快速摘要,Graph和 GraphOps,呈现为为简单起见图的成员。某些功能签名已得到简化(例如,删除了默认参数和类型约束),且已删除了一些更高级的功能,查阅API文档以获取正式的算子清单。

/** Summary of the functionality in the property graph */

class Graph[VD, ED] {

// Information about the Graph ===================================================================

val numEdges: Long

val numVertices: Long

val inDegrees: VertexRDD[Int]

val outDegrees: VertexRDD[Int]

val degrees: VertexRDD[Int]

// Views of the graph as collections =============================================================

val vertices: VertexRDD[VD]

val edges: EdgeRDD[ED]

val triplets: RDD[EdgeTriplet[VD, ED]]

// Functions for caching graphs ==================================================================

def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

def cache(): Graph[VD, ED]

def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]

// Change the partitioning heuristic  ============================================================

def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

// Transform vertex and edge attributes ==========================================================

def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]

def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])

: Graph[VD, ED2]

// Modify the graph structure ====================================================================

def reverse: Graph[VD, ED]

def subgraph(

epred: EdgeTriplet[VD,ED] => Boolean = (x => true),

vpred: (VertexId, VD) => Boolean = ((v, d) => true))

: Graph[VD, ED]

def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

// Join RDDs with the graph ======================================================================

def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])

(mapFunc: (VertexId, VD, Option[U]) => VD2)

: Graph[VD2, ED]

// Aggregate information about adjacent triplets =================================================

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

def aggregateMessages[Msg: ClassTag](

sendMsg: EdgeContext[VD, ED, Msg] => Unit,

mergeMsg: (Msg, Msg) => Msg,

tripletFields: TripletFields = TripletFields.All)

: VertexRDD[A]

// Iterative graph-parallel computation ==========================================================

def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(

vprog: (VertexId, VD, A) => VD,

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

mergeMsg: (A, A) => A)

: Graph[VD, ED]

// Basic graph algorithms ========================================================================

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

def connectedComponents(): Graph[VertexId, ED]

def triangleCount(): Graph[Int, ED]

def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

}

Property 算子

与RDDmap算子一样,属性图包含以下内容:

class Graph[VD, ED] {

def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]

def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

}

这些算子中的每一个都会产生一个新图,其顶点或边线属性由用户定义的map函数修改。

在每种情况下,图形结构均不受影响。这是这些算子的关键功能,允许生成的图重新使用原始图的结构索引。以下代码段在逻辑上是等效的,第一个代码段不会保留结构索引,也不会从GraphX系统优化中受益:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }

val newGraph = Graph(newVertices, graph.edges)

而是使用mapVertices(ClassTag[VD2]):Graph[VD2,ED])保留索引:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

这些算子通常用于初始化图形,进行特定计算或投影出不必要的属性。例如,给定一个以out度作为顶点属性的图(稍后将描述如何构造这种图),将其初始化为PageRank:

// Given a graph where the vertex property is the out degree

val inputGraph: Graph[Int, String] =

graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))

// Construct a graph where each edge contains the weight

// and each vertex is the initial PageRank

val outputGraph: Graph[Double, Double] =

inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

结构化算子

当前,GraphX仅支持一组简单的常用结构化算子,希望将来会增加更多。以下是基本结构算子的列表。

class Graph[VD, ED] {

def reverse: Graph[VD, ED]

def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,

vpred: (VertexId, VD) => Boolean): Graph[VD, ED]

def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]

def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]

}

reverse算子将返回逆转的所有边缘方向上的新图。例如,在尝试计算逆PageRank时,这将很有用。由于反向算子不会修改顶点或边的属性或更改边的数量,因此可以有效地实现,无需移动或复制数据。

subgraph%E2%87%92Boolean):Graph[VD,ED])算子者需要的顶点和边缘的谓词,返回包含只有满足顶点谓词(评估为真)的顶点和满足边缘谓词边缘的曲线和满足顶点谓词连接顶点。所述subgraph 算子可在情况编号被用来限制图形以顶点和感兴趣的边缘或消除断开的链接。例如,在下面的代码中,删除了断开的链接:

// Create an RDD for the vertices

val users: RDD[(VertexId, (String, String))] =

sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),

(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),

(4L, ("peter", "student"))))

// Create an RDD for edges

val relationships: RDD[Edge[String]] =

sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),

Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),

Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))

// Define a default user in case there are relationship with missing user

val defaultUser = ("John Doe", "Missing")

// Build the initial Graph

val graph = Graph(users, relationships, defaultUser)

// Notice that there is a user 0 (for which we have no information) connected to users

// 4 (peter) and 5 (franklin).

graph.triplets.map(

triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1

).collect.foreach(println(_))

// Remove missing vertices as well as the edges to connected to them

val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

// The valid subgraph will disconnect users 4 and 5 by removing user 0

validGraph.vertices.collect.foreach(println(_))

validGraph.triplets.map(

triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1

).collect.foreach(println(_))

在以上示例中,仅提供了顶点谓词。subgraph算子者默认为true,如果不设置顶点或边谓词。

mask:Graph[VD,ED])算子者通过返回包含该顶点和边,在输入图形中发现的曲线构造一个子图。可以将其与subgraph算子结合使用,基于另一个相关图形中的属性来限制图形。例如,可能使用缺少顶点的图来运行连接的组件,然后将答案限制为有效的子图。

// Run Connected Components

val ccGraph = graph.connectedComponents() // No longer contains missing field

// Remove missing vertices as well as the edges to connected to them

val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

// Restrict the answer to the valid subgraph

val validCCGraph = ccGraph.mask(validGraph)

groupEdges:Graph[VD,ED])算子者合并平行边在多重图(即,顶点对之间的重复边缘)。在许多数值应用程序中,可以将平行边添加 (合并权重)到单个边中,从而减小图形的大小。

Join算子

在许多情况下,有必要将外部集合(RDD)中的数据与图形连接起来。例如,可能有想要与现有图合并的额外用户属性,或者可能希望将顶点属性从一个图拉到另一个图。这些任务可以使用联接算子来完成。下面列出了关键的联接算子:

class Graph[VD, ED] {

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)

: Graph[VD, ED]

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)

: Graph[VD2, ED]

}

joinVertices((VertexId,VD,U)%E2%87%92VD)(ClassTag[U]):Graph[VD,ED])算子连接与输入RDD顶点并返回与通过应用用户定义获得的顶点属性的新图形map函数到连接的顶点的结果。RDD中没有匹配值的顶点将保留其原始值。

如果RDD对于给定的顶点包含多个值,则只会使用一个。因此,建议使用以下命令使输入RDD唯一,这也将对结果值进行预索引,以显着加速后续的连接。

val nonUniqueCosts: RDD[(VertexId, Double)]

val uniqueCosts: VertexRDD[Double] =

graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)

val joinedGraph = graph.joinVertices(uniqueCosts)(

(id, oldCost, extraCost) => oldCost + extraCost)

除了将用户定义的函数应用于所有顶点并可以更改顶点属性类型外,其它outerJoinVertices((VertexId,VD,Option[U])%E2%87%92VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED])行为与“一般”相似。由于并非所有顶点在输入RDD中都可能具有匹配值,因此该函数采用一种类型。例如,可以通过使用初始化顶点属性来为PageRank设置图形。joinVerticesmapmapOptionoutDegree

val outDegrees: VertexRDD[Int] = graph.outDegrees

val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>

outDegOpt match {

case Some(outDeg) => outDeg

case None => 0 // No outDegree means zero outDegree

}

}

可能已经注意到f(a)(b)上面示例中使用的多个参数列表(例如)咖喱函数模式。虽然可以同样地写f(a)(b),f(a,b),这意味着对类型的推断b将不依赖a。结果,用户将需要为用户定义的函数提供类型注释:

val joinedGraph = graph.joinVertices(uniqueCosts,

(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

邻域聚集

许多图形分析任务中的关键步骤是汇总有关每个顶点邻域的信息。例如,可能想知道每个用户拥有的关注者数量或每个用户的关注者平均年龄。许多迭代图算法(例如,PageRank,最短路径和连接的组件)反复聚合相邻顶点的属性(例如,当前的PageRank值,最短路径以及最小的可到达顶点ID)。

为了提高性能,主要聚合算子更改 graph.mapReduceTriplets为graph.AggregateMessages。尽管API的更改相对较小,但在下面提供了过渡指南。

汇总消息(a​​ggregateMessages

GraphX中的核心聚合算子为aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A])。该算子将用户定义的sendMsg函数应用于图形中的每个边三元组,然后使用该mergeMsg函数在其目标顶点处聚合这些消息。

class Graph[VD, ED] {

def aggregateMessages[Msg: ClassTag](

sendMsg: EdgeContext[VD, ED, Msg] => Unit,

mergeMsg: (Msg, Msg) => Msg,

tripletFields: TripletFields = TripletFields.All)

: VertexRDD[Msg]

}

用户定义的sendMsg函数采用EdgeContext,将开源和目标属性以及边缘属性和函数(sendToSrcsendToDst),将消息发送到源和目标属性。可以将其sendMsg视为 map-reduce中的map函数。用户定义的mergeMsg函数接受两条发往同一顶点的消息,产生一条消息。可以认为是map-reduce中mergeMsg的reduce函数。所述 aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A])算子者返回一个VertexRDD[Msg] 包含该集合消息(类型的Msg)发往每个顶点。未收到消息的顶点不包括在返回的VertexRDDVertexRDD中

另外,aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A])采用一个可选参数 tripletsFields,该参数指示访问了哪些数据EdgeContext (即,源顶点属性,而不是目标顶点属性)。可能选项在tripletsFields中定义,TripletFields默认值为TripletFields.All,指示用户定义的sendMsg函数可以访问任何字段EdgeContext。该tripletFields参数可用于通知GraphX仅需要的一部分,EdgeContext从而允许GraphX选择优化的联接策略。例如,如果正在计算每个用户的fellow的平均年龄,则仅需要源字段,因此可以TripletFields.Src用来表明仅需要源字段

在GraphX的早期版本中,使用字节码检查来推断,TripletFields,但是发现字节码检查有些不可靠,而是选择了更明确的用户控制。

在以下示例中,使用aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A])算子来计算每个用户的高级fellow的平均年龄。

import org.apache.spark.graphx.{Graph, VertexRDD}

import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.

// Here we use a random graph for simplicity.

val graph: Graph[Double, Int] =

GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )

// Compute the number of older followers and their total age

val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](

triplet => { // Map Function

if (triplet.srcAttr > triplet.dstAttr) {

// Send message to destination vertex containing counter and age

triplet.sendToDst((1, triplet.srcAttr))

}

},

// Add counter and age

(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function

)

// Divide total age by number of older followers to get average age of older followers

val avgAgeOfOlderFollowers: VertexRDD[Double] =

olderFollowers.mapValues( (id, value) =>

value match { case (count, totalAge) => totalAge / count } )

// Display the results

avgAgeOfOlderFollowers.collect.foreach(println(_))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / AggregateMessagesExample.scala”中找到完整的示例代码。

aggregateMessages当消息(和消息的总和)大小固定(例如,浮点数和加法而不是列表和串联)时,该算子将以最佳方式执行。

Map Reduce三元组转换指南(旧版)

在GraphX的早期版本中,邻域聚合是使用mapReduceTriplets算子完成的:

class Graph[VD, ED] {

def mapReduceTriplets[Msg](

map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],

reduce: (Msg, Msg) => Msg)

: VertexRDD[Msg]

}

在mapReduceTriplets算子者需要被施加到每个三联体,可以产生用户定义的映射函数的消息,使用用户定义的汇总的 reduce功能。但是,发现返回的迭代器的用户很昂贵,且抑制了应用其它优化(例如,局部顶点重新编号)的能力。在aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A])引入的EdgeContext中,公开了三元组字段,还具有将消息显式发送到源和目标顶点的功能。此外,删除了字节码检查,而是要求用户指出三元组中实际需要的字段。

以下代码块使用mapReduceTriplets:

val graph: Graph[Int, Float] =

def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {

Iterator((triplet.dstId, "Hi"))

}

def reduceFun(a: String, b: String): String = a + " " + b

val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

可以使用以下方式重写aggregateMessages:

val graph: Graph[Int, Float] =

def msgFun(triplet: EdgeContext[Int, Float, String]) {

triplet.sendToDst("Hi")

}

def reduceFun(a: String, b: String): String = a + " " + b

val result = graph.aggregateMessages[String](msgFun, reduceFun)

计算度信息

常见的聚合任务是计算每个顶点的度:每个顶点相邻的边数。在有向图的上下文中,通常有必要知道每个顶点的入度,出度和总度。本 GraphOps类包含算子计算度数每个顶点的集合。例如,在下面的示例中,计算最大进,出和总度数:

// Define a reduce operation to compute the highest degree vertex

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {

if (a._2 > b._2) a else b

}

// Compute the max degrees

val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)

val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)

val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

收集邻域

在某些情况下,通过收集每个顶点处的相邻顶点及其属性来表达计算可能会更容易。使用collectNeighborIdscollectNeighbors]])算子可以轻松完成 此 算子。

class GraphOps[VD, ED] {

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]

}

这些算子可能会非常昂贵,因为复制信息并需要大量沟通。如果可能,尝试aggregateMessages%E2%87%92A,TripletFields)(ClassTag[A]):VertexRDD[A]) 直接使用算子表示相同的计算。

缓存和取消缓存

在Spark中,默认情况下,RDD不保留在内存中。为避免重新计算,在多次使用时必须将显式缓存(参见《Spark编程指南》)。GraphX中的图形行为相同。多次使用图形时,确保先调用Graph.cache()

在迭代计算中,为了获得最佳性能,也可能需要取消缓存。默认情况下,缓存的RDD和图形将保留在内存中,直到内存压力迫使按LRU顺序逐出为止。对于迭代计算,先前迭代的中间结果将填满缓存。尽管最终将被驱逐,但存储在内存中的不必要数据将减慢垃圾回收速度。一旦不再需要中间结果,则将其取消缓存会更有效。这涉及到在每次迭代中实现(缓存和强制执行)图或RDD,取消缓存所有其它数据集以及仅在以后的迭代中使用实现的数据集。但是,由于图是由多个RDD组成的,很难正确地取消持久保留。对于迭代计算,建议使用Pregel API,可以正确地保留中间结果。

Pregel API

图是固有的递归数据结构,因为顶点的属性取决于其相邻的属性,而顶点的属性又取决于相邻的属性。结果,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到定点条件为止。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX公开了Pregel API的一种变体。

在较高层次上,GraphX中的Pregel算子是受图拓扑限制的批量同步并行消息传递抽象。Pregel算子在一系列超级步骤中执行,其中顶点从上一个超级步骤接收入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻的顶点。与Pregel不同,消息是根据边缘三元组并行计算的,且消息计算可以访问源顶点和目标顶点属性。在超级步骤中会跳过未收到消息的顶点。当没有消息剩余时,Pregel算子终止迭代并返回最终图形。

与更标准的Pregel实现不同,GraphX中的顶点只能将消息发送到相邻的顶点,且消息的构建是使用用户定义的消息传递功能并行完成的。这些限制允许在GraphX中进行其它优化。

以下是Pregel算子%E2%87%92VD,(EdgeTriplet[VD,ED])%E2%87%92Iterator[(VertexId,A)],(A,A)%E2%87%92A)(ClassTag[A]):Graph[VD,ED])的类型签名及其实现的草图(注意:为避免由于长谱系链而引起的stackOverflowError,pregel通过将“ spark.graphx.pregel.checkpointInterval”设置为a)来定期支持检查点图和消息。正数,例如10。并使用SparkContext.setCheckpointDir(directory:String))设置检查点目录:

class GraphOps[VD, ED] {

def pregel[A]

(initialMsg: A,

maxIter: Int = Int.MaxValue,

activeDir: EdgeDirection = EdgeDirection.Out)

(vprog: (VertexId, VD, A) => VD,

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

mergeMsg: (A, A) => A)

: Graph[VD, ED] = {

// Receive the initial message at each vertex

var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

// compute the messages

var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)

var activeMessages = messages.count()

// Loop until no messages remain or maxIterations is achieved

var i = 0

while (activeMessages > 0 && i < maxIterations) {

// Receive the messages and update the vertices.

g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages

// Send new messages, skipping edges where neither side received a message. We must cache

// messages so it can be materialized on the next line, allowing us to uncache the previous

// iteration.

messages = GraphXUtils.mapReduceTriplets(

g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()

activeMessages = messages.count()

i += 1

}

g

}

}

Pregel采用了两个参数列表(即graph.pregel(list1)(list2))。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认情况下沿边缘)。第二个参数列表包含用户定义的函数,这些函数用于接收消息(顶点程序vprog),计算消息(sendMsg)以及组合消息mergeMsg。

在以下示例中,可以使用Pregel算子来表示诸如单个源最短路径之类的计算。

import org.apache.spark.graphx.{Graph, VertexId}

import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances

val graph: Graph[Long, Double] =

GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

val sourceId: VertexId = 42 // The ultimate source

// Initialize the graph such that all vertices except the root have distance infinity.

val initialGraph = graph.mapVertices((id, _) =>

if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(

(id, dist, newDist) => math.min(dist, newDist), // Vertex Program

triplet =>// Send Message

if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {

Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))

} else {

Iterator.empty

}

},

(a, b) => math.min(a, b) // Merge Message

)

println(sssp.vertices.collect.mkString("\n"))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / SSSPExample.scala”中找到完整的示例代码。

图构建器

GraphX提供了几种从RDD或磁盘上的顶点和边的集合构建图形的方法。默认情况下,没有任何图构建器会重新划分图的边缘。相反,边缘保留在其默认分区(例如HDFS中的原始块)中。Graph.groupEdges:Graph[VD,ED])要求对图进行重新分区,假定相同的边将在同一分区上并置,因此必须在调用Graph.partitionBy之前先进行调用groupEdges。

object GraphLoader {

def edgeListFile(

sc: SparkContext,

path: String,

canonicalOrientation: Boolean = false,

minEdgePartitions: Int = 1)

: Graph[Int, Int]

}

GraphLoader.edgeListFile提供了一种从磁盘上的边缘列表加载图形的方法。解析以下形式(源顶点ID,目标顶点ID)对的邻接列表,跳过以以下内容开头的注释行#:

# This is a comment

2 1

4 1

1 2

Graph从指定的边创建一个,自动创建边提到的任何顶点。所有顶点和边属性均默认为1。该canonicalOrientation参数允许沿正方向(srcId < dstId)重新定向边,这是所连接组件算法所要求的。该minEdgePartitions参数指定要生成的边缘分区的最小数量。例如,如果HDFS文件具有更多块,则边缘分区可能会比指定的更多。

object Graph {

def apply[VD, ED](

vertices: RDD[(VertexId, VD)],

edges: RDD[Edge[ED]],

defaultVertexAttr: VD = null)

: Graph[VD, ED]

def fromEdges[VD, ED](

edges: RDD[Edge[ED]],

defaultValue: VD): Graph[VD, ED]

def fromEdgeTuples[VD](

rawEdges: RDD[(VertexId, VertexId)],

defaultValue: VD,

uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

Graph.apply(ClassTag[VD],ClassTag[ED]):Graph[VD,ED])允许根据顶点和边的RDD创建图形。任意选取重复的顶点,将在边缘RDD中找到但不在顶点RDD中找到的顶点指定为默认属性。

Graph.fromEdges:Graph[VD,ED]) 允许仅从边缘的RDD创建图形,自动创建边缘提到的任何顶点,为其指定默认值。

Graph.fromEdgeTuples(ClassTag[VD]):Graph[VD,Int])允许仅从边缘元组的RDD创建图形,为边缘分配值1,自动创建Edge提及的任何顶点并将其指定为默认值。还支持对边缘进行重复数据删除;要进行重复数据删除,传递SomeaPartitionStrategy作为uniqueEdges参数(例如uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。必须使用分区策略才能将相同边缘并置在同一分区上,以便可以对进行重复数据删除。

顶点和边缘RDD

GraphX公开RDD存储在图中的顶点和边的视图。由于GraphX在优化的数据结构中维护了顶点和边,且这些数据结构提供了其它功能,因此分别将顶点和边作为VertexRDDVertexRDD和EdgeRDDEdgeRDD返回。将介绍这些类型中的一些其它有用的功能。这只是一个不完整的列表,参阅API文档以获取正式的算子列表。

顶点RDD

该VertexRDD[A]扩展RDD[(VertexId, A)]并增加了额外的限制,每个 VertexId只发生一次。此外,VertexRDD[A]表示一顶点,每个顶点的类型属性为A。在内部,这是通过将顶点属性存储在可重用的哈希映射数据结构中来实现的。因此,如果两个VertexRDDs是从相同的基本 VertexRDDVertexRDD派生的(例如,通过filter或mapValues),可以在恒定时间内将连接在一起,无需进行哈希评估。为了利用此索引数据结构,VertexRDDVertexRDD公开了以下附加功能:

class VertexRDD[VD] extends RDD[(VertexId, VD)] {

// Filter the vertex set but preserves the internal index

def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]

// Transform the values without changing the ids (preserves the internal index)

def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]

def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]

// Show only vertices unique to this set based on their VertexId's

def minus(other: RDD[(VertexId, VD)])

// Remove vertices from this set that appear in the other set

def diff(other: VertexRDD[VD]): VertexRDD[VD]

// Join operators that take advantage of the internal indexing to accelerate joins (substantially)

def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]

def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.

def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]

}

例如,filter算子如何返回VertexRDDVertexRDD。过滤器实际上是使用来实现的,BitSet从而重用了索引并保留了与其它VertexRDDs快速连接的能力。同样,mapValues算子不允许map功能更改,VertexId从而使相同的HashMap数据结构可以重用。无论是 leftJoin和innerJoin能够连接两个时识别VertexRDD来自同一来源的小号 HashMap和落实线性扫描,而不是昂贵的点查找的加入。

该aggregateUsingIndex算子对于从 VertexRDDNET高效构建新VertexRDD有用RDD[(VertexId, A)]。从概念上讲,如果构建VertexRDD[B]了一组顶点(在某些情况下是这些顶点的超集),RDD[(VertexId, A)]可以重用索引进行聚合,然后进行索引RDD[(VertexId, A)]。例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))

val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))

// There should be 200 entries in rddB

rddB.count

val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)

// There should be 100 entries in setB

setB.count

// Joining A and B should now be fast!

val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

边缘RDD

EdgeRDD[ED],其延伸RDD[Edge[ED]]组织在块的边缘在使用中定义的各种分区策略的一个分区PartitionStrategy。在每个分区内,边缘属性和邻接结构分别存储,从而在更改属性值时可实现最大程度的重用。

EdgeRDDEdgeRDD公开的三个附加功能是:

// Transform the edge attributes while preserving the structure

def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]

// Reverse the edges reusing both attributes and structure

def reverse: EdgeRDD[ED]

// Join two `EdgeRDD`s partitioned using the same partitioning strategy.

def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多数应用程序中,发现EdgeRDDEdgeRDD上的算子是通过图形算子完成的,或者依赖于基RDD类中定义的算子。

优化表示

尽管在GraphX表示中的分布式图形的优化的详细描述超出了本指南的范围,但是一些高级的理解可能有助于可伸缩算法的设计以及API的最佳使用。GraphX采用顶点切割方法进行分布式图分区:

GraphX不会沿边缘划分图,而是沿顶点划分图,这可以减少通信和存储开销。从逻辑上讲,这对应于为机器分配边并允许顶点跨越多台机器。分配边的确切方法取决于,PartitionStrategy并且对各种启发式方法有一些折衷。用户可以通过用Graph.partitionBy算子对图形进行重新划分来在不同的策略之间进行选择。默认的分区策略是使用图形构造中提供的边缘的初始分区。但是,用户可以轻松地切换到GraphX中包含的2D分区或其它启发式方法。

一旦对边缘进行了划分,高效图形并行计算的关键挑战就是有效地将顶点属性与边缘连接起来。因为现实世界中的图通常具有比顶点更多的边,所以将顶点属性移到边上。因为并非所有分区都包含与所有顶点相邻的边,所以在内部维护一个路由表,该路由表在实现诸如triplets和算子所需的连接时,标识在哪里广播顶点aggregateMessages。

图算法

GraphX包含一组图形算法,以简化分析任务。这些算法包含在org.apache.spark.graphx.lib包中,可以作为方法Graph通过via直接访问GraphOps。本节介绍算法及其使用方法。

page页排名

PageRank测量在图中每个顶点的重要性,假设从边缘ùv表示的认可v通过的重要性ü。例如,如果一个Twitter用户之后有许多其它用户,则该用户将获得很高的排名。

GraphX带有PageRank的静态和动态实现,作为PageRank对象上的方法。静态PageRank运行固定的迭代次数,而动态PageRank运行直到排名收敛(即,停止变化超过指定的公差)。GraphOps允许直接将这些算法作为方法调用Graph。

GraphX还包括一个示例社交网络数据集,可以在其上运行PageRank。在中给出了一组用户,在中给出了data/graphx/users.txt一组用户之间的关系data/graphx/followers.txt。计算每个用户的PageRank如下:

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph

val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Run PageRank

val ranks = graph.pageRank(0.0001).vertices

// Join the ranks with the usernames

val users = sc.textFile("data/graphx/users.txt").map { line =>

val fields = line.split(",")

(fields(0).toLong, fields(1))

}

val ranksByUsername = users.join(ranks).map {

case (id, (username, rank)) => (username, rank)

}

// Print the result

println(ranksByUsername.collect().mkString("\n"))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / PageRankExample.scala”中找到完整的示例代码。

连接的组件

连通组件算法使用其编号最小的顶点的ID标记图的每个连通组件。例如,在社交网络中,连接的组件可以近似群集。GraphX包含在该算法的实现ConnectedComponents目的,且计算从所述示例性社交网络数据集的连通分量的PageRank部分如下:

import org.apache.spark.graphx.GraphLoader

// Load the graph as in the PageRank example

val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Find the connected components

val cc = graph.connectedComponents().vertices

// Join the connected components with the usernames

val users = sc.textFile("data/graphx/users.txt").map { line =>

val fields = line.split(",")

(fields(0).toLong, fields(1))

}

val ccByUsername = users.join(cc).map {

case (id, (username, cc)) => (username, cc)

}

// Print the result

println(ccByUsername.collect().mkString("\n"))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / ConnectedComponentsExample.scala”中找到完整的示例代码。

三角计数

当顶点有两个相邻的顶点且在之间有一条边时,该顶点是三角形的一部分。GraphX在TriangleCount对象中实现三角形计数算法,该算法确定通过每个顶点的三角形的数量,从而提供聚类的度量。从PageRank部分计算社交网络数据集的三角形计数。TriangleCount要求边沿必须为规范方向(srcId < dstId),且必须使用来对图形进行分区Graph.partitionBy

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load the edges in canonical order and partition the graph for triangle count

val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)

.partitionBy(PartitionStrategy.RandomVertexCut)

// Find the triangle count for each vertex

val triCounts = graph.triangleCount().vertices

// Join the triangle counts with the usernames

val users = sc.textFile("data/graphx/users.txt").map { line =>

val fields = line.split(",")

(fields(0).toLong, fields(1))

}

val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>

(username, tc)

}

// Print the result

println(triCountByUsername.collect().mkString("\n"))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / TriangleCountingExample.scala”中找到完整的示例代码。

例子

假设想从一些文本文件构建图形,将图形限制在重要的关系和用户上,在子图形上运行page-rank,然后最后返回与顶级用户相关联的属性。可以使用GraphX在几行中完成所有这些算子:

import org.apache.spark.graphx.GraphLoader

// Load my user data and parse into tuples of user id and attribute list

val users = (sc.textFile("data/graphx/users.txt")

.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format

val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes

val graph = followerGraph.outerJoinVertices(users) {

case (uid, deg, Some(attrList)) => attrList

// Some users may not have attributes so we set them as empty

case (uid, deg, None) => Array.empty[String]

}

// Restrict the graph to users with usernames and names

val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank

val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users

val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {

case (uid, attrList, Some(pr)) => (pr, attrList.toList)

case (uid, attrList, None) => (0.0, attrList.toList)

}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

在Spark存储库中的“ examples / src / main / scala / org / apache / spark / examples / graphx / ComprehensiveExample.scala”中找到完整的示例代码。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章