赞
踩
这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。
访问kafka官网Apache Kafka,然后点击快速开始
紧接着,点击Download
最后点击下载链接下载安装包
如果下载缓慢,博主已经把安装包上传到百度网盘:
链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
提取码:3aoh
--来自百度网盘超级会员V3的分享
经过上一步下载完成后,按照页面的提示启动kafka
1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录
2、切换到usr目录,解压kafka_2.13-3.6.0.tgz
- cd /usr
-
- tar -zxvf kafka_2.13-3.6.0.tgz
3、启动zookeeper
修改配置文件config/zookeeper.properties,修改一下数据目录
dataDir=/usr/local/zookeeper
然后通过以下命令启动kafka自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
4、启动kafka
修改配置文件config/server.properties,修改一下kafka保存日志的目录
log.dirs=/usr/local/kafka/logs
然后新开一个连接窗口,通过以下命令启动kafka
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092
往刚刚创建的topic里发送消息,每输入一行并回车就会发送一条消息,可以连续发送多条,按Ctrl+C结束发送
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello
新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello
指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello
通过maven官网搜索kafka的maven依赖版本
https://central.sonatype.com/search?q=kafka
然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--
            Client-only dependency: the example code uses just the producer/consumer API
            (org.apache.kafka.clients.*) and the String (de)serializers
            (org.apache.kafka.common.serialization.*), all of which ship in kafka-clients.
            The broker artifact (kafka_2.12 / kafka_2.13) is not needed by a client application.
        -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>
</project>
- package producer;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerConfig;
-
- import java.util.Properties;
-
- /**
- * 消息生产者工厂类
- * @author heyunlin
- * @version 1.0
- */
- public class MessageProducerFactory {
-
- private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
-
- public static Producer<String, String> getProducer() {
- //PART1:设置发送者相关属性
- Properties props = new Properties();
- // 此处配置的是kafka的端口
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- // 配置key的序列化类
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // 配置value的序列化类
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
- return new KafkaProducer<>(props);
- }
-
- }
- package producer;
-
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- public class MessageProducer {
-
- private static final String TOPIC = "hello";
-
- public static void main(String[] args) {
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");
- Producer<String, String> producer = MessageProducerFactory.getProducer();
-
- // 同步发送消息
- producer.send(record);
-
- // 异步发送消息
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- String topic = recordMetadata.topic();
- long offset = recordMetadata.offset();
- int partition = recordMetadata.partition();
- String message = recordMetadata.toString();
-
- System.out.println("topic = " + topic);
- System.out.println("offset = " + offset);
- System.out.println("message = " + message);
- System.out.println("partition = " + partition);
- }
- });
-
- // 加上这行代码才会发送消息
- producer.close();
- }
-
- }
- package consumer;
-
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.util.Properties;
-
- /**
- * 消息生产者工厂类
- * @author heyunlin
- * @version 1.0
- */
- public class MessageConsumerFactory {
-
- private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
-
- public static Consumer<String, String> getConsumer() {
- //PART1:设置发送者相关属性
- Properties props = new Properties();
-
- //kafka地址
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- //每个消费者要指定一个group
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");
- //key序列化类
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- //value序列化类
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-
- return new KafkaConsumer<>(props);
- }
-
- }
- package consumer;
-
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
-
- import java.time.Duration;
- import java.util.Collections;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- public class MessageConsumer {
-
- private static final String TOPIC = "hello";
-
- public static void main(String[] args) {
- Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();
- consumer.subscribe(Collections.singletonList(TOPIC));
-
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
-
- for (ConsumerRecord<String, String> record : records) {
- System.out.println(record.key() + ": " + record.value());
- }
-
- // 提交偏移量,避免消息重复推送
- consumer.commitSync(); // 同步提交
- // consumer.commitAsync(); // 异步提交
- }
- }
-
- }
然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
然后修改application.yml,添加kafka相关配置
# Spring Boot Kafka settings (application.yml)
spring:
  kafka:
    # Address (host:port) of the Kafka broker
    bootstrap-servers: 192.168.254.128:9092
    producer:
      acks: 1
      retries: 3
      batch-size: 16384
      # Extra client properties passed through by name; this one is linger.ms
      properties:
        linger:
          ms: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: helloGroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      auto-offset-reset: latest
      # Extra client properties passed through by name: request.timeout.ms and session.timeout.ms
      properties:
        request:
          timeout:
            ms: 18000
        session:
          timeout:
            ms: 12000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- package com.example.springboot.kafka.producer;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @RestController
- @RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
- public class KafkaProducer {
-
- private final KafkaTemplate<String, Object> kafkaTemplate;
-
- @Autowired
- public KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {
- this.kafkaTemplate = kafkaTemplate;
- }
-
- @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
- public String sendMessage(String message) {
- kafkaTemplate.send("hello", message);
-
- return "发送成功~";
- }
-
- }
- package com.example.springboot.kafka.consumer;
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @Component
- public class KafkaConsumer {
-
- @KafkaListener(topics = "hello")
- public void receiveMessage(ConsumerRecord<String, String> record) {
- String topic = record.topic();
- long offset = record.offset();
- int partition = record.partition();
-
- System.out.println("topic = " + topic);
- System.out.println("offset = " + offset);
- System.out.println("partition = " + partition);
- }
-
- }
然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。
文章涉及的项目已经上传到gitee,按需获取~
Java中操作kafka的基本项目https://gitee.com/he-yunlin/kafka.git
springboot整合kafka案例项目https://gitee.com/he-yunlin/springboot-kafka.git
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。