赞
踩
对于后端开发而已,不宜在环境安装方面耗费宝贵的精力。相对于tar包安装等方式,docker安装省时省力。下面主要介绍,如何使用docker安装kafka。
事实上,仅需如下两条命令即可完成kafka安装。
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.10:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
-e KAFKA_BROKER_ID=0 在kafka集群中,kafka的唯一ID
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.10:9092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器同步宿主机时间,docker基本都要配置此参数
安装过程可能会遇到一些问题,我这里简单说几点
vim /etc/docker/daemon.json
,修改为:{ "registry-mirrors":["https://n62k26ut.mirror.aliyuncs.com"] }
service docker restart
将配置好的消费者工厂类DefaultKafkaConsumerFactory注册到kafka监听工厂KafkaListenerContainerFactory,具体代码如下所示:
@Configuration @EnableKafka public class KafkaConfiguration { //定义消费者组:当更换此选项,offset会从0开始 @Value("${person.groupId}") private String groupId; @Bean @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //并发数量 factory.setConcurrency(2); factory.setConsumerFactory(consumerFactory()); //批量监听:一次消费多条数据 factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(20000); return factory; } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.10:21005"); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //Key序列化为字符串 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //Value序列化为字符串 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } private ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } }
消费起来也非常简单,直接使用kafkaListener即可对相关topics进行监听
@Component @Slf4j public class KafkaReceiver { @KafkaListener(topics = {"xxx"}, containerFactory = "kafkaListenerContainerFactory") public void process(List<ConsumerRecord<?, ?>> records) { try { for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //实际的消息处理逻辑写在此处 } } } catch (Exception e) { log.error( e.getMessage(), e); } } }
生产者配置实际上就是配置kafkaTemplate,比消费者代码更为简单。
package xyz.yq56.kafka.common.config; import java.util.Map; import org.apache.commons.collections4.map.HashedMap; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import com.google.common.collect.Maps; @Configuration public class KafkaConfig { // ******************************************* 生产者配置 // **************************************************// @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ProducerConfig.ACKS_CONFIG, "0"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "122.51.21.35:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 150); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } // ******************************************* 消费者配置 // **************************************************// @Bean(value = "kafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 并发数量 factory.setConcurrency(2); factory.setConsumerFactory(consumerFactory()); // 批量监听:一次消费多条数据 factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(20000); return factory; } public DefaultKafkaConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(ConsumerConfig()); } public Map<String, Object> ConsumerConfig() { Map<String, Object> propsMap = new HashedMap<>(); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "122.51.21.35:9092"); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "123465"); // Key序列化为字符串 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Value序列化为字符串 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } }
注入kafkaTemplate,然后调用send方法即可
package xyz.yq56.kafka.mq; import javax.annotation.Resource; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; public void send(String topic, String message) { kafkaTemplate.send(topic, message); log.info("发送消息"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。