赞
踩
borker即是kafka集群的每台机器Topic是一类数据的集合
是Topic数据的物理分区
负责生成数据到kafka的broker中
为consumer指定对应的consumer Group
kafka_2.12-3.2.0kafkakakakakakakakakakakakaka-Java文档类资源-CSDN下载
下载完成后直接解压
tar -zxvf kafka_2.12-3.2.0.tgz
vi /opt/kafka/config/server.properties
增加3个配置
listeners=PLAINTEXT://10.31.126.100:9092
advertised.listeners=PLAINTEXT://10.31.126.100:9092
zookeeper.connect=10.31.126.100:2181
cd /opt/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
开启以后,不要关闭当前窗口,不然的话zookeeper服务会中断
ps: 如果报错:/opt/kafka/bin/kafka-run-class.sh: line 342: /opt/kafka/echo/bin/java: No such file or directory
可以看下 echo $JAVA_HOME 是不是路径打印不出。
解决方案: 执行 source /etc/profile ,再打印下 echo $JAVA_HOME,看下是否正常。
cd /opt/kafka
./bin/kafka-server-start.sh ./config/server.properties
开启以后,不要关闭当前窗口,不然的话kafka服务会中断
bin/kafka-topics.sh --create --bootstrap-server 10.31.126.10:9092 --replication-factor 1 --partitions 1 --topic wordTest
ps:在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。
2.2一下版本:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 1 --partitions 1 --topic wordsendertest
来启动
bin/kafka-topics.sh --bootstrap-server 10.31.126.10:9092 --list
bin/kafka-topics.sh --describe --bootstrap-server 10.31.126.10:9092 --topic wordTest
bin/kafka-topics.sh -delete --bootstrap-server 10.31.126.10:9092 --topic wordTest
可以开启多个生产者客户端发送消息,开启多个消费者客户端接收消息,以观察这种订阅-发布模式实现的消息队列。
bin/kafka-console-producer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest
从当前时间点,开始取数据
bin/kafka-console-consumer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest
从最开始的时间点,开始取数据,(取该topic所有的数据)
bin/kafka-console-consumer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest --from-beginning
bin/kafka-server-stop.sh
开启生产者和消费者
编写flink程序,消费kafka数据
核心的生成kafka数据源方法介绍:
其中SimpleStringSchema,是因为kafka是纯字节存储,所以需要在代码中进行反序列化成对象,让Scala可以解析。
设置offset的几种方式
import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase} import java.util.Properties object kafkaTest { def main(args: Array[String]): Unit = { //create the env val env = StreamExecutionEnvironment.getExecutionEnvironment //for kafka connection val kafkaProps = new Properties() //kafka's attribute kafkaProps.setProperty("bootstrap.servers","10.31.126.10:9092") //set the consumer's group //kafkaProps.setProperty("group.id","group1") //create the consumer val kafkaSource = new FlinkKafkaConsumer[String]("wordTest", new SimpleStringSchema, kafkaProps) //set offset kafkaSource.setStartFromEarliest() //auto commit offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //band data source val stream = env.addSource(kafkaSource) stream.print() //execute env.execute() } }
输出效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。