Preparation: you need a local Scala environment. For how to set it up, see my other blog post:
https://blog.csdn.net/fct2001140269/article/details/93978940
Here is my pom.xml as well, for reference:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>swtx</groupId>
  <artifactId>MySparkStreamTest1</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.2</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <!-- <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- StringUtils -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!--(start) for package jar with dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>swtx.SparkDemo</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!--(end) for package jar with dependencies -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
2. Receiving Kafka data with Spark Streaming
A concrete Spark Streaming demo that receives data from Kafka:
package swtx;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by user on 2019/6/27.
 */
public class SparkDemo {
    public static void main(String[] args) {
        // Configure SparkConf; local[2] runs in local mode with two cores.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        // Create the StreamingContext with a 5-second batch interval (fine for testing).
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        // Put the Kafka connection settings in a map. KafkaUtils, from the
        // spark-streaming Kafka integration, connects Kafka to Spark and returns
        // an InputDStream whose elements are ConsumerRecord<String, String>.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx.xx.xx.xxx:9092"); // separate multiple brokers with ","
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "fang_id");
        Collection<String> topics = Arrays.asList("test_tank007"); // topics to subscribe to; may list several

        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        // Pass the function as an anonymous inner class: instantiate the interface
        // and override its method. The call returns a new DStream.
        // mapToPair: Return a new DStream by applying a function to all elements of this DStream.
        JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                        return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
                    }
                });

        /* Processing the DStream takes two steps.
         * Step 1: foreachRDD visits each underlying RDD of the DStream (one block of data per batch).
         * Step 2: foreach visits every tuple in that RDD, takes its key-value pair, and prints the value.
         */
        javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> javaPairRDD) throws Exception {
                javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
                    @Override
                    public void call(Tuple2<String, String> tuple2) throws Exception {
                        System.out.println(tuple2._2);
                    }
                });
            }
        });

        // The operations above are lazy; nothing runs until streamingContext.start()
        // is called, which starts consuming the stream.
        streamingContext.start();
        try {
            // A streaming job runs indefinitely: wait for the execution to stop.
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
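The kafkaParams map in the demo sets only the required entries. As a minimal sketch, the helper below builds the same map from a broker list and adds two standard Kafka consumer settings, "auto.offset.reset" and "enable.auto.commit". Those two values, the class name KafkaParamsSketch, the method buildKafkaParams, and the host names are assumptions made for illustration and are not part of the original demo. The deserializer is given as a class-name string so that the sketch compiles without the Kafka client on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaParamsSketch {

    // Fully qualified name of Kafka's StringDeserializer, written as a string
    // so this sketch has no compile-time dependency on the Kafka client.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Builds consumer settings for the given brokers and consumer group (hypothetical helper). */
    static Map<String, Object> buildKafkaParams(List<String> brokers, String groupId) {
        Map<String, Object> params = new HashMap<>();
        // Multiple brokers are joined with "," as noted in the demo.
        params.put("bootstrap.servers", String.join(",", brokers));
        params.put("key.deserializer", STRING_DESERIALIZER);
        params.put("value.deserializer", STRING_DESERIALIZER);
        params.put("group.id", groupId);
        // Assumption: start from the newest messages when the group has no committed offset.
        params.put("auto.offset.reset", "latest");
        // Assumption: turn off the consumer's periodic offset auto-commit.
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        // host1 and host2 are placeholder broker addresses.
        Map<String, Object> params = buildKafkaParams(
                Arrays.asList("host1:9092", "host2:9092"), "fang_id");
        System.out.println(params.get("bootstrap.servers"));
    }
}
```

The returned map could be passed to ConsumerStrategies.Subscribe(topics, kafkaParams) in place of the hand-built one; whether "latest" or "earliest" is the right reset policy depends on whether old messages should be replayed.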
The output is:
19/06/28 12:01:10 INFO MemoryStore: Block broadcast_16_piece0 stored as bytes in memory (estimated size 2.1 KB, free 1997.1 MB)
19/06/28 12:01:10 INFO BlockManagerInfo: Added broadcast_16_piece0 in memory on 172.19.50.22:54667 (size: 2.1 KB, free: 1997.1 MB)
19/06/28 12:01:10 INFO SparkContext: Created broadcast 16 from broadcast at DAGScheduler.scala:1006
19/06/28 12:01:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[33] at mapToPair at SparkDemo.java:54) (first 15 tasks are for partitions Vector(0))
19/06/28 12:01:10 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
19/06/28 12:01:10 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 16, localhost, executor driver, partition 0, PROCESS_LOCAL, 4710 bytes)
19/06/28 12:01:10 INFO Executor: Running task 0.0 in stage 16.0 (TID 16)
19/06/28 12:01:10 INFO KafkaRDD: Computing topic test_tank007, partition 0 offsets 2312 -> 2361
19/06/28 12:01:10 INFO Executor: Finished task 0.0 in stage 16.0 (TID 16). 579 bytes result sent to driver
19/06/28 12:01:10 INFO TaskSetManager: Finished task 0.0 in stage 16.0 (TID 16) in 4 ms on localhost (executor driver) (1/1)
19/06/28 12:01:10 INFO TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool
value:1468
value:1469
value:1470
value:1471
value:1472
value:1473
value:1474
value:1475
value:1476
value:1477
value:1478
value:1479
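The KafkaRDD line in the log reports the offset range read in one batch. As a rough sketch, the helper below extracts that range and computes how many records the batch covers, assuming the range is half-open (start inclusive, end exclusive), which is how Spark's OffsetRange defines fromOffset and untilOffset. The class name OffsetRangeSketch and the method recordCount are hypothetical names used only for this illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetRangeSketch {

    // Matches the "offsets <from> -> <until>" part of a KafkaRDD log line.
    private static final Pattern RANGE = Pattern.compile("offsets (\\d+) -> (\\d+)");

    /** Returns until - from for the range in the line, or -1 if the line has no range. */
    static long recordCount(String logLine) {
        Matcher m = RANGE.matcher(logLine);
        if (!m.find()) {
            return -1;
        }
        long from = Long.parseLong(m.group(1));
        long until = Long.parseLong(m.group(2));
        return until - from;
    }

    public static void main(String[] args) {
        String line = "19/06/28 12:01:10 INFO KafkaRDD: Computing topic test_tank007, "
                + "partition 0 offsets 2312 -> 2361";
        System.out.println(recordCount(line)); // prints 49
    }
}
```

For the log line shown above this gives 2361 - 2312 = 49 records in the batch; the printed values in the listing are only part of that batch.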