赞
踩
目录
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- public class ShuffleTest {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 读取数据源,并行度为 1
- DataStreamSource<Event> stream = env.addSource(new ClickSource());
- // 经洗牌后打印输出,并行度为 4
- stream.shuffle().print("shuffle").setParallelism(4);
- env.execute();
- }
- }
shuffle:1> Event{user='Bob', url='./cart', timestamp=...}
shuffle:4> Event{user='Cary', url='./home', timestamp=...}
shuffle:3> Event{user='Alice', url='./fav', timestamp=...}
shuffle:4> Event{user='Cary', url='./cart', timestamp=...}
shuffle:3> Event{user='Cary', url='./fav', timestamp=...}
shuffle:1> Event{user='Cary', url='./home', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}
shuffle:1> Event{user='Bob', url='./fav', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- public class RebalanceTest {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 读取数据源,并行度为 1
- DataStreamSource<Event> stream = env.addSource(new ClickSource());
- // 经轮询重分区后打印输出,并行度为 4
- stream.rebalance().print("rebalance").setParallelism(4);
- env.execute();
- }
- }
rebalance:2> Event{user='Cary', url='./fav', timestamp=...}
rebalance:3> Event{user='Mary', url='./cart', timestamp=...}
rebalance:4> Event{user='Mary', url='./fav', timestamp=...}
rebalance:1> Event{user='Cary', url='./home', timestamp=...}
rebalance:2> Event{user='Cary', url='./cart', timestamp=...}
rebalance:3> Event{user='Alice', url='./prod?id=1', timestamp=...}
rebalance:4> Event{user='Cary', url='./prod?id=2', timestamp=...}
rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=...}
rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=...}
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import
- org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
- public class RescaleTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 这里使用了并行数据源的富函数版本
- // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
- env
- .addSource(new RichParallelSourceFunction<Integer>() {
- @Override
- public void run(SourceContext<Integer> sourceContext) throws
- Exception {
- for (int i = 0; i < 8; i++) {
- // 将奇数发送到索引为 1 的并行子任务
- // 将偶数发送到索引为 0 的并行子任务
- if ((i + 1) % 2 ==
- getRuntimeContext().getIndexOfThisSubtask()) {
- sourceContext.collect(i + 1);
- }
- }
- }
- @Override
- public void cancel() {
- }
- })
- .setParallelism(2)
- .rescale()
- .print().setParallelism(4);
- env.execute();
- }
- }
4> 3
3> 1
1> 2
1> 6
3> 5
4> 7
2> 4
2> 8
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- public class BroadcastTest {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 读取数据源,并行度为 1
- DataStreamSource<Event> stream = env.addSource(new ClickSource());
- // 经广播后打印输出,并行度为 4
- stream. broadcast().print("broadcast").setParallelism(4);
- env.execute();
- }
- }
broadcast:3> Event{user='Mary', url='./cart', timestamp=...}
broadcast:1> Event{user='Mary', url='./cart', timestamp=...}
broadcast:4> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Alice', url='./fav', timestamp=...}
broadcast:1> Event{user='Alice', url='./fav', timestamp=...}
broadcast:3> Event{user='Alice', url='./fav', timestamp=...}
broadcast:4> Event{user='Alice', url='./fav', timestamp=...}
- import org.apache.flink.api.common.functions.Partitioner;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- public class CustomPartitionTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 将自然数按照奇偶分区
- env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
- .partitionCustom(new Partitioner<Integer>() {
- @Override
- public int partition(Integer key, int numPartitions) {
- return key % 2;
- }
- }, new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- })
- .print().setParallelism(2);
- env.execute();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。