I am 贪玩摩托, a blogger at 靠谱客. This article is part 5 of the series on an HBase-backed search engine built around the PageRank algorithm: adding workflow control. I am sharing it here in the hope that it can serve as a reference.

Workflow control is added so that the jobs run automatically, in the order defined by the dependencies we set up between them.

Idea:

ControlledJob is a controlled job; dependencies on other jobs can be attached to it.
JobControl is the master controller; it submits the jobs automatically according to those dependencies. A minimal sketch of the pattern follows, before the full program.
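The sketch below only shows the wiring; JobControlDemo, stepOne and stepTwo are hypothetical names, and the real jobs further down also configure HBase mappers and reducers before being wrapped.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Two placeholder jobs; a real program would set their mappers, reducers, input and output here
        Job stepOne = Job.getInstance(conf, "stepOne");
        Job stepTwo = Job.getInstance(conf, "stepTwo");

        // Wrap each Job in a ControlledJob so dependencies can be attached
        ControlledJob cStepOne = new ControlledJob(stepOne.getConfiguration());
        ControlledJob cStepTwo = new ControlledJob(stepTwo.getConfiguration());
        // stepTwo is only submitted after stepOne completes successfully
        cStepTwo.addDependingJob(cStepOne);

        // The master controller; JobControl implements Runnable and is driven on its own thread
        JobControl control = new JobControl("demo");
        control.addJob(cStepOne);
        control.addJob(cStepTwo);

        Thread worker = new Thread(control);
        worker.start();
        // Poll until every job has finished, then stop the controller thread
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
    }
}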

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.List;

public class SearchMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new SearchMain(), args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoopPD:2181");

        //=======================================================================================
        // Master controller: submits jobs automatically according to their dependencies
        JobControl control = new JobControl("SearchEngine");

        //--------------------------------------------------------------------------------------- CleanDataMR
        Job job1 = Job.getInstance(conf, "cleanData");
        job1.setJobName(CleanDataMR.class.getName());
        job1.setJarByClass(CleanDataMR.class);
        String table1 = conf.get("before_table");
        TableMapReduceUtil.initTableMapperJob(table1, new Scan(),
                CleanDataMR.CDMapper.class, ImmutableBytesWritable.class, MapWritable.class, job1);
        TableMapReduceUtil.initTableReducerJob("clean_webpage", CleanDataMR.CDReducer.class, job1);
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());

        //--------------------------------------------------------------------------------------- GetKeyWordMR
        Job job2 = Job.getInstance(conf);
        job2.setJobName(GetKeyWordMR.class.getName());
        job2.setJarByClass(GetKeyWordMR.class);
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("clean_webpage"), new Scan(),
                GetKeyWordMR.KeyWordMapper.class, Text.class, Text.class, job2);
        TableMapReduceUtil.initTableReducerJob("clean_webpage", GetKeyWordMR.KeyWordReducer.class, job2);
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());

        //--------------------------------------------------------------------------------------- PageRankMR
        // Number of PageRank iterations
        int round = 10;
        // One ControlledJob per iteration
        ControlledJob[] cjbs = new ControlledJob[round];
        for (int i = 0; i < round; i++) {
            Job job = Job.getInstance(conf);
            job.setJobName(PageRankMR.class.getName());
            job.setJarByClass(PageRankMR.class);
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("clean_webpage"), new Scan(),
                    PageRankMR.PageRankMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.initTableReducerJob("clean_webpage", PageRankMR.PageRankReducer.class, job);
            cjbs[i] = new ControlledJob(job.getConfiguration());
        }

        //============================================================================================== InvertIndexMR
        Job job4 = Job.getInstance(conf, "BuildInvertIndex");
        job4.setJarByClass(InvertIndexMR.class);
        TableMapReduceUtil.initTableMapperJob("clean_webpage", new Scan(),
                InvertIndexMR.IIMapper.class, ImmutableBytesWritable.class, MapWritable.class, job4);
        TableMapReduceUtil.initTableReducerJob(conf.get("result_table"), InvertIndexMR.IIReducer.class, job4);
        ControlledJob cj4 = new ControlledJob(job4.getConfiguration());

        //================================================================================================
        // Wire up the dependency chain: clean data -> extract keywords -> PageRank iterations -> inverted index
        cj2.addDependingJob(cj1);
        cjbs[0].addDependingJob(cj2);
        for (int i = 1; i < round; i++) {
            cjbs[i].addDependingJob(cjbs[i - 1]);
        }
        cj4.addDependingJob(cjbs[round - 1]);

        //------------------------------
        control.addJob(cj1);
        control.addJob(cj2);
        for (ControlledJob cjb : cjbs) {
            control.addJob(cjb);
        }
        control.addJob(cj4);

        // Submit the jobs through the controller
        Thread t = new Thread(control);
        // Starts submitting jobs automatically; the thread itself does not check whether they succeed
        t.start();

        // Monitor the jobs
        // Get the jobs that are currently running
        List<ControlledJob> list = control.getRunningJobList();
        // If any job fails, stop the whole workflow
        for (ControlledJob cj : list) {
            boolean b = cj.getJob().monitorAndPrintJob();
            if (!b) {
                // Shut down the JVM
                System.exit(-1);
            }
        }
        return 0;
    }
}
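Because the class is launched through ToolRunner, the before_table and result_table names read with conf.get(...) can be passed as -D options on the command line. A possible invocation (the jar name searchengine.jar and the table names are only examples) might look like:

hadoop jar searchengine.jar SearchMain -D before_table=webpage -D result_table=invert_index

One design note: getRunningJobList() is read once, right after the controller thread starts, so at that moment it may not yet contain every job in the chain. If stricter tracking is needed, an alternative is to poll control.allFinished() in a loop (as in the small sketch above) and call control.stop() once everything has completed.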

Finally

That is everything 贪玩摩托 has put together on part 5 of the PageRank-based, HBase-backed search engine: adding workflow control. For more content related to the PageRank algorithm, see the other articles on 靠谱客.
