Overview
All code: github-wttttt
Tasks
- Count the number of visits for each IP address
- Find the top K IP addresses with the most visits
Analysis:
- Task 1 is straightforward: a simple summation problem, used here to get reacquainted with writing Hadoop MapReduce programs.
    - Optimization: use a combiner to cut down the amount of data shuffled across the network.
    - In this example the combiner and the reducer share the same logic, so one reduce class can serve both roles.
    - The code is in the appendix, with detailed comments.
- Task 2 is a top-K problem; the key points are:
    - Use a TreeMap to maintain the top K: whenever it grows beyond K entries, the smallest count is dropped, so it behaves like a size-limited heap (see the sketch right after this list);
    - Each mapper computes the top K of its own input split;
    - A mapper emits its results only after the whole input split has been processed; this is done in the cleanup() method;
    - Only one reducer is started, so that a global top K is obtained; the reducer builds its top K the same way the mappers do.
    - Note: if this part is unclear, see Appendix 3, where I quote someone else's explanation that may be clearer than mine.
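To make the TreeMap idea concrete before diving into the Hadoop code, here is a minimal standalone sketch (plain Java, no Hadoop; the IPs, counts, and the class name TopKSketch are made up for illustration and are not part of the repository) of keeping the K largest counts in a TreeMap:

import java.util.Map;
import java.util.TreeMap;

public class TopKSketch {
    public static void main(String[] args) {
        int k = 3;
        // TreeMap keeps its keys (visit counts) in ascending order.
        TreeMap<Integer, String> topK = new TreeMap<Integer, String>();
        int[] counts = {5, 42, 17, 8, 99, 23};               // hypothetical visit counts
        String[] ips = {"10.0.0.1", "10.0.0.2", "10.0.0.3",
                        "10.0.0.4", "10.0.0.5", "10.0.0.6"};  // hypothetical IP addresses
        for (int i = 0; i < counts.length; i++) {
            topK.put(counts[i], ips[i]);
            if (topK.size() > k) {
                // drop the smallest count, so only the k largest survive
                topK.remove(topK.firstKey());
            }
        }
        // prints the 3 largest counts with their IPs, in ascending order of count
        for (Map.Entry<Integer, String> e : topK.entrySet()) {
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}

Note the same limitation as the appendix code: because the visit count is the map key, two IPs with an identical count collide and one of them is silently dropped.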
Appendix:
- Task 1 code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
// LogCount: count each IP address's visits
// map(): map each line to <IPAddress, 1>
// reduce(): sum the 1s into <IPAddress, n>
// use a combiner to reduce shuffle traffic
public class LogCount{
public static class xxMapper
extends Mapper<Object, Text, Text, IntWritable>{ // subclass of Hadoop's Mapper
private final static IntWritable one = new IntWritable(1); // reusable constant value 1
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException{
// for each line
word.set(value.toString().split(" ")[0]);
context.write(word,one);
}
}
public static class xxReducer
extends Reducer<Text, IntWritable, Text, IntWritable>{ // subclass of Hadoop's Reducer
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException{
int sum = 0;
// for each key, count it
for (IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
// parse the remaining command-line arguments: input path(s) and output path
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
// validate the command-line arguments
if (otherArgs.length < 2){
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "Log Count");
job.setJarByClass(LogCount.class);
job.setMapperClass(xxMapper.class);
// the combiner and the reducer can share one class when the combiner's logic is identical
// otherwise, write a dedicated combiner class, usually also extending Reducer (see the sketch after this listing)
job.setCombinerClass(xxReducer.class); // combiner and reducer use the same class
job.setReducerClass(xxReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i){
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
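The comment above mentions writing a dedicated combiner class when its logic must differ from the reducer's. As a reference, here is a minimal sketch of such a class; it is not part of the original repository, the name xxCombiner is made up, and it would be nested inside LogCount alongside xxReducer. With identical summing logic it behaves exactly like reusing xxReducer, but keeping it separate lets the combiner diverge later (for example, if the reduce step stopped being a pure associative and commutative aggregation):

// Hypothetical standalone combiner: same summing logic as xxReducer,
// kept as its own class so it can diverge from the reducer if needed.
public static class xxCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable partialSum = new IntWritable();
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // local, per-mapper partial sum
        }
        partialSum.set(sum);
        context.write(key, partialSum); // emitted into the shuffle instead of the raw 1s
    }
}
// registered with: job.setCombinerClass(xxCombiner.class);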
- Task 2 code:
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
TopK problem
Log_Max_k: find the IP addresses of the top-K visitors
map(): get the top K for each mapper
 * Use a TreeMap to store the top K for each mapper
 * For each mapper:
   for each record, we try to update the TreeMap, and at the end we have that mapper's top K
 * The TreeMap acts somewhat like a size-limited heap: once it holds more than K entries,
   the smallest key is removed, so the K largest counts remain.
 * Unlike the usual pattern (write once per input line), we write only after the whole
   input split has been handled; this is done in cleanup(), which runs after the map task.
reduce(): get the global top K in one reducer
 * we need just one reducer to guarantee a global top K
The value of K is passed to the Mapper & Reducer from the outside (the command line),
using conf.set() and conf.get().
**/
public class Log_Max_k {
public static class xxMap extends
Mapper<LongWritable, Text, Text, IntWritable>{
/**
* the map function
* input file format: IPAddress\tVisitNum (one record per line)
*/
// TODO: <String, Integer> or <Text, Integer>
private TreeMap<Integer, Text> tree = new TreeMap<Integer, Text>();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
// TODO: conf.set() in function run()
// get the Configuration from the Context to read the parameter value
Configuration conf = context.getConfiguration();
int K = conf.getInt("K_value", 10); // default = 10
String[] values = value.toString().split("\t"); // split on the tab separator
//int visit_num = Integer.parseInt(values[1]);
//String IPAddress = values[0];
Text txt = new Text();
txt.set(values[0]);
tree.put(Integer.parseInt(values[1]), txt);
if (tree.size() > K){
tree.remove(tree.firstKey()); // drop the smallest count so only the top K remain
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException{
/**
 * write only after the whole input split has been handled, via cleanup()
 */
// iterate over the TreeMap and emit this mapper's local top K
for (Map.Entry<Integer, Text> ent : tree.entrySet()){
    // write: IPAddress    Visit_num
    context.write(ent.getValue(), new IntWritable(ent.getKey().intValue()));
}
}
}
public static class xxReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    private TreeMap<IntWritable, Text> tree = new TreeMap<IntWritable, Text>();
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        int K = conf.getInt("K_value", 10); // default = 10
        for (IntWritable visit_num : values){
            // copy both objects: Hadoop reuses the Text/IntWritable instances it passes in
            tree.put(new IntWritable(visit_num.get()), new Text(key));
            if (tree.size() > K){
                tree.remove(tree.firstKey()); // drop the smallest count, keep the top K
            }
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException{
        // emit the global top K once, after every key has been reduced
        for (Map.Entry<IntWritable, Text> ent : tree.entrySet()){
            context.write(ent.getValue(), ent.getKey());
        }
    }
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
// parse the remaining command-line arguments: K, input path(s), and output path
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
// validate the command-line arguments
if (otherArgs.length < 3){
System.err.println("Usage: Log_Max_k <K> <in> [<in>...] <out>");
System.exit(2);
}
// K must be set on conf before the Job is created: the Job copies the
// Configuration, so values set afterwards would never reach the tasks
conf.set("K_value", otherArgs[0]);
Job job = new Job(conf, "TopKIP");
job.setJarByClass(Log_Max_k.class);
job.setMapperClass(xxMap.class);
// no combiner: each mapper already emits only its local top K from cleanup()
job.setReducerClass(xxReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1); // a single reducer yields the global top K
for (int i = 1; i < otherArgs.length - 1; ++i){
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Appendix 3: detailed reasoning for the top-K problem:
- Mappers
We use the default number of mappers, so each input split is processed by one mapper.
In each map task we find the top K records of that input split. We use a TreeMap to hold the top-K data, since it is easy to update, and we insert each new record into it (to me, this TreeMap is essentially acting as a max-heap). In map() we try to update the TreeMap for every record, and what remains at the end is the local top K of that split. One reminder: in a typical mapper we call context.write (or output.collect) once per processed record. Not here: we write only after all the data of this input split has been processed, so the context.write calls can go into cleanup(), the method that runs once after the whole map task has finished.
- Reducers
As explained above, there is only one reducer here: it aggregates the mappers' outputs once more and picks the top K among them, which gives us what we want. Note that we are using NullWritable here. The reason for this is we want all of the outputs from all of the mappers to be grouped into a single key in the reducer. (That remark applies to the quoted source's variant, where the map output key is NullWritable; the code above instead keeps the IP address as the key and simply forces a single reducer with setNumReduceTasks(1). A sketch of the NullWritable variant follows below.)
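For reference, here is a minimal sketch of that NullWritable variant of the mapper (the name TopKNullKeyMapper is illustrative and not from the original code; it additionally needs import org.apache.hadoop.io.NullWritable, reuses the Map/TreeMap imports of the listing above, and assumes the same tab-separated IPAddress\tVisitNum input). All mapper outputs then share the single NullWritable key, so one reduce() call sees every candidate and can rebuild its own top-K TreeMap from the value strings:

// Alternative mapper: emit the whole local top K under NullWritable in cleanup(),
// so a single reduce() call receives every mapper's candidates.
public static class TopKNullKeyMapper
        extends Mapper<LongWritable, Text, NullWritable, Text> {
    private TreeMap<Integer, String> tree = new TreeMap<Integer, String>();
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        int k = context.getConfiguration().getInt("K_value", 10);
        String[] fields = value.toString().split("\t");
        tree.put(Integer.parseInt(fields[1]), fields[0]);
        if (tree.size() > k) {
            tree.remove(tree.firstKey()); // keep only the k largest counts
        }
    }
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        for (Map.Entry<Integer, String> ent : tree.entrySet()) {
            // the value carries "IPAddress\tVisitNum"; the key is always NullWritable
            context.write(NullWritable.get(),
                          new Text(ent.getValue() + "\t" + ent.getKey()));
        }
    }
}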