我是靠谱客的博主 无限西牛,最近开发中收集的这篇文章主要介绍Mapreduce的简单实现和步骤,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

package com.qfedu.bigdata.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**

  • @ClassName wordCount
  • @Description TODO
  • @Author Chenfg
  • @Date 2018/9/20 0020 11:22
  • @Version 1.0
  • 词频统计
  • 输出数据:
  • word,n
  • Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  • 框架在调用咱们写的map方法时,会将数据作为参数(一个key,一个value)传递给map方法
    
  • KEYIN:是框架(maptask)要传递给map方法的输入参数中的key的数据类型
    
  • VALUEIN:是框架(maptask)要产地给map方法的输入参数中的value的数据类型
    
  • 在默认情况下,框架传入的key是框架从待处理数据(文本文件)中读取到的‘某一行’数据的起始偏移量,所以类型Long
    
  • 框架传入的value是框架从待处理数据中读到的‘某一行’的内容,所以类型是String
    
  • 但是,Long或者String等java的原生态数据类型的序列化的效率较低,所以hadoop对其进行了封装改造,
    
  • 有替代品:LongWriable/Text
    
  • map方法处理完数据后需要返回一个结果(一个key一个value的键值对数据)
    
  • KEYOUT:是咱们的map方法处理完成后返回结果中的key的数据类型
    
  • VALUEOUT:是咱们的map方法处理完成后返回结果中的value的数据类型
    
  • 思路:
    
  • map阶段的处理逻辑:
    
  • 1、用空格切分单词
    
  • 2、循环遍历单词
    
  • 3、输出结果,<word,1>
    
  • 
    map方法的调用规律:
    
  • 
    maptask没读取到一行数据就会调用此意map方法
    

*/
public class wordCount {
static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、数据类型转换
String line = value.toString();


//2、切分数据
String[] words = line.split(" ");
//3、循环遍历单词
for (String word : words) {
//4、输出结果
context.write(new Text(word),new LongWritable(1));
}
}
}
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
reduce()方法要接收的输入参数是一个key一个迭代器<T>的values
*
KEYIN:框架(reducetask)要传递给reduce方法的输入参数的key的数据类型
*
VALUEIN:框架(reducetask)要传递给reduce方法的输入参数的value的数据类型
*
*
*
KEYOUT:reduce方法处理后的数据的返回结果的key的数据类型
*
VALUEOUT:reduce方法处理后的数据的返回结果的value的数据类型
*
*
reduce方法调用的规律:框架会从map阶段的输出结果中找出所有的key相同的<k,v>数据对组成一组数据,
*
然后调用一次reduce()方法
*/
static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//为什么用迭代器
/**
* map端的输出数据:
<a,1>
<b,1>
<c,1>
<a,1>
<a,1>
<d,1>
reduce端的输入数据:
<a,<1,1,1>>
*/
int count=0;
//循环遍历迭代器
for (LongWritable i :values) {
count+=i.get();
}
//往下发送数据
context.write(key,new LongWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建一个Configuration配置项
Configuration conf = new Configuration();
//2、配置连接参数
conf.set("fs.defaultFS","hdfs://qianfeng");
//高可用集群的设置项
conf.set("fs.defaultFS","hdfs://qianfeng");
conf.set("dfs.nameservices","qianfeng");
conf.set("dfs.ha.namenodes.qianfeng","nn1,nn2");
conf.set("dfs.namenode.rpc-address.qianfeng.nn1","hadoop01:9000");
conf.set("dfs.namenode.rpc-address.qianfeng.nn2","hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.qianfeng","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
//3、创建一个job对象
Job job = Job.getInstance(conf,"wordCount");
//4、描述对象
//5、设置Job的执行路径
job.setJarByClass(wordCount.class);
//6、设置mapTask调用的业务逻辑类
job.setMapperClass(WCMapper.class);
//7、设置map端数据输出的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//8、设置mapTask调用的业务类
job.setReducerClass(WCReducer.class);
//9、设置reduce端的数据的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//10、设置Job的输入文件的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//11、设置Job的输出文件的路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//12、提交job

// job.submit();
boolean b = job.waitForCompletion(true);


System.exit(b?0:1);
}package com.qfedu.bigdata.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**

  • @ClassName wordCount
  • @Description TODO
  • @Author Chenfg
  • @Date 2018/9/20 0020 11:22
  • @Version 1.0
  • 词频统计
  • 输出数据:
  • word,n
  • Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  • 框架在调用咱们写的map方法时,会将数据作为参数(一个key,一个value)传递给map方法
    
  • KEYIN:是框架(maptask)要传递给map方法的输入参数中的key的数据类型
    
  • VALUEIN:是框架(maptask)要产地给map方法的输入参数中的value的数据类型
    
  • 在默认情况下,框架传入的key是框架从待处理数据(文本文件)中读取到的‘某一行’数据的起始偏移量,所以类型Long
    
  • 框架传入的value是框架从待处理数据中读到的‘某一行’的内容,所以类型是String
    
  • 但是,Long或者String等java的原生态数据类型的序列化的效率较低,所以hadoop对其进行了封装改造,
    
  • 有替代品:LongWriable/Text
    
  • map方法处理完数据后需要返回一个结果(一个key一个value的键值对数据)
    
  • KEYOUT:是咱们的map方法处理完成后返回结果中的key的数据类型
    
  • VALUEOUT:是咱们的map方法处理完成后返回结果中的value的数据类型
    
  • 思路:
    
  • map阶段的处理逻辑:
    
  • 1、用空格切分单词
    
  • 2、循环遍历单词
    
  • 3、输出结果,<word,1>
    
  • 
    map方法的调用规律:
    
  • 
    maptask没读取到一行数据就会调用此意map方法
    

*/
public class wordCount {
static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、数据类型转换
String line = value.toString();


//2、切分数据
String[] words = line.split(" ");
//3、循环遍历单词
for (String word : words) {
//4、输出结果
context.write(new Text(word),new LongWritable(1));
}
}
}
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
reduce()方法要接收的输入参数是一个key一个迭代器<T>的values
*
KEYIN:框架(reducetask)要传递给reduce方法的输入参数的key的数据类型
*
VALUEIN:框架(reducetask)要传递给reduce方法的输入参数的value的数据类型
*
*
*
KEYOUT:reduce方法处理后的数据的返回结果的key的数据类型
*
VALUEOUT:reduce方法处理后的数据的返回结果的value的数据类型
*
*
reduce方法调用的规律:框架会从map阶段的输出结果中找出所有的key相同的<k,v>数据对组成一组数据,
*
然后调用一次reduce()方法
*/
static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//为什么用迭代器
/**
* map端的输出数据:
<a,1>
<b,1>
<c,1>
<a,1>
<a,1>
<d,1>
reduce端的输入数据:
<a,<1,1,1>>
*/
int count=0;
//循环遍历迭代器
for (LongWritable i :values) {
count+=i.get();
}
//往下发送数据
context.write(key,new LongWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建一个Configuration配置项
Configuration conf = new Configuration();
//2、配置连接参数
conf.set("fs.defaultFS","hdfs://qianfeng");
//高可用集群的设置项
conf.set("fs.defaultFS","hdfs://qianfeng");
conf.set("dfs.nameservices","qianfeng");
conf.set("dfs.ha.namenodes.qianfeng","nn1,nn2");
conf.set("dfs.namenode.rpc-address.qianfeng.nn1","hadoop01:9000");
conf.set("dfs.namenode.rpc-address.qianfeng.nn2","hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.qianfeng","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
//3、创建一个job对象
Job job = Job.getInstance(conf,"wordCount");
//4、描述对象
//5、设置Job的执行路径
job.setJarByClass(wordCount.class);
//6、设置mapTask调用的业务逻辑类
job.setMapperClass(WCMapper.class);
//7、设置map端数据输出的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//8、设置mapTask调用的业务类
job.setReducerClass(WCReducer.class);
//9、设置reduce端的数据的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//10、设置Job的输入文件的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//11、设置Job的输出文件的路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//12、提交job

// job.submit();
boolean b = job.waitForCompletion(true);


System.exit(b?0:1);
}

}

}

最后

以上就是无限西牛为你收集整理的Mapreduce的简单实现和步骤的全部内容,希望文章能够帮你解决Mapreduce的简单实现和步骤所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部