
A simple demo of Spark Streaming receiving data from Kafka

Prerequisite: set up a local Scala environment first; the setup steps are described in my other blog post:
https://blog.csdn.net/fct2001140269/article/details/93978940

1. pom.xml configuration

My full pom.xml is pasted below for reference:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>swtx</groupId>
  <artifactId>MySparkStreamTest1</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.2</hadoop.version>
  </properties>


  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.0.1</version>
    </dependency>
<!--    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>



    <!-- StringUtils -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

      <!--(start) for package jar with dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>swtx.SparkDemo</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!--(end) for package jar with dependencies -->


      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>

    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>

</project>

2. Receiving Kafka data with Spark Streaming

The concrete Spark Streaming demo below consumes data from Kafka.

package swtx;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by user on 2019/6/27.
 */
public class SparkDemo {


    public static void main(String[] args) {

        //Set up SparkConf with the connection/resource info; local[2] means local mode with two cores
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        //Set the batch interval (5 seconds is enough for testing) and create the StreamingContext that drives the job
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        //Put the Kafka connection parameters into a map, then use the KafkaUtils helper from the
        //spark-streaming-kafka integration to bridge Kafka and Spark.
        //The result is a JavaInputDStream of ConsumerRecord<String, String>.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx.xx.xx.xxx:9092");//multiple brokers can be separated with ","
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "fang_id");
        Collection<String> topics = Arrays.asList("test_tank007");//topics to subscribe to; more than one is allowed
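
        //Commonly added consumer settings (my own assumption, not part of the original demo):
        //where to start when the group has no committed offset, and whether the consumer
        //auto-commits offsets back to Kafka.
        //kafkaParams.put("auto.offset.reset", "latest");
        //kafkaParams.put("enable.auto.commit", true);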
        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                streamingContext, LocationStrategies.PreferConsistent()
                , ConsumerStrategies.Subscribe(topics, kafkaParams));


        //Pass an anonymous inner class (implementing PairFunction) into mapToPair to turn each ConsumerRecord into a (key, value) Tuple2, returning a new DStream
        //mapToPair: Return a new DStream by applying a function to all elements of this DStream.
        JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
            }
        });


        /*Operate on the DStream: iterate over all of its elements and apply the given function to each one.
        * The traversal has two levels. First, foreachRDD walks the underlying RDDs of the DStream, one per batch;
        * second, foreach walks every tuple inside each RDD, picks up its k-v pair and prints the value directly.
         */
        javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> javaPairRDD) throws Exception {
                // TODO Auto-generated method stub
                javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
                    @Override
                    public void call(Tuple2<String, String> tuple2)
                            throws Exception {
                        // TODO Auto-generated method stub
                        System.out.println(tuple2._2);
                    }
                });
            }
        });

        //DStream/RDD operations are lazy; streamingContext.start() kicks things off and begins the streaming consumption
        streamingContext.start();
        try {
            streamingContext.awaitTermination();//a streaming job is an endless loop: wait for the execution to stop
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }


}
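
The demo above leaves offset management to the Kafka consumer defaults. If you want to commit offsets back to Kafka only after a batch has been processed, the 0-10 integration exposes the offset ranges of each RDD. A minimal sketch, reusing the javaInputDStream from the demo (this part is my own addition, not from the original post):

        //Hedged sketch: manual offset commits with spark-streaming-kafka-0-10.
        //OffsetRange, HasOffsetRanges and CanCommitOffsets live in org.apache.spark.streaming.kafka010.
        javaInputDStream.foreachRDD(rdd -> {
            //Every KafkaRDD produced by the direct stream carries the offset ranges it read in this batch.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // ... process the batch here ...

            //Commit asynchronously once the batch's work is done.
            ((CanCommitOffsets) javaInputDStream.inputDStream()).commitAsync(offsetRanges);
        });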

The resulting output looks like this:

19/06/28 12:01:10 INFO MemoryStore: Block broadcast_16_piece0 stored as bytes in memory (estimated size 2.1 KB, free 1997.1 MB)
19/06/28 12:01:10 INFO BlockManagerInfo: Added broadcast_16_piece0 in memory on 172.19.50.22:54667 (size: 2.1 KB, free: 1997.1 MB)
19/06/28 12:01:10 INFO SparkContext: Created broadcast 16 from broadcast at DAGScheduler.scala:1006
19/06/28 12:01:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[33] at mapToPair at SparkDemo.java:54) (first 15 tasks are for partitions Vector(0))
19/06/28 12:01:10 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
19/06/28 12:01:10 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 16, localhost, executor driver, partition 0, PROCESS_LOCAL, 4710 bytes)
19/06/28 12:01:10 INFO Executor: Running task 0.0 in stage 16.0 (TID 16)
19/06/28 12:01:10 INFO KafkaRDD: Computing topic test_tank007, partition 0 offsets 2312 -> 2361
19/06/28 12:01:10 INFO Executor: Finished task 0.0 in stage 16.0 (TID 16). 579 bytes result sent to driver
19/06/28 12:01:10 INFO TaskSetManager: Finished task 0.0 in stage 16.0 (TID 16) in 4 ms on localhost (executor driver) (1/1)
19/06/28 12:01:10 INFO TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool 
value:1468
value:1469
value:1470
value:1471
value:1472
value:1473
value:1474
value:1475
value:1476
value:1477
value:1478
value:1479
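
The printed values are simply the message payloads read from the topic. To generate similar test data you can push numbered messages with a small standalone producer; the following is a minimal sketch (broker address and topic are taken from the demo, everything else is an assumption of mine):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        //Producer configuration: broker list plus String serializers for key and value
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx.xx.xx.xxx:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //Send a run of numbered messages to the demo topic, then close the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("test_tank007", "value:" + i));
            }
        }
    }
}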
