For the Flink environment, refer to the earlier article "Flink环境搭建,令人惊愕的HA"; for the Kafka and Elasticsearch environments, refer to the previously written article linked below:
"Elasticsearch7.X-Springboot整合ELK进行日志收集<1>". The environment setup simply follows those earlier posts, so it is not repeated in detail here. Once everything is in place, start Kafka and Elasticsearch; the concrete code and the test/verification process follow below.
1. Add the following dependencies to pom.xml
<properties>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12</elasticsearch.version>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
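Besides the connectors above, running the job directly from the IDE also needs the core Flink streaming API and local client. A minimal sketch, assuming the same flink.version and scala.binary.version properties defined above:

<!-- Core Flink streaming API and local client, assumed here for running the job from the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>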
2. First, produce messages to the kafkaInfo topic in Kafka; see the sample input after the command:
./kafka-console-producer.sh --broker-list 192.168.244.129:9092 --topic kafkaInfo
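Once the producer console starts, each line you type is sent as one Kafka message. The content below is only illustrative input; any space-separated text works, because the Flink job in the next step splits each message on spaces:

>hello flink
>flink kafka elasticsearch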
3. Next, the Java code for Flink to read from Kafka and write to Elasticsearch is as follows
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

private static void kafkaToEs() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setParallelism(3);

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.244.129:9092");
    properties.setProperty("group.id", "test227");
    properties.setProperty("max.poll.records", "1000");
    properties.setProperty("max.partition.fetch.bytes", "5242880");

    // Create the Kafka consumer and read data from the kafkaInfo topic
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<>("kafkaInfo", new SimpleStringSchema(), properties));

    // Split each message on spaces and emit the individual words
    DataStream<String> sum = stream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String str, Collector<String> collector) throws Exception {
            String[] arr = str.split(" ");
            for (String s : arr) {
                collector.collect(s);
            }
        }
    });
    sum.print();

    // Build the Elasticsearch sink: every word is indexed as a document in kafka_flink_es
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("192.168.244.129", 9200, "http"));
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);

                    return Requests.indexRequest()
                            .index("kafka_flink_es")
                            .source(json);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );
    sum.addSink(esSinkBuilder.build()).name("toES");
    env.execute("kafkaToES");
}
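The method above can be driven from an ordinary main method. A minimal sketch, assuming it lives in a class named KafkaToEsJob (the class name is illustrative):

public class KafkaToEsJob {

    public static void main(String[] args) throws Exception {
        // Builds the pipeline and blocks in env.execute() until the job is cancelled
        kafkaToEs();
    }

    // private static void kafkaToEs() { ... } as shown above
}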
While the job is running, the words split out by the flatMap are printed to the console by sum.print().
4. Finally, check the data in Elasticsearch
Open Kibana at http://192.168.244.129:5601/app/dev_tools#/console and query the index data in the Dev Tools console.
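For example, a simple match_all search against the kafka_flink_es index written by the job (run in the Dev Tools console) should return one document per collected word, each with a single data field:

GET kafka_flink_es/_search
{
  "query": {
    "match_all": {}
  }
}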
At this point, Flink has successfully read from Kafka and written the data into Elasticsearch.
If you found this article helpful, feel free to follow the WeChat public account "蓝天Java大数据" so we can make progress together!