我是靠谱客的博主 虚拟信封,最近开发中收集的这篇文章主要介绍Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)Spark给key加随机尾串解决数据倾斜问题(Java/Scala版),觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)
通过给key加随机尾串,使得相同key加上随机尾串后的hash值不相等,在聚合操作的时候实现进入不同的分区,实现数据倾斜问题解决方式之一
- 详情请看代码注释
一、引入maven依赖
<!-- core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.1</version>
</dependency>
二、测试数据准备
这里我用的是普通的text file,以下文本内容:
hello world
hello tom
hello tomas
it1002 tom it1002 world
test top cn it1002 hello
ok yes test demo good
good it1002
good it1002
good it1002
good it1002
good it1002
good it1002
二、代码编写(Java/Scala)
1、JSkew.java
package top.it1002.spark.demo.dataSkew;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import top.it1002.spark.util.SparkUtil;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
* java版key随机散列解决数据倾斜问题
*/
public class JSkew {
public static void main(String[] args){
JavaSparkContext context = SparkUtil.getJContext("jSkew" ,6);
String path = "G:\数据\spark\words.txt";
final int partitionNum = 3;
// 切割
List<Tuple2<String, Integer>> list = context.textFile(path, partitionNum).flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
// 映射的同时加上key随机尾串
}
}).mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s + "_" + new Random().nextInt(partitionNum), 1);
// 第一次聚合
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
// 去除尾串
}
}).mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<String, Integer> s1) throws Exception {
return new Tuple2<String, Integer>(s1._1.substring(0, s1._1.indexOf("_")), s1._2);
// 再次聚合
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
// 排序
}
}).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
public Tuple2<Integer, String> call(Tuple2<String, Integer> s2) throws Exception {
return new Tuple2<Integer, String>(s2._2, s2._1);
}
}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<Integer, String> i2) throws Exception {
return new Tuple2<String, Integer>(i2._2, i2._1);
}
}).collect();
// 输出打印
for(Tuple2<String, Integer> t : list){
System.out.println(t._1 + " ==> " + t._2);
}
}
}
2、SSkew.scala
package top.it1002.spark.demo.dataSkew
import top.it1002.spark.util.SparkUtil
import scala.util.Random
/**
* scala版key随机散列解决数据倾斜问题
*/
object SSkew {
def main(args: Array[String]): Unit = {
val context = SparkUtil.getSContext("sSkew", 6)
val path = "G:\数据\spark\words.txt"
val partitionNum = 3
context.textFile(path, partitionNum)
// 切割
.flatMap(_.split(" "))
// 映射为元组
.map((_, 1))
// 给key加上随机数,聚合时候具有随机性
.map(t => {
val rnum = Random.nextInt(partitionNum)
(t._1 + "_" + rnum, 1)
})
// 初次聚合
.reduceByKey(_ + _)
// 去除key随机后缀
.map(e => {
val word = e._1.toString().substring(0, e._1.toString().indexOf("_"))
val count = e._2
(word, count)
})
// 再次聚合
.reduceByKey(_ + _)
// 排序(false->降序, true->升序)
.map(e => (e._2, e._1)).sortByKey(false).map(e => (e._2, e._1))
// 输出
.collect().foreach(e => {e._1 + " ==> " + e._2})
}
}
三、运行测试
最后
以上就是虚拟信封为你收集整理的Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)的全部内容,希望文章能够帮你解决Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)Spark给key加随机尾串解决数据倾斜问题(Java/Scala版)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复