概述
话不多说先上代码
sparkstreaming对接kafka在对数据流做相应的逻辑处理之后可以放到hdfs、数据库等处。之前因为有用到将数据插入数据库以检测性能,所以就拿出来给大家分享一下,本人大白一枚。。
说到数据插入数据库,对不同情况有不同方式:逐条插入,就是一条数据插入一次,调用一次数据库连接。当数据量大时,使用这种方法显然就不行了,最起码速度太慢了,而且还可能会因为过多数据库连接造成其他问题。这个时候可以用批量插入的方式。
批量插入:顾名思义就是一个批次一个批次的把数据插入到数据库中,本次示例按2000批量插入到数据库中,2000条数据调用一个数据库连接。
当然,我并没有用数据库连接池,只是测试使用就没弄,随便连的jdbc,感兴趣的可以用数据库连接池。
import DBUtils.Databases;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.*;
public class NB_Test {
public static void main (String[] args) throws InterruptedException {
//创建conf对象,context对象以及流context对象
SparkConf conf = new SparkConf().setAppName("kafka_Spark").setMaster("local[4]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
//创建map类型以传参
Map<String,Object> kafkaParams = new HashMap<String, Object>();
String brokers = "10.204.118.101:9092,10.204.118.102:9092,10.204.118.103:9092";
kafkaParams.put("bootstrap.servers",brokers);
kafkaParams.put("group.id","test-consumer");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer",StringDeserializer.class);
HashSet topicSet = new HashSet<String>();
topicSet.add("test");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("enable.auto.commit",false);
JavaInputDStream<ConsumerRecord<String,String>> DStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicSet,kafkaParams)
);
//将数据流转换为<k,v>键值对类型的数据流,
这里只是我闲来无事打印一下
JavaPairDStream<String, String> kv = DStream.mapToPair(record -> new Tuple2<String, String>(record.key(), record.value()));
kv.print();
//此时得到开始时间戳
long start = System.currentTimeMillis();
//对流遍历rdd
DStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
//对每个rdd遍历分区
rdd.foreachPartition(partitions->{
OffsetRange o = offsetRanges[TaskContext.getPartitionId()];
System.out.println(o.partition()+"
"+o.fromOffset()+"
"+o.untilOffset());
//创建jdbc类对象
Databases databases = new Databases();
//使用类对象调用方法得到jdbc连接
Connection connection = databases.getConnection();
//设置false,即默认为手动提交jdbc连接任务
connection.setAutoCommit(false);
//本地测试mysql插入
String sql ="insert into db_test
values(?,?,?,?,?,?,?,?,?)";
PreparedStatement ps = connection.prepareStatement(sql);
//针对不同key值创建不同集合用以存放数据
List<String> listNull = new ArrayList<>();
List<String> list1001 = new ArrayList<>();
List<String> list1002 = new ArrayList<>();
//对每个分区遍历,将符合条件的数据分别放到不同集合中便于后续逻辑操作
partitions.forEachRemaining(line->{
//可以对不同key值判断然后执行不同逻辑处理操作
/* if(line._1()==null){
listNull.add(line._2().toString());
}else if(line._1().equals("1001")){
list1001.add(line._2().toString());
}else{
list1002.add(line._2().toString());
}*/
listNull.add(line.value());
});
//调用方法处理数据
NB_Test.Batch_SqlServer(listNull.size(),2000,listNull,ps,connection);
//每个分区使用完jdbc连接后,关闭连接
databases.closeConn(connection);
});
((CanCommitOffsets)DStream.inputDStream()).commitAsync(offsetRanges);
});
//插入数据结束时的时间戳
long end = System.currentTimeMillis();
System.out.println("程序执行时间为:"+(end-start)+"ms");
ssc.start();
ssc.awaitTermination();
}
//批量插入
/**
* @param len
集合中总数据长度
* @param batch
批量导入条数
* @param list
数据集合
* @param PS
SQL提交对象
* @param connection
jdbc连接对象
* @throws SQLException 抛异常
*/
public static void Batch_SqlServer(int len,int batch,List<String> list,PreparedStatement PS,Connection connection) throws SQLException, ParseException {
//首先判定集合有数据
if(len!=0){
//循环次数
int times = len/batch;
for(int k =0;k<=times;k++){
//判断是否刚好整数分批次
if(len%batch==0 && k*batch<len){
//分段将不同批次数据循环批量插入
for(int m = batch*k;m<(k+1)*batch;m++){
String[] split = list.get(m).split(",");
System.out.println("m"+m);
PS.setInt(1,Integer.parseInt(split[0]));
PS.setString(2,split[1]);
PS.setInt(3,Integer.parseInt(split[2]));
PS.setString(4,split[3]);
PS.setString(5,split[4]);
PS.setString(6,split[5]);
PS.setString(7,split[6]);
PS.setString(8,split[7]);
PS.setString(9,split[8]);
PS.addBatch();
}
PS.executeBatch();
connection.commit();
//判断当集合长度不是批次整数倍时
}else if(len%batch!=0){
//判断循环到此时是否数据够一个批次,当超过本次循环的批次数量时
if(k*batch<len-batch){
for(int n = k*batch;n<(k+1)*batch;n++){
String[] split1 = list.get(n).split(",");
System.out.println("n"+n);
PS.setInt(1,Integer.parseInt(split1[0]));
PS.setString(2,split1[1]);
PS.setInt(3,Integer.parseInt(split1[2]));
PS.setString(4,split1[3]);
PS.setString(5,split1[4]);
PS.setString(6,split1[5]);
PS.setString(7,split1[6]);
PS.setString(8,split1[7]);
PS.setString(9,split1[8]);
PS.addBatch();
}
PS.executeBatch();
connection.commit();
//判断当数据最后一批,即:不够一个批次数量,但是还是一个批次导入
}else{
for(int p = k*batch;p<(k*batch+(len%batch));p++){
String[] split2 = list.get(p).split(",");
System.out.println("p"+p);
PS.setInt(1,Integer.parseInt(split2[0]));
PS.setString(2,split2[1]);
PS.setInt(3,Integer.parseInt(split2[2]));
PS.setString(4,split2[3]);
PS.setString(5,split2[4]);
PS.setString(6,split2[5]);
PS.setString(7,split2[6]);
PS.setString(8,split2[7]);
PS.setString(9,split2[8]);
PS.addBatch();
}
PS.executeBatch();
connection.commit();
}
}
}
}
}
哦了 ,大白一枚,把自己知道的写个博客以后慢慢看。。。哇咔咔
最后
以上就是飘逸小馒头为你收集整理的sparkstreaming对接kafka将数据批量插入数据库(java版本)的全部内容,希望文章能够帮你解决sparkstreaming对接kafka将数据批量插入数据库(java版本)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复