After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and it looks like this:

Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1961","1961","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1962","1962","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1963","1963","tonnes","0.000000",""
......

To load this CSV file into Spark and dump it to Parquet format, I wrote the following code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd._

/**
 * One data row of the FAO trade CSV.
 *
 * CSV header: Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag
 *
 * The fields follow the CSV column order; the "Year Code" column has no
 * counterpart here, so the record has 10 fields for 11 CSV columns.
 *
 * @param area_code    "Area Code" column
 * @param area         "Area" column
 * @param item_code    "Item Code" column
 * @param item         "Item" column
 * @param element_code "Element Code" column
 * @param element      "Element" column
 * @param year         "Year" column
 * @param unit         "Unit" column
 * @param value        "Value" column
 * @param flag         "Flag" column
 */
case class Trade(
  area_code: Int,
  area: String,
  item_code: Int,
  item: String,
  element_code: Int,
  element: String,
  year: Int,
  unit: String,
  value: Double,
  flag: String
)

/**
 * Spark job that reads the FAO trade CSV from HDFS, parses every well-formed
 * data row into a [[Trade]] and writes the result out in Parquet format.
 */
object TradeCrops {
  import scala.util.Try

  /**
   * Separator between two quoted CSV fields: the closing quote of one field
   * followed by the comma. Splitting on this rather than on a bare comma keeps
   * commas that appear inside a quoted value from breaking the field apart.
   */
  private val FieldSeparator = "\","

  /** Number of columns a row must have to be parsed. */
  private val ExpectedFieldCount = 11

  /**
   * Removes every double-quote character from `str`.
   *
   * @param str raw CSV field, possibly still carrying its quotes
   * @return `str` without any `"` characters
   */
  def scrub(str: String): String = str.replace("\"", "")

  /**
   * Parses a (possibly quoted) CSV field as an `Int`.
   *
   * @param str raw CSV field
   * @return the parsed value, or 0 when the field is not a valid integer
   */
  def toInt(str: String): Int =
    // Try only catches non-fatal exceptions, so errors such as
    // OutOfMemoryError still propagate instead of being turned into 0.
    Try(scrub(str).toInt).getOrElse(0)

  /**
   * Parses a (possibly quoted) CSV field as a `Double`.
   *
   * @param str raw CSV field
   * @return the parsed value, or 0.0 when the field is not a valid number
   */
  def toDouble(str: String): Double =
    Try(scrub(str).toDouble).getOrElse(0.0)

  /**
   * Builds a [[Trade]] from the already-split fields of one row.
   *
   * @param fields the 11 columns of a row, in CSV order
   * @return the parsed record
   */
  private def fromFields(fields: Array[String]): Trade =
    Trade(
      toInt(fields(0)),
      scrub(fields(1)),
      toInt(fields(2)),
      scrub(fields(3)),
      toInt(fields(4)),
      scrub(fields(5)),
      // Index 6 ("Year Code") is skipped; only the "Year" column is kept.
      toInt(fields(7)),
      scrub(fields(8)),
      toDouble(fields(9)),
      scrub(fields(10))
    )

  /**
   * Parses one CSV line into a [[Trade]].
   *
   * @param line a data row with all 11 quoted columns
   * @return the parsed record
   * @throws ArrayIndexOutOfBoundsException if the line has fewer than 11 columns
   */
  def toTrade(line: String): Trade = fromFields(line.split(FieldSeparator))

  /**
   * Entry point: converts `hdfs:///FAO/Trade_Crops.csv` to
   * `hdfs:///FAO/Trade_Crops.parquet`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // A single SparkSession is enough; its SparkContext is used for the RDD
    // API, so there is no second, separately configured context.
    val spark = SparkSession.builder()
      .appName("Trade Crops Application")
      .getOrCreate()

    try {
      val tradeRDD = spark.sparkContext
        .textFile("hdfs:///FAO/Trade_Crops.csv")
        // Split each line once and reuse the result for both the
        // well-formedness check and the conversion.
        .map(_.split(FieldSeparator))
        // Rows that do not split into exactly 11 columns are dropped. This
        // also removes the unquoted header line, which yields a single field.
        .filter(_.length == ExpectedFieldCount)
        .map(fromFields)

      spark.createDataFrame(tradeRDD)
        .write
        .parquet("hdfs:///FAO/Trade_Crops.parquet")
    } finally {
      // Release cluster resources even when the job fails.
      spark.stop()
    }
  }
}

The build.sbt is:

// Spark release shared by every Spark module listed below.
val sparkVersion = "2.1.1"

// Single-project build for the FAO conversion job.
lazy val root = (project in file("."))
  .settings(
    name := "FAO",
    version := "1.0",
    scalaVersion := "2.11.7",
    // Extra jar taken from the local Hadoop installation rather than resolved by sbt.
    unmanagedJars in Compile += file("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.6.jar"),
    libraryDependencies ++= Seq(
      // %% appends the Scala binary version (_2.11 for scalaVersion 2.11.7)
      // to the artifact name, so these resolve to spark-core_2.11 / spark-sql_2.11.
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )
  )

Always remember to add the dependency for “spark-sql”, or else the build will report “createDataFrame() is not a member of spark”.
And finally, the submit script is:

# Submit the TradeCrops job to YARN with 64 single-core executors,
# 2G of memory for the driver and 2G for each executor.
SPARK_DIST=/disk1/spark-2.1.1-bin-hadoop2.6
# NOTE(review): sbt normally lower-cases the project name in artifact file
# names - confirm this path matches the jar actually produced by the build.
APP_JAR=./target/scala-2.11/FAO_2.11-1.0.jar

"${SPARK_DIST}/bin/spark-submit" \
  --class TradeCrops \
  --master yarn \
  --driver-memory 2G \
  --executor-memory 2G \
  --executor-cores 1 \
  --num-executors 64 \
  "${APP_JAR}"