当前位置:   article > 正文

Kafka 生产者和消费者实例_kafka生产者和消费者

kafka生产者和消费者

目录

一、基于命令行使用Kafka

二、创建一个名为“itcasttopic”的主题

①、创建生产者

②、创建消费者

③、测试发送数据

三、基于Java API方式使用Kafka

①、创建工程添加依赖

②、编写生产者客户端

③ 、配置环境

④、编写消费者客户端

⑤、再运行KafkaConsumerTest程序 ​编辑​编辑

⑥、再回到KafkaProducerTest.java运行该程序


 

一、基于命令行使用Kafka

    类似scala,mysql等,命令行是初学者操作Kafka的基本方式,kafka的模式是生产者消费者模式,他们之间通讯是通过,一个公共频道完成

二、创建一个名为“itcasttopic”的主题

kafka-topics.sh --create --topic itcasttopic  --partitions 3  --replication-factor 2  --zookeeper master:2181,slave1:2181,slave2:2181

 

--create --topic itcasttopic:  创建主题名称是 itcasttopic

--partitions 3  : 分区数是3

--replication-factor 2:副本数是 2

--zookeeper master:2181,slave1:2181,slave2:2181 : zookeeper:服务的IP地址和端口

1b35e7041cd746459f7b9bcc011850f6.png

 

##删除主题##

$ bin/kafka-topics.sh --delete -zookeeper master:2181,slave1:2181,slave2:2181 --topic itcasttopic

 

①、创建生产者

kafka-console-producer.sh  --broker-list master:9092,slave1:9092,slave2:9092 --topic itcasttopic

e1138514f25c4465a5111fd5a02611db.png

 

(上面是等待输入光标在闪烁)

83087331134e44b38cd72745454d087f.png

 

转换到slave1

、创建消费者

kafka-console-consumer.sh  --from-beginning --topic itcasttopic --bootstrap-server master:90

ce761842a2d84647976512dd17bd843f.png

 

③、测试发送数据

生产发送数据

ea666802b2d2474cacf525450d3c5205.png

消费接收数据

2e9816c7bdc142b3a8bfca89a486984b.png

 

三、基于Java API方式使用Kafka

2ab018b696754842a27c5e791c26f33d.png

560f905f3548454a828ed3c4898d1f1f.png

 

修改配置:

7d791f05e2a849c38c3ee4b778d10131.png

 a09d2778cfbb47aead16d8bf74d02f3e.png

 

①、创建工程添加依赖

在工程里面的pom.xml文件添加Kafka依赖

(Kafka依赖需要与虚拟机安装的Kafka版本保持一致)

  1. <properties>
  2. <scala.version>2.11.8</scala.version>
  3. <hadoop.version>2.7.4</hadoop.version>
  4. <spark.version>2.3.2</spark.version>
  5. </properties>
  6. <build>
  7.     <plugins>
  8.         <plugin>
  9.             <groupId>org.apache.maven.plugins</groupId>
  10.             <artifactId>maven-compiler-plugin</artifactId>
  11.             <configuration>
  12.                 <source>1.8</source>
  13.                 <target>1.8</target>
  14.             </configuration>
  15.         </plugin>
  16.     </plugins>
  17. </build>
  18. <!--kafka-->
  19. <dependency>
  20.     <groupId>org.apache.kafka</groupId>
  21.     <artifactId>kafka-clients</artifactId>
  22.     <version>2.0.0</version>
  23. </dependency>
  24. <dependency>
  25.     <groupId>org.apache.kafka</groupId>
  26.     <artifactId>kafka-streams</artifactId>
  27.     <version>2.0.0</version>
  28. </dependency>

②、编写生产者客户端

在工程的java目录下创建KafkaProducerTest文件

5e0dada88c894b37872116cb5afc4f84.png

 

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.util.Properties;
  4. public class KafkaProducerTest {
  5.     public static void main(String[] args){
  6.         Properties props = new Properties();
  7.         //
  8.         props.put("bootstrap.servers","master:9092,slave1:9092,slave2:9092");
  9.         //
  10.         props.put("acks","all");
  11.         //
  12.         props.put("retries",0);
  13.         //
  14.         props.put("batch.size",16384);
  15.         //
  16.         props.put("linger.ms",1);
  17.         //
  18.         props.put("buffer.memory",33554432);
  19.         //
  20.         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  21.         //
  22.         props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  23.         //
  24.         KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
  25.         for (int i=0; i<50; i++){
  26.             producer.send(new ProducerRecord<String, String>("itcasttopic",Integer.toString(i),"hello world [2] -"+i));
  27.         }
  28.         producer.close();
  29.     }


}edcb457a6e69430d975b24ad6ddf82d3.png

 

Slave1上出现的结果

3920872aaecf49f2b6506b6b07e69ea1.png

 

③ 、配置环境

c26e97e27cb14e5dad208c2ef1ccab15.png

80160532d6a3427d8b1ec7a4464bd345.png

 

④、编写消费者客户端

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.apache.kafka.clients.producer.Callback;
  5. import org.apache.kafka.clients.producer.KafkaProducer;
  6. import org.apache.kafka.clients.producer.ProducerRecord;
  7. import org.apache.kafka.clients.producer.RecordMetadata;
  8. import java.util.Arrays;
  9. import java.util.Properties;
  10. public class KafkaConsumerTest {
  11.     public static void main(String[] args) {
  12.         // 1、准备配置文件
  13.         Properties props = new Properties();
  14.         // 2、指定Kafka集群主机名和端口号
  15.         props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
  16.         // 3、指定消费者组ID,在同一时刻同一消费组中只有一个线程可以去消费一个分区数据,不同的消费组可以去消费同一个分区的数据。
  17.         props.put("group.id", "itcasttopic");
  18.         // 4、自动提交偏移量
  19.         props.put("enable.auto.commit", "true");
  20.         // 5、自动提交时间间隔,每秒提交一次
  21.         props.put("auto.commit.interval.ms", "1000");
  22.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  23.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  24.         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
  25.         // 6、订阅数据,这里的topic可以是多个
  26.         kafkaConsumer.subscribe(Arrays.asList("itcasttopic"));
  27.         // 7、获取数据
  28.         while (true) {
  29.             //每隔100ms就拉去一次
  30.             ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  31.             for (ConsumerRecord<String, String> record : records) {
  32.                 System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n", record.topic(), record.offset(), record.key(), record.value());
  33.             }
  34.         }
  35.     }
  36. }

运行KafkaP roducerTest程序

83ffa7dd0cb14c72b442a98b7f45f136.png

 

⑤、再运行KafkaConsumerTest程序 790caac612934c6eb173c4d117372979.png

⑥、再回到KafkaProducerTest.java运行该程序

(查看KafkaConsumerTest的运行框)由以下图可以看出生产者生产消息成功被终端消费

5b69e55e64c34dc69f6fccb1f8059e32.png

 

 

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号