Overview
Contents
- Adding the pom dependencies
- Writing tags to an HBase table by day
Adding the pom dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1</version>
</dependency>
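Besides HBase, the job below also needs Spark and Typesafe Config on the classpath. A minimal sketch of those extra dependencies, assuming Scala 2.10 and a Spark 1.x release to match the SQLContext-based API used here (the exact versions are placeholders; adjust them to your cluster):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.1</version>
</dependency>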
Writing tags to an HBase table by day
package com.dmp.tags
import com.dmp.utils.TagsUtils
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/*
C:\Users\admin\Desktop\result1
C:\Users\admin\Desktop\data\app_mapping.txt
C:\Users\admin\Desktop\data\stops.txt
C:\Users\admin\Desktop\data\resultTags1
*/
object Tags4Ctx {
  def main(args: Array[String]): Unit = {
    // 0 Validate the number of arguments
    if (args.length != 5) {
      println(
        """
          |com.dmp.tags.Tags4Ctx
          |Arguments:
          |  logInputPath
          |  dictionaryPath
          |  stopwordPath
          |  resultOutputPath
          |  day
        """.stripMargin)
      sys.exit()
    }
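    // A hypothetical spark-submit invocation matching the five arguments above
    // (the jar name and paths are illustrative placeholders, not from the original post):
    //   spark-submit --class com.dmp.tags.Tags4Ctx dmp-tags.jar \
    //     /dmp/logs /dmp/app_mapping.txt /dmp/stops.txt /dmp/tags 20190101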
    // 1 Receive the program arguments
    val Array(logInputPath, dictionaryPath, stopwordPath, resultOutputPath, day) = args
    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo serializer: used when RDDs are serialized to disk or shipped between workers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)
    // Dictionary file: the app mapping (tab-separated)
    val dicMap = sc.textFile(dictionaryPath)
      .map(line => {
        val fields = line.split("\t", -1)
        (fields(4), fields(5))
      }).collect().toMap
    // Stop words
    val stopWordsMap = sc.textFile(stopwordPath)
      .map((_, 0))
      .collect().toMap
    // Broadcast both dictionaries to the executors
    val broadcastAppDict = sc.broadcast(dicMap)
    val broadcastStopWordsDict = sc.broadcast(stopWordsMap)
    // Check whether the HBase table exists; create it if it does not
    val configuration = sc.hadoopConfiguration
    val load = ConfigFactory.load()
    val hbaseTabName = load.getString("hbase.table.name")
    configuration.set("hbase.zookeeper.quorum", load.getString("hbase.zookeeper.host"))
    val hbConn = ConnectionFactory.createConnection(configuration)
    val hbAdmin = hbConn.getAdmin
    if (!hbAdmin.tableExists(TableName.valueOf(hbaseTabName))) {
      println(s"$hbaseTabName does not exist, creating it")
      val tabNameDescriptor = new HTableDescriptor(TableName.valueOf(hbaseTabName))
      // Add the column family
      val columnDescriptor = new HColumnDescriptor("cf")
      tabNameDescriptor.addFamily(columnDescriptor)
      // Create the table
      hbAdmin.createTable(tabNameDescriptor)
    }
    // Release the admin resources
    hbAdmin.close()
    hbConn.close()
    // As in MapReduce, specify the output format for the job
    val jobConf = new JobConf(configuration)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    // Target table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTabName)
    // Read the parquet log files
    sQLContext.read.parquet(logInputPath)
      .where(TagsUtils.hasSomeUserIdCondition) // keep only rows with a usable user id
      .map(row => {
        // Turn each row into tags
        // Ad tags
        val ads = Tags4Ads.makeTags(row)
        // App (media) tags
        val apps = TagsApp.makeTags(row, broadcastAppDict.value)
        // Device tags
        val devices = Tags4Devices.makeTags(row)
        // Keyword tags, filtered through the stop-word list
        val keywords = Tags4KeyWords.makeTags(row, broadcastStopWordsDict.value)
        val allUserId = TagsUtils.getAllUserId(row)
        (allUserId(0), (ads ++ apps ++ devices ++ keywords).toList)
      }).reduceByKey((a, b) => {
        // List(("K电视剧",1), ("K电视剧",1)) => groupBy => Map("K电视剧" -> List(...))
        // An equivalent one-liner:
        //   (a ++ b).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList
        // where foldLeft(0)(_ + _._2) adds each count to the running total
        (a ++ b).groupBy(_._1).map {
          // Partial function: sum the occurrence counts for each tag
          case (k, smaTags) => (k, smaTags.map(_._2).sum)
        }.toList
      })
      // .saveAsTextFile(resultOutputPath)
      .map {
        case (userId, userTags) => {
          // Use the user id as the rowkey
          val put = new Put(Bytes.toBytes(userId))
          // userTags is a List of (tag, count) tuples; render it as "tag1:1,tag2:3",
          // joining the entries with commas
          val tags = userTags.map(t => t._1 + ":" + t._2).mkString(",")
          // Column family "cf", column "day" + date, value = the tag string
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(s"day$day"), Bytes.toBytes(tags))
          // Wrap in the Hadoop output types expected by TableOutputFormat
          (new ImmutableBytesWritable(), put)
        }
      }.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
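The job reads hbase.table.name and hbase.zookeeper.host through ConfigFactory.load(), which by default loads an application.conf from the classpath (typically src/main/resources). A minimal sketch of that file, with placeholder values since the original post does not show it:

hbase.table.name = "user_tags"
hbase.zookeeper.host = "node1:2181,node2:2181,node3:2181"

After a run, the result can be spot-checked from the HBase shell with scan 'user_tags' (substituting your actual table name); each row is keyed by user id and gains one day-suffixed column per processed day.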
Wrapping up
That covers batch-writing tag data into HBase by day. Hopefully it helps if you are working through a similar problem.