我是靠谱客的博主 多情果汁,最近开发中收集的这篇文章主要介绍catalog,dataset,sparkstreamingdataset和dataframe区别SparkStreaming,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

    • 使用catalog读取hive的元数据信息
    • SparkSql历程
  • dataset和dataframe区别
    • DataSet创建
  • SparkStreaming
    • 配置idea
    • streaming TCP程序

使用catalog读取hive的元数据信息

package com.ruozedata.bigdata.sql06

import org.apache.spark.sql.SparkSession

object CatalogApp {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("DataSourceAPIApp")
      .master("local[2]")
      .getOrCreate()
import sparkSession.implicits._
    val catalog=sparkSession.catalog
    catalog.listDatabases().select("name","locationUri").show(false)

    catalog.listTables("default").show(false)
    catalog.listColumns("default","ruoze_emp").show(false)
    sparkSession.stop()
  }
}

SparkSql历程

sparksql 1.0版本有的
1.3 DataFrame
1.6 多了Dataset ,为了compile-time type safety 编译时的类型安全
在这里插入图片描述

dataset和dataframe区别

在这里插入图片描述 比如说要执行这么一个语句

 spark.sql("seelct a from x")

虽然这个select打错了,但是写代码的时候并不会报错,只有在程序运行时出错,这不是件好事。

如果使用的是dataframe

 df.seelct("a") 

在select敲错的时候就会报错
但是如果里面的列打错,还是得在运行时报错

 df.select("ax")

如果使用dataset,即使在列写错的时候,编译也会报错(就是相当于idea敲完变红色),而不许运行时

ds.seelct("a")
ds.map(_.ax)

与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,无法直接获取每一列的值,比如下面的map需要给domain指定以string形式获取

infoDF.map(x=>x.getAs[String]("domain"))

DataFrame也可以叫做DataSet[Row],每一行的类型都是Row,不解析我们就无法知晓其中有哪些字段

DataSet可以理解成DataFrame的一种特例,主要区别是DataSet每一个record存储的是一个强类型值而不是一个Row

DataSet创建

package com.ruozedata.bigdata.sql06

import org.apache.spark.sql.SparkSession

object CatalogApp {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("DataSourceAPIApp")
      .master("local[2]")
      .getOrCreate()
import sparkSession.implicits._


//    sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv()
//
//    //使用csv文件里第一行带的schema信息,返回dataframe类型
//    val df = sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv("file:///home/hadoop/data/sales.csv")
//    //转成dataset,使用下面的case class
//    val ds = sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv("file:///home/hadoop/data/sales.csv").as[Sales]
//
//    val selectDF = df.select("itemId")
//    val selectDS = ds.select("itemId") //.show(false) // 运行时异常
//    //ds.map(_.itemId).show(false)
//
//    selectDF.queryExecution.optimizedPlan.numberedTreeString
//    selectDS.queryExecution.optimizedPlan.numberedTreeString

    sparkSession.stop()
  }

  case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}

SparkStreaming

以批处理为主,使用微批来解决实时问题
在这里插入图片描述
sparkstreaming的抽象DStream
一个DStream代表一串RDDS

在这里插入图片描述

配置idea

<dependency>
      <groupId>org.appche.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

streaming TCP程序

先监听TCP9999端口

nc -lk 9999

package com.ruozedata.bigdata.streaming01

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamingWCApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWCApp")
    //streaming数据多久被切成一批
    val ssc = new StreamingContext(conf, Seconds(10))

    //TCP作为源
    val lines = ssc.socketTextStream("hadoop000",9999)
    val results = lines.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    //打印前10条
    results.print()

    //启动streaming
    ssc.start()
    ssc.awaitTermination()
  }
}

windows下网页
http://localhost:4040/streaming/

最后

以上就是多情果汁为你收集整理的catalog,dataset,sparkstreamingdataset和dataframe区别SparkStreaming的全部内容,希望文章能够帮你解决catalog,dataset,sparkstreamingdataset和dataframe区别SparkStreaming所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部