赞
踩
flink入门基本使用
依据flink官网-信用卡欺诈检测例子进行测试flink
flink官网:
基于 DataStream API 实现欺诈检测 | Apache Flink
通过 Flink DataStream API 来实现一个有状态流处理程序
1,执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2,创建数据源 TransactionSource为flink的一个示例数据源,可以模拟一个无限循环生成信用卡模拟交易数据的数据源,模拟一个无界数据流,
DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions");
这个数据源需要添加如下依赖:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-walkthrough-common_2.12</artifactId>
- <version>1.12.0</version>
- </dependency>
3,对事件分区 & 业务逻辑处理
DataStream#keyBy 对流进行分区
process() 函数对流数据进行业务处理
DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector");
4,输出结果
sink 会将 DataStream 写出到外部系统,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。 AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。
alerts.addSink(new AlertSink());
5,运行作业
调用 StreamExecutionEnvironment#execute 时给任务传递一个任务名参数,就可以开始运行任务
env.execute("Fraud Detection");
6,业务处理器
FraudDetector 是 KeyedProcessFunction 接口的一个实现。 他的方法 KeyedProcessFunction#processElement 将会在每个事件上被调用
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}
flink状态类:ValueState
这是一种能够为被其封装的变量添加容错能力的类型。 ValueState 是一种 keyed state,也就是说它只能被用于 keyed context 提供的 operator 中,即所有能够紧随 DataStream#keyBy 之后被调用的operator。 一个 operator 中的 keyed state 的作用域默认是属于它所属的 key 的。 这个例子中,key 就是当前正在处理的交易行为所属的信用卡账户(key 传入 keyBy() 函数调用),而 FraudDetector 维护了每个帐户的标记状态。 ValueState 需要使用 ValueStateDescriptor 来创建,ValueStateDescriptor 包含了 Flink 如何管理变量的一些元数据信息。状态在使用之前需要先被注册。 状态需要使用 open() 函数来注册状态
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { private static final long serialVersionUID = 1L; private transient ValueState<Boolean> flagState; @Override public void open(Configuration parameters) { ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( "flag", Types.BOOLEAN); flagState = getRuntimeContext().getState(flagDescriptor); }
ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReference 和 AtomicLong。 它提供了三个用于交互的方法。update 用于更新状态,value 用于获取状态值,还有 clear 用于清空状态。 如果一个 key 还没有状态,例如当程序刚启动或者调用过 ValueState#clear 方法时,ValueState#value 将会返回 null。 如果需要更新状态,需要调用 ValueState#update 方法,直接更改 ValueState#value 的返回值可能不会被系统识别。 容错处理将在 Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互。
下边的示例,说明了如何使用标记状态来追踪可能的欺诈交易行为。
Java
@Override public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception { // Get the current state for the current key Boolean lastTransactionWasSmall = flagState.value(); // Check if the flag is set if (lastTransactionWasSmall != null) { if (transaction.getAmount() > LARGE_AMOUNT) { // Output an alert downstream Alert alert = new Alert(); alert.setId(transaction.getAccountId()); collector.collect(alert); } // Clean up our state flagState.clear(); } if (transaction.getAmount() < SMALL_AMOUNT) { // Set the flag to true flagState.update(true); } }
Scala
对于每笔交易,欺诈检测器都会检查该帐户的标记状态。 请记住,ValueState 的作用域始终限于当前的 key,即信用卡帐户。 如果标记状态不为空,则该帐户的上一笔交易是小额的,因此,如果当前这笔交易的金额很大,那么检测程序将输出报警信息。
在检查之后,不论是什么状态,都需要被清空。 不管是当前交易触发了欺诈报警而造成模式的结束,还是当前交易没有触发报警而造成模式的中断,都需要重新开始新的模式检测。
最后,检查当前交易的金额是否属于小额交易。 如果是,那么需要设置标记状态,以便可以在下一个事件中对其进行检查。 注意,ValueState<Boolean> 实际上有 3 种状态:unset (null),true,和 false,ValueState 是允许空值的。 我们的程序只使用了 unset (null) 和 true 两种来判断标记状态被设置了与否。
maven打包并上传至flink服务器
提交作业命令:
bin/flink run -c com.test.flinktest4.FraudDetectionJob /data/lts/flinktest4.jar
如果打出的jar包不含依赖项的话,可以把依赖的jar包放到flink的lib文件夹,或放在其他位置,然后再flink安装目录的bin/config.sh脚本文件中设置该lib路径:
依赖的jar包都可以放在这个目录内
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:/opt/flink-1.12.0/lib/"
常用命令
flink提交job作业命令:
bin/flink run -c <入口类> -p <并行度> <jar包路径> <启动参数>
查看已提交的所有job
bin/flink list
取消job命令
bin/flink cancel <Job的ID>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。