Overview
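A single Hadoop MapReduce job can read several input paths, each with its own InputFormat and Mapper, through MultipleInputs, and write to named side outputs through MultipleOutputs. The job below demonstrates both: ByteMapper parses pipe-delimited user-agent records from a SequenceFile, TextMapper loads a keyword list from a plain-text file, and a shared reducer joins the two streams and writes the matches to a named output called "UA".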
package com.uabrand.search_task;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import com.analyzer.SPAndroid;
import com.worm.util.RegexUtil;
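/**
 * Hadoop MapReduce job with multiple input paths and multiple outputs:
 * two mappers feed one reducer via MultipleInputs, and matching records
 * are written to the named output "UA" via MultipleOutputs.
 */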
public class SearchKeyWord extends Base {

    /** Parses pipe-delimited records from the SequenceFile input and emits
     *  one (token, standKey@line) pair per token of the user-agent field. */
    public static class ByteMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // CounterRecorder is assumed to be defined in the Base class or the same package
            context.getCounter(CounterRecorder.TOTAL).increment(1);
            if (value == null) {
                return;
            }
            String valueText = value.toString();
            if (valueText == null || valueText.length() < 5) {
                return;
            }
            // "|" is a regex metacharacter, so it must be escaped for split()
            String[] vals = valueText.split("\\|", -1);
            if (vals.length < 39) {
                return;
            }
            // field 29 (vals[28]) holds the user-agent string
            String line = vals[28].trim();
            // collapse runs of "=" and normalize to lowercase
            line = line.replaceAll("={2,}", "=").trim();
            line = line.toLowerCase();
            // drop dirty data
            if (line.contains("okhttp") || line.contains("httpclient") || line.contains("uuid")) { // || line.contains("windows")
                return;
            }
            // skip plain single-line values without special characters,
            // and skip iOS-style user agents
            if (!RegexUtil.isSpecialChar(line)) {
                return;
            }
            if (line.contains("iphone") || line.contains("ios") || line.contains("cfnetwork")) {
                return;
            }
            line = SPAndroid.filter_Data1(line);
            line = SPAndroid.filter_Data2(line);
            line = SPAndroid.filter_Data3(line);
            if (line == null) {
                return;
            }
            String standKey = SPAndroid.getStandKey(line);
            // split on spaces, commas, and tabs; the original pattern " |,|t"
            // split on the letter 't', almost certainly a typo for "\t"
            String[] strArray = line.split(" |,|\t");
            for (String item : strArray) {
                item = item.trim();
                if (item.length() > 1) {
                    context.write(new Text(item), new Text(standKey + "@" + line));
                }
            }
        }
    }
    /** Loads the keyword file: each text line is "keyword|marker"; emits
     *  (keyword, marker) so the reducer can join it against the UA stream. */
    public static class TextMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            String valueText = value.toString();
            if (valueText == null || valueText.length() < 5) {
                return;
            }
            String[] vals = valueText.split("\\|", -1);
            if (vals.length < 2) {
                return;
            }
            context.write(new Text(vals[0].trim()), new Text(vals[1]));
        }
    }
    /** Joins the two mapper streams on the token key and writes records that
     *  contain both a keyword marker ("$") and a UA record ("@") to the
     *  named output "UA". */
    public static class ActionReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (mos != null) {
                mos.close();
                mos = null;
            }
        }

        @Override
        protected void reduce(Text key, Iterable<Text> iter, Context context) throws IOException, InterruptedException {
            if (iter == null) {
                return;
            }
            // Hadoop reuses the same Text instance across the iteration,
            // so copy each value out via toString()
            Set<String> datas = new HashSet<String>();
            for (Text item : iter) {
                if (item != null) {
                    datas.add(item.toString());
                }
            }
            // TODO: decide how to handle keys that matched no keyword
            StringBuilder strBuf = new StringBuilder();
            for (String str : datas) {
                strBuf.append("|");
                strBuf.append(str);
            }
            // emit only when the merged values contain both a keyword marker ("$")
            // and a UA record ("@")
            if (strBuf.indexOf("$") > 0 && strBuf.indexOf("@") > 0) {
                mos.write("UA", NullWritable.get(), new Text(key.toString() + strBuf.toString()));
                context.getCounter(CounterRecorder.SUCCEED).increment(1);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        String inPath_1 = args[0];
        String inPath_2 = args[1];
        String outPath = args[2];
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("SearchKeyWordTask_T");
        job.setJarByClass(SearchKeyWord.class);
        // two input paths, each with its own InputFormat and Mapper
        MultipleInputs.addInputPath(job, new Path(inPath_1), SequenceFileInputFormat.class, ByteMapper.class);
        MultipleInputs.addInputPath(job, new Path(inPath_2), TextInputFormat.class, TextMapper.class);
        job.setReducerClass(ActionReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // register the named output "UA"; LazyOutputFormat suppresses the
        // creation of empty part-r-* files from the default output
        MultipleOutputs.addNamedOutput(job, "UA", TextOutputFormat.class, NullWritable.class, Text.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileSystem fs = FileSystem.get(conf);
        Path outPath_1 = new Path(outPath);
        if (fs.exists(outPath_1)) {
            // delete(path, true) removes the directory immediately; the original
            // deleteOnExit() only deletes when the FileSystem closes, so the job
            // would still fail because the output path already exists
            fs.delete(outPath_1, true);
        }
        FileOutputFormat.setOutputPath(job, outPath_1);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static int startTask(Configuration con, String[] args) throws Exception {
        return ToolRunner.run(con, new SearchKeyWord(), args);
    }

    public static void main(String[] args) throws Exception {
        Configuration con = new Configuration();
        String[] filePath = new String[]{
                "/daas/20170428", // input data (SequenceFile)
                "/user/_key",     // input keyword file (text)
                "/user/_temp"     // output directory
        };
        startTask(con, filePath);
    }
}
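After a successful run, the joined records land in the output directory in files named UA-r-NNNNN (the MultipleOutputs naming convention); thanks to LazyOutputFormat, no empty part-r-* files are created. Below is a minimal launch sketch, assuming a hypothetical SearchKeyWordDriver class and placeholder HDFS paths (none of these names come from the original article):

import org.apache.hadoop.conf.Configuration;

// Hypothetical standalone driver; the three paths are placeholders only.
public class SearchKeyWordDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int exitCode = SearchKeyWord.startTask(conf, new String[]{
                "/data/ua_records",   // SequenceFile input for ByteMapper
                "/data/keywords.txt", // "keyword|marker" text file for TextMapper
                "/data/ua_out"        // output directory (removed first if present)
        });
        System.exit(exitCode);
    }
}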
Conclusion
That covers the complete Hadoop MapReduce example of multiple input paths and multiple inputs, as collected and organized by 靓丽奇迹; hopefully it helps you solve similar problems in your own development.