-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmr_sequential.cc
128 lines (103 loc) · 2.94 KB
/
mr_sequential.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//
// A simple sequential MapReduce for WordCount
//
#include <algorithm>
#include <cctype>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
using namespace std;
// One intermediate key/value pair, produced by Map and later grouped by key.
struct KeyVal {
    std::string key;  // a word extracted from the input
    std::string val;  // Map always emits "1" here
};
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. The input file name is ignored; only the
// contents argument is examined.
//
// A word is a maximal run of alphabetic bytes as classified by
// std::isalpha; every other byte (digits, punctuation, whitespace,
// NUL, non-ASCII bytes in the default "C" locale) is a separator.
// Case is preserved, so "Hello" and "hello" are distinct keys.
//
// The return value is a vector with one {word, "1"} pair per word
// occurrence, in the order the words appear in content.
//
std::vector<KeyVal> Map(const std::string &filename, const std::string &content)
{
    (void)filename;  // intentionally unused, see the comment above

    std::vector<KeyVal> res;
    std::string word;  // letters of the word currently being scanned

    for (const char c : content) {
        // std::isalpha requires a value representable as unsigned char (or
        // EOF); passing a negative plain char is undefined behaviour, so
        // bytes >= 0x80 must be converted first.
        if (std::isalpha(static_cast<unsigned char>(c))) {
            word += c;
        } else if (!word.empty()) {
            // A separator ends the current word: emit it and start over.
            res.push_back(KeyVal{word, "1"});
            word.clear();
        }
    }

    // The input may end in the middle of a word, with no trailing separator.
    if (!word.empty()) {
        res.push_back(KeyVal{word, "1"});
    }
    return res;
}
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
std::string Reduce(const std::string &key, const std::vector<std::string> &values)
{
    // The result is the number of values collected for this key, rendered
    // as a decimal string. Neither the key nor the contents of the values
    // are inspected; only how many there are matters.
    (void)key;
    const auto occurrences = values.size();
    return std::to_string(occurrences);
}
int main(int argc, char ** argv)
{
if (argc < 2) {
cout << "Usage: mrsequential inputfiles...\n";
exit(1);
}
vector <string> filename;
vector <KeyVal> intermediate;
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
for (int i = 1; i < argc; ++i) {
string filename = argv[i];
string content;
// Read the whole file into the buffer.
getline(ifstream(filename), content, '\0');
vector <KeyVal> KVA = Map(filename, content);
intermediate.insert(intermediate.end(), KVA.begin(), KVA.end());
}
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//
sort(intermediate.begin(), intermediate.end(),
[](KeyVal const & a, KeyVal const & b) {
return a.key < b.key;
});
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
for (unsigned int i = 0; i < intermediate.size();) {
unsigned int j = i + 1;
for (; j < intermediate.size() && intermediate[j].key == intermediate[i].key;)
j++;
vector < string > values;
for (unsigned int k = i; k < j; k++) {
values.push_back(intermediate[k].val);
}
string output = Reduce(intermediate[i].key, values);
printf("%s %s\n", intermediate[i].key.data(), output.data());
i = j;
}
return 0;
}