赞
踩
一:kafka介绍
kafka(官网地址:http://kafka.apache.org)是一种高吞吐量的分布式发布订阅的消息队列系统,具有高性能和高吞吐率。
producer创建一个topic时,可以指定该topic为几个partition(默认是1,配置num.partitions),然后会把partition分配到每个broker上,分配的算法是:a个broker,第b个partition分配到b%a的broker上,可以指定有每个partition有几分副本Replication,副本的分配策略为:第c个副本存储在第(b+c)%a的broker上。一个partition在每个broker上是一个文件夹,文件夹中文件的命名方式为:topic名称+有序序号。每个partition中文件是一个个的segment,segment file由.index和.log文件组成。两个文件的命名规则是,上一个segmentfile的最后一个offset。这样,可以快速的删除old文件。
producer往kafka里push数据,会自动的push到所有的分区上,消息是否push成功有几种情况:1,接收到partition的ack就算成功,2全部副本都写成功才算成功;数据可以存储多久,默认是两天;producer的数据会先存到缓存中,等大小或时间达到阈值时,flush到磁盘,consumer只能读到磁盘中的数据。
consumer从kafka里poll数据,poll到一定配置大小的数据放到内存中处理。每个group里的consumer共同消费全部的消息,不同group里的数据不能消费同样的数据,即每个group消费一组数据。
consumer的数量和partition的数量相等时消费的效率最高。这样,kafka可以横向的扩充broker数量和partitions;数据顺序写入磁盘;producer和consumer异步
二:环境搭建(windows)
kafka需要用到zookeeper,所以需要先安装zookeeper
三:基于.net的常用类库
基于.net实现kafka的消息队列应用,常用的类库有kafka-net,Confluent.Kafka,官网推荐使用Confluent.Kafka
,本文也是基于该库的实现,使用版本预发行版1.0.0-beta
,创建控制台应用程序。
四:应用–生产者
生产者将数据发布到指定的主题,一般生产环境下的负载均衡,服务代理会有多个,BootstrapServers属性则为以逗号隔开的多个代理地址
/// <summary> /// 生产者 /// </summary> public static void Produce() { var config = new ProducerConfig { BootstrapServers = "localhost:9092" } Action<DeliveryReportResult<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using (var producer = new Producer<Null, string>(config)) { // 错误日志监视 producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); }; for (int i = 0; i < 5; i++) { // 异步发送消息到主题 producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler); } // 3后 Flush到磁盘 producer.Flush(TimeSpan.FromSeconds(3)); } }
五:应用–消费者
消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器
如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载平衡。
如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程
上图为两个服务器Kafka群集,托管四个分区(P0-P3),包含两个消费者组。消费者组A有两个消费者实例,B组有四个消费者实例。
默认EnableAutoCommit
是自动提交,只要从队列取出消息,偏移量自动移到后一位,无论消息后续处理成功与否,该条消息都会消失,所以为免除处理失败的数据丢失,消费者方可设置该属性为false
,后面进行手动commint()
提交偏移
/// <summary> /// 消费者 /// </summary> public static void Consumer() { var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetResetType.Earliest, EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失 }; using (var consumer = new Consumer<Ignore, string>(conf)) { // 订阅topic consumer.Subscribe("MyTopic"); // 错误日志监视 consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); }; while (true) { try { var consume = consumer.Consume(); string receiveMsg = consume.Value; Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'."); // 开始我的业务逻辑 ... // 业务结束 if(成功) { consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手动提交偏移 } } catch (ConsumeException e) { Console.WriteLine($"Consumer_Error occured: {e.Error.Reason}"); } } } }
常见数据问题处理
Kafka 可视化调试
借助可视化客户端工具 kafka tool
具体使用可参考:https://www.cnblogs.com/frankdeng/p/9452982.html
END |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。