我是靠谱客的博主 凶狠台灯,最近开发中收集的这篇文章主要介绍Spark first, last函数的坑,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Spark SQL的聚合函数中有first, last函数,从字面意思就是根据分组获取第一条和最后一条记录的值,实际上,只在local模式下,你可以得到满意的答案,但是在生产环境(分布式)时,这个是不能保证的。看源码的解释:

/**
* Returns the first value of `child` for a group of rows. If the first value of `child`
* is `null`, it returns `null` (respecting nulls). Even if [[First]] is used on an already
* sorted column, if we do partial aggregation and final aggregation (when mergeExpression
* is used) its result will not be deterministic (unless the input table is sorted and has
* a single partition, and we use a single reducer to do the aggregation.).
*/

如何保证first, last是有效呢?表要排好序的,同时只能用一个分区处理,再用一个reducer来聚合。。。

所以,在多分区场景不能用first, last函数求得聚合的第一条和最后一条数据。

解决方案:利用Window。

val spark = SparkSession.builder().master("local").appName("Demo").getOrCreate()
import spark.implicits._
val df = Seq(("a", 10, 12345), ("a", 12, 34567), ("a", 11, 23456), ("b", 10, 55555), ("b", 8, 12348)).toDF("name", "value", "event_time")
// 定义window
val asc = Window.partitionBy("name").orderBy($"event_time")
val desc = Window.partitionBy("name").orderBy($"event_time".desc)
// 根据window生成row_number,根据row_number获取对应的数据
val firstValue = df.withColumn("rn", row_number().over(asc)).where($"rn" === 1).drop("rn")
val lastValue = df.withColumn("rn", row_number().over(desc)).where($"rn" === 1).drop("rn")
// 利用join把数据聚合一起
df.groupBy("name")
.count().as("t1")
.join(firstValue.as("fv"), "name")
.join(lastValue.as("lv"), "name")
.select($"t1.name", $"fv.value".as("first_value"), $"lv.value".as("last_value"), $"t1.count")
.show()

输出:

+----+-----------+----------+-----+
|name|first_value|last_value|count|
+----+-----------+----------+-----+
|   b|          8|        10|    2|
|   a|         10|        12|    3|
+----+-----------+----------+-----+

 

最后

以上就是凶狠台灯为你收集整理的Spark first, last函数的坑的全部内容,希望文章能够帮你解决Spark first, last函数的坑所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部