I am 仁爱台灯, a blogger at 靠谱客. This article, which I put together during recent development work, introduces how to write data-cleaning results from Spark Streaming to MySQL. I found it quite useful and am sharing it here in the hope that it serves as a reference.

Overview

The pipeline is Flume + Kafka + Spark Streaming + MySQL: Flume collects access logs into a Kafka topic, a Spark Streaming job consumes the topic, cleans each record, and appends the results to a MySQL table over JDBC. The KafkaUtils.createDirectStream call with StringDecoder used below comes from the old spark-streaming-kafka-0-8 integration, so that dependency (plus the MySQL JDBC driver) must be on the classpath.
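Before the job runs, it helps to have the target table in place (the JDBC writer in append mode can also create it on first write). Below is a minimal sketch of creating the table from plain JDBC; the host vm04, database echarts, user root, and placeholder password are taken from the job code further down, while the table layout and VARCHAR lengths are my assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateEchartsTable {
    public static void main(String[] args) throws Exception {
        // Connection details mirror the Spark job below; the password is a placeholder
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://vm04:3306/echarts", "root", "xxxxx");
             Statement stmt = conn.createStatement()) {
            // Column lengths are assumptions; adjust them to your data
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS echarts ("
                    + "ip VARCHAR(64), video VARCHAR(255), "
                    + "videoid VARCHAR(64), device VARCHAR(64))");
        }
    }
}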

package util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataClean {
    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession.builder().master("local").appName("dataClean").getOrCreate();
        // Create a JavaSparkContext from the SparkSession
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // Micro-batches every 2 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(jsc, Durations.seconds(2));

        // Kafka connection parameters and topic for the direct (receiver-less) stream
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", "vm04:9092,vm05:9092,vm06:9092");
        Set<String> topics = new HashSet<String>();
        topics.add("test_m_brokers");
        // Each Kafka message arrives as a (key, value) pair; the log line is the value
        JavaPairDStream<String, String> lines = KafkaUtils.createDirectStream(ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Clean each record: keep the IP (field 1), the video URL (field 9), the
        // video id parsed from the URL path, and the device (field 11), joined by tabs
        JavaDStream<String> words = lines.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple) throws Exception {
                String[] strs = tuple._2.split(",");
                // The fourth "/"-separated segment of the URL is the video id
                String ids = strs[9].split("/")[3];
                return strs[1] + "\t" + strs[9] + "\t" + ids + "\t" + strs[11];
            }
        });
        //words.print();

        // Write each micro-batch to MySQL over JDBC
        words.foreachRDD(rdd -> {
            String url = "jdbc:mysql://vm04:3306/echarts";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "root");
            connectionProperties.put("password", "xxxxx");
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
            // Turn each cleaned tab-separated line into a Row of four string columns
            JavaRDD<Row> ip = rdd.map(new Function<String, Row>() {
                public Row call(String line) throws Exception {
                    String[] tmps = line.split("\t");
                    return RowFactory.create(tmps[0], tmps[1], tmps[2], tmps[3]);
                }
            });
            // Schema matching the target table: ip, video, videoid, device
            ArrayList<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("video", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("videoid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("device", DataTypes.StringType, true));
            StructType type = DataTypes.createStructType(fields);
            Dataset<Row> ipDF = spark.createDataFrame(ip, type);
            // Append this batch to the echarts table
            ipDF.write().mode("append").jdbc(url, "echarts", connectionProperties);
            //spark.close();
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
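To sanity-check the cleaning logic without Kafka in the loop, the same transformation can be exercised on a single line. The record layout below (a comma-separated log with the IP in field 1, the video URL in field 9, and the device in field 11) is an assumption inferred from the indices the job uses; substitute a real line from the test_m_brokers topic.

public class CleanLineDemo {
    // Same transformation as the map() in the job above, extracted for testing
    static String clean(String line) {
        String[] strs = line.split(",");
        String id = strs[9].split("/")[3];
        return strs[1] + "\t" + strs[9] + "\t" + id + "\t" + strs[11];
    }

    public static void main(String[] args) {
        // A hypothetical 12-field record; only fields 1, 9 and 11 matter here
        String sample = "f0,192.168.1.10,f2,f3,f4,f5,f6,f7,f8,http://vm04/v123,f10,android";
        // Prints the tab-separated result: 192.168.1.10, http://vm04/v123, v123, android
        System.out.println(clean(sample));
    }
}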

In closing

That is everything 仁爱台灯 has collected for "Writing Spark Streaming Data-Cleaning Results to MySQL". I hope this article helps you solve the development problems you ran into around this topic.

If you find the content on 靠谱客 worthwhile, feel free to recommend the site to your programmer friends.

This article and its images were provided by a community member or collected from the web for learning and reference; copyright remains with the original author.