赞
踩
transform操作允许将任意RDD到RDD函数应用于DStream。 它可用于应用任何未在DStream API中公开的RDD操作。 例如,将数据流中的每个批处理与其他数据集相结合的功能不会直接暴露在DStream API中。 但是,您可以轻松地使用transform来执行此操作。 这使得非常强大的可能性。 例如,可以通过将输入数据流与预先计算的垃圾信息(也可以用Spark一起生成)进行实时数据清理,然后根据它进行过滤。
//黑名单列表 (user, boolean), true表示该用户在黑名单中, 在后续的计算中,不记录该用户的点击效果。
final List<Tuple2<String, Boolean>> blockList = new ArrayList<Tuple2<String, Boolean>>();
//ture表示在黑名单上
blockList.add(new Tuple2<String, Boolean>("lisi", true));
//黑名单RDD (user, boolean)
JavaPairRDD<String, Boolean> blackRDD = jssc.sparkContext().parallelizePairs(blockList);
//从指定端口获取模拟点击日志:"date user"
JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("192.168.1.224", 9999);
log: date user
改为
(user, log)
//为了后面对数据流中的RDD和黑名单中RDD进行join操作, 将RDD中的数据进行格式化(user, log)
JavaPairDStream<String, String> userAdsClickLogDStream = adsClickLogDStream.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(String log) throws Exception {
//对日志格式进行转换,"date user" 变为(user, log)
return new Tuple2<String, String>(log.split(" ")[1], log);
}
});
//实时进行黑名单过滤, 执行transform操作, 将每个batch的RDD,与黑名单中的RDD进行join操作
JavaDStream<String> validAdsClickLogDStream = userAdsClickLogDStream.transform(
new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD)
throws Exception {
//将黑名单RDD和每个batch的RDD进行join操作
// 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
// string是用户,string是日志,是否在黑名单里是Optional
//(user, (log, boolean))
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD =
userAdsClickLogRDD.leftOuterJoin(blackRDD);
//过滤
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
/*
* public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
*/
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
throws Exception {//(user, (log, boolean))
//这里tuple就是每个用户对应的访问日志和在黑名单中状态
if (tuple._2._2.isPresent() && tuple._2._2.get()) {
return false;
}else {
return true;
}
}
});
// 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
throws Exception {
return tuple._2._1;
}
});
//放回过滤的结果
return validAdsCiickLogRDD;
}
});
//将黑名单RDD和每个batch的RDD进行join操作
// 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
// string是用户,string是日志,是否在黑名单里是Optional
//得到的结果:(user, (log, boolean))
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD =
userAdsClickLogRDD.leftOuterJoin(blackRDD);
//过滤
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
/*
* public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
*/
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
throws Exception {//(user, (log, boolean))
//这里tuple就是每个用户对应的访问日志和在黑名单中状态
if (tuple._2._2.isPresent() && tuple._2._2.get()) {
return false;
}else {
return true;
}
}
});
// 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
throws Exception {
return tuple._2._1;
}
});
//放回过滤的结果
return validAdsCiickLogRDD;
// 这后面就可以写入Kafka中间件消息队列,作为广告计费服务的有效广告点击数据
validAdsClickLogDStream.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
package com.chb.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext
object BlackListFilter {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("BlackListFilter")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//黑名单
val blackList = Array(("jack", true), ("rose", true))
//设置并行度
val blackListRDD = ssc.sparkContext.parallelize(blackList, 3)
//使用socketTextStream 监听端口
var st = ssc.socketTextStream("192.168.179.5", 8888)
//user, boolean==>
val users = st.map {
line => (line.split(" ")(1), line)
}
val validRddDS = users.transform(ld => {
//通过leftOuterJoin 将(k, v) join (k,w) ==> (k, (v, some(W)))
val ljoinRdd = ld.leftOuterJoin(blackListRDD)
//过滤掉黑名单
val fRdd = ljoinRdd.filter(tuple => {
println(tuple)
if(tuple._2._2.getOrElse(false)) {
false
} else {
true
}
})
//获取白名单
val validRdd = fRdd.map(tuple => tuple._2._1)
validRdd
})
validRddDS.print()
ssc.start()
ssc.awaitTermination()
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。