Overview
Requirement
We have two tables, a score table (score) and a total-score table (topscore). The job reads each student's per-course scores from the score table, computes the student's total score, and writes it into the total-score table.
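For reference, the result the MapReduce job produces is the same as what a single SQL aggregation would give; the statement below is only a sketch of the intended outcome, not part of the job itself:
-- Equivalent of the whole job in plain SQL (reference only)
INSERT INTO topscore (sid, totalscore)
SELECT sid, SUM(score)
FROM score
GROUP BY sid;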
Table structure
-- The score table records the student id, the course id, and the score for that course
CREATE TABLE `score` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sid` int(11) DEFAULT NULL,
`cid` int(11) DEFAULT NULL,
`score` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
-- The topscore table records the student id and the student's total score
CREATE TABLE `topscore` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sid` int(11) DEFAULT NULL,
`totalscore` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
The scores are as follows:
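(The original sample rows are not reproduced here; the INSERT below uses purely hypothetical values for illustration.)
-- Hypothetical sample rows, not the original data
INSERT INTO score (sid, cid, score) VALUES
(1, 1, 80), (1, 2, 90), (1, 3, 85),
(2, 1, 70), (2, 2, 95), (2, 3, 88),
(3, 1, 60), (3, 2, 75), (3, 3, 92);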
Custom class implementing Writable and DBWritable
The class implements the Hadoop serialization methods and the database read/write methods. Note that it works with two different tables: readFields(ResultSet) reads rows from score, while write(PreparedStatement) writes rows into topscore.
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MysqlDb_scoreWritable implements Writable,DBWritable {
private int id;
private int sid;
private int cid;
private int score;
private int totalscore;
/**
 * Data serialization
 */
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeInt(sid);
out.writeInt(cid);
out.writeInt(score);
out.writeInt(totalscore);
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.sid = in.readInt();
this.cid = in.readInt();
this.score = in.readInt();
this.totalscore = in.readInt();
}
/**
 * Database read/write:
 * write sid and totalscore into the topscore table
 */
public void write(PreparedStatement statement) throws SQLException {
statement.setInt(1,sid);
statement.setInt(2,totalscore);
}
// Read one record from the score table
public void readFields(ResultSet resultSet) throws SQLException {
id = resultSet.getInt(1);
sid = resultSet.getInt(2);
cid = resultSet.getInt(3);
score = resultSet.getInt(4);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getSid() {
return sid;
}
public void setSid(int sid) {
this.sid = sid;
}
public int getCid() {
return cid;
}
public void setCid(int cid) {
this.cid = cid;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public int getTotalscore() {
return totalscore;
}
public void setTotalscore(int totalscore) {
this.totalscore = totalscore;
}
@Override
public String toString() {
JSONObject json = new JSONObject();
json.put("id",id);
json.put("sid",sid);
json.put("cid",cid);
json.put("score",score);
return json.toString();
}
}
Mapper class
The map method receives the MySQL records one at a time. The map input key/value types are LongWritable and the custom entity class defined above; the map output key is the student id (IntWritable) and the output value is the entity itself, so that all of a student's course scores are grouped together for the reducer.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MysqlDbMapper extends Mapper<LongWritable, MysqlDb_scoreWritable, IntWritable, MysqlDb_scoreWritable> {
@Override
protected void map(LongWritable key, MysqlDb_scoreWritable value, Context context) throws IOException, InterruptedException {
System.out.println(value.toString());
int sid = value.getSid();
context.write(new IntWritable(sid),value);
}
}
Reducer class
In the reduce method we take all of a student's course scores and add them up to get the total. The output key is the custom entity class, which writes a row into the MySQL table; the output value is NullWritable.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MysqlDbReducer extends Reducer<IntWritable, MysqlDb_scoreWritable, MysqlDb_scoreWritable, NullWritable> {
@Override
protected void reduce(IntWritable key, Iterable<MysqlDb_scoreWritable> values, Context context)
throws IOException, InterruptedException {
int totalScore = 0;
for(MysqlDb_scoreWritable m : values){
totalScore += m.getScore();
}
MysqlDb_scoreWritable score = new MysqlDb_scoreWritable();
score.setSid(key.get());
score.setTotalscore(totalScore);
context.write(score,NullWritable.get());
}
}
Driver (App) class
The driver configures the database connection and submits the job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
public class DbApp {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DbApp.class);
job.setJobName("mysql read write");
job.setMapperClass(MysqlDbMapper.class);
job.setReducerClass(MysqlDbReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(MysqlDb_scoreWritable.class);
job.setOutputKeyClass(MysqlDb_scoreWritable.class);
job.setOutputValueClass(NullWritable.class);
// Configure the database connection
String driverclass = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://192.168.1.215:3306/school";
String username = "root";
String passwd = "root";
DBConfiguration.configureDB(job.getConfiguration(),driverclass,url,username,passwd);
// Set the input: the query that reads the records and the query that counts them
DBInputFormat.setInput(job,MysqlDb_scoreWritable.class,"select * from score","select count(id) from score");
// Set the output: the first string is the table name, followed by the column names to write
DBOutputFormat.setOutput(job,"topscore","sid","totalscore");
job.waitForCompletion(true);
}
}
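After the job finishes, the totals can be checked directly in MySQL. Assuming the hypothetical sample rows shown earlier, a quick check would look like this (the expected totals in the comment are derived from those made-up values):
-- Verify the rows written by the job
-- with the hypothetical sample data: sid 1 -> 255, sid 2 -> 253, sid 3 -> 227
SELECT sid, totalscore FROM topscore ORDER BY sid;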