我是靠谱客的博主 可靠飞机,最近开发中收集的这篇文章主要介绍数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、概述

根据星型模型的概念,不存在渐变维度,数据存在冗,典型例子地域维度表,如国家,省,市这种树形数据结构。

OLTP数据结构:

idpidname
1 中国
21广东省
32深圳

期望的星型模型数据结构:

idcountryprovincecitylevel
1中国UNKNOWNUNKNOWN1
2中国广东省UNKNOWN2
3中国广东省深圳3

二、GreenPlum处理方案

gp底层是PostgreSQL, 支持递归,实现如下:

WITH RECURSIVE cte AS (
SELECT
"id",
"name",
1 AS "level"
FROM
t_location
UNION ALL
SELECT
"c"."id",
(
"p"."name" || '/' || "c"."name"
) :: VARCHAR (255) AS "name",
"p"."level" + 1 AS "level"
FROM
cte AS "p"
JOIN t_location AS "c" ON "c"."pid" = "p"."id"
)
SELECT
"id",
split_part("name", '/', 1) AS country,
split_part("name", '/', 2) AS province,
split_part("name", '/', 3) AS city,
"level"
FROM
(
SELECT
"c"."id",
"c"."name",
"c"."level"
FROM
cte "c"
INNER JOIN (
SELECT
"id",
MAX ("level") AS "lv"
FROM
cte
GROUP BY
"id"
) AS "d" ON "c"."id" = "d"."id"
AND "c"."level" = "d"."lv"
) AS "t"

源数据:

idpidname
1 中国
21广东省
32深圳
42珠海
51福建省
65厦门

输出:

idcountryprovincecitylevel
1中国  1
2中国广东省 2
5中国福建省 2
3中国广东省深圳3
4中国广东省珠海3
6中国福建省厦门3

三、Spark GraphX 实现方案

如果数据在Hadoop环境下,由于spark sql不支持recursive, 经过一番调研,网上找到了Spark GraphX的解决方案,主要利用Pregel接口。Pregel 是一个迭代图处理模型,由谷歌开发,它使用顶点之间的消息传递进行一系列的迭代。GraphX 实现了类似 Pregel 块同步消息传递 API。Pregel的步骤是顶点接收上一个super step的消息,基于消息计算新值赋予顶点,并发送消息给相连的下一个super step。

package net.demo;
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
// 消息
case class VertexMessage(
initVertexId: Long, // 起始顶点的ID,用于判断图是否存在环
level: Int, // 层级
path: List[String], // 每层的名称集合
isCyclic: Boolean, // 是否存在环
isLeaf: Boolean // 是否叶子结点
)
//
case class VertexValue(
id: Long, // 结点ID
name: String, // 名称
initVertexId: Long, // 起始顶点ID
level: Int, // 默认0
path: List[String], // 名称集合
isCyclic: Boolean, // 是否有环,默认false
isLeaf: Boolean // 是否叶子结点,默认true
)
case class MyVertex(
id: Long,
name: String
)
object HierarchyToSingleRow {
// 计算顶点新值
def vprog(
vertexId: VertexId, // 当前顶点ID
value: VertexValue, // 当前顶点值
message: VertexMessage // 上一超步发送的消息
): VertexValue = {
if (message.level == 0) { // 初始级
value.copy(level = value.level + 1)
} else if (message.isCyclic) {
value.copy(isCyclic = true)
} else if (!message.isLeaf) {
value.copy(isLeaf = false)
} else {
value.copy(
initVertexId = message.initVertexId,
level = value.level + 1,
path = value.name :: message.path
)
}
}
// src -- edge -- dst
// triplet: src.id, dst.id, src.attr, e.attr, dst.attr
def sendMsg(
triplet: EdgeTriplet[VertexValue, String]
): Iterator[(VertexId, VertexMessage)] = {
val src = triplet.srcAttr
val dst = triplet.dstAttr
// 判断是否有环
if (src.initVertexId == triplet.dstId || src.initVertexId == dst.initVertexId) {
// 有环
if (!src.isCyclic) { // 设为有环
Iterator(
(
triplet.dstId,
VertexMessage(
initVertexId = src.initVertexId,
level = src.level,
path = src.path,
isCyclic = true,
isLeaf = src.isLeaf
)
)
)
} else { // false 则忽略
Iterator.empty
}
} else { // 无环,继续
if (src.isLeaf) { // 初始化时所有的结点都是叶子结点,两个顶点存在边,那么src顶点就是不是叶子结点
Iterator(
(
triplet.srcId,
VertexMessage(
initVertexId = src.initVertexId,
level = src.level,
path = src.path,
isCyclic = false,
isLeaf = false // 非常重要
)
)
)
} else { // 给dst结点设置新值
Iterator(
(
triplet.dstId,
VertexMessage(
initVertexId = src.initVertexId,
level = src.level,
path = src.path,
isCyclic = false, // Set to false so that cyclic updating is ignored in vprog
isLeaf = true // Set to true so that leaf updating is ignored in vprog
)
)
)
}
}
}
// 合并函数
def mergeMsg(msg1: VertexMessage, msg2: VertexMessage): VertexMessage =
msg2
def main(args: Array[String]): Unit = {
val spark =
SparkSession.builder().appName("Spark GraphX Example").getOrCreate();
import spark.implicits._
// 读取数据
val locationDF =
spark.read
.csv("/user/spark/location.csv")
.toDF("id", "pid", "name")
.cache()
// 构建顶点RDD
val verticesRdd: RDD[(VertexId, MyVertex)] = locationDF
.withColumn("id", $"id".cast("Long"))
.select(
$"id",
$"name"
)
.rdd
.map(loc => (loc.getLong(0), MyVertex(loc.getLong(0), loc.getString(1))))
// 构建边,确定顶点指向顶点的方向,本用例 pid -> id
val edgesRd: RDD[Edge[String]] = locationDF
.filter(
$"pid".isNotNull
)
.withColumn("id", $"id".cast("Long"))
.withColumn("pid", $"pid".cast("Long"))
.select(
$"pid",
$"id"
)
.rdd
.map(loc => Edge(loc.getLong(0), loc.getLong(1), "top-down")) // top-down 是edge的属性,在本用例中没有用途
// 构建图
val graph = Graph(verticesRdd, edgesRd).cache()
// 初始化图中各顶点的值
val valueGraph = graph.mapVertices { (id, v) =>
VertexValue(
id = v.id,
name = v.name,
initVertexId = id,
level = 0,
path = List(v.name),
isCyclic = false,
isLeaf = false
)
}
// 初始消息
val initialMsg = VertexMessage(
initVertexId = 0L,
level = 0,
path = Nil,
isCyclic = false,
isLeaf = true
)
val results = valueGraph.pregel(
initialMsg, // 初始消息
Int.MaxValue, // 迭代次数
EdgeDirection.Out // 往边的哪个方向输出消息
)(
vprog, // 计算函数
sendMsg, // 发送消息函数
mergeMsg // 合并函数
)
// 转换
val df = results.vertices
.map {
case (id, v) =>
(
id,
v.level,
v.path.reverse.mkString("/")
)
}
.toDF("id", "level", "path")
df.withColumn("_tmp", split($"path", "/"))
.select(
$"id",
$"_tmp".getItem(0).as("country"),
$"_tmp".getItem(1).as("province"),
$"_tmp".getItem(2).as("city"),
$"level"
)
.show
spark.stop
}
}

location.csv

1,,中国
2,1,广东省
3,2,深圳
4,2,珠海
5,1,福建省
6,5,厦门

输出:

idcountryprovincecitylevel
1中国nullnull1
2中国广东省null2
5中国福建省null2
3中国广东省深圳3
4中国广东省珠海3
6中国福建省厦门3

四、备注

代码注释不一定正确,部分是个人理解,如有错误,欢迎指正。

参考文献:

http://spark.apache.org/docs/latest/graphx-programming-guide.html

https://blog.csdn.net/qq_38265137/article/details/80547763

https://james.faeldon.com/generating-descendant-table-using-spark-graph-x#generating-descendant-table-using-spark-graphx

https://www.jianshu.com/p/d9170a0723e4

最后

以上就是可靠飞机为你收集整理的数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注的全部内容,希望文章能够帮你解决数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(67)

评论列表共有 0 条评论

立即
投稿
返回
顶部