当前位置:   article > 正文

Spark之SparkStreaming案例-transform_spark streaming transform python

spark streaming transform python

Transform 操作

transform操作允许将任意RDD到RDD函数应用于DStream。 它可用于应用任何未在DStream API中公开的RDD操作。 例如,将数据流中的每个批处理与其他数据集相结合的功能不会直接暴露在DStream API中。 但是,您可以轻松地使用transform来执行此操作。 这使得非常强大的可能性。 例如,可以通过将输入数据流与预先计算的垃圾信息(也可以用Spark一起生成)进行实时数据清理,然后根据它进行过滤。

一、案例:过滤刷广告的用户,

1.1、模拟一个黑名单

1.1.1、模拟用户在网站上点击广告, 但是存在刷广告的现象, 所以对这类用户的点击流量进行滤除,所以将此类用户加入黑名单,

//黑名单列表  (user, boolean), true表示该用户在黑名单中, 在后续的计算中,不记录该用户的点击效果。
final List<Tuple2<String, Boolean>> blockList = new ArrayList<Tuple2<String, Boolean>>();
//ture表示在黑名单上
blockList.add(new Tuple2<String, Boolean>("lisi", true));
  • 1
  • 2
  • 3
  • 4
  • 5

1.1.1、将黑名单列表转为一个RDD,

//黑名单RDD   (user, boolean)
JavaPairRDD<String, Boolean> blackRDD =  jssc.sparkContext().parallelizePairs(blockList);
  • 1
  • 2
  • 3

1.2、//从指定端口获取模拟点击日志:”date user”

//从指定端口获取模拟点击日志:"date  user"
JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("192.168.1.224", 9999);
  • 1
  • 2
  • 3

1.3、将数据流中的数据进行格式转换

日志格式为date user,为了在后续工作中, 和黑名单RDD进行join操作方便, 将日志格式改为(user, log);

log:   date user
改为
(user, log//为了后面对数据流中的RDD和黑名单中RDD进行join操作, 将RDD中的数据进行格式化(user, log)
        JavaPairDStream<String, String> userAdsClickLogDStream = adsClickLogDStream.mapToPair(
                new PairFunction<String, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(String log) throws Exception {
                //对日志格式进行转换,"date user" 变为(user, log)
                return new Tuple2<String, String>(log.split(" ")[1], log);
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

1.4、过滤黑名单中的用户日志, 此处使用transform操作

    //实时进行黑名单过滤, 执行transform操作, 将每个batch的RDD,与黑名单中的RDD进行join操作
        JavaDStream<String> validAdsClickLogDStream = userAdsClickLogDStream.transform(
                new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD)
                            throws Exception {
                        //将黑名单RDD和每个batch的RDD进行join操作
                        // 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
                        // string是用户,string是日志,是否在黑名单里是Optional
                        //(user, (log, boolean))
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD = 
                                userAdsClickLogRDD.leftOuterJoin(blackRDD);
                        //过滤    

                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = 
                                joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                /*  
                                 *  public interface Function<T1, R> extends Serializable {
                                          R call(T1 v1) throws Exception;
                                        }

                                 */
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                            throws Exception {//(user, (log, boolean))
                                        //这里tuple就是每个用户对应的访问日志和在黑名单中状态
                                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                            return false;
                                        }else {
                                            return true;
                                        }
                                    }
                                });

                        // 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
                        JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                    throws Exception {
                                return tuple._2._1;
                            }

                        });
                        //放回过滤的结果
                        return validAdsCiickLogRDD;
                    }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

1.4.1、在transfrom操作中, 对每个batch中的RDD进行join操作

//将黑名单RDD和每个batch的RDD进行join操作
// 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
// string是用户,string是日志,是否在黑名单里是Optional
//得到的结果:(user, (log, boolean))
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD = 
            userAdsClickLogRDD.leftOuterJoin(blackRDD);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.4.2、黑名单和batch中的RDDjoin之后,对结果进行过滤

                        //过滤    
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = 
                                joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                /*  
                                 *  public interface Function<T1, R> extends Serializable {
                                          R call(T1 v1) throws Exception;
                                        }

                                 */
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                            throws Exception {//(user, (log, boolean))
                                        //这里tuple就是每个用户对应的访问日志和在黑名单中状态
                                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                            return false;
                                        }else {
                                            return true;
                                        }
                                    }
                                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

1.4.3、就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志


                        // 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
                        JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                    throws Exception {
                                return tuple._2._1;
                            }

                        });
                        //放回过滤的结果
                        return validAdsCiickLogRDD;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

1.5、启动

        // 这后面就可以写入Kafka中间件消息队列,作为广告计费服务的有效广告点击数据
        validAdsClickLogDStream.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

scala

package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext

object BlackListFilter {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setAppName("BlackListFilter")
            .setMaster("local[*]")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      //黑名单
      val blackList = Array(("jack", true), ("rose", true))
      //设置并行度
      val blackListRDD = ssc.sparkContext.parallelize(blackList, 3)

      //使用socketTextStream 监听端口
      var st = ssc.socketTextStream("192.168.179.5", 8888)

      //user, boolean==> 
      val users = st.map { 
         line =>  (line.split(" ")(1), line)
      }

      val validRddDS = users.transform(ld => {
          //通过leftOuterJoin 将(k, v) join (k,w) ==> (k, (v, some(W)))
          val ljoinRdd = ld.leftOuterJoin(blackListRDD)

          //过滤掉黑名单
          val fRdd = ljoinRdd.filter(tuple => {
            println(tuple)
            if(tuple._2._2.getOrElse(false)) {  
                false
            } else {
                true
            }
          })

          //获取白名单
          val validRdd = fRdd.map(tuple => tuple._2._1) 
          validRdd
      })

      validRddDS.print()
      ssc.start()
      ssc.awaitTermination()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/975605
推荐阅读
相关标签
  

闽ICP备14008679号