当前位置:   article > 正文

Flink Sink:写入 Elasticsearch(支持用户名和密码认证)

flink es sink

 1.依赖 

  <!-- Maven dependencies for the Flink 1.10.1 examples. All Flink artifacts that carry a Scala
       suffix use the _2.12 builds. -->
  <dependencies>
    <!-- Flink core and streaming APIs -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>

    <!-- Elasticsearch 6 connector: provides the ElasticsearchSink and RestClientFactory classes
         imported by the sink example -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>

    <!-- The remaining dependencies are not imported by the Elasticsearch sink example in this
         article; presumably they belong to other examples in the same project. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!-- NOTE(review): this artifact has a _2.11 Scala suffix while every Flink artifact here
         uses _2.12; confirm that mixing the two is intended. -->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.10.1</version>
    </dependency>
  </dependencies>

2.实体

  /**
   * A single temperature reading reported by a sensor.
   *
   * <p>Mutable bean with a public no-argument constructor and a getter/setter pair for every
   * field. All fields are boxed types and may be {@code null} until they are set.
   */
  public class SensorReading {

      /** Sensor identifier. */
      private String id;

      /** Timestamp of the reading. */
      private Long timestamp;

      /** Temperature value of the reading. */
      private Double temperature;

      /** Creates an empty reading; every field is {@code null}. */
      public SensorReading() {
      }

      /**
       * Creates a fully populated reading.
       *
       * @param id          sensor identifier
       * @param timestamp   timestamp of the reading
       * @param temperature temperature value
       */
      public SensorReading(String id, Long timestamp, Double temperature) {
          this.id = id;
          this.timestamp = timestamp;
          this.temperature = temperature;
      }

      /** @return the sensor identifier */
      public String getId() {
          return id;
      }

      /** @param id the sensor identifier */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the timestamp of the reading */
      public Long getTimestamp() {
          return timestamp;
      }

      /** @param timestamp the timestamp of the reading */
      public void setTimestamp(Long timestamp) {
          this.timestamp = timestamp;
      }

      /** @return the temperature value */
      public Double getTemperature() {
          return temperature;
      }

      /** @param temperature the temperature value */
      public void setTemperature(Double temperature) {
          this.temperature = temperature;
      }

      /**
       * Renders the reading as {@code SensorReading{id='...', timestamp=..., temperature=...}}.
       * Unset fields are rendered as the text {@code null}.
       */
      @Override
      public String toString() {
          return "SensorReading{" +
                  "id='" + id + '\'' +
                  ", timestamp=" + timestamp +
                  ", temperature=" + temperature +
                  '}';
      }
  }

3.代码

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import org.apache.flink.api.common.functions.RuntimeContext;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
  import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
  import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder;
  import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
  import org.apache.http.HttpHost;
  import org.apache.http.auth.AuthScope;
  import org.apache.http.auth.UsernamePasswordCredentials;
  import org.apache.http.client.CredentialsProvider;
  import org.apache.http.impl.client.BasicCredentialsProvider;
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.client.Requests;
  import org.elasticsearch.client.RestClientBuilder;
  import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
  22. public class SinkTest3_Es {
  23. public static void main(String[] args) throws Exception {
  24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. env.setParallelism(1);
  26. // 从文件读取数据
  27. DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");
  28. // 转换成SensorReading类型
  29. DataStream<SensorReading> dataStream = inputStream.map(line -> {
  30. String[] fields = line.split(",");
  31. return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
  32. });
  33. // 定义es的连接配置 不带用户名密码
  34. ArrayList<HttpHost> httpHosts = new ArrayList<>();
  35. httpHosts.add(new HttpHost("localhost", 9200));
  36. dataStream.addSink(
  37. new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
  38. env.execute();
  39. // 定义es的连接配置 带用户名密码
  40. /* RestClientFactory restClientFactory = new RestClientFactory() {
  41. @Override
  42. public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
  43. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  44. credentialsProvider.setCredentials(AuthScope.ANY,
  45. new UsernamePasswordCredentials("用户名", "密码"));
  46. restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
  47. @Override
  48. public HttpAsyncClientBuilder customizeHttpClient(
  49. HttpAsyncClientBuilder httpAsyncClientBuilder) {
  50. httpAsyncClientBuilder.disableAuthCaching();
  51. return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  52. }
  53. });
  54. }
  55. };
  56. ArrayList<HttpHost> httpHosts = new ArrayList<>();
  57. httpHosts.add(new HttpHost("localhost", 9200));
  58. ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
  59. httpHosts,
  60. new MyEsSinkFunction());
  61. sensorReadingBuilder.setRestClientFactory(restClientFactory);
  62. dataStream.addSink(sensorReadingBuilder.build());
  63. env.execute();*/
  64. }
  65. // 实现自定义的ES写入操作
  66. public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
  67. @Override
  68. public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
  69. // 定义写入的数据source
  70. HashMap<String, String> dataSource = new HashMap<>();
  71. dataSource.put("id", element.getId());
  72. dataSource.put("temp", element.getTemperature().toString());
  73. dataSource.put("ts", element.getTimestamp().toString());
  74. // 创建请求,作为向es发起的写入命令
  75. IndexRequest indexRequest = Requests.indexRequest()
  76. .index("sensor")
  77. .type("readingdata")
  78. .source(dataSource);
  79. // 用index发送请求
  80. indexer.add(indexRequest);
  81. }
  82. }
  83. }

4.测试数据

代码中读取的输入文件 D:\sensor.txt,每行包含三个以逗号分隔的字段:id,时间戳,温度值
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/606523
推荐阅读
相关标签
  

闽ICP备14008679号