Overview
mysql_kafka — a standalone class that reads incremental rows from MySQL on a schedule and publishes them to Kafka.
package com.dl.kafka;
import com.adt.entity.ShareHolder;
import com.adt.vo.TimeStamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.scheduling.annotation.Scheduled;
import java.sql.*;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
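// The imports above imply these dependencies (an assumption — no build file is shown):
// kafka-clients, MySQL Connector/J, and spring-context (for @Scheduled), plus the
// project-local com.adt.entity.ShareHolder and com.adt.vo.TimeStamp classes.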
/**
 * @author dinglei
 */
public class KafkaProducerMysql {
    // MySQL connection parameters
    private static String driver;
    private static String url;
    private static String username;
    private static String password;
    // holds String content read from the database
    private static String string;
    // SQL statement
    private static String sql;
    private static PreparedStatement ps;
    private static ResultSet rs;
    // JDBC connection
    private static Connection connection;
    // timestamp received as a String
    private static String time;
    // timestamp wrapper type
    private static TimeStamp timeStamp;
    // Kafka configuration
    private static Properties properties;
    // Kafka producer
    private static KafkaProducer<String, String> producer;
    // record to send to Kafka
    private static ProducerRecord<String, String> record;
    // row / page counter
    private static long count;
    // rows fetched per page
    private static int n;
    // entity object for one COMPANY_SHAREHOLDER row
    private static ShareHolder shareHolder;
    static {
        properties = new Properties();
        driver = "com.mysql.jdbc.Driver";
        url = "jdbc:mysql://192.168.131.168:3306/<database>"; // placeholder: the JDBC URL ends with the database (schema) name
        username = "root";
        password = "123456";
        // initialize the connection
        try {
            Class.forName(driver); // load the driver explicitly (JDBC 4+ drivers also auto-register)
            connection = DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        // producer configuration
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.248.136:9092");
        // serializers for the record key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // ack policy: wait for all in-sync replicas
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // number of times to retry a failed send
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // request timeout in milliseconds
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // enable idempotent writes
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        /*properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 204800); // batch size limit in bytes
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10000);    // batching time limit in ms*/
        // transactional ID for the producer
        //properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
        // transaction timeout
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3);
        // max unacknowledged requests per connection; 1 preserves ordering even with retries
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // create the producer
        producer = new KafkaProducer<String, String>(properties);
        // initialize the Kafka transaction
        /* producer.initTransactions();
        producer.beginTransaction();*/
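        // If transactions are enabled (set TRANSACTIONAL_ID_CONFIG above), the usual
        // KafkaProducer flow — a sketch, not part of the original listing — is:
        //   producer.initTransactions();   // once, right after construction
        //   producer.beginTransaction();   // before each batch of sends
        //   ... producer.send(...) ...
        //   producer.commitTransaction();  // or producer.abortTransaction() on failure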
        // page size: 100000 rows per query
        n = 100000;
    }
    // Note: @Scheduled only fires on a non-static method of a Spring-managed bean;
    // it has no effect on a static main(), so this runs once when invoked directly.
    @Scheduled(cron = "0/5 * * * * ?")
    public static void main(String[] args) throws SQLException, ParseException {
        mysqlToKafka();
    }
    public static void mysqlToKafka() throws SQLException, ParseException {
        //producer.initTransactions();
        //producer.beginTransaction();
        // take the current system time so the timestamp table can be updated dynamically
        String content = new Date().toString();
        // record this run's start time in the timestamp table
        // (values are concatenated into the SQL here; see the parameterized variant after the listing)
        String updateTimeSql = "update ICSL_TIME_STAMP set content ='" + content + "' where id =1;";
        connection = DriverManager.getConnection(url, username, password);
        ps = connection.prepareStatement(updateTimeSql);
        ps.executeUpdate();
        // read the watermark back from the timestamp table, reusing the same connection
        String timeSql = "select TIME_STAMP from ICSL_TIME_STAMP where id = 1;";
        ps = connection.prepareStatement(timeSql);
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            timeStamp = new TimeStamp(resultSet.getString("TIME_STAMP"));
        }
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date parse = sdf.parse(timeStamp.getTimeStamp());
        // rewind twenty minutes (1200000 ms) from this run's start time before scanning
        time = sdf.format(new Date(parse.getTime() - 1200000));
        sql = "select count(id) count from COMPANY_SHAREHOLDER where TIME_STAMP >= '" + time + "';";
        ps = connection.prepareStatement(sql);
        rs = ps.executeQuery();
        while (rs.next()) {
            count = rs.getInt("count");
        }
        // ceiling division: number of pages of size n
        count = count % n == 0 ? (count / n) : (count / n) + 1;
        try {
            for (int i = 0; i < count; i++) {   // i < count: `count` now holds the page total
                sql = "select * from COMPANY_SHAREHOLDER where TIME_STAMP >= '" + time + "' limit " + (i * n) + "," + n + ";";
                ps = connection.prepareStatement(sql);
                rs = ps.executeQuery();
                while (rs.next()) {
                    shareHolder = new ShareHolder(
                            rs.getString("ID"),
                            rs.getString("COMPANY_ID"),
                            rs.getString("COMPANY_NAME"),
                            rs.getString("INVESTOR_TYPE"),
                            rs.getString("INVESTOR_TYPE_NAME"),
                            rs.getString("SHAREHOLDER_NUM"),
                            rs.getString("SHAREHOLDER_ID"),
                            rs.getString("SHAREHOLDER_NAME"),
                            rs.getString("CAPITAL"),
                            rs.getString("CAPITALACTL"),
                            rs.getString("AMOUNT"),
                            rs.getString("CERTNAME"),
                            rs.getString("CERTNO"),
                            rs.getString("STAKES_RATIO"),
                            rs.getString("NOTES"),
                            rs.getString("DESC01"),
                            rs.getString("DESC02"),
                            // ... the original listing is truncated at this point
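The source listing is cut off mid-constructor. What follows is a minimal sketch of the presumable remainder, assuming ShareHolder exposes a getId() accessor and serializes itself via toString(), and using a hypothetical topic name shareholder_topic; none of these names appear in the truncated original:

                            rs.getString("TIME_STAMP"));    // hypothetical final column
                    // wrap the row in a record and hand it to the producer
                    record = new ProducerRecord<String, String>("shareholder_topic",
                            shareHolder.getId(), shareHolder.toString());
                    producer.send(record);
                }
            }
            producer.flush();                   // push out any buffered records
            //producer.commitTransaction();     // if transactions were enabled above
        } catch (ProducerFencedException e) {   // matches the import in the source
            //producer.abortTransaction();
            e.printStackTrace();
        } finally {
            rs.close();
            ps.close();
            connection.close();
        }
    }
}

Separately, the queries above splice `time` and the paging offsets directly into the SQL text. A parameterized PreparedStatement is the safer JDBC idiom; a sketch of the paged query only:

        String pagedSql = "select * from COMPANY_SHAREHOLDER where TIME_STAMP >= ? limit ?,?";
        ps = connection.prepareStatement(pagedSql);
        ps.setString(1, time);   // the watermark computed above
        ps.setInt(2, i * n);     // page offset
        ps.setInt(3, n);         // page size
        rs = ps.executeQuery();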
Finally
That is all of "Flink: extracting incremental MySQL data into HBase, with new-vs-old data comparison statistics", as collected and organized by 大力水壶; hopefully it helps you solve the development problems you ran into around this topic.