赞
踩
UserBehavior2.csv
543462,1715,1464116,pv,1587635915932
662867,1162383,570735,pv,1587635915932
561558,3611281,965809,pv,1587635915932
894923,3076029,1879194,pv,1587635915932
834377,1256540,1451783,pv,1587635915932
315321,942195,4339722,pv,1587635915932
625915,1162383,570735,pv,1587635915932
578814,534083,4203730,pv,1587635915932
873335,1256540,1451783,pv,1587635915932
429984,4625350,2355072,pv,1587635915932
866796,534083,4203730,pv,1587635915932
937166,534083,4203730,pv,1587635915932
itemInfo.csv
1715,aeeeeeeeee
1162383,beeeeeeee
3611281,9eee
3076029,18eee
1256540,14eee
942195,43ddd
534083,42bbb
4625350,23bbb
534083,42aaa
package myflink;

import lombok.Data;
import myflink.entity.ItemInfo;
import myflink.entity.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.net.URL;

/**
 * Batch (DataSet) example that joins item-click records with item descriptions
 * through Flink SQL and prints the click count per item.
 *
 * <p>Two CSV resources are read from the classpath: {@code UserBehavior2.csv}
 * (userId, itemId, categoryId, behavior, createTime) and {@code ItemInfo.csv}
 * (itemId, desc).
 */
public class TableJobJoinExample {

    /**
     * Registers both CSV inputs as tables, runs the grouped join query and prints
     * one {@code (itemId, clickCount)} tuple per result row.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Click records, registered as table "topScore".
        Table topScore = getOrder(env, tableEnv);
        tableEnv.registerTable("topScore", topScore);

        // Item descriptions, registered as table "itemInfo".
        Table itemInfo = getItemInfo(env, tableEnv);
        tableEnv.registerTable("itemInfo", itemInfo);

        // Click count per item together with the item's description.
        String groupSql = "select a.itemId,count(*) as sum_total_score,b.desc from topScore a join itemInfo b on a.itemId=b.itemId group by a.itemId,categoryId,b.desc order by itemId desc";

        Table groupedByCountry = tableEnv.sqlQuery(groupSql);

        // Convert the result table back to a DataSet of Result objects.
        DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class);

        // Map every row to an (itemId, count) tuple; print() also triggers execution.
        result.map(new MapFunction<Result, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(Result row) throws Exception {
                System.out.println("=============================" + row);
                return Tuple2.of(row.itemId, row.sum_total_score);
            }
        }).print();
    }

    /**
     * Reads the item-click records from {@code UserBehavior2.csv} and exposes them as a table.
     *
     * <p>The first line is NOT skipped: the sample file starts directly with a data
     * record, so skipping it would silently drop one click.
     *
     * @param env      batch execution environment used to read the file
     * @param tableEnv table environment used for the conversion
     * @return a table with the columns userId, itemId, categoryId, behavior, createTime
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private static Table getOrder(ExecutionEnvironment env, BatchTableEnvironment tableEnv) {
        DataSet<UserBehavior> csvInput = env
                .readCsvFile(resourcePath("UserBehavior2.csv"))
                .pojoType(UserBehavior.class, "userId", "itemId", "categoryId", "behavior", "createTime");
        return tableEnv.fromDataSet(csvInput);
    }

    /**
     * Reads the item descriptions from {@code ItemInfo.csv} and exposes them as a table.
     *
     * <p>The first line is NOT skipped, for the same reason as in {@link #getOrder}.
     *
     * @param env      batch execution environment used to read the file
     * @param tableEnv table environment used for the conversion
     * @return a table with the columns itemId, desc
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private static Table getItemInfo(ExecutionEnvironment env, BatchTableEnvironment tableEnv) {
        DataSet<ItemInfo> csvInput = env
                .readCsvFile(resourcePath("ItemInfo.csv"))
                .pojoType(ItemInfo.class, "itemId", "desc");
        return tableEnv.fromDataSet(csvInput);
    }

    /**
     * Resolves a classpath resource to a local file path.
     *
     * @param resourceName resource name relative to the classpath root; NOTE(review):
     *                     the lookup is presumably case-sensitive on most platforms, so
     *                     the name must match the file name exactly
     * @return the path component of the resource URL
     * @throws IllegalStateException if the resource cannot be found
     */
    private static String resourcePath(String resourceName) {
        URL fileUrl = TableJobJoinExample.class.getClassLoader().getResource(resourceName);
        if (fileUrl == null) {
            throw new IllegalStateException("Resource not found on classpath: " + resourceName);
        }
        return fileUrl.getPath();
    }

    /**
     * Row type of the grouped query; field names correspond to the selected column names.
     */
    @Data
    public static class Result {
        /** Number of click records for the item ({@code count(*)}). */
        public Long sum_total_score;
        /** Item identifier. */
        public Long itemId;
        /** Item description taken from the itemInfo table. */
        public String desc;

        public Result() {
        }
    }
}
import lombok.Data; import lombok.extern.slf4j.Slf4j; import myflink.entity.ItemInfo; import myflink.entity.UserBehavior; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.io.PojoCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import java.io.File; import java.net.URL; /** * * * */ @Slf4j public class HotItemsSql { public static void main(String[] args) { try { // 创建 execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 告诉系统按照 EventTime 处理 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 为了打印到控制台的结果不乱序,我们配置全局的并发为1,改变并发对结果正确性没有影响 env.setParallelism(1); // env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints")) // env.setStateBackend(new FsStateBackend("file://D:\\img\\data")); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table topScore = getOrder(env, tableEnv); //将topScore注册为一个表 , tableEnv.registerTable("topScore", topScore); // 商品信息表 Table itemInfo = getItemInfo(env, tableEnv); tableEnv.registerTable("itemInfo", itemInfo); //查询item点击数,及itemId的名字 String groupSql = "select a.itemId,count(*) as sum_total_score,b.desc from topScore a join itemInfo b on a.itemId=b.itemId group by a.itemId,categoryId,b.desc order by itemId desc"; // groupSql = "select 
a.itemId,b.desc from topScore a join itemInfo b on a.itemId=b.itemId group by a.itemId,categoryId,b.desc "; // 按照滚动时间窗口来处理 String tumbleWinGroupSql = "select a.itemId,count(*) as sum_total_score from topScore a join itemInfo b on a.itemId=b.itemId group by TUMBLE(a.createTime,INTERVAL '2' MINUTE), a.itemId order by itemId desc"; Table groupedByCountry = tableEnv.sqlQuery(groupSql); DataStream<Tuple2<Boolean, Result>> result = tableEnv.toRetractStream(groupedByCountry, Result.class); // //将dataset map成tuple输出 // result.map(new MapFunction<Tuple2<Boolean, Result>, String>() { // // @Override // public String map(Tuple2<Boolean, Result> booleanResultTuple2) throws Exception { // System.out.println("=============================" + result); // return null; // } // }).print(); // // result.process(new TopNHotItems2()) .print(); env.execute("Hot Items Job"); } catch (Exception e) { e.printStackTrace(); } } public static class TopNHotItems2 extends ProcessFunction<Tuple2<Boolean, Result>, String> { @Override public void processElement(Tuple2<Boolean, Result> value, Context context, Collector<String> collector) throws Exception { System.out.println("0:" + value.f0 + "1:" + value.f1); } } /** * item点击 * * @param env * @param tableEnv * @return */ private static Table getOrder(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) throws Exception { // UserBehavior.csv 的本地文件路径, 在 resources 目录下 URL fileUrl = HotItemsSql.class.getClassLoader().getResource("UserBehavior2.csv"); Path filePath = Path.fromLocalFile(new File(fileUrl.toURI())); //source,这里读取CSV文件,并转换为对应的Class // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class); // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序 String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "createTime"}; // 创建 PojoCsvInputFormat PojoCsvInputFormat<UserBehavior> csvInput = new 
PojoCsvInputFormat<UserBehavior>(filePath, pojoType, fieldOrder); DataStream<UserBehavior> dataStream = env // 创建数据源,得到 UserBehavior 类型的 DataStream .createInput(csvInput, pojoType) // 抽取出时间和生成 watermark .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() { @Override public long extractAscendingTimestamp(UserBehavior userBehavior) { // 原始数据单位秒,将其转成毫秒 return userBehavior.createTime; } }); //将DataSet转换为Table return tableEnv.fromDataStream(dataStream); } private static Table getItemInfo(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) throws Exception { // UserBehavior.csv 的本地文件路径, 在 resources 目录下 URL fileUrl = HotItemsSql.class.getClassLoader().getResource("ItemInfo.csv"); Path filePath = Path.fromLocalFile(new File(fileUrl.toURI())); //source,这里读取CSV文件,并转换为对应的Class // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo PojoTypeInfo<ItemInfo> pojoType = (PojoTypeInfo<ItemInfo>) TypeExtractor.createTypeInfo(ItemInfo.class); // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序 String[] fieldOrder = new String[]{"itemId", "desc"}; // 创建 PojoCsvInputFormat PojoCsvInputFormat<ItemInfo> csvInput = new PojoCsvInputFormat<ItemInfo>(filePath, pojoType, fieldOrder); DataStream<ItemInfo> dataStream = env // 创建数据源,得到 UserBehavior 类型的 DataStream .createInput(csvInput, pojoType); // 抽取出时间和生成 watermark //将DataSet转换为Table return tableEnv.fromDataStream(dataStream); } /** * 统计结果对应的类 */ @Data public static class Result { // public String country; // public String sum_total_score; // public String userId; public Long itemId; public String desc; public Result() { } public Result(Long itemId, String desc) { this.itemId = itemId; this.desc = desc; } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。