赞
踩
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
注意:Kafka的元数据都是存放在zookeeper中。kafka支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系有客户端负责维护,消息消费完后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。
kafka有三层结构:kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。
分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题 ,一个分片的不同副本不能放到同一个broker上。当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片
副本:副本备份机制解决了数据存储的高可用问题,当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。
面试问题:
1. kafka的副本都有哪些作用?
2. follower副本为什么不对外提供服务?
从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。
消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:
0:无需等待来自broker的确认而继续发送下一批消息,效率最高,可靠性最低
1:收到Leader副本成功写入通知,就认为推送消息成功
-1:只有收到分区内所有副本的成功写入的通知才认为推送消息成功,效率最低,可靠性最高
//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
为什么消费者丢失数据?
解决方案:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。
public interface Partitioner extends Configurable, Closeable { /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** * This is called when partitioner is closed. */ public void close(); }
/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取该topic的分区列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //获得分区的个数 int numPartitions = partitions.size(); //如果key值为null if (keyBytes == null) { //如果没有指定key,那么就是轮询 // 维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作 int nextValue = nextValue(topic); //获取该topic的可用分区列表 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //如果可用分区大于0 // 执行求余操作,保证消息落在可用分区上 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 没有可用分区的话,就给出一个不可用分区 return Utils.toPositive(nextValue) % numPartitions; } } else { //不过指定了key,key肯定就不为null // 通过计算key的hash,确定消息分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0分区中的数据不能被Consumer Group A中C1与C2同时消费。
消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG,“groupName”);
如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
public class ProducerDemo { public static String topic = "test";//定义主题 public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094"); //网络传输,对key和value进行序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //创建消息生产对象,需要从properties对象或者从properties文件中加载信息 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); try { while (true) { //设置消息内容 String msg = "Hello," + new Random().nextInt(100); //将消息内容封装到ProducerRecord中 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg); kafkaProducer.send(record); System.out.println("消息发送成功:" + msg); Thread.sleep(500); } } finally { kafkaProducer.close(); } } }
public class ProducerDemo { public static String topic = "lagou";//定义主题 public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094"); //网络传输,对key和value进行序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //指定组名 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Collections.singletonList(ProducerDemo.topic));// 订阅消息 while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value())); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。