我是靠谱客的博主 文静鲜花,最近开发中收集的这篇文章主要介绍使用SparkStreaming实现将数据写到MySQL中(1)在pom.xml中加入如下依赖包(2)在MySql中创建数据库和表,命令操作如下(3)使用Java编写一个数据库连接池类(5)打开netcat发送数据(6)提交,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

(1)在pom.xml中加入如下依赖包

 1 <dependency>
 2
<groupId>mysql</groupId>
 3
<artifactId>mysql-connector-java</artifactId>
 4
<version>5.1.38</version>
 5 </dependency>
 6 <dependency>
 7
<groupId>commons-dbcp</groupId>
 8
<artifactId>commons-dbcp</artifactId>
 9
<version>1.4</version>
10 </dependency>
 

(2)在MySql中创建数据库和表,命令操作如下

 

1 mysql -uroot –p
2 create database test;
3 use test;
4 show tables;
5 create table streaming(item varchar(30),count int);

 

 

(3)使用Java编写一个数据库连接池类

 1 package cn.itcast.spark.day7;
 2
 3 import java.sql.Connection;
 4 import java.sql.DriverManager;
 5 import java.util.LinkedList;
 6
 7 public class ConnectionPool {
 8
private static LinkedList<Connection> connectionQueue;
 9
10
static {
11
try {
12
Class.forName("com.mysql.jdbc.Driver");
13
}catch (ClassNotFoundException e) {
14 
e.printStackTrace();
15 
}
16 
}
17
18
public synchronized static Connection getConnection() {
19
try {
20
if (connectionQueue == null) {
21
connectionQueue = new LinkedList<Connection>();
22
for (int i = 0;i < 5;i ++) {
23
Connection conn = DriverManager.getConnection(
24
"jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true",
25
"root",
26
"root"
27 
);
28 
connectionQueue.push(conn);
29 
}
30 
}
31
}catch (Exception e) {
32 
e.printStackTrace();
33 
}
34
return connectionQueue.poll();
35 
}
36
37
public static void returnConnection(Connection conn) {
38 
connectionQueue.push(conn);
39 
}
40 }

(4)编写Spark代码
 1 package cn.itcast.spark.day7
 2
 3 import org.apache.spark.{SparkConf, TaskContext}
 4 import org.apache.spark.streaming.{Seconds, StreamingContext}
 5
 6 object sqlTest {
 7 
def main(args: Array[String]){
 8
 9
val conf = new SparkConf().setMaster("local[2]").setAppName("w")
10
val ssc = new StreamingContext(conf,Seconds(5))
11
12
val lines = ssc.socketTextStream("101.132.122.75",9999)
13
val words = lines.flatMap(_.split(" "))
14
val wordcount = words.map(x => (x,1)).reduceByKey(_+_)
15
wordcount.foreachRDD(rdd => {
16
rdd.foreachPartition(eachPartition => {
17
val conn = ConnectionPool.getConnection();
18
eachPartition.foreach(record => {
19
val sql = "insert into streaming(item,count) values('" + record._1 + "'," + record._2 + ")"
20
val stmt = conn.createStatement
21 
stmt.executeUpdate(sql)
22 
})
23 
ConnectionPool.returnConnection(conn)
24 
})
25 
})
26 
ssc.start()
27 
ssc.awaitTermination()
28 
}
29 }

(5)打开netcat发送数据

1 root@spark-master:~# nc -lk 9999
2 spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

 

(6)提交

/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

 

转载于:https://www.cnblogs.com/hmy-blog/p/7798840.html

最后

以上就是文静鲜花为你收集整理的使用SparkStreaming实现将数据写到MySQL中(1)在pom.xml中加入如下依赖包(2)在MySql中创建数据库和表,命令操作如下(3)使用Java编写一个数据库连接池类(5)打开netcat发送数据(6)提交的全部内容,希望文章能够帮你解决使用SparkStreaming实现将数据写到MySQL中(1)在pom.xml中加入如下依赖包(2)在MySql中创建数据库和表,命令操作如下(3)使用Java编写一个数据库连接池类(5)打开netcat发送数据(6)提交所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部