赞
踩
在项目部署过程中遇到kafka需要走代理跨域通信的情景,搭建了一套环境模拟实验,以此记录。
两套kafka集群KafkaS和KafkaC分别位于两个不互通的网络域,跨域互访需要经过nginx代理机,现需要确认nginx、kafka的配置。
利用docker在一台虚拟机上创建两个网络不互通的kafka集群,同时在宿主机上部署nginx。
1、宿主机:Ubuntu 22.10
2、工具:docker、docker-compose
1、kafka镜像/zookeeper镜像:wurstmeister/zookeeper、wurstmeister/kafka
2、实验时版本:zookeeper
安装docker、docker-compose、nginx
sudo apt install docker.io docker-compose nginx -y
1、网络配置
networks:
kafka_server_net:
ipam:
config:
- subnet: 172.33.0.0/16
kafka_client_net:
ipam:
config:
- subnet: 172.34.0.0/16
2、zookeeper配置
zookeeperS:
image: wurstmeister/zookeeper
container_name: zookeeperS
restart: always
networks:
kafka_server_net:
ipv4_address: 172.33.0.10
zookeeperC的配置类似。
3、kafka配置
kafkaS1: image: wurstmeister/kafka depends_on: [ zookeeperS ] container_name: kafkaS1 environment: HOSTNAME: kafkaS1 KAFKA_BROKER_ID: 0 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka # extra_hosts可以把地址映射添加到hosts文件里,后面有用。其中,统一集群的kafka映射为容器的IP地址,不同集群的kafka映射为宿主机地址,72.130是宿主机IP。 extra_hosts: kafkaS1: 172.33.0.11 kafkaS2: 172.33.0.12 kafkaS3: 172.33.0.13 kafkaC1: 192.168.72.128 kafkaC2: 192.168.72.128 kafkaC3: 192.168.72.128 networks: kafka_server_net: ipv4_address: 172.33.0.11
kafkaC的配置类似
4、完整的docker-compose文件
version: '3.8' services: # kafkaS && zookeeperS zookeeperS: image: wurstmeister/zookeeper container_name: zookeeperS restart: always networks: kafka_server_net: ipv4_address: 172.33.0.10 kafkaS1: image: wurstmeister/kafka depends_on: [ zookeeperS ] container_name: kafkaS1 environment: HOSTNAME: kafkaS1 KAFKA_BROKER_ID: 10 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka networks: kafka_server_net: ipv4_address: 172.33.0.11 extra_hosts: kafkaS1: 172.33.0.11 kafkaS2: 172.33.0.12 kafkaS3: 172.33.0.13 kafkaC1: 192.168.72.128 kafkaC2: 192.168.72.128 kafkaC3: 192.168.72.128 kafkaS2: image: wurstmeister/kafka depends_on: [ zookeeperS ] container_name: kafkaS2 environment: HOSTNAME: kafkaS2 KAFKA_BROKER_ID: 11 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS2:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka networks: kafka_server_net: ipv4_address: 172.33.0.12 extra_hosts: kafkaS1: 172.33.0.11 kafkaS2: 172.33.0.12 kafkaS3: 172.33.0.13 kafkaC1: 192.168.72.128 kafkaC2: 192.168.72.128 kafkaC3: 192.168.72.128 kafkaS3: image: wurstmeister/kafka depends_on: [ zookeeperS ] container_name: kafkaS3 environment: HOSTNAME: kafkaS3 KAFKA_BROKER_ID: 12 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS3:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka networks: kafka_server_net: ipv4_address: 172.33.0.13 extra_hosts: kafkaS1: 172.33.0.11 kafkaS2: 172.33.0.12 kafkaS3: 172.33.0.13 kafkaC1: 192.168.72.128 kafkaC2: 192.168.72.128 kafkaC3: 192.168.72.128 # kafkaC && zookeeperC zookeeperC: image: wurstmeister/zookeeper container_name: zookeeperC restart: always networks: kafka_client_net: ipv4_address: 172.34.0.10 kafkaC1: image: wurstmeister/kafka depends_on: [ zookeeperC ] container_name: kafkaC1 environment: HOSTNAME: kafkaC1 KAFKA_BROKER_ID: 20 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC1:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka networks: kafka_client_net: ipv4_address: 172.34.0.11 extra_hosts: kafkaS1: 192.168.72.128 kafkaS2: 192.168.72.128 kafkaS3: 192.168.72.128 kafkaC1: 172.34.0.11 kafkaC2: 172.34.0.12 kafkaC3: 172.34.0.13 kafkaC2: image: wurstmeister/kafka depends_on: [ zookeeperC ] container_name: kafkaC2 environment: HOSTNAME: kafkaC2 KAFKA_BROKER_ID: 21 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC2:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka networks: kafka_client_net: ipv4_address: 172.34.0.12 extra_hosts: kafkaS1: 192.168.72.128 kafkaS2: 192.168.72.128 kafkaS3: 192.168.72.128 kafkaC1: 172.34.0.11 kafkaC2: 172.34.0.12 kafkaC3: 172.34.0.13 kafkaC3: image: wurstmeister/kafka depends_on: [ zookeeperC ] container_name: kafkaC3 environment: HOSTNAME: kafkaC3 KAFKA_BROKER_ID: 22 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC3:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka networks: kafka_client_net: ipv4_address: 172.34.0.13 extra_hosts: kafkaS1: 192.168.72.128 kafkaS2: 192.168.72.128 kafkaS3: 192.168.72.128 kafkaC1: 172.34.0.11 kafkaC2: 172.34.0.12 kafkaC3: 172.34.0.13 # networks networks: kafka_server_net: ipam: config: - subnet: 172.33.0.0/16 kafka_client_net: ipam: config: - subnet: 172.34.0.0/16
docker-compose up -d启动所有容器后,开始验证环境是否符合要求。
1、网络连通性验证
本场景应当kafkaS和kafkaC集群间不能互访,内部三个节点可以互访;宿主机和两个集群可以互访。
1-1、kafka集群内部互访
先进入容器内:
docker -exec -it kafkaS1 bash
kafka容器没有telnet、ping等工具测试网络,但我们可以用nc:
nc -vz kafkaS2 9092
nc -vz zookeeperS 2181
以上测试应该会回显succeed
1-2、kafka集群间验证
nc -vz kafkaC1 9092
测试结果是refused,是否与我们预期不一样?别慌,这是因为kafkaC1被映射为宿主机的地址,因此是正常的,要测试kafka集群间连通性,应该直接用ip测试,这次结果就是超时了。
nc -vz 172.34.0.11 9092
1-3、代理(宿主机)与kafka的连通性验证
上一部分已经验证过,也可以在宿主机上Telnet端口,正常情况是通的。
2、kafka可用性验证
2-1、分别在两个kafka集群测试统一集群内kafka能否成功收发消息。
# 进入kafka测试脚本所在目录
cd /opt/kafka/bin/
# 创建主题
kafka-topics.sh --create --zookeeper zookeeperS:2181/kafka --replication-factor 1 --partitions 2 --topic testtopic
# 模拟生产消息
kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092
# 随便输入内容进行测试
# 进入另一个kafka容器,模拟消费
kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic testtopic
消费的窗口能看到生产的消息,则验证通过。
至此,模拟环境搭建完成,下面记录如何实现跨域通信。
kafka通过二次交互建立会话:服务端在第一次交互时会返回一个地址供客户端建立会话,这个交互机制可以防止中间人攻击,但对走代理的场景来说,配置就比较麻烦了。
一个kafka服务端会提供给多个客户端访问(含不经代理的访问),配置为返回代理地址不太合适,
kafka返回相同值给客户端,客户端却需要识别成各不相同、自己能连接的地址,这个问题很自然地就能想到用hosts解决。
docker-compose配置里已经给kafka节点都加上了extra_hosts,并根据连通性做好主机名和IP的映射。
映射加上之后,只需要再把nginx代理配好就能成功互访了。
这里我用四层代理进行转发。
在nginx.conf文件里新建stream块,和http同级。
在nginx配置目录下新建kafkaproxy.conf,具体如下:
# 新建配置文件
sudo nano /etc/nginx/conf.d/stream/kafkaproxy.conf
配置内容:
upstream KAFKA_SERVER
{
server 172.33.0.11:9092;
server 172.33.0.12:9092;
server 172.33.0.13:9092;
}
server {
listen *:9092;
proxy_pass KAFKA_SERVER;
proxy_connect_timeout 2s;
proxy_timeout 1m;
}
# 配置完毕后重载配置
sudo nginx -s reload
1、模拟发送消息
进入kafkaS任意节点,模拟发送消息
kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092
2、模拟接收消息
kafka-console-consumer.sh --bootstrap-server kafkaS1:9092,kafkaS2:9092,kafkaS3:9092 --from-beginning --topic testtopic
能相互通信
因为–from-beginning参数没去掉,所以有一些之前产生的消息。
但此时仍然是有问题的,通过代理访问kafka服务端的时候,有时会消费不到数据。这是负载均衡的问题,如果要避免,还是得配置多监听。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。