This article presents a MapReduce implementation of a graph algorithm: finding the common edges (the edge intersection) of several directed graphs.
Problem
As shown in the figure below, the task is to find the common edges of the three graphs, i.e., the bold colored edges in the figure.
Input
Input format:
Graph_id<tab>source<tab>destination
Example input for the graphs above:
graph1<tab>node1<tab>node3
graph1<tab>node1<tab>node2
graph1<tab>node2<tab>node3
graph1<tab>node2<tab>node7
graph1<tab>node2<tab>node4
graph1<tab>node3<tab>node4
graph1<tab>node3<tab>node5
graph1<tab>node4<tab>node6
graph1<tab>node4<tab>node6
graph2<tab>node1<tab>node2
graph2<tab>node1<tab>node2
graph2<tab>node1<tab>node7
graph2<tab>node2<tab>node4
graph2<tab>node2<tab>node5
graph2<tab>node2<tab>node7
graph2<tab>node3<tab>node5
graph2<tab>node4<tab>node6
graph2<tab>node7<tab>node3
graph3<tab>node1<tab>node3
graph3<tab>node1<tab>node2
graph3<tab>node2<tab>node4
graph3<tab>node2<tab>node7
graph3<tab>node3<tab>node5
graph3<tab>node3<tab>node6
graph3<tab>node4<tab>node7
graph3<tab>node5<tab>node6
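The mapper parses each such record into a (source:destination, graphId) pair. A tiny standalone sketch of that parsing step, assuming one tab-separated record (the class name here is illustrative, not part of the original code):

import java.util.StringTokenizer;

public class ParseOneLine {
    public static void main(String[] args) {
        // one tab-separated input record: graphId<tab>source<tab>destination
        StringTokenizer itr = new StringTokenizer("graph1\tnode1\tnode3");
        String gId = itr.nextToken();                                 // "graph1"
        String srcDestPair = itr.nextToken() + ":" + itr.nextToken(); // "node1:node3"
        // the mapper emits this as (key, value) = (srcDestPair, gId)
        System.out.println(srcDestPair + "\t" + gId);
    }
}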
Output
node1:node2<tab>graph3^graph2^graph1
node2:node4<tab>graph3^graph2^graph1
node2:node7<tab>graph3^graph2^graph1
node3:node5<tab>graph3^graph2^graph1
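Only edges present in all three graphs survive. Before reading the Hadoop code, here is a minimal local sketch (no Hadoop, illustrative names) of the same group-by-edge logic the job performs: group records by "source:destination", collect the distinct graph ids per edge, and keep the edges whose graph-id set covers every graph.

import java.util.*;

public class IntersectLocalSketch {
    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "graph1\tnode1\tnode2", "graph2\tnode1\tnode2", "graph3\tnode1\tnode2",
                "graph1\tnode1\tnode3", "graph3\tnode1\tnode3");
        int numberOfGraphs = 3;
        // "map + shuffle": edge -> distinct graph ids containing it
        Map<String, Set<String>> edgeToGraphs = new HashMap<>();
        for (String record : records) {
            String[] parts = record.split("\t");
            String edge = parts[1] + ":" + parts[2];
            edgeToGraphs.computeIfAbsent(edge, k -> new HashSet<>()).add(parts[0]);
        }
        // "reduce": keep only the edges present in every graph
        for (Map.Entry<String, Set<String>> entry : edgeToGraphs.entrySet()) {
            if (entry.getValue().size() == numberOfGraphs) {
                System.out.println(entry.getKey() + "\t" + String.join("^", entry.getValue()));
            }
        }
    }
}

Running this prints node1:node2 with its three graph ids; node1:node3 is dropped because graph2 does not contain it.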
Hadoop Code
ExampleBaseJob.java, a reusable base class that wires up a Hadoop job:

package HadoopGTK;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

public abstract class ExampleBaseJob extends Configured implements Tool {

    // sets up the job configuration: the mapper, combiner and reducer classes,
    // the number of reduce tasks, and the output key/value types
    protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {
        Job job = new Job(new Configuration(), jobName);

        // set the jar containing the job classes
        job.setJarByClass(jobInfo.getJarByClass());

        // set the mapper class
        job.setMapperClass(jobInfo.getMapperClass());

        // the combiner class is optional, so set it only if the program requires one
        if (jobInfo.getCombinerClass() != null)
            job.setCombinerClass(jobInfo.getCombinerClass());

        // set the reducer class
        job.setReducerClass(jobInfo.getReducerClass());

        // the number of reducers is set to 3; alter it according to the
        // program's requirements
        job.setNumReduceTasks(3);

        // set the types of the output key and value for the map and
        // reduce functions
        job.setOutputKeyClass(jobInfo.getOutputKeyClass());
        job.setOutputValueClass(jobInfo.getOutputValueClass());

        return job;
    }

    // describes the classes a concrete job supplies to setupJob
    protected abstract class JobInfo {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();
    }
}
Intersect.java, the intersection job itself, which extends ExampleBaseJob and supplies its components through an anonymous JobInfo. Note two fixes over the version this post was collected from: the argument check in main now runs before the job is launched, and no combiner is set, because the reducer's "seen in all graphs" filter is not safe to apply to the partial groups a combiner sees.

package HadoopGTK;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce program to find the intersection of the edge sets of several
 * graphs. The input and output paths are given as command-line arguments;
 * the number of graphs is hard-coded (numberOfGraphs in the reducer).
 *
 * Input records: graphId<tab>source<tab>destination
 *
 * Output records: source:destination<tab>graphId_1^graphId_2^...^graphId_n,
 * where n is the number of graphs being intersected and source:destination
 * is an edge found in all n graphs.
 *
 * Hadoop version used: 0.20.2
 */
public class Intersect extends ExampleBaseJob {

    /**
     * Mapper for the intersection problem. For every input record
     * graphId<tab>source<tab>destination it emits the intermediate pair
     * <source:destination, graphId>, so that all occurrences of the same
     * edge are grouped together at the reducer.
     */
    public static class IntersectMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text gId = new Text();         // graph id
        private Text srcDestPair = new Text(); // source:destination pair

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // the first token is the graph id
                gId.set(itr.nextToken());
                // the next two tokens are the source and the destination,
                // joined with ":" to form the key
                srcDestPair.set(itr.nextToken() + ":" + itr.nextToken());
                // emit <source:destination, graphId>
                context.write(srcDestPair, gId);
            }
        }
    }

    /**
     * Reducer for the intersection problem. For each edge it collects the
     * distinct graph ids that contain the edge; if the edge occurs in all
     * the graphs, it emits <source:destination, graphId_1^...^graphId_n>.
     */
    public static class IntersectReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // the number of graphs is fixed at 3 here; change it to match
            // the number of graphs in the input
            int numberOfGraphs = 3;

            // collect the graph ids in a set; this also eliminates duplicate
            // edges between the same pair of nodes within one graph
            Set<String> graphIds = new HashSet<String>();
            for (Text val : values) {
                graphIds.add(val.toString());
            }

            // the edge is in the intersection only if every graph contains it
            if (graphIds.size() == numberOfGraphs) {
                // concatenate the graph ids with '^' for the output value
                StringBuilder toReturn = new StringBuilder();
                boolean first = true; // used for formatting the output
                for (String graphId : graphIds) {
                    if (!first) {
                        toReturn.append('^');
                    }
                    first = false;
                    toReturn.append(graphId);
                }
                context.write(key, new Text(toReturn.toString()));
            }
        }
    }

    // builds the job configuration via the base class, supplying the mapper
    // and reducer classes and the output key/value types
    private Job getJobConf(String[] args) throws Exception {
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                // no combiner: the reducer drops every edge whose graph-id
                // count is below numberOfGraphs, which would lose data if
                // applied to the partial groups a combiner sees
                return null;
            }

            @Override
            public Class<?> getJarByClass() {
                return Intersect.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return IntersectMapper.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
                return IntersectReducer.class;
            }
        };
        return setupJob("intersect", jobInfo);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = getJobConf(args);
        // the input path is the first command-line argument
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // the output path is the second command-line argument
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // wait for the job to complete
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // validate the arguments before launching the job
        if (args.length != 2) {
            System.err.println("Usage: Intersect <in> <out>");
            System.exit(2);
        }
        int res = ToolRunner.run(new Configuration(), new Intersect(), args);
        System.exit(res);
    }
}
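To try the job, compile the two classes against the Hadoop jars and package them; then, assuming the jar is named, say, intersect.jar and the input file has been copied to HDFS, launch it with hadoop jar intersect.jar HadoopGTK.Intersect <input_path> <output_path>. The part-* files under the output directory then contain the common edges in the format shown above.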
Reference
http://hadooptutorial.wikispaces.com/The+First+MapReduce+Graph+Program