概述
一、COVID-19 案例
上篇文章对 MapReduce 进行了介绍,并编写了 WordCount 经典案例的实现,本篇为继续加深 MapReduce 的用法,实践 COVID-19 新冠肺炎案例,下面是上篇文章的地址:
https://blog.csdn.net/qq_43692950/article/details/127195121
COVID-19,简称“新冠肺炎”,世界卫生组织命名为“2019冠状病毒病” [1-2] ,是指2019新型冠状病毒感染导致的肺炎。现有美国 2021-01-28 号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:
date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27
数据集下载
https://download.csdn.net/download/qq_43692950/86805389
二、计算各个州的累积cases、deaths
创建 VO 类存储 cases、deaths
个数:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements Writable {
private Long cases;//确诊病例数
private Long deaths;//死亡病例数
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"t"+ deaths;
}
}
创建 Mapper 类,截取出州
和 cases、deaths
,以州
为 key ,CountVO
为 Value :
public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> {
Text outKey = new Text();
CountVO outValue = new CountVO();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//州
outKey.set(fields[2]);
//Covid数据 确诊病例 死亡病例
outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));
context.write(outKey,outValue);
}
}
创建 Reducer ,对 cases、deaths
累加:
public class SumReducer extends Reducer<Text, CountVO,Text, CountVO> {
CountVO outValue = new CountVO();
@Override
protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
long totalCases = 0;
long totalDeaths =0;
//累加统计
for (CountVO value : values) {
totalCases += value.getCases();
totalDeaths +=value.getDeaths();
}
outValue.set(totalCases,totalDeaths);
context.write(key,outValue);
}
}
创建驱动类,加载上面的 Mapper 和 Reducer :
public class SumDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new SumDriver(), args);
System.exit(status);
}
@Override
public int run(String[] args) throws Exception {
// 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
// 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 输出目录必须为空,如果不为空则会报错提示
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(output)){
fs.delete(output,true);
}
// 创建作业实例
Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(SumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(SumMapper.class);
job.setReducerClass(SumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CountVO.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountVO.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, input);
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true)? 0:1;
}
}
数据集目录和输出目录通过参数传递进来,这里我将数据集放在了 D:/test/input
下:
如果是打包后放在 hadoop 集群运行,则:
hadoop jar <jar path> <driver class path> <args>
# 或者
yarn jar <jar path> <driver class path> <args>
运行成功后,到输出目录查看结果:
已成功统计出相关结果。
三、对上面计算的结果根据deaths
进行倒叙排列
上麦已经计算出了每个州的cases、deaths
,如果还需要根据deaths
进行倒叙排列
的话,我们可以针对上面 job
输出的结果在进行处理,利用 MapReduce
中key
的排序行为,将上个 job
的 value
作为本次 job
的key
。
对 CountVO
进行修改,通过实现 Comparable
实现排序的效果,不过在上面我们已经实现了 Writable
接口,在上篇文章中就讲到 Hadoop
为我们提供了 WritableComparable
已经实现好了 Writable, Comparable
,下面将 CountVO
中的 Writable
换成 WritableComparable
:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> {
private Long cases;//确诊病例数
private Long deaths;//死亡病例数
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"t"+ deaths;
}
@Override
public int compareTo(CountVO o) {
return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
}
}
compareTo
方法用于将当前对象与方法的参数进行比较。如果指定的数与参数相等返回0。如果指定的数小于参数返回 -1。如果指定的数大于参数返回 1。
创建 Mapper
,key
为 CountVO
:
public class SortSumMapper extends Mapper<LongWritable, Text, CountVO, Text> {
CountVO outKey = new CountVO();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("t");
outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));
outValue.set(fields[0]);
context.write(outKey,outValue);
}
}
编写 Reducer
, 无需做任何操作直接 write
即可
public class SortSumReducer extends Reducer<CountVO, Text, Text,CountVO> {
@Override
protected void reduce(CountVO key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text outKey = values.iterator().next();
context.write(outKey,key);
}
}
编写驱动类:
public class SortSumDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new SortSumDriver(), args);
System.exit(status);
}
@Override
public int run(String[] args) throws Exception {
// 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
// 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 输出目录必须为空,如果不为空则会报错提示
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(output)){
fs.delete(output,true);
}
// 创建作业实例
Job job = Job.getInstance(getConf(), SortSumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(SortSumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(SortSumMapper.class);
job.setReducerClass(SortSumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CountVO.class);
job.setMapOutputValueClass(Text.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountVO.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, input);
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true)? 0:1;
}
}
将上个 job
的结果放在 D:/test/input1
下,执行该驱动类:
执行成功后,到输出目录查看结果:
已经实现根据 死亡病例
进行倒叙排列
。
四、对每个州的 deaths 筛选出Top3的县
修改 CountVO
:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> {
private String county;//县
private Long cases;//确诊病例数
private Long deaths;//死亡病例数
public CountVO(CountVO vo){
this.county = vo.getCounty();
this.cases = vo.getCases();
this.deaths = vo.getDeaths();
}
public void set(long cases, long deaths, String county) {
this.cases = cases;
this.deaths = deaths;
this.county = county;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
out.writeUTF(county);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths = in.readLong();
this.county = in.readUTF();
}
@Override
public String toString() {
return county + "t" + cases + "t" + deaths;
}
@Override
public int compareTo(CountVO o) {
return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
}
}
修改 SumMapper
类:
public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> {
Text outKey = new Text();
CountVO outValue = new CountVO();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//州
outKey.set(fields[2]);
//Covid数据 确诊病例 死亡病例 县
outValue.set(Long.parseLong(fields[fields.length - 2]), Long.parseLong(fields[fields.length - 1]), fields[1]);
context.write(outKey, outValue);
}
}
修改 SumReducer
类:
public class SumReducer extends Reducer<Text, CountVO, Text, CountVO> {
CountVO outValue = new CountVO();
@Override
protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
List<CountVO> vList = new ArrayList<>();
values.forEach(v -> vList.add(new CountVO(v)));
vList.sort(CountVO::compareTo);
vList.stream().filter(Objects::nonNull).limit(3).forEach(c -> {
outValue.set(c.getCases(), c.getDeaths(), c.getCounty());
try {
context.write(key, outValue);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
驱动类无需修改,下面执行 Job
,到输出目录查看结果:
已经计算出了每个州的死亡病例 Top3
。
五、将二、三两个任务合并在一起执行
上面第三点依赖于第二点的结果,但是上面是分成了两个驱动类执行,在 MapReduce 中提供了工作流,可以通过一个提交来完成原来需要提交2次的任务。
修改驱动类:
public class SumDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
ControlledJob ctrljob1 = getJob1(conf);
ControlledJob ctrljob2 = getJob2(conf);
//设置依赖job的依赖关系
ctrljob2.addDependingJob(ctrljob1);
// 主控制容器,控制上面的总的两个子作业
JobControl jobCtrl = new JobControl("mainCtrl");
// 添加到总的JobControl里,进行控制
jobCtrl.addJob(ctrljob1);
jobCtrl.addJob(ctrljob2);
// 在子线程启动
Thread t = new Thread(jobCtrl);
t.start();
while(true) {
if (jobCtrl.allFinished()) {// 如果作业成功完成,就打印成功作业的信息
System.out.println(jobCtrl.getSuccessfulJobList());
jobCtrl.stop();
break;
}
}
}
private static ControlledJob getJob1(Configuration conf) throws IOException {
Job job = Job.getInstance(conf, SumDriver.class.getSimpleName());
job.setJarByClass(SumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CountVO.class);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountVO.class);
Path input = new Path("D:/test/input");
Path output = new Path("D:/test/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
fs.delete(output, true);
}
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
//转化成受控作业
ControlledJob ctrljob = new ControlledJob(conf);
ctrljob.setJob(job);
return ctrljob;
}
private static ControlledJob getJob2(Configuration conf) throws IOException {
Job job = Job.getInstance(conf, SumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(SumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(SortSumMapper.class);
job.setMapOutputKeyClass(CountVO.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SortSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountVO.class);
Path input = new Path("D:/test/output");
Path output = new Path("D:/test/output1");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
fs.delete(output, true);
}
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
//转化成受控作业
ControlledJob ctrljob = new ControlledJob(conf);
ctrljob.setJob(job);
return ctrljob;
}
}
执行后可以看到两个结果目录:
最后
以上就是平淡咖啡为你收集整理的Hadoop3 - MapReduce COVID-19 案例实践的全部内容,希望文章能够帮你解决Hadoop3 - MapReduce COVID-19 案例实践所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复