概述
1.创建spark对象并配置读取es的连接信息
//创建spark上下文对象
val spark = SparkSession
.builder()
.appName("test")
.config("spark.port.maxretries", "128")
.config("spark.sql.parquet.writelegacyFormat", true)
.config("es.index.auto.create", "true")
.config("es.nodes.wan.only", "true")
.config("es.nodes", "0.0.0.0:9200")
.master("local")
.config("hive.metastore.uris", "thrift://0.0.0.0:9083")
.config("hive.execution.engine", "mr")
.config("spark.sql.warehouse.dir", "hdfs://0.0.0.0:8020/apps/hive/warehouse")
.config("spark.yarn.am.waitTime", "1000s")
.config("spark.default.parallelism", "300")
.config("spark.ui.retainedStages", 500)
.config("spark.hadoop.mapred.output.compress", "true")
.config("spark.hadoop.mapred.output.compression.codec", "snappy")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config("spark.sql.hive.convertMetastoreOrc", "false")
.enableHiveSupport()
.getOrCreate()
import org.elasticsearch.spark.sql._//一定要导包!
val options = Map(
"es.nodes.wan.only" -> "true",
"es.nodes" -> "0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200",//es节点
"es.port" -> "9200"//es的端口号
)
2.读取es数据
import org.elasticsearch.spark.sql._//一定要导包!
spark.read.format("es").options(options).load("索引名称/_doc")
.filter("添加一些过滤条件")
.createOrReplaceTempView("创建的临时表名称")
3.写数据到es
import org.elasticsearch.spark.sql._//一定要导包!
val sourceDF = spark.sql("hiveSql")
sourceDF
.write
.format("org.elasticsearch.spark.sql")
.options(options)
.mode(SaveMode.Append)
.save("写入的索引名/_docs")
最后
以上就是典雅樱桃为你收集整理的Spark读写ES的全部内容,希望文章能够帮你解决Spark读写ES所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复