当前位置:   article > 正文

kafka之 producer与consumer简单应用_kafka consumer 适用

kafka consumer 适用

编译器 Intellli j IDEA
安装java 8+ 环境
idea内安装scala maven
到官网下载KAFKA 解压

1.新建一个maven项目,打开一个terminal,cd 进入解压的kafka包内,执行命令启动服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
再打开一个终端:
bin/kafka-server-start.sh config/server.properties
2.编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka的所有命令行工具都具有其他选项:kafka-topics.sh不带任何参数的命令即可显示使用情况信息。例如,它还可以向您显示 详细信息,例如 新主题的分区数:
3.写事件进topic
$ bin/kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092

4.main目录下创建一个package,在package下面再创建两个.scala文件,编写producer 代码
package mypackage
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
/**

  • 实现producer
    */
    object KafkaProducer {
    def main(args: Array[String]): Unit = {
    val prop = new Properties
    // 指定请求的kafka集群列表
    prop.put(“bootstrap.servers”, “localhost:9092”)// 指定响应方式
    //prop.put(“acks”, “0”)
    prop.put(“acks”, “all”)
    // 请求失败重试次数
    prop.put(“retries”, “3”)
    // 指定key的序列化方式, key是用于存放数据对应的offset
    prop.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”)
    // 指定value的序列化方式
    prop.put(“value.serializer”,“org.apache.kafka.common.serialization.StringSerializer”)
    // 配置超时时间
    // prop.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
    //prop.put(“request.timeout.ms”, “60000”)
    //prop.put(“batch.size”, “16384”)
    //prop.put(“linger.ms”, “1”)
    //prop.put(“buffer.memory”, “33554432”)

    // 得到生产者的实例
    val producer = new KafkaProducerString, String

    // 模拟一些数据并发送给kafka
    var i=1
    while (i != 0) {
    val msg = s"${i}: this is a topic ${i} kafka data"
    println(“send -->” + msg)
    // 得到返回值
    val rmd: RecordMetadata = producer.send(new ProducerRecord[String, String](“topic1”, msg)).get()
    println(rmd.toString)
    Thread.sleep(500)
    i = i + 1;
    }

    producer.close()
    }
    }

5.编写consumer代码:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.StringSerializer
package mypackage

object KafkaConsumer {
def main(args: Array[String]): Unit = {
// 配置信息
val prop = new Properties
prop.put(“bootstrap.servers”, “localhost:9092”)
// 指定消费者组
prop.put(“group.id”, “group01”)
// 指定消费位置: earliest/latest/none
prop.put(“auto.offset.reset”, “earliest”)
// 指定消费的key的反序列化方式
prop.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
// 指定消费的value的反序列化方式
prop.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
prop.put(“enable.auto.commit”, “true”)
prop.put(“session.timeout.ms”, “30000”)
// 得到Consumer实例
val kafkaConsumer = new KafkaConsumerString, String
// 首先需要订阅topic
kafkaConsumer.subscribe(Collections.singletonList(“topic1”))
// 开始消费数据
while (true) {
// 如果Kafak中没有消息,会隔timeout这个值读一次。比如上面代码设置了2秒,就2秒后查一次。
// 如果Kafka中还有消息没有消费的话,会马上去读,而不需要等待。
val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000)
// println(msgs.count())
val it = msgs.iterator()
while (it.hasNext) {
val msg = it.next()
println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}")
}
}
}
}

6.在pom.xml内加入需要用到的各种dependency


org.apache.kafka
kafka_2.11
1.1.0

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.0</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.0</version>
        <!--<scope>provided</scope>-->


    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>

</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. RUN 。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/606402
推荐阅读
相关标签
  

闽ICP备14008679号