赞
踩
从半年现在从0开始搭建Flink实时计算平台,部分存储层用到了Elasticsearch,从零开始接触Flink,这半年来遇到了好多坑,由传统的开发转变成了大数据开发,Elasticsearch内含有多种熔断器,为了防止OOM。由于目前业务查询的方式会造成成本很高,(可以看一下allow_expensive_querys),某次查询可能会引起服务的熔断,这时候有可能引起实时任务 sink Elasticsearch请求也会被熔断。
当然 Flink Connector 提供了几种失败处理机制
IgnoringFailureHandler
: 会忽略所有 sink elasticsearch Connector的异常 ;NoOpFailureHandler
: 不处理任何异常,只输出异常栈信息(默认);RetryRejectedExecutionFailureHandler
: 遇到特定异常时会进行重试 包涵 EsRejectedExecutionException
类以及他的子类。当我们遇到更新比较多频繁的时候,用IgnoringFailureHandler
当写入ES失败时不影响Flink任务,当然遇到比较敏感统计时,我们需要对失败的结果集进行重试,
需要配合RetryRejectedExecutionFailureHandler
来进行处理,源码中只会处理EsRejectedExecutionException
类以及他的子类,当然熔断类型的异常归属于ElasticsearchStatusException
异常,两者并没有关系。为防止Flink因elasticsearch集群熔断导致挂掉,我们需要做特定的处理,重写ActionRequestFailureHandler
。
为了可以更好地扩展,我们首先定义一个策略类ElasticsearchExceptionHandlerStrategy
代码如下:
/** * @author liweigao * @date 2021/12/2 下午11:17 */ @Getter public enum ElasticsearchExceptionHandlerStrategy { /** * 默认空不处理或默认使用父类 由handler来决定实现 */ DEFAULT(Lists.newArrayList()), /*** * 全部异常 Throwable 级别 * 需要注意 */ ALL_EXCEPTION(Lists.newArrayList(Throwable.class)), /** * @see org.elasticsearch.ElasticsearchException * @see org.elasticsearch.ElasticsearchException.ElasticsearchExceptionHandle * <p> * elasticsearch 封装的异常 */ ELASTICSEARCH_EXCEPTION(Lists.newArrayList(ElasticsearchException.class)), /** * @see org.elasticsearch.ElasticsearchStatusException * @see org.elasticsearch.rest.RestStatus * @see EsRejectedExecutionException * <p> * elasticsearch 状态异常 * todo 可根据相应的异常进行细化~ * 可进行通信的链接状态错误(比如 es熔断导致的429错误) */ ELASTICSEARCH_STATUS_AND_REJECTED_EXCEPTION(Lists.newArrayList(org.elasticsearch.ElasticsearchStatusException.class, EsRejectedExecutionException .class)),; final List<Class<? extends Throwable>> exceptionClass; ElasticsearchExceptionHandlerStrategy(List<Class<? extends Throwable>> exceptionClass) { this.exceptionClass = exceptionClass; } }
定义了四种策略
ALL_EXCEPTION
全部异常ELASTICSEARCH_EXCEPTION
ELASTICSEARCH_EXCEPTION elasticsearch全部异常ELASTICSEARCH_STATUS_AND__EXCEPTION
EsRejectedExecutionException
和 ElasticsearchStatusException
异常DEFAULT
默认空不处理或默认使用父类, 由handler来决定实现可根据实际业务去扩展ElasticsearchExceptionHandlerStrategy 枚举类。
RetryExecutionFailureHandler
: 特定的异常失败重试 如果策略为DEFAULT时 会交由父类去处理(RetryRejectedExecutionFailureHandler
) 代码如下:/** * 可重试异常处理,根据{@link ElasticsearchExceptionHandlerStrategy} 进行处理 * * @author liweigao * @date 2021/12/2 下午11:27 */ @Slf4j public class RetryExecutionFailureHandler extends RetryRejectedExecutionFailureHandler { private static final long serialVersionUID = -1; private ElasticsearchExceptionHandlerStrategy strategy; @Nullable public RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy strategy) { this.strategy = strategy; } @Override public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) { super.onFailure(action, failure, restStatusCode, indexer); return; } log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); for (Class<? extends Throwable> exceptionClass : strategy.getExceptionClass()) { if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) { indexer.add(action); return; } } // rethrow all other failures throw failure; } }
IgnoringExceptionFailureHandler
: 特定的异常忽略 如果策略为DEFAULT时 类似于IgnoringFailureHandler
处理代码如下:/** * 忽略特定异常,如果没指定时默认为全部忽略 * * @author liweigao * @date 2021/12/2 下午11:35 */ @Slf4j public class IgnoringExceptionFailureHandler implements ActionRequestFailureHandler { private static final long serialVersionUID = -1; private ElasticsearchExceptionHandlerStrategy strategy; @Override public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) { return; } log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); for (Class<? extends Throwable> exceptionClass : strategy.getExceptionClass()) { if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) { return; } } // rethrow all other failures throw failure; } }
伪代码如下:
ElasticsearchSink.Builder<Object> builder = new ElasticsearchSink.Builder<Object>(httpHosts, new ElasticsearchSinkFunction(){...}); //配置批量提交 builder.setBulkFlushBackoff(true); //设置重试次数 builder.setBulkFlushBackoffRetries(2); //设置重试间隔 builder.setBulkFlushBackoffDelay(2000L); //设置重试策略CONSTANT: 常数 eg: 重试间隔为2s 重试3次 会在2s->4s->6s进行; EXPONENTIAL:指数 eg: 重试间隔为2s 重试3次 会在2s->4s->8s进行 builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.CONSTANT); //设置批量提交最大数据量 builder.setBulkFlushMaxSizeMb(10); //设置批量提交间隔 builder.setBulkFlushInterval(2000L); //设置批量提交的最大条数 builder.setBulkFlushMaxActions(1000); //设置重试机制 builder.Builder<Object>.setFailureHandler(new RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy.DEFAULT));
Elasticsearch失败重试机制依赖于checkpoint 可参看源码:
ElasticsearchSinkBase
类
以上拙见,毕竟才入坑,欢迎交流~ 推荐一波Flink 的发布平台。切记:没有最优的公共配置,需要根据特定场景才能达到相应的效果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。