赞
踩
编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号(一般情况下是6123)
"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错机制,我们会在后续的章节介绍。
上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment,顾名思义它应该是做流处理的。那对于批处理,又应该怎么获取执行环境呢?
在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment的静态方法,返回它的对象:
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1. BATCH 模式的配置方法
由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。
主要有两种方式:
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH …
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。 (2)通过代码配置
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。
建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。
2. 什么时候选择 BATCH 模式
env.execute();
DataStream<String> stream = env.addSource(...);
表 5-1 Event类字段设计
- 1
字段名 | 数据类型 | 说明 |
---|---|---|
user | String | 用户名 |
url | String | 用户访问的 url |
timestamp | Long | 用户访问 url 的时间戳 |
具体代码如下:
package online.liujiahao.chapter05; import java.sql.Timestamp; public class Event { public String user; public String url; public Long timeStamp; public Event() { } public Event(String user, String url, Long timeStamp) { this.user = user; this.url = url; this.timeStamp = timeStamp; } @Override public String toString() { return "Event{" + "user='" + user + '\'' + ", url='" + url + '\'' + ", timeStamp=" + new Timestamp(timeStamp) + '}'; } }
这里需要注意,我们定义的 Event,有这样几个特点:
注:Java 编程比较好的实践是重写每一个类的 toString 方法,来自 Joshua Bloch 编写的《Effective Java》。
最简单的读取数据的方式,就是在代码中直接创建一个 Java 集合,然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
package online.liujiahao.chapter05; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; /* 这部分主要是针对Source读取数据源的api的调用 */ public class SourceTest { public static void main(String[] args) throws Exception{ //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //1.从文件中直接读取数据(读取的批量的数据(有界),流式方式输出) //todo 最常用的读取有界流数据的方式还是方式1(从文件中读取),后面两种一般是用于测试环境中。 DataStreamSource<String> stream1 = env.readTextFile("input/click.txt"); //2.从集合中读取数据(.fromCollection) ArrayList<Integer> nums = new ArrayList<>(); nums.add(2); nums.add(5); DataStreamSource<Integer> numStream = env.fromCollection(nums); ArrayList<Event> events = new ArrayList<>(); events.add(new Event("Mary", "./home", 1000L)); events.add(new Event("Bob", "./cart", 2000L)); DataStreamSource<Event> stream2 = env.fromCollection(events); //从元素中读取数据 DataStreamSource<Event> stream3 = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); // stream1.print("1"); // numStream.print("nums"); // stream2.print("2"); // stream3.print("3"); //4. 从socket文本流中直接读取 DataStreamSource<String> stream4 = env.socketTextStream("hadoop102", 7777); stream4.print("4"); env.execute(); } }
我们也可以不构建集合,直接将元素列举出来,调用 fromElements 方法进行读取数据:
DataStreamSource<Event> stream2 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
DataStream<String> stream = env.readTextFile("clicks.txt");
说明:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
<scope>provided</scope>
</dependency>
DataStream<String> stream = env.socketTextStream("localhost", 7777);
kafka数据源本身传入的FlinkKafkaConsumer类继承自FlinkKafkaConsumerBase
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。
package online.liujiahao.chapter05; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class SourceKafkaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop102:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>( "clicks", new SimpleStringSchema(), properties )); stream.print("Kafka"); env.execute(); } }
创建 FlinkKafkaConsumer 时需要传入三个参数:
接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()和 cancel()。
package online.liujiahao.chapter05; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Calendar; import java.util.Random; public class ClickSource implements SourceFunction<Event> { //申明一个标志位 private Boolean running = true; @Override public void run(SourceContext<Event> sourceContext) throws Exception { // 模拟真实环境,随机生成数据 Random random = new Random(); //定义字段选取的数据集 String[] users = {"Marry", "Alice", "Bob", "Cary"}; String[] urls = {"./home", "./cart", "./fav", "./prod?id=200", "./prod?id=10"}; //循环生成数据 while (running) { //在0-users.length-1 之间随机选取一个索引位置作为user的值。 String user = users[random.nextInt(users.length)]; //url的选取 String url = urls[random.nextInt(urls.length)]; //获取当前系统时间的一个 毫秒级的时间戳 Long timestamp = Calendar.getInstance().getTimeInMillis(); sourceContext.collect(new Event(user,url,timestamp)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } }
这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource()数据源,使用上面的代码就可以了。
下面的代码我们来读取一下自定义的数据源。有了自定义的 source function,接下来只要调用 addSource()就可以了:
env.addSource(new ClickSource())
下面是完整的代码:
package online.liujiahao.chapter05; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random; public class SourceCustomTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2); customStream.print(); env.execute(); } public static class ParallelCustomSource implements ParallelSourceFunction<Integer>{ private Boolean running = true; private Random random = new Random(); @Override public void run(SourceContext<Integer> ctx) throws Exception { while (running) { ctx.collect(random.nextInt()); } } @Override public void cancel() { running = false; } } }
这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常。如下程序所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new ClickSource()).setParallelism(2).print();
env.execute();
} }
输出的异常如下:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism
of non parallel operator must be 1.
所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction,示例程序
如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random; public class ParallelSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new CustomSource()).setParallelism(2).print(); env.execute(); } public static class CustomSource implements ParallelSourceFunction<Integer> { private boolean running = true; private Random random = new Random(); @Override public void run(SourceContext<Integer> sourceContext) throws Exception { while (running) { sourceContext.collect(random.nextInt()); } } @Override public void cancel() { running = false; } } }
输出结果如下:
2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389
我们已经了解了 Flink 怎样从不同的来源读取数据。在之前的代码中,我们的数据都是定义好的 UserBehavior 类型,而且在 5.2.1 小节中特意说明了对这个类的要求。那还有没有其他更灵活的类型可以用呢?Flink 支持的数据类型到底有哪些?
(4)辅助类型
Option、Either、List、Map 等
(5)泛型类型(GENERIC)
Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。
所以,在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。
Flink 对 POJO 类型的要求如下:
⚫ 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
⚫ 类有一个公共的无参构造方法;
⚫ 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范。
所以我们看到,之前的 UserBehavior,就是我们创建的符合 Flink POJO 定义的数据类型。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
在java源码中,TypeInformation本身是一个抽象类,他就是flink所有数据类型的基类,也就是说所有在flink当中支持的数据类型的类型都属于TypeInformation
有个createSerializer抽象方法,它会为数据类型创建TypeSerializer类型的特定的序列化器
首先我们来介绍一些基本的转换算子。
我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取 Event 中的 user 字段的功能。
先看map函数的源码:
点进map,可以看到是要传入一个MapFunction函数,再点进MapFunction:
发现MapFunction是一个接口,调用它就需要重写它的 map() 方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。
接下来是第一种方法:用自定义方法去实现MapFunction接口
代码如下:
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformMapTest { public static void main(String[] args)throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //全局并行度设为1,方便后面的串行测试 env.setParallelism(1); //从元素中读取数据 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); //进行转换,提取user字段 //1.使用自定义类,实现MapFunction接口 SingleOutputStreamOperator<String> result = stream.map(new MyMapper()); result.print(); env.execute(); } //MapFunction是一个接口,调用它就需要重写它的 map() 方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。 //自定义MapFunction public static class MyMapper implements MapFunction<Event, String> { @Override public String map(Event value) throws Exception { return value.user; } } }
下面用匿名内部类的形式实现一次,
代码如下:
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformMapTest { public static void main(String[] args)throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //全局并行度设为1,方便后面的串行测试 env.setParallelism(1); //从元素中读取数据 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); //进行转换,提取user字段 //1.使用自定义类,实现MapFunction接口 SingleOutputStreamOperator<String> result1 = stream.map(new MyMapper()); //2.使用匿名类去实现MapFunction接口 SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() { @Override public String map(Event value) throws Exception { return value.user; } }); //result1.print(); result2.print(); env.execute(); } //MapFunction是一个接口,调用它就需要重写它的 map() 方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。 //自定义MapFunction public static class MyMapper implements MapFunction<Event, String> { @Override public String map(Event value) throws Exception { return value.user; } } }
还有没有更简单的方法呢?
其实对于只有一个方法的接口,使用lambda表达式就可以很轻松的实现。
代买如下:
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformMapTest { public static void main(String[] args)throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //全局并行度设为1,方便后面的串行测试 env.setParallelism(1); //从元素中读取数据 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); //进行转换,提取user字段 //1.使用自定义类,实现MapFunction接口 SingleOutputStreamOperator<String> result1 = stream.map(new MyMapper()); //2.使用匿名类去实现MapFunction接口 SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() { @Override public String map(Event value) throws Exception { return value.user; } }); //3.传入lambda表达式去实现 SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user); //result1.print(); //result2.print(); result3.print(); env.execute(); } //MapFunction是一个接口,调用它就需要重写它的 map() 方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。 //自定义MapFunction public static class MyMapper implements MapFunction<Event, String> { @Override public String map(Event value) throws Exception { return value.user; } } }
我们可以发现直接传入lanbda表达式是最为简单的一种方式,但是java中存在范型擦除的问题,到时候就需要在后面加上return来返回转换结果的数据类型了。
注意:
DataStreamSource继承自SingleOutputStreamOperator
SingleOutputStreamOperator又继承自DataStream
所以我们得到的其实还是一个DataStream
MapFunction有两个范型,一个Event,一个String
相当于是从Event转换成了String,实际上就是两个DataStream之间的转换
进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。
下面的代码会将数据流中用户 Mary 的浏览行为过滤出来 。
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TramsformFilterTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从元素中读取数据 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); //1.传入一个实现了FileterFunction的类的对象 SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter()); //2.传入一个匿名类实现FilterFunction接口 SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() { @Override public boolean filter(Event event) throws Exception { return event.user.equals("Bob"); } }); //3. 传入lambda表达式 //这里filter不用考虑范型擦除,因为它输出的数据类型就是输入的数据类型 //这里就可以直接打印了。 stream.filter(data -> data.user.equals("Alice")).print("lambda: Alice click"); result1.print(); result2.print(); env.execute(); } // 实现一个自定义的FilterFunction public static class MyFilter implements FilterFunction<Event> { @Override public boolean filter(Event value) throws Exception { return value.user.equals("Mary"); } } }
输出结果如下:
同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了简单的转换操作。
flatMap 的使用非常灵活,可以对结果进行任意输出,下面就是一个例子:
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class TransformFlatMapTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从元素中读取数据 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L) ); //1. 实现一个自定义的FlatMapFunction stream.flatMap(new MyFlatMap()).print("1"); //2. 传入lambda表达式 stream.flatMap((Event value,Collector<String> out) -> { //对于"Mary",就是进行了一个Map操作 if (value.user.equals("Mary")) { out.collect(value.url); //对于"Bob",就是进行了一个flat操作,将数据打散成3条后输出 } else if (value.user.equals("Bob")) { out.collect(value.user); out.collect(value.url); out.collect(value.timeStamp.toString()); } //对于"Alice",就是进行了一个filter操作。被过滤掉了。 //todo 这里定义了lambda表达式后,用到了Collector去进行结果输出(Collector是有范型的),但是jvm不知道转换之后具体的数据类型是什么。 //所以这里要调用returns指明返回的数据类型。 }).returns(new TypeHint<String>() {}) .print("2"); env.execute(); } public static class MyFlatMap implements FlatMapFunction<Event, String> { @Override public void flatMap(Event event, Collector<String> out) throws Exception { out.collect(event.user); out.collect(event.url); //时间戳是Long类型的数据,Collector输出的范型是String,所以这里做toString转换。 out.collect(event.timeStamp.toString()); } } }
FlatMap的参数和Map有些许不同,T 还是输入的数据类型,后面那个Collector 用来输出转换后的数据。
因为Map是一对一,一次只返回一个数据,但是如果想实现一对多的返回,就需要先将数据都收集起来,然后一起返回。
这里的输出方式就是使用到了Collector :收集器。(调用一次,就输出一次。)
这个收集器里面有个collect方法,就是将数据收集起来,传递到下游去。
输出结果如下,可以看到,本来是三条数据,现在被按字段打散之后,变成9条数据输出。
也就是一个扁平化映射的过程。
加入Lambda表达式后输出的结果如下:
对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。
keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
基于不同的 key,流中的数据将被分配到不同的分区中去,如图 5-8 所示;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。
在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。
相同Key的数据,一定被分配到同一个分区,不同Key的数据,有可能被分配到同一个分区,也可能被分配到不同的分区。
先简单看看keyBy()的源码:
它需要传入的参数是一个KeySelector
点进KeySelector:
发现它是一个interface,有一个需要重写的方法,和两个范型
IN :就是当前传入的数据类型
KEY:就是我们指定需要提取出的那个Key的类型
看重写的方法,参数列表是IN,就是要传入的数据,返回的是KEY,也就是我们要提取的Key
我们再看keyBy()的返回值
它的返回值就不是之前map和flatmap返回的SingleOutputStreamOperator这样的数据类型了,而是一个KeyedStream,我们可以把它看作是一个指定了Key的一个数据流,也叫 按键分区流
它本质上是对DataStream追加了一个key的信息,做了一个针对于key的逻辑上的分区。
所以keyBy不能将他作为一个算子,因为它本质上只是在原来的数据流上做了一个追加key的信息。
KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce);**而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限定为仅对当前 key 有效。**关于状态的相关知识我们会在后面章节继续讨论。
我们可以以 id 作为 key 做一个分区操作,代码实现如下:
package online.liujiahao.chapter05; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformSimpleAggTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从元素中读取数据 //拓展一下Bob得数据,方便之后做分组聚合操作 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //按键分组之后进行聚合,提取当前用户最近 一次访问数据 //keyBy得范型和KeySelector得范型是一样的,按user分组,key的类型就是string。 stream.keyBy(new KeySelector<Event, String>() { @Override public String getKey(Event event) throws Exception { return event.user; } //最近一次访问,max取最大的时间戳 }).max("timeStamp") .print("max: "); stream.keyBy(data -> data.user) .maxBy("timeStamp") .print("maxBy: "); env.execute(); } }
这里区别一下max()和maxBy()两种方法。
此处max()有两个方法,一个传Int型的索引号,一个传String类型的字段名称,此前wordcount案例用的是第一种,这时候POJO类型的数据用索引号会报错,而且直接传字段名称显然更方便一些。
注意输出结果:
Bob的第二条数据来了之后,max的输出结果是,将timeStamp的最大值取出来做了输出,而这条Event数据的其他字段还是沿用的与第一条一样的,所以这里可以看出,max是对特定字段做聚合(后面再来Bob的数据的时候,它也是只跟新时间戳,而不跟新其他的字段)。
maxBy的输出结果是,找到最大timeStamp所在的那条字段,将其完整的输出。
因此我们可以看到,max和maxBy的适用场景是不同的。
public class TransTupleAggreationTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements( Tuple2.of("a", 1), Tuple2.of("a", 3), Tuple2.of("b", 3), Tuple2.of("b", 4) ); stream.keyBy(r -> r.f0).sum(1).print(); stream.keyBy(r -> r.f0).sum("f1").print(); stream.keyBy(r -> r.f0).max(1).print(); stream.keyBy(r -> r.f0).max("f1").print(); stream.keyBy(r -> r.f0).min(1).print(); stream.keyBy(r -> r.f0).min("f1").print(); stream.keyBy(r -> r.f0).maxBy(1).print(); stream.keyBy(r -> r.f0).maxBy("f1").print(); stream.keyBy(r -> r.f0).minBy(1).print(); stream.keyBy(r -> r.f0).minBy("f1").print(); env.execute(); } }
而如果数据流的类型是 POJO 类,那么就只能通过字段名称来指定,不能通过位置来指定
了。
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransPojoAggregationTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
stream.keyBy(e -> e.user).max("timestamp").print(); // 指定字段名称
env.execute();
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformReduceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1. 像wordcount一样,先用map做一个转换,统计每个用户的访问频次 SingleOutputStreamOperator<Tuple2<String, Long>> clickByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event event) throws Exception { return Tuple2.of(event.user, 1L); } //我们要提取的是二元组里的第一个元素,所以用传索引的方式 }).keyBy(data -> data.f0) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> stringLongTuple2, Tuple2<String, Long> t1) throws Exception { return Tuple2.of(stringLongTuple2.f0, stringLongTuple2.f1 + t1.f1); } }); //2. 找出访问量最大的用户 //问题来了:因为API规定必须基于kededStream的数据才能调用聚合的方法。所以对于clickByUser这样一个数据流(他返回的是SingleOutputStreamOperator这样的数据类型),想要做聚合操作的话,得先按键分区。 //现在已经无法按user分区了,因为要统计的是所有用户里访问量最大的。所以就可以将所有用户都放到一个组里面去。 SingleOutputStreamOperator<Tuple2<String, Long>> result = clickByUser.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { return value1.f1 > value2.f1 ? value1 : value2; } }); result.print(); env.execute(); } }
reduce 同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。
接下来我们就对这几种编程方式做一个梳理总结。
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransFunctionUDFTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> clicks = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L) ); DataStream<Event> stream = clicks.filter(new FlinkFilter()); stream.print(); env.execute(); } public static class FlinkFilter implements FilterFunction<Event> { @Override public boolean filter(Event value) throws Exception { return value.url.contains("home"); } } }
当然还可以通过匿名类来实现 FilterFunction 接口:
DataStream<String> stream=clicks.filter(new FilterFunction<Event>(){@Overridepublic boolean filter(Event value)throws Exception{ return value.url.contains("home"); }});
为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
DataStream<Event> stream = clicks.filter(new KeyWordFilter("home"));
public static class KeyWordFilter implements FilterFunction<Event> {
private String keyWord;
KeyWordFilter(String keyWord) {
this.keyWord = keyWord;
}
@Override
public boolean filter(Event value) throws Exception {
return value.url.contains(this.keyWord);
}
}
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransFunctionLambdaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> clicks = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L) ); //map 函数使用 Lambda 表达式,返回简单类型,不需要进行类型声明 DataStream<String> stream1 = clicks.map(event -> event.url); stream1.print(); env.execute(); } }
// flatMap 使用 Lambda 表达式,抛出异常
DataStream<String> stream2 = clicks.flatMap((event, out) -> {
out.collect(event.url);
});
stream2.print();
如果执行程序,Flink 会抛出如下异常:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information.
在这种情况下,我们需要显式地指定类型信息,否则输出将被视为 Object 类型,这会导致低效的序列化。
// flatMap 使用 Lambda 表达式,必须通过 returns 明确声明返回类型
DataStream<String> stream2 = clicks.flatMap((Event event, Collector<String>
out) -> {
out.collect(event.url);
}).returns(Types.STRING);
stream2.print();
当使用 map() 函数返回 Flink 自定义的元组类型时也会发生类似的问题。下例中的函数签名 Tuple2<String, Long> map(Event value) 被类型擦除为 Tuple2 map(Event value)。
//使用 map 函数也会出现类似问题,以下代码会报错
DataStream<Tuple2<String, Long>> stream3 = clicks
.map( event -> Tuple2.of(event.user, 1L) );
stream3.print();
一般来说,这个问题可以通过多种方式解决:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ReturnTypeResolve { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> clicks = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L) ); // 想要转换成二元组类型,需要进行以下处理 // 1) 使用显式的 ".returns(...)" DataStream<Tuple2<String, Long>> stream3 = clicks .map( event -> Tuple2.of(event.user, 1L) ) .returns(Types.TUPLE(Types.STRING, Types.LONG)); stream3.print(); // 2) 使用类来替代 Lambda 表达式 clicks.map(new MyTuple2Mapper()) .print(); // 3) 使用匿名类来代替 Lambda 表达式 clicks.map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event value) throws Exception { return Tuple2.of(value.user, 1L); } }).print(); env.execute(); } // 自定义 MapFunction 的实现类 public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{ @Override public Tuple2<String, Long> map(Event value) throws Exception { return Tuple2.of(value.user, 1L); } } }
这些方法对于其它泛型擦除的场景同样适用。
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TransformRichFunctionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3100L), new Event("Bob", "./prod?id=2", 3600L), new Event("Bob", "./prod?id=3", 3800L), new Event("Bob", "./prod?id=0", 4200L)); stream.map(new MyRichMapper()).setParallelism(2).print(); env.execute(); } // 实现一个自定义的富函数类 public static class MyRichMapper extends RichMapFunction<Event,Integer>{ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println("open生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask()+"号任务启动"); } @Override public Integer map(Event event) throws Exception { return event.url.length(); } @Override public void close() throws Exception { super.close(); System.out.println("close生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask()+"号任务结束"); } } }
输出结果是:
这里可以看到:
这里我们将并行度改为2
执行后结果为:
每一个子任务都会调用一次open和close。
一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接。所以我们推荐的最佳实践如下:
public class MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
@Override
public void open(Configuration configuration) {
// 做一些初始化工作
// 例如建立一个和 MySQL 的连接
}
@Override
public void flatMap(IN in, Collector<OUT out) {
// 对数据库进行读写
}
@Override
public void close() {
// 清理工作,关闭和 MySQL 数据库的连接。
}
}
另外,富函数类提供了 getRuntimeContext()方法(我们在本节的第一个例子中使用了一下),可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态(state)。这使得我们可以大大扩展程序的功能,特别是对于状态的操作,使得 Flink 中的算子具备了处理复杂业务的能力。关于 Flink 中的状态管理和状态编程,我们会在后续章节逐渐展开。
package online.liujiahao.chapter05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1.随机分区 //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。 stream.shuffle().print().setParallelism(4); env.execute(); } }
可以得到如下形式的输出结果:
2. 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图 5-10 所示。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。
我们同样可以在代码中进行测试:
package online.liujiahao.chapter05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种) stream.rebalance().print().setParallelism(4); env.execute(); } }
得到的结果:
如果用发牌来比喻的话,相当于像下游的分区轮询发牌。
代码实现如下:
package online.liujiahao.chapter05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1.随机分区 //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。 //stream.shuffle().print().setParallelism(4); //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种) //stream.rebalance().print().setParallelism(4); //3. rescale重缩放分区(分区轮询) env.addSource(new RichParallelSourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 1; i <= 8; i++) { //将奇偶数分别发送到0号和1号并行分区 if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) { ctx.collect(i); } } } @Override public void cancel() { } }).setParallelism(2) .rescale() .print() .setParallelism(4); env.execute(); } }
输出结果如下:
奇数都是由3 4 两个线程输出。偶数由1 2 两个线程输出
可以将 rescale 方法换成 rebalance 方法,来体会一下这两种方法的区别。
package online.liujiahao.chapter05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1.随机分区 //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。 //stream.shuffle().print().setParallelism(4); //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种) //stream.rebalance().print().setParallelism(4); //3. rescale重缩放分区(分区轮询) env.addSource(new RichParallelSourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 1; i <= 8; i++) { //将奇偶数分别发送到0号和1号并行分区 if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) { ctx.collect(i); } } } @Override public void cancel() { } }).setParallelism(2) // .rescale() // .print() .setParallelism(4); //4. 广播 stream.broadcast().print().setParallelism(4); env.execute(); } }
结果如下:
相当于每条数据都被输出了4次。
5. 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
package online.liujiahao.chapter05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1.随机分区 //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。 //stream.shuffle().print().setParallelism(4); //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种) //stream.rebalance().print().setParallelism(4); //3. rescale重缩放分区(分区轮询) env.addSource(new RichParallelSourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 1; i <= 8; i++) { //将奇偶数分别发送到0号和1号并行分区 if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) { ctx.collect(i); } } } @Override public void cancel() { } }).setParallelism(2) // .rescale() // .print() .setParallelism(4); //4. 广播 //stream.broadcast().print().setParallelism(4); //5.全局分区 stream.global().print().setParallelism(4); env.execute(); } }
结果如下:
6. 自定义分区(Custom)
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import javax.xml.crypto.dsig.keyinfo.KeyInfo; public class TransformPartitionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // todo 需求:统计每个用户的访问总量,找出访问量最大的用户 DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice","./prod?id=100",3000L), new Event("Bob", "./home", 2500L), new Event("Alice","./prod?id=1",3300L), new Event("Alice","./home",3500L), new Event("Bob", "./prod?id=10", 2000L), new Event("Alice","./prod?id=2",3800L), new Event("Alice","./prod?id=3",4500L) ); //1.随机分区 //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。 //stream.shuffle().print().setParallelism(4); //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种) //stream.rebalance().print().setParallelism(4); //3. rescale重缩放分区(分区轮询) env.addSource(new RichParallelSourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 1; i <= 8; i++) { //将奇偶数分别发送到0号和1号并行分区 if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) { ctx.collect(i); } } } @Override public void cancel() { } }).setParallelism(2) // .rescale() // .print() .setParallelism(4); //4. 广播 //stream.broadcast().print().setParallelism(4); //5.全局分区 //stream.global().print().setParallelism(4); //6. 自定义分区 env.fromElements(1,2,3,4,5,6,7,8) .partitionCustom(new Partitioner<Integer>() { @Override public int partition(Integer key, int numPartitions) { return key % 2; } }, new KeySelector<Integer, Integer>() { @Override public Integer getKey(Integer value) throws Exception { return value; } }).print().setParallelism(4); env.execute(); } }
输入如下:
虽然并行度是4,但是我们是对2取余,所以只会有两个线程有输出结果,取余的结果分别是0和1,0就对应1号线程,1就对应2号线程。
Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,如图 5-12 所示,本节将主要讲解 Flink 中的 Sink 操作。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。
public DataStreamSink<T> print(String sinkIdentifier) {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
return addSink(printFunction).name("Print to Std. Out");
}
与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:
default void invoke(IN value, Context context) throws Exception
当然,SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。如图 5-13 所示,列出了 Flink 官方目前支持的第三方系统连接器:
我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器,如图 5-14 所示。
除此以外,就需要用户自定义实现 sink 连接器了。
接下来,我们就选取一些常见的外部系统进行展开讲解。
下面我们就以行编码为例,将一些测试数据直接写入文件:
package online.liujiahao.chapter05; import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.util.concurrent.TimeUnit; public class SinkToFilesTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3100L), new Event("Bob", "./prod?id=2", 3600L), new Event("Bob", "./prod?id=3", 3800L), new Event("Bob", "./prod?id=0", 4200L)); StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withMaxPartSize(1024 * 1024 * 1024) .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .build() ) .build(); stream.map(data -> data.toString()) .addSink(streamingFileSink); env.execute(); } }
这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。关于这部分内容,我们会在后续章节做更详细的讲解。
现在我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:
代码如下
package online.liujiahao.chapter05; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.Properties; public class SinkToKafkaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //1.从kafka中读取数 Properties properties = new Properties(); properties.setProperty("bootstrap.servers","hadoop102:9092"); DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties)); //2.用flink进行数处理 SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { String[] fields = value.split(","); //trim() 用来去除前后的空白字段 return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString(); } }); //结果写入kafka result.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","events",new SimpleStringSchema())); env.execute(); } }
我们可以看到消费者可以正常消费数据,证明向 Kafka 写入数据成功。另外,我们也可以读取 5.2 节中介绍过的任意数据源,进行更多的完整测试。比较有趣的一个实验是,我们可以同时将 Kafka 作为 Flink 程序的数据源和写入结果的外部系统。只要将输入和输出的数据设置为不同的 topic,就可以看到整个系统运行的路径:Flink 从 Kakfa 的一个 topic 读取消费数据,然后进行处理转换,最终将结果数据写入 Kafka 的另一个 topic——数据从 Kafka 流入、经 Flink处理后又流回到 Kafka 去,这就是所谓的“数据管道”应用。
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
(2)启动 Redis 集群
这里我们为方便测试,只启动了单节点 Redis。
(3)编写输出到 Redis 的示例代码
连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:
package online.liujiahao.chapter05; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; public class SinkToRedisTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.addSource(new ClickSource()); //创建一个jedis连接配置 FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder() .setHost("hadoop102") .build(); //写入到redis stream.addSink(new RedisSink<>(config,new MyRedisMapper())); env.execute(); } //自定义类实现RedisMapper接口 public static class MyRedisMapper implements RedisMapper<Event>{ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET,"clicks"); } @Override public String getKeyFromData(Event data) { return data.user; } @Override public String getValueFromData(Event data) { return data.url; } } }
这里 RedisSink 的构造方法需要传入两个参数:
我们会发现,这里的数据是覆盖写入的!
ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。
Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的ElasticSearch。
写入数据的 ElasticSearch 的测试步骤如下。
(1)添加 Elasticsearch 连接器依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
(2)启动 Elasticsearch 集群
(3)编写输出到 Elasticsearch 的示例代码
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction ; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; public class SinkToEsTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Alice", "./prod?id=200", 3500L), new Event("Bob", "./prod?id=2", 2500L), new Event("Alice", "./prod?id=300", 3600L), new Event("Bob", "./home", 3000L), new Event("Bob", "./prod?id=1", 2300L), new Event("Bob", "./prod?id=3", 3300L)); ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("hadoop102", 9200, "http")); // 创建一个 ElasticsearchSinkFunction ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() { @Override public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) { HashMap<String, String> data = new HashMap<>(); data.put(element.user, element.url); IndexRequest request = Requests.indexRequest() .index("clicks") .type("type") // Es 6 必须定义 type .source(data); indexer.add(request); } }; stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build()); stream.addSink(esBuilder.build()); env.execute(); } }
{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : "total" : { "value" : 9, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "clicks", "_type" : "_doc", "_id" : "dAxBYHoB7eAyu-y5suyU", "_score" : 1.0, "_source" : { "Mary" : "./home" } } ... ] } }
关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据存储的主要形式。MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。
写入数据的 MySQL 的测试步骤如下。
(1)添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
(2)启动 MySQL,在 database 库下建表 clicks
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
(3)编写输出到 MySQL 的示例代码
package online.liujiahao.chapter05; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class SinkToMysql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3100L), new Event("Bob", "./prod?id=2", 3600L), new Event("Bob", "./prod?id=3", 3800L), new Event("Bob", "./prod?id=0", 4200L)); //JDBC连接,对应的就用JDBCSink,并且JdbcSink没有直接提供SinkFunction,他是提供了一个sink()方法,它的返回结果是SinkFunction。 stream.addSink(JdbcSink.sink( //第一个参数 sql "INSERT INTO clicks (user,url) VALUES (? , ?)", //第二个参数 JdbcStatementBuilder 点进去它本身并没有方法,他的父接口有一个accept方法。具体我们实现的也是他的accept方法。 ((statement, evnet) -> { statement.setString(1, evnet.user); statement.setString(2, evnet.url); }), //第三个参数 JDBC的连接参数 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() //指定Mysql的Url .withUrl("jdbc:mysql://hadoop102:3306/test") //指定Mysql驱动 .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("123456") .build() )); env.execute(); } }
(4)运行代码,用客户端连接 MySQL,查看是否成功写入数据。
成功写入!
(1)导入依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
(2)编写输出到 HBase 的示例代码
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import java.nio.charset.StandardCharsets; public class SinkCustomtoHBase { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.fromElements("hello", "world") .addSink( new RichSinkFunction<String>() { public org.apache.hadoop.conf.Configuration configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入 public Connection connection; // 管理 Hbase 连接 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop102:2181"); connection = ConnectionFactory.createConnection(configuration); } @Override public void invoke(String value, Context context) throws Exception { Table table = connection.getTable(TableName.valueOf("test")); // 表名为 test Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名 , value.getBytes(StandardCharsets.UTF_8) // 写入的数据 , "1".getBytes(StandardCharsets.UTF_8)); // 写入的数据 table.put(put); // 执行 put 操作 table.close(); // 将表关闭 } @Override public void close() throws Exception { super.close(); connection.close(); // 关闭连接 } } ); env.execute(); } }
(3)可以在 HBase 查看插入的数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。