我是靠谱客的博主 瘦瘦犀牛,最近开发中收集的这篇文章主要介绍Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、装载CSV文件

1、使用SparkContext

    val conf = new SparkConf().setAppName("CsvDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("in/users.csv")
    lines.foreach(println)

读取显示效果如下:

user_id,locale,birthyear,gender,joinedAt,location,timezone
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan  Indonesia,480
3537982273,id_ID,1992,male,2012-09-29T18:03:12.111Z,Medan  Indonesia,420
823183725,en_US,1975,male,2012-10-06T03:14:07.149Z,Stratford  Ontario,-240
1872223848,en_US,1991,female,2012-11-04T08:59:43.783Z,Tehran  Iran,210
3429017717,id_ID,1995,female,2012-09-10T16:06:53.132Z,,420
627175141,ka_GE,1973,female,2012-11-01T09:59:17.590Z,Tbilisi  Georgia,240
2752000443,id_ID,1994,male,2012-10-03T05:22:17.637Z,Medan  Indonesia,420
3473687777,id_ID,1965,female,2012-10-03T12:19:29.975Z,Medan  Indonesia,420
2966052962,id_ID,1979,male,2012-10-31T10:11:57.668Z,Medan  Indonesia,420
264876277,id_ID,1988,female,2012-10-02T07:28:09.555Z,Medan  Indonesia,420
1534483818,en_US,1992,male,2012-09-25T13:38:04.083Z,Medan  Indonesia,420
2648135297,en_US,1996,female,2012-10-30T05:09:45.592Z,Phnom Penh,420

2、使用SparkSession

    val spark = SparkSession.builder().appName("CSV").master("local[*]").getOrCreate()
    //val df = spark.read.format("csv").option("header","false").load("in/users.csv")		//与下面一行等价
    val df = spark.read.csv("in/users.csv")
    df.show()

读取显示效果如下:

+----------+------+---------+------+--------------------+------------------+--------+
|       _c0|   _c1|      _c2|   _c3|                 _c4|               _c5|     _c6|
+----------+------+---------+------+--------------------+------------------+--------+
|   user_id|locale|birthyear|gender|            joinedAt|          location|timezone|
|3197468391| id_ID|     1993|  male|2012-10-02T06:40:...|  Medan  Indonesia|     480|
|3537982273| id_ID|     1992|  male|2012-09-29T18:03:...|  Medan  Indonesia|     420|
| 823183725| en_US|     1975|  male|2012-10-06T03:14:...|Stratford  Ontario|    -240|
|1872223848| en_US|     1991|female|2012-11-04T08:59:...|      Tehran  Iran|     210|
|3429017717| id_ID|     1995|female|2012-09-10T16:06:...|              null|     420|
| 627175141| ka_GE|     1973|female|2012-11-01T09:59:...|  Tbilisi  Georgia|     240|
|2752000443| id_ID|     1994|  male|2012-10-03T05:22:...|  Medan  Indonesia|     420|
|3473687777| id_ID|     1965|female|2012-10-03T12:19:...|  Medan  Indonesia|     420|
|2966052962| id_ID|     1979|  male|2012-10-31T10:11:...|  Medan  Indonesia|     420|
| 264876277| id_ID|     1988|female|2012-10-02T07:28:...|  Medan  Indonesia|     420|
|1534483818| en_US|     1992|  male|2012-09-25T13:38:...|  Medan  Indonesia|     420|
|2648135297| en_US|     1996|female|2012-10-30T05:09:...|        Phnom Penh|     420|
+----------+------+---------+------+--------------------+------------------+--------+

二、装载JSON数据源

1、使用SparkContext

    val conf = new SparkConf().setAppName("Json").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("in/users.json")
    import scala.util.parsing.json._
    val rdd = lines.map(x=>JSON.parseFull(x))
    rdd.collect.foreach(println)

读取显示效果如下:

Some(Map(name -> Michael))
Some(Map(name -> Andy, Age -> 30.0))
Some(Map(name -> Justin, Age -> 19.0))

2、使用SparkSession

    val spark = SparkSession.builder().appName("JSON").master("local[*]").getOrCreate()
    //val df = spark.read.format("json").option("header","true").load("in/users.json")
    val df = spark.read.json("in/users.json")//与上一行等价
    df.show()

读取显示效果如下:

+----+-------+
| Age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

三、Spark读取Jar包执行Scala操作

1、打包Spark代码

  • 此处以读取配置文件方式读取文件,方便之后读取文件的修改,而不用修改代码
  • 注意需要将读取文件上传到相应的hdfs文件系统

配置文件:

loadfile:hdfs://192.168.8.99:9000/kb09File/world.txt
outfile:hdfs://192.168.8.99:9000/kb09File/outfile

Scala代码:

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val properties=new Properties()
    properties.load(new FileInputStream("/opt/userset.properties"))
    val loadFilePath=properties.get("loadfile").toString
    val outFilePath=properties.get("outfile").toString
    val lines = sc.textFile(loadFilePath)
    lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).saveAsTextFile(outFilePath)
    lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).foreach(println)
    sc.stop()
  }

2、删除安全文件

  • 上传Jar包到虚拟机上
  • 执行以下语句,删除Jar包中的安全文件,否则无法执行Scala代码
	zip -d /opt/kb09File/sparkdemo1.jar 'META-INF/*.DSA' 'META-INF/*SF'

3、读取Jar包

  • 上传配置文件到hdfs文件系统
  • 执行Jar包操作Scala代码
	spark-submit --class gaoji.WordCount --master local[1] ./sparkdemo1.jar

最后

以上就是瘦瘦犀牛为你收集整理的Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作的全部内容,希望文章能够帮你解决Spark装载数据源、Spark读取Jar包执行Scala操作一、装载CSV文件二、装载JSON数据源三、Spark读取Jar包执行Scala操作所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部