赞
踩
kafka_2.12-2.4.0.tgz(带zookeeper)
tar zxvf kafka_2.12-2.4.0.tgz -C /data
mv kafka_2.12-2.4.0 kafka
vim /data/kafka/config/server.properties:
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 host.name=10.7.2.201 listeners=SASL_PLAINTEXT://10.7.2.201:9092 advertised.listeners=SASL_PLAINTEXT://10.7.2.201:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true
在kafka安装目录下创建一个 kafka_server_jaas.conf 文件
vim /data/kafka/kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpasswd"
user_admin="adminpasswd"
user_producer="producerpwd"
user_consumer="consumerpwd";
};
说明:该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。
vim /data/kafka/bin/kafka-server-start.sh ,找到 export KAFKA_HEAP_OPTS , 添加jvm 参数为kafka_server_jaas.conf文件:
-Djava.security.auth.login.config=/data/kafka/kafka_server_jaas.conf
如下图:
# 先启动zookeeper
/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
# 启动kafka
/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
在/root 路径下新拷贝一个kafka安装包,解压后重命名,作为kafka客户端,客户端路径 /root/kafka
在 /root/kafka 路径下创建一个 kafka_client_jaas.conf 文件:
vim /root/kafka/kafka_client_jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="producerpwd";
};
说明: 这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了producer这个用户
vim /root/kafka/bin/kafka-console-producer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:
-Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf
如下图:
vim /root/kafka/bin/kafka-console-consumer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:
-Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf
如下图:
# 开启生产者:
/root/kafka/bin/kafka-console-producer.sh --broker-list 10.7.2.201:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
# 开启消费者:
/root/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.7.2.201:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
生产者可正常生产数据,消费者能消费到数据
下载模块包:kafka-python-2.0.2.tar.gz,使用手册:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
解压后,执行以下命令进行安装:
python setup.py install
# -*- coding: utf-8 -*- from __future__ import absolute_import from __future__ import print_function from kafka import KafkaProducer import json import sys reload(sys) sys.setdefaultencoding('utf8') def produceLog(topic_name, log): kafka_producer= KafkaProducer( sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username="producer", sasl_plain_password="producerpwd", bootstrap_servers='10.7.2.201:9092' ) kafka_producer.send(topic_name, log, partition=0) kafka_producer.flush() kafka_producer.close() for i in range(10): sendlog_dict = {"name":"测试"} sendlog_dict['ID'] = i sendlog_srt = json.dumps(sendlog_dict, ensure_ascii=False) produceLog("pythontest", sendlog_srt)
执行后,在kafka中查看 pythontest 主题中的数据,如下图:
数据写入成功
# -*- coding: utf-8 -*- from __future__ import absolute_import from __future__ import print_function from kafka import KafkaConsumer from kafka import TopicPartition import sys reload(sys) sys.setdefaultencoding('utf8') consumer= KafkaConsumer( 'pythontest', # 消息主题 group_id="test", client_id="python", sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username="producer", sasl_plain_password="producerpwd", bootstrap_servers='10.7.2.201:9092', auto_offset_reset='earliest' # 从头开始消费 ) print("主题的分区信息:{}".format(consumer.partitions_for_topic('pythontest'))) print("主题列表:{}".format(consumer.topics())) print("当前消费者订阅的主题:{}".format(consumer.subscription())) print("当前消费者topic分区信息:{}".format(consumer.assignment())) print("当前消费者可消费的偏移量:{}".format(consumer.beginning_offsets(consumer.assignment()))) for msg in consumer: print(msg.value)
运行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。