Overview
1. Introduction
MapReduce is a high-performance distributed computing framework for batch processing, designed to analyze and process massive data sets in parallel. Compared with traditional approaches, MapReduce leans toward brute force: it tackles huge volumes of data in a simple, crude, but effective way. By reading the input, splitting it, and recombining the intermediate results (the core of the model), the work is distributed across many worker nodes, which improves the reliability of data management and greatly extends the amount of data that can be handled.
The core of MapReduce is map + shuffle + reduce. The framework first reads the input files and splits them; the map phase turns each split into key-value pairs, which serve as the reducer's input. Before they reach the reducer, the map output keys go through a shuffle, i.e. they are sorted and grouped by key, and the sorted key-value pairs are then handed to the reducer for the reduce operation. A MapReduce job does not even have to include a reduce phase; a map-only job is perfectly valid.
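To make this flow concrete, here is a minimal single-process sketch of the same three phases on a toy input. The class name and the input lines are made up for illustration; a real job would of course run on Hadoop, as in the next section.

import java.util.*;

// A tiny, single-JVM illustration of map -> shuffle -> reduce for word count.
public class MiniMapReduceDemo {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello hadoop", "hello mapreduce");

        // map: emit a (word, 1) pair for every token in every line
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : input) {
            for (String word : line.split("\\s+")) {
                mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // shuffle: group the map output by key; a TreeMap also keeps the keys sorted
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapped) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // reduce: sum the grouped values for each key
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int v : entry.getValue()) {
                sum += v;
            }
            System.out.println(entry.getKey() + "\t" + sum); // hadoop 1, hello 2, mapreduce 1
        }
    }
}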
In practice MapReduce has largely been superseded by Spark, but it is still worth a basic understanding when learning big data. Below are the examples I read and wrote while studying it.
2. Hadoop's Built-in WordCount
2.1. Create a Maven project with the following directory structure:
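The original screenshot of the layout is not reproduced here; a standard Maven layout for this project would look roughly as follows (the four classes from the later sections sit directly under src/main/java because they declare no package):

hadoop-MapReduce/
  pom.xml
  src/main/java/
    WordCount.java
    WordCountUpLoadToHBase.java
    MRReadFromHbase.java
    UserLog.java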
2.2. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sun</groupId>
<artifactId>hadoop-MapReduce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<hadoopVersion>2.6.0</hadoopVersion>
</properties>
<dependencies>
<!-- Hadoop start -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoopVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoopVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoopVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoopVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>1.1.2</version>
</dependency>
<!-- Hadoop end -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations-java5</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
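With the pom in place, the job jar can be built with the standard Maven command; by default the artifact lands under target/ with a name derived from the artifactId and version. Note that this is a plain (non-shaded) jar, so the jobs rely on the Hadoop and HBase jars already being on the cluster classpath.

mvn clean package
# produces target/hadoop-MapReduce-1.0-SNAPSHOT.jar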
2.3. WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount
{
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
context.write(key, this.result);
}
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
}
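As a quick sanity check of what this job produces, suppose the input files contain just the two lines "hello hadoop" and "hello mapreduce" (a made-up example). The output file part-r-00000 would then contain, with key and count separated by a tab:

hadoop	1
hello	2
mapreduce	1

Note that IntSumReducer is also registered as the combiner. That is safe here because summing partial counts is associative and commutative, so pre-aggregating on the map side does not change the final result.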
3. My Examples:
3.1. Uploading the WordCount results to HBase:
WordCountUpLoadToHBase.java:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
public class WordCountUpLoadToHBase extends Configured {
public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
StringTokenizer strs = new StringTokenizer(value.toString());
while(strs.hasMoreTokens()){
word.set(strs.nextToken());
context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
}
}
}
public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{
public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum = 0;
for(IntWritable val:values){
sum += val.get();
}
Put put = new Put(key.get());
//addColumn replaces the deprecated Put.add; the count is stored as a string
put.addColumn(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(sum+""));
context.write(key, put);
}
}
@SuppressWarnings("all")
public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
String tableName = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","hadoop");
conf.set("hbase.zookeeper.property.clientPort","2181");
HBaseAdmin admin = new HBaseAdmin(conf);
//If the table already exists, delete it first
if(admin.tableExists(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnDescriptor =new HColumnDescriptor("content");
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);
Job job = Job.getInstance(conf,"upload to hbase");
job.setJarByClass(WordCountUpLoadToHBase.class);
job.setMapperClass(WCHBaseMapper.class);
TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPaths(job, "hdfs://hadoop:9000/agentlog/*");
System.exit(job.waitForCompletion(true)?0:1);
}
}
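After the job completes, the table can be inspected from the HBase shell (standard shell commands; the table name 'wordcount' and the content:count column come from the code above). Remember that the count is stored as a string because of Bytes.toBytes(sum + "").

hbase shell
scan 'wordcount', {LIMIT => 10}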
3.2. Reading data from HBase
MRReadFromHbase.java:
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRReadFromHbase extends Configured {
public static class WCHBaseMapper extends TableMapper<Text, Text>{
@Override
public void map(ImmutableBytesWritable key,Result values,Context context) throws IOException, InterruptedException{
StringBuffer sb =new StringBuffer("");
//Concatenate every cell value of the "content" family, then write the row once after the loop
for(Map.Entry<byte[], byte[]> value:values.getFamilyMap("content".getBytes()).entrySet()){
String str =new String(value.getValue());
if(str!=null){
sb.append(str);
}
}
context.write(new Text(key.get()), new Text(sb.toString()));
}
}
public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text>{
private Text result =new Text();
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
for(Text val:values){
result.set(val);
context.write(key,result);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String tableName = "wordcount";
Configuration conf =HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop");
conf.set("hbase.zookeeper.property.clientPort", "2181");
Job job = Job.getInstance(conf,"read from hbase to hdfs");
job.setJarByClass(MRReadFromHbase.class);
job.setReducerClass(WCHBaseReducer.class);
TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/user/sun/hbase"));
System.exit(job.waitForCompletion(true)?0:1);
}
}
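The exported rows can then be read back directly from HDFS. The path below is the one hard-coded in main(); the part-r-00000 file name assumes the default single reducer.

hadoop fs -cat hdfs://hadoop1:9000/user/sun/hbase/part-r-00000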
3.3. Processing my custom-format log data:
The log lines are stored in the format [format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]. For records that start with [format:1], the data is sorted and aggregated per user (the code below extracts the user, time and html fields from the same bracketed format).
UserLog.java:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class UserLog
{
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: userlog <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "user log");
job.setJarByClass(UserLog.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// The mapper keeps only lines that start with [format:1] and emits a composite key "user|time|html" with an empty value
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
private static Text line = new Text();// one input line (declared but not used below)
// map implementation
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().startsWith("[format:1]")) {
context.write(new Text(getParameter(value, "user") + "|" + getParameter(value, "time") + "|" + getParameter(value, "html")), new Text(""));
}
}
}
/* // An alternative reducer that would simply copy each input key to the output and write it out directly
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
// reduce implementation
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
}*/
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
//A TreeMap keeps the collected composite keys sorted; TreeMap orders keys ascending, and a Comparator can be supplied to change that (the one used here just keeps the natural ascending order)
private TreeMap<String, String> treeMap = new TreeMap<String, String>(new Comparator<String>() {
//@Override
public int compare(String x, String y) {
return x.compareTo(y);
}
});
//A second TreeMap holds the aggregated results (duration per user|html key), also kept in ascending key order
private TreeMap<String, Long> treeMapResult = new TreeMap<String, Long>(new Comparator<String>() {
//@Override
public int compare(String x, String y) {
return x.compareTo(y);
}
});
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//Collect the key into the TreeMap instead of writing to the context here; the actual output happens in cleanup()
treeMap.put(key.toString(), key.toString());
}
protected void cleanup(Context context) throws IOException, InterruptedException {
//getCurrentValue() returns the last reduce value seen; when it is blank, every collected key is simply written out with a count of "0", otherwise the time differences are aggregated below
if (StringUtils.isBlank(context.getCurrentValue().toString())) {
Iterator it = treeMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
context.write(new Text(entry.getKey().toString()), new Text("0"));
}
}else{
String key = "";
String value = "";
//Walk the sorted keys; when consecutive records belong to the same user, accumulate the time gap into treeMapResult, keyed by user|html
Iterator it = treeMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
String[] sp = entry.getKey().toString().split("\\|");//the pipe must be escaped because split() takes a regex
if (key.equals(sp[0])){
long time = getSecondDiff(value, sp[1]);
if (!treeMapResult.containsKey(sp[0] + "|" + sp[2])) {
treeMapResult.put(sp[0] + "|" + sp[2], time);
} else {
treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong(treeMapResult.get(sp[0] + "|" + sp[2]).toString()) + time);
}
}else{
treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong("0"));
}
key = sp[0];
value = sp[1];
}
// Emit the aggregated results
Iterator iter = treeMapResult.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
context.write(new Text(entry.getKey().toString()), new Text(entry.getValue().toString()));
}
}
}
}
private static String getParameter(Text value, String param){
try {
return value.toString().substring(value.toString().indexOf(param) + param.length() + 1, value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1));
}catch(Exception e){
return "";
}
}
private static long getSecondDiff(String s1, String s2){
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date d1 = null;
Date d2 = null;
try {
d1 = format.parse(s1);
d2 = format.parse(s2);
//difference in milliseconds
long diff = d2.getTime() - d1.getTime();
long diffSeconds = diff / 1000;
return diffSeconds;
}catch(Exception e){
return 0;
}
}
}
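To illustrate what the job computes, take two hypothetical [format:1] lines for the same user (all field values are invented, and the timestamps use the yyyy-MM-dd HH:mm:ss format that getSecondDiff expects):

[format:1][user:AAA][time:2018-10-10 10:00:00][html:/bt/btCourse/get]
[format:1][user:AAA][time:2018-10-10 10:00:30][html:/bt/btCourse/list]

The mapper emits the composite keys AAA|2018-10-10 10:00:00|/bt/btCourse/get and AAA|2018-10-10 10:00:30|/bt/btCourse/list, each with an empty value. In cleanup() the reducer walks the keys in sorted order; when two consecutive keys belong to the same user, the 30-second gap returned by getSecondDiff is added to the treeMapResult bucket for that user|html combination, otherwise the bucket starts at 0. The intended final output is one line per user|html key with the accumulated seconds.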
4. Verifying the Results:
4.1. Package the project into a jar and copy it to the /home/hadoop/Downloads directory on the CentOS machine.
4.2. Run:
hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar WordCount /agentlog/ /user/sun/MapReduce/wordCountX
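The UserLog job from section 3.3 can be launched the same way; the output directory below is only an example:

hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar UserLog /agentlog/ /user/sun/MapReduce/userLogX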
4.3. Check the results:
hadoop fs -cat /user/sun/MapReduce/wordCountX/part-r-00000