当前位置:   article > 正文

kafka 快速入门, 安装与使用(windows)_windows kafka3.7 kraft

windows kafka3.7 kraft

介绍和概念

消息队列, 由scala 和java 编写, 是一种高吞吐量的分布式发布与订阅消息系统

概念

	消息队列简称:  MQ
		Message: 多台设备生产通信的数据名称, 可以是视频, 文本, 音频等等
		Queue: 	一种特殊的线性表, 满足先进先出原则
  • 1
  • 2
  • 3

种类

	p2p: peer to peer 
    pub/sub: 发布与订阅
  • 1
  • 2

p2p


pub/sub

区别

共同点:
	消息生产者将消息生产到队列, 消费组从队列中消费消息
不同点:
	p2p: 一个生产者的消息只能被一个消费组消费, 如: 打电话
	pub/sub: 每个消息都可以有多个消费者, 消息存在队列, 使用偏移量方式来管理消息
  • 1
  • 2
  • 3
  • 4
  • 5

示例

作用

存储能力, 容器一般是队列
消息入列, 生产
消费出列, 消费
  • 1
  • 2
  • 3

常见的消息队列

rabbitmq: erlang 编写, 支持负载均衡, 数据持久化, pub/sub/p2p
redis: kv 的nosql 缓存数据库, 也支持pub/sub, 对于短消息(小于10kb)的性能高于 rabbitmq
zeromq: 轻量级的mq, p2p
activemq: jms 实现, p2p, 持久化, 分布式事务
kafka/jafka: 高性能跨语言分布式事务, 基于发布-订阅的全分布式支持持久化, 可以离线或者实时处理数据
racketmq: 纯java 实现, 发布-订阅, 本地事务和分布式事务
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

kafka 服务安装

特点

 1. rabbitmq: erlang 编写, 支持负载均衡, 数据持久化, pub/sub/p2p
 2. zeromq: 轻量级的mq, p2p
 3. activemq: jms 实现, p2p, 持久化, 分布式事务
 4. kafka/jafka: 高性能跨语言分布式事务, 基于发布-订阅的全分布式支持持久化, 可以离线或者实时处理数据
 5. racketmq: 纯java 实现, 发布-订阅, 本地事务和分布式事务
  • 1
  • 2
  • 3
  • 4
  • 5

组件

 1. topic: 主题, kafka 处理的消息分为不同的分类, 按照主题来划分
 2.  broker: 消息服务器的代理, kakfa 集群中的一个节点一般称为 broket, 主要来存储消息, 存在硬盘中
 3. partition: 分区, topic 的物理上的分组, 一个topic 在broket上被分为一个或多个partition,
           分区在创建主题时指定
4. message: 消息, 通信的基本单元, 每个消息属于某一个 partition
5. producer: 生产者, 消息和数据都是由这个组件产生, 由它发送到kafka 集群中
6. consumer: 消费者, 消息和数据都是由这个组件来消费
7. zookeeper: zk做分布式协调, 在kafka 2.8 后不在必须依赖
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

下载安装

下载地址: https://kafka.apache.org/downloads 这里选择 2.13, 下载的版本是 3.x
官方下载慢, 提供阿里镜像地址和阿里云网盘下载地址, 已经放到文末
解压到指定目录即可, 下载的是 tgz 文件 :
kafka 下载页面
linux: tar -zvxf kafka-3.6.0.tgz -C /home/kafka (这里解压到 /home/kafka 目录)
windows: 使用解压工具即可, 同时使用 windows 安装

目录说明

bin:	默认存放linux 系统的一些脚本
	windows: 存放pc 系统的脚步
config: kafka 配置文件存放目录
	kraft: kraft 模式的配置文件
libs: 依赖目录
licenses:  许可证存放
logs: 服务日志目录
site-docs: kafka 文档
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置修改

1. 修改config/zookeeper.properties, 可以自定义端口号, 一般默认即可
  	clientPort: 端口号
2. 修改config/server.properties,
			listener: plaintext://[ip]:9092 , 可以自定义ip
      advertised.listener:plaintext://ip:9092, 这个后面不在本地部署时需要配置
      zookeeper.connect: ip:2181 , 修改为zookeeper.properties 中的端口

# 本地安装可使用默认, 如果存在端口占用, 修改端口即可, 其他默认即可
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

启动测试

这里因为系统是 windows , 所以相关命令在 bin/windows 下面, 在安装目录下打开 cmd

启动zookeeper 服务

bin\windows\zookeeper-server-start.bat  config\zookeeper.properties
  • 1

zookeeper启动示例

启动 kafka 服务

bin\window\kafka-server-start.bat  config\server.properties
  • 1

kafka 启动示例

创建主题(topic)

bin\windows\kafka-topics.bat  --create  --if-not-exists --topic  test  --bootstrap-server  localhost:9092

# 查看主题列表
bin\windows\kafka-topics.bat  --list --bootstrap-server locahost:9092 

# --if-not-exists: 当创建的 test 主题不存在时才创建
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

主题创建和查看

生产者-消费者客户端启动

#生产者:
bin\windows\kafka-console-producer.bat  --bootstrap-server  localhost:9092  --topic test

#消费者:
bin\windows\kafka-console-consumer.bat  --bootstrap-server  localhost:9092  --topic test
  • 1
  • 2
  • 3
  • 4
  • 5

生产者-发送消息
生产者发送消息
消费者-消费消息
消费者消费消息

不依赖 zookeeper 启动(kraft 模式)

创建uuid

#创建随机uuid 
bin\windows\kafka-storage  random
  • 1
  • 2

创建随机 uuid

格式化

bin\windows\kafka-storage.bat  format  -t  nkA_YUlRQEmIdLk4dr35xA <uuid>  -c  config\kraft\server.properties
  • 1

格式化

使用 kraft 模式启动

bin\windows\kafka-server-start.bat  config\kraft\server.properties
  • 1

kraft 模式启动

生产者和消费者和原来一样使用

API 的使用

依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.5.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

生产消息

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

public class KafkaProducerDemo {

    public static void main(String[] args) throws InterruptedException {
        Scanner sc = new Scanner(System.in);
        Producer<String, Object> producer = createProducer();
        String mess = null;
        boolean flag = true;
        while (flag) {
            System.out.print("生产消息(输入exit 退出): ");
            mess = sc.nextLine();
            if (mess != null && (flag = (!"exit".equals(mess)))) {
                producer.send(new ProducerRecord<>("test", mess));
                Thread.sleep(1000);
                System.out.println("send message success....");
            }
        }

    }

    /**
     * 创建生产者
     *
     * @return
     */
    public static Producer<String, Object> createProducer() {
        // 使用生产者配置
        Properties properties = buildProducerProperties();
        // 创建生产者对象
        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);


        return producer;
    }


    /**
     * 构建生产者配置
     *
     * @return
     */
    public static Properties buildProducerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return properties;
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

消费消息

package org.example;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {

    public static void main(String[] args) {

        //创建消费者, 并消费消息
        Consumer<String, Object> consumer = createConsumer();
        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("消费消息: " + record.value());
            }
        }
    }

    /**
     * 创建消费者
     *
     * @return
     */
    public static Consumer<String, Object> createConsumer() {
        // 使用消费者配置
        Properties properties = buildConsumerProperties();
        // 创建消费者对象
        Consumer<String, Object> consumer = new KafkaConsumer<>(properties);
        // 订阅 test 主题
        consumer.subscribe(Arrays.asList("test"));
        // 返回消费者
        return consumer;
    }

    /**
     * 构建消费者配置
     *
     * @return
     */
    public static Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", false);
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return properties;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

springboot 集成 kafka 以及自定义注解实现消息消费可参考: Springboot 集成 Kafka

下载地址

阿里云镜像站-apache-kafka
阿里云盘-kafka

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/1000715
推荐阅读
相关标签
  

闽ICP备14008679号