
Java Flink notes

SQL ranking functions

dense_rank() handles ties: tied rows share the same rank and the next rank is not skipped.

row_number() ignores ties: every row gets a distinct, sequential number.

rank() also gives tied rows the same rank, but skips the ranks consumed by the tie, leaving gaps.
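For example, for the scores 100, 100, 90: row_number() numbers them 1, 2, 3; rank() gives 1, 1, 3 (the rank after the tie is skipped); dense_rank() gives 1, 1, 2 (no gap).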

Reading a file (this uses Scala's scala.io.Source from Java, so scala-library must be on the classpath)

```java
import java.io.File;
import scala.collection.Iterator;
import scala.io.Source;

// the second argument is the file encoding
Iterator<String> lines = Source.fromFile(new File("input/click.txt"), "utf8").getLines();
while (lines.hasNext()) {
    String next = lines.next();
    System.out.println(next);
}
```
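If the Scala dependency is not wanted, here is a plain-JDK sketch of the same loop, assuming the same `input/click.txt` path:

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

// read and print every line of the file using only the JDK
try (Stream<String> lines = Files.lines(Paths.get("input/click.txt"), StandardCharsets.UTF_8)) {
    lines.forEach(System.out::println);
}
```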

Getting the current timestamp

```java
import java.util.Calendar;

long l = Calendar.getInstance().getTimeInMillis();
```
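The same value is available without allocating a Calendar:

```java
// equivalent, and simpler
long now = System.currentTimeMillis();
```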

Sink to Redis

```java
package p52;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/25
 */
public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.fromElements(
                new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );
        streamSource.print();
        // create a connection config
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hdp1")
                .build();
        // sink to Redis
        streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // custom class implementing the RedisMapper interface
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET: the additional key "clicks" names the Redis hash
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }

        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }
}
```
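With RedisCommand.HSET, `getKeyFromData` supplies the hash field and `getValueFromData` the value, so each user's latest URL lands in the `clicks` hash and can be inspected with `HGETALL clicks` in redis-cli.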

Source: MySQL

```java
package p33;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author jiasongfan
 * @date 2022/7/20
 */
public class RClickSOurce extends RichSourceFunction<Event> {
    Connection conn = null;
    PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://hdp1:3306/1912b", "root", "root");
        ps = conn.prepareStatement("select * from t_event");
    }

    @Override
    public void close() throws Exception {
        // close the statement before the connection
        ps.close();
        conn.close();
    }

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            String user = resultSet.getString(1);
            String url = resultSet.getString(2);
            Long ts = resultSet.getLong(3);
            ctx.collect(new Event(user, url, ts));
        }
    }

    @Override
    public void cancel() {
        // the query runs once and finishes, so there is nothing to interrupt
    }
}
```
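A minimal sketch of wiring this source into a job, assuming the `Event` POJO from `p33` used elsewhere in these notes:

```java
// read all rows from t_event and print them
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new RClickSOurce()).print();
env.execute();
```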

maxBy finds the maximum per key; the argument can only be a field name, and it returns the whole record that holds the maximum, not just the field.

```java
KeyedStream<Event, String> keyedStream = eventSource.keyBy(data -> data.user);
SingleOutputStreamOperator<Event> max = keyedStream.maxBy("ts");
```
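For contrast, `max("ts")` also emits one record per key, but only the `ts` field is guaranteed to hold the maximum; the other fields keep the values of the first record seen for that key:

```java
// max("ts"): ts is the running maximum; user/url stay from the first record per key
SingleOutputStreamOperator<Event> maxTs = keyedStream.max("ts");
```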

Reduce aggregation

```java
package p42;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/21
 */
public class TranformReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> eventSource = env.fromElements(
                new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );
        // count the number of visits per user
        SingleOutputStreamOperator<Tuple2<String, Long>> map = eventSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event event) throws Exception {
                return Tuple2.of(event.user, 1L);
            }
        });
        KeyedStream<Tuple2<String, Long>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> v1) throws Exception {
                return v1.f0;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> reduce = keyedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
                return Tuple2.of(v1.f0, v1.f1 + t1.f1);
            }
        });
        // pick the currently most active user;
        // the constant key is arbitrary: it routes every partial count into a single group
        SingleOutputStreamOperator<Tuple2<String, Long>> result = reduce.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
                return v1.f1 > t1.f1 ? v1 : t1;
            }
        });
        result.print();
        env.execute();
    }
}
```
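With the three sample events the job should print the running leader after each update: (bob,1), then (mary,1) (a tie goes to the newer value because the comparison is strict), then (bob,2).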

Sink to Redis (setup and full example)

Starting Redis: from the Redis installation directory, run

```
redis-server redis.conf
```

Add the Maven dependency:

```xml
<!-- Redis connector -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
```java
package p52;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/25
 */
public class SinkToRedis1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.fromElements(
                new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hdp1")
                .build();
        streamSource.addSink(new RedisSink<>(config, new RedisMapper1()));
        env.execute();
    }

    public static class RedisMapper1 implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // store the data in a Redis hash
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }

        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }
}
```
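Because bob appears twice, the second event overwrites the first: HSET keeps one value per field, so the `clicks` hash ends up with bob → ./pord and mary → ./home.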

Computing an average (with windows, pair the AggregateFunction with a ProcessWindowFunction to get window metadata)

```java
package p73;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author jiasongfan
 * @date 2022/7/27
 */
public class AggTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L)
        );
        // compute the average per key in 5-second event-time windows
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new MyAvg(), new MyWin2());
        avgDS.print();
        env.execute();
    }

    // ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    public static class MyWin2 extends ProcessWindowFunction<Double, WData, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Double> iterable, Collector<WData> collector) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            // the pre-aggregated average is the single element in the iterable
            Double next = iterable.iterator().next();
            collector.collect(new WData(start, end, s, next));
        }
    }

    // alternative: a plain WindowFunction exposes the same window metadata
    public static class MyWin1 implements WindowFunction<Double, WData, String, TimeWindow> {
        @Override
        public void apply(String s, TimeWindow window, Iterable<Double> input, Collector<WData> out) {
            for (Double next : input) {
                out.collect(new WData(window.getStart(), window.getEnd(), s, next));
            }
        }
    }

    // AggregateFunction<IN, ACC, OUT>: the accumulator holds (sum, count)
    public static class MyAvg implements AggregateFunction<ABean, Tuple2<Long, Long>, Double> {
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return Tuple2.of(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(ABean aBean, Tuple2<Long, Long> v1) {
            return Tuple2.of(aBean.num + v1.f0, v1.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> v1) {
            return ((double) v1.f0) / v1.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> v1, Tuple2<Long, Long> acc1) {
            return Tuple2.of(v1.f0 + acc1.f0, v1.f1 + acc1.f1);
        }
    }
}
```
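With the sample data all three records fall in the window [0, 5000), so, assuming WData's toString prints its fields, the output should show bob with average 4.0 ((3+5)/2) and Mary with 4.0.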

Sum: counting records per key

```java
package p73;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author jiasongfan
 * @date 2022/7/27
 */
public class SumTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L)
        );
        // count records per key in 5-second event-time windows
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<String> sumDS = winDS.aggregate(new MySum(), new MyWin2());
        sumDS.print();
        env.execute();
    }

    // ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    public static class MyWin2 extends ProcessWindowFunction<Integer, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            Integer next = iterable.iterator().next();
            collector.collect(start + "," + end + "," + next);
        }
    }

    // AggregateFunction<IN, ACC, OUT>
    public static class MySum implements AggregateFunction<ABean, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(ABean aBean, Integer integer) {
            return integer + 1;
        }

        @Override
        public Integer getResult(Integer integer) {
            return integer;
        }

        @Override
        public Integer merge(Integer integer, Integer acc1) {
            // merge is only invoked for merging windows (e.g. session windows);
            // returning null there would break them, so add the two counts
            return integer + acc1;
        }
    }
}
```
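For the sample data the window [0, 5000) should print `0,5000,2` for bob and `0,5000,1` for Mary.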

Accumulators

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author jiasongfan
 * @date 2022/7/28
 */
public class SumTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        // source
        DataStreamSource<String> streamSource = env.fromElements("aaa", "bbb", "ccc", "ddd");
        SingleOutputStreamOperator<String> result = streamSource.map(new RichMapFunction<String, String>() {
            // 1. create the accumulator
            private IntCounter elementCounter = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2. register the accumulator
                getRuntimeContext().addAccumulator("elementCounter", elementCounter);
            }

            @Override
            public String map(String s) throws Exception {
                // 3. add to the accumulator for every element
                this.elementCounter.add(1);
                return s;
            }
        });
        // the job needs a sink, otherwise there is nothing to execute
        result.print();
        // 4. fetch the accumulator result once the job has finished
        JobExecutionResult jobExecutionResult = env.execute();
        int nums = jobExecutionResult.getAccumulatorResult("elementCounter");
        System.out.println("count computed with the accumulator: " + nums);
    }
}
```
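Accumulator values are merged across all parallel subtasks, so the printed count is 4 regardless of parallelism; unlike operator output, the result only becomes available after `env.execute()` returns.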

Spark SQL

Computing the ratio of female records (`* 1.0` forces decimal division):

```sql
sum(case when sex = '女' then 1 else 0 end) cnt1,
count(*) cnt,
(sum(case when sex = '女' then 1 else 0 end) * 1.0 / count(*)) cnt2
```
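A complete query sketch built from these expressions; the table name `t_student` is an assumption, not from the original notes:

```sql
-- count of women (cnt1), total rows (cnt), and the ratio (cnt2)
SELECT
  sum(case when sex = '女' then 1 else 0 end) AS cnt1,
  count(*) AS cnt,
  sum(case when sex = '女' then 1 else 0 end) * 1.0 / count(*) AS cnt2
FROM t_student;
```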

Table SQL (with an aggregating query)

```java
package p128;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author jiasongfan
 * @date 2022/7/30
 */
public class TableTest02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .useBlinkPlanner()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // sample rows look like: Mary,./home,1000
        tEnv.executeSql("CREATE TABLE t_table (\n"
                + " users STRING,\n"
                + " urls STRING,\n"
                + " ts BIGINT) \n"
                + "WITH (\n"
                + " 'connector' = 'filesystem', -- required: specify the connector\n"
                + " 'path' = 'file:///D:\\E\\month9class\\zong\\input\\click.txt', -- required: path to the file\n"
                + " 'format' = 'csv'"
                + ")");
        // alternatively, run the query directly and print the TableResult:
        // tEnv.executeSql("select users, sum(ts) sums from t_table group by users").print();
        Table table = tEnv.sqlQuery("select users, sum(ts) sums from t_table group by users");
        // an aggregation produces updates, so convert to a changelog stream before printing
        tEnv.toChangelogStream(table).print();
        env.execute();
    }
}
```
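Because the query aggregates, each incoming row updates a user's sum: the printed changelog shows `+I` when a user first appears and a `-U`/`+U` pair for each subsequent update.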

TopN: key the per-window averages by window end, buffer them in list state, and emit the three largest when a timer fires just after the window closes.

```java
package p86;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import p73.ABean;
import p73.AggTest;
import p73.WData;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * @author jiasongfan
 * @date 2022/7/28
 */
public class TopNTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L),
                new ABean("Kak", 5, 4000L),
                new ABean("KaK", 7, 5000L),
                new ABean("NoNO", 8, 6000L),
                new ABean("ZZ", 8, 7000L),
                new ABean("NoNO", 8, 9000L),
                new ABean("NoNO", 8, 11000L)
        );
        // per-key averages in 5-second event-time windows (reusing MyAvg/MyWin2 from AggTest)
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new AggTest.MyAvg(), new AggTest.MyWin2());
        // group all window results that share the same window end, then rank them
        SingleOutputStreamOperator<String> process = avgDS.keyBy(data -> data.end).process(new MyPro2());
        process.print("topN");
        env.execute();
    }

    public static class MyPro2 extends KeyedProcessFunction<Long, WData, String> {
        // state descriptor for buffering all results of one window
        ListStateDescriptor<WData> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<WData>() {}));
        ListState<WData> list = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            list = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement(WData wData, Context context, Collector<String> collector) throws Exception {
            list.add(wData);
            // register a timer for 1 ms after the window end (the key is the window end)
            context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<WData> wData = new ArrayList<>();
            for (WData wDatum : list.get()) {
                wData.add(wDatum);
            }
            // sort by average, descending
            wData.sort(new Comparator<WData>() {
                @Override
                public int compare(WData o1, WData o2) {
                    return Double.compare(o2.getAvg(), o1.getAvg());
                }
            });
            // emit at most the top 3
            if (wData.size() >= 3) {
                List<WData> wData1 = wData.subList(0, 3);
                out.collect(wData1.toString());
            } else {
                out.collect(wData.toString());
            }
        }
    }
}
```
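For the first window [0, 5000) the top-3 list should contain Kak (5.0) ahead of bob and Mary (4.0 each); the order between the two tied entries depends on arrival order.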

Custom source

```java
package p128;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import p33.Event;

import java.util.Date;
import java.util.Random;

/**
 * @author jiasongfan
 * @date 2022/7/30
 */
public class Clickhouse extends RichSourceFunction<Event> {
    private boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // generate random data
        Random random = new Random();
        // pools to draw the fields from
        String[] users = {"Mary", "", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100"};
        // emit one event per second until cancel() flips the flag
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long time = new Date().getTime();
            sourceContext.collect(new Event(user, url, time));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```
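A minimal sketch of using the source, assuming the `Event` POJO from `p33`:

```java
// print one random click event per second until the job is cancelled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new Clickhouse()).print();
env.execute();
```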

Getting the Windows host's IP address: run `ipconfig` in a terminal (e.g. it might show 192.168.0.105); when everything runs locally, use localhost instead.
