devbox
不正经
这个屌丝很懒,什么也没留下!
热门文章

另外,我们还可以拿到web服务器的日志数据,这里以apache服务器的一份log为例,每一行日志记录了访问者的IP、userId、访问时间、访问方法以及访问的url,具体描述如下:

字段名

数据类型

说明

ip

String

访问的 IP

userId

Long

访问的 user ID

eventTime

Long

访问时间

method

String

访问方法 GET/POST/PUT/DELETE

url

String

访问的 url

由于行为数据有限,在实时热门商品统计模块中可以使用UserBehavior数据集,而对于恶意登录监控和订单支付失效监控,我们只以示例数据来做演示。

第2章、实时热门商品统计

首先要实现的是实时热门商品统计,我们将会基于UserBehavior数据集来进行分析。

项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。

2.1、 创建Maven项目

2.1.1 、项目框架搭建

打开IDEA,创建一个maven项目,命名为UserBehaviorAnalysis。由于包含了多个模块,我们可以以UserBehaviorAnalysis作为父项目,并在其下建一个名为HotItemsAnalysis的子项目,用于实时统计热门top N商品。

在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为HotItemsAnalysis。

父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以UserBehaviorAnalysis下的src文件夹可以删掉。

2.1.2、 声明项目中工具的版本信息

我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的UserBehaviorAnalysis中声明所有子模块共用的版本信息。

在pom.xml中加入以下配置:

UserBehaviorAnalysis/pom.xml

  1. <properties>
  2. <flink.version>1.7.2</flink.version>
  3. <scala.binary.version>2.11</scala.binary.version>
  4. <kafka.version>2.2.0</kafka.version>
  5. </properties>

2.1.3 添加项目依赖

对于整个项目而言,所有模块都会用到flink相关的组件,所以我们在UserBehaviorAnalysis中引入公有依赖:

UserBehaviorAnalysis/pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-scala_${scala.binary.version}</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  10. <version>${flink.version}</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.kafka</groupId>
  14. <artifactId>kafka_${scala.binary.version}</artifactId>
  15. <version>${kafka.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. </dependencies>

同样,对于maven项目的构建,可以引入公有的插件:

  1. <build>
  2. <plugins>
  3. <!-- 该插件用于将Scala代码编译成class文件 -->
  4. <plugin>
  5. <groupId>net.alchim31.maven</groupId>
  6. <artifactId>scala-maven-plugin</artifactId>
  7. <version>3.4.6</version>
  8. <executions>
  9. <execution>
  10. <!-- 声明绑定到maven的compile阶段 -->
  11. <goals>
  12. <goal>testCompile</goal>
  13. </goals>
  14. </execution>
  15. </executions>
  16. </plugin>
  17. <plugin>
  18. <groupId>org.apache.maven.plugins</groupId>
  19. <artifactId>maven-assembly-plugin</artifactId>
  20. <version>3.0.0</version>
  21. <configuration>
  22. <descriptorRefs>
  23. <descriptorRef>
  24. jar-with-dependencies
  25. </descriptorRef>
  26. </descriptorRefs>
  27. </configuration>
  28. <executions>
  29. <execution>
  30. <id>make-assembly</id>
  31. <phase>package</phase>
  32. <goals>
  33. <goal>single</goal>
  34. </goals>
  35. </execution>
  36. </executions>
  37. </plugin>
  38. </plugins>
  39. </build>

在HotItemsAnalysis子模块中,我们并没有引入更多的依赖,所以不需要改动pom文件。

2.1.4 数据准备

在src/main/目录下,可以看到已有的默认源文件目录是java,我们可以将其改名为scala。将数据文件UserBehavior.csv复制到资源文件目录src/main/resources下,我们将从这里读取数据。

至此,我们的准备工作都已完成,接下来可以写代码了。

2.2 模块代码实现

我们将实现一个“实时热门商品”的需求,可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前N个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉Flink框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前N名的商品

2.2.1 程序主体

在src/main/scala下创建HotItems.scala文件,新建一个单例对象。定义样例类UserBehavior和ItemViewCount,在main函数中创建StreamExecutionEnvironment 并做配置,然后从UserBehavior.csv文件中读取数据,并包装成UserBehavior类型。代码如下:

HotItemsAnalysis/src/main/scala/HotItems.scala

  1. case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
  3. object HotItems {
  4. def main(args: Array[String]): Unit = {
  5. // 创建一个 StreamExecutionEnvironment
  6. val env = StreamExecutionEnvironment.getExecutionEnvironment
  7. // 设定Time类型为EventTime
  8. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  9. // 为了打印到控制台的结果不乱序,我们配置全局的并发为1,这里改变并发对结果正确性没有影响
  10. env.setParallelism(1)
  11. val stream = env
  12. // 以window下为例,需替换成自己的路径
  13. .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
  14. .map(line => {
  15. val linearray = line.split(",")
  16. UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
  17. })
  18. // 指定时间戳和watermark
  19. .assignAscendingTimestamps(_.timestamp * 1000)
  20. env.execute("Hot Items Job")
  21. }

这里注意,我们需要统计业务时间上的每小时的点击量,所以要基于EventTime来处理。那么如果让Flink按照我们想要的业务时间来处理呢?这里主要有两件事情要做。

第一件是告诉Flink我们现在按照EventTime模式进行处理,Flink默认使用ProcessingTime处理,所以我们要显式设置如下:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成Watermark。Watermark是用来追踪业务事件的概念,可以理解成EventTime世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做Watermark。这里我们用 assignAscendingTimestamps来实现时间戳的抽取和Watermark的生成。

注:真实业务场景一般都是乱序的,所以一般不用assignAscendingTimestamps而是使用BoundedOutOfOrdernessTimestampExtractor

.assignAscendingTimestamps(_.timestamp * 1000)

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

2.2.2 过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前N个商品”。由于原始数据中存在点击、购买、收藏、喜欢各种行为的数据,但是我们只需要统计点击量,所以先使用filter将点击行为数据过滤出来。

.filter(_.behavior == "pv")

2.2.3 设置滑动窗口,统计点击量

由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

  1. .keyBy("itemId")
  2. .timeWindow(Time.minutes(60), Time.minutes(5))
  3. .aggregate(new CountAgg(), new WindowResultFunction());
  1. 我们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做
  2. 滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af,
  3. WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少state的
  4. 存储压力。较之 .apply(WindowFunction wf) 会将窗口中的数据都存储下来,最后一起计算要高效地
  5. 多。这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加
  6. 一。
  1. // COUNT统计的聚合函数实现,每出现一条记录就加一
  2. class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  3. override def createAccumulator(): Long = 0L
  4. override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  5. override def getResult(acc: Long): Long = acc
  6. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  7. }

聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二个参数WindowFunction将每个key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将<主键商品ID,窗口,点击量>封装成了ItemViewCount进行输出。

  1. // 商品点击量(窗口操作的输出类型)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

代码如下:

  1. // 用于输出窗口的结果
  2. class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  3. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
  4. collector: Collector[ItemViewCount]) : Unit = {
  5. val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
  6. val count = aggregateResult.iterator.next
  7. collector.collect(ItemViewCount(itemId, window.getEnd, count))
  8. }
  9. }

现在我们就得到了每个商品在每个窗口的点击量的数据流。

 

2.2.4 计算最热门Top N商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用ProcessFunction实现一个自定义的TopN函数TopNHotItems来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。

  1. .keyBy("windowEnd")
  2. .process(new TopNHotItems(3)); // 求点击量前3名的商品

ProcessFunction是Flink提供的一个low-level API,用于实现更高级的功能。它主要提供了定时器timer的功能(支持EventTime或ProcessingTime)。本案例中我们将利用timer来判断何时收齐了某个window下所有商品的点击量数据。由于Watermark的进度是全局的,在processElement方法中,每当收到一条数据ItemViewCount,我们就注册一个windowEnd+1的定时器(Flink框架会自动忽略同一时间的重复注册)。windowEnd+1的定时器被触发时,意味着收到了windowEnd+1的Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在onTimer()中处理将收集的所有商品及点击量进行排序,选出TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了ListState<ItemViewCount>来存储收到的每条ItemViewCount消息,保证在发生故障时,状态数据的不丢失和一致性。ListState是Flink提供的类似Java List接口的State API,它集成了框架的checkpoint机制,自动做到了exactly-once的语义保证。

  1. // 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
  2. class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  3. private var itemState : ListState[ItemViewCount] = _
  4. override def open(parameters: Configuration): Unit = {
  5. super.open(parameters)
  6. // 命名状态变量的名字和状态变量的类型
  7. val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
  8. // 定义状态变量
  9. itemState = getRuntimeContext.getListState(itemsStateDesc)
  10. }
  11. override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
  12. // 每条数据都保存到状态中
  13. itemState.add(input)
  14. // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
  15. // 也就是当程序看到windowend + 1的水位线watermark时,触发onTimer回调函数
  16. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  17. }
  18. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  19. // 获取收到的所有商品点击量
  20. val allItems: ListBuffer[ItemViewCount] = ListBuffer()
  21. import scala.collection.JavaConversions._
  22. for (item <- itemState.get) {
  23. allItems += item
  24. }
  25. // 提前清除状态中的数据,释放空间
  26. itemState.clear()
  27. // 按照点击量从大到小排序
  28. val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
  29. // 将排名信息格式化成 String, 便于打印
  30. val result: StringBuilder = new StringBuilder
  31. result.append("====================================\n")
  32. result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  33. for(i <- sortedItems.indices){
  34. val currentItem: ItemViewCount = sortedItems(i)
  35. // e.g. No1: 商品ID=12224 浏览量=2413
  36. result.append("No").append(i+1).append(":")
  37. .append(" 商品ID=").append(currentItem.itemId)
  38. .append(" 浏览量=").append(currentItem.count).append("\n")
  39. }
  40. result.append("====================================\n\n")
  41. // 控制输出频率,模拟实时滚动结果
  42. Thread.sleep(1000)
  43. out.collect(result.toString)
  44. }
  45. }

最后我们可以在main函数中将结果打印输出到控制台,方便实时观测:

.print();

至此整个程序代码全部完成,我们直接运行main函数,就可以在控制台看到不断输出的各个时间点统计出的热门商品。

2.2.5 完整代码

最终完整代码如下:

  1. case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
  3. object HotItems {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env
  9. .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
  10. .map(line => {
  11. val linearray = line.split(",")
  12. UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
  13. })
  14. .assignAscendingTimestamps(_.timestamp * 1000)
  15. .filter(_.behavior=="pv")
  16. .keyBy("itemId")
  17. .timeWindow(Time.minutes(60), Time.minutes(5))
  18. .aggregate(new CountAgg(), new WindowResultFunction())
  19. .keyBy(1)
  20. .process(new TopNHotItems(3))
  21. .print()
  22. env.execute("Hot Items Job")
  23. }
  24. // COUNT 统计的聚合函数实现,每出现一条记录加一
  25. class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  26. override def createAccumulator(): Long = 0L
  27. override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  28. override def getResult(acc: Long): Long = acc
  29. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  30. }
  31. // 用于输出窗口的结果
  32. class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  33. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
  34. collector: Collector[ItemViewCount]) : Unit = {
  35. val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
  36. val count = aggregateResult.iterator.next
  37. collector.collect(ItemViewCount(itemId, window.getEnd, count))
  38. }
  39. }
  40. // 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
  41. class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  42. private var itemState : ListState[ItemViewCount] = _
  43. override def open(parameters: Configuration): Unit = {
  44. super.open(parameters)
  45. // 命名状态变量的名字和状态变量的类型
  46. val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
  47. // 从运行时上下文中获取状态并赋值
  48. itemState = getRuntimeContext.getListState(itemsStateDesc)
  49. }
  50. override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
  51. // 每条数据都保存到状态中
  52. itemState.add(input)
  53. // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
  54. // 也就是当程序看到windowend + 1的水位线watermark时,触发onTimer回调函数
  55. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  56. }
  57. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  58. // 获取收到的所有商品点击量
  59. val allItems: ListBuffer[ItemViewCount] = ListBuffer()
  60. import scala.collection.JavaConversions._
  61. for (item <- itemState.get) {
  62. allItems += item
  63. }
  64. // 提前清除状态中的数据,释放空间
  65. itemState.clear()
  66. // 按照点击量从大到小排序
  67. val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
  68. // 将排名信息格式化成 String, 便于打印
  69. val result: StringBuilder = new StringBuilder
  70. result.append("====================================\n")
  71. result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  72. for(i <- sortedItems.indices){
  73. val currentItem: ItemViewCount = sortedItems(i)
  74. // e.g. No1: 商品ID=12224 浏览量=2413
  75. result.append("No").append(i+1).append(":")
  76. .append(" 商品ID=").append(currentItem.itemId)
  77. .append(" 浏览量=").append(currentItem.count).append("\n")
  78. }
  79. result.append("====================================\n\n")
  80. // 控制输出频率,模拟实时滚动结果
  81. Thread.sleep(1000)
  82. out.collect(result.toString)
  83. }
  84. }
  85. }

2.2.6 更换Kafka 作为数据源

实际生产环境中,我们的数据流往往是从Kafka获取到的。如果要让代码更贴近生产实际,我们只需将source更换为Kafka即可:

  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. properties.setProperty("group.id", "consumer-group")
  4. properties.setProperty("key.deserializer",
  5. "org.apache.kafka.common.serialization.StringDeserializer")
  6. properties.setProperty("value.deserializer",
  7. "org.apache.kafka.common.serialization.StringDeserializer")
  8. properties.setProperty("auto.offset.reset", "latest")
  9. val env = StreamExecutionEnvironment.getExecutionEnvironment
  10. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  11. env.setParallelism(1)
  12. val stream = env
  13. .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

当然,根据实际的需要,我们还可以将Sink指定为Kafka、ES、Redis或其它存储,这里就不一一展开实现了。

第3章 、实时流量统计

3.1 模块创建和数据准备

在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为NetworkTrafficAnalysis。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。

在src/main/目录下,将默认源文件目录java改名为scala。将apache服务器的日志文件apache.log复制到资源文件目录src/main/resources下,我们将从这里读取数据。

3.2 代码实现

我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。我们在这里实现最基本的“页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问url的次数。

具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。

在src/main/scala下创建TrafficAnalysis.scala文件,新建一个单例对象。定义样例类ApacheLogEvent,这是输入的日志数据流;另外还有UrlViewCount,这是窗口操作统计的输出数据类型。在main函数中创建StreamExecutionEnvironment 并做配置,然后从apache.log文件中读取数据,并包装成ApacheLogEvent类型。

需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个DateTimeFormat将其转换为我们需要的时间戳格式:

  1. .map(line => {
  2. val linearray = line.split(" ")
  3. val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  4. val timestamp = sdf.parse(linearray(3)).getTime
  5. ApacheLogEvent(linearray(0), linearray(2), timestamp,
  6. linearray(5), linearray(6))
  7. })

完整代码如下:

NetworkTrafficAnalysis/src/main/scala/TrafficAnalysis.scala

  1. case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
  2. case class UrlViewCount(url: String, windowEnd: Long, count: Long)
  3. object TrafficAnalysis {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env
  9. // 以window下为例,需替换成自己的路径
  10. .readTextFile("YOUR_PATH\\resources\\apache.log")
  11. .map(line => {
  12. val linearray = line.split(" ")
  13. val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  14. val timestamp = simpleDateFormat.parse(linearray(3)).getTime
  15. ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
  16. })
  17. .assignTimestampsAndWatermarks(new
  18. BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
  19. (Time.milliseconds(1000)) {
  20. override def extractTimestamp(t: ApacheLogEvent): Long = {
  21. t.eventTime
  22. }
  23. })
  24. .keyBy("url")
  25. .timeWindow(Time.minutes(10), Time.seconds(5))
  26. .aggregate(new CountAgg(), new WindowResultFunction())
  27. .keyBy(1)
  28. .process(new TopNHotUrls(5))
  29. .print()
  30. env.execute("Traffic Analysis Job")
  31. }
  32. class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
  33. override def createAccumulator(): Long = 0L
  34. override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
  35. override def getResult(acc: Long): Long = acc
  36. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  37. }
  38. class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
  39. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
  40. val url: String = key.asInstanceOf[Tuple1[String]].f0
  41. val count = aggregateResult.iterator.next
  42. collector.collect(UrlViewCount(url, window.getEnd, count))
  43. }
  44. }
  45. class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
  46. private var urlState : ListState[UrlViewCount] = _
  47. override def open(parameters: Configuration): Unit = {
  48. super.open(parameters)
  49. val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
  50. urlState = getRuntimeContext.getListState(urlStateDesc)
  51. }
  52. override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
  53. // 每条数据都保存到状态中
  54. urlState.add(input)
  55. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  56. }
  57. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  58. // 获取收到的所有URL访问量
  59. val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
  60. import scala.collection.JavaConversions._
  61. for (urlView <- urlState.get) {
  62. allUrlViews += urlView
  63. }
  64. // 提前清除状态中的数据,释放空间
  65. urlState.clear()
  66. // 按照访问量从大到小排序
  67. val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
  68. .take(topSize)
  69. // 将排名信息格式化成 String, 便于打印
  70. var result: StringBuilder = new StringBuilder
  71. result.append("====================================\n")
  72. result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  73. for (i <- sortedUrlViews.indices) {
  74. val currentUrlView: UrlViewCount = sortedUrlViews(i)
  75. // e.g. No1: URL=/blog/tags/firefox?flav=rss20 流量=55
  76. result.append("No").append(i+1).append(":")
  77. .append(" URL=").append(currentUrlView.url)
  78. .append(" 流量=").append(currentUrlView.count).append("\n")
  79. }
  80. result.append("====================================\n\n")
  81. // 控制输出频率,模拟实时滚动结果
  82. Thread.sleep(1000)
  83. out.collect(result.toString)
  84. }
  85. }
  86. }

第4章 恶意登录监控

4.1 模块创建和数据准备

继续在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为LoginFailDetect。在这个子模块中,我们将会用到flink的CEP库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
  9. <version>${flink.version}</version>
  10. </dependency>

同样,在src/main/目录下,将默认源文件目录java改名为scala。

4.2 代码实现

对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。

4.2.1 状态编程

由于同样引入了时间,我们可以想到,最简单的方法其实与之前的热门统计类似,只需要按照用户ID分流,然后遇到登录失败的事件时将其保存在ListState中,然后设置一个定时器,2秒后触发。定时器触发时检查状态中的登录失败事件个数,如果大于等于2,那么就输出报警信息。

在src/main/scala下创建LoginFail.scala文件,新建一个单例对象。定义样例类LoginEvent,这是输入的登录事件流。由于没有现成的登录数据,我们用几条自定义的示例数据来做演示。

代码如下:

LoginFailDetect/src/main/scala/LoginFail.scala

  1. case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
  2. object LoginFail {
  3. def main(args: Array[String]): Unit = {
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  6. env.setParallelism(1)
  7. val loginEventStream = env.fromCollection(List(
  8. LoginEvent(1, "192.168.0.1", "fail", 1558430842),
  9. LoginEvent(1, "192.168.0.2", "fail", 1558430843),
  10. LoginEvent(1, "192.168.0.3", "fail", 1558430844),
  11. LoginEvent(2, "192.168.10.10", "success", 1558430845)
  12. ))
  13. .assignAscendingTimestamps(_.eventTime * 1000)
  14. .keyBy(_.userId)
  15. .process(new MatchFunction())
  16. .print()
  17. env.execute("Login Fail Detect Job")
  18. }
  19. class MatchFunction extends KeyedProcessFunction[Long, LoginEvent, LoginEvent] {
  20. // 定义状态变量
  21. lazy val loginState: ListState[LoginEvent] = getRuntimeContext.getListState(
  22. new ListStateDescriptor[LoginEvent]("saved login", classOf[LoginEvent]))
  23. override def processElement(login: LoginEvent,
  24. context: KeyedProcessFunction[Long, LoginEvent,
  25. LoginEvent]#Context, out: Collector[LoginEvent]): Unit = {
  26. if (login.eventType == "fail") {
  27. loginState.add(login)
  28. }
  29. // 注册定时器,触发事件设定为2秒后
  30. context.timerService.registerEventTimeTimer(login.eventTime + 2 * 1000)
  31. }
  32. override def onTimer(timestamp: Long,
  33. ctx: KeyedProcessFunction[Long, LoginEvent,
  34. LoginEvent]#OnTimerContext, out: Collector[LoginEvent]): Unit = {
  35. val allLogins: ListBuffer[LoginEvent] = ListBuffer()
  36. import scala.collection.JavaConversions._
  37. for (login <- loginState.get) {
  38. allLogins += login
  39. }
  40. loginState.clear()
  41. if (allLogins.length > 1) {
  42. out.collect(allLogins.head)
  43. }
  44. }
  45. }
  46. }

4.2.2 CEP编程

上一节的代码实现中我们可以看到,直接把每次登录失败的数据存起来、设置定时器一段时间后再读取,这种做法尽管简单,但和我们开始的需求还是略有差异的。这种做法只能隔2秒之后去判断一下这期间是否有多次失败登录,而不是在一次登录失败之后、再一次登录失败时就立刻报警。这个需求如果严格实现起来,相当于要判断任意紧邻的事件,是否符合某种模式。这听起来就很复杂了,那有什么方式可以方便地实现呢?

很幸运,flink为我们提供了CEP(Complex Event Processing,复杂事件处理)库,用于在流中筛选符合某种复杂模式的事件。接下来我们就基于CEP来完成这个模块的实现。

在src/main/scala下继续创建LoginFailWithCep.scala文件,新建一个单例对象。样例类LoginEvent由于在LoginFail.scala已经定义,我们在同一个模块中就不需要再定义了。

代码如下:

LoginFailDetect/src/main/scala/LoginFailWithCep.scala

  1. object LoginFailWithCep {
  2. def main(args: Array[String]): Unit = {
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment
  4. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  5. env.setParallelism(1)
  6. val loginEventStream = env.fromCollection(List(
  7. LoginEvent(1, "192.168.0.1", "fail", 1558430842),
  8. LoginEvent(1, "192.168.0.2", "fail", 1558430843),
  9. LoginEvent(1, "192.168.0.3", "fail", 1558430844),
  10. LoginEvent(2, "192.168.10.10", "success", 1558430845)
  11. )).assignAscendingTimestamps(_.eventTime * 1000)
  12. // 定义匹配模式
  13. val loginFailPattern = Pattern.begin[LoginEvent]("begin")
  14. .where(_.eventType == "fail")
  15. .next("next")
  16. .where(_.eventType == "fail")
  17. .within(Time.seconds(2))
  18. // 在数据流中匹配出定义好的模式
  19. val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
  20. // .select方法传入一个 pattern select function,当检测到定义好的模式序列时就会调用
  21. val loginFailDataStream = patternStream
  22. .select((pattern: Map[String, Iterable[LoginEvent]]) => {
  23. val first = pattern.getOrElse("begin", null).iterator.next()
  24. val second = pattern.getOrElse("next", null).iterator.next()
  25. (second.userId, second.ip, second.eventType)
  26. })
  27. // 将匹配到的符合条件的事件打印出来
  28. loginFailDataStream.print()
  29. env.execute("Login Fail Detect Job")
  30. }
  31. }

第5章 订单支付实时监控

5.1 模块创建和数据准备

同样地,在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为OrderTimeoutDetect。在这个子模块中,我们同样将会用到flink的CEP库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
  9. <version>${flink.version}</version>
  10. </dependency>

同样,在src/main/目录下,将默认源文件目录java改名为scala。

5.2 代码实现

在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。

我们将会利用CEP库来实现这个功能。我们先将事件流按照订单号orderId分流,然后定义这样的一个事件模式:在15分钟内,事件“create”与“pay”严格紧邻:

  1. val orderPayPattern = Pattern.begin[OrderEvent]("begin")
  2. .where(_.eventType == "create")
  3. .next("next")
  4. .where(_.eventType == "pay")
  5. .within(Time.seconds(5))

这样调用.select方法时,就可以同时获取到匹配出的事件和超时未匹配的事件了。

在src/main/scala下继续创建OrderTimeout.scala文件,新建一个单例对象。定义样例类OrderEvent,这是输入的订单事件流;另外还有OrderResult,这是输出显示的订单状态结果。由于没有现成的数据,我们还是用几条自定义的示例数据来做演示。

完整代码如下:

OrderTimeoutDetect/src/main/scala/OrderTimeout.scala

  1. case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
  2. case class OrderResult(orderId: Long, eventType: String)
  3. object OrderTimeout {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setParallelism(1)
  7. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  8. val orderEventStream = env.fromCollection(List(
  9. OrderEvent(1, "create", 1558430842),
  10. OrderEvent(2, "create", 1558430843),
  11. OrderEvent(2, "pay", 1558430844)
  12. )).assignAscendingTimestamps(_.eventTime * 1000)
  13. // 定义一个带匹配时间窗口的模式
  14. val orderPayPattern = Pattern.begin[OrderEvent]("begin")
  15. .where(_.eventType == "create")
  16. .next("next")
  17. .where(_.eventType == "pay")
  18. .within(Time.minutes(15))
  19. // 定义一个输出标签
  20. val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
  21. // 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式
  22. val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)
  23. val complexResult = patternStream.select(orderTimeoutOutput) {
  24. // 对于已超时的部分模式匹配的事件序列,会调用这个函数
  25. (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
  26. val createOrder = pattern.get("begin")
  27. OrderResult(createOrder.get.iterator.next().orderId, "timeout")
  28. }
  29. } {
  30. // 检测到定义好的模式序列时,就会调用这个函数
  31. pattern: Map[String, Iterable[OrderEvent]] => {
  32. val payOrder = pattern.get("next")
  33. OrderResult(payOrder.get.iterator.next().orderId, "success")
  34. }
  35. }
  36. // 拿到同一输出标签中的 timeout 匹配结果(流)
  37. val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)
  38. complexResult.print()
  39. timeoutResult.print()
  40. env.execute("Order Timeout Detect Job")
  41. }
  42. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/642549
推荐阅读
相关标签
  

闽ICP备14008679号