Dumping code without first talking about the requirements == bad manners...
Requirement (in brief): read data from Kafka => business processing => write to ES => write to Kafka (via Kafka, notify other services of a record's id and the ES index it was written to).
First version of the code (excerpt):
```java
// write to ES
mapDs.addSink(new ElasticsearchSink<JSONObject>(esProps, transportAddresses, (ElasticsearchSinkFunction<JSONObject>) (jsonObject, runtimeContext, requestIndexer) -> {
    long createTime = jsonObject.getLong("create_time");
    String indexName = esPrefix.concat(Utils.longDate2String(createTime, "yyyy.MM.dd"));
    requestIndexer.add(Requests.indexRequest().index(indexName)
            .type(esType)
            .source(jsonObject));
}, new RetryRejectedExecutionFailureHandler())).name("sop-controller-es-sink");

// write to Kafka
mapDs.filter(jsonObject -> "初始化".equals(jsonObject.getString("event_status")))
        .map(item -> kafkaOutPutData(item, esPrefix))
        .addSink(new FlinkKafkaProducer010<String>(outputTopic, new SimpleStringSchema(), props)).name("sop-controller-sop-sink");
```
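As an aside, the daily index name built above (esPrefix plus the `yyyy.MM.dd` rendering of `create_time`) can be reproduced with the stdlib alone. `Utils.longDate2String` is the project's own helper, so the stand-in below (`dailyIndex`, pinned to UTC) is an assumption about its behaviour, not its actual implementation:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class IndexName {
    // Hypothetical stand-in for Utils.longDate2String(createTime, "yyyy.MM.dd"):
    // format an epoch-millis timestamp as a daily index suffix.
    static String dailyIndex(String prefix, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy.MM.dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC-based index days
        return prefix + fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // Epoch start lands in the 1970.01.01 index.
        System.out.println(dailyIndex("sop-", 0L)); // sop-1970.01.01
    }
}
```

One index per day means the Kafka notification must carry the index name as well as the id, since the consumer cannot derive the day without the record's `create_time`.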
All of it written against Flink's basic APIs (I hadn't used Flink before); my tests passed and I was feeling pretty good about myself... Then I sat down with the downstream colleague who consumes my Kafka output to do integration testing, and things fell apart...
As the requirement says, the Kafka message he receives identifies a record's id and the ES index it lives in; his service then goes back to query ES, only to find the record hasn't been written yet!!! (It's not that ES is never written; it's that the ES write completes later than the Kafka write.)
Thinking it through, the fundamental requirement is this: by the time the downstream consumer receives the Kafka message, the corresponding record must already be in ES.
1. The two sinks (ES, Kafka) are two separate Flink tasks, so they're certainly not synchronized. Can that be controlled? I went through the API docs and found nothing relevant (if there is a way, please enlighten me).
2. What if I wrote to Kafka directly inside the ES sink? That still seems problematic: for performance, the ES sink writes in batches; submitted requests are buffered first and then flushed in bulk according to configuration (by time or by record count). Is there a way to hook into the bulk-flush event and do something there? Time to dig into the code:
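Before reading the source, the idea in point 2 can be sketched with plain Java: buffer writes, flush them in batches, and only notify downstream (i.e. produce to Kafka) from a callback that fires after the flush completes. The `BatchingWriter` class and its `afterFlush` hook below are hypothetical stand-ins for the ES client's buffer-and-flush machinery, not its actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of "notify only after the batch flush completes".
// Mimics a bulk processor's buffer-then-flush behaviour with plain Java.
class BatchingWriter {
    private final int flushSize;
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> afterFlush; // e.g. send Kafka notifications here

    BatchingWriter(int flushSize, Consumer<List<String>> afterFlush) {
        this.flushSize = flushSize;
        this.afterFlush = afterFlush;
    }

    void add(String doc) {
        buffer.add(doc);
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) return;
        // ... the real code would issue the bulk request to ES here ...
        List<String> flushed = new ArrayList<>(buffer);
        buffer.clear();
        afterFlush.accept(flushed); // data is now in ES; safe to notify downstream
    }
}

public class Demo {
    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        BatchingWriter writer = new BatchingWriter(2, notified::addAll);
        writer.add("id-1");
        // Not flushed yet, so downstream must NOT have been notified.
        System.out.println(notified.size()); // 0
        writer.add("id-2"); // reaches flushSize, triggers flush
        System.out.println(notified);        // [id-1, id-2]
    }
}
```

If the ES sink exposes a hook like this on its bulk flush, the Kafka notification could move into it, guaranteeing the consumer never sees an id before its document has been flushed to ES. Whether such a hook is reachable is exactly what the source dive below is for.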
The source of ElasticsearchSink:
```java
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
     *
     * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
     * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
     * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
     */
    public ElasticsearchSink(
            Map<String, String> userConfig,
            List<InetSocketAddress> transportAddresses,
            ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {

        this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
    }

    /**
     * Creates a new {@code ElasticsearchSink} that connects to the clust
```