我是靠谱客的博主 俏皮战斗机,这篇文章主要介绍Spark GraphX的边构造过程详解——从 RDD[Edge[ED]] 到 EdgeRDD[ED, VD],现在分享给大家,希望可以做个参考。

关键词 Spark GraphX RDD Edge EdgeRDD EdgeRDDImpl 分区 索引 分区索引

本文介绍Spark GraphX内部对边存储的机理,详解边分区内部的索引构建过程。

在最初,边是这样的 RDD[(srcId, dstId, attr)],这种记法要看得懂,简而言之,它是(srcId, dstId, attr)这个三元组的集合。不过,RDD进行了分区存储,分区内部元素使用数组形式存储。这种结构并不能提供有效的图计算,如快速找出(srcId, dstId)的属性值attr,找出以srcId为源顶点的所有边等操作若按遍历方式处理,效率将非常低。

这里有一些对象要理解:
Edge,边对象,存有srcId, dstId, attr 3个字段,和一点点边的方法
RDD[Edge],存放着Edge对象的RDD
EdgeRDD,完整提供边的各种操作类,编程时用的就是它了。
EdgeRDDImpl,边的一些分区优化实现类,把边优化存储在分区里。这是GraphX 内部的类了,编程时都不用它的,我们调用EdgeRDD就行了。

它们之间继承关系如下:
EdgeRDD extends RDD[Edge]
EdgeRDDImpl extends EdgeRDD

于是乎,GraphX是这样处理的…

  • 构建RDD[Edge]

    调用RDD[(srcId, dstId, attr)]的map操作,把(srcId, dstId, attr)转为Edge,这里就不介绍了,只是调用map算子而已

  • 由RDD[Edge]构建EdgeRDD

    EdgeRDD.fromEdges(edges:RDD[Edge]):EdgeRDDImpl,这个初始化方法实现了我们的转换,我们接下来看一看它是如何实现的

复制代码
1
2
3
4
5
6
7
8
9
10
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = { val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[ED, VD] iter.foreach { e => builder.add(e.srcId, e.dstId, e.attr) } Iterator((pid, builder.toEdgePartition)) } EdgeRDD.fromEdgePartitions(edgePartitions) }
  • edges.mapPartitionsWithIndex得到每个分区的分区号和分区元素迭代器(pid,iter)

  • EdgePartitionBuilder类中有一个edges字段,是个PrimitiveVector[Edge]类型的,把add方法接收的边全放在edges中了。builder对象中就有分区里面所有的Edge了

  • toEdgePartition方法才是真正懂得如何建立各种优化索引的,它返回一个EdgePartition对象。源代码如下,需要一点点scala知识。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array new Sorter(Edge.edgeArraySortDataFormat[ED]) .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) val localSrcIds = new Array[Int](edgeArray.size) val localDstIds = new Array[Int](edgeArray.size) val data = new Array[ED](edgeArray.size) val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int] val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int] val local2global = new PrimitiveVector[VertexId] var vertexAttrs = Array.empty[VD] // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index. Also populate a map from vertex id to a sequential local offset. if (edgeArray.length > 0) { index.update(edgeArray(0).srcId, 0) var currSrcId: VertexId = edgeArray(0).srcId var currLocalId = -1 var i = 0 while (i < edgeArray.size) { val srcId = edgeArray(i).srcId val dstId = edgeArray(i).dstId localSrcIds(i) = global2local.changeValue(srcId, { currLocalId += 1; local2global += srcId; currLocalId }, identity) localDstIds(i) = global2local.changeValue(dstId, { currLocalId += 1; local2global += dstId; currLocalId }, identity) data(i) = edgeArray(i).attr if (srcId != currSrcId) { currSrcId = srcId index.update(currSrcId, i) } i += 1 } vertexAttrs = new Array[VD](currLocalId + 1) } new EdgePartition( localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs, None) }

下面引用一下别人的资料,介绍EdgePartition对象的内容
从edgerdd中构建边

入口:Graph.fromEdgeTuples(rawEdgesRdd)
元数据为,分割的两个点ID,把元数据映射成Edge(srcId, dstId, attr)对象, attr默认为null。这样元数据就构建成了RDD[Edge[ED]]

RDD[Edge[ED]]要进一步转化成EdgeRDDImpl[ED, VD]
首先遍历RDD[Edge[ED]]的分区partitions,对分区内的边重排序new Sorter(Edge.edgeArraySortDataFormat[ED]).sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)即:按照srcId从小到大排序。

问:为何要重排序?
答:为了遍历时顺序访问。采用数组而不是Map,数组是连续的内存单元,具有原子性,避免了Map的hash问题,访问速度快

填充localSrcIds,localDstIds, data, index, global2local, local2global, vertexAttrs

数组localSrcIds,localDstIds中保存的是经过global2local.changeValue(srcId/dstId)转变的本地索引,即:localSrcIds、localDstIds数组下标对应于分区元素,数组中保存的索引位可以定位到local2global中查到具体的VertexId

global2local是spark私有的Map数据结构GraphXPrimitiveKeyOpenHashMap, 保存vertextId和本地索引的映射关系。global2local中包含当前partition所有srcId、dstId与本地索引的映射关系。

data就是当前分区的attr属性数组

index索引最有意思,按照srcId重排序的边数据, 会看到相同的srcId对应了不同的dstId, 见图中index desc部分。index中记录的是相同srcId中第一个出现的srcId与其下标。

local2global记录的是所有的VertexId信息的数组。形如:srcId,dstId,srcId,dstId,srcId,dstId,srcId,dstId。其中会包含相同的ID。即:当前分区所有vertextId的顺序实际值

复制代码
1
2
3
4
5
6
# 用途: # 根据本地下标取VertexId localSrcIds/localDstIds -> index(取出的本地索引) -> local2global -> VertexId # 根据VertexId取本地下标,取属性 VertexId -> global2local -> index(那个哈希对象) -> data -> attr object

spark的数据最终是在patition中表达,所以各种transform都在这里进行,这里的数据结构性能至关重要.

作者简介

唐黎哲,国防科学技术大学并行与分布式计算国家重点实验室(PDL)研究生,14年入学便开始接触spark,准备在余下的读研时间在spark相关开源社区贡献自己的代码,毕业后准备继续从事该方面研究。
邮箱:tanglizhe1105@qq.com

最后

以上就是俏皮战斗机最近收集整理的关于Spark GraphX的边构造过程详解——从 RDD[Edge[ED]] 到 EdgeRDD[ED, VD]的全部内容,更多相关Spark内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部