We recently ran into a strange problem in production. The scenario: a customer required that our program check in advance whether the Kafka cluster is available, so we used the KafkaAdminClient provided since Kafka 0.11.0.0, which makes it easy to create a client for administrative operations against the cluster.
We call listTopics() to fetch all topics in the Kafka cluster; if the data comes back, the cluster can be considered healthy. The code is simple:
package com.wenjie.demo.kafkaconnector.client;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/**
 * @author BlackSugar
 * @date 2020/11/19
 */
@Slf4j
public class MyAdminClient {

    private static InetAddress localInetAddr = null;

    static {
        try {
            localInetAddr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
            log.error("Get local host error!!!");
        }
    }

    private static void isReachable(InetAddress remoteInetAddr, int port, int timeout) {
        Properties properties = new Properties();
        String host = remoteInetAddr.getHostAddress() + ":" + port;
        properties.put("bootstrap.servers", host);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", timeout);
        try (AdminClient client = KafkaAdminClient.create(properties)) {
            ListTopicsResult topics = client.listTopics();
            Set<String> names = topics.names().get();
            log.info("SUCCESS - Connection established! Local:{}, remote:{}, port:{}, topic:[{}]",
                    localInetAddr.getHostAddress(), remoteInetAddr.getHostAddress(), port, names);
        } catch (InterruptedException | ExecutionException e) {
            // Kafka is not available
            log.error("FAILED - Can not connect to kafka cluster! Local:{}, remote:{}, port:{}, exception: ",
                    localInetAddr.getHostAddress(), remoteInetAddr.getHostAddress(), port, e);
        }
    }

    public static void main(String[] args) {
        try {
            isReachable(InetAddress.getByName("127.0.0.1"), 9092, 5000);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}
The only dependency needed is the Kafka client (plus Lombok for the logging annotation). POM file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
Success log (topics returned):
2020-11-18 16:33:25.085 INFO 45508 --- [http-nio-8990-exec-8] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1605774805085
2020-11-18 16:33:25.096 INFO 45508 --- [http-nio-8990-exec-8] com.wenjie.demo.kafkaconnector.client.MyAdminClient : SUCCESS - Connection established! Local:127.0.0.1, remote:127.0.0.1, port:9092, topic:[topic1,topic2]
Failure log (timeout):
15:47:15.942 [main] DEBUG org.apache.kafka.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-1] Kafka admin client closed.
15:47:15.942 [main] ERROR com.wenjie.demo.kafkaconnector.client.MyAdminClient - FAILED - Can not connect to kafka cluster! Local:127.0.0.1, remote:127.0.0.1, port:9091, exception:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.wenjie.demo.kafkaconnector.client.MyAdminClient.isReachable(MyAdminClient.java:39)
at com.wenjie.demo.kafkaconnector.client.MyAdminClient.main(MyAdminClient.java:51)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
The project ran without issue for a while, until one day production suddenly threw an OOM:
2020-11-18 17:37:48.107 ERROR 41072 --- [kafka-admin-client-thread | adminclient-5] o.apache.kafka.common.utils.KafkaThread : Uncaught exception in thread 'kafka-admin-client-thread | adminclient-5':
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
    at java.lang.Thread.run(Thread.java:745)
Looking at the GC log, we found that a Full GC ran at the time of the exception, and even that could not reclaim the memory:
2020-11-18T17:37:48.387+0800: 3.095: [Full GC (Metadata GC Threshold) [PSYoungGen: 6468K->0K(299008K)] [ParOldGen: 5580K->11448K(349696K)] 12049K->11448K(648704K), [Metaspace: 33895K->33895K(1079296K)], 0.0712752 secs] [Times: user=0.20 sys=0.00, real=0.08 secs]
That was odd. Why a sudden Full GC? From the log it looked as if a huge object had appeared and blown out the heap, so I started investigating in that direction.
After a series of tests, I found that the OOM only occurred when the Kafka client tried to talk to a TCP service that was not Kafka (connecting to a healthy Kafka service succeeds, and connecting to an unreachable TCP service simply times out).
Following that lead, I eventually landed on the class NetworkReceive. The key code:
public long readFrom(ScatteringByteChannel channel) throws IOException {
    ...
    if (!size.hasRemaining()) {
        size.rewind();
        // the first four bytes read from the socket are interpreted as the size of the payload
        int receiveSize = size.getInt();
        if (receiveSize < 0)
            throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
        if (maxSize != UNLIMITED && receiveSize > maxSize)
            throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
        requestedBufferSize = receiveSize;
        if (receiveSize == 0) {
            buffer = EMPTY_BUFFER;
        }
    }
    ...
    // a buffer of exactly receiveSize bytes is then requested from the memory pool
    if (buffer == null && requestedBufferSize != -1) {
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        ...
    }
    ...
}
The key is this buffer: at the moment of the OOM, the client was trying to allocate 1213486160 bytes (roughly 1.1 GB) for it, while our application's maximum heap (-Xmx) was configured at only 1 GB. Of course it OOMed.
Good, we have located the problem. But what causes it?
With the help of almighty Google, I found this:
It says that when the Kafka server side receives traffic from an HTTP service, the resulting data can be enormous, and that Kafka fixed this on the broker side after 0.10.1.1. But the client? Was it just forgotten? Seriously?
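For context, the broker-side protection being referred to is, as far as I know, the socket.request.max.bytes setting, which caps the size of a single request the broker will accept (default 100 MB), so an oversized "request" is rejected instead of blowing up the heap:

# server.properties (broker side); 104857600 bytes = 100 MB is the default
socket.request.max.bytes=104857600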
Then again, the Kafka developers probably never expected such an odd use case.
My guess here is that the HTTP message cannot be deserialized normally, which makes the data it appears to describe enormous (P.S.: Thrift has a similar problem).
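There is also a very concrete way to see where the ~1.1 GB figure above comes from: as the readFrom() code shows, the client reads the first four bytes coming back over the socket as a length prefix, and an HTTP response starts with the ASCII characters "HTTP", which decode to exactly 1213486160 when read as a big-endian int. A minimal, standalone sketch (the class name is just for illustration):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HttpPrefixAsSizeDemo {
    public static void main(String[] args) {
        // An HTTP response begins with "HTTP/1.1 ...", so the first four bytes on the wire are 'H','T','T','P'
        byte[] firstFourBytes = "HTTP".getBytes(StandardCharsets.US_ASCII);
        // NetworkReceive interprets these four bytes as a big-endian int "message size"
        int receiveSize = ByteBuffer.wrap(firstFourBytes).getInt();
        System.out.println(receiveSize); // 1213486160, i.e. ~1.1 GB requested in a single allocation
    }
}

So the "huge object" was simply the client trying to honor a nonsense length prefix.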
Back to the point: we have found the cause, so how do we fix it?
Going back to the code above, we can see:
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
requestedBufferSize = receiveSize;
Well, well. The buffer is allocated according to requestedBufferSize, and there is a check against maxSize right before it. If we could just configure that maxSize, the code would throw an exception before it ever reached the allocation; we could then catch and handle that exception on the outside, and everything would be fine, right?
Naive. Truly naive of me. I traced this maxSize all the way back to where it is set: org.apache.kafka.common.network.Selector, in its constructors:
public Selector(int maxReceiveSize, long connectionMaxIdleMs, int failedAuthenticationDelayMs, Metrics metrics,
                Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection,
                boolean recordTimePerConnection, ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
    try {
        this.nioSelector = java.nio.channels.Selector.open();
    } catch (IOException e) {
        throw new KafkaException(e);
    }
    this.maxReceiveSize = maxReceiveSize;
    this.time = time;
    this.channels = new HashMap<>();
    this.explicitlyMutedChannels = new HashSet<>();
    this.outOfMemory = false;
    this.completedSends = new ArrayList<>();
    this.completedReceives = new ArrayList<>();
    this.stagedReceives = new HashMap<>();
    this.immediatelyConnectedKeys = new HashSet<>();
    this.closingChannels = new HashMap<>();
    this.keysWithBufferedRead = new HashSet<>();
    this.connected = new ArrayList<>();
    this.disconnected = new HashMap<>();
    this.failedSends = new ArrayList<>();
    this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
    this.channelBuilder = channelBuilder;
    this.recordTimePerConnection = recordTimePerConnection;
    this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
    this.memoryPool = memoryPool;
    this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
    this.log = logContext.logger(Selector.class);
    this.failedAuthenticationDelayMs = failedAuthenticationDelayMs;
    this.delayedClosingChannels = (failedAuthenticationDelayMs > NO_FAILED_AUTHENTICATION_DELAY) ? new LinkedHashMap<String, DelayedAuthenticationFailureClose>() : null;
}

public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix,
                Map<String, String> metricTags, boolean metricsPerConnection, boolean recordTimePerConnection,
                ChannelBuilder channelBuilder, MemoryPool memoryPool, LogContext logContext) {
    this(maxReceiveSize, connectionMaxIdleMs, NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix,
            metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext);
}

public Selector(int maxReceiveSize, long connectionMaxIdleMs, int failedAuthenticationDelayMs, Metrics metrics,
                Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection,
                ChannelBuilder channelBuilder, LogContext logContext) {
    this(maxReceiveSize, connectionMaxIdleMs, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix,
            metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
}

public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix,
                Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder, LogContext logContext) {
    this(maxReceiveSize, connectionMaxIdleMs, NO_FAILED_AUTHENTICATION_DELAY, metrics, time, metricGrpPrefix,
            metricTags, metricsPerConnection, channelBuilder, logContext);
}

// the convenience constructor: maxReceiveSize is hard-coded to NetworkReceive.UNLIMITED
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
    this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
}
This maxReceiveSize is exactly what we were after. Yet none of the constructors that take it is called from anywhere, and the constructor without this parameter is called from:
Right, one big happy family, all lined up. I also have no idea why upstream Kafka does not provide any call path where maxReceiveSize can be configured.
Let's look at KafkaAdminClient:
static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
    ...
    // uses the Selector constructor without a maxReceiveSize parameter,
    // i.e. the one that hard-codes NetworkReceive.UNLIMITED
    new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
            metrics, time, metricGrpPrefix, channelBuilder, logContext);
    ...
}
Indeed, it cannot be configured. So the only option left is to rewrite the class ourselves. We only need the create, listTopics and close methods; everything else can be left out. The final code is as follows (note that it now references our own classes):
package com.wenjie.demo.kafkaconnector.client;

import com.wenjie.demo.kafkaconnector.component.AdminClient;
import com.wenjie.demo.kafkaconnector.component.KafkaAdminClient;
import com.wenjie.demo.kafkaconnector.component.ListTopicsResult;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.Set;

/**
 * @author BlackSugar
 * @date 2020/11/19
 */
@Slf4j
public class MyAdminClient {

    private static InetAddress localInetAddr = null;

    static {
        try {
            localInetAddr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
            log.error("Get local host error!!!");
        }
    }

    // Same logic as the earlier version; only the imports above now point to our own copied classes.
    private static void isReachable(InetAddress remoteInetAddr, int port, int timeout) {
        Properties properties = new Properties();
        String host = remoteInetAddr.getHostAddress() + ":" + port;
        properties.put("bootstrap.servers", host);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", timeout);
        try (AdminClient client = KafkaAdminClient.create(properties)) {
            ListTopicsResult topics = client.listTopics();
            Set<String> names = topics.names().get();
            log.info("SUCCESS - Connection established! Local:{}, remote:{}, port:{}, topic:[{}]",
                    localInetAddr.getHostAddress(), remoteInetAddr.getHostAddress(), port, names);
        } catch (Exception e) {
            // Kafka is not available; we only care whether the call succeeded, so catch broadly here
            log.error("FAILED - Can not connect to kafka cluster! Local:{}, remote:{}, port:{}, exception: ",
                    localInetAddr.getHostAddress(), remoteInetAddr.getHostAddress(), port, e);
        }
    }

    public static void main(String[] args) {
        try {
            isReachable(InetAddress.getByName("127.0.0.1"), 9092, 5000);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}
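For reference, the essential change inside our copied KafkaAdminClient is to switch from the convenience Selector constructor to one of the overloads that accepts maxReceiveSize (both appear in the constructor listing above). A rough sketch of the changed call in createInternal, where the 10 MB cap is our own choice rather than anything Kafka prescribes:

// Inside our copied KafkaAdminClient.createInternal(...), replacing the original Selector call
int maxReceiveSize = 10 * 1024 * 1024; // reject any single receive larger than 10 MB (our assumption)
Selector selector = new Selector(
        maxReceiveSize,
        config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
        metrics, time, metricGrpPrefix,
        Collections.emptyMap(), true,   // metricTags and metricsPerConnection, as passed by the convenience constructor
        channelBuilder, logContext);

With a cap in place, an absurd length prefix such as the one derived from an HTTP reply triggers an InvalidReceiveException inside the network thread, and the request simply fails on the client side instead of attempting a gigabyte-sized allocation.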