我是靠谱客的博主 快乐故事,这篇文章主要介绍Spark2.3.0 结构化流 进行streaming+kafka的可操作算子流,现在分享给大家,希望可以做个参考。

工作上正在进行Streaming算子的研究学习,需要做到在流的基础上,通过kafka接收数据到 中间若干的计算算子,再到最后的输出。开始使用传统的streaming+kafka,但由于无法返回后续使用的dataset,只能放弃, 后来大牛提出永spark的结构化流处理,于是经过参考文档资料编写了一个可以进行过程处理的streaming处理流程。(只有与主题相关的代码,其余设计工作的没贴,看看实现思想即可,哈哈,与各位共勉。。。。。^.^)

sprak2.3.0官网流媒体编程文档

kafkaIputStep

复制代码
1
2
3
4
5
6
rawDataSet=sparkSession .readStream() .format("kafka") .option("kafka.bootstrap.servers", brokers) .option("subscribe", topics) .load().withWatermark("timestamp","1 minutes");//对dataset每一分钟进行一次水位截取然后进行合并发送

driver   

复制代码
1
foreach((ForeachWriter<Row>)processSteps[processIndex]).outputMode("append")
即可对dataset进行有方向输出
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run() { if (stepList.size() == 0) return; int processIndex = 0; dataSets = new Dataset[stepList.size()]; try { //处理第一个Input算子,构建Stream DataSet ModelStep firstModelStep = stepList.get(processIndex); if (firstModelStep.getAncestors().size() != 0) { //throw exception } Dataset<Row> streamSource = ((InputStep) processSteps[processIndex]).compute(sparkSession); dataSets[0]=streamSource; //只留最后一个步骤 for (processIndex = 1; processIndex < stepList.size()-1; processIndex++) { runComputeStep(processIndex);//中间的计算算子 }//Step 处理结束 //lastStep 进行dataset合并 Dataset<Row> outputDataSet = unionAncesstorDataSet(dataSets, processIndex); //User default trigger; //将结果通过输出算子输出到对应的地方 StreamingQuery handle = outputDataSet.writeStream().foreach((ForeachWriter<Row>)processSteps[processIndex]).outputMode("append").start(); handle.awaitTermination(); }catch (Exception e) { LogUtil.error(GICS_STEP_PROCESS_ERROR + ":" + e.getMessage(),e); return; } finally { sparkSession.stop(); } }

jdbcoutputStep  类要继承 ForeachWriter<Row> 

用于结构化流writeStream()的定向接收方法,只需要对在接收类里进行重写对应方法即可对数据进行专门处理,因为做的是通用算子,所以数据用OGG的特殊格式规定传输

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
private String url; private String username; private String password; private String driver; transient private Gson gson; transient private Connection connection; transient private Statement stmt_batch; @Override public void compute(final Dataset<Row> dataset, SparkSession sparkSession) throws Exception { throw new GICSException(GICS_METHOD_NOT_SUPPORT); } @Override public void initialize(StepInfo stepInfo, List<String> resourcePaths) throws GICSException { Map<String, String> params = stepInfo.getStepParams(); driver= GICSUtils.getStepParams(params,"driver",true); url=GICSUtils.getStepParams(params,"jdbcurl",true); username=GICSUtils.getStepParams(params,"username",true); password=GICSUtils.getStepParams(params,"password",true); } private String constructInsertSQL(StructMessage row) { return null; } private String constructDeleteSQL(StructMessage row){ return null; } private String constructUpdateSQL(StructMessage row){ return null; } //Method used for stream writing @Override public boolean open(long partitionId, long version) { try { connection = DBUtils.getConnection(url, username, password, driver); connection.setAutoCommit(false); stmt_batch = connection.createStatement(); gson = new GsonBuilder() //.setLenient()// json宽松 .enableComplexMapKeySerialization()//支持Map的key为复杂对象的形式 .serializeNulls() //智能null .setPrettyPrinting()// 调教格式 .disableHtmlEscaping() //默认是GSON把HTML 转义的 .create(); return true; }catch (Exception ex){ LogUtil.error(ex.getMessage(),ex); return false; } } @Override public void process(Row row) { String sql=null; try { //Check field op_type byte[] jsonBytes=row.getAs("value"); String jsonVal= null; StructMessage result = null; try { jsonVal = new String(jsonBytes,UTF8); Type type = new TypeToken<StructMessage>() {}.getType(); result = gson.fromJson(jsonVal, type); } catch (Exception e) { LogUtil.error("消息格式错误",jsonVal,e); } String op_type=result.getOp_type(); if(CommonUtil.testStringEmpty(op_type)) throw new GICSException("No op_type field for record"); if("I".equals(op_type)) { sql=constructInsertSQL(result); } if("D".equals(op_type)) { sql=constructDeleteSQL(result); } if("U".equals(op_type)) { sql=constructUpdateSQL(result); } stmt_batch.addBatch(sql); } catch (Exception e) { LogUtil.error("Executing JDBC update error",sql,e); } } @Override public void close(Throwable errorOrNull) { try { stmt_batch.executeBatch(); connection.commit(); } catch (Exception e) { LogUtil.error("Executing JDBC commit",e); }finally { DBUtils.close(connection); } }

最后

以上就是快乐故事最近收集整理的关于Spark2.3.0 结构化流 进行streaming+kafka的可操作算子流的全部内容,更多相关Spark2.3.0内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(79)

评论列表共有 0 条评论

立即
投稿
返回
顶部