The previous post covered how Flink reads data from Kafka. Flink ships with many built-in connectors, so how do we write the data we've read into an external store? This post records how to use Flink's Elasticsearch connector.
Flink provides many connectors; the official documentation lists each of them in detail.
The Kafka connector was introduced earlier; this post focuses on the Elasticsearch connector.
First, note that the connector artifact must match the Elasticsearch version you are running.
Reading from Kafka follows the implementation in the previous post; a quick refresher is sketched below.
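A minimal sketch of obtaining the Kafka stream, assuming Flink 1.8 and the universal Kafka connector (flink-connector-kafka); the broker address, topic, and group id here are placeholders, see the previous post for the full setup:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker address
props.setProperty("group.id", "flink-es-demo");           // placeholder consumer group

// each Kafka record is deserialized to a plain String
DataStream<String> input = env.addSource(
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));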
1. Elasticsearch 5.x
For 5.x clusters the connector artifact is flink-connector-elasticsearch5_2.11, which requires Flink 1.3.0 or later.
Add the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.5.4</version>
</dependency>
The official example:
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// obtain the Kafka data stream as in the previous post
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
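A side note: if the Kafka records are already JSON strings, wrapping them in a Map is unnecessary. A minimal sketch of the alternative, assuming Elasticsearch 5.3+ where the IndexRequest.source(String, XContentType) overload is available:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

public IndexRequest createIndexRequest(String element) {
    // element is assumed to already be a valid JSON document
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(element, XContentType.JSON);
}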
2. Elasticsearch 6.x / 7.x
Add the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.8.3</version>
</dependency>
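The artifact above targets Elasticsearch 6. For a 7.x cluster there is a dedicated connector, flink-connector-elasticsearch7, available from Flink 1.10 onward; a hedged pom sketch (the version below is only illustrative):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.10.0</version>
</dependency>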
The official example:
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...);
        restClientBuilder.setMaxRetryTimeoutMillis(...);
        restClientBuilder.setPathPrefix(...);
        restClientBuilder.setHttpClientConfigCallback(...);
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
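The builder also accepts a failure handler for requests that Elasticsearch rejects. The official documentation sketches a custom ActionRequestFailureHandler along these lines (the reference implementation below instead uses the ready-made RetryRejectedExecutionFailureHandler):

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // the bulk queue was full: re-add the request for retry
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            // malformed document: drop the request without failing the sink
        } else {
            // for all other failures, fail the sink by rethrowing
            throw failure;
        }
    }
});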
A reference implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;

public static void write2es(List<HttpHost> httpHosts, DataStream<Object> dataStream, String index, String type) {
    ElasticsearchSink.Builder<Object> esSinkBuilder = new ElasticsearchSink.Builder<Object>(httpHosts,
            new ElasticsearchSinkFunction<Object>() {
                public List<IndexRequest> createIndexRequest(Object event) {
                    List<IndexRequest> indexRequestList = new ArrayList<>();
                    // convert the event object into a field map for ES (author's custom helper)
                    Map<String, String> map = CustomElasticSearchMap.getObjectToEsMap(event);
                    indexRequestList.add(Requests.indexRequest()
                            .index(index)
                            .type(type) // for ES 7.x this must be "_doc"
                            .source(map));
                    return indexRequestList;
                }

                @Override
                public void process(Object event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                    List<IndexRequest> indexRequestList = createIndexRequest(event);
                    for (int i = 0; i < indexRequestList.size(); i++) {
                        requestIndexer.add(indexRequestList.get(i));
                    }
                }
            });

    esSinkBuilder.setRestClientFactory(new RestClientFactory() {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            // multiple headers can be supplied via the array
            Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
            restClientBuilder.setDefaultHeaders(headers);
        }
    });

    // flush after every element (demo setting; raise this in production)
    esSinkBuilder.setBulkFlushMaxActions(1);
    // retry requests that were rejected because the bulk queue was full
    esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
    dataStream.addSink(esSinkBuilder.build());
}
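A hedged usage sketch of the helper above; the stream construction, index name, and job name are placeholders:

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// e.g. a Kafka-sourced stream mapped to event objects upstream
DataStream<Object> events = ...;

write2es(httpHosts, events, "my-index", "_doc");
env.execute("kafka-to-es"); // hypothetical job name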