我是靠谱客的博主 正直秀发,最近开发中收集的这篇文章主要介绍网站访问日志(二)通过Mapreduce 初步清洗数据得到weblogbean数据,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

网站访问日志(二)通过Mapreduce 初步清洗数据得到weblogbean数据

原始数据文件:百度网盘 /02.参考资料网站流量日志分析:项目脚本-配置代码/access.log.fensi

需求:对原始数据进行清洗 ,得到我们想要的11个字段数据。同时对不合法的数据标记为false 【1.状态码>400的 2.请求的request中包含过滤的静态资源的字段的 3.通过空格split 长度小于11的都标记为false 注意:数据还要 只是标记为false】

下面我们开始写代码:
创建 三个类:1.WebLogBean 【将我们想得到的数据封装成一个weblogbean对象】
2.WebLogParse 【封装好的一些项目中需要调用的方法 方便调用】
3WebLongPreProcess【 数据预处理类 这里直接将 将map reduce driver三个阶段封装得到这一个类中】
代码在下面 我先小总一下:
注意点:1.时间格式 2.静态资源过滤 【创建一个set 添加要过滤的字段,在过滤时我们contains(bean.getRequest()即可判断】/ 脏数据/最后一个字段数据太长 我们需要拼接
3.将清洗后的数据结构化 设置为 “01" 默认分割符。

过程中的编程小技巧:
- 数据预处理使用mr来进行,此外python shell 也可以进程处理

  • java语言中封装了很多工具类 md5 simpledataformat
  • mr可以进行分布式的并行计算处理
  • 在预处理中,如果涉及多属于数据传递 通常是建立与之对应的javabean携带数据传递
    注意要实现hadoop序列化机制 writable
  • 有意识的把javabean中toSting方法重写,以01进行分割,方便后续数据入hive
  • 如涉及不合规脏数据 往往采用逻辑删除 也就是自定义标记位 1 or 0来表示数据是否有效
  • 如果涉及数据最后一条不固定导致切割返回数组不定 可以采用从固定位拼接至最后的方式执行
  • 静态资源过滤 在一次request中,jpg,css,js称之为静态资源
    /js
    /css
    /img
    要想过滤静态资源 可以从请求的url上下手
    /index.html
    /js/jquery.min.js

扩展需求:在预处理中,需求根据ip查询对应的省份信息。

对于mr中,频繁使用且数据量不大的数据如何进行优化。

能不能一次查询出来,放在某个地方(在mr中哪里查 放在哪)

mr编程模型中 父类提供了set方法 (Called once at the beginning of the task.) 可以把需要频繁使用的小数据提取在该方法中加载到当前执行的内存中缓存起来。

代码如下:

1.WebLogBean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**

  • 对接外部数据的层,表结构定义最好跟外部数据源保持一致
  • 术语: 贴源表
  • @author

*/
public class WebLogBean implements Writable {

private boolean valid = true;// 判断数据是否合法
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return this.time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("01").append(this.getRemote_addr());
sb.append("01").append(this.getRemote_user());
sb.append("01").append(this.getTime_local());
sb.append("01").append(this.getRequest());
sb.append("01").append(this.getStatus());
sb.append("01").append(this.getBody_bytes_sent());
sb.append("01").append(this.getHttp_referer());
sb.append("01").append(this.getHttp_user_agent());
return sb.toString();
}
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}

}

2.WebLogParse

import org.apache.hadoop.io.Writable;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class WebLogParser {

public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
String[] arr = line.split(" ");
if (arr.length > 11) {
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
String time_local = formatDate(arr[3].substring(1));
if(null==time_local || "".equals(time_local)) time_local="-invalid_time-";
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//如果useragent元素较多,拼接useragent
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i=11;i<arr.length;i++){
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误
webLogBean.setValid(false);
}
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
} else {
webLogBean=null;
}
return webLogBean;
}
public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//格式化时间方法
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}

}

3.WebLongPreProcess

package cn.weblog;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

  • 处理原始日志,过滤出真实pv请求 转换时间格式 对缺失字段填充默认值 对记录标记valid和invalid

*/

public class WeblogPreProcess {

static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
// 用来存储网站url分类数据
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
// 过滤js/图片/css等静态资源
WebLogParser.filtStaticResource(webLogBean, pages);
/* if (!webLogBean.isValid()) return; */
k.set(webLogBean.toString());
context.write(k, v);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://note1:9000/weblog/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://note1:9000/weblog/output"));
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}

}

最后

以上就是正直秀发为你收集整理的网站访问日志(二)通过Mapreduce 初步清洗数据得到weblogbean数据的全部内容,希望文章能够帮你解决网站访问日志(二)通过Mapreduce 初步清洗数据得到weblogbean数据所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部