Analyzing and Processing Log Records on Hadoop with MapReduce

Overview

1. Introduction

MapReduce is a high-performance distributed batch-processing framework for analyzing and processing massive data sets in parallel. Compared with traditional approaches, MapReduce leans toward brute force: it handles huge volumes of data in a simple, blunt, but effective way. By splitting the input and combining intermediate results (the core of the model), it distributes work across many worker nodes, which improves the reliability of data processing and makes it practical to scale out to much larger data sets.

The core of MapReduce is map + shuffle + reduce. The input files are read and split into input splits; the map phase turns each split into key-value pairs, which become the reducer's input. Before reaching the reducers, the pairs go through the shuffle phase, where they are grouped and sorted by key; the sorted key-value pairs are then handed to the reduce phase. A MapReduce job does not have to include a reduce phase at all: a map-only job is perfectly valid.

In practice MapReduce has largely been superseded by Spark, but it is still worth understanding as part of learning big data. Below are the examples I read and wrote while studying it.
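To make the pipeline concrete, here is a tiny word-count walkthrough (section 2 below implements it in full):

Input line:    "hello world hello"
map:           (hello, 1), (world, 1), (hello, 1)
shuffle/sort:  (hello, [1, 1]), (world, [1])
reduce:        (hello, 2), (world, 1)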

2. Hadoop's Built-in WordCount

2.1. Create a Maven project with the standard Maven directory layout (src/main/java plus pom.xml at the project root).

2.2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sun</groupId>
    <artifactId>hadoop-MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoopVersion>2.6.0</hadoopVersion>
    </properties>

    <dependencies>
        <!-- Hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- Hadoop end -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations-java5</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
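
With this pom, a standard Maven build produces the jar used later in section 4 (target/hadoop-MapReduce-1.0-SNAPSHOT.jar). Note that for the HBase jobs the HBase jars must also be visible on the cluster classpath, for example via HADOOP_CLASSPATH.

mvn clean package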

2.3. WordCount.java

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Reducer (also used as the combiner): sum the counts for each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

    // Mapper: split each input line into tokens and emit (word, 1) for every token
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }
}
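
As a quick sanity check of the data flow, a hypothetical two-line input file containing

hello world
hello hadoop

produces one tab-separated word/count pair per line in part-r-00000, sorted by key:

hadoop	1
hello	2
world	1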

3. My Own Examples

3.1. Uploading the WordCount results to HBase

WordCountUpLoadToHBase.java:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

public class WordCountUpLoadToHBase extends Configured {

    // Mapper: same tokenizing as WordCount, but the key is the word's bytes wrapped in an ImmutableBytesWritable so it can become an HBase row key
    public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
            StringTokenizer strs = new StringTokenizer(value.toString());
            while(strs.hasMoreTokens()){
                word.set(strs.nextToken());
                context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
            }
        }

    }

    // Reducer: sum the counts and write each word's total into HBase as content:count on the word's row
    public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{

        public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable val:values){
                sum += val.get();
            }
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(sum+""));
            context.write(key, put);
        }
    }


    @SuppressWarnings("all")
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","hadoop");
        conf.set("hbase.zookeeper.property.clientPort","2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        // drop the table if it already exists
        if(admin.tableExists(tableName)){
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor columnDescriptor = new HColumnDescriptor("content");
        tableDescriptor.addFamily(columnDescriptor);
        admin.createTable(tableDescriptor);
        admin.close();

        Job job = Job.getInstance(conf, "upload to hbase");
        job.setJarByClass(WordCountUpLoadToHBase.class);
        job.setMapperClass(WCHBaseMapper.class);
        TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPaths(job, "hdfs://hadoop:9000/agentlog/*");
        System.exit(job.waitForCompletion(true)?0:1);

    }

}
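
Assuming the job completes successfully, the uploaded counts can be spot-checked from the HBase shell (table name and column family as created above; replace someword with any word expected in the logs):

hbase shell
scan 'wordcount', {LIMIT => 5}
get 'wordcount', 'someword'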

3.2. Reading the data back from HBase

MRReadFromHbase.java:
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRReadFromHbase extends Configured {

    // TableMapper: scan the wordcount table and emit (row key, concatenation of all cell values in the content family)
    public static class WCHBaseMapper extends TableMapper<Text, Text>{

        @Override
        public void map(ImmutableBytesWritable key,Result values,Context context) throws IOException, InterruptedException{
            // concatenate every cell value of the content family for this row, then emit once per row
            StringBuffer sb = new StringBuffer("");
            for(Map.Entry<byte[], byte[]> value:values.getFamilyMap("content".getBytes()).entrySet()){
                String str = new String(value.getValue());
                if(str!=null){
                    sb.append(str);
                }
            }
            context.write(new Text(key.get()), new Text(sb.toString()));
        }
    }

    // Reducer: pass the values through unchanged so each word and its count end up in the HDFS output file
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text>{
        private Text result =new Text();
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            for(Text val:values){
                result.set(val);
                context.write(key,result);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf =HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf, "read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/user/sun/hbase"));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
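
After the job finishes, the exported key/value pairs can be checked directly on HDFS (output path as configured in the code above; adjust the namenode host if needed):

hadoop fs -cat /user/sun/hbase/part-r-00000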

3.3. Processing my custom-format log data

The logs are stored in the form [format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]. Records that begin with [format:1] are selected, then sorted and aggregated per user.

UserLog.java:

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

public class UserLog
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(UserLog.class);
        job.setMapperClass(TokenizerMapper.class);
        // Note: unlike WordCount, no combiner is set here; IntSumReducer buffers its input and only writes output in cleanup(),
        // so running it as a combiner (which may execute zero or more times) would give non-deterministic results
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // The mapper keeps only lines that start with [format:1] and emits a composite key "user|time|html" with an empty value
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

        // map implementation
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().startsWith("[format:1]")) {
                context.write(new Text(getParameter(value, "user") + "|" + getParameter(value, "time") + "|" + getParameter(value, "html")), new Text(""));
            }
        }
    }

/*    // The earlier version of the reducer simply copied each input key to the output and emitted it directly
    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        // reduce implementation
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }*/

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

        // A TreeMap holds the collected keys; TreeMap sorts by key in ascending order, and the Comparator here keeps that natural string order
        private TreeMap<String, String> treeMap = new TreeMap<String, String>(new Comparator<String>() {
            //@Override
            public int compare(String x, String y) {
                return x.compareTo(y);
            }
        });

        // A second TreeMap holds the aggregated results (user|page -> seconds), also sorted by key in ascending order
        private TreeMap<String, Long> treeMapResult = new TreeMap<String, Long>(new Comparator<String>() {
            //@Override
            public int compare(String x, String y) {
                return x.compareTo(y);
            }
        });

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // collect each composite key into the TreeMap instead of writing it to the context here
            treeMap.put(key.toString(), key.toString());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {

            // The mapper above always writes empty values, so in practice this branch runs and dumps the sorted composite keys with "0";
            // the else branch aggregates the seconds spent per user and page when non-blank values are present
            if (StringUtils.isBlank(context.getCurrentValue().toString())) {
                Iterator it = treeMap.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    context.write(new Text(entry.getKey().toString()), new Text("0"));
                }
            }else{
                String key = "";
                String value = "";

                // walk the sorted composite keys and accumulate the seconds spent per user and page into treeMapResult
                Iterator it = treeMap.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    String[] sp = entry.getKey().toString().split("\\|");

                    if (key.equals(sp[0])){
                        long time = getSecondDiff(value, sp[1]);
                        if (!treeMapResult.containsKey(sp[0] + "|" + sp[2])) {
                            treeMapResult.put(sp[0] + "|" + sp[2], time);
                        } else {
                            treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong(treeMapResult.get(sp[0] + "|" + sp[2]).toString()) + time);
                        }
                    }else{
                        treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong("0"));
                    }

                    key = sp[0];
                    value = sp[1];
                }

                // emit the aggregated results
                Iterator iter = treeMapResult.entrySet().iterator();
                while (iter.hasNext()) {
                    Map.Entry entry = (Map.Entry) iter.next();
                    context.write(new Text(entry.getKey().toString()), new Text(entry.getValue().toString()));
                }
            }
        }
    }

    // Extract the value of a [param:value] pair from the log line; returns "" if the parameter is missing
    private static String getParameter(Text value, String param){
        try {
            return value.toString().substring(value.toString().indexOf(param) + param.length() + 1, value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1));
        }catch(Exception e){
            return "";
        }
    }

    // Difference in seconds between two timestamps in yyyy-MM-dd HH:mm:ss format; returns 0 if parsing fails
    private static long getSecondDiff(String s1, String s2){

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        Date d1 = null;
        Date d2 = null;

        try {
            d1 = format.parse(s1);
            d2 = format.parse(s2);

            // difference in milliseconds
            long diff = d2.getTime() - d1.getTime();

            long diffSeconds = diff / 1000;

            return diffSeconds;
        }catch(Exception e){
            return 0;
        }
    }
}
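
To see what the helper methods and the composite map key produce on a sample line, here is a small standalone sketch. The log line, timestamps, and html value are made up for illustration (they use the time and html fields the mapper actually reads), and getParameter/getSecondDiff are copied from UserLog above, since they are private there:

import org.apache.hadoop.io.Text;

import java.text.SimpleDateFormat;
import java.util.Date;

public class UserLogDemo {

    // copied from UserLog: extract the value of a [param:value] pair
    private static String getParameter(Text value, String param) {
        try {
            return value.toString().substring(
                    value.toString().indexOf(param) + param.length() + 1,
                    value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1));
        } catch (Exception e) {
            return "";
        }
    }

    // copied from UserLog: seconds between two "yyyy-MM-dd HH:mm:ss" timestamps
    private static long getSecondDiff(String s1, String s2) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            Date d1 = format.parse(s1);
            Date d2 = format.parse(s2);
            return (d2.getTime() - d1.getTime()) / 1000;
        } catch (Exception e) {
            return 0;
        }
    }

    public static void main(String[] args) {
        // hypothetical log line in the custom format described in section 3.3
        Text line = new Text("[format:1][user:AAA][time:2018-10-10 08:00:00][html:/bt/btCourse/get]");

        // the composite key the mapper would emit: user|time|html
        String key = getParameter(line, "user") + "|" + getParameter(line, "time") + "|" + getParameter(line, "html");
        System.out.println(key);  // AAA|2018-10-10 08:00:00|/bt/btCourse/get

        // time spent between two consecutive visits of the same user, in seconds
        System.out.println(getSecondDiff("2018-10-10 08:00:00", "2018-10-10 08:01:30"));  // 90
    }
}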

4. Verifying the Results

4.1. Package the project as a jar and copy it to /home/hadoop/Downloads on the CentOS machine.

4.2. Run the job:

hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar WordCount /agentlog/ /user/sun/MapReduce/wordCountX

4.3. Check the result:

hadoop fs -cat /user/sun/MapReduce/wordCountX/part-r-00000