赞
踩
Apache Kafka是一个分布式的发布-订阅消息系统,能够支撑海量数据的数据传递,在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。Kafka将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。
Producer
:生产者即数据的发布者,该角色将消息发布到kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。Consumer
:消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。Topic
:在kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic,如果把kafka看作成一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。Partition
:topic中的数据分割成一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序,如果topic有多个partition,消费数据时就不能保证数据的顺序,在需要严格保证消息的顺序消费的场景下,需要将partition数目设为1。Partition offset
:每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。Replicas of partition
:副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据,副本之间是一主多从的关系。Broker
:Kafka集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据,如果某topic中有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition,在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致kafka集群数据不均衡。Leader
:每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。Follower
:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,Leader会把这个follower从“in sync replicas”(ISR)列表中删除,重写创建一个Follower。Zookeeper
:Zookeeper负责维护和协调broker,当kafka系统中新增了broker或者某个broker发生故障失效时,由Zookeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。AR(Assigned Replicas)
:分区中所有的副本统称为AR。ISR(In-Sync Replicas)
:所有与Leader部分保持一定程度的副本组成ISR(baokuoLeader副本在内)。OSR(Out-of-Sync-Replicas)
:与Leader副本同步滞后过多的副本。HW(High Watermark)
:高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。LEO(Log End Offset)
:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值,注意时下一条消息!也就是说,如果LEO=10,那么标识该副本保存了10条消息,位移值范围时[0,9]官网下载安装解压缩:http://kafka.apache.org/downloads
下载解压启动
#启动命令:
bin/kafka-server-start.sh config/server.properties
#server.properties配置中需要关注以下几个参数
#表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
#broker对外提供的服务入口地址
listeners=PLAINTEST://:9092
#设置存放消息日志文件的地址
log.dirs=/tmp/kafka/log
#kafka所需zookeeper集群地址
zookeeper.connect=localhost:2181
启动成功之后重写打开一个终端,验证启动进程
命令:bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic hello --partitions 2 --replication-factor 1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic hello --partitions 2 --replication-factor 1
# --zookeeper:指定了kafka所连接的zookeeper服务地址
# --topic:指定了所要创建主题的名称
# --partitions:指定了分区个数
# --replication-factor:指定了副本因子
# --create:创建主题的动作指令
Created topic hello
命令:bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-topics.sh --zookeeper localhost:2181 --list
hello
命令:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic hello
–describe 查看详情动作指令
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic hello
Topic:hello PartitionCount:2 ReplicationFactor:1 configs:
Topic:hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0
命令:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
hello,kafka #接收到的消息
命令:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
–broker-list 指定了连接的kafka集群的地址
–topic 指定了发送消息时的主题
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
>hello,kafka #发送消息
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
/** * kafka消息生产者 **/ public class ProducerFastStart{ //kafka集群地址 private static final String brokerList="localhost:9092"; //主题名称-之前已经创建的 private static final String topic="hello"; public static void main(String[] args){ Properties properties = new Properties(); //设置key序列化器 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSeriallizer"); //另一种写法 // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, // StringSerializer.class.getName()); //设置重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,10); //设置值序列化器 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSeriallizer"); //设置集群地址 properties.put("bootstrap.servers,brokerList"); //kafkaproducer线程安全 KafkaProducer<String,String> producer = new KafkaProducer<>(properties); producerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-001","hello,Kafka!"); try{ producer.send(record); }catch(Exception e){ e.printStackTrace(); } producer.close(); } }
/** * Kafka消息消费者 */ public class ConsumerFastStart{ //Kafka集群地址 private static final String brokerList = "127.0.0.1:9092"; //主题名称-之前已经创建的 private static final String topic = "hello"; //消费组 private static final String groupId = "group.demo"; public static void main(String[] args){ Properties properties = new Properies(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSeriallizer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSeriallizer"); properties.put("bootstrap.servers",brokerList); properties.put("group.id",groupId); KfakaConsumer<String,String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList(topic)); while(true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String,String> record : records){ System.out.println(record.value()); } } } }
warning:使用java连接linux下kafka集群需要设置hosts绑定
先启动消费者,在启动生产者进行消息的发送
参数参考:config/server.properties
zookeeper.connect
指明zookeeper主机地址,如果zookeeper时集群则以逗号隔开,
如:192.168.179.128:2181,192.168.179.129:2181,192.168.179.130:2181
listeners
监听列表,broker对外提供服务时绑定的IP和端口,多个以逗号隔开,如果监听器名称不是一个安全的协议,listener.security.protocol.map也必须设置。主机名称设置0.0.0.0绑定所有的端口,主机名称为空则绑定默认的接口,如:PLAINTESXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
broker.id
broker的唯一标识符,如果不匹配则自动生成,建议配置且一定要保证集群中必须唯一,默认-1。
log.dirs
日志数据存放的目录,如果没有配置则使用log.dir,建议此项配置。
message.max.bytes
服务器接受单个消息的最大大小,默认1000012约等于976.6KB。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。