赞
踩
kafka消息积压报警,首先进行了自查,这个现象频频出现,之前每次都是先重新分配分区或者回溯(消息可丢弃防止大量积压消费跟不上)。
根据手册首先排查下消息拉取是否正常,看到了消息拉取线程是waiting状态,然后看到kafka这块逻辑是消费线程阻塞了拉取线程。
对比了其他消费者,消费线程都是在runing和waiting中切换,但是当前消费者的消费状态一直处于runing,阻塞了消息拉取线程。
问题定位成功,然后去看了线程的栈信息,发现是里面的逻辑卡在了socket.read,当即想到了socket的超时,去看了代码逻辑,是httpclinet,果然没有设置超时时间。
按照定义解释为如果sockettimeout设置为0的话,应该是等待无限长的时间(直到进程重启),这里有个老哥用个更详细的排查https://cloud.tencent.com/developer/news/698654。
所以解决方案就是在请求是设置一下:
使用的是fluent api
import org.apache.http.client.fluent.Request;
Request request = Request.Post(uri).connectTimeout(1000).socketTimeout(1000);
String response = request.execute().returnContent().asString();
后面考虑到这个请求量比较大,可能会影响交易流程(这次的问题查询是一个同步信息接口),因此决定不使用公共连接池,写法如下:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; /** * @version 1.0 */ public class HttpFluentUtil { private Logger logger = LoggerFactory.getLogger(HttpFluentUtil.class); private final static int MaxPerRoute = 100; private final static int MaxTotal = 200; final static PoolingHttpClientConnectionManager CONNMGR; final static HttpClient CLIENT; final static Executor executor; static { LayeredConnectionSocketFactory ssl = null; try { ssl = SSLConnectionSocketFactory.getSystemSocketFactory(); } catch (final SSLInitializationException ex) { final SSLContext sslcontext; try { sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); sslcontext.init(null, null, null); ssl = new SSLConnectionSocketFactory(sslcontext); } catch (final SecurityException ignore) { } catch (final KeyManagementException ignore) { } catch (final NoSuchAlgorithmException ignore) { } } final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory()).build(); CONNMGR = new PoolingHttpClientConnectionManager(sfr); CONNMGR.setDefaultMaxPerRoute(MaxPerRoute); CONNMGR.setMaxTotal(MaxTotal); CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build(); executor = Executor.newInstance(CLIENT); } public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException { return executor.execute(Request.Get(uri).connectTimeout(connectTimeout).socketTimeout(socketTimeout)) .returnContent().asString(); } public static String Post(String uri, int connectTimeout, int socketTimeout) throws IOException { return executor.execute(Request.Post(uri).socketTimeout(socketTimeout) ).returnContent().asString(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。