To launch the interactive Scala shell, run ./bin/spark-shell. To use Spark from a Python shell, simply run the ./bin/pyspark command instead.

A broadcast variable can be created from the driver program like this:

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

Broadcast variables are stored in memory.
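As a minimal illustrative sketch (the records RDD below is made up for this example and is not part of the original text), tasks running on the workers read the broadcast value through its value method:

// hypothetical data, only to show how the broadcast value is read inside a transformation
val records = sc.parallelize(List("a", "b", "x"))
val matched = records.map(r => broadcastAList.value.contains(r)).collect()
// matched is Array(true, true, false)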
The examples that follow work with a small dataset of user purchase records:

Customer Name | Product Name | Product Price |
---|---|---|
John | iPhone Cover | 9.99 |
John | Headphones | 5.49 |
Jack | iPhone Cover | 9.99 |
Jill | Samsung Galaxy Cover | 8.95 |
Bob | iPad Cover | 5.49 |
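The programs below read this data from data/UserPurchaseHistory.csv. Assuming that file simply mirrors the table above, it would contain one comma-separated record per line:

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49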
For the Scala program, we need to create two files: the Scala source file and the project's build configuration file. The project is built with SBT.
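As a rough sketch of that build configuration (the project name and the Scala and Spark versions below are assumptions; use whatever matches your installation), a minimal build.sbt might look like this:

name := "scala-spark-app"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"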
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark app in Scala
 */
object ScalaApp {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")

    // we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

    // let's count the number of purchases
    val numPurchases = data.count()

    // let's count how many unique users made purchases
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

    // let's sum up our total revenue
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

    // let's find our most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)

    // finally, print everything out
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

    sc.stop()
  }
}
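With the build file sketched above, the app can be run from the project directory with sbt run. For the five records in the sample table, it should report 5 total purchases, 4 unique users, a total revenue of 39.91, and iPhone Cover as the most popular product with 2 purchases.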
The Java API is essentially very similar to the Scala API. Scala code can call Java code quite easily, but some Scala code cannot be called from Java.

Because Java did not support anonymous functions (lambdas) before version 8, we often create anonymous classes to pass to Spark operations. Each of these classes implements the interface required by the operation, along with its call method.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * A simple Spark app in Java
 */
public class JavaApp {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");

        // we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv")
                .map(new Function<String, String[]>() {
                    @Override
                    public String[] call(String s) throws Exception {
                        return s.split(",");
                    }
                });

        // let's count the number of purchases
        long numPurchases = data.count();

        // let's count how many unique users made purchases
        long uniqueUsers = data.map(new Function<String[], String>() {
            @Override
            public String call(String[] strings) throws Exception {
                return strings[0];
            }
        }).distinct().count();

        // let's sum up our total revenue; mapToDouble gives us a JavaDoubleRDD, which provides sum()
        double totalRevenue = data.mapToDouble(new DoubleFunction<String[]>() {
            @Override
            public double call(String[] strings) throws Exception {
                return Double.parseDouble(strings[2]);
            }
        }).sum();

        // let's find our most popular product
        // first we map the data to records of (product, 1) using mapToPair with a PairFunction
        // and the Tuple2 class.
        // then we call a reduceByKey operation with a Function2, which is essentially the sum function
        List<Tuple2<String, Integer>> pairs = data.mapToPair(new PairFunction<String[], String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String[] strings) throws Exception {
                return new Tuple2<String, Integer>(strings[1], 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).collect();

        // finally we sort the result. Note we need to create a Comparator function
        // that reverses the sort order.
        Collections.sort(pairs, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                return -(o1._2() - o2._2());
            }
        });
        String mostPopular = pairs.get(0)._1();
        int purchases = pairs.get(0)._2();

        // print everything out
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases",
                mostPopular, purchases));

        sc.stop();
    }
}
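The Java version prints the same four lines and, for the sample data, should produce the same results as the Scala app. It needs the same spark-core dependency on the classpath when compiled and run; the choice of build tool for that (Maven, SBT, and so on) is left open here.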
Spark's Python API covers almost all of the functionality available in the Scala API, although a few features, such as Spark Streaming and some individual API methods, were not yet supported at the time of writing.
"""A simple Spark app in Python"""
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# we take the raw data in CSV format and convert it into a set of records of the form (user, product, price)
data = (sc.textFile("data/UserPurchaseHistory.csv")
        .map(lambda line: line.split(","))
        .map(lambda record: (record[0], record[1], record[2])))
# let's count the number of purchases
numPurchases = data.count()
# let's count how many unique users made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# let's sum up our total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# let's find our most popular product
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
# Finally, print everything out
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
# stop the SparkContext
sc.stop()
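Assuming the script is saved as, say, pythonapp.py (the filename is just an illustration), it can be submitted to a local Spark installation with ./bin/spark-submit pythonapp.py.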