
Notes on adapting the Elasticsearch 8 connector to Flink 1.16.0

Source: the ES8 connector code has not yet been merged into the Flink repository on GitHub:
https://github.com/apache/flink-connector-elasticsearch/pull/53/files


Environment: Flink 1.16.0 + JDK 1.8
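Since the ES8 connector has not been released, you would build the PR branch locally (`mvn install`) and reference the resulting snapshot from your project. A hedged pom fragment; the artifactId is an assumption based on the PR's module layout, so check the coordinates your local build actually produces:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch8</artifactId>
    <!-- use the snapshot version produced by your local `mvn install` of the PR branch -->
    <version>...</version>
</dependency>
```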

Key point 1: OperationSerializer.java

It serializes and deserializes buffered operations in Kryo format. If your source data is JSON, you need to adjust the serialization methods accordingly.
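If the payloads are already JSON strings, one option is to replace the Kryo round-trip with a plain length-prefixed UTF-8 byte layout. A minimal JDK-only sketch of that idea; the class and method names here are illustrative, not the PR's actual `OperationSerializer` API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Illustrative alternative to a Kryo-based serializer when records are JSON strings. */
public class JsonPayloadSerializer {

    /** Writes the JSON payload as a 4-byte length prefix followed by the raw UTF-8 bytes. */
    public byte[] serialize(String json) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // length prefix
        out.write(bytes);           // raw JSON bytes, no Kryo involved
        out.flush();
        return bos.toByteArray();
    }

    /** Reads the length prefix back and decodes the JSON payload. */
    public String deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int len = in.readInt();
        byte[] bytes = new byte[len];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The same shape (byte array out, byte array in) is what a Flink serializer ultimately needs, so the swap is localized to the two methods above.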

Key point 2: NetworkConfigFactory.java

This is where you customize the esClient: set the ES headers, authentication, SSL, and so on for your own environment.
Note: do not hard-code a default header ("Content-Type", "application/json") here.
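The concrete client wiring depends on the PR's `NetworkConfigFactory`, but the auth and SSL ingredients can be prepared with the JDK alone. A hedged sketch (the class name is illustrative; the trust-everything manager is for development against self-signed certificates only, so load a real truststore in production). Per the warning above, no Content-Type header is hard-coded here:

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.Base64;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

/** Illustrative helpers for the auth/SSL pieces a custom esClient setup needs. */
public class EsClientConfig {

    /** Basic-auth header value ("Authorization: Basic ...") for the given credentials. */
    public static String basicAuth(String user, String password) {
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }

    /** Dev-only SSLContext that trusts any certificate; use a real truststore in production. */
    public static SSLContext trustAllContext() throws Exception {
        TrustManager[] trustAll = new TrustManager[] {
            new X509TrustManager() {
                public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                public void checkClientTrusted(X509Certificate[] certs, String authType) {}
                public void checkServerTrusted(X509Certificate[] certs, String authType) {}
            }
        };
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, trustAll, new SecureRandom());
        return ctx;
    }
}
```

Both values plug straight into the Elasticsearch low-level REST client builder callbacks (default headers and the HTTP client's SSL context).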

Key point 3: Operation and Elasticsearch8SinkBuilder need minor adjustments.

(The original post shows the adjustments in a screenshot, which is not reproduced here.)
Summary

That covers all the adjustments. Next comes testing; you can refer to Elasticsearch8SinkTest under test/. Below is a Kafka-to-ES example of mine for reference.

package org.apache.flink.connector.elasticsearch.sink;

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import lombok.AllArgsConstructor;

import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import org.apache.http.HttpHost;

public class CreateEs8demo {
    public static void main(String[] args) {
        try {
            KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                    .setBootstrapServers("1.1.1.1:9092")
                    .setTopics("topic")
                    .setGroupId("test")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    //.setProperties(properties)
                    .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(
                            true)))
                    .build();

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<ObjectNode> sourceStream = environment.fromSource(
                    source,
                    WatermarkStrategy.noWatermarks(),
                    "source");
            HttpHost[] httpHosts = new HttpHost[]{new HttpHost("*.*.*.33", 9200, "https")};
            Elasticsearch8Sink<User> elastic = Elasticsearch8SinkBuilder.<User>builder()
                    .setMaxBatchSize(1)
                    .setMaxBufferedRequests(2)
                    .setMaxTimeInBufferMS(1000)
                    .setHosts(httpHosts)
                    .setUsername("xxx")
                    .setPassword("xxx")
                    .setConverter(
                            (element, ctx) ->
                                    new BulkOperation.Builder()
//                                            .index(op -> op
//                                                    .index("es8_index")
//                                                    .document(element)
//                                            )
                                            .update(op -> op
                                                    .index("es8_index")
                                                    .id(element.getId())
                                                    .action(e -> e.doc(element).docAsUpsert(true))
                                            )
                                            .build())
                    .build();
            sourceStream.map(new MapFunction<ObjectNode, User>() {
                        @Override
                        public User map(ObjectNode jsonNodes) throws Exception {
                            // JSONKeyValueDeserializationSchema wraps each record as
                            // {"key": ..., "value": ..., "metadata": ...}
                            JsonNode value = jsonNodes.get("value");
                            return User.builder()
                                    .id(value.get("id").asText())
                                    .name(value.get("name").asText())
                                    .age(value.get("age").asText())
                                    .build();
                        }
                    })
                    .sinkTo(elastic);

            environment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public static class User {
        private String id;
        private String name;
        private String age;
    }
}

If anything here is wrong, corrections are much appreciated; feel free to comment.
