Integrating Structured Streaming with Kafka: decoupling different JSON structures
Problem: Structured Streaming reads data from several Kafka topics, and each topic stores a different data format in its value. How can a single piece of template code read from all of these topics while staying decoupled from the individual formats?

Approach: the Structured Streaming code that reads from Kafka is identical everywhere; only the parsing of the Kafka value, a few configuration options, and the SQL that processes the data differ. The parsing can therefore be abstracted out: define a Bean per format, parse the JSON into the corresponding Bean, and pass everything else in through a configuration file that supplies the settings and the SQL, so each topic yields exactly the data it needs.
1. The code
- The CommonStructuedKafka class
```scala
package com.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object CommonStructuedKafka {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)

    // Read the settings from the configuration file
    val masterUrl = Props.get("master", "local")
    val appName = Props.get("appName", "Test7")
    val className = Props.get("className", "")
    val kafkaBootstrapServers = Props.get("kafka.bootstrap.servers", "localhost:9092")
    val subscribe = Props.get("subscribe", "test")
    val tmpTable = Props.get("tmpTable", "tmp")
    val sparksql = Props.get("sparksql", "select * from tmp")

    val spark = SparkSession.builder()
      .master(masterUrl)
      .appName(appName)
      .getOrCreate()

    // Read the stream from Kafka
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", subscribe)
      .load()

    // Implicit conversions for Dataset encoders
    import spark.implicits._
    val values = lines.selectExpr("cast(value as string)").as[String]
    val res = values.map { value =>
      // Parse the JSON value into a List of field values
      val list = Tools.parseJson(value, className)
      // Convert the List into a tuple so Spark can derive a schema
      Tools.list2Tuple7(list)
    }
    res.createOrReplaceTempView(tmpTable)
    val result = spark.sql(sparksql)

    val query = result.writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```
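Note that the Kafka source is not bundled with spark-sql, so the spark-sql-kafka connector has to be on the classpath. A minimal build.sbt sketch, assuming Spark 2.4.x (the version numbers are assumptions; match them to your cluster):

```scala
// Hypothetical build.sbt excerpt; the version numbers are assumptions
val sparkVersion = "2.4.8"

libraryDependencies ++= Seq(
  "org.apache.spark"     %% "spark-sql"            % sparkVersion % Provided,
  // Kafka source for Structured Streaming (backs format("kafka") above)
  "org.apache.spark"     %% "spark-sql-kafka-0-10" % sparkVersion,
  // Gson is used by Tools to parse the JSON values
  "com.google.code.gson" %  "gson"                 % "2.8.6"
)
```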
- Tools: the utility class that parses JSON
```scala
package com.test

import com.google.gson.Gson

import scala.collection.mutable

object Tools {

  def main(args: Array[String]): Unit = {
    val tools = new Tools()
    val res = tools.parse("{'name':'caocao','age':'32','sex':'male'}", "com.test.People")
    println(res)
  }

  def parseJson(json: String, className: String): List[String] = {
    val tools = new Tools()
    tools.parse(json, className)
  }

  // Pad the List out to a Tuple7. Only 7 fields are handled here; more cases can
  // be added for wider Beans. (Admittedly inelegant, but no better approach came to mind.)
  def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
    list match {
      case List(a) => (a, "", "", "", "", "", "")
      case List(a, b) => (a, b, "", "", "", "", "")
      case List(a, b, c) => (a, b, c, "", "", "", "")
      case List(a, b, c, d) => (a, b, c, d, "", "", "")
      case List(a, b, c, d, e) => (a, b, c, d, e, "", "")
      case List(a, b, c, d, e, f) => (a, b, c, d, e, f, "")
      case List(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g)
      case _ => ("", "", "", "", "", "", "")
    }
  }
}

class Tools {

  // Reflect on the Bean identified by its fully qualified class name, let Gson
  // parse the JSON into it, and return the field values as a List.
  def parse(json: String, className: String): List[String] = {
    val list = mutable.ListBuffer[String]()
    val gson = new Gson()
    val clazz = Class.forName(className)
    val obj = gson.fromJson(json, clazz)
    val aClass = obj.getClass
    val fields = aClass.getDeclaredFields
    fields.foreach { f =>
      val fName = f.getName
      // Case-class accessors share the field's name; a missing JSON key
      // leaves the field null, which we map to an empty string
      val m = aClass.getDeclaredMethod(fName)
      val value = Option(m.invoke(obj)).map(_.toString).getOrElse("")
      list.append(value)
    }
    list.toList
  }
}
```
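As a quick sanity check of the reflection path, a minimal sketch (assuming the People case class defined below is on the classpath):

```scala
// Parse one record via the People Bean and pad it to a Tuple7
val fields = Tools.parseJson("""{"name":"caocao","age":"32","sex":"male"}""", "com.test.People")
// fields == List("caocao", "32", "male"), in field declaration order
val row = Tools.list2Tuple7(fields)
// row == ("caocao", "32", "male", "", "", "", "")
```

One caveat: parse relies on getDeclaredFields returning fields in declaration order. The JVM specification does not guarantee this (although common JVMs behave this way), so the column positions referenced in the SQL (_1, _2, ...) should be verified against the actual output.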
- Props: the utility class that reads the configuration file
```scala
package com.test

import java.io.{FileInputStream, InputStream}
import java.nio.file.{Files, Paths}
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object Props {

  private val prop = new Properties()
  // Guard against a missing file: fall back to the defaults passed to get()
  private val initialStream = getPropertyFileInputStream
  if (initialStream != null) prop.load(initialStream)

  /**
   * After adding --driver-java-options -DPropPath=/home/spark/prop.properties to
   * spark-submit, System.getProperty("PropPath") returns the path
   * /home/spark/prop.properties. If spark-submit specifies the path of a
   * prop.properties file, the properties from that file are used; otherwise the
   * default values supplied by the caller apply.
   */
  private def getPropertyFileInputStream: InputStream = {
    var is: InputStream = null
    val filePath = System.getProperty("PropPath")
    if (filePath != null && filePath.length > 0) {
      if (Files.exists(Paths.get(filePath))) {
        is = new FileInputStream(filePath)
      } else {
        println(s"Config file $filePath not found locally, trying HDFS")
        val fs = FileSystem.get(new Configuration())
        if (fs.exists(new Path(filePath))) {
          val fis = fs.open(new Path(filePath))
          is = fis.getWrappedStream
        } else {
          println(s"Config file $filePath not found on HDFS either, loading failed...")
        }
      }
    } else {
      println("The PropPath property is not set")
    }
    is
  }

  def get(propertyName: String, defaultValue: String): String = {
    prop.getProperty(propertyName, defaultValue)
  }

  def get(): Properties = {
    println("prop:" + this.prop)
    this.prop
  }

  def reload(): Properties = {
    val is = getPropertyFileInputStream
    if (is != null) prop.load(is)
    prop
  }
}
```
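Because prop is populated when the Props object is first initialized, PropPath must be set before the first Props.get call. A minimal sketch for exercising it outside spark-submit (the file path is an assumption):

```scala
// Hypothetical local test of Props; /tmp/people.properties is an assumed path
System.setProperty("PropPath", "/tmp/people.properties")
// First access triggers the load above; pick up later file changes with reload()
println(Props.get("className", "unknown"))
Props.reload()
```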
- The People and Student classes
```scala
case class People(name: String, age: String, sex: String) extends Serializable

case class Student(name: String, age: String, sex: String, idNum: String) extends Serializable
```
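Supporting a new JSON layout now only means adding a Bean and a properties file; CommonStructuedKafka itself never changes. As an illustration, a hypothetical Order Bean (not part of the original project) could look like this, provided it stays within the seven-field limit of list2Tuple7:

```scala
// Hypothetical third Bean for an order topic; at most 7 fields are supported
case class Order(orderId: String, userId: String, amount: String) extends Serializable
```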
2. Configuration files
- people.properties
```properties
master=local
appName=Test7
className=com.test.People
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex from tmp
```
- student.properties
```properties
master=local
appName=Test7
className=com.test.Student
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex, _4 as idNum from tmp
```
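The matching properties file for the hypothetical Order Bean sketched above would swap only the class name, topic, and SQL (the file name orders.properties and the topic orders are assumptions):

```properties
master=local
appName=Test7
className=com.test.Order
kafka.bootstrap.servers=localhost:9092
subscribe=orders
tmpTable=tmp
sparksql=select _1 as orderId, _2 as userId, _3 as amount from tmp
```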
3. Execution
When submitting the Structured Streaming program, the configuration path must be passed as a parameter, for example: -DPropPath=/Users/zhangzhiqiang/Documents/test_project/comtest/src/main/resources/people.properties
For local debugging, it can be added under IDEA's VM Options.
3.1 Run the program with people.properties and type {"name":"caocao","age":"32","sex":"male"} into the Kafka console producer; the console sink prints a row with the columns name, age, and sex.
3.2 Run the program with student.properties and type {"name":"caocao","age":"32","sex":"male","idNum":"1001"} into the Kafka console producer; the console sink prints a row with the columns name, age, sex, and idNum.
GitHub code: the structuredstreamngdemo project