Overview
Hadoop MapReduce jobs normally operate on text files or other data stored in HDFS; this article shows how to read the input records from a MySQL database instead and write the results back to one.
The first step is to define a custom value class that implements both DBWritable and Writable.
package com.huawei.hdfs.com.huawei.mysql;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MyDBWritable implements DBWritable, Writable {

    // Fields read from the input table
    private String name;
    private int age;
    // Fields written to the output table
    private String p_name;
    private int p_age;

    public MyDBWritable() {
    }

    public MyDBWritable(String name, int age) {
        this.name = name;
        this.age = age;
    }

    // Hadoop serialization: write the fields to the data stream
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(p_name);
        out.writeInt(p_age);
    }

    // Hadoop serialization: read the fields back from the data stream
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
        p_name = in.readUTF();
        p_age = in.readInt();
    }

    // Writing to the database: fill the placeholders of the generated INSERT statement
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, p_name);
        statement.setInt(2, p_age);
    }

    // Reading from the database: populate the fields from one row of the result set
    public void readFields(ResultSet resultSet) throws SQLException {
        name = resultSet.getString(1);
        age = resultSet.getInt(2);
    }
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getP_name() {
        return p_name;
    }

    public void setP_name(String p_name) {
        this.p_name = p_name;
    }

    public int getP_age() {
        return p_age;
    }

    public void setP_age(int p_age) {
        this.p_age = p_age;
    }
}
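The Writable half of the interface is what Hadoop uses to serialize the object between tasks, while the DBWritable half talks to JDBC. As a quick sanity check (not part of the original job), the sketch below round-trips a MyDBWritable through a byte stream using write(DataOutput) and readFields(DataInput); the sample values are purely illustrative.
package com.huawei.hdfs.com.huawei.mysql;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
public class MyDBWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // Populate all four fields; write(DataOutput) calls writeUTF on both
        // String fields, so none of them may be null.
        MyDBWritable in = new MyDBWritable("tom", 20);
        in.setP_name("tom");
        in.setP_age(120);

        // Serialize with the Writable write(DataOutput) method
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields(DataInput)
        MyDBWritable out = new MyDBWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out.getName() + " " + out.getAge() + " "
                + out.getP_name() + " " + out.getP_age());
    }
}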
The Mapper class
package com.huawei.hdfs.com.huawei.mysql;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SQLMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
        // Each input record is one row from the input table; emit (name, age)
        context.write(new Text(value.getName()), new IntWritable(value.getAge()));
    }
}
The Reducer class
package com.huawei.hdfs.com.huawei.mysql;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SQLReducer extends Reducer<Text, IntWritable, MyDBWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        MyDBWritable myDBWritable = new MyDBWritable();
        for (IntWritable it : values) {
            // Write one row into the output table: p_name = name, p_age = age + 100
            myDBWritable.setP_name(key.toString());
            myDBWritable.setP_age(it.get() + 100);
            context.write(myDBWritable, NullWritable.get());
        }
    }
}
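For reference, once the driver below configures the output table as state with the fields p_name and p_age, DBOutputFormat generates an INSERT statement with one placeholder per field, and MyDBWritable.write(PreparedStatement) fills those placeholders for every record the reducer emits. The standalone JDBC sketch below shows a roughly equivalent manual insert; the URL and credentials are the ones assumed in the driver code, and the sample values are illustrative.
package com.huawei.hdfs.com.huawei.mysql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class ManualInsertSketch {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as in the driver class below; adjust as needed.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bigtable", "root", "root")) {
            // Roughly the statement DBOutputFormat generates for
            // setOutput(job, "state", "p_name", "p_age")
            PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO state (p_name, p_age) VALUES (?, ?)");
            // This is what MyDBWritable.write(PreparedStatement) does for one record
            ps.setString(1, "tom");
            ps.setInt(2, 120);
            ps.executeUpdate();
            ps.close();
        }
    }
}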
The driver class
package com.huawei.hdfs.com.huawei.mysql;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
public class SQL_Test {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the local filesystem as the default FS (run in local mode)
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);

        // Delete the path given as the first argument if it already exists
        // (not strictly required here, since the job writes its output to MySQL)
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[0]))) {
            fs.delete(new Path(args[0]), true);
        }

        job.setJobName("SQL_TEST");
        job.setJarByClass(SQL_Test.class);
        job.setMapperClass(SQLMapper.class);
        job.setReducerClass(SQLReducer.class);

        // Database connection settings
        String driverClass = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/bigtable";
        String username = "root";
        String password = "root";
        DBConfiguration.configureDB(job.getConfiguration(), driverClass, url, username, password);

        // Database input: the count query is used to compute the input splits
        DBInputFormat.setInput(job, MyDBWritable.class, "select name,age from persion", "select count(*) from persion");

        // Database output: write p_name and p_age into the state table
        DBOutputFormat.setOutput(job, "state", "p_name", "p_age");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // The reducer emits (MyDBWritable, NullWritable), so the job output types must match
        job.setOutputKeyClass(MyDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
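The job assumes that the input and output tables already exist in the bigtable database. Below is a minimal setup sketch under those assumptions: the input table keeps the name persion exactly as spelled in the job's SQL, the output table is state, and the column types and sample rows are guesses that you should adapt to your own data.
package com.huawei.hdfs.com.huawei.mysql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
public class TableSetupSketch {
    public static void main(String[] args) throws Exception {
        // Same database, URL and credentials as the driver class above; adjust as needed.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bigtable", "root", "root");
             Statement st = conn.createStatement()) {
            // Input table: name/age, matching "select name,age from persion"
            st.executeUpdate("CREATE TABLE IF NOT EXISTS persion (name VARCHAR(64), age INT)");
            // Output table: p_name/p_age, matching DBOutputFormat.setOutput(job, "state", "p_name", "p_age")
            st.executeUpdate("CREATE TABLE IF NOT EXISTS state (p_name VARCHAR(64), p_age INT)");
            // A couple of illustrative input rows so the job has something to read
            st.executeUpdate("INSERT INTO persion (name, age) VALUES ('tom', 20)");
            st.executeUpdate("INSERT INTO persion (name, age) VALUES ('jerry', 22)");
        }
    }
}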
To check which port your MySQL server is listening on (needed for the JDBC URL), run:
show global variables like 'port';
If you want to run this jar on a cluster, make sure the mysql-connector-java-5.1.17.jar package is available on the cluster; it can be placed under hadoop/share/hadoop/common/lib in the Hadoop installation directory.