我是靠谱客的博主 健忘小伙,最近开发中收集的这篇文章主要介绍hadoop读取mysql数据_hadoop读写mysql数据库,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

需求描述

我们有两张表“成绩表”和“总分表”,从成绩表中计算出每个学生的总成绩,记录到总分表中。

表结构

//成绩表记录学生id,课程id,这科分数

CREATE TABLE `score` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`sid` int(11) DEFAULT NULL,

`cid` int(11) DEFAULT NULL,

`score` int(11) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;

//总分表记录学生id和学生总成绩

CREATE TABLE `topscore` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`sid` int(11) DEFAULT NULL,

`totalscore` int(11) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

成绩如下:

自定义类实现序列化和DBWritable接口

实现序列化方法和数据库读写方法,这里我们读写是两张表。import com.alibaba.fastjson.JSONObject;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

public class MysqlDb_scoreWritable implements Writable,DBWritable {

private int id;

private int sid;

private int cid;

private int score;

private int totalscore;

/**

*数据序列化

*/

public void write(DataOutput out) throws IOException {

out.writeInt(id);

out.writeInt(sid);

out.writeInt(cid);

out.writeInt(score);

out.writeInt(totalscore);

}

public void readFields(DataInput in) throws IOException {

this.id = in.readInt();

this.sid = in.readInt();

this.cid = in.readInt();

this.score = in.readInt();

this.totalscore = in.readInt();

}

/**

* 数据库读写

* 向topscore中写入值

*/

public void write(PreparedStatement statement) throws SQLException {

statement.setInt(1,sid);

statement.setInt(2,totalscore);

}

//从score中读取成绩

public void readFields(ResultSet resultSet) throws SQLException {

id = resultSet.getInt(1);

sid = resultSet.getInt(2);

cid = resultSet.getInt(3);

score = resultSet.getInt(4);

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public int getSid() {

return sid;

}

public void setSid(int sid) {

this.sid = sid;

}

public int getCid() {

return cid;

}

public void setCid(int cid) {

this.cid = cid;

}

public int getScore() {

return score;

}

public void setScore(int score) {

this.score = score;

}

public int getTotalscore() {

return totalscore;

}

public void setTotalscore(int totalscore) {

this.totalscore = totalscore;

}

@Override

public String toString() {

JSONObject json = new JSONObject();

json.put("id",id);

json.put("sid",sid);

json.put("cid",cid);

json.put("score",score);

return json.toString();

}

}

mapper类

map方法读取mysql表中一行行的数据,map的输入key,value分别是longWritable,上面我们自定义的mysql读写实体类。输入key是学生的id,value是自定义实体类。reduce的时候我们取出学生的各科成绩相加求总成绩。import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MysqlDbMapper extends Mapper {

protected void map(LongWritable key, MysqlDb_scoreWritable value, Context context) throws IOException, InterruptedException {

System.out.println(value.toString());

int sid = value.getSid();

context.write(new IntWritable(sid),value);

}

}

reducer类

reduce的时候我们取出学生的各科成绩相加求总成绩。输出key是我们自定义实体类向mysql表中写数据,value为空。import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MysqlDbReducer extends Reducer {

protected void reduce(IntWritable key, Iterable values, Context context)

throws IOException, InterruptedException {

int totalScore = 0;

for(MysqlDb_scoreWritable m : values){

totalScore += m.getScore();

}

MysqlDb_scoreWritable score = new MysqlDb_scoreWritable();

score.setSid(key.get());

score.setTotalscore(totalScore);

context.write(score,NullWritable.get());

}

}

运行app类import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class DbApp {

public static void main(String[] args) throws Exception{

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(DbApp.class);

job.setJobName("mysql read write");

job.setMapperClass(MysqlDbMapper.class);

job.setReducerClass(MysqlDbReducer.class);

job.setMapOutputKeyClass(IntWritable.class);

job.setMapOutputValueClass(MysqlDb_scoreWritable.class);

job.setOutputKeyClass(MysqlDb_scoreWritable.class);

job.setOutputValueClass(NullWritable.class);

//配置数据库信息

String driverclass = "com.mysql.jdbc.Driver";

String url = "jdbc:mysql://192.168.1.215:3306/school";

String username = "root";

String passwd = "root";

DBConfiguration.configureDB(job.getConfiguration(),driverclass,url,username,passwd);

//设置输入内容

DBInputFormat.setInput(job,MysqlDb_scoreWritable.class,"select * from score","select count(id) from score");

//设置输出内容,第一个string是表名,后面可以跟多个string是表字段名

DBOutputFormat.setOutput(job,"topscore","sid","totalscore");

job.waitForCompletion(true);

}

}

最后

以上就是健忘小伙为你收集整理的hadoop读取mysql数据_hadoop读写mysql数据库的全部内容,希望文章能够帮你解决hadoop读取mysql数据_hadoop读写mysql数据库所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部