赞
踩
编译器 Intellli j IDEA
安装java 8+ 环境
idea内安装scala maven
到官网下载KAFKA 解压
1.新建一个maven项目,打开一个terminal,cd 进入解压的kafka包内,执行命令启动服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
再打开一个终端:
bin/kafka-server-start.sh config/server.properties
2.编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka的所有命令行工具都具有其他选项:kafka-topics.sh不带任何参数的命令即可显示使用情况信息。例如,它还可以向您显示 详细信息,例如 新主题的分区数:
3.写事件进topic
$ bin/kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
4.main目录下创建一个package,在package下面再创建两个.scala文件,编写producer 代码
package mypackage
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
/**
实现producer
*/
object KafkaProducer {
def main(args: Array[String]): Unit = {
val prop = new Properties
// 指定请求的kafka集群列表
prop.put(“bootstrap.servers”, “localhost:9092”)// 指定响应方式
//prop.put(“acks”, “0”)
prop.put(“acks”, “all”)
// 请求失败重试次数
prop.put(“retries”, “3”)
// 指定key的序列化方式, key是用于存放数据对应的offset
prop.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”)
// 指定value的序列化方式
prop.put(“value.serializer”,“org.apache.kafka.common.serialization.StringSerializer”)
// 配置超时时间
// prop.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
//prop.put(“request.timeout.ms”, “60000”)
//prop.put(“batch.size”, “16384”)
//prop.put(“linger.ms”, “1”)
//prop.put(“buffer.memory”, “33554432”)
// 得到生产者的实例
val producer = new KafkaProducerString, String
// 模拟一些数据并发送给kafka
var i=1
while (i != 0) {
val msg = s"${i}: this is a topic ${i} kafka data"
println(“send -->” + msg)
// 得到返回值
val rmd: RecordMetadata = producer.send(new ProducerRecord[String, String](“topic1”, msg)).get()
println(rmd.toString)
Thread.sleep(500)
i = i + 1;
}
producer.close()
}
}
5.编写consumer代码:
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.StringSerializer
package mypackage
object KafkaConsumer {
def main(args: Array[String]): Unit = {
// 配置信息
val prop = new Properties
prop.put(“bootstrap.servers”, “localhost:9092”)
// 指定消费者组
prop.put(“group.id”, “group01”)
// 指定消费位置: earliest/latest/none
prop.put(“auto.offset.reset”, “earliest”)
// 指定消费的key的反序列化方式
prop.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
// 指定消费的value的反序列化方式
prop.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
prop.put(“enable.auto.commit”, “true”)
prop.put(“session.timeout.ms”, “30000”)
// 得到Consumer实例
val kafkaConsumer = new KafkaConsumerString, String
// 首先需要订阅topic
kafkaConsumer.subscribe(Collections.singletonList(“topic1”))
// 开始消费数据
while (true) {
// 如果Kafak中没有消息,会隔timeout这个值读一次。比如上面代码设置了2秒,就2秒后查一次。
// 如果Kafka中还有消息没有消费的话,会马上去读,而不需要等待。
val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000)
// println(msgs.count())
val it = msgs.iterator()
while (it.hasNext) {
val msg = it.next()
println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}")
}
}
}
}
6.在pom.xml内加入需要用到的各种dependency
org.apache.kafka
kafka_2.11
1.1.0
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.7.0</version> <!--<scope>provided</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.7.0</version> <!--<scope>provided</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.7.0</version> </dependency> </dependencies>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。