我是靠谱客的博主 炙热小松鼠,最近开发中收集的这篇文章主要介绍一些算法的MapReduce实现——有向图求边的交集ProblemInput Output Hadoop Code  Reference,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Problem

如下图所示,求这三个graph的公共边,也即是途中有色粗线所示

Input

输入格式:

Graph_id<tab>source<tab>destination

 

上图数据输入格式例子:

graph1<tab>node1<tab>node3
graph1<tab>node1<tab>node2
graph1<tab>node2<tab>node3
graph1<tab>node2<tab>node7
graph1<tab>node2<tab>node4
graph1<tab>node3<tab>node4
graph1<tab>node3<tab>node5
graph1<tab>node4<tab>node6
graph1<tab>node4<tab>node6
graph2<tab>node1<tab>node2
graph2<tab>node1<tab>node2
graph2<tab>node1<tab>node7
graph2<tab>node2<tab>node4
graph2<tab>node2<tab>node5
graph2<tab>node2<tab>node7
graph2<tab>node3<tab>node5
graph2<tab>node4<tab>node6
graph2<tab>node7<tab>node3
graph3<tab>node1<tab>node3
graph3<tab>node1<tab>node2
graph3<tab>node2<tab>node4
graph3<tab>node2<tab>node7
graph3<tab>node3<tab>node5
graph3<tab>node3<tab>node6
graph3<tab>node4<tab>node7
graph3<tab>node5<tab>node6


Output

node1:node2<tab>graph3^graph2^graph1
node2:node4<tab>graph3^graph2^graph1
node2:node7<tab>graph3^graph2^graph1
node3:node5<tab>graph3^graph2^graph1


Hadoop Code

import package HadoopGTK;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
public abstract class ExampleBaseJob extends Configured implements Tool {
// method to set the configuration for the job and the mapper and the reducer classes
protected Job setupJob(String jobName,JobInfo jobInfo) throws Exception {
Job job = new Job(new Configuration(), jobName);
// set the several classes
job.setJarByClass(jobInfo.getJarByClass());
//set the mapper class
job.setMapperClass(jobInfo.getMapperClass());
//the combiner class is optional, so set it only if it is required by the program
if (jobInfo.getCombinerClass() != null)
job.setCombinerClass(jobInfo.getCombinerClass());
//set the reducer class
job.setReducerClass(jobInfo.getReducerClass());
//the number of reducers is set to 3, this can be altered according to the program's requirements
job.setNumReduceTasks(3);
// set the type of the output key and value for the Map & Reduce
// functions
job.setOutputKeyClass(jobInfo.getOutputKeyClass());
job.setOutputValueClass(jobInfo.getOutputValueClass());
return job;
}
protected abstract class JobInfo {
public abstract Class<?> getJarByClass();
public abstract Class<? extends Mapper> getMapperClass();
public abstract Class<? extends Reducer> getCombinerClass();
public abstract Class<? extends Reducer> getReducerClass();
public abstract Class<?> getOutputKeyClass();
public abstract Class<?> getOutputValueClass();
}
}


 

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
/**
*
* Description: MapReduce program to find the intersection among the graphs. The
* input, the output and the number of graphs are specified as command-line
* arguments.
*
*
* The input file contains data of the form : graphId<tab>source<tab>destination
*
* The output file contains data of the form :
* source:destination<tab>graphId_1^graphId_2^..graphId_n where 'n' is the
* number of graphs for which we are finding the intersection and
* source:destination pair is the common edge (source-destination pair) found in
* all the n graphs.
*
* Hadoop version used: 0.20.2
*
*
**/
public class Intersect extends ExampleBaseJob {
/**
* Mapper class that has the map method that outputs intermediate keys and
* values to be processed by the IntersectReducer.
*
* The type parameters are the type of the input key, the type of the input
* values, the type of the output key and the type of the output values
*
*
* Input format : graphId<tab>source<tab>destination
*
* Output format : <source:destination, graphId>
*
**/
public static class IntersectMapper extends
Mapper<LongWritable, Text, Text, Text> {
private Text gId = new Text(); // graphId
private Text srcDestPair = new Text(); // source, destination pair
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// get the graph id
gId.set(itr.nextToken());
// setting the key as source and the value as the destination
//The source and the destination are delimited by ":" here
srcDestPair.set(itr.nextToken() + ":" + itr.nextToken());
//emit key, value pair from mapper
//here the key is the source:destination pair and the graph id is the value
context.write(srcDestPair, gId);
}
}
}
/**
* Reducer class for the intersection problem. The intermediate keys and
* values from the IntersectMapper form the input to the IntersectReducer.
* The output of the IntersectReducer will be the common edge found along
* with the graph ids.
*
* Input format : <source: destination, graphId>
*
* Output format :<source: destination, graphId>
*
*
**/
public static class IntersectReducer extends
Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context
) throws IOException, InterruptedException {
boolean first = true;//used for formatting the output
StringBuilder toReturn = new StringBuilder();
Set<String> graphIds = new HashSet<String>();
Iterator<String> graphIdItr;
int numberOfGraphs = 3; // the number of graphs is set as 3, it can be changed according to the number of graphs used in the program
// add the graph ids to the set. This is to eliminate considering
// multiple edges between same pair of nodes in same graphk
for (Text val : values) {
graphIds.add(val.toString());
}
graphIdItr = graphIds.iterator();
// Iterate through the graphs and append the graph ids to a stringbuilder
while (graphIdItr.hasNext()) {
//for better formatting of the output
if (!first){
toReturn.append('^');
}
first = false;
toReturn.append(graphIdItr.next());
}
String intersect = new String(toReturn);
StringTokenizer st = new StringTokenizer(intersect, "^");
// use StringTokenizer and if the number of graph ids is equal to
// the number of graphs we're considering, write to the context
if (st.countTokens() == numberOfGraphs){
//emit the key, value pair from the reducer
//here the key is the source:destination pair and the value will be the concatenated graph ids that has this source:destination pair
context.write(key, new Text(intersect));
}
}
}
// method to set the configuration for the job and the mapper and the reducer classes
private Job getJobConf(String[] args) throws Exception {
JobInfo jobInfo = new JobInfo() {
@Override
public Class<? extends Reducer> getCombinerClass() {
return IntersectReducer.class;
}
@Override
public Class<?> getJarByClass() {
return Intersect.class;
}
@Override
public Class<? extends Mapper> getMapperClass() {
return IntersectMapper.class;
}
@Override
public Class<?> getOutputKeyClass() {
return Text.class;
}
@Override
public Class<?> getOutputValueClass() {
return Text.class;
}
@Override
public Class<? extends Reducer> getReducerClass() {
return IntersectReducer.class;
}
};
return setupJob("intersect", jobInfo);
}
@Override
public int run(String[] args) throws Exception {
Job job = getJobConf(args);
FileInputFormat.addInputPath(job, new Path(args[0]));// specify the input path to be the first command-line argument passed by the user
FileOutputFormat.setOutputPath(job, new Path(args[1]));// specify the output path to be the second command-line argument passed by the user
return job.waitForCompletion(true) ? 0 : 1;// wait for the job to complete
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Intersect(), args);
if (args.length != 2) {
System.err.println("Usage: Intersect <in> <out>" );
System.exit(2);
}
System.exit(res);
}
}


 Reference

http://hadooptutorial.wikispaces.com/The+First+MapReduce+Graph+Program

测试数据

最后

以上就是炙热小松鼠为你收集整理的一些算法的MapReduce实现——有向图求边的交集ProblemInput Output Hadoop Code  Reference的全部内容,希望文章能够帮你解决一些算法的MapReduce实现——有向图求边的交集ProblemInput Output Hadoop Code  Reference所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部