概述
Hadoop 求解无向图中节点的重要性,通过求解节点的三角形个数来展现:
求解图中节点重要性,并排序,在大数据,分布式处理大型图组织形式的数据时很重要,找出重要节点,并对重要节点做特殊处理是很重要的
下面讲解如何来求解
这篇文章分为三部分:
1,python生成无向图的邻接矩阵
2,python画出这个无向图
3,hadoop mapreduce 求解图中每个节点的三角形个数
关于hadoop求解矩阵相乘,请看之前的文章:http://blog.csdn.net/thao6626/article/details/46472535
1,python生成无向图的邻接矩阵
# coding:utf-8
__author__ = 'taohao'
import random
class AdjMatrix(object):
def build_adjmatrix(self, dimension):
temp = 1
fd = open("./AdjMatrix.txt", 'w+')
for i in range(1, dimension + 1):
for j in range(temp, dimension + 1):
if i == j:
if i == dimension:
fd.write('A,' + str(i) + ',' + str(j) + ',' + '0' + 'n')
fd.write('B,' + str(i) + ',' + str(j) + ',' + '0')
else:
fd.write('A,' + str(i) + ',' + str(j) + ',' + '0' + 'n')
fd.write('B,' + str(i) + ',' + str(j) + ',' + '0' + 'n')
else:
value = random.randint(0, 1)
fd.write('A,' + str(i) + ',' + str(j) + ',' + str(value) + 'n')
fd.write('A,' + str(j) + ',' + str(i) + ',' + str(value) + 'n')
fd.write('B,' + str(i) + ',' + str(j) + ',' + str(value) + 'n')
fd.write('B,' + str(j) + ',' + str(i) + ',' + str(value) + 'n')
temp += 1
fd.close()
if __name__ == '__main__':
adjMatrix = AdjMatrix()
adjMatrix.build_adjmatrix(10)
2,python画出这个无向图
# coding:utf-8
__author__ = 'taohao'
import matplotlib.pyplot as plt
import networkx as nx
class DrawGraph(object):
def __init__(self):
self.graph = nx.Graph(name='graph')
def build_graph(self):
fd = open('./AdjMatrix.txt', 'r')
for line in fd:
item = line.split(',')
print item
# length = len(item)
if item[0] == 'A':
self.graph.add_node(item[1])
self.graph.add_node(item[2])
# self.graph.add_nodes_from([int(item[1]), int(item[2])])
if item[3][0] == '1':
self.graph.add_edge(item[1], item[2])
def draw_graph(self):
nx.draw_networkx(self.graph, with_labels=True)
# draw_networkx() can display the label of nodes
plt.show()
if __name__ == '__main__':
draw_graph = DrawGraph()
draw_graph.build_graph()
draw_graph.draw_graph()
画出的图为:
3,hadoop mapreduce 求解图中每个节点的三角形个数
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MatrixMutiply {
/*
* 矩阵存放在一个文件里面。
* 刚开始两个矩阵放在一个文件里面,hadoop会为两个文件做两次map导致先做一次map和reduce,
* 这样另外一个矩阵就没有数据,后面的reduce会出现问题
* 矩阵存放的形式是:
* A,1,1,2
表示A矩阵第一行第一列数据为2
* A,1,2,1
* A,2,1,3
* A,2,2,4
* 这样存放的目的是防止一次map在读取数据时分片而导致数据读取不完整
* 矩阵由python脚本产生,python脚本见BuildMatrix.py
*
* */
private static int colNumB = 10;
private static int rowNumA = 10;
public static class MatrixMapper extends Mapper<Object, Text, Text, Text>{
/*
* rowNumA and colNumB need to be confirm manually
* map阶段:
* 将数据组织为KEY VALUE的形式
* key:结果矩阵的元素的位置号
* value:结果矩阵元素需要用到的原两个矩阵的数据
* 要注意运算矩阵前矩阵和后矩阵在map阶段处理数据在组织map输出数据时不一样
*
* */
private Text mapOutputkey;
private Text mapOutputvalue;
@Override
protected void map(Object key, Text value,
Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("map input key:" + key);
System.out.println("map input value:" + value);
String[] matrixStrings = value.toString().split("n");
for(String item : matrixStrings){
System.out.println("item:"+ item);
String[] elemString = item.split(",");
for(String string : elemString){
System.out.println("element" + string);
}
System.out.println("elemString[0]:"+elemString[0]);
if(elemString[0].equals("A")){
// 此处一定要用equals,而不能用==来判断
/*
* 对A矩阵进行map化,outputkey outputvalue 在组织上要注意细节,处理好细节
* */
for(int i=1; i<=colNumB; i++){
mapOutputkey = new Text(elemString[1] + "," + String.valueOf(i));
mapOutputvalue = new Text("A:" + elemString[2] + "," + elemString[3]);
context.write(mapOutputkey, mapOutputvalue);
System.out.println("mapoutA:"+mapOutputkey+mapOutputvalue);
}
}
/*
* 对B矩阵map,mapoutput的组织和A矩阵的不同,细节要处理好
* */
else if(elemString[0].equals("B")){
for(int j=1; j<=rowNumA; j++){
mapOutputkey = new Text(String.valueOf(j) + "," + elemString[2]);
mapOutputvalue = new Text("B:" + elemString[1] + "," + elemString[3]);
context.write(mapOutputkey, mapOutputvalue);
System.out.println("mapoutB"+mapOutputkey+mapOutputvalue);
}
}
else{
// just for debug
System.out.println("mapout else else :--------------->"+ item);
}
}
}
}
public static class MatixReducer extends Reducer<Text, Text, Text, Text> {
private HashMap<String, String> MatrixAHashmap = new HashMap<String, String>();
private HashMap<String, String> MatrixBHashmap = new HashMap<String, String>();
private String val;
@Override
protected void reduce(Text key, Iterable<Text> value,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("reduce input key:" + key);
System.out.println("reduce input value:" + value.toString());
for(Text item : value){
val = item.toString();
System.out.println("val------------"+val);
if(!val.equals("0")){
String[] kv = val.substring(2).split(",");
if(val.startsWith("A:")){
MatrixAHashmap.put(kv[0], kv[1]);
}
if(val.startsWith("B:")){
MatrixBHashmap.put(kv[0], kv[1]);
}
}
}
/*just for debug*/
System.out.println("hashmapA:"+MatrixAHashmap);
System.out.println("hashmapB:"+MatrixBHashmap);
Iterator<String> iterator = MatrixAHashmap.keySet().iterator();
int sum = 0;
while(iterator.hasNext()){
String keyString = iterator.next();
sum += Integer.parseInt(MatrixAHashmap.get(keyString))*
Integer.parseInt(MatrixBHashmap.get(keyString));
}
//LongWritable reduceOutputvalue = new LongWritable(sum);
Text reduceOutputvalue = new Text(String.valueOf(sum));
context.write(key, reduceOutputvalue);
/*just for debug*/
System.out.println("reduce output key:" + key);
System.out.println("reduce output value:" + reduceOutputvalue);
}
}
public static class TriangleMapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value,
Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
/*
* map 将矩阵相乘的结果作为输入
* map input key 是hadoop自己分配
* map input value 就是矩阵相乘的结果文件中的每一行
* 对map input value 进行处理
* map output key: 行号
* map output value: 元素的列号+","+元素值
* for example:
* key:1
* value:1,2
* */
String[] valueString = value.toString().split("t");
String[] keyItems = valueString[0].split(",");
Text outputKey = new Text(keyItems[0]);
Text outputValue = new Text(keyItems[1] + "," + valueString[1]);
context.write(outputKey, outputValue);
}
}
public static class TriangleReducer extends Reducer<Text, Text, Text, Text>{
private String[] matrix = new String[colNumB*colNumB];
private boolean readGlobalMatrixFlag = false;
private int[] rowValue = new int[colNumB];
/*
* 得到原始矩阵的邻接矩阵
* */
private void getGlobalMatrix() {
// TODO Auto-generated method stub
String ADJ_MATRIX_PATH = "/home/taohao/PycharmProjects/Webs/pythonScript/Matrix/AdjMatrix.txt";
File file = new File(ADJ_MATRIX_PATH);
BufferedReader bufferedReader = null;
try {
bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
while((line = bufferedReader.readLine()) != null){
String[] items = line.split("[,n]");
if(items[0].equals("A")){
matrix[(Integer.parseInt(items[1])-1)* colNumB + Integer.parseInt(items[2]) - 1] = items[3];
}
}
bufferedReader.close();
} catch (Exception e) {
// TODO: handle exception
System.out.println(e.toString());
}
}
@Override
protected void reduce(Text key, Iterable<Text> value,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
/*
* 以行为单位去求解三角形
*
* */
// TODO Auto-generated method stub
if(!readGlobalMatrixFlag){
getGlobalMatrix();
readGlobalMatrixFlag = true;
}
Iterator<Text> iterator = value.iterator();
int rowSum = 0;
while(iterator.hasNext()){
/*
* 注意此处要以reduce input value 中的数来标记元素是哪一列的
* 因为reduce输入的不一定是从前到后的,会是乱序
* */
String[] valueItems = iterator.next().toString().split(",");
rowValue[Integer.parseInt(valueItems[0])-1] = Integer.parseInt(valueItems[1]);
}
int rowKey = Integer.parseInt(key.toString());
for(int i = 0; i < colNumB; i++){
if(matrix[i + (rowKey-1)*colNumB].equals("1")){
rowSum += rowValue[i];
}
}
rowSum = rowSum / 2;
Text outputValue = new Text(String.valueOf(rowSum));
context.write(key, outputValue);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Configuration confTriangle = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage: matrix <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "matrix");
job.setJarByClass(MatrixMutiply.class);
job.setMapperClass(MatrixMapper.class);
/*按照思路,这里不需要combiner操作,不需指明*/
//
job.setCombinerClass(MatixReducer.class);
job.setReducerClass(MatixReducer.class);
/*这两个outputkeyclass outputvalueclass 对map output 和 reduce output同时起作用*/
/*注意是同时,所以在指定map 和 reduce的输出时要一致*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
Job jobTriangle = Job.getInstance(confTriangle, "triangle");
jobTriangle.setJarByClass(MatrixMutiply.class);
jobTriangle.setMapperClass(TriangleMapper.class);
jobTriangle.setReducerClass(TriangleReducer.class);
jobTriangle.setOutputKeyClass(Text.class);
jobTriangle.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(jobTriangle, new Path("/trianglematrixoutput/part-r-00000"));
FileOutputFormat.setOutputPath(jobTriangle, new Path("/triangleoutput"));
System.exit(jobTriangle.waitForCompletion(true) ? 0 : 1);
}
}
有两轮mapreduce :
第一轮做矩阵相乘,邻接矩阵自乘,结果输出到一个目录下面
第二轮,将邻接矩阵自乘的结果作为输入,通过对相乘的结果和原邻接矩阵进行分析得到最终的结果
每一轮mapreduce需要一个job来控制,因此这里要启动两个job实例来做两轮mapreduce
最后
以上就是幸福魔镜为你收集整理的Hadoop 分析图中节点的重要性,求解图中节点三角形个数的全部内容,希望文章能够帮你解决Hadoop 分析图中节点的重要性,求解图中节点三角形个数所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复