当前位置:   article > 正文

Flink 系例 之 Connectors 连接 ElasticSearch_flink-connector-elasticsearch

flink-connector-elasticsearch

通过使用 Flink DataStream Connectors 数据流连接器,连接到 ElasticSearch 搜索引擎的文档索引(Index),并提供数据流的输入与输出操作。

示例环境

  1. java.version: 1.8.x
  2. flink.version: 1.11.1
  3. elasticsearch:6.x

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

  1. package com.flink.examples.elasticsearch;
  2. import com.flink.examples.TUser;
  3. import com.google.gson.Gson;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  8. import org.apache.http.Header;
  9. import org.apache.http.HttpHost;
  10. import org.elasticsearch.action.search.SearchRequest;
  11. import org.elasticsearch.action.search.SearchResponse;
  12. import org.elasticsearch.client.RestClient;
  13. import org.elasticsearch.client.RestClientBuilder;
  14. import org.elasticsearch.client.RestHighLevelClient;
  15. import org.elasticsearch.index.query.QueryBuilders;
  16. import org.elasticsearch.search.SearchHit;
  17. import org.elasticsearch.search.SearchHits;
  18. import org.elasticsearch.search.builder.SearchSourceBuilder;
  19. import java.io.IOException;
  20. import java.util.Map;
  21. /**
  22. * @Description 从elasticsearch中获取数据并输出到DataStream数据流中
  23. */
  24. public class DataStreamSource {
  25. /**
  26. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
  27. */
  28. public static void main(String[] args) throws Exception {
  29. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30. env.setParallelism(1);
  31. DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
  32. private RestClientBuilder builder = null;
  33. //job开始执行,调用此方法创建数据源连接对象,该方法主要用于打开连接
  34. @Override
  35. public void open(Configuration parameters) throws Exception {
  36. super.open(parameters);
  37. builder = RestClient.builder(new HttpHost("192.168.1.3", 9200, "http"));
  38. }
  39. //执行查询并对数据进行封装
  40. @Override
  41. public void run(SourceContext<TUser> ctx) throws Exception {
  42. Gson gson = new Gson();
  43. RestHighLevelClient client = null;
  44. //匹配查询
  45. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  46. sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
  47. //定义索引库
  48. SearchRequest request = new SearchRequest();
  49. request.types("doc");
  50. request.indices("flink_demo");
  51. request.source(sourceBuilder);
  52. try {
  53. client = new RestHighLevelClient(builder);
  54. SearchResponse response = client.search(request, new Header[]{});
  55. SearchHits hits = response.getHits();
  56. System.out.println("查询结果有" + hits.getTotalHits() + "条");
  57. for (SearchHit searchHits : hits ) {
  58. Map<String,Object> dataMap = searchHits.getSourceAsMap();
  59. TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
  60. ctx.collect(user);
  61. }
  62. //ID查询
  63. // GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
  64. // client = new RestHighLevelClient(builder);
  65. // GetResponse getResponse = client.get(request, new Header[]{});
  66. // Map<String,Object> dataMap = getResponse.getSourceAsMap();
  67. // TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
  68. // ctx.collect(user);
  69. }catch(IOException ioe){
  70. ioe.printStackTrace();
  71. }finally {
  72. if (client != null){
  73. client.close();
  74. }
  75. }
  76. }
  77. //Job结束时调用
  78. @Override
  79. public void cancel() {
  80. try {
  81. super.close();
  82. } catch (Exception e) {
  83. }
  84. builder = null;
  85. }
  86. });
  87. dataStream.print();
  88. env.execute("flink es to data job");
  89. }
  90. }

数据流输出

DataStreamSink.java

  1. package com.flink.examples.elasticsearch;
  2. import com.flink.examples.TUser;
  3. import com.google.gson.Gson;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.common.functions.RuntimeContext;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  9. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  10. import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
  11. import org.apache.http.HttpHost;
  12. import org.elasticsearch.client.Requests;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. /**
  17. * @Description 将DataStream数据流输出到elasticsearch中
  18. */
  19. public class DataStreamSink {
  20. /**
  21. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
  22. */
  23. public static void main(String[] args) throws Exception {
  24. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. env.enableCheckpointing(5000);
  26. env.setParallelism(2);
  27. //1.设置Elasticsearch连接,创建索引数据
  28. List<HttpHost> httpHosts = new ArrayList<>();
  29. httpHosts.add(new HttpHost("192.168.1.3", 9200, "http"));
  30. //创建数据源对象 ElasticsearchSink
  31. ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
  32. new ElasticsearchSinkFunction<String>() {
  33. @Override
  34. public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
  35. Gson gson = new Gson();
  36. Map<String,Object> map = gson.fromJson(user, Map.class);
  37. indexer.add(Requests.indexRequest()
  38. .index("flink_demo")
  39. .type("doc")
  40. .source(map));
  41. }
  42. }
  43. );
  44. // 设置批量写数据的最大动作量,对批量请求的配置;这指示接收器在每个元素之后发出,否则它们将被缓冲
  45. esSinkBuilder.setBulkFlushMaxActions(10);
  46. //刷新前缓冲区的最大数据大小(以MB为单位)
  47. esSinkBuilder.setBulkFlushMaxSizeMb(500);
  48. //论缓冲操作的数量或大小如何都要刷新的时间间隔
  49. esSinkBuilder.setBulkFlushInterval(4000);
  50. //2.写入数据到流中
  51. //封装数据
  52. TUser user = new TUser();
  53. user.setId(9);
  54. user.setName("wang1");
  55. user.setAge(23);
  56. user.setSex(1);
  57. user.setAddress("CN");
  58. user.setCreateTimeSeries(System.currentTimeMillis());
  59. DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
  60. //3.将数据写入到Elasticearch中
  61. input.addSink(esSinkBuilder.build());
  62. env.execute("flink data to es job");
  63. }
  64. }

数据展示

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/606175
推荐阅读
相关标签
  

闽ICP备14008679号