赞
踩
通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作;
示例环境
- java.version: 1.8.x
- flink.version: 1.11.1
- elasticsearch:6.x
示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系列 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
- package com.flink.examples.elasticsearch;
-
- import com.flink.examples.TUser;
- import com.google.gson.Gson;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
- import org.apache.http.Header;
- import org.apache.http.HttpHost;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
-
- import java.io.IOException;
- import java.util.Map;
- /**
- * @Description 从elasticsearch中获取数据并输出到DataStream数据流中
- */
- public class DataStreamSource {
- /**
- * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
- */
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
- private RestClientBuilder builder = null;
- //job开始执行,调用此方法创建数据源连接对象,该方法主要用于打开连接
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- builder = RestClient.builder(new HttpHost("192.168.1.3", 9200, "http"));
- }
- //执行查询并对数据进行封装
- @Override
- public void run(SourceContext<TUser> ctx) throws Exception {
- Gson gson = new Gson();
- RestHighLevelClient client = null;
- //匹配查询
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
- //定义索引库
- SearchRequest request = new SearchRequest();
- request.types("doc");
- request.indices("flink_demo");
- request.source(sourceBuilder);
- try {
- client = new RestHighLevelClient(builder);
- SearchResponse response = client.search(request, new Header[]{});
- SearchHits hits = response.getHits();
- System.out.println("查询结果有" + hits.getTotalHits() + "条");
- for (SearchHit searchHits : hits ) {
- Map<String,Object> dataMap = searchHits.getSourceAsMap();
- TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
- ctx.collect(user);
- }
- //ID查询
- // GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
- // client = new RestHighLevelClient(builder);
- // GetResponse getResponse = client.get(request, new Header[]{});
- // Map<String,Object> dataMap = getResponse.getSourceAsMap();
- // TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
- // ctx.collect(user);
- }catch(IOException ioe){
- ioe.printStackTrace();
- }finally {
- if (client != null){
- client.close();
- }
- }
- }
- //Job结束时调用
- @Override
- public void cancel() {
- try {
- super.close();
- } catch (Exception e) {
- }
- builder = null;
- }
- });
- dataStream.print();
- env.execute("flink es to data job");
- }
-
- }
数据流输出
DataStreamSink.java
- package com.flink.examples.elasticsearch;
-
- import com.flink.examples.TUser;
- import com.google.gson.Gson;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
- import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
- import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.Requests;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- /**
- * @Description 将DataStream数据流输出到elasticsearch中
- */
- public class DataStreamSink {
-
- /**
- * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
- */
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(5000);
- env.setParallelism(2);
- //1.设置Elasticsearch连接,创建索引数据
- List<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("192.168.1.3", 9200, "http"));
- //创建数据源对象 ElasticsearchSink
- ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
- new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
- Gson gson = new Gson();
- Map<String,Object> map = gson.fromJson(user, Map.class);
- indexer.add(Requests.indexRequest()
- .index("flink_demo")
- .type("doc")
- .source(map));
- }
- }
- );
- // 设置批量写数据的最大动作量,对批量请求的配置;这指示接收器在每个元素之后发出,否则它们将被缓冲
- esSinkBuilder.setBulkFlushMaxActions(10);
- //刷新前缓冲区的最大数据大小(以MB为单位)
- esSinkBuilder.setBulkFlushMaxSizeMb(500);
- //论缓冲操作的数量或大小如何都要刷新的时间间隔
- esSinkBuilder.setBulkFlushInterval(4000);
-
- //2.写入数据到流中
- //封装数据
- TUser user = new TUser();
- user.setId(9);
- user.setName("wang1");
- user.setAge(23);
- user.setSex(1);
- user.setAddress("CN");
- user.setCreateTimeSeries(System.currentTimeMillis());
- DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
- //3.将数据写入到Elasticearch中
- input.addSink(esSinkBuilder.build());
- env.execute("flink data to es job");
- }
-
- }
数据展示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。