-
Notifications
You must be signed in to change notification settings - Fork 0
/
pregel_app_hashmin.h
119 lines (107 loc) · 2.32 KB
/
pregel_app_hashmin.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include "basic/pregel-dev.h"
using namespace std;
//input line format: vertexID \t numOfNeighbors neighbor1 neighbor2 ...
//output line format: v \t min_vertexID(v's connected component)
struct CCValue_pregel
{
int color;
vector<VertexID> edges;
};
ibinstream & operator<<(ibinstream & m, const CCValue_pregel & v){
m<<v.color;
m<<v.edges;
return m;
}
obinstream & operator>>(obinstream & m, CCValue_pregel & v){
m>>v.color;
m>>v.edges;
return m;
}
//====================================
class CCVertex_pregel:public Vertex<VertexID, CCValue_pregel, VertexID>
{
public:
void broadcast(VertexID msg)
{
vector<VertexID> & nbs=value().edges;
for(int i=0; i<nbs.size(); i++)
{
send_message(nbs[i], msg);
}
}
virtual void compute(MessageContainer & messages)
{
if(step_num()==1)
{
VertexID min=id;
vector<VertexID> & nbs=value().edges;
for(int i=0; i<nbs.size(); i++)
{
if(min>nbs[i]) min=nbs[i];
}
value().color=min;
broadcast(min);
vote_to_halt();
}
else
{
VertexID min=messages[0];
for(int i=1; i<messages.size(); i++)
{
if(min>messages[i]) min=messages[i];
}
if(min<value().color)
{
value().color=min;
broadcast(min);
}
vote_to_halt();
}
}
};
class CCWorker_pregel:public Worker<CCVertex_pregel>
{
char buf[100];
public:
//C version
virtual CCVertex_pregel* toVertex(char* line)
{
char * pch;
pch=strtok(line, " ");
CCVertex_pregel* v=new CCVertex_pregel;
v->id=atoi(pch);
pch=strtok(NULL, " ");
int num=atoi(pch);
for(int i=0; i<num; i++)
{
pch=strtok(NULL, " ");
v->value().edges.push_back(atoi(pch));
}
return v;
}
virtual void toline(CCVertex_pregel* v, BufferedWriter & writer)
{
sprintf(buf, "%d\t%d\n", v->id, v->value().color);
writer.write(buf);
}
};
class CCCombiner_pregel:public Combiner<VertexID>
{
public:
virtual void combine(VertexID & old, const VertexID & new_msg)
{
if(old>new_msg) old=new_msg;
}
};
void pregel_hashmin(string in_path, string out_path, bool use_combiner)
{
WorkerParams param;
param.input_path=in_path;
param.output_path=out_path;
param.force_write=true;
param.native_dispatcher=false;
CCWorker_pregel worker;
CCCombiner_pregel combiner;
if(use_combiner) worker.setCombiner(&combiner);
worker.run(param);
}