当前位置:   article > 正文

flink连接es8以及遇到的坑_nosuchmethoderror: org.apache.http.client.utils.ur

nosuchmethoderror: org.apache.http.client.utils.urlencodedutils.formatsegmen

一、. 基本使用

1. es8连接依赖

  1. <dependency>
  2. <groupId>co.elastic.clients</groupId>
  3. <artifactId>elasticsearch-java</artifactId>
  4. <version>8.5.2</version>
  5. </dependency>

2. 连接类

  1. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  2. import co.elastic.clients.elasticsearch.core.IndexRequest;
  3. import co.elastic.clients.json.jackson.JacksonJsonpMapper;
  4. import co.elastic.clients.transport.ElasticsearchTransport;
  5. import co.elastic.clients.transport.rest_client.RestClientTransport;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  8. import org.apache.http.HttpHost;
  9. import org.elasticsearch.client.RestClient;
  10. public class ES8Writer extends RichSinkFunction<MyEntity> {
  11. private RestClient restClient;
  12. private ElasticsearchClient client;
  13. @Override
  14. public void open(Configuration parameters) throws Exception {
  15. HttpHost[] httpHosts = new HttpHost[Constraints.esHosts.length];
  16. for (int i=0;i<Constraints.esHosts.length;i++){
  17. httpHosts[i]=new HttpHost(Constraints.esHosts[i], 9200, "http");
  18. }
  19. restClient = RestClient.builder(httpHosts).build();
  20. ElasticsearchTransport transport = new RestClientTransport(
  21. restClient, new JacksonJsonpMapper());
  22. client = new ElasticsearchClient(transport);
  23. }
  24. @Override
  25. public void close() throws Exception {
  26. client.shutdown();
  27. restClient.close();
  28. }
  29. @Override
  30. public void invoke(MyEntity value, Context context) throws Exception {
  31. IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
  32. .index("index"
  33. .id(value.getId())
  34. .document(value)
  35. .build();
  36. client.index(indexRequest);
  37. }
  38. }

3. 使用

stream.addSink(new ES8Writer()).name("ElasticSearch");

二. 踩到的坑

1. java.lang.NoSuchMethodError: org.apache.http.client.utils.URLEncodedUtils.formatSegments

httpclient 版本问题

增加依赖

  1. <dependency>
  2. <groupId>org.apache.httpcomponents</groupId>
  3. <artifactId>httpclient</artifactId>
  4. <version>4.5.13</version>
  5. </dependency>

2. com.fasterxml.jackson.databind.JsonMappingException: No serializer found for xxxx

传给es的实体类(代码中的MyEntity)必须使用private属性加getter,setter方

3. java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonParser.currentToken()

jackson版本问题

增加依赖

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-core</artifactId>
  4. <version>2.9.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.fasterxml.jackson.core</groupId>
  8. <artifactId>jackson-databind</artifactId>
  9. <version>2.9.0</version>
  10. </dependency>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/606535
推荐阅读
相关标签
  

闽ICP备14008679号