In the previous article, we used Spark to sort a large dataset generated by Teragen. But it took much longer than the Hadoop MapReduce framework, so we are going to optimize it.

Profiling the job in the Spark UI, we found that the “Shuffle” stage reads and writes too much data from/to the hard disk, which severely hurts performance.




In Hadoop’s “Terasort”, the “TotalOrderPartitioner” class maps all the data to a large number of partitions in key order, so every “Reduce” task only needs to sort the data in its own partition (it needs almost no data shuffled from other partitions). This saves a lot of network bandwidth and CPU usage.
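
To make the idea concrete, here is a minimal sketch in plain Scala (no Spark or Hadoop dependency) of routing keys to ordered partitions by sorted split points. The names `RangeRouter`, `partitionFor` and the sample split points are hypothetical and only serve as illustration; this is not the actual Hadoop partitioner code. It shows that once every key in partition i is smaller than every key in partition i+1, sorting each partition on its own and concatenating the results gives a globally sorted output.

```scala
// Hypothetical illustration of total-order (range) partitioning.
final class RangeRouter(splitPoints: Array[String]) {
  require(splitPoints.sameElements(splitPoints.sorted), "split points must be sorted")

  // N split points define N + 1 ordered partitions.
  val numPartitions: Int = splitPoints.length + 1

  // Binary search for the number of split points <= key;
  // that count is the index of the partition the key belongs to.
  def partitionFor(key: String): Int = {
    var lo = 0
    var hi = splitPoints.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (splitPoints(mid).compareTo(key) <= 0) lo = mid + 1 else hi = mid
    }
    lo
  }
}

object RangePartitionSketch {
  def main(args: Array[String]): Unit = {
    val router = new RangeRouter(Array("g", "n", "t"))
    val keys = Seq("pear", "apple", "zebra", "grape", "melon", "banana")

    // Group keys by partition, sort each group locally,
    // then concatenate the groups in partition order.
    val locallySorted = keys
      .groupBy(k => router.partitionFor(k))
      .toSeq
      .sortBy(_._1)
      .flatMap(_._2.sorted)

    // The concatenation of locally sorted partitions is globally sorted.
    assert(locallySorted == keys.sorted)
    println(locallySorted.mkString(", "))
  }
}
```

In a real job the split points would be sampled from the input so that the partitions are roughly the same size; the fixed values above are only for the example.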

Therefore we could modify our Scala code to sort every partition locally:

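    // Range-partition the records by key, then sort each partition locally.
    // Note: sortBy over the key bytes needs an Ordering[Array[Byte]] in scope
    // (the Scala standard library does not provide one).
    logData.partitionBy(new TeraSortPartitioner(512))
      .mapPartitions(iter => {
        // toVector materializes the whole partition in memory before sorting
        iter.toVector.sortBy(kv => kv._1.getBytes).iterator
      })
      .saveAsNewAPIHadoopFile[TeraOutputFormat]("hdfs://127.0.0.1/output")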

The spark-submit command should also be changed:

    ./bin/spark-submit --class TerasortApp \
      --master yarn \
      --deploy-mode cluster \
      --driver-memory 2000M \
      --executor-memory 5200M \
      --executor-cores 1 \
      --num-executors 64 \
      --conf spark.yarn.executor.memoryOverhead=900 \
      --conf spark.shuffle.memoryFraction=0.6 \
      --conf spark.kryoserializer.buffer.max=2000m \
      --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
      --queue spark \
      /home/sanbai/myspark/target/scala-2.10/Terasort_2.10-1.0.jar
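
As a rough sanity check on these settings, the memory requested from YARN for the executors can be worked out from the flags above. The sketch below only does that arithmetic; the object and value names are hypothetical. It ignores the driver container, and it assumes YARN grants exactly the requested size, whereas in practice YARN may round each container up according to its scheduler settings.

```scala
// Hypothetical helper: estimate the executor memory requested from YARN.
object YarnMemorySketch {
  val executorMemoryMb = 5200 // --executor-memory 5200M
  val memoryOverheadMb = 900  // spark.yarn.executor.memoryOverhead=900
  val numExecutors = 64       // --num-executors 64

  // Each executor container asks for heap plus off-heap overhead.
  val containerMb: Int = executorMemoryMb + memoryOverheadMb

  // Total across all executors (driver container not included).
  val totalExecutorMb: Long = containerMb.toLong * numExecutors

  def main(args: Array[String]): Unit = {
    println(s"Per-executor container: $containerMb MB")
    println(s"All executors: $totalExecutorMb MB")
  }
}
```

With the values above, each executor container requests 6100 MB, and the 64 executors together request 390400 MB (about 381 GB), so the “spark” queue needs at least that much capacity for all executors to start.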

This time, the job took only 10 minutes to sort the data!

Screenshot from “Job Browser” of Hue: