当前位置:   article > 正文

Kafka简介、单机版安装、JAVA连接Kafka、集群_java连接kafka集群

java连接kafka集群

Kafk简介 & 单机版 & JAVA连接 & 集群

1.介绍

Kafka一个分布式流媒体平台,它主要有3种功能:

  1. 发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因

2)以容错的方式记录消息流,kafka以文件的方式来存储消息流

3)可以再消息发布的时候进行处理

2.下载

推荐使用最新版本,新版内置zookeeper,本案例使用 kafka_2.12-2.8.0.tgz

 http://kafka.apache.org/downloads
  • 1

3.安装

3.1 建立文件夹

/usr/local/kafka
  • 1

3.2 将压缩包放置到该文件夹下(可以使用xftp)

解压缩

tar zxvf  kafka_2.12-2.8.0.tgz
  • 1

3.3 修改配置文件

/usr/local/kafka/kafka_2.12-2.8.0/config/server.properties

#kafka服务器在集群中的唯一ID
broker.id=1

#如果本机则不用配置,远程连接需要配置此项
advertised.listeners=PLAINTEXT://你的主机ip:9092

#zookeeper所在地址,使用自带的不用配置此项
zookeeper.connect=zookeeper所在主机ip:2181

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

/usr/local/kafka/kafka_2.12-2.8.0/config/customer.properties

#消费组id,可以自行配置,这里使用默认
group.id=test-consumer-group
  • 1
  • 2

3.4 启动

/usr/local/kafka/kafka_2.12-2.8.0

3.4.1 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 1
3.4.2 启动kafka
bin/kafka-server-start.sh config/server.properties &
  • 1

4.测试

4.1 创建topic

//创建topic
bin/kafka-topics.sh --create --zookeeper (zookeeper主机ip默认填写127.0.0.1):2181 --topic (topic名称) --partitions 1 --replication-factor 1

//查看topic
bin/kafka-topics.sh --list --zookeeper (主机ip):2181 (topic名称)
  • 1
  • 2
  • 3
  • 4
  • 5

4.2 创建生产者

bin/kafka-console-producer.sh --broker-list (zookeeper主机ip默认填写127.0.0.1):9092 --topic (topic名称)
  • 1

4.3 创建消费者

bin/kafka-console-consumer.sh --bootstrap-server (zookeeper主机ip默认填写127.0.0.1):9092 --topic (topic名称) --from-beginning
  • 1

创建完毕后,生产者输入消息,消费者会接受到消息

5.JAVA连接kafka

5.1 pom.xml

<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5.2 Producer_Test01.java (生产者)

package com.rj.kafka.Test01;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;

/**
 * @author JZH
 * @desc 生产者
 * @time 2021-05-19-9:24
 */
public class Producer_Test01 {
	//kafka主机ip
    public static final String BROKER_LIST = "10.205.63.255:9092";
    //topic名
    public static final String TOPIC = "test";

    public static void main(String[] args) {
        //创建配置容器
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);

        Producer producer = null;

        try {
            //实例化producer对象
            producer = new KafkaProducer(properties);
            //发送信息
            for(int i = 0 ; i<10 ; i++){
                String msg = new Date().toString()+"这是第"+(i+1)+"条信息";
                producer.send(new ProducerRecord(TOPIC,msg));
                //模仿延迟
                //Thread.sleep(500);
                //打印信息
                System.out.println("msg = " + msg);

            }
        }catch (Exception e){

        }finally {
            producer.close();
        }


    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

5.3 Customer_Test01.java

package com.rj.kafka.Test01;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author JZH
 * @desc
 * @time 2021-05-19-9:24
 */
public class Customer_Test01 {
    
	//kafka主机ip
    public static final String BROKER_LIST = "10.205.63.255:9092";
    //tipic
    public static final String TOPIC = "test";
    //配置的消费组id
    public static final String GROUP_ID = "test-consumer-group";

    public static void main(String[] args) {
        Properties properties = init();
        //实例化消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties,new StringDeserializer(),new StringDeserializer());
        consumer.subscribe(Arrays.asList(TOPIC));

        //监听
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String,String> record : records) {
                    System.out.println(record.offset()+"--->"+record.value());
                    System.out.println("=======================================");
                }
            }
        }
        finally {
            consumer.close();
        }

    }

    /**
     * 初始化配置
     * @return
     */
    public static Properties init(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
        return properties;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

以上为Kafka单机版配置 & JAVA连接Kafka

6.Kafka集群

6.1 需要三台服务器

192.168.128.001 
196.168.128.002
196.168.128.003
  • 1
  • 2
  • 3

将每台服务器上的Kafka中server.properties修改一下

将001号服务器上的zookeeper作为主

其他服务器上的kafka都使用001号上的zookeeper

#001
broker.id=001
host.name=192.168.128.001
zookeeper.connection=192.168.128.001:2181

#002
broker.id=002
host.name=192.168.128.002
zookeeper.connection=192.168.128.001:2181

#003
broker.id=003
host.name=192.168.128.003
zookeeper.connection=192.168.128.001:2181
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

配置完成后

1)开启001服务器上的zookeeper

2)依次开启3台服务器上的Kafka

6.2 测试

任意服务器上创建topic

因为有3台服务器,所以这里–replication-factor 是3

bin/kafka-topics.sh --create --zookeeper 192.168.128.001:2181 --replication-factor 3 -partitions 1 --topic 1test
  • 1

002开启一个producer向003发送消息

bin/kafka-console-producer.sh --broker-list 192.168.128.003:9092 --topic 1test
this is a message to 132 broker
  • 1
  • 2

停止003的kafka,此时可用的服务器只剩下001,002 因为 主服务器是001所以并没有进行选举

在001上开启一个consumer

bin/kafka-console-consumer.sh --zookeeper 192.168.128.001:2181 --topic 1test --from-beginning
this is a message to 132 broker
  • 1
  • 2

发现停掉了003,依然能通过zookeeper从001和002的kafka上收到发往132的消息。

填过的坑

1.java连接Kafka生产者,消费者没反应,也不报错

解决方案:没有正确配置server.properties中的 advertised.listeners ,这里的ip使用ifconfig查询就可以,使用127.0.0.1经过测试无效

2.报jdk没有找到错误

解决方案:先查看linux是否已经配置好jdk

//Linux管理员模式下使用命令
sudo gedit  /etc/profile

//进入后在文件最后一行查看 JAVA_HOME jdk地址是否正确
  • 1
  • 2
  • 3
  • 4

​ 编写时间2021年5月19日

接Kafka生产者,消费者没反应,也不报错

解决方案:没有正确配置server.properties中的 advertised.listeners ,这里的ip使用ifconfig查询就可以,使用127.0.0.1经过测试无效

2.报jdk没有找到错误

解决方案:先查看linux是否已经配置好jdk

//Linux管理员模式下使用命令
sudo gedit  /etc/profile

//进入后在文件最后一行查看 JAVA_HOME jdk地址是否正确
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/629390
推荐阅读
相关标签
  

闽ICP备14008679号