Environment: Java 8 + CentOS 7 server + Flink 1.14.6.
For Flink 1.14.6 installation, deployment, and a basic smoke test, see reference 1:
Flink 1.14.6 installation and deployment reference
Import the project dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
</dependencies>
The code below is adapted from the official example; reference URL:
package com.scali.experiment.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // Copied from the official docs; the hostname/port parsing at the top of the
        // original example has been replaced with hard-coded defaults.
        // The IP is arbitrary here; point it at the host where the socket source (nc) runs.
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("202.127.205.60", 9999, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                new FlatMapFunction<String, WordWithCount>() {
                                    @Override
                                    public void flatMap(String value, Collector<WordWithCount> out) {
                                        for (String word : value.split("\\s")) {
                                            out.collect(new WordWithCount(word, 1L));
                                        }
                                    }
                                })
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce(
                                new ReduceFunction<WordWithCount>() {
                                    @Override
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                                        return new WordWithCount(a.word, a.count + b.count);
                                    }
                                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
Save the code. Since IDEA is used here, the project can be packaged directly from the IDE.
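If the command line is preferred over IDEA, a plain Maven build produces the same job JAR. This is a minimal sketch, assuming a standard Maven project layout:

# build the job JAR into the target/ directory (skip tests for speed)
mvn clean package -DskipTests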
nc -l 9999
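nc -l 9999 opens the socket that the job connects to. Once the job is running, every line typed into that terminal is split into words and counted per 5-second window. As a purely illustrative example, typing the following within a single window:

hello world hello

should produce output similar to:

hello : 2
world : 1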
The second example is a fraud detection job; reference URL: Flink 1.14.4 documentation.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
    <!-- Common dependency for the fraud-detection walkthrough, including entity classes etc. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
</dependencies>
package com.scali.experiment.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * @Author:HE
 * @Date:Created in 15:42 2023/7/2
 * @Description:
 */
public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        // Set up the job environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the data source: Transaction is an entity class that can later be replaced
        // with your own; TransactionSource is a ready-made source of sample data.
        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        // DataStream#keyBy partitions the stream so that one task handles all records
        // with the same key (account id).
        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                // Apply the logic implemented in FraudDetector to the keyed stream
                .process(new FraudDetector())
                .name("fraud-detector");

        // AlertSink logs every Alert record at INFO level; the results could also be
        // written to Kafka instead.
        alerts
                .addSink(new AlertSink())
                .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
package com.scali.experiment.flink;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

// Implementation of KeyedProcessFunction, which provides fine-grained access to
// both state and timers.
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    // A timer checks whether two transactions happen within one minute of each other;
    // the flag records whether the previous transaction was a small amount.
    // Together they decide whether the current transaction looks fraudulent.
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor =
                new ValueStateDescriptor<>("flag", Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor =
                new ValueStateDescriptor<>("timer-state", Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Check whether the previous transaction set the small-amount flag
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Clean up our state
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag to true
            flagState.update(true);

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

    // The timer service can query the current time, register timers, and delete timers.
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // Remove the flag after one minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // Delete the timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // Clean up all state
        timerState.clear();
        flagState.clear();
    }
}
The JARs that are not part of the Flink distribution (such as flink-walkthrough-common) must be added to Flink's lib directory, otherwise the job will later fail with a NoClassDefFoundError.
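A minimal sketch of copying that dependency from the local Maven repository into the lib directory; both paths below are assumptions and must be adjusted to your machine:

# copy the walkthrough-common jar into Flink's lib directory (example paths)
cp ~/.m2/repository/org/apache/flink/flink-walkthrough-common_2.12/1.14.6/flink-walkthrough-common_2.12-1.14.6.jar /opt/flink-1.14.6/lib/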
Then go to Flink's bin directory and restart Flink.
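For a standalone cluster, the restart is typically done with the scripts shipped in the distribution; the installation path below is only an example:

cd /opt/flink-1.14.6          # adjust to your installation path
./bin/stop-cluster.sh
./bin/start-cluster.sh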
After packaging, upload the JAR to Flink and submit it for execution.
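Besides uploading through the web UI, the job can also be submitted with the flink CLI; the JAR path below is a placeholder for your own build artifact:

# submit the fraud-detection job; -c selects the entry class
./bin/flink run -c com.scali.experiment.flink.FraudDetectionJob /path/to/your-job.jar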
Check the results in the TaskManager.
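If the web UI is not handy, the same output can be read from the TaskManager log files on the server; the file name pattern and grep keyword below are assumptions about a standalone setup:

# AlertSink writes alerts at INFO level into the TaskManager log
tail -f /opt/flink-1.14.6/log/flink-*-taskexecutor-*.log | grep -i alert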