P(C|X) = P(X|C) P(C) / P(X)
Since P(X) is the same for every candidate class, it can be dropped when maximizing, and the naive independence assumption factors P(X|C) into a product over the individual attributes:
argmax_C P(C|X) = argmax_C P(X|C) P(C) = argmax_C P(C) ∏i P(Xi|C)
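As a quick numeric illustration (hypothetical values): with two classes and X = (x1, x2), suppose P(yes) = 0.6, P(x1|yes) = 0.5, P(x2|yes) = 0.2 and P(no) = 0.4, P(x1|no) = 0.3, P(x2|no) = 0.4. The score for yes is 0.6 × 0.5 × 0.2 = 0.06 and the score for no is 0.4 × 0.3 × 0.4 = 0.048, so the classifier picks yes.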
MapReduce solution
Phase 1: build the classifier from the training data
// key: ignored; value: one training sample, containing the attribute values and the classification
map(key, value) {
    String[] tokens = value.split(",");
    int classIndex = tokens.length - 1;    // the last token is the class label
    String theClass = tokens[classIndex];
    // one (attribute value, class) pair per attribute
    for (int i = 0; i < classIndex; i++) {
        String reducerKey = tokens[i] + "," + theClass;
        emit(reducerKey, 1);
    }
    // also count the class itself, for the prior P(C)
    String reducerKey = "CLASS," + theClass;
    emit(reducerKey, 1);
}
reduce(key, values) {
    int total = 0;
    for (int value : values) {
        total += value;
    }
    emit(key, total);
}
// the end result is a probability table: the probability of each attribute value occurring in each class
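To make the counting concrete, take a hypothetical training record "Sunny,Hot,High,Weak,No" (four attribute values plus the class). The phase-1 mapper above emits ("Sunny,No", 1), ("Hot,No", 1), ("High,No", 1), ("Weak,No", 1) and ("CLASS,No", 1), and the reducer then sums these 1s per key across all training records.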
Phase 2: classify new data with the classifier
map(key, value) {
    // emit the record itself as the key, so the reducer classifies each distinct record once
    emit(value, 1);
}
public class NaiveBayesClassifierReducer ... {
    private ProbabilityTable theProbabilityTable = ...;
    private List<String> classifications = ...;

    public void setup() {
        theProbabilityTable = buildTheProbabilityTable();
        classifications = buildClassifications();
    }

    reduce(key, values) {
        // key = (X1, X2, ..., Xm), the attribute values of one record to classify
        String[] attributes = key.split(",");
        String selectedClass = null;
        double maxPosterior = 0.0;
        for (String aClass : classifications) {
            // start from the prior P(C), then multiply in each conditional P(Xi|C)
            double posterior = theProbabilityTable.getClassProbability(aClass);
            for (int i = 0; i < attributes.length; i++) {
                posterior *= theProbabilityTable.getConditionalProbability(attributes[i], aClass);
            }
            // keep the class with the highest score
            if ((selectedClass == null) || (posterior > maxPosterior)) {
                selectedClass = aClass;
                maxPosterior = posterior;
            }
        }
        String reducerOutputValue = selectedClass + "," + maxPosterior;
        emit(key, reducerOutputValue);
    }
}
Naive Bayes for continuous numeric attributes
To compute P(Xi|C) for a numeric attribute, a Gaussian distribution can be used: first estimate the Gaussian parameters (mean and variance) of attribute Xi within class C from the training data; then, for a test record, plug the attribute value into that Gaussian density to obtain the conditional probability of that single attribute.
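A minimal sketch of that density computation (the Gaussian class and probability method are illustrative names, not from the original):
public class Gaussian {
    // returns the Gaussian density N(x; mean, variance), used as P(Xi = x | C);
    // mean and variance are estimated per (attribute, class) from the training data
    public static double probability(double x, double mean, double variance) {
        double exponent = -((x - mean) * (x - mean)) / (2.0 * variance);
        return Math.exp(exponent) / Math.sqrt(2.0 * Math.PI * variance);
    }
}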
Spark implementation of the Naive Bayes classifier
Phase 1: build the Naive Bayes classifier
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import scala.Tuple2;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class BuildNaiveBayesClassifier implements java.io.Serializable {

    static List<Tuple2<PairOfStrings, DoubleWritable>> toWritableList(Map<Tuple2<String, String>, Double> PT) {
        List<Tuple2<PairOfStrings, DoubleWritable>> list = new ArrayList<Tuple2<PairOfStrings, DoubleWritable>>();
        for (Map.Entry<Tuple2<String, String>, Double> entry : PT.entrySet()) {
            list.add(new Tuple2<PairOfStrings, DoubleWritable>(
                new PairOfStrings(entry.getKey()._1, entry.getKey()._2),
                new DoubleWritable(entry.getValue())));
        }
        return list;
    }

    public static void main(String[] args) throws Exception {
        // handle the input parameters
        final String trainingDataFilename = args[0];
        // create the Spark context object
        JavaSparkContext ctx = SparkUtil.createJavaSparkContext("naive-bayes");
        // read the training data
        JavaRDD<String> training = ctx.textFile(trainingDataFilename, 1);
        long trainingDataSize = training.count();
        // map over every element of the training data
        JavaPairRDD<Tuple2<String, String>, Integer> pairs = training.flatMapToPair(
                new PairFlatMapFunction<String, Tuple2<String, String>, Integer>() {
            public Iterable<Tuple2<Tuple2<String, String>, Integer>> call(String s) {
                List<Tuple2<Tuple2<String, String>, Integer>> result =
                    new ArrayList<Tuple2<Tuple2<String, String>, Integer>>();
                String[] tokens = s.split(",");
                int classificationIndex = tokens.length - 1;    // the last token is the class
                String theClassification = tokens[classificationIndex];
                // one ((attribute value, class), 1) pair per attribute
                for (int i = 0; i < classificationIndex; i++) {
                    Tuple2<String, String> k = new Tuple2<String, String>(tokens[i], theClassification);
                    result.add(new Tuple2<Tuple2<String, String>, Integer>(k, 1));
                }
                // also count the class itself, for the prior P(C)
                Tuple2<String, String> k = new Tuple2<String, String>("CLASS", theClassification);
                result.add(new Tuple2<Tuple2<String, String>, Integer>(k, 1));
                return result;
            }
        });
        // reduce over every element of the training data
        JavaPairRDD<Tuple2<String, String>, Integer> counts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        // collect the reduced data as a map
        Map<Tuple2<String, String>, Integer> countsAsMap = counts.collectAsMap();
        // build the classifier data structures: the probability table (PT) and the list of classes
        Map<Tuple2<String, String>, Double> PT = new HashMap<Tuple2<String, String>, Double>();
        List<String> CLASSIFICATIONS = new ArrayList<String>();
        for (Map.Entry<Tuple2<String, String>, Integer> entry : countsAsMap.entrySet()) {
            Tuple2<String, String> k = entry.getKey();
            String classification = k._2;
            if (k._1.equals("CLASS")) {
                // P(C) = count(C) / size of the training data
                PT.put(k, ((double) entry.getValue()) / ((double) trainingDataSize));
                CLASSIFICATIONS.add(k._2);
            }
            else {
                // P(Xi|C) = count(Xi, C) / count(C)
                Tuple2<String, String> k2 = new Tuple2<String, String>("CLASS", classification);
                Integer count = countsAsMap.get(k2);
                if (count == null) {
                    PT.put(k, 0.0);
                }
                else {
                    PT.put(k, ((double) entry.getValue()) / ((double) count.intValue()));
                }
            }
        }
        // save the classifier
        List<Tuple2<PairOfStrings, DoubleWritable>> list = toWritableList(PT);
        JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = ctx.parallelizePairs(list);
        ptRDD.saveAsHadoopFile("", PairOfStrings.class, DoubleWritable.class, SequenceFileOutputFormat.class);
        JavaRDD<String> classificationsRDD = ctx.parallelize(CLASSIFICATIONS);
        classificationsRDD.saveAsTextFile("");
    }
}
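SparkUtil.createJavaSparkContext() is a helper that is not shown here; a minimal sketch of what it could look like (only the name and signature come from the usage above, the body is an assumption):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtil {
    // builds a JavaSparkContext with the given application name;
    // the master URL is expected to come from the spark-submit configuration
    public static JavaSparkContext createJavaSparkContext(String applicationName) {
        SparkConf conf = new SparkConf().setAppName(applicationName);
        return new JavaSparkContext(conf);
    }
}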
Phase 2: use the classifier to classify new data
import java.util.List;
import java.util.Map;

import scala.Tuple2;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class NaiveBayesClassifier implements java.io.Serializable {
    public static void main(String[] args) throws Exception {
        // handle the input parameters
        final String path = args[0];    // where phase 1 saved the probability table
        // create the Spark context object
        JavaSparkContext ctx = SparkUtil.createJavaSparkContext("naive-bayes");
        // read the new data to be classified
        JavaRDD<String> newData = ctx.textFile("", 1);
        // read the classifier back from Hadoop
        JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = ctx.hadoopFile(
            path, SequenceFileInputFormat.class, PairOfStrings.class, DoubleWritable.class);
        JavaPairRDD<Tuple2<String, String>, Double> classifierRDD = ptRDD.mapToPair(
                new PairFunction<Tuple2<PairOfStrings, DoubleWritable>, Tuple2<String, String>, Double>() {
            public Tuple2<Tuple2<String, String>, Double> call(Tuple2<PairOfStrings, DoubleWritable> rec) {
                PairOfStrings pair = rec._1;
                Tuple2<String, String> k2 = new Tuple2<String, String>(pair.getLeftElement(), pair.getRightElement());
                Double v2 = new Double(rec._2.get());
                return new Tuple2<Tuple2<String, String>, Double>(k2, v2);
            }
        });
        // cache the classifier components so that every node in the cluster can use them
        Map<Tuple2<String, String>, Double> classifier = classifierRDD.collectAsMap();
        final Broadcast<Map<Tuple2<String, String>, Double>> broadcastClassifier = ctx.broadcast(classifier);
        JavaRDD<String> classesRDD = ctx.textFile("", 1);
        List<String> CLASSES = classesRDD.collect();
        final Broadcast<List<String>> broadcastClasses = ctx.broadcast(CLASSES);
        // classify the new data
        JavaPairRDD<String, String> classified = newData.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String rec) {
                Map<Tuple2<String, String>, Double> CLASSIFIER = broadcastClassifier.value();
                List<String> CLASSES = broadcastClasses.value();
                String[] attributes = rec.split(",");
                String selectedClass = null;
                double maxPosterior = 0.0;
                for (String aClass : CLASSES) {
                    // start from the prior P(C)
                    double posterior = CLASSIFIER.get(new Tuple2<String, String>("CLASS", aClass));
                    for (int i = 0; i < attributes.length; i++) {
                        Double probability = CLASSIFIER.get(new Tuple2<String, String>(attributes[i], aClass));
                        if (probability == null) {
                            // unseen (attribute, class) pair: the whole product collapses to zero
                            posterior = 0.0;
                            break;
                        }
                        else {
                            posterior *= probability.doubleValue();
                        }
                    }
                    // keep the class with the highest posterior
                    if ((selectedClass == null) || (posterior > maxPosterior)) {
                        selectedClass = aClass;
                        maxPosterior = posterior;
                    }
                }
                return new Tuple2<String, String>(rec, selectedClass);
            }
        });
    }
}
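One practical weakness of the classifier above: an (attribute, class) pair never seen in training makes the whole posterior collapse to zero (the probability == null branch). A common remedy, not in the original code, is additive (Laplace) smoothing, which is also what the smoothing parameter of MLlib's NaiveBayes.train() below controls:
P(Xi|C) = (count(Xi, C) + λ) / (count(C) + λk)
where k is the number of distinct values of attribute Xi and λ > 0 is the smoothing parameter.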
Spark MLlib integrates the common machine learning algorithms, which can be invoked directly:
JavaRDD<LabeledPoint> training = ...;   // the training set
JavaRDD<LabeledPoint> test = ...;       // the test set
// given an RDD of (label, feature) pairs, train a Naive Bayes model
final NaiveBayesModel model = NaiveBayes.train(training.rdd(), theSmoothingParameter);
JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(
        new PairFunction<LabeledPoint, Double, Double>() {
    @Override
    public Tuple2<Double, Double> call(LabeledPoint p) {
        return new Tuple2<Double, Double>(model.predict(p.features()), p.label());
    }
});
double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
    @Override
    public Boolean call(Tuple2<Double, Double> p1) {
        return p1._1().equals(p1._2());
    }
}).count() / test.count();
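How the training and test RDDs are built is left open above. A minimal sketch (assuming a JavaSparkContext named ctx and a hypothetical comma-separated format with the numeric label first):
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// hypothetical input format: "label,feature1,feature2,...", all fields numeric
JavaRDD<LabeledPoint> training = ctx.textFile("", 1).map(new Function<String, LabeledPoint>() {
    public LabeledPoint call(String line) {
        String[] tokens = line.split(",");
        double[] features = new double[tokens.length - 1];
        for (int i = 1; i < tokens.length; i++) {
            features[i - 1] = Double.parseDouble(tokens[i]);
        }
        return new LabeledPoint(Double.parseDouble(tokens[0]), Vectors.dense(features));
    }
});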