A few weeks ago I wrote about how I'd been using Spark to explore the City of Chicago crime data set, and having worked out how many of each type of crime had been committed, I wanted to write that out to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDDs, so I refactored my code into the following format to allow me to use that:
```scala
import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

// Strip the CSV header, which lives in the first partition only
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) lines.drop(1) else lines
  })
}

// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"

val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))

// Parse each row and emit (primary crime type, 1), creating one CSVParser per partition
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
// Sum the 1s per crime type and write the result out
val counts: RDD[(String, Int)] = partitions.reduceByKey((x, y) => x + y)

counts.saveAsTextFile(file)
```
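One thing to keep in mind is that saveAsTextFile writes a directory of part files (part-00000, part-00001, ...) rather than a single CSV file. As a minimal sketch, not part of the original code, of how those parts could be collapsed into one file using Hadoop's FileUtil.copyMerge (available in Hadoop 2.x; the merged output path here is a hypothetical example):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// A sketch assuming the Hadoop client libraries are on the classpath:
// merge the part files written by saveAsTextFile into a single CSV.
val conf = new Configuration()
val fs = FileSystem.get(conf)

FileUtil.copyMerge(
  fs, new Path("/tmp/primaryTypes.csv"),        // directory produced by saveAsTextFile
  fs, new Path("/tmp/primaryTypes-merged.csv"), // hypothetical single output file
  false,                                        // don't delete the source directory
  conf, null)
```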