我是靠谱客的博主 玩命爆米花,最近开发中收集的这篇文章主要介绍批量将标签数据写入hbase中,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

      • 引入pom文件依赖
      • 将标签按日输出到hbase表中

引入pom文件依赖

    <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.1</version>
        </dependency>

将标签按日输出到hbase表中

package com.dmp.tags

import com.dmp.utils.TagsUtils
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/*
C:UsersadminDesktopresult1
C:UsersadminDesktopdataapp_mapping.txt
C:UsersadminDesktopdatastops.txt
C:UsersadminDesktopdataresultTags1
 */
object Tags4Ctx {
  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 4) {
      println(
        """
          |com.dmp.tags.Tags4Ctx
          |参数:
          | logInputPath
          | dictionaryPath
          | stopwordPath
          | resultOutputPath
          | day
        """.stripMargin)
      sys.exit()
    }

    // 1 接受程序参数
    val Array(logInputPath, dictionaryPath, stopwordPath, resultOutputPath,day) = args

    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    val sQLContext = new SQLContext(sc)
    //字典文件 appMapping
    val dicMap = sc.textFile(dictionaryPath)
      .map(line => {
        val fields = line.split("t", -1)
        (fields(4), fields(5))
      }).collect().toMap

    // 停用词
    val stopWordsMap = sc.textFile(stopwordPath)
      .map((_, 0))
      .collect().toMap
    //广播出去
    val broadcastAppDict = sc.broadcast(dicMap)
    val broadcastStopWordsDict = sc.broadcast(stopWordsMap)
    //判断hbase的表是否存在.不存在则创建
    val configuration = sc.hadoopConfiguration
    val load = ConfigFactory.load()
    val hbaseTabName = load.getString("hbase.table.name")
    configuration.set("hbase.zookeeper.quorum", load.getString("hbase.zookeeper.host"))
    val hbConn = ConnectionFactory.createConnection(configuration)
    val hbAdmin = hbConn.getAdmin

    if (!hbAdmin.tableExists(TableName.valueOf(hbaseTabName))){
      println(s"$hbaseTabName is not exists")
      println(s"$hbaseTabName is creating")

      val tabNameDescriptor = new  HTableDescriptor(TableName.valueOf(hbaseTabName))
      //创建列族
      val columnDescriptor = new HColumnDescriptor("cf")
      tabNameDescriptor.addFamily(columnDescriptor)
      //创建表
      hbAdmin.createTable(tabNameDescriptor)
      //释放资源
      hbAdmin.close()
      hbConn.close()
    }

    //类似于mr 指定key的输出类型

    val jobConf = new JobConf(configuration)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    //指定表名称
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTabName)


    //读取日志的parquet文件
    sQLContext.read.parquet(logInputPath)
      .where(TagsUtils.hasSomeUserIdCondition) //过滤
      .map(row => {
      //行数据进行标签化处理
      //广告
      val ads = Tags4Ads.makeTags(row)
      //媒介
      val apps = TagsApp.makeTags(row, broadcastAppDict.value)
      //设备
      val devices = Tags4Devices.makeTags(row)
      //停用词
      val keywords = Tags4KeyWords.makeTags(row, broadcastStopWordsDict.value)

      val allUserId = TagsUtils.getAllUserId(row)
      (allUserId(0), (ads ++ apps ++ devices).toList)
    }).reduceByKey((a, b) => {
      // List(("K电视剧",1),("K电视剧",1))  => groupBy => Map["K电视剧",List(....)]
      //foldLeft(0)(_+_._2) 表示前一个加上后一个值
      //第一种写法
      //      (a ++ b).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)).toList
      (a ++ b).groupBy(_._1).map {
        //使用偏函数
        case (k, smaTags) => (k, smaTags.map(_._2).sum)
      }.toList
    })
      //      .saveAsTextFile(resultOutputPath)
      .map {
      case (userId, userTags) => {
        //以用户id为rowkey
        val put = new Put(Bytes.toBytes(userId))
        //list里面是元组,key是标签值 value是 个数   map转换成List(String) 类型  然后mkString转换成字符串类型,
        //按,分割
        val tags = userTags.map(t => t._1 + ":" + t._2).mkString(",")
        //列族 cf 列day + 日期 值 标签
        put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes(s"day$day"),Bytes.toBytes(tags))
        //转换成hadoop的输出类型
        (new ImmutableBytesWritable(),put)
      }
    }.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

最后

以上就是玩命爆米花为你收集整理的批量将标签数据写入hbase中的全部内容,希望文章能够帮你解决批量将标签数据写入hbase中所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部