Step2.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//public static class CustomInputFormat extends FileInputFormat<Text,IntWritable> {
//	public static class RecordReaderClass extends RecordReader<Text,IntWritable> {
//		LineRecordReader reader = new LineRecordReader();
//
// @Override
// public void initialize(InputSplit split, TaskAttemptContext context)
// throws IOException {
// reader.initialize(split, context);
// }
//
// @Override
// public void close() throws IOException {
// reader.close();
// }
//
// @Override
// public boolean nextKeyValue() throws IOException, InterruptedException {
// return reader.nextKeyValue();
// }
//
// @Override
// public Text getCurrentKey() throws IOException, InterruptedException {
// return new Text(reader.getCurrentValue().toString().split(" ")[0]); //Trigram
// }
//
// @Override
// public IntWritable getCurrentValue() throws IOException, InterruptedException {
// return new IntWritable(Integer.parseInt(reader.getCurrentValue().toString().split(" ")[1]));
// }
//
// @Override
// public float getProgress() throws IOException, InterruptedException {
// return reader.getProgress();
// }
//
// }
//
// @Override
// public RecordReader<Text,IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// return new RecordReaderClass();
// }
//}
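// Note: the commented-out InputFormat above appears to be an earlier alternative that parsed
// "<trigram> <count>" records in the reader itself. The job below instead reads plain text lines
// with TextInputFormat and distinguishes the two corpora by the input file's name in the Mapper.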
public class Step2 {

	// Counter accumulating the total number of trigram occurrences seen across both corpora.
	enum TotalOcc {
		TotalOcc_num
	}

	// A Writable pairing a count with a tag ("0" or "1") identifying which corpus it came from.
	public static class TaggedValue implements Writable {
		private Text tag;
		private IntWritable value;

		public TaggedValue() {
			// Initialize to empty instances so write()/readFields() never hit a null field.
			tag = new Text();
			value = new IntWritable();
		}

		public TaggedValue(Text tag, IntWritable value) {
			this.tag = tag;
			this.value = value;
		}

		public Text getTag() {
			return tag;
		}

		public Writable getValue() {
			return value;
		}

		public void write(DataOutput dataOutput) throws IOException {
			dataOutput.writeUTF(tag.toString());
			dataOutput.writeInt(value.get());
		}

		public void readFields(DataInput dataInput) throws IOException {
			tag = new Text(dataInput.readUTF());
			value = new IntWritable(dataInput.readInt());
		}
	}
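	// Serialized form, for reference: writeUTF emits a 2-byte length prefix plus the tag's UTF-8
	// bytes, followed by a 4-byte big-endian int, so e.g. the pair ("0", 12) occupies 7 bytes.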
	public static class MapperClass extends Mapper<LongWritable, Text, Text, TaggedValue> {
		// Each input line is "<trigram>\t<count>". The mapper tags the count with "0" or "1"
		// according to which corpus file the line came from, and emits <trigram, tagged count>.
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
			// Files from the first corpus are expected to start with "0"; everything else is tagged "1".
			String tagValue = fileName.startsWith("0") ? "0" : "1";
			String[] fields = value.toString().split("\t");
			context.getCounter(TotalOcc.TotalOcc_num).increment(Long.parseLong(fields[1]));
			context.write(new Text(fields[0]), new TaggedValue(new Text(tagValue), new IntWritable(Integer.parseInt(fields[1]))));
		}
	}
	public static class PartitionerClass extends Partitioner<Text, TaggedValue> {
		@Override
		public int getPartition(Text key, TaggedValue value, int numPartitions) {
			// Mask the sign bit and take the modulus so the result is always in [0, numPartitions).
			return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
		}
	}
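	// For example, a key whose hashCode is negative would have made the raw value an invalid
	// partition index; with numPartitions = 4 the masked-and-modded value always lands in 0..3.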
	public static class ReducerClass extends Reducer<Text, TaggedValue, Text, Text> {
		// Each trigram arrives with at most two tagged counts (one per corpus); a missing corpus
		// contributes "0". Output line: "<trigram>\t<r0> <r1>".
		@Override
		public void reduce(Text key, Iterable<TaggedValue> taggedValues, Context context) throws IOException, InterruptedException {
			Iterator<TaggedValue> it = taggedValues.iterator();
			TaggedValue first = it.next();
			// Copy the fields out immediately: Hadoop reuses the value object, so advancing the
			// iterator would otherwise overwrite the first value's contents.
			String firstTag = first.getTag().toString();
			String firstCount = first.getValue().toString();
			String secondCount = it.hasNext() ? it.next().getValue().toString() : "0";
			if (firstTag.equals("0"))
				context.write(key, new Text(firstCount + " " + secondCount)); // r0 r1
			else
				context.write(key, new Text(secondCount + " " + firstCount)); // r0 r1
		}
	}
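	// Illustrative example: for key "a b c" with tagged values ("0",5) and ("1",3) the reducer
	// writes the line "a b c\t5 3"; if only ("1",3) arrives, it writes "a b c\t0 3".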
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Point the default FileSystem at the bucket holding the inputs before the job copies conf.
		FileSystem.setDefaultUri(conf, new URI(args[1]));
		Job job = Job.getInstance(conf, "Combine r0 r1");
		// Set formats for input and output.
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		// Paths to the two Step1 outputs (args[0], args[1]) and the output directory (args[2]).
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileInputFormat.addInputPath(job, new Path(args[1]));
		// Remove a stale output directory so the job does not fail on startup.
		FileSystem.get(conf).delete(new Path(args[2]), true);
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		job.setJarByClass(Step2.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TaggedValue.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(MapperClass.class);
		job.setReducerClass(ReducerClass.class);
		job.setPartitionerClass(PartitionerClass.class);
		if (job.waitForCompletion(true)) {
			// Persist the total-occurrence counter so later steps can read it.
			Path hdfswritepath = new Path("s3://input-file-hadoop/n");
			FSDataOutputStream outputStream = FileSystem.get(conf).create(hdfswritepath);
			outputStream.writeUTF(String.valueOf(job.getCounters().findCounter(TotalOcc.TotalOcc_num).getValue()));
			outputStream.close();
			System.exit(0);
		}
		System.exit(1);
	}
}
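// Usage sketch (the jar name and bucket paths below are illustrative, not from the source):
//   hadoop jar step2.jar Step2 s3://bucket/step1-r0-out s3://bucket/step1-r1-out s3://bucket/step2-out
// args[0] and args[1] are the per-corpus trigram counts produced by Step1; args[2] receives the
// combined "<trigram>\t<r0> <r1>" output consumed by the next step.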