赞
踩
下载jdk linux版本https://pan.baidu.com/s/1hqKKL3e
将jdk压缩包放入服务器上
执行解压操作 tar zxf jdk-8u11-linux-x64.tar.gz -C /user/java
配置环境变量
执行vi /etc/profile操作
export JAVA_HOME=/user/java/jdk1.8.0_11
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
让profile文件生效
source /etc/profile
测试jdk是否安装成功
执行java -version
http://apache.fayea.com/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
配置zk节点的hosts文件:配置3台机器的ip地址和主机名的对应关系,其hosts文件添加下面3行
192.168.163.130 centos1
192.168.163.131 centos2
192.168.163.132 centos3
解压
tar zxf zookeeper-3.4.10 -C /user/java
创建快照日志存放目录
[root@centos1 zookeeper-3.4.10]# mkdir -p dataDir
创建事务日志存放目录
[root@centos1 zookeeper-3.4.10]# mkdir dataLogDir
修改配置文件,添加如下内容
[root@centos1 zookeeper-3.4.10]# cd conf
[root@centos1 conf]# mv zoo_sample.cfg zoo.cfg
[root@centos1 conf]# vi zoo.cfg
# 存放数据文件
dataDir=/user/java/zookeeper-3.4.10/dataDir
# 存放日志文件
dataLogDir=/user/java/zookeeper-3.4.10/dataLogDir
# zookeeper cluster,2888为选举端口,3888为心跳端口
server.1=centos1:2888:3888
server.2=centos2:2888:3888
server.3=centos3:2888:3888
在我们配置的dataDir指定的目录下面,创建一个myid文件,里面内容为一个数字,用来标识当前主机,conf/zoo.cfg文件中配置的server.X中X为什么数字,则myid文件中就输入这个数字
[root@centos1 ~]# echo "1" > /user/java/zookeeper-3.4.10/dataDir/myid
注意关闭防火墙
firewall-cmd --state 查看防火墙状态
systemctl stop firewalld.service 停止firewall
systemctl disable firewalld.service 禁止firewall开机启动
启动
[root@centos1 bin]# ./zkServer.sh start
测试zk集群
[root@centos1 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
http://kafka.apache.org/downloads
解压
tar zxf kafka_2.11-2.1.0.tgz -C /user/java
修改配置
[root@centos1 kafka_2.10-0.8.1]# cd config/
[root@centos1 config]# vi server.properties
创建消息持久化目录
[root@centos1 kafka_2.10-0.8.1]# mkdir kafkaLogs
advertised.host.name参数用来配置返回的host.name值,把这个参数配置为IP地址。这样客户端在使用java.net.InetAddress.getCanonicalHostName()获取时拿到的就是ip地址而不是主机名。
启动集群
[root@centos1 kafka_2.10-0.8.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &
Kafka 是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。以可水平扩 展和高吞吐率而被广泛使用。它主要用于处理活跃的流式数据。
Kafka是分布式发布-订阅消息系统,是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。以可水平扩展和高吞吐率而被广泛使用,它主要用于处理活跃的流式数据。
生产者向特定的topic生产消息,而消费者通过订阅topic,能够准实时的拉取到该topic新消息
Kafka采用消费组保证了一个分区只可被消费组中的一个消费者所消费, 这意味着:
(1)在一个消费组中,一个消费者可以消费多个分区。
(2)不同的消费者消费的分区一定不会重复,所有消费者一起消费所有的 分区。
(3)在不同消费组中,每个消费组都会消费所有的分区。
(4)同一个消费组下消费者对分区是互斥的,而不同消费组之间是共 享的
Broker
一个Borker就是Kafka集群中的一个实例,或者说是一个服务单元。连接到同一个zookeeper的多个broker实例组成kafka的集群。在若干个broker中会有一个broker是leader,其余的broker为follower。leader在集群启动时候选举出来,负责和外部的通讯。当leader死掉的时候,follower们会再次通过选举,选择出新的leader,确保集群的正常工作。
Consumer Group
Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费。这就好比多个consumer组成了一个团队,一起干活,当然干活的速度就上来了。
如果同一个topic需要被多次消费,可以通过设立多个consumer group来实现。每个group分别消费,互不影响。
Topic
kafka中消息订阅和发送都是基于某个topic。比如有个topic叫做NBA赛事信息,那么producer会把NBA赛事信息的消息发送到此topic下面。所有订阅此topic的consumer将会拉取到此topic下的消息。Topic就像一个特定主题的收件箱,producer往里丢,consumer取走。
分区(Partition)
大多数消息系统,同一个topic下的消息,存储在一个队列。分区的概念就是把这个队列划分为若干个小队列,每一个小队列就是一个分区,如下图:
好处
无分区时,一个topic只有一个消费者在消费这个消息队列。采用分区后,如果有两个分区,最多两个消费者同时消费,消费的速度肯定会更快。如果觉得不够快,可以加到四个分区,让四个消费者并行消费。分区的设计大大的提升了kafka的吞吐量!!
1、一个partition只能被同组的一个consumer消费(图中只会有一个箭头指向一个partition)
2、同一个组里的一个consumer可以消费多个partition(图中第一个consumer消费Partition 0和3)
3、消费效率最高的情况是partition和consumer数量相同。这样确保每个consumer专职负责一个partition。
4、consumer数量不能大于partition数量。由于第一点的限制,当consumer多于partition时,就会有consumer闲置。
5、consumer group可以认为是一个订阅者的集群,其中的每个consumer负责自己所消费的分区
副本(Replica)
在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。活跃的称之为leader,其他的是follower.
每个分区的数据都会有多份副本,以此来保证Kafka的高可用。
Topic、partition、replica的关系如下图:
topic下会划分多个partition,每个partition都有自己的replica,其中只有一个是leader replica,其余的是follower replica。
消息进来的时候会先存入leader replica,然后从leader replica复制到follower replica。只有复制全部完成时,consumer才可以消费此条消息。这是为了确保意外发生时,数据可以恢复。consumer的消费也是从leader replica读取的。
直击Kafka的心脏——控制器
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的***ISR集合***(所有同partiton leader数据同步的Replica集合)发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
控制器作用:
创建topic
bin/kafka-topics.sh --create --topic weijinzhi --zookeeper 192.168.163.130:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
–create: 指定创建topic动作
–topic:指定新建topic的名称
–zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
–config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration
–partitions:指定当前创建的kafka分区数量,默认为1个
–replication-factor:指定每个分区的复制因子个数,默认1个
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181 test
查看对应topic的描述信息
bin/kafka-topics.sh --zookeeper 192.168.163.130:2181 --topic test_topic --describe
删除topic
bin/kafka-topics.sh --delete --topic weijinzhi --zookeeper 192.168.163.130:2181
发送topic消息
bin/kafka-console-producer.sh --broker-list 192.168.163.130:9092 --topic test
消费topic消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.163.130:9092 --topic wjz
pom.xml配置
<!-- kafka support -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
kafka配置
# ================================== # kafka config # ================================== # kafka 配置生产者 kafka.producer.bootstrap-servers=192.168.163.130:9092 连接的kafka服务器 kafka.producer.retries=1 重试次数 kafka.producer.batch-size=16384 批处理数据大小 kafka.producer.linger.ms=100 等待发送时间 kafka.producer.buffer.memory=33554432 # kafka配置消费者 kafka.consumer.servers=192.168.163.130:9092 获取数据的kafka服务器 kafka.consumer.enable.auto.commit=true 自动提交 kafka.consumer.session.timeout=10000 连接超时时间 kafka.consumer.auto.commit.interval=100 kafka.consumer.group.id=wjz 消费组的id kafka.consumer.auto.offset.reset=latest (earliest、latest、none) kafka.consumer.concurrency=10 消费线程数)
配置producer
package intellif.config; import java.util.HashMap; import java.util.Map; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; @Configuration public class KafkaProducerConfig { @Value("${kafka.producer.bootstrap-servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch-size}") private int batchSize; @Value("${kafka.producer.linger.ms}") private int lingerMs; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; @Bean public KafkaTemplate<String, String> getKafkaTemplate(){ Map<String, Object> config = new HashMap<String, Object>(); config.put("bootstrap.servers", servers); config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); config.put("retries", retries); config.put("batch.size", batchSize); config.put("linger.ms", lingerMs); config.put("buffer.memory", bufferMemory); DefaultKafkaProducerFactory<String, String> fac = new DefaultKafkaProducerFactory<>(config); KafkaTemplate<String, String> template = new KafkaTemplate<>(fac); return template; } }
发送消息
kafkaTemplate.send("wjz", msg); wjz是发送的topic,msg是具体信息
消费者配置
package intellif.config; import intellif.thread.Listener; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; /** * @date 2018年3月29日 * @author lei.ys * @addr company * @desc */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
具体监听过程
package intellif.thread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.kafka.annotation.KafkaListener; public class Listener { protected final Logger logger = LogManager.getLogger(Listener.class); @KafkaListener(topics = {"wjz"}) public void listen(ConsumerRecord<?, ?> record) { logger.info("new message arrived:"); try { logger.info("record:" + record.toString()); } catch (Exception e) { logger.info("doReceive kafka message error:" + e.getMessage()); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。