赞
踩
Kafka是一个分布式流媒体平台,它主要有3种功能:
2)以容错的方式记录消息流,kafka以文件的方式来存储消息流
3)可以再消息发布的时候进行处理
推荐使用最新版本,新版内置zookeeper,本案例使用 kafka_2.12-2.8.0.tgz
http://kafka.apache.org/downloads
/usr/local/kafka
解压缩
tar zxvf kafka_2.12-2.8.0.tgz
/usr/local/kafka/kafka_2.12-2.8.0/config/server.properties
#kafka服务器在集群中的唯一ID
broker.id=1
#如果本机则不用配置,远程连接需要配置此项
advertised.listeners=PLAINTEXT://你的主机ip:9092
#zookeeper所在地址,使用自带的不用配置此项
zookeeper.connect=zookeeper所在主机ip:2181
/usr/local/kafka/kafka_2.12-2.8.0/config/customer.properties
#消费组id,可以自行配置,这里使用默认
group.id=test-consumer-group
/usr/local/kafka/kafka_2.12-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
//创建topic
bin/kafka-topics.sh --create --zookeeper (zookeeper主机ip默认填写127.0.0.1):2181 --topic (topic名称) --partitions 1 --replication-factor 1
//查看topic
bin/kafka-topics.sh --list --zookeeper (主机ip):2181 (topic名称)
bin/kafka-console-producer.sh --broker-list (zookeeper主机ip默认填写127.0.0.1):9092 --topic (topic名称)
bin/kafka-console-consumer.sh --bootstrap-server (zookeeper主机ip默认填写127.0.0.1):9092 --topic (topic名称) --from-beginning
创建完毕后,生产者输入消息,消费者会接受到消息
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
package com.rj.kafka.Test01; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Date; import java.util.Properties; /** * @author JZH * @desc 生产者 * @time 2021-05-19-9:24 */ public class Producer_Test01 { //kafka主机ip public static final String BROKER_LIST = "10.205.63.255:9092"; //topic名 public static final String TOPIC = "test"; public static void main(String[] args) { //创建配置容器 Properties properties = new Properties(); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST); Producer producer = null; try { //实例化producer对象 producer = new KafkaProducer(properties); //发送信息 for(int i = 0 ; i<10 ; i++){ String msg = new Date().toString()+"这是第"+(i+1)+"条信息"; producer.send(new ProducerRecord(TOPIC,msg)); //模仿延迟 //Thread.sleep(500); //打印信息 System.out.println("msg = " + msg); } }catch (Exception e){ }finally { producer.close(); } } }
package com.rj.kafka.Test01; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; /** * @author JZH * @desc * @time 2021-05-19-9:24 */ public class Customer_Test01 { //kafka主机ip public static final String BROKER_LIST = "10.205.63.255:9092"; //tipic public static final String TOPIC = "test"; //配置的消费组id public static final String GROUP_ID = "test-consumer-group"; public static void main(String[] args) { Properties properties = init(); //实例化消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties,new StringDeserializer(),new StringDeserializer()); consumer.subscribe(Arrays.asList(TOPIC)); //监听 try { while (true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String,String> record : records) { System.out.println(record.offset()+"--->"+record.value()); System.out.println("======================================="); } } } finally { consumer.close(); } } /** * 初始化配置 * @return */ public static Properties init(){ Properties properties = new Properties(); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST); properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID); //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo"); return properties; } }
192.168.128.001
196.168.128.002
196.168.128.003
将每台服务器上的Kafka中server.properties修改一下
将001号服务器上的zookeeper作为主
其他服务器上的kafka都使用001号上的zookeeper
#001
broker.id=001
host.name=192.168.128.001
zookeeper.connection=192.168.128.001:2181
#002
broker.id=002
host.name=192.168.128.002
zookeeper.connection=192.168.128.001:2181
#003
broker.id=003
host.name=192.168.128.003
zookeeper.connection=192.168.128.001:2181
配置完成后
1)开启001服务器上的zookeeper
2)依次开启3台服务器上的Kafka
任意服务器上创建topic
因为有3台服务器,所以这里–replication-factor 是3
bin/kafka-topics.sh --create --zookeeper 192.168.128.001:2181 --replication-factor 3 -partitions 1 --topic 1test
002开启一个producer向003发送消息
bin/kafka-console-producer.sh --broker-list 192.168.128.003:9092 --topic 1test
this is a message to 132 broker
停止003的kafka,此时可用的服务器只剩下001,002 因为 主服务器是001所以并没有进行选举
在001上开启一个consumer
bin/kafka-console-consumer.sh --zookeeper 192.168.128.001:2181 --topic 1test --from-beginning
this is a message to 132 broker
发现停掉了003,依然能通过zookeeper从001和002的kafka上收到发往132的消息。
1.java连接Kafka生产者,消费者没反应,也不报错
解决方案:没有正确配置server.properties中的 advertised.listeners ,这里的ip使用ifconfig查询就可以,使用127.0.0.1经过测试无效
2.报jdk没有找到错误
解决方案:先查看linux是否已经配置好jdk
//Linux管理员模式下使用命令
sudo gedit /etc/profile
//进入后在文件最后一行查看 JAVA_HOME jdk地址是否正确
编写时间2021年5月19日
接Kafka生产者,消费者没反应,也不报错
解决方案:没有正确配置server.properties中的 advertised.listeners ,这里的ip使用ifconfig查询就可以,使用127.0.0.1经过测试无效
2.报jdk没有找到错误
解决方案:先查看linux是否已经配置好jdk
//Linux管理员模式下使用命令
sudo gedit /etc/profile
//进入后在文件最后一行查看 JAVA_HOME jdk地址是否正确
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。