Kafka-java
1. Before writing any code, add the Kafka connector dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
```
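If the project is built with sbt instead of Maven, a rough equivalent in `build.sbt` could look like the sketch below; the version value here is only an assumption and should match the Flink version used by the rest of the project.

```scala
// build.sbt -- hypothetical sbt equivalent of the Maven dependency above
val flinkVersion = "1.15.2" // assumption: use the same Flink version as the rest of the project

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % flinkVersion
```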
2. Read data from Kafka (Scala, using Flink's `KafkaSource`):

```scala
package com.wt.flink.scurce

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object Demo5KafkaSource {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * Build the Kafka source
     */
    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // Kafka broker list
      .setTopics("test_topic2")                                 // topic to read from
      .setGroupId("my_group")                                   // consumer group: within a group each record is consumed only once
      .setStartingOffsets(OffsetsInitializer.earliest())        // starting position: earliest reads all existing data, latest reads only new data
      .setValueOnlyDeserializer(new SimpleStringSchema())       // deserializer for the record value
      .build()

    // Consume the Kafka source as a DataStream
    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka Source")

    kafkaDS.print()

    env.execute()
  }
}
```
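As an optional follow-up (a sketch, not part of the original notes): assuming the `student` topic written in step 4 holds comma-separated student lines whose fifth field is the class name, the lines read from Kafka could be parsed and aggregated instead of only printed. The snippet below would replace `kafkaDS.print()` inside `main` above.

```scala
// Hypothetical continuation of the job above: running count of students per class.
// Assumption: each line is comma-separated and its 5th field is the class name.
val clazzCountDS: DataStream[(String, Int)] = kafkaDS
  .map(line => {
    val fields = line.split(",")
    (fields(4), 1) // (class name, 1)
  })
  .keyBy(_._1) // group by class name
  .sum(1)      // running count per class

clazzCountDS.print()
```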
3. Write data into Kafka with a producer (Scala):

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.util.Properties

object Demo1KafkaProducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the producer
     */
    val properties = new Properties()

    // Kafka broker addresses
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")

    // Key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    val record = new ProducerRecord[String, String]("test_topic2", "woaini,zhongguo")

    // Send the record to Kafka
    producer.send(record)
    producer.flush()

    // Close the connection
    producer.close()
  }
}
```
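A small variant worth knowing (a sketch, not part of the original notes): passing a key and a delivery callback to `send`, so the target partition is chosen by the key and delivery failures are reported instead of being silently dropped. This would slot in before `producer.close()` in the program above.

```scala
// Hypothetical variant: send with an explicit key and a delivery callback.
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

val keyedRecord = new ProducerRecord[String, String]("test_topic2", "key1", "woaini,zhongguo")

producer.send(keyedRecord, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      exception.printStackTrace() // delivery failed
    } else {
      println(s"written to ${metadata.topic()} partition ${metadata.partition()} at offset ${metadata.offset()}")
    }
  }
})
producer.flush()
```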
4. Write the student data into Kafka in batch:

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.util.Properties
import scala.io.Source

object Demo2StudentToKafka {
  def main(args: Array[String]): Unit = {
    /**
     * Create the producer
     */
    val properties = new Properties()

    // Kafka broker addresses
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")

    // Key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    /**
     * Write the student table data to Kafka in batch
     */
    val studentList: List[String] = Source.fromFile("data/students.txt").getLines().toList

    // Send the records to Kafka
    for (student <- studentList) {
      val record = new ProducerRecord[String, String]("student", student)

      producer.send(record)
      producer.flush()
    }
    producer.close()
  }
}
```
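A possible refinement of the loop above (a sketch under the assumption that the first comma-separated field of each line is the student id): using the id as the message key keeps all records of one student in the same partition, and a single flush after the loop avoids one round trip per record. It reuses the `producer` and imports from the program above.

```scala
// Hypothetical variant: key each record by student id and flush once at the end.
val fileSource = Source.fromFile("data/students.txt")
try {
  for (student <- fileSource.getLines()) {
    val id = student.split(",")(0) // assumption: the first field is the student id
    producer.send(new ProducerRecord[String, String]("student", id, student))
  }
  producer.flush() // one flush for the whole batch
} finally {
  fileSource.close()
  producer.close()
}
```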
5. Read data from Kafka with a consumer, batch by batch:

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

import java.time.Duration
import java.util.Properties
import java.{lang, util}

object Demo3KafkaConsumer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the consumer
     */
    val properties = new Properties()

    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")

    // Key and value deserializer classes
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /**
     * auto.offset.reset:
     * earliest
     *   if a partition has a committed offset, consume from it; otherwise consume from the beginning
     * latest (default)
     *   if a partition has a committed offset, consume from it; otherwise consume only newly produced data in that partition
     * none
     *   if every partition has a committed offset, consume from those offsets;
     *   if any partition has no committed offset, throw an exception
     */
    properties.setProperty("auto.offset.reset", "earliest")

    // Consumer group
    properties.setProperty("group.id", "suibian_mingzi")

    val consumer = new KafkaConsumer[String, String](properties)

    /**
     * 2. Subscribe to a topic; several topics can be subscribed at once
     */
    val topics = new util.ArrayList[String]()
    topics.add("student")
    consumer.subscribe(topics)

    while (true) {
      println("consuming...")

      /**
       * Poll for data; a poll timeout must be provided
       */
      val consumerRecords: ConsumerRecords[String, String] = consumer
        .poll(Duration.ofSeconds(2))

      // Extract the records of the "student" topic
      val records: lang.Iterable[ConsumerRecord[String, String]] = consumerRecords.records("student")

      val iterRecord: util.Iterator[ConsumerRecord[String, String]] = records.iterator()

      while (iterRecord.hasNext) {
        // One record
        val record: ConsumerRecord[String, String] = iterRecord.next()

        val topic: String = record.topic()  // topic name
        val offset: Long = record.offset()  // offset within the partition
        val key: String = record.key()      // record key; null if none was specified
        val value: String = record.value()  // record value
        val ts: Long = record.timestamp()   // timestamp; by default the time the record was written

        println(s"$topic\t$offset\t$key\t$value\t$ts")
      }
    }
    // Close the connection (never reached because of the infinite loop above)
    consumer.close()
  }
}
```
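One common refinement (a sketch, not from the original notes): turning off auto-commit and committing offsets manually only after a polled batch has been fully processed, so a crash in the middle of the loop does not skip records that were fetched but not yet handled. The snippet reuses `properties`, the `util` alias and the `Duration` import from the consumer above.

```scala
// Hypothetical variant: manual offset commit after each processed batch.
import scala.collection.JavaConverters._

properties.setProperty("enable.auto.commit", "false") // take over offset management

val manualConsumer = new KafkaConsumer[String, String](properties)
manualConsumer.subscribe(util.Arrays.asList("student"))

while (true) {
  val batch: ConsumerRecords[String, String] = manualConsumer.poll(Duration.ofSeconds(2))

  for (record <- batch.asScala) { // iterate records of all subscribed topics
    println(s"${record.topic()}\t${record.offset()}\t${record.value()}")
  }

  if (!batch.isEmpty) {
    manualConsumer.commitSync() // commit only after the whole batch was handled
  }
}
```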