我是靠谱客的博主 贪玩摩托,最近开发中收集的这篇文章主要介绍基于pagerank算法的运用Hbase的搜索引擎(5)——加上工作流控制篇,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

添加工作流控制的原因是为了让程序可以依照自己设置好的依赖先后关系自动的运行程序。

思想:

ControlledJob 是受控的job,可以添加不同job之间的依赖关系
JobControl 是主控程序,用来按照依赖关系自动提交任务

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.List;
public class SearchMain extends Configured implements Tool {
public static void main(String[] args) throws
Exception {
ToolRunner.run(new SearchMain(), args);
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoopPD:2181");
//=======================================================================================
//总控对象,用来按照依赖关系自动提交任务
JobControl control=new JobControl("SearchEngine");
//---------------------------------------------------------------------------------------CleanDataMR
Job job1 = Job.getInstance(conf, "cleanData");
job1.setJobName(CleanDataMR.class.getName());
job1.setJarByClass(CleanDataMR.class);
String table1 = conf.get("before_table");
TableMapReduceUtil.initTableMapperJob(table1, new Scan(), CleanDataMR.CDMapper.class, ImmutableBytesWritable.class, MapWritable.class, job1);
TableMapReduceUtil.initTableReducerJob("clean_webpage", CleanDataMR.CDReducer.class, job1);
ControlledJob cj1=new ControlledJob(job1.getConfiguration());
//---------------------------------------------------------------------------------------GetKeyWordMR
Job job2=Job.getInstance(conf);
job2.setJobName(GetKeyWordMR.class.getName());
job2.setJarByClass(GetKeyWordMR.class);
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("clean_webpage"), new Scan(), GetKeyWordMR.KeyWordMapper.class, Text.class, Text.class, job2);
TableMapReduceUtil.initTableReducerJob("clean_webpage", GetKeyWordMR.KeyWordReducer.class,job2);
ControlledJob cj2=new ControlledJob(job2.getConfiguration());
//--------------------------------------------------------------------------------------------PageRankMR.
//迭代次数
int round=10;
//存放迭代中的每个job
ControlledJob[]
cjbs=new ControlledJob[round];
for (int i = 0; i < round; i++) {
Job job = Job.getInstance(conf);
job.setJobName(PageRankMR.class.getName());
job.setJarByClass(PageRankMR.class);
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("clean_webpage"), new Scan(), PageRankMR.PageRankMapper.class, Text.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob("clean_webpage", PageRankMR.PageRankReducer.class, job);
cjbs[i] =new ControlledJob(job.getConfiguration());
}
//==============================================================================================
InvertIndexMR
Job job4 = Job.getInstance(conf,"BuildInvertIndex");
job4.setJarByClass(InvertIndexMR.class);
TableMapReduceUtil.initTableMapperJob("clean_webpage", new Scan(), InvertIndexMR.IIMapper.class,ImmutableBytesWritable.class,MapWritable.class,job4);
TableMapReduceUtil.initTableReducerJob(conf.get("result_table"), InvertIndexMR.IIReducer.class,job4);
ControlledJob cj4=new ControlledJob(job4.getConfiguration());
//================================================================================================
cj2.addDependingJob(cj1);
cjbs[0].addDependingJob(cj2);
for(int i=1;i<round;i++){
cjbs[i].addDependingJob(cjbs[i-1]);
}
cj4.addDependingJob(cjbs[round-1]);
//------------------------------
control.addJob(cj1);
control.addJob(cj2);
for(ControlledJob cjb:cjbs){
control.addJob(cjb);
}
control.addJob(cj4);
//利用控制中心提交任务
Thread t = new Thread(control);
//自动开始任务提交,不判断任务是否成功
t.start();
//监控每个job是否运行成功
//获得某个正在运行的job
List<ControlledJob> list = control.getRunningJobList();
//如果某个job运行失败,整个工作流停止
for(ControlledJob cj:list){
boolean b=cj.getJob().monitorAndPrintJob();
if(b==false){
//停止Java虚拟机
System.exit(-1);
}
}
return 0;
}
}

最后

以上就是贪玩摩托为你收集整理的基于pagerank算法的运用Hbase的搜索引擎(5)——加上工作流控制篇的全部内容,希望文章能够帮你解决基于pagerank算法的运用Hbase的搜索引擎(5)——加上工作流控制篇所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部