赞
踩
flink主要解决大数据流式计算问题,举个例子,我们在一个系统中难免会遇到统计功能,例如淘宝里面每间商品每个月销量,微信里面每月支出收入等,当数据达到一定程度,光靠数据库里的sum group by是不现实的,多则千万条数据,更甚多个表关联后会达到数亿计的临时数据,实时的统计会使得数据库占用大量资源,这个时候就需要流式计算,针对实时的数据变更通过流式计算出统计的数据,相当于每时每刻都在增量的计算统计结果,查询统计时直接读取当前的结果即可。
而flink则提供了这么一个分布式平台,我们只需要使用flink提供的java包里的方法,定义好输入,怎么处理数据,输出结果,然后打包成jar,上传到flink集群,flink自会帮我们解析任务,拆分任务,分配给不同机器处理。
优点显而易见:低代码,我们不必自己每次都写一个庞大的程序去处理数据,只需要用它提供的方法定义流程即可;分布式,我们不必纠结一台机器能不能稳定提供计算的能力,它会自动帮我们分布到不同机器运行。
docker pull flink:scala_2.12-java8(注意,这里尽量不要用latest,因为默认最新一般都是java11版本,使用java8开发的jar包会启动失败)
启动jobmanager(master节点,管理程序,负责调度job运算)
docker run -d --name jm --network mynet -p 8081:8081 -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 jobmanager
启动两个taskmanager(真正运算task的节点)
docker run -d --name tm1 --network mynet -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 taskmanager
docker run -d --name tm2 --network mynet -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 taskmanager
然后可以通过8081打开管理页面,查看集群状态。
下面为了更好模拟正常的集群,我们把每个taskmanager的task slot数改为2(默认是1):
进入容器,修改opt/flink/conf/flink-conf.yaml里的taskmanager.numberOfTaskSlots: 2 (注意,这里如果用-v 映射文件出来会导致容器启动失败,具体原因不明,建议启动容器后通过 cp 命令把修改好的配置覆盖进去)
然后重启两个taskmanager容器即可。
我们先举一个简单的场景,有个生产者持续向kafka生产文本,我们想要持续统计出现单词的个数,然后输出到mysql,并且希望有两个节点同时进行统计提高效率。
jobmanager:集群的master节点,如简介所说,它负责解析用户开发的jar包,生成job流程,拆分不同的任务,并且分配去不同slot运行。上面的例子可以拆分成4个子任务,监听kafka数据源,统计A,统计B,输出mysql。
taskmanager:集群真正干活的节点。上面拆分的4个子任务会被安排到taskmanager节点执行。
taskslot:一个taskmanager可以有多个slot,每个slot都有自己独立的内存空间,上面拆分的子任务会分配到slot里执行,因此每个子任务都有自己的内存空间存储临时数据,互不干扰。一般情况下,建议与cpu核心数一样,充分利用cpu运算资源。
operation:算子,运算单元,一个子任务包含一个或多个算子,例如上面的统计单词个数,明显就是一个sum操作,就是一个算子。
一个job的过程一般存在4个基本的元素:
environment:获取环境,一般就是指的集群环境。
source:读取数据源,就是上面的从kafka获取数据。
transform:就是算子操作,真正运算逻辑。
sink:输出,上面的输出的mysql。
新建一个maven项目
引入flink相关maven包,引入log4j相关maven包,flink默认使用log4j,这里引入log4j的包只是本地启动调试的需要,不然logger初始化会失败
<dependencies> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.36</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-reload4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-reload4j</artifactId> <version>1.7.36</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-reload4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-reload4j</artifactId> <version>1.7.36</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.17.1</version> </dependency> </dependencies>
添加log4j.properties。 flink线上会使用自己的log4j配置,这里只是配置本地启动日志。
log4j.rootLogger=warn, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
新建一个文本模拟数据
然后新建一个类BatchWordCount 对这个文件进行统计。
package com.example.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCount { public static void main(String[] args) throws Exception{ //获取Environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //读取文件到source DataSource<String> source = env.readTextFile("input/words.txt"); //统计source里的数据,每个单词出现次数。 AggregateOperator<Tuple2<String, Integer>> sum = source.flatMap((String in, Collector<Tuple2<String, Integer>> out) -> { String[] words = in.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(Types.TUPLE(Types.STRING, Types.INT)) .groupBy(0) .sum(1); //sink输出到控制台 sum.print(); } }
这里只是简单模拟了一个有界数据源(txt文件,数据有限),可以看到代码与上面简介的environment、source、transform、sink一一对应,即使再复杂的流程也离不开这四个步骤。
当然实际情况下,我们更多的是无界流数据源(例如kafka,socket,数据持续输入),因此下面我们新建一个StreamWordCount,通过socket持续等带数据输入。
package com.example.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("localhost",9999); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((String in, Collector<Tuple2<String, Integer>> out) -> { String[] words = in.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(data -> data.f0) .sum(1); sum.print(); env.execute(); } } env.execute(); } }
可以看到 env以及source的类变成stream的,我们从9999端口获取数据
下面简单编写一个socketserver程序:
package com.example.flink; import java.io.IOException; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; public class SocketSource { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(9999); Socket socket = server.accept(); System.out.println("请输入");//提示输入 PrintWriter pw = new PrintWriter(socket.getOutputStream()); while (true){ Scanner scan=new Scanner(System.in);//控制台输入控件 String str=scan.nextLine(); if(str.equals("exit")){ break; } pw.println(str); pw.flush(); } pw.close(); socket.close(); } }
启动server,并且启动StreamWordCount,然后再server输入一些数据
可以看到会有相应的输出,当本地启动的时候,flink-client 会模拟一个线上的flink集群,输出左边的数字代表虚拟slot的id,可以看到不同key会被分配到不同的slot运算输出,因为被分配到不同的slot,因此并不能保证运算时的顺序。
有时我们希望可以指定用多少个slot去运算,这时就需要修改并行度,我们可以通过env.setParallelism()全局设置,或者针对特定操作设置。
下面我们修改一下print这个操作的并行度为2,再运行一次看看
可以看到我们输出定义的是2并行,所以部分单词会被分配到同一个slot输出。
下面需要把这个任务真正放到flink集群,去模拟线上的场景。
我们先启动一个容器当作socketserver。
docker pull busybox
docker run -it --network mynet --name bb busybox
然后在容器里用nc命令监听9999端口
nc -l -p 9999
我们把监听的localhost改成一个真正要监听的地址,这里就是上面busybox容器的名字bb。然后通过maven 把上面的项目 package成jar包。
通过界面上传jar包,并且submit com.example.flink.StreamWordCount 这个启动类,设置默认并行度为4
如下图
在running job 可以看到,刚刚的job被拆分成3个子任务,source、flat map和agg&sink,因为source是只有一个线程连接bb:9999的socket并且读取数据,并行度只能是1,而其余操作都被设为我设置的4并行,代表有4个slot运行这些操作。因为sink只是输出,不需要运算,而且并行度与agg一样,因此agg和sink被合并到同一个子任务以减少子任务间的数据传输。
这里题外话一下,算子分可合并和不可合并,例如map,flatmap,filter这些只是过滤,转换的操作,因为不需要太多计算,flink默认他们都是可以合并的。而sum,max那些则属于计算量较大,不可合并的aggregation。
下面在busybox容器输入一些文本,可以看到job里面指标一些变化,数据接收以及不同slot的指标等。
我们通过docker logs tm1 和tm2 ,可以看到统计的输出,但是我们发现webui里的stdout并不能看到对应的输出,这是因为docker启动默认是前台启动,只在命令行里输出,没有输出到文件,因此webui里也获取不到,解决方法建议参考Flink docker 容器运行环境下不能够从Web UI 查看 Logs 以及Stdout的解决办法_Allocator的博客-CSDN博客,比较复杂这里就不管了。
下面举一个更贴切实际的例子
某个电商网站有以下订单表,包含订单id(order_id),商品id(product_id),商品名(product_name),店铺id(shop_id)
现在我很关注每个店铺卖《奥特曼》相关产品的总销量是多少,以调控市场。
常规做法是select count(*) from order where product_name like “%奥特曼%” group by shop_id
众所周知,like 前后都模糊匹配的话是不走索引的,当这个商品表到达千万级别,统计出来效率是非常低的。
这个时候我们就可以把binlog变化推送去kafka(例如用canal),然后使用flink流式计算,实时统计这个指标。
接下来我们来模拟一下,把之前的kafka容器启动起来,然后创建一个order topic
下面添加3个maven包
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>3.1.1-1.17</version> </dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.29</version> </dependency>
增加OrderCount类:
package com.example.flink; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.OffsetResetStrategy; public class OrderCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("8.130.136.133:9092")//Kafka服务 .setTopics("order")//消息主题 .setGroupId("default")//消费组 //偏移量 当没有提交偏移量则从最开始开始消费 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .setValueOnlyDeserializer(new SimpleStringSchema()).build(); DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "order"); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.filter(in -> { String[] data = in.split(" "); String name = data[0]; String shopId = data[1]; return name.contains("奥特曼"); }).flatMap((String in, Collector<Tuple2<String, Integer>> out) -> { String[] data = in.split(" "); String name = data[0]; String shopId = data[1]; out.collect(new Tuple2<>(shopId, 1)); }).returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(data -> data.f0) .sum(1); sum.addSink(JdbcSink.sink( "REPLACE INTO stat (shop_id, count) VALUES (?,?)", ((statement,tuple)->{ statement.setString(1, tuple.f0); statement.setInt(2, tuple.f1); }), JdbcExecutionOptions.builder().withBatchSize(1).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://8.130.136.133:3306/test?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("123456").build() )); env.execute(); } }
可以看到代码里从kafka监听数据的变化,通过flatmap+keyby+sum实现对shopid的分组统计,最后通过jdbc的sink写入数据库。
然后向kafka写入4条消息
奥特曼之父 A
奥特曼之母 A
巨型奥特曼 B
钢铁侠 A
查看数据库结果
可以看到实时统计出来的数据已经被写入mysql,并且过滤了不相关的数据,统计出最终结果。
这个时候我们在业务上就可以直接用这张表查询统计结果,而不必时时刻刻都用sql去聚合统计。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。