当前位置:   article > 正文

Spark Streaming入门 - 从Queue队列接收数据 Demo,测试使用_sparkstreaming--javaqueuestream

sparkstreaming--javaqueuestream
  1. package cn.taobao;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.function.Function2;
  4. import org.apache.spark.api.java.function.PairFunction;
  5. import org.apache.spark.streaming.Durations;
  6. import org.apache.spark.streaming.api.java.JavaDStream;
  7. import org.apache.spark.streaming.api.java.JavaPairDStream;
  8. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  9. import scala.Tuple2;
  10. import java.util.ArrayList;
  11. import java.util.LinkedList;
  12. public class queue_streaming_Test {
  13. public static void main(String[] args) {
  14. // StreamingContext 编程入口
  15. JavaStreamingContext ssc = new JavaStreamingContext(
  16. "local[2]",
  17. "JavaLocalNetworkWordCount",
  18. Durations.seconds(4),
  19. System.getenv("SPARK_HOME"),
  20. JavaStreamingContext.jarOfClass(queue_streaming_Test.class.getClass()));
  21. ssc.sparkContext().setLogLevel("ERROR");
  22. //创建一个RDD类型的queue
  23. LinkedList<JavaRDD<Integer>> queue = new LinkedList<>();
  24. ArrayList<Integer> arrayList = new ArrayList<>();
  25. for (int i = 1; i <= 1000; i++) {
  26. arrayList.add(i);
  27. }
  28. //ArrayList 转换成JavaRdd
  29. JavaRDD<Integer> javaRDD = ssc.sparkContext().parallelize(arrayList);
  30. //添加到 queue中
  31. queue.add(javaRDD);
  32. //创建QueueInputDStream 且接受数据和处理数据
  33. JavaDStream<Integer> integerJavaDStream = ssc.queueStream(queue);
  34. //mapToPair算子
  35. JavaPairDStream<Integer, Integer> integerIntegerJavaPairDStream = integerJavaDStream.mapToPair(new PairFunction<Integer, Integer, Integer>() {
  36. @Override
  37. public Tuple2<Integer, Integer> call(Integer i) throws Exception {
  38. return new Tuple2<Integer, Integer>(i % 10, 1);
  39. }
  40. });
  41. //reduceByKey算子
  42. JavaPairDStream<Integer, Integer> integerIntegerJavaPairDStream1 = integerIntegerJavaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
  43. @Override
  44. public Integer call(Integer v1, Integer v2) throws Exception {
  45. return v1 + v2;
  46. }
  47. });
  48. //将结果输出到控制台
  49. integerIntegerJavaPairDStream1.print();
  50. //显式的启动数据接收
  51. ssc.start();
  52. try {
  53. //来等待计算完成
  54. ssc.awaitTermination();
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  1. 运行结果
  2. -------------------------------------------
  3. Time: 1619511532000 ms
  4. -------------------------------------------
  5. (4,100)
  6. (0,100)
  7. (6,100)
  8. (8,100)
  9. (2,100)
  10. (1,100)
  11. (3,100)
  12. (7,100)
  13. (9,100)
  14. (5,100)
  15. -------------------------------------------
  16. Time: 1619511536000 ms
  17. -------------------------------------------

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/960640
推荐阅读
相关标签
  

闽ICP备14008679号