After downloading data from the “Food and Agriculture Organization of the United Nations”, I ended up with many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and it looks like this:
Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1961","1961","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1962","1962","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1963","1963","tonnes","0.000000",""
......
To load this CSV file into Spark and dump it to Parquet format, I wrote the following code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd._
/* Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag */
case class Trade(area_code: Int, area: String, item_code: Int, item: String, element_code: Int,
                 element: String, year: Int, unit: String, value: Double, flag: String)
object TradeCrops {

  // Strip the surrounding double quotes from a raw CSV field
  def scrub(str: String): String = str.replace("\"", "")

  // Parse an integer field, falling back to 0 on malformed input
  def toInt(str: String): Int =
    try {
      scrub(str).toInt
    } catch {
      case e: Throwable => 0
    }

  // Parse a numeric field, falling back to 0.0 on malformed input
  def toDouble(str: String): Double =
    try {
      scrub(str).toDouble
    } catch {
      case e: Throwable => 0.0
    }

  // Split a raw line on the quote-comma boundary between quoted fields and build a Trade.
  // fields(6) ("Year Code") is skipped because the case class only keeps "Year".
  def toTrade(line: String): Trade = {
    val fields = line.split("\",")
    Trade(
      toInt(fields(0)),
      scrub(fields(1)),
      toInt(fields(2)),
      scrub(fields(3)),
      toInt(fields(4)),
      scrub(fields(5)),
      toInt(fields(7)),
      scrub(fields(8)),
      toDouble(fields(9)),
      scrub(fields(10))
    )
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Trade Crops Application")
    val sc = new SparkContext(conf)
    // getOrCreate() reuses the SparkContext created above
    val spark = SparkSession.builder()
      .appName("Spark SQL Trade Crops")
      .getOrCreate()

    val file = sc.textFile("hdfs:///FAO/Trade_Crops.csv")
    // Keep only well-formed rows with 11 quoted fields; the unquoted header line
    // does not split into 11 pieces and is dropped here
    val tradeRDD = file.filter(_.split("\",").length == 11).map(toTrade(_))

    // Convert the RDD of case classes to a DataFrame and dump it as Parquet
    val tradeDF = spark.createDataFrame(tradeRDD)
    tradeDF.write.parquet("hdfs:///FAO/Trade_Crops.parquet")
  }
}
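The split("\",") trick works because every field in the data rows is wrapped in double quotes, while the header line is not. Here is a quick check you can run in the Scala REPL or spark-shell to see why the length == 11 filter keeps data rows and drops the header; the sample row is copied from the CSV above, purely for illustration:

val sample = "\"2\",\"Afghanistan\",\"231\",\"Almonds shelled\",\"5910\",\"Export Quantity\",\"1961\",\"1961\",\"tonnes\",\"0.000000\",\"\""
val header = "Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag"
sample.split("\",").length                    // 11 -- data rows pass the filter
header.split("\",").length                    // 1  -- the header has no quotes, so it is dropped
sample.split("\",").map(_.replace("\"", ""))  // what scrub() leaves: 2, Afghanistan, 231, ...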
The build.sbt is:
lazy val root = (project in file("."))
  .settings(
    name := "FAO",
    version := "1.0",
    scalaVersion := "2.11.7",
    unmanagedJars in Compile += file("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.6.jar"),
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "2.1.1",
      "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )
  )
Always remember to add the dependency on “spark-sql”, or else the compiler will report that createDataFrame is not a member of spark.
And finally, the submit script is:
/disk1/spark-2.1.1-bin-hadoop2.6/bin/spark-submit --class TradeCrops \
  --master yarn \
  --driver-memory 2G \
  --executor-memory 2G \
  --executor-cores 1 \
  --num-executors 64 \
  ./target/scala-2.11/FAO_2.11-1.0.jar
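After the job finishes, a quick sanity check in spark-shell confirms the Parquet output is readable. This is just a sketch, assuming the write above succeeded and the output path is unchanged:

// Read the Parquet files back and inspect the inferred schema and a few rows
val tradeDF = spark.read.parquet("hdfs:///FAO/Trade_Crops.parquet")
tradeDF.printSchema()
tradeDF.show(5)
tradeDF.count()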