当前位置:   article > 正文

SparkStreaming写数据到Elasticsearch简单实现_sparkstreaming通过javadstream.foreachrdd()将数据写入es

sparkstreaming通过javadstream.foreachrdd()将数据写入es

一、应用场景


针对实时处理的数据需要及时能够搜索出来时,可以选择elasticsearch来支持这一业务。当然还可以选择其他的内存数据库,如redis。而elasticsearch除了强大的全文索引能力外,还支持分布式存储,可以将其作为分布式计算框架的底座,用于存储热数据或者温数据等。常见的组合方式有elasticsearch与sparkstreaming以及flink进行配合共同完成实时或流式处理的场景。

在本次案例中,我们将演示将商品信息数据通过nc发送,由sparkstreaming通过socket读取实时数据流并对其进行简单处理,最后写入到es中。

二、环境说明


  1. 数据生产工具:nc (本案例安装在虚拟机中)
  2. 数据处理框架:sparkstreaming,版本2.12
  3. 数据存储框架:elasticsearch
  4. 开发工具IDEA
  5. es集群:3台节点 (本案例安装在虚拟机中)
  6. http通讯端口:9200

三、实验步骤


  1. 各种环境搭建,如es、idea等请自行搭建

  2. 启动idea,创建maven项目

  3. 添加依赖至pom.xml中

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch 的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch 依赖 2.x 的 log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
  4. 添加scala框架的支持,或者在pom.xml中自行添加

  5. 编写sparkstreaming客户端程序,完成代码如下:

    package com.suben.es.sstream
    
    import org.apache.http.HttpHost
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
    import org.elasticsearch.common.xcontent.XContentType
    
    import java.util.Date
    
    object SparkStreamingESTest {
    
      val INDEX_NAME = "stream"
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingOperateES")
        val streamingContext = new StreamingContext(sparkConf, Seconds(3))
        // 读取TCP端口数据,即nc发送过来的数据
        val inputDStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop003", 12345, StorageLevel.MEMORY_ONLY)
        // 对数据进行处理
        inputDStream.foreachRDD(rdd => {
          println(">>>>>>>>>>>>" + new Date())
          rdd.foreach(data => {
            println("==========接收到的数据>>>>>>>>>>>>", data)
            if (data != null && !"".equals(data)) {
              // 将数据写入到elasticsearch索引为shopping中去
              // 0. 处理数据,假如输入数据格式如下:9,小米手机,手机,2008.0,http://www.suben.com/xm.jpg
              val splits = data.split(",")
              // 选择第一个元素作为id
              val id = splits(0)
              // 1. 编写es客户端代码
              val client = new RestHighLevelClient(RestClient.builder(new HttpHost("hadoop002", 9200, "http")))
              // 2. 创建请求对象
              val request = new IndexRequest();
              // 3. 设置索引和id
              request.index(INDEX_NAME).id(id)
              // 4. 将splits数组中的数据转成json字符串
              val productJson =
              s"""
                 | { "id":"${splits(0)}",
                 | "title":"${splits(1)}",
                 | "category":"${splits(2)}",
                 | "price":"${splits(3)}",
                 | "images":"${splits(4)}" }
                 |""".stripMargin;
              println("===productJson===", productJson)
              // 6. 将jsonObject封装到request中
              request.source(productJson, XContentType.JSON)
              // 7. 请求添加文档
              val response = client.index(request, RequestOptions.DEFAULT)
    
              System.out.println("_index:" + response.getIndex());
              System.out.println("_id:" + response.getId());
              System.out.println("_result:" + response.getResult());
    
              client.close()
            }
          })
        })
    
        // 启动程序
        streamingContext.start()
        // 等待执行结果
        streamingContext.awaitTermination()
    
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
  6. 启动nc,命令如下:

    nc -l -p 12345
    
    • 1
  7. 启动sparkstreaming客户端程序

  8. 在nc中添加测试数据,回车,观察es中有无数据被写入

    10,[10]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    11,[11]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    12,[12]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    13,[13]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    14,[14]华为手机,手机,5008.0,http://www.suben.com/hw.jpg

    在chrome浏览器插件elasticsearch-head中数据被成功写入es中:
    在这里插入图片描述

四、思考


上述实验仅仅实现了简单的将通过nc数据逐条或者多条输入,是手动的方式操作的。假如是真实业务场景,如电商系统一直源源不断地产生数据,如需模拟这种场景,又该如何实现呢?欢迎大家在评论区给我言,我们一起讨论,相互学习吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/606207
推荐阅读
相关标签
  

闽ICP备14008679号