我是靠谱客的博主 认真鸡翅,最近开发中收集的这篇文章主要介绍Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 ,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
Spark之 RDD转换成DataFrame的Scala实现
依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.3</version> </dependency>
RDD转化成DataFrame:通过StructType指定schema
package com.zy.sparksql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * RDD转化成DataFrame:通过StructType指定schema */ object StructTypeSchema { def main(args: Array[String]): Unit = { //创建sparkSession对象 val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate() //获取sparkContext val sc: SparkContext = sparkSession.sparkContext //设置日志级别 sc.setLogLevel("WARN") //读取文件 val textFile: RDD[String] = sc.textFile("D:\person.txt") //切分文件 val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(",")) //关联对象 val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt)) //创建rdd的schema信息 val schema: StructType = (new StructType) .add("id", IntegerType, true, "id") .add("name", StringType, false, "姓名") .add("age", IntegerType, true, "年龄") //根据rdd和schema信息创建DataFrame val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema) //DSL操作 personDF.show() //sql 操作 //将df注册成表 personDF.createTempView("person") sparkSession.sql("select * from person where id =3").show() sparkSession.stop() } }
RDD转化成DataFrame:利用反射机制推断schema
package com.zy.sparksql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * RDD转化成DataFrame:利用反射机制推断schema */ //todo 定义一个样例类 case class Person(id: Int, name: String, age: Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //构建sparkSession 指定appName和master地址(本地测试local) val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate() //获取sparkContext val sc: SparkContext = sparkSession.sparkContext //设置日志输出级别 sc.setLogLevel("WARN") //加载数据 val dataRDD: RDD[String] = sc.textFile("D:\person.txt") //切分数据 val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(",")) //将rdd和person样例类关联 val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //将rdd转换成dataFrame 导入隐式转换 import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF //DSL语法 personDF.show() personDF.printSchema() personDF.select("name").show() personDF.filter($"age" > 30).show() println("---------------------------------------------") //sql语法 //首先要创建临时视图 personDF.createTempView("person") sparkSession.sql("select * from person where id>1").show() sparkSession.stop() } }
posted @
2018-02-18 20:58
青衫仗剑 阅读(
...) 评论(
...)
编辑
收藏
最后
以上就是认真鸡翅为你收集整理的Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 的全部内容,希望文章能够帮你解决Spark之 RDD转换成DataFrame的Scala实现 Spark之 RDD转换成DataFrame的Scala实现 所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复