Overview
Original article: http://blog.fens.me/hadoop-mapreduce-log-kpi/
Data source: website access logs
The code first:
The original author's code has been modified in places to work with a newer version of Hadoop. I also record my own questions and the answers I found along the way, so they are easy to recall later.
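For reference, this is the kind of access-log line the code parses (the same sample line used in the test main() further down); as the code interprets it, the fields correspond to the bean below: remote_add, remote_user, time_local, request, statues (HTTP status), body_bytes_sent, http_referer, and http_user_agent:
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"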
package org.apache.hadoop.examples;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
public class Kpi {// bean: wraps one parsed log record and converts it to a String
private String remote_add;
private String remote_user;
private String time_local;
private String request;
private String statues;
private String body_bytes_sent;
private String http_referer;
private String http_user_agent;
private boolean valid = true;
public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("valid:"+this.valid);
sb.append("\nremote_addr:"+this.remote_add);
sb.append("\nremote_user:"+this.remote_user);
sb.append("\ntime_local:"+this.time_local);
sb.append("\nrequest:"+this.request);
sb.append("\nstatues:"+this.statues);
sb.append("\nbody_bytes_sent:"+this.body_bytes_sent);
sb.append("\nhttp_referer:"+this.http_referer);
sb.append("\nhttp_user_agent:"+this.http_user_agent);
return sb.toString();
}
public String getRemote_add() {
return remote_add;
}
public void setRemote_add(String remote_add) {
this.remote_add = remote_add;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatues() {
return statues;
}
public void setStatues(String statues) {
this.statues = statues;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
if(http_referer.length()<8){
return http_referer;
}
String str = this.http_referer.replace("\"", "").replace("http://", "").replace("https://", "");
return str.indexOf("/")>0?str.substring(0,str.indexOf("/")):str;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
public java.util.Date getTime_local_Date() throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);
return sdf.parse(this.time_local);
}
public String getTime_local_Date_hour() throws ParseException{
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
return sdf.format(this.getTime_local_Date());
}
public static void main(String args[]) {// quick test: check that the sample line is split and parsed correctly
String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"";
System.out.println(line);
Kpi kpi = new Kpi();
String[] arr = line.split(" ");
kpi.setRemote_add(arr[0]);
kpi.setRemote_user(arr[1]);
kpi.setTime_local(arr[3].substring(1));
kpi.setRequest(arr[6]);
kpi.setStatues(arr[8]);
kpi.setBody_bytes_sent(arr[9]);
kpi.setHttp_referer(arr[10]);
kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
System.out.println(kpi);
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss", Locale.US);
System.out.println(df.format(kpi.getTime_local_Date()));
System.out.println(kpi.getTime_local_Date_hour());
System.out.println(kpi.getHttp_referer());
} catch (ParseException e) {
e.printStackTrace();
}
}
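// parser() splits the raw line on single spaces, so the indices below depend on the log layout shown in main(): arr[0] is the client address, arr[3] the timestamp (with the leading '[' stripped), arr[6] the request path, arr[8] the status code, arr[9] the bytes sent, arr[10] the referer, and arr[11]/arr[12] only the first two tokens of the user agent, which itself contains spaces.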
private static Kpi parser(String line) {
System.out.println(line);
Kpi kpi = new Kpi();
String[] arr = line.split(" ");
if (arr.length > 11) {
kpi.setRemote_add(arr[0]);
kpi.setRemote_user(arr[1]);
kpi.setTime_local(arr[3].substring(1));
kpi.setRequest(arr[6]);
kpi.setStatues(arr[8]);
kpi.setBody_bytes_sent(arr[9]);
kpi.setHttp_referer(arr[10]);
if (arr.length > 12) {
kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
} else {
kpi.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(kpi.getStatues()) >= 400) {// status codes of 400 and above are HTTP errors
kpi.setValid(false);
}
} else {
kpi.setValid(false);
}
return kpi;
}
/**
* Classify PVs by page: filter out requests that are not in the configured page set (currently disabled)
*
*/
public static Kpi filterPVs(String line) {
Kpi kpi = parser(line);
/* Set<String> pages = new HashSet<String>();
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
if (!pages.contains(kpi.getRequest())) {
kpi.setValid(false);
}*/
return kpi;
}
}
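Running the main() test on the sample line prints the raw line, then the toString() dump of the bean, and, if the parsing is correct, it should end with output roughly like:
2013.09.18:06:49:57
2013091806
www.angularjs.cn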
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class KpiPv {// MapReduce job that counts page views (PV) for the requested pages
/*
* A MapReduce job runs in a map phase and a reduce phase. Each phase takes key-value pairs as input and produces key-value pairs as output, and their types are chosen by the programmer; the programmer's job is to write the map and reduce functions.
* The input key in the map phase is the byte offset of the line from the start of the file; it is rarely needed and can usually be ignored. The map phase is also a good place to filter the data.
* Hadoop defines its own set of basic types that can be serialized for network transfer, which makes RPC and similar mechanisms easier to implement, so it does not use the built-in Java types. Roughly, Text corresponds to Java's String and IntWritable to Integer.
* The input to map is a Text-like object, not a file object.
* 1. How does the input file reach the job? On the client we call FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileInputFormat extends the abstract InputFormat class,
* which declares a getSplits method, so the splitting is actually done before map runs.
* 2. How are the computed splits handed to map, and how are the counts accumulated?
* InputFormat's other method, createRecordReader(), returns a RecordReader: the abstract class used to read key-value pairs one at a time from an input split.
* Its most important method is nextKeyValue(), which fetches the next K-V pair from the split.
* Hadoop divides the input data into fixed-size pieces called input splits and feeds them to MapReduce. One map task is created per split, and it runs the user-defined map function over every record in that split.
*/
public static class CountMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);// what are one and word for here? They are fields shared across map() calls, reused instead of allocating new objects for every record
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {// map()'s parameters are: the input key, the input value, and the context object (which plays the role of the old API's OutputCollector and Reporter)
// The input key is the byte offset of the line, which we usually ignore; the input value is one line of text. After the framework splits the input file, map() is called once for every line.
// map() transforms each input key-value pair into new key-value pairs, which become the input of the reduce function.
// What tells map() where a line ends? With the default TextInputFormat, the LineRecordReader breaks records at newline characters, so each value here is exactly one log line.
Kpi kpi = Kpi.filterPVs(value.toString());
if (kpi.isValid()) {// keep only valid records
word.set(kpi.getRequest());
context.write(word, one);
}
/*
* leftover from the WordCount TokenizerMapper this class was adapted from:
* while (itr.hasMoreTokens()) { word.set(itr.nextToken());
* context.write(word, one); }
*/
}
}
/*
public static class KPIPVReducer extends MapReduceBase implements Reducer {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
// The input of a reduce task is the collection of outputs from all map tasks, shuffled and transferred to the reducer over the network.
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
result.set(sum);
output.collect(key, result);
}
}
*/
public static class KPiReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// String[] otherArgs = new GenericOptionsParser(conf, args)
// .getRemainingArgs();
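// hard-coded input/output paths, used here instead of the GenericOptionsParser call commented out above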
String[] otherArgs = new String[] { "input01", "output03" };
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "kpi pv");// job name updated from the WordCount leftover "word count"
job.setJarByClass(KpiPv.class);
job.setMapperClass(CountMapper.class);
job.setCombinerClass(KPiReducer.class);// use the KPiReducer defined in this class; WordCount's IntSumReducer would behave the same, since both simply sum the counts
job.setReducerClass(KPiReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// registers the input path with the job; the path can be a single file, a directory, or a glob pattern, and the method can be called multiple times
// to feed the job from several input paths.
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// sets the directory the reduce output is written to; it must not exist before the job runs, otherwise Hadoop reports an error, which protects existing data from being overwritten
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
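The Kpi bean also exposes getTime_local_Date_hour(), which the PV job above never calls. As a rough illustration of what it is for, here is a minimal sketch (my own addition, not code from the original post; the class name KpiTimeMapper is made up) of a mapper that counts page views per hour instead of per request path. The reducer and driver stay the same apart from swapping in this mapper class:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.text.ParseException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class KpiTimeMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text hour = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
Kpi kpi = Kpi.filterPVs(value.toString());// reuse the same parsing and validity check as the PV job
if (kpi.isValid()) {
try {
hour.set(kpi.getTime_local_Date_hour());// e.g. "2013091806": one page view in that hour
context.write(hour, one);
} catch (ParseException e) {
// skip records whose timestamp cannot be parsed
}
}
}
}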