我是靠谱客的博主 野性豆芽,这篇文章主要介绍hive to mysql,现在分享给大家,希望可以做个参考。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package org.admln.sqoop; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.sqoop.client.SqoopClient; import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MLinkConfig; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.validation.Status; public class HiveToMysql { //Main方法 public static void main(String[] args) throws SQLException { Long startTime = System.currentTimeMillis(); //hive statistic hiveStatistic(); //hive export hiveExport(); //sqoop:hive to mysql sqoopTransfer(); Long endTime = System.currentTimeMillis(); System.out.println("任务完成,总耗时:" + (endTime-startTime) + "ms"); } //hive statistic public static void hiveStatistic() throws SQLException { String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver"; try { Class.forName(driverName); } catch (ClassNotFoundException e) { e.printStackTrace(); System.exit(1); } Connection con = DriverManager.getConnection( "jdbc:hive://hadoop:10000/default", "hive", "hive"); Statement stmt = con.createStatement(); //建表 String sql = "DROP TABLE IF EXISTS ipstatistical"; stmt.execute(sql); System.out.println("正在执行:"+sql); sql = "CREATE TABLE ipstatistical(IpAddress string COMMENT 'IP地址'," + "VisitNum int COMMENT '访问次数') COMMENT 'IP统计表' " + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY 'n'"; stmt.execute(sql); System.out.println("正在执行:"+sql); sql = "FROM logtable lt " + "INSERT OVERWRITE TABLE ipstatistical " + "SELECT lt.ipaddress,COUNT(lt.ipaddress) c GROUP BY lt.ipaddress"; System.out.println("正在执行: " + sql); boolean result = stmt.execute(sql); if(result) { System.out.println("hive统计执行完毕"); }else { System.out.println("hive执行失败!!!"); } } //hive export public static void hiveExport() throws SQLException { //export table department to 'hdfs_exports_location/department' String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver"; try { Class.forName(driverName); } catch (ClassNotFoundException e) { e.printStackTrace(); System.exit(1); } Connection con = DriverManager.getConnection( "jdbc:hive://hadoop:10000/default", "hive", "hive"); Statement stmt = con.createStatement(); String sql = "export table ipstatistical to '/out/aboutyunLog/HiveExport/ipstatistical/'"; System.out.println("正在执行: " + sql); boolean result = stmt.execute(sql); if(result) { System.out.println("hive输出到HDFS执行完毕"); }else { System.out.println("执行异常"); } } //sqoop:hive to mysql public static void sqoopTransfer() { //初始化 String url = "http://hadoop:12000/sqoop/"; SqoopClient client = new SqoopClient(url); //创建一个源链接 HDFS long fromConnectorId = 1; MLink fromLink = client.createLink(fromConnectorId); fromLink.setName("HDFS connector"); fromLink.setCreationUser("admln"); MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig(); fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/"); Status fromStatus = client.saveLink(fromLink); if(fromStatus.canProceed()) { System.out.println("创建HDFS Link成功,ID为: " + fromLink.getPersistenceId()); } else { System.out.println("创建HDFS Link失败"); } //创建一个目的地链接 JDBC long toConnectorId = 2; MLink toLink = client.createLink(toConnectorId); toLink.setName("JDBC connector"); toLink.setCreationUser("admln"); MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig(); toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive"); toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver"); toLinkConfig.getStringInput("linkConfig.username").setValue("hive"); toLinkConfig.getStringInput("linkConfig.password").setValue("hive"); Status toStatus = client.saveLink(toLink); if(toStatus.canProceed()) { System.out.println("创建JDBC Link成功,ID为: " + toLink.getPersistenceId()); } else { System.out.println("创建JDBC Link失败"); } //创建一个任务 long fromLinkId = fromLink.getPersistenceId(); long toLinkId = toLink.getPersistenceId(); MJob job = client.createJob(fromLinkId, toLinkId); job.setName("HDFS to MySQL job"); job.setCreationUser("admln"); //设置源链接任务配置信息 MFromConfig fromJobConfig = job.getFromJobConfig(); fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data"); //创建目的地链接任务配置信息 MToConfig toJobConfig = job.getToJobConfig(); toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog"); toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical"); Status status = client.saveJob(job); if(status.canProceed()) { System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId()); } else { System.out.println("JOB创建失败。"); } //启动任务 long jobId = job.getPersistenceId(); MSubmission submission = client.startJob(jobId); System.out.println("JOB提交状态为 : " + submission.getStatus()); while(submission.getStatus().isRunning() && submission.getProgress() != -1) { System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100)); //三秒报告一次进度 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("JOB执行结束... ..."); System.out.println("Hadoop任务ID为 :" + submission.getExternalId()); Counters counters = submission.getCounters(); if(counters != null) { System.out.println("计数器:"); for(CounterGroup group : counters) { System.out.print("t"); System.out.println(group.getName()); for(Counter counter : group) { System.out.print("tt"); System.out.print(counter.getName()); System.out.print(": "); System.out.println(counter.getValue()); } } } if(submission.getExceptionInfo() != null) { System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo()); } System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕"); } }

最后

以上就是野性豆芽最近收集整理的关于hive to mysql的全部内容,更多相关hive内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(82)

评论列表共有 0 条评论

立即
投稿
返回
顶部