
Overview

The class below drives a three-step pipeline: it runs a statistics query in Hive over JDBC, exports the resulting table to HDFS with Hive's EXPORT TABLE, and then uses the Sqoop 1.99.x (Sqoop2) client API to transfer the exported files from HDFS into MySQL.
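Sqoop's JDBC connector loads into an existing table, so the target table must already exist in MySQL. The post does not show its definition; a plausible DDL matching the Hive schema (database name taken from the job config below, column types assumed) would be:

CREATE DATABASE IF NOT EXISTS aboutyunlog;
CREATE TABLE aboutyunlog.ipstatistical (
  IpAddress VARCHAR(64),  -- IP address
  VisitNum  INT           -- visit count
);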

package org.admln.sqoop;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;
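// Classpath note (not in the original post): this program needs the Hive JDBC
// driver and the Sqoop2 client library (Sqoop 1.99.x) on the classpath, along
// with their transitive dependencies.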

public class HiveToMysql {
	// Entry point: run the three steps in order and report the total elapsed time
	public static void main(String[] args) throws SQLException {
		Long startTime = System.currentTimeMillis();
		//hive statistic
		hiveStatistic();
		//hive export
		hiveExport();
		//sqoop:hive to mysql
		sqoopTransfer();
		Long endTime = System.currentTimeMillis();
		System.out.println("任务完成,总耗时:" + (endTime-startTime) + "ms");
	}
	//hive statistic
	public static void hiveStatistic() throws SQLException {
		String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
	     try {
	         Class.forName(driverName);
	     } catch (ClassNotFoundException e) {
	         e.printStackTrace();
	         System.exit(1);
	     }
	     Connection con = DriverManager.getConnection(
	                        "jdbc:hive://hadoop:10000/default", "hive", "hive");
	     Statement stmt = con.createStatement();
	     // (Re)create the statistics table
	     String sql = "DROP TABLE IF EXISTS ipstatistical";
	     System.out.println("Executing: " + sql);
	     stmt.execute(sql);
	     sql = "CREATE TABLE ipstatistical(IpAddress string COMMENT 'IP address',"
	     		+ "VisitNum int COMMENT 'visit count') COMMENT 'IP statistics table' "
	     		+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'";
	     System.out.println("Executing: " + sql);
	     stmt.execute(sql);
	     // Count visits per IP address from the source log table
	     sql = "FROM logtable lt "
	     		+ "INSERT OVERWRITE TABLE ipstatistical "
	     		+ "SELECT lt.ipaddress,COUNT(lt.ipaddress) c GROUP BY lt.ipaddress";
	     System.out.println("Executing: " + sql);
	     // Statement.execute() returns true only when a ResultSet is produced;
	     // DDL/INSERT statements return false and report failure by throwing
	     // SQLException, so reaching the next line means the statement succeeded.
	     stmt.execute(sql);
	     System.out.println("Hive statistics step finished");
	     stmt.close();
	     con.close();
	}
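	// Note: the driver/URL above target the legacy HiveServer ("jdbc:hive://").
	// On a cluster running HiveServer2 the equivalent (an alternative, not shown
	// in the original post) would be:
	//   Class.forName("org.apache.hive.jdbc.HiveDriver");
	//   Connection con = DriverManager.getConnection(
	//           "jdbc:hive2://hadoop:10000/default", "hive", "hive");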
	//hive export
	public static void hiveExport() throws SQLException {
		// Hive syntax: EXPORT TABLE <table> TO '<hdfs_exports_location>/<table>'
		String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
	     try {
	         Class.forName(driverName);
	     } catch (ClassNotFoundException e) {
	         e.printStackTrace();
	         System.exit(1);
	     }
	     Connection con = DriverManager.getConnection(
	                        "jdbc:hive://hadoop:10000/default", "hive", "hive");
	     Statement stmt = con.createStatement();
	     String sql = "export table ipstatistical to '/out/aboutyunLog/HiveExport/ipstatistical/'";
	     System.out.println("正在执行: " + sql);
	     boolean result = stmt.execute(sql);
	     if(result) {
	    	 System.out.println("hive输出到HDFS执行完毕");
	     }else {
	    	 System.out.println("执行异常");
	     }
	}
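	// EXPORT TABLE writes a _metadata file plus a data/ subdirectory under the
	// target path, which is why sqoopTransfer() below reads from
	// /out/aboutyunLog/HiveExport/ipstatistical/data rather than the export root.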
	//sqoop:hive to mysql
	public static void sqoopTransfer() {
		// Initialize the Sqoop2 client against the Sqoop server's REST endpoint
		String url = "http://hadoop:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		
		// Create the source link (HDFS). Connector IDs depend on what is
		// registered on the Sqoop server; here 1 is the HDFS connector.
		long fromConnectorId = 1;
		MLink fromLink = client.createLink(fromConnectorId);
		fromLink.setName("HDFS connector");
		fromLink.setCreationUser("admln");
		MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
		fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
		Status fromStatus = client.saveLink(fromLink);
		if(fromStatus.canProceed()) {
		 System.out.println("HDFS link created, ID: " + fromLink.getPersistenceId());
		} else {
		 System.out.println("Failed to create the HDFS link");
		}
		// Create the destination link (generic JDBC connector, ID 2, pointing at MySQL)
		long toConnectorId = 2;
		MLink toLink = client.createLink(toConnectorId);
		toLink.setName("JDBC connector");
		toLink.setCreationUser("admln");
		MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
		toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
		toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
		toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
		toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
		Status toStatus = client.saveLink(toLink);
		if(toStatus.canProceed()) {
		 System.out.println("JDBC link created, ID: " + toLink.getPersistenceId());
		} else {
		 System.out.println("Failed to create the JDBC link");
		}
		
		// Create a job from the source link to the destination link
		long fromLinkId = fromLink.getPersistenceId();
		long toLinkId = toLink.getPersistenceId();
		MJob job = client.createJob(fromLinkId, toLinkId);
		job.setName("HDFS to MySQL job");
		job.setCreationUser("admln");
		// From-side job config: read the files Hive exported (the data/
		// subdirectory, not the export root, which also holds _metadata)
		MFromConfig fromJobConfig = job.getFromJobConfig();
		fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");
		
		// To-side job config: target schema and table in MySQL
		MToConfig toJobConfig = job.getToJobConfig();
		toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
		toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
		Status status = client.saveJob(job);
		if(status.canProceed()) {
		 System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
		} else {
		 System.out.println("JOB创建失败。");
		}
		
		// Start the job and poll until it is no longer running
		long jobId = job.getPersistenceId();
		MSubmission submission = client.startJob(jobId);
		System.out.println("Job submission status: " + submission.getStatus());
		while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
		  System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
		  // Report progress every three seconds
		  try {
			Thread.sleep(3000);
		  } catch (InterruptedException e) {
			e.printStackTrace();
		  }
		  // Refresh the submission; without this the loop would never observe a
		  // status change (getJobStatus is the Sqoop 1.99.x client call for this)
		  submission = client.getJobStatus(jobId);
		}
		System.out.println("Job finished.");
		System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
		Counters counters = submission.getCounters();
		if(counters != null) {
		  System.out.println("计数器:");
		  for(CounterGroup group : counters) {
		    System.out.print("t");
		    System.out.println(group.getName());
		    for(Counter counter : group) {
		      System.out.print("tt");
		      System.out.print(counter.getName());
		      System.out.print(": ");
		      System.out.println(counter.getValue());
		    }
		  }
		}
		if(submission.getExceptionInfo() != null) {
		  System.out.println("Job failed with exception: " + submission.getExceptionInfo());
		}
		System.out.println("HDFS-to-MySQL transfer via Sqoop finished");
	}
}
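Once the job completes, a quick way to confirm the rows landed is a plain JDBC query against MySQL. This is a minimal sketch, not part of the original post, assuming the aboutyunlog database and ipstatistical table from the job configuration above and the same hive/hive credentials:

import java.sql.*;

public class VerifyTransfer {
	public static void main(String[] args) throws SQLException {
		// Connect directly to the target schema the Sqoop job wrote into
		Connection con = DriverManager.getConnection(
				"jdbc:mysql://hadoop:3306/aboutyunlog", "hive", "hive");
		ResultSet rs = con.createStatement().executeQuery(
				"SELECT IpAddress, VisitNum FROM ipstatistical LIMIT 10");
		while (rs.next()) {
			System.out.println(rs.getString("IpAddress") + " -> " + rs.getInt("VisitNum"));
		}
		con.close();
	}
}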
