The input data for the assignment is as follows:
student1,11100
student2,11200
student3,11300
student4,11400
student5,11500
student6,11600
student7,11700
student8,11800
student9,11900
student10,12000
student11,12100
student12,12200
1. Use the Producer API to send the data above to the Kafka topic topicHW.
Note: create the topic yourself.
2. Write a Consumer API program that consumes topicHW from the Kafka cluster.
Process the consumed records and forward them to another topic named topicDEAL, adding 1500 to each value (11100 becomes 12600, and so on). The expected output is:
student1,12600
student2,12700
student3,12800
student4,12900
...
Note: if you start producing before any consumer is running, the consumer will not see the records that were just produced (a new consumer group starts from the latest offset by default). If this happens, simply re-run the producer code.
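Alternatively, a minimal sketch (not part of the original code): adding one setting to the consumer's config map shown further below makes a group with no committed offsets start from the beginning of the topic instead of only seeing new records.

// Assumption: added to the consumer's config map ("map") before creating the KafkaConsumer.
// With no committed offsets for the group, the consumer then reads from the earliest offset.
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");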
The data in topicDEAL is then consumed and inspected in the MobaXterm terminal.
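For reference, the console command for this mirrors the one used for topicHW later in this post, just with the other topic name (assuming the same broker address):

kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicDEAL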
Stopping and restarting the brokers:
kafka-server-stop.sh
[root@hadoop11 kafka0.11]# kafka-server-start.sh -daemon config/server.properties
Note: mind absolute versus relative paths when launching; the relative config path above only resolves if the command is run from inside the Kafka installation directory.
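From any other directory, the same start command would need full paths. The install location /root/kafka0.11 below is a hypothetical example:

# /root/kafka0.11 is a hypothetical install path; substitute your actual one.
/root/kafka0.11/bin/kafka-server-start.sh -daemon /root/kafka0.11/config/server.properties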
The environment is a three-node Kafka cluster (hadoop11, hadoop12, hadoop13).
[root@hadoop11 kafka0.11]# kafka-topics.sh --zookeeper hadoop11:2181 --create --topic topicHW --partitions 3 --replication-factor 2
Created topic "topicHW".
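To double-check the partition and replica layout after creation, the topic can be described with the same tool (not part of the original session):

kafka-topics.sh --zookeeper hadoop11:2181 --describe --topic topicHW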
[Error] The following error appeared while creating topicHW:
[root@hadoop11 ~]# kafka-topics.sh --zookeeper hadoop11:2181 --create --topic topicHW --partitions 3 --replication-factor 2
Error while executing topic command : replication factor: 2 larger than available brokers: 1
Error description: the requested replication factor is larger than the number of available brokers, even though it was ruled out that any of the three nodes had failed to start.
Solution: delete all existing topics, and kill -9 the ConsoleConsumer processes still shown by jps.
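A quick way to check how many brokers are actually registered (a diagnostic sketch, not from the original session) is to list the broker IDs in ZooKeeper:

# With all three brokers up, this should print three broker IDs, e.g. [0, 1, 2].
zookeeper-shell.sh hadoop11:2181 ls /brokers/ids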
[root@hadoop11 ~]# kafka-topics.sh --zookeeper hadoop11:2181 --list
__consumer_offsets
topicA
topicHW
[root@hadoop11 ~]# kafka-topics.sh --zookeeper hadoop11:2181 --delete --topic topicHW
Topic topicHW is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hadoop11 ~]# kafka-topics.sh --zookeeper hadoop11:2181 --delete --topic topicA
Topic topicA is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
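As the note says, the delete only takes effect when delete.topic.enable is true; in Kafka 0.11 its default is false, so presumably it has to be set in the broker config on every node, followed by a restart:

# config/server.properties on each broker
delete.topic.enable=true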
[root@hadoop11 ~]# kafka-topics.sh --zookeeper hadoop11:2181 --list
__consumer_offsets
topicA - marked for deletion
[root@hadoop11 ~]# xcall.sh jps
The command to execute is: jps
-----------------------hadoop11---------------------
51696 Jps
39028 ConsoleConsumer
42309 ConsoleConsumer
42118 QuorumPeerMain
41098 ConsoleProducer
45691 ConsoleConsumer
-----------------------hadoop12---------------------
10040 QuorumPeerMain
12872 Jps
-----------------------hadoop13---------------------
3512 QuorumPeerMain
5373 Jps
[root@hadoop11 ~]# kill -9 ^C
[root@hadoop11 ~]# kill -9 39028
[root@hadoop11 ~]# kill -9 42309
[1]   Killed    kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
[root@hadoop11 ~]# kill -9 41098
[3]-  Killed    kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
[root@hadoop11 ~]# kill -9 45691
[2]-  Killed    kafka-console-producer.sh --broker-list hadoop11:9092 --topic topicHW
package hw;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class CustomProducer {
    public static void main(String[] args) throws Exception {
        // 1. Producer configuration
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // 3. Send the data: student1,11100 through student12,12200
        int j = 11000;
        for (int i = 1; i <= 12; i++) {
            producer.send(new ProducerRecord<>("topicHW", "student" + i + "," + (j += 100)));
        }
        producer.close();
    }
}
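To confirm that each record actually reached the broker, a callback can be passed to send(). This is a sketch layered on the loop above, not part of the original code:

// Hypothetical variant of the send call in the loop above; the callback
// runs once the broker acknowledges (or rejects) the record.
producer.send(new ProducerRecord<>("topicHW", "student" + i + "," + (j += 100)),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            }
        });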
The Linux terminal shows the data sent by the producer:
[root@hadoop11 kafka0.11]# kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topicHW
student1,11100
student2,11200
student3,11300
student4,11400
student5,11500
student6,11600
student7,11700
student8,11800
student9,11900
student10,12000
student11,12100
student12,12200
The consumed and processed data in topicDEAL came out as follows:
student1,12600
student2,12700
student3,12800
student4,12900
[root@hadoop11 kafka0.11]# kafka-topics.sh --zookeeper hadoop11:2181 --create --topic topicDEAL --partitions 3 --replication-factor 2
Created topic "topicDEAL".
package hw;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. Consumer configuration
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "g00000");

        // 2. Producer configuration, for forwarding the results to topicDEAL
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. Create the producer and the consumer
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(map);

        // Subscribe to topicHW
        kafkaConsumer.subscribe(Arrays.asList("topicHW"));

        while (true) {
            // 4. Consume, add 1500 to each value, and forward to topicDEAL
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String[] arr = consumerRecord.value().split(","); // split "studentN,value"
                int a = Integer.parseInt(arr[1]) + 1500;
                String data = arr[0] + "," + a;
                producer.send(new ProducerRecord<>("topicDEAL", data));
                System.out.println(data);
            }
        }
    }
}
If a data-format error occurs (e.g., a parse failure on an earlier malformed record), change the GROUP_ID, i.e., rename g00000 to something new. A fresh group id has no committed offsets and, with the default latest reset, starts from the end of the log, so the offending record is not re-read.
map.put(ConsumerConfig.GROUP_ID_CONFIG, "g00000"); // change "g00000" to a new name