
Writing Kafka Data to Elasticsearch with Flink

    Quite a few systems today are designed as Kafka -> Flink -> ES: data is collected into Kafka, Flink consumes the Kafka data and performs real-time computation, the results are stored in Elasticsearch, and finally Kibana is used for visualization. The concrete implementation is described below.

Environment Setup

    For the Flink environment, refer to the earlier article "Flink环境搭建,令人惊愕的HA"; for Kafka and Elasticsearch, refer to the previously written article linked below:

Elasticsearch7.X-Springboot整合ELK进行日志收集<1>. Since environment setup follows those earlier links, it is not covered in detail here. Once everything is set up, start Kafka and Elasticsearch, then move on to the code and verification steps below.
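As a rough reference (the exact paths depend on where Kafka and Elasticsearch are installed on your machine), the services can be started like this:

# From the Kafka installation directory: start ZooKeeper, then the broker
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties

# From the Elasticsearch installation directory: start Elasticsearch as a daemon
./bin/elasticsearch -d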

Code Implementation

1. Add the following dependencies to pom.xml:

<properties>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12</elasticsearch.version>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2. First, produce messages on the kafkaInfo topic in Kafka with the following command:

./kafka-console-producer.sh --broker-list 192.168.244.129:9092 --topic kafkaInfo
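At the producer prompt, each line you type becomes one Kafka message; the Flink job below splits each message on spaces. For example (sample input, purely illustrative):

> hello flink kafka
> flink writes to elasticsearch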


3. The Java code for Flink to read from Kafka and write to Elasticsearch is as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

private static void kafkaToEs() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setParallelism(3);

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.244.129:9092");
    properties.setProperty("group.id", "test227");
    properties.setProperty("max.poll.records", "1000");
    properties.setProperty("max.partition.fetch.bytes", "5242880");

    // Create the Kafka consumer and read from the kafkaInfo topic
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<>("kafkaInfo", new SimpleStringSchema(), properties));

    // Split each message into space-separated words
    DataStream<String> sum = stream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String str, Collector<String> collector) throws Exception {
            String[] arr = str.split(" ");
            for (String s : arr) {
                collector.collect(s);
            }
        }
    });
    sum.print();

    // Elasticsearch sink: write each word as a document into the kafka_flink_es index
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("192.168.244.129", 9200, "http"));
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);
                    return Requests.indexRequest()
                            .index("kafka_flink_es")
                            .source(json);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );
    sum.addSink(esSinkBuilder.build()).name("toES");

    env.execute("kafkaToES");
}
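The listing above is a single static method; to actually submit the job it needs an enclosing class with a main entry point. A minimal sketch (the class name KafkaFlinkEsJob is illustrative, not from the original article):

public class KafkaFlinkEsJob {

    public static void main(String[] args) throws Exception {
        // Submits the Kafka -> Flink -> Elasticsearch pipeline defined above
        kafkaToEs();
    }

    // ... kafkaToEs() from step 3 goes here ...
}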

While the job runs, the words consumed from Kafka are printed to the console by sum.print().

4. Finally, check the data in Elasticsearch

Open Kibana at http://192.168.244.129:5601/app/dev_tools#/console and query the index data in the Dev Tools console.
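For example, a simple match_all search in the Dev Tools console returns the words written by the job to the kafka_flink_es index:

GET kafka_flink_es/_search
{
  "query": {
    "match_all": {}
  }
}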


At this point, reading from Kafka with Flink and writing to Elasticsearch has been verified successfully.

If you found this article helpful, feel free to follow the WeChat official account "蓝天Java大数据" and let's improve together!
