赞
踩
各位面试官好,我是xxx,我来面试咱们公司的大数据开发岗位,上一家公司是北京转转精神科技有限责任公司,在其数据部门担任大数据开发工程师,主要负责技术调研、需求沟通、集群搭建、平台集群监控、代码开发以及平台的相关调优等。
接下来我给您讲下我最近所做的项目,精准广告推送系统。
我们这个系统是我全程参与设计、开发的系统,相对于通常的数据统计类项目来说,更具备业务价值,他是直接服务于市场运营的。
主要功能是运营人员设置营销规则,根据营销规则,实时监测用户的行为数据,若一段时间内用户的行为符合规则,则为该用户推送短信或app弹窗,实现精准营销。
技术方面用的不多,主要是flink,drools,redis,clickhouse等,但是系统内部的落地实现和很多细节方面的设计点,是比较多的;
目前,系统内同时在线运行的运营规则大约有60多个;
规则计算的平均响应时间在1~5ms左右;
这是一个整体上的情况,项目的具体内容比较多,您看看有哪些方面想详细了解?我可以为您详细说明。
数据处理流程:
运营人员通过规则管理平台定义规则,规则管理平台会将规则数据保存到mysql中,flinkcdc动态监控mysql的变更数据,加载到广播状态中,形成规则流。
Flink从Kafka中消费用户行为日志数据,形成事实流。 事实流与规则流connect,根据规则的各类分组要求,对事实流进行复制和标记分组key,形成包装事实流。 将包装事实流根据标记key动态分组。 将分组后的数据与规则流connect,进入自定义processFunction中。 在processFunction中获取所有规则,并封装到bean,遍历符合分组字段的规则,对用户数据进行规则计算。 规则中有涉及到匹配用户历史行为数据的,将查出来的用户历史行为数据缓存到Redis中,以便多次使用。 将匹配成功的用户和规则封装到bean中,写进Kafka,供上层应用使用。
flink-cdc这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
规则的定义:
分组id、触发的当前行为规则、用户的画像规则、一定时间范围内的行为次数条件或行为序列条件、基于时间定时器的触发规则。
其中,触发的当前行为规则是根据当前用户数据判断,用户的画像规则去hbase中查该用户的用户画像来匹配,行为次数条件或行为序列条件,是查clickHouse中的用户历史行为数据,基于时间定时器的触发规则,是匹配用户的未来行为数据。
用户行为数据导入
用户的行为数据,使用Flume的Source读取日志服务器,此Source会监听指定文件,近实时读取文件新增的行,并会周期性记录偏移量,然后直接通过KafkaChannel写进Kafka。
在clickHouse中创建Kafka引擎表,从Kafka中读取用户日志行为数据
创建用户行为日志明细表,再创建物化视图,桥接Kafka引擎表和事件明细表,实现用户行为日志的ckickHouse入库。
同一个规则,可能被同一个用户多次触发计算,从而规则中的各个条件都要被重复计算(反复查询clickhouse等)
不同的规则,可能有“相似”的条件,从而也会引起这些“相似”条件的重复计算(反复查询clickhouse等)
而这些“重复计算”,如果能被避免,则能极大降低clickhouse的查询压力,从而极大提升系统整体的响应能力!
正则化查询:由于clickHouse并发量不高,所以我们将查出来的用户历史行为数据,只抽取事件id,并将其映射为一个个字母,在java端使用正则匹配
映射原理:将规则中定义好的,所关心的事件序列抽到list中,循环从clickHouse中查到的数据,使用list.indexof判断是否包含,包含,就将其返回值加65转换为字母,加到用户行为映射字符串中。不包含,则映射为0。
并将从clickHouse中查到的用户行为映射字符串缓存到redis中,设置12小时的ttl。
**缓存key的设计:**缓存我们用的hash类型,大key保存规则中的key拼接上cacheId,小key保存起始时间拼接上结束时间,value为用户行为映射字符串。
缓存边界的设计:
我们将用户近两小时的数据保存到状态中,其余从缓存和clickHouse中查。
此时有三种情况:规则条件中的结束时间,小于最近两小时,则直接从clickHouse中查询数据,缓存到redis中,之后redis缓存能复用
规则条件中的起始时间,大于最近两小时时间,直接从状态中查询。
实际生产上,用的最多的是起始时间小于最近两小时,结束时间大于最近两小时,即一部分数据从clickHouse中查,一部分数据从状态中查。
这样的话,从clickHouse查数据的结束时间一直在变,缓存的数据无法复用。为了解决这个问题,我们设计了缓存边界,即将当前数据的时间向上取整,再减去2小时,生成一个缓存边界,超过缓存边界的数据到状态中查,没超过缓存边界的数据到缓存中和click House中查,这样能保证在一个小时内,缓存能复用。
并且在之后的查询中,我们也可以只查缓存中没有的数据,再拼接到缓存中。
定时器匹配未来的数据:
注册定时器,并将定时器的触发时间与匹配条件保存到mapState中,定时器出发后,根据触发时间去mapState中拿到规则条件,去状态或redis或clickHouse中查。
使用Flink广播状态关联维表数据,使用异步IO访问外部API获取用户地区信息。
**实时统计每日新老用户数量:**按照设备id分组,使用 OperatorState ,在每个分区存一个布隆过滤器,当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在,通过该特性判断用户是否是新用户。缺点是存在误判。
按照设备id keyBy,新进来的设备id,就在ValueState中存一个任意标识,通过判断该标识是否存在,即可判断出该设备id是否是新用户。此时不会再有误判,但也存在一个问题,那就是随着数据越来越多,checkPoint所要消耗的时间也越来越多。这时就需要使用 RocksDB 实现增量checkPoint。
实时统计热门商品
先将数据进行keyBy(分类ID,事件ID,商品ID),划分窗口,在窗口内进行增量聚合(效率高,全局聚合效率低,而且占用大量资源)。只能获得到:(分类ID,事件ID,商品ID,次数)。但是没法获取窗口的信息(窗口的起始时间,结束时间),再定义一个WindowFunction,窗口触发后可以在WindowFunction获取到窗口聚合后的数据,并且可以得到窗口的起始时间和结束时间(分类ID,事件ID,商品ID,次数,窗口起始时间,结束时间)。
接下来要KeyBy(分类ID,事件ID,窗口起始时间,结束时间)后进行排序:使用ProcessFunction的onTimer定时器进行排序,每来一条数据,不直接输出,而是将数据存储到State(为了容错),再注册一个比当前窗口的结束时间还要大一毫秒的定时器。窗口是一个一个执行的,如果下一个窗口的数据进来了,说明上一个窗口的数据已经攒齐了,那么WaterMark也已经大于了我注册的定时器的时间,就可以排序了。
实时统计直播带货时的有效在线人数
每有一个用户进来,就根据数据中的 EventTime 开一个 60000+1ms 的定时器,并将 EventTime 与 sessionId 放到 mapState中
每有一个用户离开,就将 mapState 中对应的 sessionId 移除(前提是它存在,有可能它在定时器中已经被移除)
定时器触发后,循环mapState中的数据,用触发定时器的时间减去 mapState 中的每个 sessionId 的EventTime ,若大于60s,则说明该 sessionId 超过了1分钟,累计人数加1,并移除该sessionId。
使用Flink的窗口join,将数据转换为大宽表,写入ClickHouse中,实现多维度分析订单数据
解决Left Join 左流丢失数据的问题:
在Cogroup之前将左流的数据先划分一个和CoGroup的window长度和类型一样的窗口,由于窗口的长度和类型一样,进入到CoGroup的窗口迟到,之前的窗口也肯定迟到。可以将迟到的数据在之前的窗口中打上Tag,通过侧流数据将迟到的数据输出。再将数据跟join后的数据Union,如果右流为空,就查询数据库关联右流的信息。
测试Flink集群数20台左右:
物理机:64g内存、2cup 24核
2U的服务器,8T * 8 = 64T
Kafka3台(磁盘是ssd,16T,通常数据存储2个副本),
10台 HDFS,YARN都安装,
6台机器安装MySQL集群和Redis集群
中小公司生产环境:整个集群规律50台,15~20Kafka, 其他30台HDFS、YARN
工作没有学习发展的空间,自己想在面试工作的相关产业中多加学习,或是前一份工作与自己的生涯规划不合等等,回答的答案最好是积极正面的。或者说,我希望能获得一份更好的工作,如果机会来临,我会抓住。
mysql索引
redis分布式锁
线程的几种锁
为什么会产生线程安全问题
flink背压:
Flink不需要一个特殊的机制来处理反压, 因为Flink中的数据传输机制相当于已经提供了反压机制。Flink程序的最大吞吐量由程序中运行最慢的那个Task所决定。
可以在Web界面,从Sink到Source这样反向逐个Task排查,找到第一个出现反压的Task。
产生数据流的速度如果过快,而下游的算子消费不过来的话,会产生背压。
背压的监控可以使用 Flink Web UI(localhost:8081) 来可视化监控,一旦报警就能知道。一般情况下背压问题的产生可能是由于 sink 这个 操作符没有优化好,做一下 优化就可以了。
设置 watermark 的最大延迟时间这个参数, 如果设置的过大,可能会造成 内存的压力 。可以设置最大延迟时间小一些 ,然后把迟到元素发送到侧输出流中去。 晚一点更新结果。
项目在设计和部署上线时,已经对这个情况进行了预防,分配的服务器资源和任务并行度,都是日常流量的5倍来分配的; 2.如果真的出现了流量超最高预估,那咱们还有办法: 把一部分不那么重要的规则在控制平台上进行操作下线;如果还不够,还可以另外起新的job,把运营规则在老job和新job间进行均衡分配;
Rowkey设计三大原则
1、唯一性原则
rowkey在设计上保证其唯一性。rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块。
2、长度原则
rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以byte[] 形式保存,一般设计成定长。建议越短越好,不要超过16个字节,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。
目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
3、散列原则
建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
改善方法:加盐、哈希、反转、时间戳反转
Kafka是如何实现高吞吐率的
顺序读写、零拷贝、文件分段、批量发送、数据压缩
日志采集:由于日志服务器集群与大数据集群在网络中不允许直联,所以我们采用2级传输,并在2级汇聚点上,采用了高可用方案。
日志服务器上,配置flume,使用TAILDIR Source(近实时读取文件新增的行,并会周期性纪录偏移量),file Channel(数据写到磁盘,更加安全可靠),avro sink,数据通过网络传输,配置sink组,sink组的切换策略为failover
在中转机器上配置flume,avro source ,file channel,hdfs sink
数据延迟问题:由于在将数据写到hdfs时,用了时间戳动态分配符来设置文件名,而设置的时间戳以本机时间为准,所以出现了数据延迟到达的问题(后续流程处理中,有同事发现,2021-06-06文件夹中的日志,存在2021-06-05号的数据)。为解决这个问题,在上游定义了一个过滤器,将每个数据的时间戳提取到event的header中,则sink会根据数据中的时间戳将数据放到指定的文件夹下。
数据积压问题:
月活:1000多万 日活:400万
平均每天每个用户访问1.2次
每个用户平均每次访问时长20分钟
按经验,每个用户平均每 5~10 秒产生一条事件
则每次访问,将产生20分钟*60秒/5 = 200条事件日志
则,每天产生的日志总条数: 400万1.2200条 = 28800 *万=9.6亿条日志
每条日志大小平均为0.5k,则每日增量日志大小为:
9.6亿*0.5k = 28800*5M= 480G 即500G
每月累积增量为:144G*30 = 15T
假如要存储1年的数据量,则1年的累计存储量为:180T
考虑,增长趋势: 预估每月增长20%
则1年的累计存储量为:接近200T
流量:10M/s 峰值:100M/s
6台日志服务器,4台中转机器
服务器配置:12core 24线程,内存64G,硬盘 1T * 10块
HADOOP+HIVE集群节点数量:30台
KAFKA集群:3台
Flume采集节点:3台
行为日志,每天增量200G
Ods层,存储周期半年,行为日志总量:200G*6*30天 = 36T + dwd层+dws层+ads层 =约 50T
考虑HDFS的副本数量:3
总数据量约: 150T
从存储角度考虑: 每台机器10T ,要15台才能装下,肯定还需要富余空间 40% ,应该要 25台(datanode)
namenode -》 2台
Zookeeper -》3,如果预算紧,则共用
公共kafka -》 3-5台
在中转机器中,配置了3个channel,channel selector 选择multiplexing(多路复用),并添加拦截器,为event的header增加选择channel的属性,实现并行读。每个channel配置一个sink,并行往hdfs中写数据。
问题:200G的量,要多少个executor? 每个executor多少内存?多少核?
通常,一个executor 配2core,4-8G
业务数据入库:使用sqoop定时抽取业务数据库表数据。对于数据不断变化、不断新增的大表,根据修改日期这一字段,每天增量导入到ods层
对于数据基本不变的维度表,直接全量导入到dwd层。
对于增量导入的大表,在dwd层创建一张拉链表,根据每天的增量数据更新拉链表。
用昨日的拉链数据left join今日增量数据,对于join上的,且结束时间为9999-12-31的昨日拉链数据,将其结束日期修改为昨日。对于今日增量数据,均为其生成一条起始日期为今天,结束日期为9999-12-31的数据。
ODS开发:数据存半年
使用JsonSerde,将数据按json格式进行映射,开发脚本,每天定时入库,成功或失败用mailx发送邮件通知
DWD开发:将贴源层数据经过清洗过滤、数据解析、session分割、数据规范化处理、地理位置解析、生成全局唯一标识
数据解析:使用jsonSerde解析数据,jsonSerde会将解析失败的数据自动过滤掉
会话切割:公司app有会话保持技术,后台唤醒后依然保持原会话id,不利于后续统计分析,使用spark编写程序,将同一用户同一会话中超过30分钟的相邻日志,切割为两个会话
维度退化:将经纬度退化为省市区。我们有一个库用来保存参考坐标点,将这个表转换为含有GEOHASH码的表,精度设为6,这样,每个经纬度点可以覆盖方圆1公里的范围。对于没匹配到的用户,使用ip地址映射,在driver端将ip地址库字典文件读到字节数组中,并用广播变量将他们广播到每个executor中,即可在每个task中根据ip地址库字典文件解析IP地址。
并且将这些没匹配到的用户收集起来,每攒一批去查询高德地图API,丰富我们的参考坐标点库。
生成全局用户标识:之前公司一致用数据中的account当作用户标识,但有一个问题,有些用户是匿名访问,还有的用户干脆就没注册过,对于这类用户的数据,由于没有account,在处理时就丢失了。
领导想让我们设计出一种方案,可以推测出匿名用户的account,从而能够处理匿名用户的数据。
最开始我们想用设备id来当用户标识,但这样又有一个问题,同一账号在不同设备上登录,就会被认成两个人。
最终,我们设计了一张设备账号绑定评分表,我们将某个设备上出现的所有账号都记录下来,并为其增加一个权重,同一设备id上,登录次数越多的账号,权重越高。
为了防止用户换账号后,原先账号权重过高,导致现在所用账号很长一段时间内权重超不了旧帐号,我们还应该为绑定表增加权重衰减策略,即同一设备id上,某天没登录过的账户,它的权重变为原来的0.5倍。
当某个设备绑定的多个账号的权重一样时,我们以该设备最后一次登录的账号为准
同时,我们还设计了一张空设备账号映射表,对于从没注册过的匿名行为日志,我们应该为该设备id生成一个很大(远超业务库中注册用户的最大id)的新的guid,并将其记录下来。记录到一张空设备guid映射表中
DWS层开发:
流量分析主题:访问次数、访问人数、访问时长、访问跳出页
用户活跃主题:拉链表的设计,之前分析用户活跃,使用每天的日活表和昨天的日活表join,太过麻烦。设计出一个保存用户连续活跃区间的拉链表, 拉链表可以记录用户一点时间的所有活跃状态,可以应对多个复杂查询。把与条件有重叠的区间过滤出来计算。
用户留存主题:基于之前的用户活跃拉链表计算用户留存,在设计拉链表时有设计新用户的第一次访问日期字段。将今天的时间减去第一次访问日期,即可得出是几日留存。
用户留存表用竖表保存,因为字段数不能确定,且用横表保存的话会有很多浪费的空闲空间
事件归因分析:做完活动后,或者投放广告后,分析师想要评估活动或广告的效应。这里我用spark程序开发的,因为用户的一连串事件在sql中不太好处理。
过滤出用户行为数据中我们所关心的事件(目标事件、待归因事件),对这些事件按目标事件分段(一个用户可能完成多次目标事件),为每个分段中的带归因事件打分(首次触点归因、末次触点归因、线性归因、位置归因、时间衰减归因),保存到数据库中。
由于每个目标事件的待归因事件可能有多个,所以不能做成横表,而应该做成竖表。
漏斗分析主题:通常会在做活动时有这个需求,分析师想要得到活动、广告的变现率,对于这种分析主题,我们通常先对用户行为数据过滤,将我们所关心的数据过滤出来,然后将每个用户所作的事实集收集成一条,使用正则表达式匹配,得出该用户完成的最大步骤数。在其之上做各种统计报表。
ads层:按需求,根据DWS层开发的主题宽表,统计出各类报表
对于一些固定报表,公司有个web可视化页面,我们将ads层统计出来的报表迁移到clickHouse,供前端页面查询。
对于一些非固定报表,如临时需求,则现写sql,在hive上运行,将结果以csv形式发给对方
对于一些固定模型分析需求,公司有个在线olap分析平台,分析师可以自己在web平台定制表单,根据sql模板生成sql,运行在presto引擎上
用户画像:
统计类标签:活跃度统计标签、消费订单统计标签、交互行为统计标签、营销行为标签
规则类标签:用户粘性级别、用户活跃级别(RFE)、用户生命周期、用户价值等级(RFM)
计算框架:sql聚合统计 、sql判断阈值
更新策略:统计标签日更,规则标签周更、算法模型类标签月更
对于用户基本属性标签,每天都增量更新,将昨天的全量标签数据与今天的增量数据full join后更新
对于一些历史累计标签,如历史以来订单总额、历史以来订单总数,我们是先做日聚合,然后于昨日的标签数据full join 后累加
对于一些指定时间段累计标签,如最近3日访问次数、最近15日活跃次数,我们每天做日聚合,重新累加
对于一些复杂统计类标签,如最大活跃天数、最大沉默天数,我们每天都重新计算,全量替换
按相同计算单元、相同更新频率来划分计算过程中的中间表(横表为主)
存储策略:
针对不同的查询需求,提供不同的存储策略。对于实时动态规则引擎系统,由于要按guid随机高频单条查,使用bulkloader将数据导入hbase
对于精准营销平台我们将数据存到clickHouse中,提供查询。
数仓:我先简单介绍一下我们公司的离线系统的架构,后面讲一下我自己主要负责那部分内容。
我们公司数仓系统大概分为这三层:一个是数据采集层、离线计算层、数据服务层
数据采集我有负责app行为日志的采集,我们所用的组件是flume,我们的配置是 taildirSource+filechannel+hdfsSink,同时我们配置了级联和高可用,保证数据的正常采集,业务数据我们使用sqoop将数据从mysql数据库导入到hdfs。再导入的时候我们会根据表的不同,采取不同的导入策略,对于经常变化的业务表,我们每天导入增量数据,将其保存为拉链表,对于不经常改变的纬度数据,直接全量导入。
数据采集完后,使用spark core,对数据进行预处理,预处理工作主要有脏数据、废弃字段处理、字段规范化、数据集成(将经纬度转换为省市区信息),并给数据映射一个全局唯一id。
第二层是数据计算层,我们用hive做了一个离线分析平台。整个数仓我们分了4层,ods、dwd、dws以及ads层。ods 是数据贴源层,在这层我们用来存放原始数据,直接加载原始日志、数据,对数据预处理后,再关联上纬度信息,形成一张大宽表,放在dwd层,同时根据全局明细表得到各个主题的明细表。我们会根据主题不同,按照主题进行划分,主题不同在全局明细表中提取不同的字段,同时对这个主题进行各个维度表的关联,得到主题明细表,也会放在dwd层中。dwd层相对于ods层来说也是没有进行聚合操作的,只是关联了一些纬度。dws层存的表就比较复杂了,我们在生成最终的结果之前会有一些中间结果表,有的中间结果我们不会保存,有的由于我们第二天的分析需要用到,或者这个表后面的开发也会用,就会保存为一张中间表。整个的逻辑一个是根据计算逻辑来分,比如说我负责的用户就会有一些历史表、历史登录区间这些。在一个就是根据维度,我们的是固定有一些分析维度,省市区、时间纬度,日活、月活,最后报表是个多维报表,中间结果就会保存在dws层,dws层就会有很多轻度聚合,而且他们之间可能还会有层级,但是这个就不怎么细分了,最后我们得到的最终报表会放在ads层。
数据服务层就是将ads层的报表提供给其他组,供其他组使用。常规的那些报表我们会提供给运营组那边,是用于公司日常运营状况的一个分析,其他的原始数据可能会提供给算法组,提供一个数据挖掘,我们做的用户画像,会提供给广告组,用于一个精准的广告推荐。
计算主要用hivesql,也有一些复杂的用spark core,最后表的更新我们会写成一个shell脚本,他们之间可能会有一个执行的先后关系,我们用azkban进行调度。
用atlas进行数据治理
flink 1.13
flink cdc 1.2.0
spark 2.4.7
hadoop 3.1.1
hive 2.1
scala 2.12.12
hbase 2.2.5
数据采集1个 etl 1个 报表2个 olap 1个 用户画像 2个
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。