Source: GitHub — Flink has not yet merged the ES8 connector code into flink-connector-elasticsearch
https://github.com/apache/flink-connector-elasticsearch/pull/53/files
Environment: Flink 1.16.0 + JDK 1.8
Point 1: OperationSerializer.java
It uses Kryo to serialize and deserialize operations. If your data source is JSON, adjust the serialization logic accordingly (a sketch of this kind of swap follows below).
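The exact shape of OperationSerializer is defined by the PR, so the sketch below is not that class. It is a minimal, hypothetical SimpleVersionedSerializer for a made-up Doc POJO that uses Jackson instead of Kryo, only to illustrate what "adjust the serialization method for JSON data" means in practice; the real class works on the PR's Operation type and may differ.

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

// Hypothetical JSON-based serializer shown only to illustrate swapping Kryo
// binary (de)serialization for Jackson; not the PR's actual OperationSerializer.
public class JsonDocSerializer implements SimpleVersionedSerializer<JsonDocSerializer.Doc> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(Doc doc) throws IOException {
        // Jackson writes the POJO as UTF-8 JSON bytes instead of a Kryo binary blob
        return MAPPER.writeValueAsBytes(doc);
    }

    @Override
    public Doc deserialize(int version, byte[] serialized) throws IOException {
        return MAPPER.readValue(serialized, Doc.class);
    }

    // Made-up document type used only for this sketch
    public static class Doc {
        public String id;
        public String name;
    }
}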
Point 2: NetworkConfigFactory.java
This is where you build your own esClient: set the Elasticsearch headers, authentication, SSL, and so on to match your environment (see the sketch after this point).
Note: do not set a default ("Content-Type", "application/json") header here.
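Where exactly the PR constructs its client is up to NetworkConfigFactory, but the construction itself is standard Elasticsearch Java client setup. The following is a minimal sketch (the class name CustomEsClientFactory is made up for illustration) of wiring in basic auth and SSL; adapt it to wherever the connector builds its client in your copy of the code.

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;

import javax.net.ssl.SSLContext;

// Hypothetical helper, not part of the PR: shows the usual way to attach
// basic auth and SSL to the low-level RestClient behind ElasticsearchClient.
public class CustomEsClientFactory {

    public static ElasticsearchClient create(HttpHost[] hosts, String user, String password) throws Exception {
        // Basic authentication
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        // SSL context; TrustAllStrategy is for testing only -- load your real CA material in production
        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(TrustAllStrategy.INSTANCE)
                .build();

        RestClient restClient = RestClient.builder(hosts)
                // Do NOT add a default ("Content-Type", "application/json") header here;
                // the transport sets the correct content type on each request itself.
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLContext(sslContext))
                .build();

        return new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));
    }
}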
Point 3: the Operation class and Elasticsearch8SinkBuilder also need minor tweaks.
Summary
That covers all the changes. Next comes testing; you can refer to Elasticsearch8SinkTest under test/.
I also have a Kafka-to-Elasticsearch example here for reference:
package org.apache.flink.connector.elasticsearch.sink;

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;

public class CreateEs8demo {

    public static void main(String[] args) {
        try {
            // Kafka source: each record is exposed as an ObjectNode with key/value fields
            KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                    .setBootstrapServers("1.1.1.1:9092")
                    .setTopics("topic")
                    .setGroupId("test")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    //.setProperties(properties)
                    .setDeserializer(KafkaRecordDeserializationSchema.of(
                            new JSONKeyValueDeserializationSchema(true)))
                    .build();

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<ObjectNode> sourceStream = environment.fromSource(
                    source, WatermarkStrategy.noWatermarks(), "source");

            HttpHost[] httpHosts = new HttpHost[]{new HttpHost("*.*.*.33", 9200, "https")};

            // Elasticsearch 8 sink: upserts each User into the es8_index index
            Elasticsearch8Sink<User> elastic = Elasticsearch8SinkBuilder.<User>builder()
                    .setMaxBatchSize(1)
                    .setMaxBufferedRequests(2)
                    .setMaxTimeInBufferMS(1000)
                    .setHosts(httpHosts)
                    .setUsername("xxx")
                    .setPassword("xxx")
                    .setConverter(
                            (element, ctx) -> new BulkOperation.Builder()
                                    // .index(op -> op
                                    //         .index("es8_index")
                                    //         .document(element)
                                    // )
                                    .update(op -> op
                                            .index("es8_index")
                                            .id(element.getId())
                                            .action(e -> e.doc(element).docAsUpsert(true))
                                    )
                                    .build())
                    .build();

            sourceStream.map(new MapFunction<ObjectNode, User>() {
                        @Override
                        public User map(ObjectNode jsonNodes) throws Exception {
                            JsonNode value = jsonNodes.get("value");
                            return User.builder()
                                    .id(value.get("id").asText())
                                    .name(value.get("name").asText())
                                    .age(value.get("age").asText())
                                    .build();
                        }
                    })
                    .sinkTo(elastic);

            environment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public static class User {
        private String id;
        private String name;
        private String age;
    }
}
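A note on the converter: the commented-out .index(...) variant emits plain index operations, while the .update(...) form with docAsUpsert(true) inserts the document when the id does not exist and merges it when it does, so re-processing the same Kafka records stays idempotent. setMaxBatchSize(1) effectively sends one bulk request per record, which is convenient for testing; raise it (together with setMaxBufferedRequests and setMaxTimeInBufferMS) for real workloads.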
If anything above is incorrect, corrections are very welcome; feel free to leave a comment.