This article is the second in the "Flink DataSource trilogy" series. The previous article, "Flink DataSource trilogy part one: direct API", covered creating a DataSource with the StreamExecutionEnvironment APIs; today we practice Flink's built-in connectors, which are plugged in through StreamExecutionEnvironment's addSource method.
Today's practice uses Kafka as the data source: first we receive and process plain String messages, then we receive JSON messages and deserialize the JSON into bean instances.
If you would rather not type the code yourself, the source for the whole series can be downloaded from GitHub; the address and link information is listed in the table below (https://github.com/zq2599/blog_demos):
| Name | Link | Remarks |
| --- | --- | --- |
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, ssh protocol |
The Git project contains several folders; the application for this chapter is in the flinkdatasourcedemo folder.
The environment and versions used in this practice are Flink 1.10.0 (with the Scala 2.11 build of the Kafka connector) and Kafka 2.4.0; the Kafka broker in the examples runs at 192.168.50.43.
Please make sure everything above is ready before continuing with the practice.
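First, create the topic test001 that both demos will consume. The command below is run on the Kafka server; adjust the ZooKeeper address to your own environment: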
./kafka-topics.sh \
--create \
--zookeeper 192.168.50.43:2181 \
--replication-factor 1 \
--partitions 2 \
--topic test001
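Next, add the Flink Kafka connector dependency to the application's pom.xml (the artifact below is built for Scala 2.11 and matches Flink 1.10.0):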
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
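The first demo, Kafka240String, consumes String messages from topic test001, splits each message into words, and counts the words in 5-second time windows: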
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240String {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties);
        // start from the latest offset, i.e. ignore historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // addSource turns the consumer into a DataSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        // split the strings read from Kafka into words and count them in 5-second windows
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka");
    }
}
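The Splitter class used in flatMap comes from the first article of this series and is not repeated here; the sketch below is a minimal reconstruction, assuming the usual word-count behavior of splitting each message on spaces and emitting (word, 1) tuples:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// minimal reconstruction of the Splitter from part one of the series
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        if (value == null || value.isEmpty()) {
            return;
        }
        // emit a (word, 1) tuple for every space-separated word
        for (String word : value.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

Once the job is running, every word sent to test001 (for example with kafka-console-producer.sh) is printed as a (word, count) pair at the end of each 5-second window. Next comes the JSON scenario; first add Gson to pom.xml for JSON parsing: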
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
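The JSON messages are mapped to a simple Student bean that only needs an id and a name: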
package com.bolingcavalry;

public class Student {

    private int id;

    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
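To let FlinkKafkaConsumer produce Student instances directly, we implement a schema class. StudentSchema implements both DeserializationSchema and SerializationSchema, but only deserialization (bytes to Student, via Gson) is actually used by this consumer demo: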
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    private static final Gson gson = new Gson();

    /**
     * Deserialization: turn a byte array into a Student instance
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {
        return gson.fromJson(new String(bytes), Student.class);
    }

    @Override
    public boolean isEndOfStream(Student student) {
        return false;
    }

    /**
     * Serialization: turn a Student instance into a byte array
     * (not used by this consumer-only demo, so it returns an empty array)
     */
    @Override
    public byte[] serialize(Student student) {
        return new byte[0];
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}
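Finally, Kafka240Bean wires StudentSchema into the FlinkKafkaConsumer, so the DataStream carries Student objects, and the job counts how many messages arrive for each name in 5-second windows: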
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240Bean {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer with the custom StudentSchema
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new StudentSchema(),
                properties);
        // start from the latest offset, i.e. ignore historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // addSource turns the consumer into a DataSource
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);

        // the JSON from Kafka is deserialized into Student instances;
        // count the occurrences of each name in 5-second windows
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Student student) throws Exception {
                        return new Tuple2<>(student.getName(), 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka bean");
    }
}
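To test the bean demo you need JSON messages on the topic. You can paste strings such as {"id":1,"name":"Tom"} into kafka-console-producer.sh, or send them programmatically. The sketch below is one possible test producer, assuming the kafka-clients library is on the classpath; the class name TestStudentProducer and the sample names are only for illustration and are not part of the original project:

package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// hypothetical helper that sends a few Student JSON messages to the demo topic
public class TestStudentProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // same broker as the consumer demos
        props.setProperty("bootstrap.servers", "192.168.50.43:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Gson gson = new Gson();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // serialize a few Student instances to JSON and send them to test001
            String[] names = {"Tom", "Jerry", "Tom"};
            for (int i = 0; i < names.length; i++) {
                Student student = new Student();
                student.setId(i + 1);
                student.setName(names[i]);
                producer.send(new ProducerRecord<>("test001", gson.toJson(student)));
            }
        }
    }
}

While messages like these arrive, the running Kafka240Bean job should print pairs such as (Tom, 2) and (Jerry, 1) at the end of each 5-second window.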