概述
hadoop连接mysql数据库执行数据读写数据库操作
为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。
运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。
(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/
b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
mysql数据库存储到hadoop hdfs
mysql表创建和数据初始化
[sql] view plain copy print?
-- Demo table read/written by the MapReduce job below.
DROP TABLE IF EXISTS `wu_testhadoop`;
CREATE TABLE `wu_testhadoop` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');
定义hadoop数据访问
mysql表创建完毕后,我们需要定义hadoop访问mysql的规则;
hadoop提供了org.apache.hadoop.io.Writable接口来实现简单的高效的可序列化的协议,该类基于DataInput和DataOutput来实现相关的功能。
hadoop对数据库访问也提供了org.apache.hadoop.mapred.lib.db.DBWritable接口,其中write方法用于对PreparedStatement对象设定值,readFields方法用于对从数据库读取出来的对象进行列的值绑定;
以上两个接口的使用如下(内容是从源码得来)
writable
[java] view plain copy print?
1.public class MyWritable implements Writable {
2. // Some data
3. private int counter;
4. private long timestamp;
5.
6. public void write(DataOutput out) throws IOException {
7. out.writeInt(counter);
8. out.writeLong(timestamp);
9. }
10.
11. public void readFields(DataInput in) throws IOException {
12. counter = in.readInt();
13. timestamp = in.readLong();
14. }
15.
16. public static MyWritable read(DataInput in) throws IOException {
17. MyWritable w = new MyWritable();
18. w.readFields(in);
19. return w;
20. }
21. }
DBWritable
[java] view plain copy print?
1.public class MyWritable implements Writable, DBWritable {
2. // Some data
3. private int counter;
4. private long timestamp;
5.
6. //Writable#write() implementation
7. public void write(DataOutput out) throws IOException {
8. out.writeInt(counter);
9. out.writeLong(timestamp);
10. }
11.
12. //Writable#readFields() implementation
13. public void readFields(DataInput in) throws IOException {
14. counter = in.readInt();
15. timestamp = in.readLong();
16. }
17.
18. public void write(PreparedStatement statement) throws SQLException {
19. statement.setInt(1, counter);
20. statement.setLong(2, timestamp);
21. }
22.
23. public void readFields(ResultSet resultSet) throws SQLException {
24. counter = resultSet.getInt(1);
25. timestamp = resultSet.getLong(2);
26. }
27. }
数据库对应的实现
[java] view plain copy print?
package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

/**
 * One row of the {@code wu_testhadoop} table (id, title, content).
 * Implements {@code Writable} so the record can travel through the
 * MapReduce shuffle / be written to HDFS, and {@code DBWritable} so
 * DBInputFormat/DBOutputFormat can bind it to JDBC columns.
 */
public class DBRecord implements Writable, DBWritable {
    private int id;
    private String title;
    private String content;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    /** DBWritable: populates the record from the current ResultSet row by column name. */
    @Override
    public void readFields(ResultSet set) throws SQLException {
        this.id = set.getInt("id");
        this.title = set.getString("title");
        this.content = set.getString("content");
    }

    /** DBWritable: binds the record to the INSERT statement's parameters 1..3. */
    @Override
    public void write(PreparedStatement pst) throws SQLException {
        pst.setInt(1, id);
        pst.setString(2, title);
        pst.setString(3, content);
    }

    /** Writable: reads fields in the exact order write(DataOutput) emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.title = Text.readString(in);
        this.content = Text.readString(in);
    }

    /** Writable: serializes id, then title and content as Hadoop UTF-8 strings. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.title);
        Text.writeString(out, this.content);
    }

    @Override
    public String toString() {
        return this.id + " " + this.title + " " + this.content;
    }
}
实现Map/Reduce
[java] view plain copy print?
1.package com.wyg.hadoop.mysql.mapper;
2.
3.import java.io.IOException;
4.
5.import org.apache.hadoop.io.LongWritable;
6.import org.apache.hadoop.io.Text;
7.import org.apache.hadoop.mapred.MapReduceBase;
8.import org.apache.hadoop.mapred.Mapper;
9.import org.apache.hadoop.mapred.OutputCollector;
10.import org.apache.hadoop.mapred.Reporter;
11.
12.import com.wyg.hadoop.mysql.bean.DBRecord;
13.
14.@SuppressWarnings("deprecation")
15.public class DBRecordMapper extends MapReduceBase implements Mapper
// NOTE(review): the listing is truncated here in the source article — the
// generic type parameters of Mapper and the entire map() method body are
// missing, so this class cannot be reconstructed from what is shown.
最后
以上就是超帅睫毛为你收集整理的hadoop连接mysql数据库执行数据读写数据库操作的全部内容,希望文章能够帮你解决hadoop连接mysql数据库执行数据读写数据库操作所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复