概述
工作上正在进行Streaming算子的研究学习,需要做到在流的基础上,通过kafka接收数据到 中间若干的计算算子,再到最后的输出。开始使用传统的streaming+kafka,但由于无法返回后续使用的dataset,只能放弃, 后来大牛提出永spark的结构化流处理,于是经过参考文档资料编写了一个可以进行过程处理的streaming处理流程。(只有与主题相关的代码,其余设计工作的没贴,看看实现思想即可,哈哈,与各位共勉。。。。。^.^)
sprak2.3.0官网流媒体编程文档
kafkaIputStep
rawDataSet=sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load().withWatermark("timestamp","1 minutes");//对dataset每一分钟进行一次水位截取然后进行合并发送
driver
foreach((ForeachWriter<Row>)processSteps[processIndex]).outputMode("append")
即可对dataset进行有方向输出
public void run() {
if (stepList.size() == 0)
return;
int processIndex = 0;
dataSets = new Dataset[stepList.size()];
try {
//处理第一个Input算子,构建Stream DataSet
ModelStep firstModelStep = stepList.get(processIndex);
if (firstModelStep.getAncestors().size() != 0) {
//throw exception
}
Dataset<Row> streamSource = ((InputStep) processSteps[processIndex]).compute(sparkSession);
dataSets[0]=streamSource;
//只留最后一个步骤
for (processIndex = 1; processIndex < stepList.size()-1; processIndex++) {
runComputeStep(processIndex);//中间的计算算子
}//Step 处理结束
//lastStep 进行dataset合并
Dataset<Row> outputDataSet = unionAncesstorDataSet(dataSets, processIndex);
//User default trigger; //将结果通过输出算子输出到对应的地方
StreamingQuery handle = outputDataSet.writeStream().foreach((ForeachWriter<Row>)processSteps[processIndex]).outputMode("append").start();
handle.awaitTermination();
}catch (Exception e) {
LogUtil.error(GICS_STEP_PROCESS_ERROR + ":" + e.getMessage(),e);
return;
} finally {
sparkSession.stop();
}
}
jdbcoutputStep 类要继承 ForeachWriter<Row>
用于结构化流writeStream()的定向接收方法,只需要对在接收类里进行重写对应方法即可对数据进行专门处理,因为做的是通用算子,所以数据用OGG的特殊格式规定传输
private String url;
private String username;
private String password;
private String driver;
transient private Gson gson;
transient private Connection connection;
transient private Statement stmt_batch;
@Override
public void compute(final Dataset<Row> dataset, SparkSession sparkSession) throws Exception {
throw new GICSException(GICS_METHOD_NOT_SUPPORT);
}
@Override
public void initialize(StepInfo stepInfo, List<String> resourcePaths) throws GICSException {
Map<String, String> params = stepInfo.getStepParams();
driver= GICSUtils.getStepParams(params,"driver",true);
url=GICSUtils.getStepParams(params,"jdbcurl",true);
username=GICSUtils.getStepParams(params,"username",true);
password=GICSUtils.getStepParams(params,"password",true);
}
private String constructInsertSQL(StructMessage row) {
return null;
}
private String constructDeleteSQL(StructMessage row){
return null;
}
private String constructUpdateSQL(StructMessage row){
return null;
}
//Method used for stream writing
@Override
public boolean open(long partitionId, long version) {
try {
connection = DBUtils.getConnection(url, username, password, driver);
connection.setAutoCommit(false);
stmt_batch = connection.createStatement();
gson = new GsonBuilder() //.setLenient()// json宽松
.enableComplexMapKeySerialization()//支持Map的key为复杂对象的形式
.serializeNulls() //智能null
.setPrettyPrinting()// 调教格式
.disableHtmlEscaping() //默认是GSON把HTML 转义的
.create();
return true;
}catch (Exception ex){
LogUtil.error(ex.getMessage(),ex);
return false;
}
}
@Override
public void process(Row row) {
String sql=null;
try {
//Check field op_type
byte[] jsonBytes=row.getAs("value");
String jsonVal= null;
StructMessage result = null;
try {
jsonVal = new String(jsonBytes,UTF8);
Type type = new TypeToken<StructMessage>() {}.getType();
result = gson.fromJson(jsonVal, type);
} catch (Exception e) {
LogUtil.error("消息格式错误",jsonVal,e);
}
String op_type=result.getOp_type();
if(CommonUtil.testStringEmpty(op_type))
throw new GICSException("No op_type field for record");
if("I".equals(op_type)) {
sql=constructInsertSQL(result);
}
if("D".equals(op_type)) {
sql=constructDeleteSQL(result);
}
if("U".equals(op_type)) {
sql=constructUpdateSQL(result);
}
stmt_batch.addBatch(sql);
} catch (Exception e) {
LogUtil.error("Executing JDBC update error",sql,e);
}
}
@Override
public void close(Throwable errorOrNull) {
try {
stmt_batch.executeBatch();
connection.commit();
} catch (Exception e) {
LogUtil.error("Executing JDBC commit",e);
}finally {
DBUtils.close(connection);
}
}
最后
以上就是快乐故事为你收集整理的Spark2.3.0 结构化流 进行streaming+kafka的可操作算子流的全部内容,希望文章能够帮你解决Spark2.3.0 结构化流 进行streaming+kafka的可操作算子流所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复