概述
文章目录
- 使用catalog读取hive的元数据信息
- SparkSql历程
- dataset和dataframe区别
- DataSet创建
- SparkStreaming
- 配置idea
- streaming TCP程序
使用catalog读取hive的元数据信息
package com.ruozedata.bigdata.sql06
import org.apache.spark.sql.SparkSession
object CatalogApp {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder()
.appName("DataSourceAPIApp")
.master("local[2]")
.getOrCreate()
import sparkSession.implicits._
val catalog=sparkSession.catalog
catalog.listDatabases().select("name","locationUri").show(false)
catalog.listTables("default").show(false)
catalog.listColumns("default","ruoze_emp").show(false)
sparkSession.stop()
}
}
SparkSql历程
sparksql 1.0版本有的
1.3 DataFrame
1.6 多了Dataset ,为了compile-time type safety 编译时的类型安全
dataset和dataframe区别
比如说要执行这么一个语句
spark.sql("seelct a from x")
虽然这个select打错了,但是写代码的时候并不会报错,只有在程序运行时出错,这不是件好事。
如果使用的是dataframe
df.seelct("a")
在select敲错的时候就会报错
但是如果里面的列打错,还是得在运行时报错
df.select("ax")
如果使用dataset,即使在列写错的时候,编译也会报错(就是相当于idea敲完变红色),而不许运行时
ds.seelct("a")
ds.map(_.ax)
与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,无法直接获取每一列的值,比如下面的map需要给domain指定以string形式获取
infoDF.map(x=>x.getAs[String]("domain"))
DataFrame也可以叫做DataSet[Row],每一行的类型都是Row,不解析我们就无法知晓其中有哪些字段
DataSet可以理解成DataFrame的一种特例,主要区别是DataSet每一个record存储的是一个强类型值而不是一个Row
DataSet创建
package com.ruozedata.bigdata.sql06
import org.apache.spark.sql.SparkSession
object CatalogApp {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder()
.appName("DataSourceAPIApp")
.master("local[2]")
.getOrCreate()
import sparkSession.implicits._
// sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv()
//
// //使用csv文件里第一行带的schema信息,返回dataframe类型
// val df = sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv("file:///home/hadoop/data/sales.csv")
// //转成dataset,使用下面的case class
// val ds = sparkSession.read.format("csv").option("header","true").option("inferSchema","true").csv("file:///home/hadoop/data/sales.csv").as[Sales]
//
// val selectDF = df.select("itemId")
// val selectDS = ds.select("itemId") //.show(false) // 运行时异常
// //ds.map(_.itemId).show(false)
//
// selectDF.queryExecution.optimizedPlan.numberedTreeString
// selectDS.queryExecution.optimizedPlan.numberedTreeString
sparkSession.stop()
}
case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}
SparkStreaming
以批处理为主,使用微批来解决实时问题
sparkstreaming的抽象DStream
一个DStream代表一串RDDS
配置idea
<dependency>
<groupId>org.appche.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
streaming TCP程序
先监听TCP9999端口
nc -lk 9999
package com.ruozedata.bigdata.streaming01
import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWCApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWCApp")
//streaming数据多久被切成一批
val ssc = new StreamingContext(conf, Seconds(10))
//TCP作为源
val lines = ssc.socketTextStream("hadoop000",9999)
val results = lines.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
//打印前10条
results.print()
//启动streaming
ssc.start()
ssc.awaitTermination()
}
}
windows下网页
http://localhost:4040/streaming/
最后
以上就是多情果汁为你收集整理的catalog,dataset,sparkstreamingdataset和dataframe区别SparkStreaming的全部内容,希望文章能够帮你解决catalog,dataset,sparkstreamingdataset和dataframe区别SparkStreaming所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复