当前位置:   article > 正文

Flink写数据到Elasticsearch简单实现_flink to elasticsearch

flink to elasticsearch

一、应用场景


针对实时处理的数据需要及时能够搜索出来时,可以选择elasticsearch来支持这一业务。当然还可以选择其他的内存数据库,如redis。而elasticsearch除了强大的全文索引能力外,还支持分布式存储,可以将其作为分布式计算框架的底座,用于存储热数据或者温数据等。常见的组合方式有elasticsearch与sparkstreaming以及flink进行配合共同完成实时或流式处理的场景。

在本次案例中,我们将演示将商品信息数据通过nc发送,由flink程序通过socket读取实时数据流并对其进行简单处理,最后写入到es中。

二、环境说明


  1. 数据生产工具:nc (本案例安装在虚拟机中)
  2. 数据处理框架:flink,版本2.12
  3. 数据存储框架:elasticsearch
  4. 开发工具IDEA
  5. scala依赖版本:2.12
  6. es集群:3台节点 (本案例安装在虚拟机中)
  7. http通讯端口:9200

三、实验步骤


  1. 各种环境搭建,如es、idea等请自行搭建

  2. 启动idea,创建maven项目

  3. 添加依赖至pom.xml中

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
    
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  4. 如使用scala编程,则需添加scala框架的支持

  5. 编写FlinkElasticsearchSinkDemo客户端程序,完成代码如下:

    package com.suben.es;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.flink.util.Collector;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class FlinkElasticsearchSinkDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 读取socket流数据
            DataStreamSource<String> dataStreamSource = environment.socketTextStream("hadoop003", 12345);
    
            SingleOutputStreamOperator<String> streamOperator = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String s, Collector<String> collector) throws Exception {
                    System.out.println("======接收输入数据=======>" + s);
                }
            });
    
            // 利用es客户端api处理数据,并写入到es索引中
            // 1. 连接至es
            List<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("hadoop002", 9200, "http"));
            httpHosts.add(new HttpHost("hadoop003",9200,"http"));
            httpHosts.add(new HttpHost("hadoop004",9200,"http"));
            // 2. 创建esSink
            ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                    new ElasticsearchSinkFunction<String>() {
                        @Override
                        public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
    
                            System.out.println("======接收输入数据=======>" + s);
    
                            if (s != null && !"".equals(s) && s.contains(",")) {
    
                                String[] splits = s.split(",");
    
                                Map<String, String> jsonMap = new HashMap<>();
                                jsonMap.put("id", splits[0]);
                                jsonMap.put("title", splits[1]);
                                jsonMap.put("category", splits[2]);
                                jsonMap.put("price", splits[3]);
                                jsonMap.put("images", splits[4]);
    
                                IndexRequest indexRequest = Requests.indexRequest();
                                indexRequest.index("flink_stream");
                                indexRequest.id(splits[0]);
                                indexRequest.source(jsonMap);
    
                                requestIndexer.add(indexRequest);
                            }
                        }
                    });
            // 3. 数据输出
            esSinkBuilder.setBulkFlushMaxActions(1);
            // 将es处理逻辑作为sink,处理dataStreamSource
            dataStreamSource.addSink(esSinkBuilder.build());
            // 启动执行
            environment.execute("flink-es");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
  6. 启动nc,命令如下:

    nc -l -p 12345
    
    • 1
  7. 启动FlinkElasticsearchSinkDemo客户端程序,程序正常运行会看到console下打印出数据:
    在这里插入图片描述

  8. 在nc中添加测试数据,回车,观察es中有无数据被写入

    10,[10]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    11,[11]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    12,[12]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    13,[13]华为手机,手机,3008.0,http://www.suben.com/hw.jpg
    14,[14]华为手机,手机,5008.0,http://www.suben.com/hw.jpg

    在chrome浏览器插件elasticsearch-head中数据被成功写入es中:
    在这里插入图片描述

四、一点思考


上述实验仅仅实现了简单的将通过nc数据逐条或者多条输入,是手动的方式操作的。假如是真实业务场景,如电商系统一直源源不断地产生数据,如需模拟这种场景,又该如何实现呢?欢迎大家在评论区给我言,我们一起讨论,相互学习吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/606448
推荐阅读
相关标签
  

闽ICP备14008679号