
Kafka Producer and Consumer Practice Exercise

The required input data is as follows:

student1,11100
student2,11200
student3,11300
student4,11400
student5,11500
student6,11600
student7,11700
student8,11800
student9,11900
student10,12000
student11,12100
student12,12200

Task 1: Use the Producer API to send the data above to the Kafka topic [topicHW].
Note: create the topic yourself.
Task 2: Write a Consumer API program that consumes [topicHW] from the Kafka cluster.
Process the consumed records and forward them to another topic named topicDEAL, with the following requirements:

  1. Create a topic named [topicDEAL] with 3 partitions and 2 replicas. For each consumed record, add 1500 to the second comma-separated value, leave everything else unchanged, and write the result to topicDEAL (the transformation is sketched right after the expected results below).
    The final expected result:
student1,12600
student2,12700
student3,12800
student4,12900
...

Note: if you start producing data before any consumer is running, the consumer will not see the records that were just produced; by default it only reads records produced after it subscribes. If that happens, simply re-run the producer code.
The data in topicDEAL is then consumed and inspected in MobaXterm (the Linux terminal).
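Alternatively (not done in this post), the consumer can be configured to start from the beginning of the topic when its group has no committed offsets, so data produced earlier is still readable. A minimal sketch of the relevant consumer settings, assuming a fresh group id (g00001 is a made-up name):

// Assumption: for a group id that has never committed offsets,
// auto.offset.reset=earliest makes poll() start from the beginning of the topic
// instead of only returning records produced after subscribing ("latest", the default).
map.put(ConsumerConfig.GROUP_ID_CONFIG, "g00001");
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");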

1. Start the environment

Stop Kafka:

kafka-server-stop.sh

Start Kafka:

[root@hadoop11 kafka0.11]# kafka-server-start.sh -daemon config/server.properties

Note: mind whether you use an absolute or relative path to config/server.properties; the relative path above only works from inside the Kafka installation directory.


The environment is a three-broker Kafka cluster.

2. Create the topics

1. Create topicHW; success looks like this:

[root@hadoop11 kafka0.11]# kafka-topics.sh  --zookeeper hadoop11:2181  --create   --topic topicHW --partitions 3 --replication-factor 2
Created topic "topicHW".

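Topics can also be created programmatically with the AdminClient API that ships with the 0.11 client library; here is a minimal sketch, assuming the same three-broker bootstrap list used elsewhere in this post:

package hw;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicHW {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, same as the shell command above
            NewTopic topic = new NewTopic("topicHW", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get(); // block until created
        }
    }
}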

[Error] The following error appeared when creating topicHW:

[root@hadoop11 ~]# kafka-topics.sh  --zookeeper hadoop11:2181  --create   --topic topicHW --partitions 3 --replication-factor 2
Error while executing topic command : replication factor: 2 larger than available brokers: 1


Error description: the replication factor (2) is larger than the number of available brokers (1); the possibility that the three nodes had not fully started was ruled out.
Solution: delete all existing topics and kill -9 the leftover ConsoleConsumer processes shown by jps.

[root@hadoop11 ~]# kafka-topics.sh  --zookeeper hadoop11:2181  --list
__consumer_offsets
topicA
topicHW
[root@hadoop11 ~]# kafka-topics.sh  --zookeeper hadoop11:2181  --delete   --topic topicHW
Topic topicHW is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hadoop11 ~]# kafka-topics.sh  --zookeeper hadoop11:2181  --delete   --topic topicA
Topic topicA is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hadoop11 ~]# kafka-topics.sh  --zookeeper hadoop11:2181  --list
__consumer_offsets
topicA - marked for deletion

[root@hadoop11 ~]# xcall.sh jps
Command to execute: jps
-----------------------hadoop11---------------------
51696 Jps
39028 ConsoleConsumer
42309 ConsoleConsumer
42118 QuorumPeerMain
41098 ConsoleProducer
45691 ConsoleConsumer
-----------------------hadoop12---------------------
10040 QuorumPeerMain
12872 Jps
-----------------------hadoop13---------------------
3512 QuorumPeerMain
5373 Jps
[root@hadoop11 ~]# kill -9 ^C
[root@hadoop11 ~]# kill -9 39028
[root@hadoop11 ~]# kill -9 42309
[1]   Killed               kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
[root@hadoop11 ~]# kill -9 41098
[3]-  Killed               kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
[root@hadoop11 ~]# kill -9 45691
[2]-  Killed               kafka-console-producer.sh --broker-list hadoop11:9092 --topic topicHW

3. Send data to the Kafka topic [topicHW] via the Producer API

package hw;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class CustomProducer {
    public static void main(String[] args) throws Exception {
        // 1. Producer configuration
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        // 3. Send the data: student1..student12 with values 11100..12200
        int j = 11000;
        for (int i = 1; i <= 12; i++) {
            producer.send(new ProducerRecord<>("topicHW", "student" + i + "," + (j += 100)));
        }
        producer.close();
    }
}
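If the data does not show up, send() also accepts a callback that reports each record's partition and offset on success, or the exception on failure. A sketch of that variant of the send call inside the loop above:

// Variant of the send call with a delivery callback
// (metadata is only valid when exception == null).
producer.send(new ProducerRecord<>("topicHW", "student" + i + "," + (j += 100)),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });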


Consuming topicHW in the Linux terminal shows the data sent by the producer:

[root@hadoop11 kafka0.11]# kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
student1,11100
student2,11200
student3,11300
student4,11400
student5,11500
student6,11600
student7,11700
student8,11800
student9,11900
student10,12000
student11,12100
student12,12200


4. Create a Consumer API program that consumes [topicHW] from the Kafka cluster

Process the consumed records and forward them to another topic named topicDEAL, as required:

1. Create a topic named [topicDEAL] with 3 partitions and 2 replicas; add 1500 to the second comma-separated value of each consumed record, leave everything else unchanged, and write the result to topicDEAL.

Expected result:
student1,12600
student2,12700
student3,12800
student4,12900

[root@hadoop11 kafka0.11]# kafka-topics.sh  --zookeeper hadoop11:2181  --create   --topic topicDEAL --partitions 3 --replication-factor 2
Created topic "topicDEAL".


2. The consumer receives, transforms, and forwards the data

package hw;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. Consumer configuration
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "g00000");

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 2. Create the producer for topicDEAL
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // 3. Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(map);
        // Subscribe to topicHW
        kafkaConsumer.subscribe(Arrays.asList("topicHW"));

        while (true) {
            // 4. Poll, transform, and forward the data
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String[] arr = consumerRecord.value().split(","); // split on the comma
                int a = Integer.parseInt(arr[1]) + 1500;          // add 1500 to the second field
                String data = arr[0] + "," + a;

                producer.send(new ProducerRecord<>("topicDEAL", data));
                System.out.println(data);
            }

        }
    }
}
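The loop above never exits, so the producer and consumer are never closed; that is acceptable for this exercise, but a common shutdown pattern (not part of the assignment) wakes the consumer from a JVM shutdown hook so both clients can close cleanly:

// consumer.wakeup() is the thread-safe way to break out of a blocked poll():
// it makes poll() throw WakeupException, which we catch to shut down cleanly.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    kafkaConsumer.wakeup();
    try { mainThread.join(); } catch (InterruptedException ignored) { }
}));
try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        // ... transform and forward records as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown
} finally {
    producer.close();
    kafkaConsumer.close();
}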


If the consumed data comes out in the wrong format (for example, leftover records from an earlier run), change the GROUP_ID: give g00000 a different name in
map.put(ConsumerConfig.GROUP_ID_CONFIG, "g00000");
