赞
踩
学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions
学习笔记如下:
用户可以通过实现接口来完成自定义 Functions。
实现接口并使用的样例:
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } } data.map(new MyMapFunction());
- 1
- 2
- 3
- 4
使用匿名类实现的样例:
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
- 1
- 2
- 3
使用 Lambda 表达式实现(Java 8)样例:
data.filter(s -> s.startsWith("http://"));
- 1
data.reduce((i1,i2) -> i1 + i2);
- 1
所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。
所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。
例如:可以将以下
MapFunction
代码class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }
- 1
- 2
- 3
替换为
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }
- 1
- 2
- 3
在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:
data.map(new MyMapFunction());
- 1
Rich Function 也可以使用匿名类的方式定义。例如:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
累加器可以使用 Accumulator.add(V value)
方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。
使用计数器的方法如下:
Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)
private IntCounter numLines = new IntCounter();
Step 2|在 rich function 的 open()
方法中注册累加器对象,也可以在次数定义名称
getRuntimeContext().addAccumulator("num-lines", this.numLines);
Step 3|在 function 的任何地方(包括 open()
和 close()
方法中)使用累加器
this.numLines.add(1);
Step 4|最终整体结果会存储在由执行环境的 execute()
方法返回的 JobExecutionResult
对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。
需要注意的是,当前累加器的结果只有在整个作业结束后才可用。
计数器 IntCounter
类的源码如下:
// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java
@PublicEvolving
public class IntCounter implements SimpleAccumulator<Integer> {
private static final long serialVersionUID = 1L;
private int localValue = 0;
public IntCounter() {}
public IntCounter(int value) {
this.localValue = value;
}
// ------------------------------------------------------------------------
// Accumulator
// ------------------------------------------------------------------------
/** Consider using {@link #add(int)} instead for primitive int values */
@Override
public void add(Integer value) {
localValue += value;
}
@Override
public Integer getLocalValue() {
return localValue;
}
@Override
public void merge(Accumulator<Integer, Integer> other) {
this.localValue += other.getLocalValue();
}
@Override
public void resetLocal() {
this.localValue = 0;
}
@Override
public IntCounter clone() {
IntCounter result = new IntCounter();
result.localValue = localValue;
return result;
}
// ------------------------------------------------------------------------
// Primitive Specializations
// ------------------------------------------------------------------------
public void add(int value) {
localValue += value;
}
public int getLocalValuePrimitive() {
return this.localValue;
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@Override
public String toString() {
return "IntCounter " + this.localValue;
}
}
localValue
属性来存储当前 function 中的累加器的值add
方法:实现累加操作merge
方法:实现多个累加器的合并clone()
方法:实现累加器的复制计数器 IntCounter
类实现了 SimpleAccumulator
接口,SimpleAccumulator
接口的源码如下:
// flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java
@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}
SimpleAccumulator
接口继承了 Accumulator
接口,并定义添加的值的类型与最终结果的类型相同;Accumulator
接口的源码如下:
@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
/** @param value The value to add to the accumulator object */
void add(V value);
/** @return local The local value from the current UDF context */
R getLocalValue();
/** Reset the local value. This only affects the current UDF context. */
void resetLocal();
/**
* Used by system internally to merge the collected parts of an accumulator at the end of the
* job.
*
* @param other Reference to accumulator to merge in.
*/
void merge(Accumulator<V, R> other);
/**
* Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
* throw a {@link java.lang.CloneNotSupportedException}
*
* @return The duplicated accumulator.
*/
Accumulator<V, R> clone();
}
add
方法:实现累加器的累加操作getLocalValue()
方法:读取当前 UDF 中累加器的值resetLocal()
方法:重置当前 UDF 中累加器的值merge
方法:在 Flink 合并不同 UDF 的结果时使用clone()
方法:复制累加器对象,实现了 Clonable
接口在 Accumulator
接口中,定义了添加的值的类型 V
和最终结果的类型 R
。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。
在 SimpleAccumulator
接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。
如果需要自定义累加器,只需要实现 Accumulator
接口或 SimpleAccumulator
接口即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。