赞
踩
自定义源算子
import org.apache.flink.streaming.api.functions.source.SourceFunction

import java.util.Calendar
import scala.util.Random

/*
 * DATE:2022/10/4 0:03
 * AUTHOR:GX
 */

/**
 * A single click event produced by [[ClickSource]].
 *
 * @param user      user name attached to the event
 * @param url       URL attached to the event
 * @param timestamp creation time of the event, in milliseconds since the epoch
 */
case class Event(user: String, url: String, timestamp: Long)

/**
 * Custom source operator that emits one randomly generated [[Event]] per second
 * until it is cancelled.
 *
 * Because it extends `SourceFunction[Event]`, its parallelism must be 1. To run the
 * source with a higher parallelism, extend `ParallelSourceFunction[Event]` instead.
 */
class ClickSource extends SourceFunction[Event] {

  // Loop flag. `cancel()` is called from a different thread than the one executing
  // `run()`, so the flag must be volatile; otherwise the loop is not guaranteed to
  // observe the write and the source may never stop.
  @volatile var running = true

  /**
   * Emits random events through `sourceContext` for as long as `running` is true.
   *
   * @param sourceContext context used to send records downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {
    // Random number generator used to pick a user and a URL.
    val random = new Random()

    // Candidate values the random events are drawn from.
    val users = Array("Mary", "Alice", "Bob", "Cary", "Leborn")
    val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2", "./prod?id=3")

    // Keep sending data until the flag is cleared by cancel().
    while (running) {
      // Build one random event stamped with the current time.
      val event = Event(
        users(random.nextInt(users.length)),
        urls(random.nextInt(urls.length)),
        Calendar.getInstance.getTimeInMillis
      )

      // Send the event downstream.
      sourceContext.collect(event)

      // Emit one record per second.
      Thread.sleep(1000)
    }
  }

  /** Requests the emitting loop in `run` to stop by clearing the flag. */
  override def cancel(): Unit = running = false
}
// Flink program
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

/*
 * DATE:2022/10/4 12:01
 * AUTHOR:GX
 */

/**
 * Sample job that applies `map`, `filter` and `flatMap` to the click stream produced
 * by `ClickSource`, once with anonymous functions and once with function classes.
 */
object Transformastion {

  /**
   * Builds the job graph and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read from the custom source.
    // Note: a non-parallel source must run with parallelism 1. For parallel processing
    // the custom source has to extend ParallelSourceFunction; only then can the source
    // operator be given a higher parallelism.
    val clicks = env.addSource(new ClickSource)

    // Extract the user name from every click event.
    // 1. With an anonymous function.
    val namesViaLambda = clicks.map(click => click.user)
    namesViaLambda.print("1")

    // 2. With a MapFunction implementation.
    val namesViaClass = clicks.map(new UserExtractor)
    namesViaClass.print("2")

    // 3. Filtering with an anonymous function.
    val maryClicks = clicks.filter(click => click.user == "Mary")
    maryClicks.print("3")

    // 4. Filtering with a FilterFunction implementation.
    val filteredByClass = clicks.filter(new UserFilter)
    filteredByClass.print("4")

    // 5. Trying out flatMap.
    val flattened = clicks.flatMap(new UserFlatMap)
    flattened.print("5")

    env.execute()
  }

  /** Maps an event to its user name. Type parameters are [input type, output type]. */
  class UserExtractor extends MapFunction[Event, String] {
    override def map(t: Event): String = {
      val userName = t.user
      userName
    }
  }

  /** Keeps only the events whose user is "Leborn". */
  class UserFilter extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = "Leborn" == t.user
  }

  /**
   * Emits zero, one or two strings per event depending on the user:
   * "Mary" yields the user name, "Leborn" yields the user name followed by the URL,
   * and every other user yields nothing.
   */
  class UserFlatMap extends FlatMapFunction[Event, String] {
    override def flatMap(value: Event, out: Collector[String]): Unit =
      value.user match {
        // A click by Mary: output just the user.
        case "Mary" =>
          out.collect(value.user)
        // A click by Leborn: output the user and then the URL.
        case "Leborn" =>
          out.collect(value.user)
          out.collect(value.url)
        // Any other user produces no output.
        case _ =>
      }
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。