Overview
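This walkthrough chains three steps in a single Java program: run an IP visit-count aggregation in Hive over JDBC, export the resulting table to HDFS with Hive's EXPORT TABLE statement, and then move the exported data from HDFS into MySQL through the Sqoop 2 client API.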
package org.admln.sqoop;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;
public class HiveToMysql {
    // Entry point: run the Hive aggregation, export the result to HDFS,
    // then transfer it to MySQL with Sqoop.
    public static void main(String[] args) throws SQLException {
        long startTime = System.currentTimeMillis();
        // Step 1: aggregate in Hive
        hiveStatistic();
        // Step 2: export the Hive table to HDFS
        hiveExport();
        // Step 3: Sqoop transfer from HDFS to MySQL
        sqoopTransfer();
        long endTime = System.currentTimeMillis();
        System.out.println("Job finished, total time: " + (endTime - startTime) + "ms");
    }
    // Run the per-IP visit count aggregation in Hive over JDBC.
    public static void hiveStatistic() throws SQLException {
        // HiveServer1 JDBC driver; a HiveServer2 setup would use
        // org.apache.hive.jdbc.HiveDriver with a jdbc:hive2:// URL instead.
        String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection con = DriverManager.getConnection(
                "jdbc:hive://hadoop:10000/default", "hive", "hive");
        Statement stmt = con.createStatement();
        // Recreate the statistics table
        String sql = "DROP TABLE IF EXISTS ipstatistical";
        System.out.println("Executing: " + sql);
        stmt.execute(sql);
        sql = "CREATE TABLE ipstatistical(IpAddress string COMMENT 'IP address',"
                + "VisitNum int COMMENT 'visit count') COMMENT 'IP statistics table' "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'";
        System.out.println("Executing: " + sql);
        stmt.execute(sql);
        // Count visits per IP address from the raw log table
        sql = "FROM logtable lt "
                + "INSERT OVERWRITE TABLE ipstatistical "
                + "SELECT lt.ipaddress, COUNT(lt.ipaddress) c GROUP BY lt.ipaddress";
        System.out.println("Executing: " + sql);
        // Statement.execute() returns false for statements that produce no
        // ResultSet, so success here is signaled by the absence of an SQLException.
        stmt.execute(sql);
        System.out.println("Hive aggregation finished");
    }
    // Export the statistics table to HDFS with Hive's EXPORT TABLE statement.
    // General form: EXPORT TABLE <tablename> TO '<hdfs_exports_location>'
    public static void hiveExport() throws SQLException {
        String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection con = DriverManager.getConnection(
                "jdbc:hive://hadoop:10000/default", "hive", "hive");
        Statement stmt = con.createStatement();
        String sql = "EXPORT TABLE ipstatistical TO '/out/aboutyunLog/HiveExport/ipstatistical/'";
        System.out.println("Executing: " + sql);
        // As above, execute() returns false for statements without a ResultSet;
        // failure surfaces as an SQLException.
        stmt.execute(sql);
        System.out.println("Hive export to HDFS finished");
    }
    // Transfer the exported files from HDFS into MySQL via the Sqoop 2 client API.
    public static void sqoopTransfer() {
        // Connect to the Sqoop 2 server
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
        // Create the source link (HDFS); on this installation the HDFS
        // connector has ID 1
        long fromConnectorId = 1;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("HDFS connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create HDFS link");
        }
        // Create the destination link (JDBC); on this installation the generic
        // JDBC connector has ID 2
        long toConnectorId = 2;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("JDBC connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create JDBC link");
        }
        // Create the job from the two links
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("HDFS to MySQL job");
        job.setCreationUser("admln");
        // Source side: read the files Hive exported (table data lives under .../data)
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");
        // Destination side: target schema and table in MySQL
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create job");
        }
        // Start the job
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Refresh the submission; without this the loop never observes completion
            submission = client.getJobStatus(jobId);
        }
System.out.println("JOB执行结束... ...");
System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
Counters counters = submission.getCounters();
if(counters != null) {
System.out.println("计数器:");
for(CounterGroup group : counters) {
System.out.print("t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("tt");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getExceptionInfo() != null) {
System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
}
System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕");
}
}
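A note on the hard-coded connector IDs: fromConnectorId = 1 and toConnectorId = 2 are simply the IDs the Sqoop 2 server assigned to the HDFS and generic JDBC connectors on this particular installation; on another server they may differ, so it is safer to look them up at runtime. A minimal sketch, assuming a Sqoop 1.99.4-style client API (the same vintage as the startJob/createLink calls above):

import java.util.Collection;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class ListConnectors {
    public static void main(String[] args) {
        SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
        // Print every connector the server knows, with its persistent ID;
        // use these IDs instead of hard-coding 1 and 2.
        Collection<MConnector> connectors = client.getConnectors();
        for (MConnector connector : connectors) {
            System.out.println(connector.getPersistenceId() + " : " + connector.getUniqueName());
        }
    }
}

Also note that the generic JDBC connector writes into an existing table rather than creating one, so the aboutyunlog.ipstatistical table must already exist in MySQL with columns matching the exported data before the job runs.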