After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and looks like this:
Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1961","1961","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1962","1962","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1963","1963","tonnes","0.000000",""
......
To load this CSV file into Spark and dump it to Parquet format, I wrote the following code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd._
/* Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag */
case class Trade(area_code: Int, area: String, item_code: Int, item: String, element_code: Int,
                 element: String, year: Int, unit: String, value: Double, flag: String)
object TradeCrops {
  // Every field in this dump is wrapped in double quotes; strip them.
  def scrub(str: String): String = {
    return str.replace("\"", "")
  }

  // Parse an Int, falling back to 0 for blank or malformed values.
  def toInt(str: String): Int = {
    try {
      return scrub(str).toInt
    } catch {
      case e: Throwable => {
        return 0
      }
    }
  }

  // Parse a Double, falling back to 0.0 for blank or malformed values.
  def toDouble(str: String): Double = {
    try {
      return scrub(str).toDouble
    } catch {
      case e: Throwable => {
        return 0.0
      }
    }
  }

  // Split on the quote-comma pair ("," ) so commas inside quoted fields
  // (e.g. item names) do not break the parse. Field 6 (Year Code) is
  // skipped because it duplicates field 7 (Year).
  def toTrade(line: String): Trade = {
    val fields = line.split("\",")
    Trade(
      toInt(fields(0)),
      scrub(fields(1)),
      toInt(fields(2)),
      scrub(fields(3)),
      toInt(fields(4)),
      scrub(fields(5)),
      toInt(fields(7)),
      scrub(fields(8)),
      toDouble(fields(9)),
      scrub(fields(10))
    )
  }
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Trade Crops Application")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder()
      .appName("Spark SQL Trade Crops")
      .getOrCreate()

    // Keep only rows that split into exactly 11 fields (this also drops the
    // header line), then convert each row into a Trade case class instance.
    val file = sc.textFile("hdfs:///FAO/Trade_Crops.csv")
    val tradeRDD = file.filter(_.split("\",").length == 11).map(toTrade(_))

    // The case class supplies the schema, so the RDD can be turned into a
    // DataFrame and written out as Parquet directly.
    val tradeDF = spark.createDataFrame(tradeRDD)
    tradeDF.write.parquet("hdfs:///FAO/Trade_Crops.parquet")
  }
}
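As an aside, Spark 2.x also ships with a built-in CSV data source, so the same load-and-dump could be done without hand-rolling the quote handling. The sketch below is only an illustration under the assumption that the file keeps the header row and quoting shown above; the object name and output path are made up for the example, and inferSchema costs an extra pass over the data:
import org.apache.spark.sql.SparkSession

/* Illustrative alternative, not the version used in this post. */
object TradeCropsCsv {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .appName("Trade Crops via CSV source")
      .getOrCreate()

    // Let the CSV source deal with quotes and the header row; inferSchema
    // guesses Int/Double/String column types from the data.
    val tradeDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs:///FAO/Trade_Crops.csv")

    tradeDF.write.parquet("hdfs:///FAO/Trade_Crops_csv.parquet")
    spark.stop()
  }
}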
The build.sbt is:
lazy val root = (project in file("."))
  .settings(
    name := "FAO",
    version := "1.0",
    scalaVersion := "2.11.7",
    unmanagedJars in Compile += file("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.6.jar"),
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "2.1.1",
      "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )
  )
Always remember to add the dependency on “spark-sql”, or else the build will report that createDataFrame is not a member of spark.
And finally, the submit script is:
/disk1/spark-2.1.1-bin-hadoop2.6/bin/spark-submit --class TradeCrops \
  --master yarn \
  --driver-memory 2G \
  --executor-memory 2G \
  --executor-cores 1 \
  --num-executors 64 \
  ./target/scala-2.11/FAO_2.11-1.0.jar
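After the job finishes, a quick sanity check is to read the Parquet directory back in spark-shell. This is just a sketch: spark is the session that spark-shell already provides, and the path is the one written by the job above.
// Inside spark-shell:
val check = spark.read.parquet("hdfs:///FAO/Trade_Crops.parquet")
check.printSchema()   // should list the ten Trade fields with their types
check.show(5)         // first few rows
check.count()         // number of rows that passed the 11-field filter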