Spark Streaming demo (Java 1.8)
A simple demo that reads data from Kafka and writes it to Redis: each parsed reading is stored in a Redis hash keyed by compID/gateMac, with the sensor name as the field and its value as the hash value.
Sample payload sent to Kafka:
{'type': 'Data', 'values': [{'compID': '3333', 'name': '冷却油压力', 'serial_num': '1', 'time': '2019-06-18 16:49:51', 'gateMac': 'D4:36:39:1A:0D:D3', 'function_code': '3', 'data': '7'}, {'compID': '3333', 'name': '工作油压力', 'serial_num': '1', 'time': '2019-06-18 16:49:51', 'gateMac': 'D4:36:39:1A:0D:D3', 'function_code': '3', 'data': '76'}]}
StreamingKafka.java
package com.imooc.spark;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import redis.clients.jedis.Jedis;

import java.util.*;

public class StreamingKafka {
    public static void main(String[] args) throws Exception {

        // Required on Windows so Spark can find winutils.exe.
        System.setProperty("hadoop.home.dir", "G:\\winutils\\");

        String topics = "yu1";
        String brokers = "192.168.3.101:9093";
        SparkConf conf = new SparkConf().setMaster("local[2]")
                .setAppName("StreamingKafka")
                .set("spark.rdd.compress", "true")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Collection<String> topicSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group11111111");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("auto.offset.reset", "latest");

        // To start from fixed offsets instead of the latest, build a map like
        // this and pass it to ConsumerStrategies.Subscribe:
        // Map<TopicPartition, Long> offsets = new HashMap<>();
        // offsets.put(new TopicPartition("yu_test_Data", 0), 2L);

        final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicSet, kafkaParams));

        /**
         * Data cleaning: one Kafka message carries several readings, so each
         * message is flatMapped into a stream of DeviceData records.
         */
        JavaDStream<DeviceData> dstream = lines
                .flatMap(new FlatMapFunction<ConsumerRecord<String, String>, DeviceData>() {
                    @Override
                    public Iterator<DeviceData> call(ConsumerRecord<String, String> record) throws Exception {
                        // The Python producer runs str(payload) through json.dumps, so the
                        // value arrives wrapped in double quotes; strip them before parsing.
                        String message = record.value().replaceAll("\"", "");
                        return dataHandle(message).iterator();
                    }
                });

        /**
         * Data storage: write each reading into a Redis hash keyed by
         * compID/gateMac, with the sensor name as field and its reading as value.
         */
        dstream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                Jedis jedis = new Jedis("192.168.3.176");
                try {
                    jedis.hset(record.compID + "/" + record.gateMac, record.operationValue, record.operationData);
                } finally {
                    jedis.disconnect(); // close the socket; otherwise each record leaks a connection
                }
            });
        });

        dstream.print();

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Parse a "Data"-type message into a list of DeviceData records.
     */
    private static List<DeviceData> dataHandle(String message) {
        List<DeviceData> result = new ArrayList<>();
        try {
            JSONObject dataObject = JSON.parseObject(message);
            String dataType = (String) dataObject.get("type");
            if ("Data".equals(dataType) || "data".equals(dataType)) {

                JSONArray dataList = JSON.parseArray(dataObject.get("values").toString());
                for (int i = 0; i < dataList.size(); i++) {
                    DeviceData d = new DeviceData();
                    JSONObject dataDict = dataList.getJSONObject(i);
                    d.compID = dataDict.get("compID").toString();
                    d.gateMac = dataDict.get("gateMac").toString();
                    d.operationValue = dataDict.get("name").toString();
                    d.operationData = dataDict.get("data").toString();
                    d.gatherTime = dataDict.get("time").toString();
                    result.add(d);
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
        return result;
    }
}
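Opening and tearing down a Jedis connection for every record is fine for a demo but wasteful under load. A common refinement, sketched below as a drop-in replacement for the storage step above (not part of the original code), is to write per partition so each task reuses one connection:

// Sketch: one Redis connection per partition instead of per record.
dstream.foreachRDD(rdd -> {
    rdd.foreachPartition(records -> {
        Jedis jedis = new Jedis("192.168.3.176");
        try {
            while (records.hasNext()) {
                DeviceData record = records.next();
                jedis.hset(record.compID + "/" + record.gateMac,
                        record.operationValue, record.operationData);
            }
        } finally {
            jedis.disconnect();
        }
    });
});

A shared JedisPool per executor would go further, but the per-partition form already removes most of the connection churn.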

DeviceData.java
package com.imooc.spark;

/**
 * Data structure for one device reading.
 * Serializable so Spark can ship records between tasks.
 */
class DeviceData implements java.io.Serializable {
    String compID;         // company / device-group ID
    String gateMac;        // gateway MAC address
    String operationValue; // sensor name, e.g. "冷却油压力"
    String operationData;  // sensor reading
    String gatherTime;     // collection time from the payload
    long timestamp;

    @Override
    public String toString() {
        return "compID: " + compID + " operationValue: " + operationValue + " operationData: " + operationData;
    }
}
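After a batch has been processed, the sample payload above lands in Redis as a hash named 3333/D4:36:39:1A:0D:D3 whose fields are the sensor names. A minimal standalone checker (a hypothetical helper, not part of the original demo, assuming the same Redis host as the job):

package com.imooc.spark;

import java.util.Map;
import redis.clients.jedis.Jedis;

/** Hypothetical helper: dump the hash StreamingKafka writes for one device. */
public class RedisCheck {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.3.176");
        try {
            // Key layout written by StreamingKafka: "<compID>/<gateMac>"
            Map<String, String> fields = jedis.hgetAll("3333/D4:36:39:1A:0D:D3");
            for (Map.Entry<String, String> e : fields.entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue()); // e.g. 冷却油压力 = 7
            }
        } finally {
            jedis.disconnect();
        }
    }
}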

Running the job prints each batch of parsed records to the console via dstream.print().
Simulating the device packets with a Python script:
from kafka import KafkaProducer
import json
import time
import random
import threading

'''
Producer demo
Writes JSON packets to the yu1 topic in a loop.
Note: to send JSON data you must set the value_serializer parameter, as below.
'''

# Captured once at startup; reused both as the "time" field of every
# packet and as the start time printed at the end.
sj = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))


def send_msg(Num):
    producer = KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        bootstrap_servers=['192.168.3.101:9093']
    )
    for i in range(Num):
        time.sleep(4)
        payload = dict()
        payload["type"] = "Data"
        payload["values"] = []
        a = random.randint(5, 10)
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "冷却油压力", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(40, 100)
        # a = 0
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "工作油压力", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(0, 20)
        # a = 0
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "冷却水温度", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(50, 60)
        payload["values"].append(
            {"function_code": "3", "serial_num": "1", "data": "%s" % a,
             "time": "%s" % sj, "name": "液位", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        a = random.randint(1, 20)
        # a = 0
        payload["values"].append(
            {"data": "%s" % a, "time": "%s" % sj, "name": "转速", "compID": "3333", "gateMac": "D4:36:39:1A:0D:D3"})
        print("==", str(payload))
        # str(payload) is itself run through json.dumps by value_serializer,
        # so the message lands in Kafka as a quoted string of the Python dict
        # repr; the Java consumer strips the quotes before parsing.
        producer.send('yu1', str(payload))
        print("============%s" % i)
    end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print("===start time: %s" % sj)
    print("=====end time: %s" % end)
    producer.close()


# def thread_start():
#     num = 4
#     Num = 1000000
#     Threads = []
#
#     for i in range(num):
#         Threads.append(threading.Thread(target=send_msg, args=(Num,)))
#     for t in Threads:
#         # t.setDaemon(True)
#         t.start()

if __name__ == "__main__":
    # thread_start()
    send_msg(10000000)
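Note the double serialization here: str(payload) renders the dict as a single-quoted Python repr, and value_serializer then runs json.dumps over that string, so the message reaches Kafka wrapped in an extra pair of double quotes. That is why StreamingKafka strips double quotes before parsing. A small standalone check of the unwrapping, assuming fastjson's default leniency toward single-quoted JSON:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class QuirkCheck {
    public static void main(String[] args) {
        // What a message looks like on the wire: json.dumps(str(payload))
        String raw = "\"{'type': 'Data', 'values': []}\"";
        // StreamingKafka strips every double quote, leaving the single-quoted repr
        String unquoted = raw.replaceAll("\"", "");
        JSONObject obj = JSON.parseObject(unquoted); // fastjson tolerates single quotes by default
        System.out.println(obj.get("type"));         // prints: Data
    }
}

A cleaner fix would be to send payload itself (dropping the str()) so standard JSON travels end to end, and remove the replaceAll on the Java side accordingly.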

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.12</scala.version>
    <!--<kafka.version>0.9.0.0</kafka.version>-->
    <spark.version>2.1.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
  </properties>

  <!-- Add the Cloudera repository for the CDH artifacts -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Kafka dependency -->
    <!--
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    -->

    <!-- Hadoop dependency -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase dependencies -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.2.4</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- Spark Streaming + Flume integration dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Kafka 0.10 integration (used by StreamingKafka) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>

  </dependencies>

  <build>
    <!--
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    -->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
