当前位置:   article > 正文

Kafka消息发送失败解决方案

kafka消息发送失败

防火墙设置

防火墙会屏蔽掉Kafka的访问,如果在内网限制很强的情况下,只能是逐个端口用 telnet 排查Kafka用到哪些端口,用lsof -i:<端口号>排查对应进程。

最粗暴的方式是关闭防火墙

# CentOS 6
service iptables stop
# CentOS 7
systemctl stop firewalld
systemctl disable firewalld
  • 1
  • 2
  • 3
  • 4
  • 5

有可能要关闭SELINUX(不确定)

# 当前session生效
setenforce 0
  • 1
  • 2

要么编辑/etc/selinux/config,Session重新登录后生效

# SELINUX=enforcing # 默认值
SELINUX=disabled
  • 1
  • 2

文件资源句柄不足

当拿到机器未做操作系统层面优化时,往往会受到默认系统的参数限制,导致一些奇怪的异常。最常见的是文件资源句柄数量(nofile)和进程数量(nproc)的限制,可修改如下

# /etc/security/limits.conf文件
*    hard    nofile    65536
*    soft    nofile    65536
*    hard    nproc     65536
*    soft    nproc     65536
  • 1
  • 2
  • 3
  • 4
  • 5

Session重新登录后生效,ulimit -a命令可以查看

Kafka listener设置

kafka的producer和consumer在查找broker时,依赖的主机名还是IP的情况比较迷,尤其是当hostname和请求中的Kafka broker配置不一致时,比较容易出问题,因此索性都改成IP比较不容易出问题

# 编辑 $KAFKA_HOME/config/server.properties文件
advertised.host.name=<Your IP>
advertised.port=9092
advertised.listeners=PLAINTEXT://<Your IP>:9092
  • 1
  • 2
  • 3
  • 4

kafka-python 的使用

kafka-python库用作python的依赖来访问kafka比较方便,注意在使用KafkaProducer时,可能会存在各种各样的坑,包括制定API版本、正确设定bootstrap_servers等。详细API使用说明可以参考官方KafkaProducer说明

尤其注意参数 bootstrap_servers 使用broker的数组比较安全,尽量不要把所有broker信息都放到一个string里面

#!/usr/bin/env python
# encoding:utf-8
# 推荐的Producer声明方式
from kafka import KafkaProducer

brokers = ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
producer = KafkaProducer(bootstrap_servers=brokers, acks=1, retries=3, max_block_ms=5000, api_version=(0, 8, 2))
"""
    acks = 1: Partition Leader做出回应
    max_block_ms = 5000: 如果不能正常拿到metadata, 5s就返回,默认是60s
    api_version = (0, 8, 2): Kafka的版本元组,必须和Kafka Broker的版本一致,否则可能有问题,写大版本号(0, 10), (0, 8)之类的也可以
"""

producer.send("topic_name", b"Hello World")
producer.flush()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Kafka 内存不稳定、吃CPU的优化

主要修改$KAFKA_HOME/bin/kafka-run-class.sh和$KAFKA_HOME/bin/kafka-server-start.sh

  • $KAFKA_HOME/bin/kafka-run-class.sh

1. G1GC垃圾回收能够避免掉CMS的一些问题
2. Kafka使用DirectMemory通信(基于NIO),因此要设置大一些

KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MaxDirectMemorySize=1024m -XX:+UseG1GC -Djava.awt.headless=true"
  • 1
  • $KAFKA_HOME/bin/kafka-server-start.sh

1. -Xmn设置年轻代(Eden+S1+S2)大小,默认的年轻代尺寸过小,容易发生YoungGC
2. SurvivorRatio=Eden:S1,由于S1和S2大小一致,且主要用于GC时使用,所以可以评估年轻代大小,调整合适的比例。
3. 年轻代主要放在Eden区当中,Eden区大小=Xmn大小 * SurvivorRatio / (SurvivorRatio + 2)

export KAFKA_HEAP_OPTS="-Xmx8G -Xms2G -Xmn3G -XX:SurvivorRatio=6"
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/438340
推荐阅读
相关标签
  

闽ICP备14008679号