A data transformation turns one or more DataSets into a new DataSet. Programs can chain multiple transformations together to build complex dataflows, as the sketch below illustrates.
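For example, a minimal sketch (the input data and the surrounding `env` are illustrative assumptions, not from the original) that chains a map and a filter:

```java
// Illustrative only: parse strings to integers, then keep values above a threshold.
DataSet<String> raw = env.fromElements("500", "1500", "2500");
DataSet<Integer> large = raw
        .map(new MapFunction<String, Integer>() {
            public Integer map(String value) { return Integer.parseInt(value); }
        })
        .filter(new FilterFunction<Integer>() {
            public boolean filter(Integer value) { return value > 1000; }
        });
```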
Map applies a one-to-one mapping over the whole DataSet: every input element produces exactly one new element.
```java
data.map(new MapFunction<String, Integer>() {
    public Integer map(String value) { return Integer.parseInt(value); }
});
```
FlatMap takes one element and produces zero, one, or more elements.
```java
data.flatMap(new FlatMapFunction<String, String>() {
    public void flatMap(String value, Collector<String> out) {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
});
```
Whereas Map and FlatMap operate on each element of a DataSet, MapPartition operates on each partition of the DataSet.
```java
data.mapPartition(new MapPartitionFunction<String, Long>() {
    public void mapPartition(Iterable<String> values, Collector<Long> out) {
        long c = 0;
        for (String s : values) {
            c++;
        }
        // emit one element count per partition
        out.collect(c);
    }
});
```
Filter keeps only the elements that satisfy a predicate.
```java
data.filter(new FilterFunction<Integer>() {
    public boolean filter(Integer value) { return value > 1000; }
});
```
SortPartition locally sorts each partition of a DataSet on a given field, in ascending or descending order.
```java
DataSet<Tuple2<String, Integer>> in = // [...]
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
        .mapPartition(new PartitionMapper());
```
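Because `PartitionMapper` is left undefined above, here is a self-contained sketch (the data and the mapper are assumptions, not from the original) that sorts each partition on the Integer field and emits the values in sorted order:

```java
// Sketch, not from the original: sort each partition on field 1, then stream
// the sorted Integer values out of each partition.
DataSet<Tuple2<String, Integer>> in = env.fromElements(
        Tuple2.of("a", 3), Tuple2.of("b", 1), Tuple2.of("c", 2));
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
        .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Integer>() {
            public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) {
                for (Tuple2<String, Integer> t : values) {
                    out.collect(t.f1); // values arrive in ascending order within the partition
                }
            }
        });
```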
Reduce combines a group of elements into a single element by repeatedly combining two elements into one. Reduce can be applied to a full DataSet or to a grouped DataSet.
When reduce is applied to a grouped DataSet, you can control how the runtime executes the combine phase of the reduce by passing a CombineHint to setCombineHint (see the sketch after the following example). In most cases the hash-based strategy should be faster, especially when there are many distinct keys.
```java
data.reduce(new ReduceFunction<Integer>() {
    public Integer reduce(Integer a, Integer b) { return a + b; }
});
```
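The original shows no setCombineHint call; a minimal sketch (the tuple shape and key position are assumptions) that requests the hash-based combine strategy for a grouped reduce:

```java
// Sketch, not from the original: CombineHint lives in
// org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
// (HASH, SORT, and NONE are the available strategies).
DataSet<Tuple2<String, Integer>> summed = tuples
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                return Tuple2.of(a.f0, a.f1 + b.f1);
            }
        })
        .setCombineHint(CombineHint.HASH);
```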
Aggregate collapses a group of values into a single value. The aggregation functions can be thought of as built-in reduce functions. Aggregate can be applied to a full DataSet or to a grouped DataSet.
The aggregation functions are sum (SUM), minimum (MIN), and maximum (MAX).
After grouping by the String field, sum the first Int field and then take the minimum of the Double field:
```java
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
            Tuple3.of(1, "hello", 4.0),
            Tuple3.of(1, "hello", 5.0),
            Tuple3.of(2, "hello", 5.0),
            Tuple3.of(3, "world", 6.0),
            Tuple3.of(3, "world", 6.0)
    );
    DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
            .aggregate(Aggregations.SUM, 0)
            .and(Aggregations.MIN, 2);
    // print() triggers execution itself; a subsequent env.execute() would fail
    // because no new sinks are defined after the print.
    output.print();
}
```
Result:
(6,world,6.0)
(4,hello,4.0)
Applied to the full DataSet without grouping, the aggregation yields a single tuple:
```java
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
            Tuple3.of(1, "hello", 4.0),
            Tuple3.of(1, "hello", 5.0),
            Tuple3.of(2, "hello", 5.0),
            Tuple3.of(3, "world", 6.0),
            Tuple3.of(3, "world", 6.0)
    );
    DataSet<Tuple3<Integer, String, Double>> output = input
            .aggregate(Aggregations.SUM, 0)
            .and(Aggregations.MIN, 2);
    output.print();
}
```
Result:
(10,world,4.0)
MinBy (MaxBy) selects, from a group of tuples, the tuple whose values are minimal (maximal) in one or more fields. The comparison fields must be valid key fields, i.e. comparable. If several tuples have the minimal (maximal) field values, an arbitrary one of them is returned. MinBy (MaxBy) can be applied to a full DataSet or to a grouped DataSet.
After grouping by the String field, select the tuple with the minimal Int field; if several tuples share that minimum, pick among them the one with the minimal Double field:
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class DataSetMinBy {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
                Tuple3.of(1, "hello", 5.0),
                Tuple3.of(1, "hello", 4.0),
                Tuple3.of(2, "hello", 5.0),
                Tuple3.of(3, "world", 7.0),
                Tuple3.of(4, "world", 6.0)
        );
        DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
                .minBy(0, 2);
        output.print();
    }
}
```
Result:
(1,hello,4.0)
(3,world,7.0)
Applied to the full DataSet, minBy returns a single tuple:
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class DataSetCreateFlow {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
                Tuple3.of(1, "hello", 5.0),
                Tuple3.of(1, "hello", 4.0),
                Tuple3.of(2, "hello", 5.0),
                Tuple3.of(3, "world", 7.0),
                Tuple3.of(4, "world", 6.0)
        );
        DataSet<Tuple3<Integer, String, Double>> output = input
                .minBy(0, 2);
        output.print();
    }
}
```
Result:
(1,hello,4.0)
GroupReduce can be applied to a grouped DataSet or to a full DataSet; it accesses all elements of the group through an iterator.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class DataSetReduceGroup {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
                Tuple3.of(1, "hello", 5.0),
                Tuple3.of(1, "hello", 4.0),
                Tuple3.of(2, "hello", 5.0),
                Tuple3.of(3, "world", 7.0),
                Tuple3.of(4, "world", 6.0)
        );
        DataSet<Tuple3<Integer, String, Double>> output = input.groupBy(1)
                .reduceGroup(new GroupReduceFunction<Tuple3<Integer, String, Double>, Tuple3<Integer, String, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void reduce(Iterable<Tuple3<Integer, String, Double>> in,
                                       Collector<Tuple3<Integer, String, Double>> out) throws Exception {
                        // forward every element of the group unchanged
                        for (Tuple3<Integer, String, Double> tuple : in) {
                            out.collect(tuple);
                        }
                    }
                });
        output.print();
    }
}
```
Result:
(3,world,7.0)
(4,world,6.0)
(1,hello,5.0)
(1,hello,4.0)
(2,hello,5.0)
Flink can de-duplicate whole elements, de-duplicate on key positions, or de-duplicate on the result of a key expression.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class DataSetDistinct {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, String, Double>> input = env.fromElements(
                Tuple3.of(1, "hello", 5.0),
                Tuple3.of(1, "hello", 5.0),
                Tuple3.of(2, "hello", 5.0),
                Tuple3.of(3, "world", 7.0),
                Tuple3.of(4, "world", 6.0)
        );
        // de-duplicate whole elements
        // DataSet<Tuple3<Integer, String, Double>> output = input.distinct();
        // de-duplicate on the Int and String fields only
        DataSet<Tuple3<Integer, String, Double>> output = input.distinct(0, 1);

        DataSet<Integer> inputInteger = env.fromElements(1, 2, 3, 4, 5, 5);
        // de-duplicate on the result of a key expression
        DataSet<Integer> outputInteger = inputInteger.distinct(x -> Math.abs(x));

        output.print();
        outputInteger.print();
    }
}
```
Joins fall into two categories: inner joins and outer joins.
An inner join takes one of three forms:
The form without a join function: where and equalTo give the key positions in the first and second DataSet respectively, and each pair of elements that are equal at those positions is emitted.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetJoin {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> input1 = env.fromElements(
                Tuple2.of(1, "hello"),
                Tuple2.of(2, "world")
        );
        DataSet<Tuple2<String, Integer>> input2 = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        // join where field 0 of input1 equals field 1 of input2;
        // without a join function, the result is the pair of matching elements
        DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>> output = input1.join(input2)
                .where(0)
                .equalTo(1);
        output.print();
    }
}
```
Result:
((1,hello),(hello,1))
((2,world),(world,2))
The form with a JoinFunction, which turns each pair of joining elements into a single result:
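The original omits the example here; a minimal sketch reusing input1 and input2 from the previous listing, merging each matched pair into one tuple (the output shape is an assumption):

```java
// Sketch, not from the original: same keys as above, but a JoinFunction
// (org.apache.flink.api.common.functions.JoinFunction) merges each matched
// pair into a single tuple instead of emitting the raw pair.
DataSet<Tuple2<Integer, String>> joined = input1.join(input2)
        .where(0)
        .equalTo(1)
        .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<String, Integer>, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> join(Tuple2<Integer, String> first,
                                                Tuple2<String, Integer> second) {
                // keep the Integer key from the left and the String key from the right
                return Tuple2.of(first.f0, second.f0);
            }
        });
```

The third inner-join form, presumably what the original intended, uses a FlatJoinFunction, whose join method receives a Collector and may emit zero or more results per matched pair. For the second category, outer joins (leftOuterJoin, rightOuterJoin, fullOuterJoin), a join function is mandatory because one side of a pair can be null; a minimal left-outer sketch (the "none" placeholder is an assumption):

```java
// Sketch: a left outer join keeps every element of input1; 'second' is null
// when input2 has no matching element.
DataSet<Tuple2<Integer, String>> outer = input1.leftOuterJoin(input2)
        .where(0)
        .equalTo(1)
        .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<String, Integer>, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> join(Tuple2<Integer, String> first,
                                                Tuple2<String, Integer> second) {
                return Tuple2.of(first.f0, second == null ? "none" : second.f0);
            }
        });
```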
Cross builds the Cartesian product of two DataSets.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetCross {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data1 = env.fromElements(1, 2, 3);
        DataSet<String> data2 = env.fromElements("one", "two", "three");
        DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
        result.print();
    }
}
```
Result:
(1,one)
(1,two)
(1,three)
(2,one)
(2,two)
(2,three)
(3,one)
(3,two)
(3,three)
Similar to UNION ALL in an RDBMS (duplicates are not removed), Union concatenates multiple DataSets of the same type.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetUnion {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data1 = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        DataSet<Tuple2<String, Integer>> data2 = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        DataSet<Tuple2<String, Integer>> data3 = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        DataSet<Tuple2<String, Integer>> result = data1.union(data2).union(data3);
        result.print();
    }
}
```
Result:
(hello,1)
(hello,1)
(hello,1)
(world,2)
(world,2)
(world,2)
Rebalance: distributes the data evenly to downstream tasks using a round-robin algorithm.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> in = env.fromElements("hello", "world");
        DataSet<String> out = in.rebalance();
        out.print();
    }
}
```
Hash-Partition: partitions the data on the hash value of a key.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        DataSet<Tuple2<String, Integer>> out = in.partitionByHash(0);
        out.print();
    }
}
```
Range-Partition: partitions the data by value ranges of a field.
```java
package com.ztland.flink_demo.dataSet;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                Tuple2.of("hello", 1),
                Tuple2.of("world", 2)
        );
        DataSet<Tuple2<String, Integer>> out = in.partitionByRange(0);
        out.print();
    }
}
```
Broadcast variables are a mechanism for sharing data among all parallel instances of an operator: besides its regular input, each instance can access the broadcast DataSet as a Collection.
Usage (the numbered steps match the comments in the code below): 1. create the DataSet to broadcast; 2. register it on the operator with withBroadcastSet(dataSet, name); 3. inside a rich function, access it via getRuntimeContext().getBroadcastVariable(name).
```java
package com.ztland.flink_demo.dataSet;

import java.util.Collection;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DataSetDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1. The DataSet to be broadcast
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b");
        DataSet<String> out = data.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // 3. Access the broadcast DataSet as a Collection
                Collection<Integer> broadcastSet =
                        getRuntimeContext().getBroadcastVariable("broadcastSetName");
            }

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet
        out.print();
    }
}
```
To improve access speed and efficiency, the TaskManager copies remote files that operator instances need to access into a local cache.
The usage steps are as follows:
```java
package com.ztland.flink_demo.dataSet;

import java.io.File;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DataSetDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Register cached files; there are two kinds:
        // a remote file ("remote" relative to the JobManager), here on HDFS
        env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
        // a file local to the JobManager; 'true' marks it as executable
        env.registerCachedFile("file:///path/to/your/file", "localFile", true);

        DataSet<String> input = env.readTextFile(""); // placeholder input path
        DataSet<Integer> result = input.map(new MyMapper());
        // print() defines a sink and triggers execution
        result.print();
    }
}

// A RichFunction can access the cached file or directory by its registered name
final class MyMapper extends RichMapFunction<String, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void open(Configuration config) {
        // access the cached file via RuntimeContext and DistributedCache
        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
        // read the file (or navigate the directory)
    }

    @Override
    public Integer map(String value) throws Exception {
        // use the content of the cached file
        return null;
    }
}
```
Fault tolerance for batch programs is based on retries. Retry has two parameters: the maximum number of retries after a failure, and the delay before retrying starts. They are configured as follows:
```java
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // maximum number of retries
    env.setNumberOfExecutionRetries(3);
    // delay between retries, in milliseconds
    env.getConfig().setExecutionRetryDelay(5000);
}
```
Alternatively, set it in the flink-conf.yaml configuration file:
```yaml
execution-retries.default: 3
execution-retries.delay: 10 s
```