
SparkGraphX实战

1. 读写数据

1.1 NebulaGraph数据源

读取点
  /**
   * Loads every vertex of the `player` tag from NebulaGraph as a GraphX vertex RDD.
   *
   * @param spark active SparkSession used to issue the read
   * @return RDD of NebulaGraphxVertex read from the `demo_basketballplayer` space
   */
  def readVertexGraph(spark: SparkSession): RDD[NebulaGraphxVertex] = {
    // LOG.info("start to read graphx vertex")
    // Connection settings for the NebulaGraph meta service.
    val connConfig = NebulaConnectionConfig
      .builder()
      .withMetaAddress("119.45.124.178:9559")
      // .withConnectionRetry(2)
      .withTimeout(6000)
      .build()

    // Read the `player` tag; withNoColumn(false) keeps property columns.
    val readConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("demo_basketballplayer")
      .withLabel("player")
      .withNoColumn(false)
      // .withReturnCols(List("birthday"))
      .withLimit(100)
      .withPartitionNum(10)
      .build()

    val vertices = spark.read.nebula(connConfig, readConfig).loadVerticesToGraphx()
    LOG.info("vertex rdd first record: " + vertices.first())
    LOG.info("vertex rdd count: {}", vertices.count())
    vertices
  }
读取边
  /**
   * Loads every `follow` edge from NebulaGraph as a GraphX edge RDD.
   *
   * @param spark active SparkSession used to issue the read
   * @return RDD of NebulaGraphxEdge read from the `demo_basketballplayer` space
   */
  def readEdgeGraph(spark: SparkSession): RDD[NebulaGraphxEdge] = {
    LOG.info("start to read graphx edge")
    // Connection settings for the NebulaGraph meta service.
    val connConfig = NebulaConnectionConfig
      .builder()
      .withMetaAddress("119.45.124.178:9559")
      // .withConnectionRetry(2)
      .withTimeout(6000)
      .build()

    // Read the `follow` edge type; the empty return-column list still loads the edges.
    val readConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("demo_basketballplayer")
      .withLabel("follow")
      .withNoColumn(false)
      .withReturnCols(List())
      .withLimit(10)
      .withPartitionNum(10)
      .build()

    val edges = spark.read.nebula(connConfig, readConfig).loadEdgesToGraphx()
    LOG.info("edge rdd first record:" + edges.first())
    LOG.info("edge rdd count: {}", edges.count())
    edges
  }
生成graph
// Read the vertex and edge RDDs from NebulaGraph
val vertexRDD = readVertexGraph(spark)
val edgeRDD = readEdgeGraph(spark)
//readEdgeWithNgql(spark)
// Assemble the GraphX graph from the two RDDs
val graph = Graph(vertexRDD, edgeRDD)

2. GraphX相关API

2.1 pregel

子节点总数
// Count, for each vertex, how many distinct vertices are reachable within
// maxIter hops along incoming `follow` edges, using Pregel set propagation.
val maxIter = 1 // number of pregel rounds, i.e. the n-degree neighbourhood
// Seed every vertex with a set containing only its own id.
val seeded = graph.mapVertices((vid, _) => Set[VertexId](vid))
val converged = seeded.pregel[Set[VertexId]](initialMsg = Set[VertexId](), maxIter, EdgeDirection.Either)(
  (id: VertexId, vd: Set[VertexId], msg: Set[VertexId]) => vd ++ msg, // fold incoming ids into the vertex state
  (et: EdgeTriplet[Set[VertexId], Int]) => Iterator((et.srcId, et.dstAttr)), // push the destination's id set to the source
  (a: Set[VertexId], b: Set[VertexId]) => a ++ b // combine messages arriving at the same vertex
)
// Subtract one so a vertex does not count itself.
val childCounts = converged.mapVertices((vid, ids) => ids.size - 1)
childCounts.vertices.collect.foreach(println)

3. 图算法

3.1 DFS

子节点总数
    // Driver loop: run a depth-bounded DFS from every vertex and report how
    // many other vertices each one can reach.
    val maxIter = 1 // DFS depth limit, i.e. the n-degree neighbourhood
val allVertices = graph.vertices.collect()
val resultDegree = allVertices.map { v =>
  iterNum = 0 // reset the shared depth counter before each traversal
  val reached = dfs(graph, vertexId = v._1, mutable.Seq.empty[VertexId])(maxIter)
  (v._1, reached.size - 1) // minus one: the start vertex itself is not a child
}
resultDegree.foreach(println)
println("degree first" + resultDegree(0))


/**
 * Depth-bounded depth-first traversal over the out-edges of `g`, starting at
 * `vertexId`, accumulating the ids of visited vertices.
 *
 * NOTE(review): depth tracking relies on an external mutable `iterNum` var
 * (declared outside this snippet and reset to 0 by the caller before each
 * traversal), so this function is not reentrant and not thread-safe — confirm
 * `iterNum` ownership before reuse.
 *
 * @param g        graph whose edges are followed (out-direction only)
 * @param vertexId vertex to start the traversal from
 * @param visited  ids already visited; pass an empty Seq for a fresh traversal
 * @param maxIter  maximum recursion depth (compared against `iterNum`)
 * @return the visited ids including `vertexId` and all vertices reached
 *         within the depth limit
 */
def dfs(g: Graph[Long, Int], vertexId: VertexId, visited: mutable.Seq[VertexId])(
maxIter: Int): mutable.Seq[VertexId] = {
// Already seen this vertex: stop this branch and keep the accumulator as-is.
if (visited.contains(vertexId)) {
visited
} else {
// Depth limit reached: do not expand this vertex's neighbours.
if (iterNum > maxIter) {
return visited
}
// Append the current vertex. NOTE(review): `:+` on Seq is O(n) per call.
val newVisited = visited :+ vertexId
// NOTE(review): collectNeighbors builds a neighbour RDD for the whole graph
// on every call — expensive; consider computing it once outside the recursion.
val neighbors = g.collectNeighbors(EdgeDirection.Out).lookup(vertexId).flatten
println("neighbors size:" + neighbors.size)
// Descend one level: bump the shared depth counter for the recursive calls.
iterNum = iterNum + 1
println("iterNum:" + iterNum)
// Fold each neighbour's subtree into the accumulator, threading it left to right.
val resultVisited = neighbors.foldLeft(newVisited)((x, y) => dfs(g, y._1, x)(maxIter))
// Restore the depth counter before returning to the parent level.
iterNum = iterNum - 1
resultVisited
}
}