我是靠谱客的博主 干净小蝴蝶,最近开发中收集的这篇文章主要介绍使用SparkSQL 和 Hive API 代码实现用户自定义函数UDF UDAF UDTF,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
UDAF
例:继承SparkSQL API 中的Aggregator类自定义UDAF函数:输入多行,返回一行 => 聚合函数 。 (Scala代码实现)
以下函数功能描述: 求平均值
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
//输入多行,返回一行 => 聚合函数
object SparkSQL06_UDAF{
def main(args: Array[String]): Unit = {
// 1 创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
// 2 创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 3 读取数据
val df: DataFrame = spark.read.json("input/user.json")
// 4 注册UDAF
spark.udf.register("myAvg",functions.udaf(new MyAvgUDAF()))
// 5 创建DataFrame临时视图
df.createOrReplaceTempView("user")
// 6 调用自定义UDAF函数
spark.sql("select myAvg(age) from user").show()
// 7 释放资源
spark.stop()
}
//输入数据类型
case class User(age:Long, name:String)
case class Buff(var sum:Long, var cnt:Long)
/**
* 1,20岁; 2,19岁; 3,18岁
* IN:聚合函数的输入类型:Long
* BUF:
* OUT:聚合函数的输出类型:Double
(18+19+20)/3
*/
class MyAvgUDAF extends Aggregator[User,Buff,Double]{
// 初始化缓冲区
override def zero: Buff = {
Buff(0L,0L)
}
// 将输入的年龄和缓冲区的数据进行聚合 在executor运行
override def reduce(buff: Buff, user: User): Buff = {
buff.sum = buff.sum + user.age
buff.cnt = buff.cnt + 1
buff
}
// 多个缓冲区数据合并 在driver端运行
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.sum = buff1.sum + buff2.sum
buff1.cnt = buff1.cnt + buff2.cnt
buff1
}
// 完成聚合操作,获取最终结果
override def finish(buff: Buff): Double = {
buff.sum.toDouble / buff.cnt
}
// sparksql对传递的对象的序列化操作(编码)
//自定义类型就是product
自带类型根据类型选择
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
}
UDF
例:继承Hive API中GenericUDF实现UDF函数:一行进,一行出 。 (Java代码实现)
以下函数功能描述: 获取字符串的长度
package com.yuwenzhi.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
class MyStringLength
extends GenericUDF {
/**
*
* @param arguments 输入参数类型的鉴别器对象
* @return 返回值类型的鉴别器对象
* @throws UDFArgumentException
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 判断输入参数的个数
if(arguments.length !=1){
throw new UDFArgumentLengthException("Input Args Length Error!!!");
}
// 判断输入参数的类型
if(!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
throw new UDFArgumentTypeException(0,"Input Args Type Error!!!");
}
//函数本身返回值为int,需要返回int类型的鉴别器对象
return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}
/**
* 函数的逻辑处理
* @param arguments 输入的参数
* @return 返回值
* @throws HiveException
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if(arguments[0].get() == null){
return 0 ;
}
return arguments[0].get().toString().length();
}
@Override
public String getDisplayString(String[] children) {
return "";
}
}
UDTF
例:继承 Hive API 中GenericUDTF实现UDTF函数:一行进,多行出。 (Java代码实现)
以下函数功能描述: 将JSON字符串数组,炸裂成多个JSON字符串
package com.yuwenzhi.UDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
public class ExplodeJsonArray extends GenericUDTF {
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//1. 参数个数检查,只有一个参数
if(argOIs.getAllStructFieldRefs().size() != 1){
throw new UDFArgumentException("ExplodeJsonArray 只需要一个参数!");
}
//2. 参数类型检查,参数类型必须为String
//2. 1 获取字段检查器
ObjectInspector fieldObjectInspector = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();
if(! "string".equals(fieldObjectInspector.getTypeName())){
throw new UDFArgumentException("参数类型为String!");
}
//3. 返回值类型
List<String> fieldNames = new ArrayList<>();
//一行变多行,列名
fieldNames.add("col1");
List<ObjectInspector> fieldOIs = new ArrayList<>();
//转化为多行后,每行的类型检查
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
@Override
public void process(Object[] objects) throws HiveException {
//1.传入的参数为一个字段,取数组中的第一个即可
String jsonArray = objects[0].toString();
//2.将字符串转化为json数组
JSONArray actions = new JSONArray(jsonArray);
for (int i = 0; i < actions.length(); i++) {
//需要将结果封装成数组再写入
String[] result = new String[1];
result[0] = actions.getString(i);
forward(result);
}
}
@Override
public void close() throws HiveException {
}
}
有关大数据学习资源,请关注微信公众号“码农书斋”。回复“大数据”,免费获取学习视频、源码及资料!
最后
以上就是干净小蝴蝶为你收集整理的使用SparkSQL 和 Hive API 代码实现用户自定义函数UDF UDAF UDTF的全部内容,希望文章能够帮你解决使用SparkSQL 和 Hive API 代码实现用户自定义函数UDF UDAF UDTF所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复