我是靠谱客的博主 花痴火,最近开发中收集的这篇文章主要介绍Hadoop编程——第五章:(5)MapReduce自定义对象序列化案例案例1:各州累计病例数量统计一、需求分析二、代码实现,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
美国新冠疫情COVID-19病例数统计
有一份2020-12-01号美国各县county的新冠疫情统计数据,包括累计确诊病例、累计死亡病例。使用MapReduce对疫情数据进行各种分析统计。
案例背后的核心是学会自定义MapReduce各个组件。包括自定义对象、序列化、排序、分区、分组。
数据字段说明
date (日期) , county(县) , state(州 ) , fips(县编码code ) , cases(累计确诊病例) , deaths(索计死亡病例)。
案例学习目标:
MapReduce自定义对象序列化
MapReduce自定义排序
MapReduce自定义分区
MapReduce自定义分组
MapReduce自定义分组扩展:topN
案例1:各州累计病例数量统计
统计美国2020-12-01,每个州state累计确诊案例数、累计死亡案例数。
一、需求分析
“州”作为key,CovidCountBean将两个案例数(cases,deaths)封装起来
- 对于涉及多属性数据传递,建议使用javaBean进行封装传递
- 注意在MapReduce中需要实现序列化机制
- 如果是作为key传递还需要实现Compareable接口
在reduce阶段key相同的就会被分成一组,一组调用一次reduce方法处理。
在本业务中key相同即是同一个州的数据
把各个县的病例累计求和就是该州的疫情数据。
注意要使用toString方法。
二、代码实现
(一)pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>USA_Covid</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version> 2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version> 2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version> 2.9.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib</classpathPrefix>
<mainClass>CovidSumDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
(二)log4j.properties
log4j.rootLogger=INFO,STDOUT,R
#把日志信息打印到控制台上
log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.Threshold=DEBUG
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%p] [%l] %10.10c - %m%n
#放在工程路径下,通过文件查看日志
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=mapreduce_test.log
log4j.appender.R.MaxFileSize=1MB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
log4j.logger.org.example=DEBUG
(三)CovidCountBean封装两个属性
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义对象作为数据类型在MR中传递
* 一定要实现Hadoop的序列化机制:接口Writable, ctrl+i 实现两个方法
*/
public class CovidCountBean implements Writable {
//1、封装私有的属性
private long cases; //确诊病例数
private long deaths; //死亡病例数
//2、有参无参构造
public CovidCountBean() {
}
public CovidCountBean(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
/**
*对有参构造进行修改,提供一个set方法
* 自己封装对象的set方法,用于对象属性赋值
*/
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
//3、set和get方法
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
public long getDeaths() {
return deaths;
}
public void setDeaths(long deaths) {
this.deaths = deaths;
}
//4、实现对象的方法
// @Override
// public String toString() {
// return "CovidCountBean{" +
// "cases=" + cases +
// ", deaths=" + deaths +
// '}';
// }
//修改一下,返回的都是数据
@Override
public String toString() {
return cases + "t" + deaths;
}
/**
* 序列化方法,可以控制把哪写字段序列化出去
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(cases);
dataOutput.writeLong(deaths);
}
/**
* 反序列化方法
* todo 注意反序列化的顺序和序列化顺序一致
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
this.cases = dataInput.readLong();
this.deaths = dataInput.readLong();
}
}
(四)CovidSumMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CovidSumMapper extends Mapper<LongWritable, Text, Text,CovidCountBean> {
/**
* 3、创建输出对象
*/
Text outKey = new Text();
CovidCountBean outValue = new CovidCountBean();
/**
* 1、重写map父类方法:map回车
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/**
* 2、读取一行数据进行切割
*/
String[] fields = value.toString().split(",");
// String line = value.toString();
// String[] fields = line.split("|");
/**
* 4、提取数据 州、确诊数、死亡数
*/
outKey.set(fields[2]);//下标从0开始
// outValue.set(Long.parseLong(fields[4]),Long.parseLong(fields[5]));//但outValue没有set方法,所以要去改造一下
//因为疫情数据中,会有部分数据缺少,正着数就会越界,可以采用倒着数
outValue.set(Long.parseLong(fields[fields.length-2].trim()),Long.parseLong(fields[fields.length-1].trim()));
/**
* 5、输出结果
*/
context.write(outKey,outValue);
}
}
(五)CovidSumReducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CovidSumReducer extends Reducer<Text,CovidCountBean,Text,CovidCountBean> {
CovidCountBean outValue = new CovidCountBean();
@Override
protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException {
/**
* 创建统计变量
*/
long totalCases = 0;
long totalDeaths = 0;
/**
* 遍历该州的各个县的数据
*/
for(CovidCountBean value : values){
totalCases += value.getCases();
totalDeaths = totalDeaths +value.getDeaths();
}
/**
* 输出结果赋值
*/
outValue.set(totalCases,totalDeaths);
context.write(key,outValue);
}
}
(六)CovidSumDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 该类就是MapReduce程序客户端驱动类
* 主要是构造Job对象实例
* 指定各种组件属性:mapper、reducer类,输入输出的数据类型,输入输出的数据路径,提交job作业(job.submit())
*/
public class CovidSumDriver {
public static void main(String[] args) throws Exception {
//创建驱动类
Configuration conf = new Configuration();
//本地模式运行可以直接指定路径,或者配置Configurations
// args = new String[] {"F:/MyBigData/Hadoop/data/mr_wordcount/input ","F:/MyBigData/Hadoop/data/mr_wordcount/output"};
//设置mapreduce程序的运行模式
// conf.set("mapreduce.framework.name","local");
//构造job作业的实例,参数(配置对象,job名字)
Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName());
//设置mr程序运行的主类
job.setJarByClass(CovidSumDriver.class);
//设置本次mr程序的mapper类型、reducer类型
job.setMapperClass(CovidSumMapper.class);
job.setReducerClass(CovidSumReducer.class);
//指定mapper阶段输出的key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CovidCountBean.class);
//指定reducer阶段输出的key value数据类型,也是mr程序最终的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
//配置本次作业的输入数据路径和输出数据路径
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
//todo 默认组件 TextInputFormat TextOutputFormat
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job,outputPath);
//todo 判断输出路径是否已经存在,如果已经存在,先删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath,true); //递归删除
}
//最终提交本次job作业
//job.submit();
//采用waitForCompletion提交job,参数表示是否开启实时监视追踪作业的执行情况
boolean resultFlag = job.waitForCompletion(true);
//退出程序 和job结果进行绑定, 0是正常退出,1是异常退出
System.exit(resultFlag ? 0: 1);
}
}
最后
以上就是花痴火为你收集整理的Hadoop编程——第五章:(5)MapReduce自定义对象序列化案例案例1:各州累计病例数量统计一、需求分析二、代码实现的全部内容,希望文章能够帮你解决Hadoop编程——第五章:(5)MapReduce自定义对象序列化案例案例1:各州累计病例数量统计一、需求分析二、代码实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复