我是靠谱客的博主 典雅樱桃,最近开发中收集的这篇文章主要介绍Spark读写ES,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.创建spark对象并配置读取es的连接信息

//创建spark上下文对象
val spark = SparkSession
.builder()
.appName("test")
.config("spark.port.maxretries", "128")
.config("spark.sql.parquet.writelegacyFormat", true)
.config("es.index.auto.create", "true")
.config("es.nodes.wan.only", "true")
.config("es.nodes", "0.0.0.0:9200")
.master("local")
.config("hive.metastore.uris", "thrift://0.0.0.0:9083")
.config("hive.execution.engine", "mr")
.config("spark.sql.warehouse.dir", "hdfs://0.0.0.0:8020/apps/hive/warehouse")
.config("spark.yarn.am.waitTime", "1000s")
.config("spark.default.parallelism", "300")
.config("spark.ui.retainedStages", 500)
.config("spark.hadoop.mapred.output.compress", "true")
.config("spark.hadoop.mapred.output.compression.codec", "snappy")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config("spark.sql.hive.convertMetastoreOrc", "false")
.enableHiveSupport()
.getOrCreate()
import org.elasticsearch.spark.sql._//一定要导包!
val options = Map(
"es.nodes.wan.only" -> "true",
"es.nodes" -> "0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200",//es节点
"es.port" -> "9200"//es的端口号
)

2.读取es数据

import org.elasticsearch.spark.sql._//一定要导包!
spark.read.format("es").options(options).load("索引名称/_doc")
.filter("添加一些过滤条件")
.createOrReplaceTempView("创建的临时表名称")

3.写数据到es

import org.elasticsearch.spark.sql._//一定要导包!
val sourceDF = spark.sql("hiveSql")
sourceDF
.write
.format("org.elasticsearch.spark.sql")
.options(options)
.mode(SaveMode.Append)
.save("写入的索引名/_docs")

最后

以上就是典雅樱桃为你收集整理的Spark读写ES的全部内容,希望文章能够帮你解决Spark读写ES所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部