The two datasets are:

#users.txt (student id, name)
1,Robin Dong
2,Timi Yang
3,Olive Xu
4,Jenny Xu
5,Elsa Dong
6,Coly Wang
7,Hulk Li
8,Judy Lao
9,Kevin Liu
10,House Zhang
#scores.txt (student id, course, score)
1,Math,90
1,Physics,80
3,Music,70
5,Math,80
7,Geography,70
1,Geography,60
2,Physics,70
6,Math,70
4,Music,90
6,Geography,75
9,Geography,85
10,Music,95
2,Physics,78
2,Music,73
2,Math,84
4,Math,61
4,Physics,65
5,Music,66
5,Math,90

To join the two tables above by “student id”, we need to use MultipleInputs. The code is:

import java.io.IOException;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class School {

  public static class UserMapper
    extends Mapper {

    private String uid, name;

    @Override
    public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      String line = value.toString();
      String arr[] = line.split(",");
      uid = arr[0].trim();
      name = arr[1].trim();
      context.write(new Text(uid), new Text(name));
    }
  }

  public static class ScoreMapper
      extends Mapper {

      private String uid, course, score;

      @Override
      public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        String line = value.toString();
        String arr[] = line.split(",");
        uid = arr[0].trim();
        course = arr[1].trim();
        score = arr[2].trim();

        context.write(new Text(uid), new Text(course + "," + score));
      }
  }

  public static class InnerJoinReducer extends Reducer {

    @Override
    public void reduce(Text key, Iterable values, Context context)
    throws IOException, InterruptedException {
      String name = "";
      List courses = new ArrayList();
      List scores = new ArrayList();

      for (Text value : values) {
        String cur = value.toString();
        if (cur.contains(",")) {
          String arr[] = cur.split(",");
          courses.add(arr[0]);
          scores.add(arr[1]);
        } else {
          name = cur;
        }
      }

      if (!name.isEmpty() && !courses.isEmpty() && !scores.isEmpty()) {
        for (int i = 0; i < courses.size(); i++) {
          context.write(new Text(name), new Text(courses.get(i) + "," + scores.get(i)));
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "School");
    job.setJarByClass(School.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ScoreMapper.class);
    job.setReducerClass(InnerJoinReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compile and run it:

~/hadoop-2.7.2/bin/hadoop com.sun.tools.javac.Main School.java -Xlint:unchecked
jar cf school.jar School*.class
bin/hadoop jar ~/school.jar School /users.txt /scores.txt /my

And the result in /my is:

Robin Dong      Geography,60
Robin Dong      Physics,80
Robin Dong      Math,90
House Zhang     Music,95
Timi Yang       Physics,70
Timi Yang       Math,84
Timi Yang       Music,73
Timi Yang       Physics,78
Olive Xu        Music,70
Jenny Xu        Physics,65
Jenny Xu        Math,61
Jenny Xu        Music,90
Elsa Dong       Math,90
Elsa Dong       Music,66
Elsa Dong       Math,80
Coly Wang       Geography,75
Coly Wang       Math,70
Hulk Li Geography,70
Kevin Liu       Geography,85