
spark streaming demo (java 1.8)

A simple Spark Streaming demo, written with Java 1.8, that reads data from Kafka and writes it into Redis.

 

Sample message sent to Kafka:

{'type': 'Data', 'values': [{'compID': '3333', 'name': '冷却油压力', 'serial_num': '1', 'time': '2019-06-18 16:49:51', 'gateMac': 'D4:36:39:1A:0D:D3', 'function_code': '3', 'data': '7'}, {'compID': '3333', 'name': '工作油压力', 'serial_num': '1', 'time': '2019-06-18 16:49:51', 'gateMac': 'D4:36:39:1A:0D:D3', 'function_code': '3', 'data': '76'}]}

Note that this payload is the string form of a Python dict (single-quoted), not strict JSON. The producer's value_serializer wraps it in double quotes, which the consumer strips off before handing it to fastjson (which tolerates single-quoted fields).

 

StreamingKafka.java
package com.imooc.spark;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import redis.clients.jedis.Jedis;

import java.util.*;

public class StreamingKafka {

    public static void main(String[] args) throws Exception {
        // Needed on Windows so Hadoop can locate winutils.exe
        System.setProperty("hadoop.home.dir", "G:\\winutils\\");

        String topics = "yu1";
        String brokers = "192.168.3.101:9093";

        SparkConf conf = new SparkConf().setMaster("local[2]")
                .setAppName("StreamingKafka")
                .set("spark.rdd.compress", "true")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Collection<String> topicSet = new HashSet<>(Arrays.asList(topics.split(",")));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group11111111");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("auto.offset.reset", "latest");

        // To resume from explicit offsets, build a Map<TopicPartition, Long> and pass it to Subscribe:
        // Map<TopicPartition, Long> offsets = new HashMap<>();
        // offsets.put(new TopicPartition("yu_test_Data", 0), 2L);

        final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicSet, kafkaParams));

        // Data cleaning: one Kafka message carries several sensor readings,
        // so flatMap expands each message into multiple deviceData records.
        JavaDStream<deviceData> dstream = lines
                .flatMap(new FlatMapFunction<ConsumerRecord<String, String>, deviceData>() {
                    @Override
                    public Iterator<deviceData> call(ConsumerRecord<String, String> record) throws Exception {
                        // The producer sends str(payload) through a json.dumps serializer, so the
                        // value arrives wrapped in double quotes; strip them before parsing.
                        String message = record.value().replaceAll("\"", "");
                        return DataHandle(message).iterator();
                    }
                });

        // Data storage: write each reading into a Redis hash keyed by compID/gateMac.
        dstream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                Jedis jedis = new Jedis("192.168.3.176");
                try {
                    jedis.hset(record.compID + "/" + record.gateMac, record.operationValue, record.operationData);
                } finally {
                    jedis.disconnect(); // release the connection (newer Jedis versions also offer close())
                }
            });
        });
        dstream.print();

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Parse a "Data"-type message into a list of deviceData records.
     */
    private static List<deviceData> DataHandle(String message) {
        List<deviceData> result = new ArrayList<>();
        try {
            JSONObject dataObject = JSON.parseObject(message);
            String dataType = (String) dataObject.get("type");
            if (dataType.equals("Data") || dataType.equals("data")) {
                JSONArray dataList = JSON.parseArray(dataObject.get("values").toString());
                for (int i = 0; i < dataList.size(); i++) {
                    deviceData d1 = new deviceData();
                    JSONObject dataDict = dataList.getJSONObject(i);
                    d1.compID = dataDict.get("compID").toString();
                    d1.gateMac = dataDict.get("gateMac").toString();
                    d1.operationValue = dataDict.get("name").toString();
                    d1.operationData = dataDict.get("data").toString();
                    d1.gatherTime = dataDict.get("time").toString();
                    result.add(d1);
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
        return result;
    }
}
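One thing worth noting: the storage step above opens a new Jedis connection for every single record. A common optimization is to open one connection per partition with foreachPartition instead. A minimal sketch (not part of the original demo; same Redis host and key layout assumed):

    // Sketch: one Redis connection per partition instead of per record.
    dstream.foreachRDD(rdd -> {
        rdd.foreachPartition(records -> {
            Jedis jedis = new Jedis("192.168.3.176");
            try {
                while (records.hasNext()) {
                    deviceData r = records.next();
                    jedis.hset(r.compID + "/" + r.gateMac, r.operationValue, r.operationData);
                }
            } finally {
                jedis.disconnect();
            }
        });
    });

To check what landed in Redis, the hash can be read back with jedis.hgetAll("3333/D4:36:39:1A:0D:D3") or, from the shell, redis-cli HGETALL 3333/D4:36:39:1A:0D:D3.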
deviceData.java 
package com.imooc.spark;

/**
 * Data structure for a single device reading.
 */
class deviceData {
    String compID;          // device/company ID
    String gateMac;         // gateway MAC address
    String operationValue;  // sensor name, e.g. "冷却油压力"
    String operationData;   // sensor reading
    String gatherTime;      // collection time carried in the message
    long timestamp;

    @Override
    public String toString() {
        return "compID: " + compID + " operationValue: " + operationValue + " operationData: " + operationData;
    }
}
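Since the job sets spark.serializer to KryoSerializer, deviceData can also be registered with Kryo so that Kryo does not have to write the full class name with every record. A minimal sketch of that SparkConf variant (optional; the original demo skips this step):

    import org.apache.spark.SparkConf;

    // Same conf as in StreamingKafka, plus explicit Kryo registration of the record class.
    SparkConf conf = new SparkConf().setMaster("local[2]")
            .setAppName("StreamingKafka")
            .set("spark.rdd.compress", "true")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class<?>[]{deviceData.class});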

Running the code prints each batch to the console (the original post showed a screenshot here).
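Based on dstream.print() and deviceData.toString(), a 5-second batch should look roughly like this (timestamp and readings illustrative):

    -------------------------------------------
    Time: 1560847791000 ms
    -------------------------------------------
    compID: 3333 operationValue: 冷却油压力 operationData: 7
    compID: 3333 operationValue: 工作油压力 operationData: 76
    ...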

Simulating the packets with a Python producer script:

from kafka import KafkaProducer
import json
import time
import random
import threading

'''
Producer demo.
Writes JSON messages to the yu1 topic in a loop.
Note: to send JSON you must set the value_serializer parameter, as in the code below.
'''

# Note: sj is computed once at startup, so every message carries the same "time" value.
sj = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))


def send_msg(Num):
    producer = KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        bootstrap_servers=['192.168.3.101:9093']
    )
    for i in range(Num):
        time.sleep(4)
        payload = dict()
        payload["type"] = "Data"
        payload["values"] = []
        a = random.randint(5, 10)
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "冷却油压力", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(40, 100)
        # a = 0
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "工作油压力", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(0, 20)
        # a = 0
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "冷却水温度", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(50, 60)
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "液位", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(1, 20)
        # a = 0
        payload["values"].append(
            {"data": "%s" % a, "time": "%s" % sj, "name": "转速", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        print("==", str(payload))
        # str(payload) sends the dict's repr (single quotes); the serializer then wraps it in
        # double quotes, which is why the Spark job strips '"' before parsing.
        producer.send('yu1', str(payload))
        print("============%s" % i)
        end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        print("===start time: %s" % sj)
        print("=====end time: %s" % end)
    producer.close()


# def thread_start():
#     num = 4
#     Num = 1000000
#     Threads = []
#
#     for i in range(num):
#         Threads.append(threading.Thread(target=send_msg, args=(Num,)))
#     for t in Threads:
#         # t.setDaemon(True)
#         t.start()


if __name__ == "__main__":
    # thread_start()
    send_msg(10000000)

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <scala.version>2.11.12</scala.version>
    <!--<kafka.version>0.9.0.0</kafka.version>-->
    <spark.version>2.1.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
  </properties>

  <!-- Add the Cloudera repository (required for the CDH builds of Hadoop/HBase) -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Kafka dependency -->
    <!--
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    -->
    <!-- Hadoop dependency -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase dependencies -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.2.4</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- Spark Streaming + Flume integration dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>
    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>
    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>

  <build>
    <!--
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    -->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

 
