概述
一、环境配置
这里选择的环境是hadoop-0.20.2和hbase-0.90.4,Hadoop环境配置参看这里,HBase环境配置请看这里。
需要注意的是,本文的需求是在Hadoop上跑MapReduce job来分析日志并将结果持久化到HBase,所以,在编译程序时,Hadoop需要用到HBase和Zookeeper包,因此,需要分别将hbase-0.90.4.jar和zookeeper-3.3.2.jar拷贝到Hadoop的lib目录下,具体操作如下:
#cp /root/hbase-0.90.4/hbase-0.90.4.jar /root/hadoop-0.20.2/lib
#cp /root/hbase-0.90.4/lib/zookeeper-3.3.2.jar /root/hadoop-0.20.2/lib
二、实例编写
日志文件xxxlog.txt的内容如下:
version-------------time-----------------id-------rt----filter--------id----rt-----filter
1.0^A2014-03-03 00:00:01^Ad2000^C4^C3040^Bd2001^C7^C0
1.0^A2014-03-03 00:00:01^Ad3000^C4^C3041^Bd2001^C7^C0
同样,需要将此文件放到hdfs目录下,比如:hadoop fs -put xxxlog.txt /tmp/input。
为持久化在HBase中创建table和family,比如:./hbase shell,create 'xxxlog', 'dsp_filter'。
为了清晰便于扩展,将Mapper、Reducer、Driver分开,具体如下:
1、Mapper
#vi xxxLogMaper.java
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class xxxLogMaper
extends Mapper {
public final static String CONTROL_A = "^A";
public final static String CONTROL_B = "^B";
public final static String CONTROL_C = "^C";
public final static int PV_TIME = 1;
public final static int DSP_INFO_LIST = 5;
public final static int DSP_ID = 0;
public final static int DSP_FILTER = 2;
public void map(Object key, Text value, Context context) {
try {
System.out.println("n------------map come on-----------");
System.out.println("nline=-----------"+value.toString());
String[] line = value.toString().split(CONTROL_A);
String pvtime = "";
System.out.println("npvtime=-----------"+line[PV_TIME]);
String year = line[PV_TIME].substring(0, 4);
String month = line[PV_TIME].substring(5, 7);
String day = line[PV_TIME].substring(8, 10);
String hour = line[PV_TIME].substring(11, 13);
String minute = "";
int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16));
if (m_tmp >= 0 && m_tmp <= 30) {
minute = "00";
} else {
minute = "30";
}
pvtime = year + month + day + hour + minute;
String[] dspInfoList = line[DSP_INFO_LIST].split(CONTROL_B);
String dspid = "";
String dspfilter = "";
Text k = new Text();
IntWritable v = new IntWritable(1);
for(int i=0; i<dspinfolist.length; i++)="" {
System.out.println("n------------map-----------");
System.out.println("ndspinfo="+dspInfoList[i]);
String[] dspInfo = dspInfoList[i].split(CONTROL_C);
dspid = dspInfo[DSP_ID];
dspfilter = dspInfo[DSP_FILTER];
//key=ddspid^Afilter^Apvtime, value=1
k.set(dspid+CONTROL_A+dspfilter+CONTROL_A+pvtime);
context.write(k, v);
System.out.println("nkey="+k.toString());
System.out.println("nvalue="+v.toString());
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、Reducer
#vi xxxLogReducer.java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
public class BidLogReducer
extends TableReducer {
public final static String COL_FAMILY = "dsp_filter";
public final static String COL_NAME = "sum";
private final static String ZK_HOST = "localhost";
private final static String TABLE_NAME = "xxxlog";
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
System.out.println("n------------reduce come on-----------");
String k = key.toString();
IntWritable v = new IntWritable();
int sum = 0;
for (IntWritable val:values) {
sum += val.get();
}
//v.set(sum);
//context.write(key, v);
System.out.println("n------------reduce-----------");
System.out.println("ncur-key="+key.toString());
System.out.println("ncur-value="+sum);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", ZK_HOST);
HTablePool pool = new HTablePool(conf, 3);
HTable table = (HTable)pool.getTable(TABLE_NAME);
Get getrow = new Get(k.getBytes());
Result r = table.get(getrow);
int m_tmp = 0;
for(KeyValue kv:r.raw()) {
System.out.println("nraw-KeyValugge---"+kv);
System.out.println("nraw-row=>"+Bytes.toString(kv.getRow()));
System.out.println("nraw-family=>"+Bytes.toString(kv.getFamily()));
System.out.println("nraw-qualifier=>"+Bytes.toString(kv.getQualifier()));
System.out.println("nraw-value=>"+Bytes.toString(kv.getValue()));
m_tmp += Integer.parseInt(Bytes.toString(kv.getValue()));
}
sum = sum + m_tmp;
v.set(sum);
System.out.println("nreal-key="+key.toString());
System.out.println("nreal-value="+v.toString());
Put putrow = new Put(k.getBytes());
putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes());
try {
context.write(new ImmutableBytesWritable(key.getBytes()), putrow);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3、Driver
#vi xxxLogDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Job driver: reads log files from the input path, counts records with {@code xxxLogMaper},
 * and persists the totals to the HBase table {@link #TABLE_NAME} through
 * {@code xxxLogReducer}.
 *
 * <p>Usage: {@code hadoop jar xxxLog.jar xxxLogDriver <input path> <output path>}
 */
public class xxxLogDriver {
    public final static String ZK_HOST = "localhost";
    public final static String TABLE_NAME = "xxxlog";

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure
     * (2 when the arguments are wrong).
     *
     * @param args generic Hadoop options followed by the input path and the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // HBase configuration
        Configuration conf = HBaseConfiguration.create();
        // The property name has no trailing dot; with one the setting is silently ignored.
        conf.set("hbase.zookeeper.quorum", ZK_HOST);

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: xxxLogDriver <input path> <output path>");
            System.exit(2);
        }

        Job job = new Job(conf, "xxxLog");
        job.setJarByClass(xxxLogDriver.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Kept so the documented two-argument command line keeps working.
        // NOTE(review): initTableReducerJob below switches the job to the HBase table output
        // format, so results are expected in HBase rather than under this path — confirm.
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.out.println("\n------------driver come on-----------");

        job.setMapperClass(xxxLogMaper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Registers the reducer class and the table output settings in one call, so no
        // separate setReducerClass() is needed.
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, xxxLogReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
三、编译运行
在当前目录下编译源码,具体如下:
#javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java xxxLogReducer.java xxxLogDriver.java
需要注意的是,必须三个一起编译否则出错:
xxxLogDriver.java:22: error: cannot find symbol
job.setMapperClass(xxxLogMaper.class);
打包class文件,具体如下:
#jar cvf xxxLog.jar *class
#rm -rf *class
运行任务,具体如下:
#hadoop jar xxxLog.jar xxxLogDriver /tmp/input /tmp/output
查询结果,具体如下:
#./hbase shell
hbase(main):014:0>scan 'xxxlog'
最后
以上就是直率铅笔为你收集整理的Hadoop初探之MapReduce+HBase实例的全部内容,希望文章能够帮你解决Hadoop初探之MapReduce+HBase实例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复