Overview

This post walks through consuming Kafka with Spark Structured Streaming: the Maven setup, the data beans, a utility class that parses each micro-batch of JSON into message and gift records, and the main streaming job.
1. pom.xml

Everything is built against Scala 2.11 and Spark 2.4.x; note that the _2.11 suffix on each Spark artifact must match scala.version.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
<scala.version>2.11.4</scala.version>
<spark.version>2.4.1</spark.version>
</properties>
<!-- The original pom listed http://scala-tools.org/repo-releases as a repository and
     plugin repository; that site has been offline for years, and every artifact below
     resolves from Maven Central, so no extra repositories are needed. -->
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<!-- keep the Kafka source in lockstep with the rest of Spark (the original pinned 2.4.4 against Spark 2.4.1) -->
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!-- the original's ${scope} property was never defined; uncomment for cluster submission -->
<!--<scope>provided</scope>-->
</dependency>
<!-- only needed for the legacy DStream API; Structured Streaming uses spark-sql-kafka-0-10 above -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>${scope}</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>${scope}</scope>-->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
</dependencies>
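The original pom stops at the dependency list, but a Scala Maven project also needs a compiler plugin to build the sources. A minimal sketch using scala-maven-plugin (this block is an addition, not part of the original pom):

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<!-- compile Scala sources alongside any Java sources -->
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>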
2. Beans

Create the beans the parsing step needs. DateBean maps one JSON record from Kafka. The @JsonIgnoreProperties annotation is Jackson's; deserialization below actually uses Gson, which skips unknown fields by default, so it is redundant but harmless.
@JsonIgnoreProperties(ignoreUnknown = true)
class DateBean extends Serializable {
  var item: Array[ItemBean] = Array.empty // chat/gift entries in this record
  var sid = ""                            // platform/source id
  var roomid = ""                         // live-room id
  var sourcelink = ""
}
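The post never shows ItemBean, MessageBean, or GiftBean, even though the utility and main class read their fields. A minimal sketch reconstructed from those field accesses (the field names come from the code below; everything else is an assumption):

class ItemBean extends Serializable {
  var typeName = "" // "chat" for messages; anything else is treated as a gift
  var f = ""        // sender id
  var fname = ""    // sender name (gifts)
  var time = ""     // epoch seconds as a string
  var content = ""  // chat text
  var price = ""    // gift price; empty for non-gift items
  var count = ""    // gift count
  var g_type = ""   // gift type
  var gname = ""    // gift name
  var gid = ""      // gift id
}

class MessageBean extends Serializable {
  var p = ""; var r = ""; var f = ""
  var timestamp = ""; var content = ""; var date = ""
}

class GiftBean extends Serializable {
  var p = ""; var r = ""; var f = ""; var fname = ""
  var g_type = ""; var price = 0.0; var count = ""
  var timestamp = ""; var g_name = ""; var g_id = ""; var date = ""
}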
3. Utility class

KafkaStreamMapUtil parses each row's value JSON (either a single object or an array of objects) into pairs of message and gift arrays. The original had several bugs, fixed below: it referenced a nonexistent line variable in two leftover println calls, mixed up bean names (MBean/GBean vs. MessageBean/GiftBean), read m.r where the bean field is roomid, returned undefined msgArr/giftArr, and used non-short-circuiting & in the null checks.
import java.text.SimpleDateFormat
import java.util.Date

import com.google.gson.{Gson, JsonParser}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object KafkaStreamMapUtil {
  // Despite the name, this returns an RDD, not a DStream.
  def kafkaStreamToDStream(df: DataFrame, chatTypeName: String):
      RDD[Array[(Array[MessageBean], Array[GiftBean])]] = {
    df.rdd.map(row => {
      val jsonParse = new JsonParser()
      val gs = new Gson()
      val sdf = new SimpleDateFormat("yyyyMMdd")
      // Empty by default so a parse failure yields no records instead of null
      var msg = Array.empty[(Array[MessageBean], Array[GiftBean])]
      try {
        val je = jsonParse.parse(row.getAs[String]("value"))
        // The payload is either a single DateBean object or an array of them
        var dbArr: Array[DateBean] = null
        if (je.isJsonArray) {
          dbArr = gs.fromJson(je, classOf[Array[DateBean]])
        } else {
          dbArr = Array(gs.fromJson(je, classOf[DateBean]))
        }
        msg = dbArr.map(m => {
          var mArr: Array[MessageBean] = Array.empty
          var gArr: Array[GiftBean] = Array.empty
          // && short-circuits; the original's single & would NPE on a null bean
          if (m != null && m.item != null && m.item.length > 0) {
            val p = m.sid
            val r = m.roomid
            m.item.foreach(x => {
              if (x != null) {
                if (chatTypeName == x.typeName) {
                  // Chat message
                  val mBean = new MessageBean
                  mBean.p = p
                  mBean.r = r
                  mBean.f = x.f
                  mBean.timestamp = x.time
                  mBean.content = x.content
                  mBean.date = sdf.format(new Date(x.time.toLong * 1000))
                  mArr :+= mBean
                } else if (!x.price.trim.equals("")) {
                  // Gift record
                  val gBean = new GiftBean
                  gBean.p = p
                  gBean.r = r
                  gBean.f = x.f
                  gBean.fname = x.fname
                  gBean.g_type = x.g_type
                  gBean.price = x.price.toDouble
                  gBean.count = x.count
                  gBean.timestamp = x.time
                  gBean.g_name = x.gname
                  gBean.g_id = x.gid
                  gBean.date = sdf.format(new Date(x.time.toLong * 1000))
                  gArr :+= gBean
                }
              }
            })
          }
          (mArr, gArr)
        })
      } catch {
        case e: Exception => println(e.getMessage)
      }
      msg
    })
  }
}
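For reference, a hypothetical record the parser above would accept. The field names are taken from the beans; the values are made up:

val sample =
  """{"sid":"p1","roomid":"r42","sourcelink":"",
    |  "item":[
    |    {"typeName":"chat","f":"u1","time":"1555555555","content":"hello","price":""},
    |    {"typeName":"gift","f":"u2","fname":"user2","price":"5.0",
    |     "count":"1","g_type":"1","gname":"rose","gid":"g7","time":"1555555556"}
    |  ]}""".stripMargin
// A JSON array of such objects is also accepted (the isJsonArray branch)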
4. Main class
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceApp {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("structured_streaming_kafka_App")
      .master("local[*]").getOrCreate()
    import spark.implicits._
    // "brokerid" and "topic" are placeholders for real broker addresses and topic names
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerid")
      .option("subscribe", "topic")
      .load()
    // Kafka delivers value as binary; cast it to a string before parsing
    val strDF = df.selectExpr("CAST(value AS STRING)")
    val chatTypeName = "chat"
    // foreachBatch supplies the sink itself, so the original's format("console") is dropped
    strDF.writeStream
      .option("checkpointLocation", "path/")
      .foreachBatch((batchDF: DataFrame, id: Long) => {
        println(s"batchid = $id")
        val transRDD = KafkaStreamMapUtil.kafkaStreamToDStream(batchDF, chatTypeName)
        val msgDF = transRDD.flatMap(x => x)
          .flatMap(_._1)
          .map(e => (e.p, e.r, e.f, e.content, e.timestamp, e.date))
          .toDF("p", "r", "f", "content", "timestamp", "date")
          .selectExpr("*", "p as plat") // the original aliased platform_id, a column that does not exist; p carries the platform id
        val giftDF = transRDD.flatMap(x => x)
          .flatMap(_._2)
          .map(e => (e.p, e.r, e.f, e.fname, e.g_type, e.price, e.count, e.timestamp, e.date, e.g_name, e.g_id))
          .toDF("p", "r", "f", "fname", "g_type",
            "price", "count", "timestamp", "date", "g_name", "g_id")
          .selectExpr("*", "p as plat")
        msgDF.show()
        giftDF.show()
      }).start().awaitTermination()
  }
}
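The pom pulls in mysql-connector-java, but the job above only calls show(). A hedged sketch of persisting each micro-batch instead, using Spark's standard JDBC writer; the URL, table names, and credentials are placeholders:

import java.util.Properties

// Inside foreachBatch, after building msgDF/giftDF:
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")
props.setProperty("driver", "com.mysql.jdbc.Driver") // matches connector 5.1.x
msgDF.write
  .mode("append") // append each micro-batch; the table keeps growing
  .jdbc("jdbc:mysql://host:3306/db", "messages", props)
giftDF.write
  .mode("append")
  .jdbc("jdbc:mysql://host:3306/db", "gifts", props)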
Finally

That's all of Part 7: Structured Streaming Kafka, as collected and organized by 缓慢小鸭子; hopefully it helps you work through the same development problems.