Table of contents
- Adding the POM dependencies
- Writing the tags to an HBase table by day

Adding the POM dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1</version>
</dependency>
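The job in the next section also loads its HBase settings through com.typesafe.config.ConfigFactory, so the Typesafe Config library has to be on the classpath as well. If your build does not already pull it in transitively, a dependency along these lines should work (the coordinates are the standard ones for that library, but the version shown is an assumption, not taken from the original project):

<!-- Assumed extra dependency for ConfigFactory; adjust the version to your build -->
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>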
Writing the tags to an HBase table by day
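The code below reads the HBase table name and ZooKeeper quorum with ConfigFactory.load(), which expects an application.conf (or equivalent) on the classpath, typically under src/main/resources. A minimal sketch of that file, using the key names the code actually reads but placeholder values of my own, might look like this:

# application.conf - values are placeholders, replace with your cluster's settings
hbase.table.name = "user_tags"
hbase.zookeeper.host = "node1:2181,node2:2181,node3:2181"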
package com.dmp.tags

import com.dmp.utils.TagsUtils
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/*
 Example local arguments:
 C:\Users\admin\Desktop\result1
 C:\Users\admin\Desktop\data\app_mapping.txt
 C:\Users\admin\Desktop\data\stops.txt
 C:\Users\admin\Desktop\data\resultTags1
*/
object Tags4Ctx {

  def main(args: Array[String]): Unit = {
    // 0. Validate the number of arguments
    if (args.length != 5) {
      println(
        """
          |com.dmp.tags.Tags4Ctx
          |Arguments:
          |  logInputPath
          |  dictionaryPath
          |  stopwordPath
          |  resultOutputPath
          |  day
        """.stripMargin)
      sys.exit()
    }

    // 1. Read the program arguments
    val Array(logInputPath, dictionaryPath, stopwordPath, resultOutputPath, day) = args

    // 2. Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo serialization for RDD data written to disk and shuffled between workers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)

    // Dictionary file: app mapping
    val dicMap = sc.textFile(dictionaryPath)
      .map(line => {
        val fields = line.split("\t", -1)
        (fields(4), fields(5))
      }).collect().toMap

    // Stop words
    val stopWordsMap = sc.textFile(stopwordPath)
      .map((_, 0))
      .collect().toMap

    // Broadcast both lookup tables
    val broadcastAppDict = sc.broadcast(dicMap)
    val broadcastStopWordsDict = sc.broadcast(stopWordsMap)

    // Check whether the HBase table exists; create it if it does not
    val configuration = sc.hadoopConfiguration
    val load = ConfigFactory.load()
    val hbaseTabName = load.getString("hbase.table.name")
    configuration.set("hbase.zookeeper.quorum", load.getString("hbase.zookeeper.host"))
    val hbConn = ConnectionFactory.createConnection(configuration)
    val hbAdmin = hbConn.getAdmin
    if (!hbAdmin.tableExists(TableName.valueOf(hbaseTabName))) {
      println(s"$hbaseTabName does not exist")
      println(s"$hbaseTabName is being created")
      val tabNameDescriptor = new HTableDescriptor(TableName.valueOf(hbaseTabName))
      // Create the column family
      val columnDescriptor = new HColumnDescriptor("cf")
      tabNameDescriptor.addFamily(columnDescriptor)
      // Create the table
      hbAdmin.createTable(tabNameDescriptor)
      // Release resources
      hbAdmin.close()
      hbConn.close()
    }

    // As in MapReduce, specify the output format for the job
    val jobConf = new JobConf(configuration)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    // Specify the target table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTabName)

    // Read the parquet log files
    sQLContext.read.parquet(logInputPath)
      .where(TagsUtils.hasSomeUserIdCondition) // filter
      .map(row => {
        // Turn each row into tags
        // Ad tags
        val ads = Tags4Ads.makeTags(row)
        // App (media) tags
        val apps = TagsApp.makeTags(row, broadcastAppDict.value)
        // Device tags
        val devices = Tags4Devices.makeTags(row)
        // Keyword tags, filtered against the stop words
        // (computed here but not merged into the output below)
        val keywords = Tags4KeyWords.makeTags(row, broadcastStopWordsDict.value)
        val allUserId = TagsUtils.getAllUserId(row)
        (allUserId(0), (ads ++ apps ++ devices).toList)
      }).reduceByKey((a, b) => {
        // List(("K电视剧",1),("K电视剧",1)) => groupBy => Map["K电视剧", List(...)]
        // foldLeft(0)(_ + _._2) adds each count to the running total
        // Alternative:
        // (a ++ b).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList
        (a ++ b).groupBy(_._1).map {
          // Pattern-match on each (tag, occurrences) group and sum the counts
          case (k, smaTags) => (k, smaTags.map(_._2).sum)
        }.toList
      })
      // .saveAsTextFile(resultOutputPath)
      .map {
        case (userId, userTags) => {
          // Use the user id as the rowkey
          val put = new Put(Bytes.toBytes(userId))
          // userTags is a list of (tag, count) tuples; render it as "tag:count" pairs joined by commas
          val tags = userTags.map(t => t._1 + ":" + t._2).mkString(",")
          // Column family "cf", column "day" + date, value = the tag string
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(s"day$day"), Bytes.toBytes(tags))
          // Convert to Hadoop's output types
          (new ImmutableBytesWritable(), put)
        }
      }.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}
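As a quick sanity check after a run, you can read one of the rows back with the plain HBase client API. This is a minimal sketch of my own, not part of the original project; the ZooKeeper quorum, table name, rowkey and day value are placeholders you would replace with the values from your run:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object CheckTagRow extends App {
  // Hypothetical values for illustration only
  val zkQuorum  = "node1:2181,node2:2181,node3:2181"
  val tableName = "user_tags"
  val userId    = "some-user-id"
  val day       = "20180806"

  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zkQuorum)

  val conn  = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf(tableName))

  // Fetch the row the job wrote: rowkey = userId, column = cf:day<day>
  val result = table.get(new Get(Bytes.toBytes(userId)))
  val value  = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes(s"day$day"))
  println(if (value == null) "no tags found" else Bytes.toString(value))

  table.close()
  conn.close()
}

The same check can of course be done interactively with a get or scan from the HBase shell.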
In closing
That covers batch-writing the tag data into HBase: add the HBase dependencies, build the per-user tag lists with Spark, and write them to the cf column family keyed by user id, one column per day.