我是靠谱客的博主 瘦瘦犀牛,最近开发中收集的这篇文章主要介绍Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
一、装载CSV文件
1、使用SparkContext
val conf = new SparkConf().setAppName("CsvDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val lines = sc.textFile("in/users.csv")
lines.foreach(println)
读取显示效果如下:
user_id,locale,birthyear,gender,joinedAt,location,timezone
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan Indonesia,480
3537982273,id_ID,1992,male,2012-09-29T18:03:12.111Z,Medan Indonesia,420
823183725,en_US,1975,male,2012-10-06T03:14:07.149Z,Stratford Ontario,-240
1872223848,en_US,1991,female,2012-11-04T08:59:43.783Z,Tehran Iran,210
3429017717,id_ID,1995,female,2012-09-10T16:06:53.132Z,,420
627175141,ka_GE,1973,female,2012-11-01T09:59:17.590Z,Tbilisi Georgia,240
2752000443,id_ID,1994,male,2012-10-03T05:22:17.637Z,Medan Indonesia,420
3473687777,id_ID,1965,female,2012-10-03T12:19:29.975Z,Medan Indonesia,420
2966052962,id_ID,1979,male,2012-10-31T10:11:57.668Z,Medan Indonesia,420
264876277,id_ID,1988,female,2012-10-02T07:28:09.555Z,Medan Indonesia,420
1534483818,en_US,1992,male,2012-09-25T13:38:04.083Z,Medan Indonesia,420
2648135297,en_US,1996,female,2012-10-30T05:09:45.592Z,Phnom Penh,420
2、使用SparkSession
val spark = SparkSession.builder().appName("CSV").master("local[*]").getOrCreate()
//val df = spark.read.format("csv").option("header","false").load("in/users.csv") //与下面一行等价
val df = spark.read.csv("in/users.csv")
df.show()
读取显示效果如下:
+----------+------+---------+------+--------------------+------------------+--------+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6|
+----------+------+---------+------+--------------------+------------------+--------+
| user_id|locale|birthyear|gender| joinedAt| location|timezone|
|3197468391| id_ID| 1993| male|2012-10-02T06:40:...| Medan Indonesia| 480|
|3537982273| id_ID| 1992| male|2012-09-29T18:03:...| Medan Indonesia| 420|
| 823183725| en_US| 1975| male|2012-10-06T03:14:...|Stratford Ontario| -240|
|1872223848| en_US| 1991|female|2012-11-04T08:59:...| Tehran Iran| 210|
|3429017717| id_ID| 1995|female|2012-09-10T16:06:...| null| 420|
| 627175141| ka_GE| 1973|female|2012-11-01T09:59:...| Tbilisi Georgia| 240|
|2752000443| id_ID| 1994| male|2012-10-03T05:22:...| Medan Indonesia| 420|
|3473687777| id_ID| 1965|female|2012-10-03T12:19:...| Medan Indonesia| 420|
|2966052962| id_ID| 1979| male|2012-10-31T10:11:...| Medan Indonesia| 420|
| 264876277| id_ID| 1988|female|2012-10-02T07:28:...| Medan Indonesia| 420|
|1534483818| en_US| 1992| male|2012-09-25T13:38:...| Medan Indonesia| 420|
|2648135297| en_US| 1996|female|2012-10-30T05:09:...| Phnom Penh| 420|
+----------+------+---------+------+--------------------+------------------+--------+
二、装载JSON数据源
1、使用SparkContext
val conf = new SparkConf().setAppName("Json").setMaster("local[*]")
val sc = new SparkContext(conf)
val lines = sc.textFile("in/users.json")
import scala.util.parsing.json._
val rdd = lines.map(x=>JSON.parseFull(x))
rdd.collect.foreach(println)
读取显示效果如下:
Some(Map(name -> Michael))
Some(Map(name -> Andy, Age -> 30.0))
Some(Map(name -> Justin, Age -> 19.0))
2、使用SparkSession
val spark = SparkSession.builder().appName("JSON").master("local[*]").getOrCreate()
//val df = spark.read.format("json").option("header","true").load("in/users.json")
val df = spark.read.json("in/users.json")//与上一行等价
df.show()
读取显示效果如下:
+----+-------+
| Age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
三、Spark读取Jar包执行Scala操作
1、打包Spark
代码
- 此处以读取配置文件方式读取文件,方便之后读取文件的修改,而不用修改代码
- 注意需要将读取文件上传到相应的hdfs文件系统
配置文件:
loadfile:hdfs://192.168.8.99:9000/kb09File/world.txt
outfile:hdfs://192.168.8.99:9000/kb09File/outfile
Scala代码:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val properties=new Properties()
properties.load(new FileInputStream("/opt/userset.properties"))
val loadFilePath=properties.get("loadfile").toString
val outFilePath=properties.get("outfile").toString
val lines = sc.textFile(loadFilePath)
lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).saveAsTextFile(outFilePath)
lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).foreach(println)
sc.stop()
}
2、删除安全文件
- 上传Jar包到虚拟机上
- 执行以下语句,删除Jar包中的安全文件,否则无法执行Scala代码
zip -d /opt/kb09File/sparkdemo1.jar 'META-INF/*.DSA' 'META-INF/*SF'
3、读取Jar包
- 上传配置文件到hdfs文件系统
- 执行Jar包操作Scala代码
spark-submit --class gaoji.WordCount --master local[1] ./sparkdemo1.jar
最后
以上就是瘦瘦犀牛为你收集整理的Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作的全部内容,希望文章能够帮你解决Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复