赞
踩
flink作为一个大数据框架,已经由阿里充分的证实了其性能和前景。但对国内仍然是一个比较陌生的状态,无论是开源的文档和实例都比较缺乏。之前找到的demo很多都是旧版本;同时flink本身面临版本演进,blink开源等一些影响,也会在之后出现一些比较大的变化。
我根据目前的资料,编写了基于flink的Kafka生产消费demo,便于初步的了解flink的特性,也希望能对他人了解flink提供一些小小的帮助。
以下demo是我根据一些其他的demo整合而成,引用申明见最下方。
flink的导包是常见的核心+拓展模式
flink-java是flink的核心实现.我使用的是(看起来比较成熟的1.5.0,可能1.6.0更合适)
flink-streaming-java_2.11是使用flink流接入的包
flink-connector-kafka-0.10_2.11是为了使用Kafka开发的中间件,而使用这个包必须保证版号对应,及使用kafka0.10和flink流2.11
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>1.5.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>1.5.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
- <version>1.5.0</version>
- </dependency>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
源为一个循环的数据产生器,输出为对应的kafka topic
- public class WriteIntoKafka {
- public static void main(String[] args) throws Exception {
- // create execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- Map properties= new HashMap();
- properties.put("bootstrap.servers", "/*服务地址*/");
- properties.put("topic", "/*topic*/");
-
- // parse user parameters
- ParameterTool parameterTool = ParameterTool.fromMap(properties);
-
- // add a simple source which is writing some strings
- DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
-
- // write stream to Kafka
- messageStream.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
- parameterTool.getRequired("topic"),
- new SimpleStringSchema()));
-
- messageStream.rebalance().map(new MapFunction<String, String>() {
- //序列化设置
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(String value) throws Exception {
- return value;
- }
- });
-
- messageStream.print();
-
- env.execute();
- }
-
- public static class SimpleStringGenerator implements SourceFunction<String> {
- //序列化设置
- private static final long serialVersionUID = 1L;
- boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- while(running) {
- ctx.collect(prouderJson());
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
采用Kafka消费者作为源,通过MapFunction转换后输出
- public class ReadFromKafka {
-
- public static void main(String[] args) throws Exception {
- // create execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- Map properties= new HashMap();
- properties.put("bootstrap.servers", "/*服务地址*/");
- properties.put("group.id", "test");
- properties.put("enable.auto.commit", "true");
- properties.put("auto.commit.interval.ms", "1000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("session.timeout.ms", "30000");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("topic", "/*topic*/");
- // parse user parameters
-
- ParameterTool parameterTool = ParameterTool.fromMap(properties);
-
- FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
- parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
-
-
- DataStream<String> messageStream = env
- .addSource(consumer010);
-
- // print() will write the contents of the stream to the TaskManager's standard out stream
- // the rebelance call is causing a repartitioning of the data so that all machines
- // see the messages (for example in cases when "num kafka partitions" < "num flink operators"
- messageStream.rebalance().map(new MapFunction<String, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(String value) throws Exception {
- return value;
- }
- });
-
-
- messageStream.print();
-
- env.execute();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
1,配置我采用了 Map properties= new HashMap(); 这种在程序内部写入的方式。
也可以采用读取文件的配置的方式。
3,另一种配置方式,可以对FlinkKafkaConsumer010进行配置,具体使用待补充
采用Kafka为data Source使用
2,新旧版本的最大区别就是 使用FlinkKafkaProducer010和FlinkKafkaProducer010作为生产者和消费者的实例,具体方法的使用和之前的版本有些差别
4,生产者源的产生方式是值得研究的,也应该有其他的产生方式。很多实现细节没有弄清楚(待补充)
5.消费者转换的MapFunction感觉仍有不清楚的地方,这一步可以实现什么逻辑,适合写什么样的逻辑呢(待补充)
6,输出细节,可以在 实现中输出,也可以使用messageStream.print(),进行输出,具体格式可以自行验证。
以上两个demo是调用flink—Kafka的api实现的,但是仍有一些的问题,没有弄清:
1,关于生产者,如何自定义发送数据格式,如byte[],如何自定义发送的分区
2,关于消费者kafka低级api的使用,seek2end,拉取指定分区,手动提交偏移量等设置的实现
https://www.jianshu.com/p/f9d447a3c48f
https://blog.csdn.net/lmalds/article/details/51780950
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
github中的某段代码(找了很久,忘了出处了)
10.28
最近把一些用过的demo放在了GitHub上,可以看下:https://github.com/jyj019/flinkDemo
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。