当前位置:   article > 正文

利用docker和docker-compose部署单机kafka_docker-compose kafka 设置密码

docker-compose kafka 设置密码

前提

  1. docker
  2. docker-compose

其中docker-compose不是必须的,单单使用docker也是可以的,这里主要介绍docker和docker-compose两种方式

docker部署

docker部署kafka非常简单,只需要两条命令即可完成kafka服务器的部署。

  1. docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
  2. docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.60(机器IP):9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

由于kafka是需要和zookeeper共同工作的,所以需要部署一个zookeeper,但有了docker这对部署来说非常轻松.
可以通过docker ps查看到两个容器的状态,这里不再展示.

接下来可以进行生产者和消费者的尝试

通过kafka自带工具生产消费消息测试

  1. 首先,进入到kafka的docker容器中
    docker exec -it kafka sh
    
  2. 运行消费者,进行消息的监听

    kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9092 --topic kafeidou --from-beginning
    
  3. 打开一个新的ssh窗口,同样进入kafka的容器中,执行下面这条命令生产消息

    kafka-console-producer.sh --broker-list 192.168.1.60(机器IP):9092 --topic kafeidou
    

    输入完这条命令后会进入到控制台,可以输入任何想发送的消息,这里发送一个hello

    1. >>
    2. >hello
    3. >
    4. >
    5. >
  4. 可以看到,在生产者的控制台中输入消息后,消费者的控制台立刻看到了消息

到目前为止,一个kafka完整的hello world就完成了.kafka的部署加上生产者消费者测试.

通过java代码进行测试

  1. 新建一个maven项目并加入以下依赖
    1. <dependency>
    2. <groupId>org.apache.kafka</groupId>
    3. <artifactId>kafka-clients</artifactId>
    4. <version>2.1.1</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.kafka</groupId>
    8. <artifactId>kafka_2.11</artifactId>
    9. <version>0.11.0.2</version>
    10. </dependency>
  2. 生产者代码
    producer.java
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Date;
  3. import java.util.Properties;
  4. import java.util.Random;
  5. public class HelloWorldProducer {
  6. public static void main(String[] args) {
  7. long events = 30;
  8. Random rnd = new Random();
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers", "192.168.1.60:9092");
  11. props.put("acks", "all");
  12. props.put("retries", 0);
  13. props.put("batch.size", 16384);
  14. props.put("linger.ms", 1);
  15. props.put("buffer.memory", 33554432);
  16. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  17. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  18. props.put("message.timeout.ms", "3000");
  19. Producer<String, String> producer = new KafkaProducer<>(props);
  20. String topic = "kafeidou";
  21. for (long nEvents = 0; nEvents < events; nEvents++) {
  22. long runtime = new Date().getTime();
  23. String ip = "192.168.2." + rnd.nextInt(255);
  24. String msg = runtime + ",www.example.com," + ip;
  25. System.out.println(msg);
  26. ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, ip, msg);
  27. producer.send(data,
  28. new Callback() {
  29. public void onCompletion(RecordMetadata metadata, Exception e) {
  30. if(e != null) {
  31. e.printStackTrace();
  32. } else {
  33. System.out.println("The offset of the record we just sent is: " + metadata.offset());
  34. }
  35. }
  36. });
  37. }
  38. System.out.println("send message done");
  39. producer.close();
  40. System.exit(-1);
  41. }
  42. }
  1. 消费者代码
    consumer.java
  1. import java.util.Arrays;
  2. import java.util.Properties;
  3. import org.apache.kafka.clients.consumer.Consumer;
  4. import org.apache.kafka.clients.consumer.ConsumerConfig;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.apache.kafka.clients.consumer.ConsumerRecords;
  7. import org.apache.kafka.clients.consumer.KafkaConsumer;
  8. import org.apache.kafka.common.serialization.StringDeserializer;
  9. public class HelloWorldConsumer2 {
  10. public static void main(String[] args) {
  11. Properties props = new Properties();
  12. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.60:9092");
  13. props.put(ConsumerConfig.GROUP_ID_CONFIG ,"kafeidou_group") ;
  14. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  15. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  16. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  17. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  18. props.put("auto.offset.reset", "earliest");
  19. Consumer<String, String> consumer = new KafkaConsumer<>(props);
  20. consumer.subscribe(Arrays.asList("kafeidou"));
  21. while (true) {
  22. ConsumerRecords<String, String> records = consumer.poll(1000);
  23. for (ConsumerRecord<String, String> record : records) {
  24. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  25. }
  26. }
  27. }
  28. }
  1. 分别运行生产者和消费者即可
    生产者打印消息
    1. 1581651496176,www.example.com,192.168.2.219
    2. 1581651497299,www.example.com,192.168.2.112
    3. 1581651497299,www.example.com,192.168.2.20
    消费者打印消息
    1. offset = 0, key = 192.168.2.202, value = 1581645295298,www.example.com,192.168.2.202
    2. offset = 1, key = 192.168.2.102, value = 1581645295848,www.example.com,192.168.2.102
    3. offset = 2, key = 192.168.2.63, value = 1581645295848,www.example.com,192.168.2.63
    源码地址:FISHStack/kafka-demo

通过docker-compose部署kafka

首先创建一个docker-compose.yml文件

  1. version: '3.7'
  2. services:
  3. zookeeper:
  4. image: wurstmeister/zookeeper
  5. volumes:
  6. - ./data:/data
  7. ports:
  8. - 2182:2181
  9. kafka9094:
  10. image: wurstmeister/kafka
  11. ports:
  12. - 9092:9092
  13. environment:
  14. KAFKA_BROKER_ID: 0
  15. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.60:9092
  16. KAFKA_CREATE_TOPICS: "kafeidou:2:0" #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
  17. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  18. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
  19. volumes:
  20. - ./kafka-logs:/kafka
  21. depends_on:
  22. - zookeeper

部署起来很简单,在docker-compose.yml文件的目录下执行docker-compose up -d就可以了,测试方式和上面的一样。
这个docker-compose做的东西比上面docker方式部署的东西要多一些

  1. 数据持久化,在当前目录下挂在了两个目录分别存储zookeeper和kafka的数据,当然在docker run命令中添加 -v 选项也是可以做到这样的效果的
  2. kafka在启动后会初始化一个有分区的topic,同样的,docker run的时候添加-e KAFKA_CREATE_TOPICS=kafeidou:2:0也是可以做到的。

总结:优先推荐docker-compose方式部署

为什么呢?

因为单纯使用docker方式部署的话,如果有改动(例如:修改对外开放的端口号)的情况下,docker需要把容器停止docker stop 容器ID/容器NAME,然后删除容器docker rm 容器ID/容器NAME,最后启动新效果的容器docker run ...

而如果在docker-compose部署的情况下如果修改内容只需要修改docker-compose.yml文件对应的地方,例如2181:2181改成2182:2182,然后再次在docker-compose.yml文件对应的目录下执行docker-compose up -d就能达到更新后的效果。  

 

> 始发于 四颗咖啡豆 发布! 
> 关注公众号->[四颗咖啡豆] 获取最新内容  
 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/651934
推荐阅读
相关标签
  

闽ICP备14008679号