《深入理解Spark》: a demo of structured streaming (Spark Streaming + Spark SQL processing structured data)
Overview
I have recently been working on a job that combines Spark Streaming and Spark SQL to process structured data. Below is a small example; feel free to take it if you need it.
package com.unistack.tamboo.compute.process.impl;
import com.alibaba.fastjson.JSONArray;
import com.google.common.collect.Maps;
import com.unistack.tamboo.compute.process.StreamProcess;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Processes streaming data with Spark SQL.
 *
 * @author hero.li
 */
public class SqlProcess implements StreamProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlProcess.class);
    private Properties outputInfo;
    private String toTopic;

    /**
     * Expected configuration, for example:
     * {"datasources":[{"password":"welcome1","port":"3308","ip":"192.168.1.192","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"},
     *                 {"password":"welcome1","port":"3308","ip":"192.168.1.191","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"}],
     *  "sql":"select * from ....",
     *  "windowLen":"window length, a multiple of 2 seconds",
     *  "windowSlide":"slide interval, a multiple of 2"}
     */
    public SqlProcess(Properties outputInfo, String toTopic) {
        this.outputInfo = outputInfo;
        this.toTopic = toTopic;
    }

    @Override
    public void logic(JavaRDD<ConsumerRecord<String, String>> rdd) {
        rdd.foreachPartition(itr -> {
            while (itr.hasNext()) {
                // Per-record processing goes here; the value is the raw Kafka message.
                String record = itr.next().value();
            }
        });
    }
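    // Sketch only, not in the original demo: how the windowLen / windowSlide values described in the
    // config comment above could be applied. window() re-groups the value stream into windows of
    // windowLen seconds that slide every windowSlide seconds; both are assumed to be multiples of
    // the batch interval. Fully qualified types are used so no extra import is needed.
    private static org.apache.spark.streaming.api.java.JavaDStream<String> applyWindow(
            org.apache.spark.streaming.api.java.JavaDStream<String> values,
            int windowLen, int windowSlide) {
        return values.window(Durations.seconds(windowLen), Durations.seconds(windowSlide));
    }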
    public static void main(String[] args) throws InterruptedException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        SparkSession spark = SparkSession.builder().appName("test_kane").getOrCreate();

        // Load the MySQL table t2 over JDBC and register it as a temp view for the join below.
        Map<String, String> map = Maps.newHashMap();
        // map.put("url", "jdbc:mysql://x.x.x.x:3309/test?user=root&password=welcome1&characterEncoding=UTF8");
        map.put("url", "jdbc:mysql://x.x.x.x:3309/test?characterEncoding=UTF8");
        map.put("user", "root");
        map.put("password", "welcome1");
        map.put("dbtable", "t2");
        Dataset<Row> hiveJob = spark.read().format("jdbc").options(map).load();
        hiveJob.createOrReplaceTempView("t2");

        // JAAS config for the SASL_PLAINTEXT Kafka cluster.
        System.setProperty("java.security.auth.login.config", "/Users/frank/Desktop/shell/lyh.conf");
        // Kafka consumer settings (SASL/PLAIN over PLAINTEXT, reading from the earliest offset).
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "x.x.x.x:9999");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", String.valueOf(System.currentTimeMillis()));
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("sasl.mechanism", "PLAIN");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

        Collection<String> topics = Arrays.asList("xxTopic");
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
        // For each micro-batch: parse the JSON messages, register them as a temp view,
        // and join them with the MySQL table t2 via Spark SQL.
        stream.map(r -> r.value())
              .foreachRDD((JavaRDD<String> rdd) -> {
                  if (rdd.count() > 0) {
                      Dataset<Row> df = spark.read().json(spark.createDataset(rdd.rdd(), Encoders.STRING()));
                      df.createOrReplaceTempView("streamData");
                      df.cache();
                      try {
                          Dataset<Row> aggregators = spark.sql("select a.*, b.* from streamData a join t2 b on a.id = b.id");
                          String[] colsName = aggregators.columns();
                          Iterator<Row> itr = aggregators.toLocalIterator();
                          while (itr.hasNext()) {
                              Row row = itr.next();
                              for (int i = 0; i < colsName.length; i++) {
                                  String cn = colsName[i];
                                  Object as = row.getAs(cn);
                                  System.out.print(cn + "=" + as + ", ");
                              }
                              System.out.println();
                          }
                      } catch (Exception e) {
                          System.out.println("====== err ======");
                          e.printStackTrace();
                      }
                  }
              });

        jssc.start();
        jssc.awaitTermination();
    }
}
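The demo only prints the joined rows to stdout, while the constructor already receives toTopic and outputInfo, so in the real pipeline the results would presumably be written back to Kafka. Below is a minimal sketch of that step, assuming outputInfo holds standard producer settings (bootstrap.servers plus String key/value serializers); the ResultSink class and its sendBack helper are illustrative and not part of the original code.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ResultSink {
    /**
     * Serializes each joined row to JSON (column names become keys) and sends it to the output topic.
     * collectAsList() pulls the rows to the driver, so this only suits small micro-batches.
     */
    public static void sendBack(Dataset<Row> joined, String toTopic, Properties producerProps) {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (String json : joined.toJSON().collectAsList()) {
                producer.send(new ProducerRecord<>(toTopic, json));
            }
            producer.flush();
        }
    }
}

Inside the foreachRDD block, a call like sendBack(aggregators, toTopic, outputInfo) would then take the place of the System.out printing loop.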
Finally
That is the whole demo of structured streaming with Spark Streaming + Spark SQL; I hope it helps if you run into a similar development problem.