赞
踩
在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:想分析一下用户行为,一遍我能设计出更好的广告位,相对用户的搜索关键字尽心统计,分析出前卫的流行趋势,有些数据存数据库浪费,直接存硬盘操作效率又低。kafka在这样的场景下就是个不错的选择。
在centos中,拉取zookeeper镜像,以及创建zookeeper容器:
- 1)docker pull wurstmeister/zookeeper
- 2)docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
注:读者请自行改为本机ip,此文采用的单例,而非集群,如需集群 读者请自行创建.
在centos中,创建kafka容器:
- 1)docker pull wurstmeister/kafka
- 2)docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.244.132/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.244.132:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
-
- 参数说明:
- -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-
- -e KAFKA_ZOOKEEPER_CONNECT=192.168.244.132:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.244.132:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-
- -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
-
- 5、验证kafka是否可以使用
-
- 5.1、进入容器
- $ docker exec -it kafka bash
-
- 5.2、进入 /opt/kafka_2.12-2.3.0/bin/ 目录下
- $ cd /opt/kafka_2.12-2.3.0/bin/
-
- 5.3、运行kafka生产者发送消息
- $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic sun
-
- 发送消息
- > {"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}
-
- 5.4、运行kafka消费者接收消息
- $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

注:由于docker kafka版本不断迭代,KAFKA_ADVERTISED_HOST_NAME 不再建议是用localhost/127.0.0.1,故需要使用KAFKA_ADVERTISED_HOST_NAME= ip地址
- 1、fafka启动后,通过docker exec -it kafka bash进入后,不一会儿,自动退出来了。
- 【问题原因】启动容器设置了--restart=always,通过日志发现启动失败,一直在重启。并指向host不可达。
- image.png
- 【解决方案】关闭防火墙
- systemctl stop firewalld
- 2、关闭防火墙后重启container失败
- docker start 8dfac2df549c
- Error response from daemon: driver failed programming external connectivity on endpoint kafka (2bb0b0d19ea751146218f99d3f496db714dd0bc19105663b71fc56f65ebb0592): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 9092 -j DNAT --to-destination 172.17.0.3:9092 ! -i docker0: iptables: No chain/target/match by that name
-
- 【问题原因】关闭防火墙可能会清空iptables表,需要重建
- 【解决方案】重启docker
进入到kafka容器中 并创建topic 生产者,执行如下命令:
- 1)docker exec -it kafka bash
- 2)cd /opt/kafka_2.11-2.0.0/bin/
- 3)./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 8 --topic test
- 4)./kafka-console-producer.sh --broker-list localhost:9092 --topic test
执行上诉命令后,另起一个标签页,执行如下命令 创建kafka消费者消费消息:
- 1)docker exec -it kafka bash
- 2)cd /opt/kafka_2.11-2.0.0/bin/
- 3)./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
执行完上诉命令后,在生产者窗口中 输入任意内容回车,即可在消费者的窗口查看到消息
kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:
管理多个集群 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发) 运行首选副本选举 使用选项生成分区分配以选择要使用的代理 运行分区重新分配(基于生成的分配) 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置) 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true) 主题列表现在指示标记为删除的主题(仅支持0.8.2+) 批量生成多个主题的分区分配,并可选择要使用的代理 批量运行重新分配多个主题的分区 将分区添加到现有主题 更新现有主题的配置 在centos中,执行如下命令拉取镜像,创建对应容器,以及打开防火墙:
- 1)docker pull docker.io/sheepkiller/kafka-manager
- 2)docker run -it -d --rm -p 9000:9000 -e ZK_HOSTS="192.168.244.132:2181" --net=host sheepkiller/kafka-manager
- 3)firewall-cmd --add-port=9000/tcp
创建成后,在浏览器中访问 http://192.168.244.132:9000
读者通过配置 zookeeper集群 以及kafka版本,进行操作kafka
本文的配置 => Cluster Zookeeper Hosts:192.168.244.132:2181 Kafka Version: 0.8.2.2
如果读者验证过第三步,可以发现在右上角的菜单consumer发现有一个消费者。
附录: 查看消息主题列表:
- 1)./kafka-topics.sh --list --zookeeper zookeeper:2181
- 2)test
查看指定topic信息:
./kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。