赞
踩
一、. 基本使用
1. es8连接依赖
- <dependency>
- <groupId>co.elastic.clients</groupId>
- <artifactId>elasticsearch-java</artifactId>
- <version>8.5.2</version>
- </dependency>
2. 连接类
- import co.elastic.clients.elasticsearch.ElasticsearchClient;
- import co.elastic.clients.elasticsearch.core.IndexRequest;
- import co.elastic.clients.json.jackson.JacksonJsonpMapper;
- import co.elastic.clients.transport.ElasticsearchTransport;
- import co.elastic.clients.transport.rest_client.RestClientTransport;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
-
- public class ES8Writer extends RichSinkFunction<MyEntity> {
- private RestClient restClient;
- private ElasticsearchClient client;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- HttpHost[] httpHosts = new HttpHost[Constraints.esHosts.length];
- for (int i=0;i<Constraints.esHosts.length;i++){
- httpHosts[i]=new HttpHost(Constraints.esHosts[i], 9200, "http");
- }
- restClient = RestClient.builder(httpHosts).build();
- ElasticsearchTransport transport = new RestClientTransport(
- restClient, new JacksonJsonpMapper());
- client = new ElasticsearchClient(transport);
- }
-
- @Override
- public void close() throws Exception {
- client.shutdown();
- restClient.close();
- }
-
- @Override
- public void invoke(MyEntity value, Context context) throws Exception {
- IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
- .index("index"
- .id(value.getId())
- .document(value)
- .build();
- client.index(indexRequest);
- }
-
- }
3. 使用
stream.addSink(new ES8Writer()).name("ElasticSearch");
二. 踩到的坑
1. java.lang.NoSuchMethodError: org.apache.http.client.utils.URLEncodedUtils.formatSegments
httpclient 版本问题
增加依赖
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.13</version>
- </dependency>
2. com.fasterxml.jackson.databind.JsonMappingException: No serializer found for xxxx
传给es的实体类(代码中的MyEntity)必须使用private属性加getter,setter方法
3. java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonParser.currentToken()
jackson版本问题
增加依赖
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.9.0</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.9.0</version>
- </dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。