
Writing Flink streaming results to Elasticsearch

I. Requirement

Use Flink stream processing and sink the result data into Elasticsearch.

II. Software versions

Flink 1.8.0

Elasticsearch 7.8.1

kafka_2.11-1.0.0

Java 1.8

III. Code

1. Maven pom file

<properties>
    <compiler.version>1.8</compiler.version>
    <flink.version>1.8.0</flink.version>
    <java.version>1.8</java.version>
    <hadoop.version>2.7.5</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- The two dependencies below are needed for Flink's logs to show up -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- Conversion between objects and JSON -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- mysql-connector dependency -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <!-- Flink Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.5</version>
    </dependency>
</dependencies>

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>
        <activation>
            <property>
                <name>idea.version</name>
            </property>
        </activation>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>

2. Job code

package com.felix.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.felix.model.SecureLog;
import com.felix.utils.CommonUtils;
import com.felix.utils.DateUtils;
import com.felix.utils.HttpUtils;
import com.felix.utils.PropertyUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import javax.annotation.Nullable;
import java.util.*;

public class KafkaMain {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // optional fault tolerance: start a checkpoint every 10s
        // env.enableCheckpointing(10000);
        // // exactly-once checkpoint semantics
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // // minimum pause between two checkpoints
        // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // // checkpoint timeout
        // env.getCheckpointConfig().setCheckpointTimeout(60000);
        // // at most one checkpoint in flight at a time
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // // retain externalized checkpoints when the job is cancelled
        // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // // where checkpoints are stored
        // env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"));

        // Kafka source
        Properties props = PropertyUtils.getPropertiesWithKafkaConfig();
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                PropertyUtils.get("input.topic"),   // kafka topic
                new SimpleStringSchema(),           // deserialize records as Strings
                props)).setParallelism(1);
        // dataStreamSource.print(); // print what is read from kafka to the console

        // parse the JSON messages into SecureLog objects
        DataStream<SecureLog> secureLogStream = dataStreamSource.map(new MapFunction<String, SecureLog>() {
            @Override
            public SecureLog map(String msgJson) throws Exception {
                SecureLog secureLog = new SecureLog();
                JSONObject jsonObject = JSON.parseObject(msgJson);
                secureLog.setSyslogProgram(jsonObject.getString("syslog_program"));
                secureLog.setPath(jsonObject.getString("path"));
                secureLog.setSyslogTimestamp(jsonObject.getString("syslog_timestamp"));
                secureLog.setTimestamp(jsonObject.getString("@timestamp"));
                secureLog.setSyslogHostname(jsonObject.getString("syslog_hostname"));
                secureLog.setVersion(jsonObject.getString("@version"));
                secureLog.setMessage(jsonObject.getString("message"));
                secureLog.setHost(jsonObject.getString("host"));
                secureLog.setReceivedFrom(jsonObject.getString("received_from"));
                secureLog.setSyslogPid(jsonObject.getString("syslog_pid"));
                secureLog.setReceivedAt(jsonObject.getString("received_at"));
                secureLog.setSyslogMessage(jsonObject.getString("syslog_message"));
                secureLog.setType(jsonObject.getString("type"));
                secureLog.setSysLogTimestampLong(DateUtils.dateStrToLong(secureLog.getSyslogTimestamp()));
                return secureLog;
            }
        });
        // secureLogStream.print();

        // assign timestamps and watermarks
        DataStream<SecureLog> secureLogDataStreamWithWaterMask = secureLogStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SecureLog>() {
            private long currentTimeStamp = 0L;
            // allowed out-of-orderness
            private final long maxDelayTime = 2000L;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentTimeStamp - maxDelayTime);
            }

            @Override
            public long extractTimestamp(SecureLog secureLog, long previousElementTimestamp) {
                // track the largest timestamp seen so far so the watermark keeps advancing
                currentTimeStamp = Math.max(secureLog.getSysLogTimestampLong(), currentTimeStamp);
                return secureLog.getSysLogTimestampLong();
            }
        });
        // secureLogDataStreamWithWaterMask.print();

        // sink to mysql
        // secureLogDataStreamWithWaterMask.addSink(new MySqlSink());

        // sink to elasticsearch
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.126.128", 9200, "http"));
        httpHosts.add(new HttpHost("192.168.126.129", 9200, "http"));
        httpHosts.add(new HttpHost("192.168.126.130", 9200, "http"));

        // use an ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<SecureLog> esSinkBuilder = new ElasticsearchSink.Builder<SecureLog>(
                httpHosts,
                new ElasticsearchSinkFunction<SecureLog>() {
                    public IndexRequest createIndexRequest(SecureLog secureLog) {
                        // Map<String, SecureLog> json = new HashMap<>();
                        // json.put("data", secureLog);
                        Map<String, Object> map = CommonUtils.beanToMap(secureLog);
                        return Requests.indexRequest()
                                .index("flink_secure_log")
                                .type("_doc")
                                .source(map);
                    }

                    @Override
                    public void process(SecureLog secureLog, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        requestIndexer.add(createIndexRequest(secureLog));
                    }
                }
        );
        // flush the bulk request after every element (max buffered actions = 1)
        esSinkBuilder.setBulkFlushMaxActions(1);

        // provide a RestClientFactory for custom configuration on the internally created REST client:
        // attach the Basic auth header so a secured cluster accepts the requests
        Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", HttpUtils.getHeader())};
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setDefaultHeaders(defaultHeaders);
                }
        );

        secureLogDataStreamWithWaterMask.print();
        secureLogDataStreamWithWaterMask.addSink(esSinkBuilder.build());

        env.execute("Flink add data source");
    }
}
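
The job above relies on a few helper classes that the original post does not show: SecureLog (a plain POJO in com.felix.model with a String field plus getter/setter for each JSON key, and a Long sysLogTimestampLong field), PropertyUtils, DateUtils and CommonUtils. Below is a minimal sketch of what those helpers might look like, collapsed into a single class for brevity; the kafka.properties file name, the property keys and the syslog date pattern are assumptions, not the author's original code.

package com.felix.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/**
 * Sketch of the helper methods used above. In the original project these live
 * in separate classes (PropertyUtils, DateUtils, CommonUtils) with the same method names.
 */
public class HelperSketches {

    private static final Properties PROPS = new Properties();

    static {
        // Assumed: a kafka.properties file on the classpath containing e.g.
        //   bootstrap.servers=node01:9092,node02:9092
        //   group.id=flink-secure-log
        //   input.topic=secure_log
        try (InputStream in = HelperSketches.class.getClassLoader()
                .getResourceAsStream("kafka.properties")) {
            PROPS.load(in);
        } catch (Exception e) {
            throw new RuntimeException("failed to load kafka.properties", e);
        }
    }

    /** PropertyUtils.get(key): read a single configuration value. */
    public static String get(String key) {
        return PROPS.getProperty(key);
    }

    /** PropertyUtils.getPropertiesWithKafkaConfig(): consumer properties for FlinkKafkaConsumer011. */
    public static Properties getPropertiesWithKafkaConfig() {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", get("bootstrap.servers"));
        kafkaProps.setProperty("group.id", get("group.id"));
        return kafkaProps;
    }

    /**
     * DateUtils.dateStrToLong(str): parse the syslog timestamp into epoch millis.
     * The "MMM dd HH:mm:ss" pattern (e.g. "Aug 17 10:23:45") is an assumption;
     * syslog timestamps carry no year, so the current year is filled in.
     */
    public static Long dateStrToLong(String dateStr) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss", Locale.ENGLISH);
            Date parsed = sdf.parse(dateStr);
            Calendar cal = Calendar.getInstance();
            int currentYear = cal.get(Calendar.YEAR);
            cal.setTime(parsed);
            cal.set(Calendar.YEAR, currentYear);
            return cal.getTimeInMillis();
        } catch (Exception e) {
            // fall back to "now" so a single malformed timestamp does not kill the job
            return System.currentTimeMillis();
        }
    }

    /** CommonUtils.beanToMap(bean): fastjson's JSONObject already implements Map<String, Object>. */
    public static Map<String, Object> beanToMap(Object bean) {
        return (JSONObject) JSON.toJSON(bean);
    }
}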

3. If Elasticsearch is protected with a username and password, an Authorization header must be added to the HTTP requests

package com.felix.utils;

import org.apache.commons.codec.binary.Base64;

import java.nio.charset.Charset;

public class HttpUtils {

    private static final String APP_KEY = "elastic";     // es username
    private static final String SECRET_KEY = "123456";   // es password

    /**
     * Build the Basic Auth header value.
     *
     * @return the value for the Authorization header, e.g. "Basic xxxx"
     */
    public static String getHeader() {
        String auth = APP_KEY + ":" + SECRET_KEY;
        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(Charset.forName("US-ASCII")));
        return "Basic " + new String(encodedAuth);
    }
}
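
A quick way to sanity-check the header before wiring it into the sink (the expected output assumes the sample credentials elastic/123456 shown above):

package com.felix.utils;

public class HttpUtilsTest {
    public static void main(String[] args) {
        // prints: Basic ZWxhc3RpYzoxMjM0NTY=
        System.out.println(HttpUtils.getHeader());
    }
}

The job in section 2 then attaches this header to every request through esSinkBuilder.setRestClientFactory(...).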