我是靠谱客的博主 认真鸡翅,最近开发中收集的这篇文章主要介绍Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 ,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Spark之 RDD转换成DataFrame的Scala实现

依赖

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.3</version>
</dependency>

RDD转化成DataFrame:通过StructType指定schema

package com.zy.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* RDD转化成DataFrame:通过StructType指定schema
*/
object StructTypeSchema {
def main(args: Array[String]): Unit = {
//创建sparkSession对象
val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
//获取sparkContext
val sc: SparkContext = sparkSession.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//读取文件
val textFile: RDD[String] = sc.textFile("D:\person.txt")
//切分文件
val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(","))
//关联对象
val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
//创建rdd的schema信息
val schema: StructType = (new StructType)
.add("id", IntegerType, true, "id")
.add("name", StringType, false, "姓名")
.add("age", IntegerType, true, "年龄")
//根据rdd和schema信息创建DataFrame
val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
//DSL操作

personDF.show()
//sql 操作
//将df注册成表
personDF.createTempView("person")
sparkSession.sql("select * from person where id =3").show()
sparkSession.stop()
}
}

 

RDD转化成DataFrame:利用反射机制推断schema

package com.zy.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* RDD转化成DataFrame:利用反射机制推断schema
*/
//todo 定义一个样例类
case class Person(id: Int, name: String, age: Int)
object CaseClassSchema {
def main(args: Array[String]): Unit = {
//构建sparkSession 指定appName和master地址(本地测试local)
val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
//获取sparkContext
val sc: SparkContext = sparkSession.sparkContext
//设置日志输出级别
sc.setLogLevel("WARN")
//加载数据
val dataRDD: RDD[String] = sc.textFile("D:\person.txt")
//切分数据
val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
//将rdd和person样例类关联
val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//将rdd转换成dataFrame 导入隐式转换
import sparkSession.implicits._
val personDF: DataFrame = personRDD.toDF
//DSL语法

personDF.show()
personDF.printSchema()
personDF.select("name").show()
personDF.filter($"age" > 30).show()
println("---------------------------------------------")
//sql语法
//首先要创建临时视图
personDF.createTempView("person")
sparkSession.sql("select * from person where id>1").show()
sparkSession.stop()
}
}

 

posted @ 2018-02-18 20:58 青衫仗剑 阅读( ...) 评论( ...) 编辑 收藏

最后

以上就是认真鸡翅为你收集整理的Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 的全部内容,希望文章能够帮你解决Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部