The two datasets are:
#users.txt (student id, name) 1,Robin Dong 2,Timi Yang 3,Olive Xu 4,Jenny Xu 5,Elsa Dong 6,Coly Wang 7,Hulk Li 8,Judy Lao 9,Kevin Liu 10,House Zhang
#scores.txt (student id, course, score) 1,Math,90 1,Physics,80 3,Music,70 5,Math,80 7,Geography,70 1,Geography,60 2,Physics,70 6,Math,70 4,Music,90 6,Geography,75 9,Geography,85 10,Music,95 2,Physics,78 2,Music,73 2,Math,84 4,Math,61 4,Physics,65 5,Music,66 5,Math,90
To join the two tables above by “student id”, we need to use MultipleInputs. The code is:
import java.io.IOException;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class School {
public static class UserMapper
extends Mapper {
private String uid, name;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String arr[] = line.split(",");
uid = arr[0].trim();
name = arr[1].trim();
context.write(new Text(uid), new Text(name));
}
}
public static class ScoreMapper
extends Mapper {
private String uid, course, score;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String arr[] = line.split(",");
uid = arr[0].trim();
course = arr[1].trim();
score = arr[2].trim();
context.write(new Text(uid), new Text(course + "," + score));
}
}
public static class InnerJoinReducer extends Reducer {
@Override
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
String name = "";
List courses = new ArrayList();
List scores = new ArrayList();
for (Text value : values) {
String cur = value.toString();
if (cur.contains(",")) {
String arr[] = cur.split(",");
courses.add(arr[0]);
scores.add(arr[1]);
} else {
name = cur;
}
}
if (!name.isEmpty() && !courses.isEmpty() && !scores.isEmpty()) {
for (int i = 0; i < courses.size(); i++) {
context.write(new Text(name), new Text(courses.get(i) + "," + scores.get(i)));
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "School");
job.setJarByClass(School.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ScoreMapper.class);
job.setReducerClass(InnerJoinReducer.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Compile and run it:
~/hadoop-2.7.2/bin/hadoop com.sun.tools.javac.Main School.java -Xlint:unchecked jar cf school.jar School*.class bin/hadoop jar ~/school.jar School /users.txt /scores.txt /my
And the result in /my is:
Robin Dong Geography,60 Robin Dong Physics,80 Robin Dong Math,90 House Zhang Music,95 Timi Yang Physics,70 Timi Yang Math,84 Timi Yang Music,73 Timi Yang Physics,78 Olive Xu Music,70 Jenny Xu Physics,65 Jenny Xu Math,61 Jenny Xu Music,90 Elsa Dong Math,90 Elsa Dong Music,66 Elsa Dong Math,80 Coly Wang Geography,75 Coly Wang Math,70 Hulk Li Geography,70 Kevin Liu Geography,85