当前位置:   article > 正文

Flink es sink的一次历险

flink es sink

不谈需求的铺代码==耍流氓。。。

需求(概略):从kafka读数据=》业务处理=》写入es=》写入kafka(通过kafka通知其他业务:某条数据的id,以及所在es的索引)

第一版代码(摘要):

  1. //写入es
  2. mapDs.addSink(new ElasticsearchSink<JSONObject>(esProps, transportAddresses, (ElasticsearchSinkFunction<JSONObject>) (jsonObject, runtimeContext, requestIndexer) -> {
  3. long createTime = jsonObject.getLong("create_time");
  4. String indexName = esPrefix.concat(Utils.longDate2String(createTime, "yyyy.MM.dd"));
  5. requestIndexer.add(Requests.indexRequest().index(indexName)
  6. .type(esType)
  7. .source(jsonObject));
  8. }, new RetryRejectedExecutionFailureHandler())).name("sop-controller-es-sink");
  9. //写入kafka
  10. mapDs.filter(jsonObject -> "初始化".equals(jsonObject.getString("event_status")))
  11. .map(item -> kafkaOutPutData(item, esPrefix))
  12. .addSink(new FlinkKafkaProducer010<String>(outputTopic, new SimpleStringSchema(), props)).name("sop-controller-sop-sink");

都是根据flink基础api来的(之前没玩过),测试妥妥的,心里美滋滋。。。然后找下游接我写入kafka数据的同事联调,出问题了。。。。

如需求所说,他拿到的kafka数据标识的是数据的id以及所在的索引,然后他的业务回去查es,发现es还没有写入!!!(然鹅,并不是不写入es,而是es写入完于kafka写入)

思考:最根本的原因,下游的同学接到kafka数据时必须确保相应数据已经写入es

1.两个sink(es,kafka)属于flink的两个不同任务,肯定不是同步的,能控制么?翻了下api文档,没有找到相应的地方(如果有的话,请赐教)

2.如果直接在es sink的时候写kafka,会不会有问题?好像还是会有问题,es的写入,为了保证性能,是批量的形式,提交的数据会先缓存住,然后根据参数设置(按时间或者按数据量),进行批量提交(flush)。有没有办法监听批量提交的事件,在里面做文章呢?翻代码:

ElasticsearchSink的源码:
  1. @PublicEvolving
  2. public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
  3. private static final long serialVersionUID = 1L;
  4. /**
  5. * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
  6. *
  7. * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
  8. * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
  9. * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
  10. */
  11. public ElasticsearchSink(
  12. Map<String, String> userConfig,
  13. List<InetSocketAddress> transportAddresses,
  14. ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
  15. this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
  16. }
  17. /**
  18. * Creates a new {@code ElasticsearchSink} that connects to the clust
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/606584
推荐阅读
相关标签
  

闽ICP备14008679号