概述
大数据分析,同步图计算,三角形计数,graphlite
#题目要求
#代码实现
1.定义结构体存统计结果
2.图输入
3.结果输出(定义类外的变量以从计算函数类vertex传出结果)
4.将Aggregator的模板类型改为自定义结构体RESULT,其中涉及对void指针的类型转换,参考:https://blog.csdn.net/liu100m/article/details/90731422
5.计算函数要注意变量在超步之间的生命周期:compute函数中的局部变量只存在于一个超步,进入新的超步后会被重置。因此在vertex类中用map容器(成员变量)保存每个点的出边和入边。
6.注意迭代器的使用,例如find函数和end函数。
#include <stdio.h>
#include <string.h>
#include <vector>
#include <math.h>
#include <map>
#include <algorithm>
#include "GraphLite.h"
#define VERTEX_CLASS_NAME(name) DirectedTriangleCount##name
#define EPS 1e-6
// Pair of triangle tallies that travels through the aggregator.
// It stays a plain aggregate so it can be brace-initialised and copied
// through the void* based aggregator interface.
struct result {
    int through_num;  // number of "through" triangles counted
    int cycle_num;    // number of "cycle" triangles counted
};
typedef struct result RESULT;
// Input formatter: reports the graph dimensions and value sizes to the
// framework and feeds the edge list into it.
class VERTEX_CLASS_NAME(InputFormatter): public InputFormatter {
public:
    // Parses the total vertex count from the vertex header line, caches it in
    // m_total_vertex and returns it.
    int64_t getVertexNum() {
        unsigned long long n = 0;  // stays 0 if the line cannot be parsed
        sscanf(m_ptotal_vertex_line, "%llu", &n);
        m_total_vertex = n;
        return m_total_vertex;
    }

    // Parses the total edge count from the edge header line, caches it in
    // m_total_edge and returns it.
    int64_t getEdgeNum() {
        unsigned long long n = 0;  // stays 0 if the line cannot be parsed
        sscanf(m_ptotal_edge_line, "%llu", &n);
        m_total_edge = n;
        return m_total_edge;
    }

    // Vertex values are stored as int.
    int getVertexValueSize() {
        m_n_value_size = sizeof(int);
        return m_n_value_size;
    }

    // Edge values are stored as int.
    int getEdgeValueSize() {
        m_e_value_size = sizeof(int);
        return m_e_value_size;
    }

    // Messages carry an int64_t, matching the vertex class's message type.
    int getMessageValueSize() {
        m_m_value_size = sizeof(int64_t);
        return m_m_value_size;
    }

    // Reads m_total_edge edge lines of the form "<from> <to>" and registers
    // every edge. A vertex is registered (with value 1 and its out-degree)
    // each time the source id changes, so the input is presumably grouped by
    // source vertex -- TODO confirm against the input format.
    // NOTE(review): a vertex that never appears as a source is never passed
    // to addVertex() by this code.
    void loadGraph() {
        unsigned long long last_vertex = 0;
        unsigned long long from = 0;
        unsigned long long to = 0;
        int weight = 0;
        int value = 1;
        int outdegree = 0;

        const char *line = getEdgeLine();
        // Note: modify this if an edge weight is to be read
        //       modify the 'weight' variable
        sscanf(line, "%llu %llu", &from, &to);
        addEdge(from, to, &weight);
        last_vertex = from;
        ++outdegree;

        for (int64_t i = 1; i < m_total_edge; ++i) {
            line = getEdgeLine();
            // Note: modify this if an edge weight is to be read
            //       modify the 'weight' variable
            sscanf(line, "%llu %llu", &from, &to);
            if (last_vertex != from) {
                // The source changed: flush the previous vertex first.
                addVertex(last_vertex, &value, outdegree);
                last_vertex = from;
                outdegree = 1;
            } else {
                ++outdegree;
            }
            addEdge(from, to, &weight);
        }
        // Flush the last vertex seen.
        addVertex(last_vertex, &value, outdegree);
    }
};
// Final triangle counts. The vertex program copies them out of the global
// aggregator value in superstep 3 and the output formatter prints them.
int num_through = 0;
int num_cycle = 0;
// Output formatter: writes the four result lines "in", "out", "through" and
// "cycle" from the file-level counters num_through and num_cycle.
class VERTEX_CLASS_NAME(OutputFormatter): public OutputFormatter {
public:
    // Emits one line per triangle category. The "in", "out" and "through"
    // lines all report num_through, as in the original code.
    void writeResult() {
        char s[1024];
        // Each line ends with a real newline; the cast matches the %lld
        // conversion, and snprintf keeps the write inside the buffer.
        int n1 = snprintf(s, sizeof(s), "in: %lld\n", static_cast<long long>(num_through));
        writeNextResLine(s, n1);
        int n2 = snprintf(s, sizeof(s), "out: %lld\n", static_cast<long long>(num_through));
        writeNextResLine(s, n2);
        int n3 = snprintf(s, sizeof(s), "through: %lld\n", static_cast<long long>(num_through));
        writeNextResLine(s, n3);
        int n4 = snprintf(s, sizeof(s), "cycle: %lld\n", static_cast<long long>(num_cycle));
        writeNextResLine(s, n4);
    }
};
// Aggregator over RESULT: sums the through_num and cycle_num fields of every
// value handed to it, field by field.
class VERTEX_CLASS_NAME(Aggregator): public Aggregator<RESULT> {
private:
    RESULT zero_result = {0, 0};  // starting value for both accumulators
public:
    // Resets the local and the global accumulator to zero counts.
    void init() {
        m_local = zero_result;
        m_global = zero_result;
    }

    // Returns a pointer to the global accumulator.
    void* getGlobal() {
        return &m_global;
    }

    // Overwrites the global accumulator with the RESULT that p points to.
    void setGlobal(const void* p) {
        const RESULT* incoming = static_cast<const RESULT*>(p);
        m_global.through_num = incoming->through_num;
        m_global.cycle_num = incoming->cycle_num;
    }

    // Returns a pointer to the local accumulator.
    void* getLocal() {
        return &m_local;
    }

    // Adds the RESULT that p points to onto the global accumulator.
    void merge(const void* p) {
        const RESULT* incoming = static_cast<const RESULT*>(p);
        m_global.through_num += incoming->through_num;
        m_global.cycle_num += incoming->cycle_num;
    }

    // Adds the RESULT that p points to onto the local accumulator.
    void accumulate(const void* p) {
        const RESULT* incoming = static_cast<const RESULT*>(p);
        m_local.through_num += incoming->through_num;
        m_local.cycle_num += incoming->cycle_num;
    }
};
// Vertex program that counts directed triangles in four supersteps:
//   0: record this vertex's out-neighbours and send a signal to each of them;
//   1: record the senders as in-neighbours and forward every in-neighbour id
//      to all out-neighbours;
//   2: compare the received two-hop predecessor ids with the vertex's own out-
//      and in-neighbour lists and report matches to aggregator 0;
//   3: copy the global aggregate into num_through / num_cycle and halt.
class VERTEX_CLASS_NAME(): public Vertex <int, int, int64_t> {
private:
    // Neighbour lists keyed by vertex id. They are members because locals of
    // compute() do not survive from one superstep to the next.
    std::map<int64_t, std::vector<int64_t> > from; // in-neighbours (fatherSet)
    std::map<int64_t, std::vector<int64_t> > to;   // out-neighbours (sonSet)
public:
    // Runs one superstep for the current vertex; pmsgs iterates over the
    // messages delivered to it in this superstep.
    void compute(MessageIterator* pmsgs) {
        const int64_t vertexid = getVertexId();

        if (getSuperstep() == 0) {
            // Superstep 0: remember the out-neighbours.
            std::vector<int64_t>& to_vertexid = to[vertexid];
            OutEdgeIterator p = getOutEdgeIterator();
            for (; !p.done(); p.next()) {
                to_vertexid.push_back(p.target());
            }
            // Signal every out-neighbour so it learns who its in-neighbours are.
            const int64_t signal = 1;
            sendMessageToAllNeighbors(signal);
        }
        else if (getSuperstep() == 1) {
            // Superstep 1: every incoming message comes from an in-neighbour;
            // the sender id is read from the message header.
            std::vector<int64_t>& from_vertexid = from[vertexid];
            for (; !pmsgs->done(); pmsgs->next()) {
                from_vertexid.push_back(((Msg *)(pmsgs->getCurrent()))->s_id);
            }
            // Forward each in-neighbour id to all out-neighbours. The loop is
            // bounded by the list itself rather than by a sum of message
            // payloads, so it cannot index past the end.
            for (std::vector<int64_t>::size_type i = 0; i < from_vertexid.size(); ++i) {
                sendMessageToAllNeighbors(from_vertexid[i]);
            }
        }
        else if (getSuperstep() == 2) {
            // Superstep 2: messages hold the in-neighbours of this vertex's
            // in-neighbours (two-hop predecessors).
            std::vector<int64_t> from_from;
            for (; !pmsgs->done(); pmsgs->next()) {
                from_from.push_back(pmsgs->getValue());
            }

            // A two-hop predecessor that is also an out-neighbour closes a
            // cycle triangle.
            // NOTE(review): each directed cycle looks like it is detected once
            // at each of its three vertices -- confirm the expected counting.
            std::vector<int64_t>& to_vertexid = to[vertexid];
            for (std::vector<int64_t>::const_iterator iter = from_from.begin();
                 iter != from_from.end(); ++iter) {
                if (std::find(to_vertexid.begin(), to_vertexid.end(), *iter) != to_vertexid.end()) {
                    RESULT num;
                    num.through_num = 0;
                    num.cycle_num = 1;
                    accumulateAggr(0, &num);
                }
            }

            // A two-hop predecessor that is also an in-neighbour forms a
            // through triangle.
            std::vector<int64_t>& from_vertexid = from[vertexid];
            for (std::vector<int64_t>::const_iterator iter = from_from.begin();
                 iter != from_from.end(); ++iter) {
                if (std::find(from_vertexid.begin(), from_vertexid.end(), *iter) != from_vertexid.end()) {
                    RESULT num;
                    num.through_num = 1;
                    num.cycle_num = 0;
                    accumulateAggr(0, &num);
                }
            }
        }
        else if (getSuperstep() == 3) {
            // Superstep 3: publish the global totals and stop.
            const RESULT arr = *static_cast<const RESULT*>(getAggrGlobal(0));
            num_through = arr.through_num;
            num_cycle = arr.cycle_num;
            voteToHalt();
            return;
        }
    }
};
// Graph configuration: host layout, input/output paths and the single
// RESULT aggregator used by the vertex program.
class VERTEX_CLASS_NAME(Graph): public Graph {
public:
    VERTEX_CLASS_NAME(Aggregator)* aggregator;  // allocated in init(), freed in term()
public:
    // argv[0]: DirectedTriangleCount.so
    // argv[1]: <input path>
    // argv[2]: <output path>
    //
    // Registers five hosts on localhost, checks the arguments, stores the
    // paths and registers the aggregator as aggregator 0. Prints a usage line
    // and exits with status 1 when fewer than three arguments are given.
    void init(int argc, char* argv[]) {
        setNumHosts(5);
        setHost(0, "localhost", 1411);
        setHost(1, "localhost", 1421);
        setHost(2, "localhost", 1431);
        setHost(3, "localhost", 1441);
        setHost(4, "localhost", 1451);

        if (argc < 3) {
            // The usage line ends with a real newline.
            printf("Usage: %s <input path> <output path>\n", argv[0]);
            exit(1);
        }

        m_pin_path = argv[1];
        m_pout_path = argv[2];

        aggregator = new VERTEX_CLASS_NAME(Aggregator)[1];
        regNumAggr(1);
        regAggr(0, &aggregator[0]);
    }

    // Releases the aggregator array allocated in init().
    void term() {
        delete[] aggregator;
    }
};
/* STOP: do not change the code below. */
// Factory exported with C linkage: allocates the graph object and attaches a
// new input formatter, output formatter and vertex prototype to it.
// The objects are released again in destroy_graph().
extern "C" Graph* create_graph() {
Graph* pgraph = new VERTEX_CLASS_NAME(Graph);
pgraph->m_pin_formatter = new VERTEX_CLASS_NAME(InputFormatter);
pgraph->m_pout_formatter = new VERTEX_CLASS_NAME(OutputFormatter);
pgraph->m_pver_base = new VERTEX_CLASS_NAME();
return pgraph;
}
// Counterpart of create_graph(), exported with C linkage: deletes the vertex
// prototype, the output formatter, the input formatter and finally the graph
// itself, casting each pointer to its concrete type before deleting it.
extern "C" void destroy_graph(Graph* pobject) {
delete ( VERTEX_CLASS_NAME()* )(pobject->m_pver_base);
delete ( VERTEX_CLASS_NAME(OutputFormatter)* )(pobject->m_pout_formatter);
delete ( VERTEX_CLASS_NAME(InputFormatter)* )(pobject->m_pin_formatter);
delete ( VERTEX_CLASS_NAME(Graph)* )pobject;
}
最后
以上就是野性蜜蜂为你收集整理的大数据分析,同步图计算,三角形计数,graphlite的全部内容,希望文章能够帮你解决大数据分析,同步图计算,三角形计数,graphlite所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复