I. Environment Setup
1. Download jdk-8u45-windows-x64.exe and install it to D:\Java8
2. Set JAVA_HOME to
D:\Java8\jdk1.8.0_45
3. Set HADOOP_HOME to
D:\hadoop272
4. Build three CentOS 6.5 virtual machines: Hadoop102, Hadoop103, Hadoop104
Edit the /etc/hosts file on each machine:
192.168.198.134 Hadoop102
192.168.198.135 Hadoop103
192.168.198.136 Hadoop104
/etc/sysconfig/network file:
NETWORKING=yes
HOSTNAME=Hadoop102
/etc/sysconfig/network-scripts/ifcfg-eth0 file:
DEVICE="eth0"
BOOTPROTO="static"
NM_CONTROLLED="yes"
ONBOOT="yes"
TYPE="Ethernet"
UUID="2dc126cb-ef2a-412e-a373-45fbe1829354"
IPADDR=192.168.198.134
GATEWAY=192.168.198.2
NETMASK=255.255.255.0
DNS1=192.168.198.2
DNS2=114.114.114.114
DNS3=8.8.8.8
5. Set up SSH login keys between the three hosts (passwordless login)
6. Upload jdk1.8.0_45 to /usr/java
7. Upload hadoop272 to /opt/modules/ap
8. Configure Hadoop's six configuration files (typically core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml, hadoop-env.sh, and slaves)
9. Upload hive, flume, kafka, and zookeeper to /opt/modules/ap and unpack them
10. Configure the environment variables in /etc/profile:
export JAVA_HOME=/usr/java/jdk1.8.0_45
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export HADOOP_HOME=/opt/modules/ap/hadoop272
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/lib:$HADOOP_HOME/lib/native
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export HDFS_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export YARN_CONF_DIR=/opt/modules/ap/hadoop272/etc/hadoop
export KAFKA_HOME=/opt/modules/ap/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin:$KAFKA_HOME/libs
export ZOOKEEPER_HOME=/opt/modules/ap/zookeeper-3.4.10
export PATH=$PATH:$ZOOKEEPER_HOME/bin
export HIVE_HOME=/opt/modules/ap/apache-hive-1.2.1-bin
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/lib
export FLUME_HOME=/opt/modules/ap/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
11. For installing and configuring ZooKeeper, Hive, Kafka, and Flume, see my related articles; only the changes and additions required by version differences are listed below.
12. Because this Hive version is newer, the MySQL metastore schema must be initialized manually with
schematool -dbType mysql -initSchema
Otherwise Hive fails with:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
13. Also, after configuring the metastore, start the metastore service in the background before using Hive:
hive --service metastore &
14. Install IDEA on Windows 10
Download IDEA 2019.1
15. Activate IntelliJ IDEA with a valid license.
16. Open the project D:\idea project\app_statistics_project
II. Project Overview
Measure the activity of mobile apps.
Key terms: developer (tenant), app, SDK, user, active user, silent user, loyal user, churned user, retained user, user freshness, launch count, single-session usage duration, daily usage duration
Functional modules:
1) Collect mobile app logs
2) Periodically analyze the business metrics offline
3) Present the results
Flow:
1) When the mobile app starts, it reports startup, error, page, event, and usage-duration logs to the log collection server.
2) The log collection server forwards the collected logs to Kafka.
3) Flume consumes the five Kafka topics and stores the data on HDFS.
4) A crontab job periodically copies the data from HDFS into the Hive data warehouse.
5) The core business analysis is done with Hive queries.
6) Query results are shown on a data visualization platform.
III. Project Structure
Project: app_statistics_project
Module 1: app_logs_common — entity classes and geo-information lookup
–AppBaseLog, the base class of all log messages:
creation time, app ID, tenant ID, device ID, app version, install channel, platform, OS version, device model
–AppErrorLog, error log:
error brief, error detail
–AppEventLog, event log:
event ID, duration, parameters (key/value)
–AppPageLog, page log:
page ID, visit index, next page, stay duration
–AppStartupLog, startup log:
country, province, IP address, network carrier, brand, screen resolution
–AppUsageLog, usage-duration log:
single-use duration, per-session upload traffic, per-session download traffic
–AppLogEntity, the aggregate of all log arrays:
private AppStartupLog[] appStartupLogs;
private AppPageLog[] appPageLogs;
private AppEventLog[] appEventLogs;
private AppUsageLog[] appUsageLogs;
private AppErrorLog[] appErrorLogs;
–GeoInfo, geo information:
country, province
–GeoUtil, geo-information utility:
load the country data
public static String getCountry(String ip)
load the province data
public static String getProvince(String ip)
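These lookups are typically backed by the GeoLite2-City.mmdb database file mentioned again in the packaging step. A minimal sketch of GeoUtil, assuming the MaxMind GeoIP2 Java client (com.maxmind.geoip2) and a classpath resource location, neither of which is spelled out in the original:
// Sketch only: loads GeoLite2-City.mmdb once and resolves country/province per IP
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import java.io.InputStream;
import java.net.InetAddress;

public class GeoUtil {
    private static DatabaseReader reader;

    static {
        try {
            // assumed location: the mmdb file is bundled as a classpath resource
            InputStream db = GeoUtil.class.getClassLoader().getResourceAsStream("GeoLite2-City.mmdb");
            reader = new DatabaseReader.Builder(db).build();
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getCountry(String ip) {
        try {
            CityResponse resp = reader.city(InetAddress.getByName(ip));
            return resp.getCountry().getName();
        } catch (Exception e) {
            return "unknown";
        }
    }

    public static String getProvince(String ip) {
        try {
            CityResponse resp = reader.city(InetAddress.getByName(ip));
            return resp.getMostSpecificSubdivision().getName();
        } catch (Exception e) {
            return "unknown";
        }
    }
}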
Module 2: app_logs_client — the mobile-client project, which simulates log generation
–GenerateData, the data generation class
log creation time, app ID, tenant ID, device ID, version, channel, platform, OS version, device model
Initial or random values for the log fields:
private static Long[] createdAtMsS = initCreatedAtMs();// log creation time
private static String appId = "sdk34734";// unique app ID
private static String[] tenantIds = {"cake"};// unique tenant ID (the business customer)
private static String[] deviceIds = initDeviceId();// unique device ID
private static String[] appVersions = {"3.2.1", "3.2.2"};// versions
private static String[] appChannels = {"youmeng1", "youmeng2"};// install channels, fixed in the manifest at install time (app store, etc.)
private static String[] appPlatforms = {"android", "ios"};// platforms
private static String[] osTypes = {"8.3", "7.1.1"};// OS versions
private static String[] deviceStyles = {"iPhone 6", "iPhone 6 Plus", "红米手机1s"};// device models
// initialize the device IDs
private static String[] initDeviceId()
// initialize the creation times
private static Long[] initCreatedAtMs()
// startup-log attribute values
private static String[] countrys = {"America", "china"};// country; not reported by the client, filled in by the server
private static String[] provinces = {"Washington", "jiangxi", "beijing"};// province; not reported by the client, filled in by the server
private static String[] networks = {"WiFi", "CellNetwork"};// networks
private static String[] carriers = {"中国移动", "中国电信", "EE"};// carriers
private static String[] brands = {"三星", "华为", "Apple", "魅族", "小米", "锤子"};// brands
private static String[] screenSizes = {"1136*640", "960*640", "480*320"};// screen resolutions
// event-log attribute values
private static String[] eventIds = {"popMenu", "autoImport", "BookStore"}; // unique event IDs
private static Long[] eventDurationSecsS = {new Long(25), new Long(67), new Long(45)};// event durations
static Map<String, String> map1 = new HashMap<String, String>() {
    {
        put("testparam1key", "testparam1value");
        put("testparam2key", "testparam2value");
    }
};
static Map<String, String> map2 = new HashMap<String, String>() {
    {
        put("testparam3key", "testparam3value");
        put("testparam4key", "testparam4value");
    }
};
private static Map[] paramKeyValueMapsS = {map1, map2};// parameter key/value maps
// usage-duration log attribute values
private static Long[] singleUseDurationSecsS = initSingleUseDurationSecs();// single-use duration (seconds): how long the app stays in the foreground during one launch
// initialize the single-use durations
private static Long[] initSingleUseDurationSecs()
// error-log attribute values
private static String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}; // error briefs
private static String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"}; // error details
// page-log attribute values
private static String[] pageIds = {"list.html", "main.html", "test.html"};// page IDs
private static int[] visitIndexs = {0, 1, 2, 3, 4};// visit order; 0 is the first page
private static String[] nextPages = {"list.html", "main.html", "test.html", null}; // next page; null means the user exited the app on this page
private static Long[] stayDurationSecsS = {new Long(45), new Long(2), new Long(78)};// stay duration on the current page
Generate the arrays of simulated data
// initialize the data for the five log types
// array of startup logs
private static AppStartupLog[] appStartupLogs = initAppStartupLogs();
// array of page-navigation logs
private static AppPageLog[] appPageLogs = initAppPageLogs();
// array of event logs
private static AppEventLog[] appEventLogs = initAppEventLogs();
// array of app-usage logs
private static AppUsageLog[] appUsageLogs = initAppUsageLogs();
// array of error logs
private static AppErrorLog[] appErrorLogs = initAppErrorLogs();
// initialize the attribute values shared by every log type
private static void initLogCommon(AppBaseLog baselog) {
    // log creation time
    baselog.setCreatedAtMs(System.currentTimeMillis());
    // app id
    baselog.setAppId(appId);
    // unique tenant ID (the business customer)
    String tenantId = tenantIds[random.nextInt(tenantIds.length)];
    if (tenantId != null) {
        baselog.setTenantId(tenantId);
    }
    // unique device ID
    baselog.setDeviceId(deviceIds[random.nextInt(deviceIds.length)]);
    // version
    baselog.setAppVersion(appVersions[random.nextInt(appVersions.length)]);
    // channel
    baselog.setAppChannel(appChannels[random.nextInt(appChannels.length)]);
    // platform
    baselog.setAppPlatform(appPlatforms[random.nextInt(appPlatforms.length)]);
    // OS version
    baselog.setOsType(osTypes[random.nextInt(osTypes.length)]);
    // device model
    baselog.setDeviceStyle(deviceStyles[random.nextInt(deviceStyles.length)]);
}
The init functions for each log type (a sketch of one of them follows below):
// array of startup logs
private static AppStartupLog[] initAppStartupLogs()
// array of page logs
private static AppPageLog[] initAppPageLogs()
// array of event logs
private static AppEventLog[] initAppEventLogs()
// array of app-usage logs
private static AppUsageLog[] initAppUsageLogs()
// array of error logs
private static AppErrorLog[] initAppErrorLogs()
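For reference, a hedged sketch of one of these init functions; the array size is illustrative and the setter names simply mirror the startup-log fields listed above, so the original may differ:
// Sketch: fill an array of AppStartupLog objects with random sample values
private static AppStartupLog[] initAppStartupLogs() {
    AppStartupLog[] logs = new AppStartupLog[100]; // illustrative sample size
    for (int i = 0; i < logs.length; i++) {
        AppStartupLog log = new AppStartupLog();
        log.setCountry(countrys[random.nextInt(countrys.length)]);
        log.setProvince(provinces[random.nextInt(provinces.length)]);
        log.setNetwork(networks[random.nextInt(networks.length)]);
        log.setCarrier(carriers[random.nextInt(carriers.length)]);
        log.setBrand(brands[random.nextInt(brands.length)]);
        log.setScreenSize(screenSizes[random.nextInt(screenSizes.length)]);
        logs[i] = log;
    }
    return logs;
}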
Send the data
// send data in a loop
public static void main(String[] args)
–UploadUtil, the log upload utility class
// upload a log batch
public static void upload(String json)
URL url = new URL("http://hadoop102:8080/app_logs/coll/index");
Module 3: app_logs_collect_web — the server-side Spring MVC web application that receives the uploaded data
–The log collection controller
@Controller
@RequestMapping("/coll")
public class CollectLogController
// receive the uploaded data
public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req)
// correct the timestamps
private void verifyTime(AppLogEntity e, HttpServletRequest req)
// fill in the client IP address and geo information
private void processIp(AppLogEntity e, HttpServletRequest req)
// cache of geo information, keyed by IP
private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();
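A hedged sketch of processIp: take the client IP from the request, resolve country/province through GeoUtil using the cache above, and fill the values into every startup log (the server fills these fields, the client never reports them); the getter/setter names are assumed from the field lists above:
private void processIp(AppLogEntity e, HttpServletRequest req) {
    String clientIp = req.getRemoteAddr();
    // resolve each IP only once
    GeoInfo info = cache.get(clientIp);
    if (info == null) {
        info = new GeoInfo();
        info.setCountry(GeoUtil.getCountry(clientIp));
        info.setProvince(GeoUtil.getProvince(clientIp));
        cache.put(clientIp, info);
    }
    for (AppStartupLog log : e.getAppStartupLogs()) {
        log.setIpAddress(clientIp);
        log.setCountry(info.getCountry());
        log.setProvince(info.getProvince());
    }
}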
Define the Kafka topics
public class Constants {
// topic names
public static final String TOPIC_APP_STARTUP = "topic_app_startup" ;
public static final String TOPIC_APP_ERRROR = "topic_app_error" ;
public static final String TOPIC_APP_EVENT = "topic_app_event" ;
public static final String TOPIC_APP_USAGE = "topic_app_usage" ;
public static final String TOPIC_APP_PAGE = "topic_app_page" ;
}
// send the messages to Kafka
private void sendMessage(AppLogEntity e)
// send all logs of one type to its Kafka topic
private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs)
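A hedged sketch of the two send methods; the broker list mirrors the producer.properties shown later, the getter names follow the AppLogEntity fields, and the JSON mapper (fastjson here) is an assumption:
// requires org.apache.kafka.clients.producer.KafkaProducer / ProducerRecord and a JSON library
private void sendMessage(AppLogEntity e) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "Hadoop102:9092,Hadoop103:9092,Hadoop104:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    // one topic per log type, as defined in Constants
    sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
    sendSingleLog(producer, Constants.TOPIC_APP_ERRROR, e.getAppErrorLogs());
    sendSingleLog(producer, Constants.TOPIC_APP_EVENT, e.getAppEventLogs());
    sendSingleLog(producer, Constants.TOPIC_APP_USAGE, e.getAppUsageLogs());
    sendSingleLog(producer, Constants.TOPIC_APP_PAGE, e.getAppPageLogs());
    producer.close();
}

private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs) {
    for (AppBaseLog log : logs) {
        String json = JSON.toJSONString(log); // fastjson; any JSON mapper would do
        producer.send(new ProducerRecord<String, String>(topic, json));
    }
}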
Module 4: app_logs_flume — a Flume interceptor that distinguishes the log types coming from Kafka
Class: public class LogCollInterceptor implements Interceptor
// classify each Flume event into one of the five log types
public Event intercept(Event event)
Flume configuration:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hxzy.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
# avoid creating lots of small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# write the output as plain (uncompressed) data files
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
Module 5: Hive data processing
Partitioned tables:
tab_name
ext_error_logs
ext_event_logs
ext_page_logs
ext_startup_logs
ext_usage_logs
Class: DateUtil
Java date helper functions (a sketch of the day variants follows after this list)
// return the start of the day containing the given time
public static Date getDayBeginTime(Date d)
public static Date getDayBeginTime(Date d, int offset)
// return the start of the week containing the given time
public static Date getWeekBeginTime(Date d)
public static Date getWeekBeginTime(Date d, int offset)
// return the start of the month containing the given time
public static Date getMonthBeginTime(Date d)
public static Date getMonthBeginTime(Date d, int offset)
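A minimal sketch of the day variants using java.util.Calendar (the week and month variants differ only in which fields are reset); the original implementation may differ in detail:
// truncate a Date to 00:00:00.000 of its day
public static Date getDayBeginTime(Date d) {
    Calendar c = Calendar.getInstance();
    c.setTime(d);
    c.set(Calendar.HOUR_OF_DAY, 0);
    c.set(Calendar.MINUTE, 0);
    c.set(Calendar.SECOND, 0);
    c.set(Calendar.MILLISECOND, 0);
    return c.getTime();
}

// same, shifted by `offset` days (offset may be negative)
public static Date getDayBeginTime(Date d, int offset) {
    Calendar c = Calendar.getInstance();
    c.setTime(getDayBeginTime(d));
    c.add(Calendar.DAY_OF_MONTH, offset);
    return c.getTime();
}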
UDF functions
Class:
class DayBeginUDF extends UDF
// start of today, in milliseconds
public long evaluate() throws ParseException {
return evaluate(new Date());
}
// with a day offset
public long evaluate(int offset) throws ParseException {
return evaluate(DateUtil.getDayBeginTime(new Date(), offset));
}
// start of a given day, Date argument (milliseconds)
public long evaluate(Date d) throws ParseException {
return DateUtil.getDayBeginTime(d).getTime();
}
// start of a given day, Date argument with offset (milliseconds)
public long evaluate(Date d, int offset) throws ParseException {
return DateUtil.getDayBeginTime(d, offset).getTime();
}
// start of a given day, String argument (milliseconds)
public long evaluate(String dateStr) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return evaluate(d);
}
// start of a given day, String argument with offset (milliseconds)
public long evaluate(String dateStr, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d, offset).getTime();
}
// start of a given day, String argument with an explicit format (milliseconds)
public long evaluate(String dateStr, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d).getTime();
}
// start of a given day, String argument with an explicit format and offset (milliseconds)
public long evaluate(String dateStr, String fmt, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d, offset).getTime();
}
}
Class: public class WeekBeginUDF extends UDF
Class: public class MonthBeginUDF extends UDF
Class: public class FormatTimeUDF extends UDF
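The week and month UDFs follow the same pattern as DayBeginUDF above. For reference, a hedged sketch of the two-argument form of FormatTimeUDF, which the queries below call as formattime(createdatms, 'yyyy/MM/dd'); the original also appears to accept a third offset argument:
// requires org.apache.hadoop.hive.ql.exec.UDF, java.text.SimpleDateFormat, java.util.Date
public class FormatTimeUDF extends UDF {
    // format a millisecond timestamp with the given pattern
    public String evaluate(long ms, String fmt) {
        SimpleDateFormat sdf = new SimpleDateFormat(fmt);
        return sdf.format(new Date(ms));
    }
}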
SQL:
New-user statistics:
Daily new users
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;
Monthly new users
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getmonthbegin() and mintime < getmonthbegin(1)
)t ;
Daily active users
select
count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
Daily active users for each day of the current week
select
formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getweekbegin() and createdatms < getweekbegin(1)
group by formattime(createdatms,'yyyy/MM/dd');
Users active for three consecutive weeks
select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where appid = 'sdk34734'
and concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd')
group by deviceid
Silent users
select
count(*)
from
(select deviceid , count(createdatms) dcount,min(createdatms) dmin
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having dcount = 1 and dmin < getdaybegin(-1)
)t;
Module 6: web visualization
Configure the Spring MVC beans.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd">
    <!-- scan the service package -->
    <context:component-scan base-package="com.hxzy.applogs.visualize.service" />
    <!-- Hive data source -->
    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
        <property name="driverClass" value="org.apache.hive.jdbc.HiveDriver" />
        <property name="jdbcUrl" value="jdbc:hive2://192.168.1.103:10000/applogsdb" />
        <property name="user" value="hxzy" />
        <property name="password" value="" />
    </bean>
    <bean id="sqlSessionFactoryBean" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="configLocation" value="classpath:mybatis-config.xml" />
    </bean>
    <bean id="statMapper" class="org.mybatis.spring.mapper.MapperFactoryBean">
        <property name="mapperInterface" value="com.hxzy.applogs.visualize.dao.StatMapper"></property>
        <property name="sqlSessionFactory" ref="sqlSessionFactoryBean"></property>
    </bean>
</beans>
Configure MyBatis
mybatis-config.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<mappers>
<mapper resource="StatMapper.xml" />
</mappers>
</configuration>
StatMapper.xml
<mapper namespace="com.hxzy.applogs.visualize.dao.StatMapper">
    <!-- total new users -->
    <select id="findNewUsers" resultMap="rm_StatBean">
        select count(*) stcount from ext_startup_logs
    </select>
    <resultMap id="rm_StatBean" type="com.hxzy.applogs.visualize.domain.StatBean">
        <result column="stcount" property="count" />
    </resultMap>
    <select id="findThisWeekNewUsers" resultMap="rm_weekUser">
        select formattime(t.mintime,'yyyy/MM/dd') stdate , count(*) stcount
        from (
            select deviceid, min(createdatms) mintime
            from ext_startup_logs
            where appid = #{appid}
            group by deviceid
            having mintime >= getweekbegin() and mintime &lt; getweekbegin(1)
        ) t
        group by formattime(t.mintime,'yyyy/MM/dd')
    </select>
    <resultMap id="rm_weekUser" type="com.hxzy.applogs.visualize.domain.StatBean">
        <result column="stcount" property="count" />
        <result column="stdate" property="date" />
    </resultMap>
</mapper>
Class: StatController
// new users per day of the current week
public Map<String, Object> stat3()
Class: the statistics service
@Service("statService")
public class StatServiceImpl implements StatService
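A hedged sketch of the controller/service pair: stat3 wraps the findThisWeekNewUsers query from StatMapper.xml into a map the chart front end can consume; the return shape, bean getters, and hard-coded appid are illustrative assumptions:
@Controller
public class StatController {
    @Resource(name = "statService")
    private StatService statService;

    // new users per day of the current week, returned as { dates: [...], counts: [...] }
    @RequestMapping("/stat3")
    @ResponseBody
    public Map<String, Object> stat3() {
        List<StatBean> beans = statService.findThisWeekNewUsers("sdk34734");
        List<String> dates = new ArrayList<String>();
        List<Long> counts = new ArrayList<Long>();
        for (StatBean b : beans) {
            dates.add(b.getDate());
            counts.add(b.getCount());
        }
        Map<String, Object> result = new HashMap<String, Object>();
        result.put("dates", dates);
        result.put("counts", counts);
        return result;
    }
}

@Service("statService")
public class StatServiceImpl implements StatService {
    @Resource(name = "statMapper")
    private StatMapper statMapper;

    public List<StatBean> findThisWeekNewUsers(String appid) {
        return statMapper.findThisWeekNewUsers(appid);
    }
}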
IV. Debugging the client, collect_web, and Flume interceptor projects
1. app_logs_collect_web
Issue 1:
Package xxxx not found
Fix: the app_logs_common module was not configured correctly in pom.xml (the module name was misspelled)
Issue 2:
Error:(31, 46) java: cannot find symbol
symbol: class AppLogEntity
location: class com.atguigu.applogs.collect.web.controller.CollectLogController
Fix: add the missing class as described in the documentation
2. app_logs_client --> GenerateData
Issue 1:
Error:java: Compilation failed: internal java compiler error
Fix: set the language level of both the project and the modules to 7 (Diamonds)
Issue 2: the method setSingleUseDurationSecs cannot be found
Fix: add the missing method as described in the documentation
Issue 3:
java.net.ConnectException: Connection timed out: connect
The network server could not be reached
Fix: point the upload URL at the local server instead:
URL url = new URL("http://localhost:8080/coll/index");// 测试地址
Issue 4:
The response code received is always 404
Debugging approach:
Add console output of the received parameters in the collect action of CollectLogController:
System.out.println("logEntity.setAppStartupLogs");
Add console output in GenerateData's main method to monitor what is being sent:
System.out.println("logEntity.setAppStartupLogs");
AppStartupLog[] a = new AppStartupLog[]{appStartupLogs[random.nextInt(appStartupLogs.length)]};
logEntity.setAppStartupLogs(a);
System.out.println(a[0].getCountry());
The JSON being sent:
{"appErrorLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","createdAtMs":1597834893609,"deviceId":"device2265","deviceStyle":"红米手机1s","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)","errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","osType":"8.3","tenantId":"cake"}],"appEventLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device2222","deviceStyle":"红米手机1s","eventDurationSecs":45,"eventId":"autoImport","osType":"8.3","paramKeyValueMap":{"testparam3key":"testparam3value","testparam4key":"testparam4value"},"tenantId":"cake"}],"appPageLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.1","createdAtMs":1597834893608,"deviceId":"device2248","deviceStyle":"红米手机1s","nextPage":"list.html","osType":"8.3","pageId":"main.html","stayDurationSecs":45,"tenantId":"cake","visitIndex":2}],"appStartupLogs":[{"appChannel":"youmeng2","appId":"sdk34734","appPlatform":"android","appVersion":"3.2.1","brand":"小米","carrier":"EE","country":"America","createdAtMs":1597834893608,"deviceId":"device2274","deviceStyle":"iPhone 6 Plus","network":"WiFi","osType":"7.1.1","province":"Washington","screenSize":"480*320","tenantId":"cake"}],"appUsageLogs":[{"appChannel":"youmeng1","appId":"sdk34734","appPlatform":"ios","appVersion":"3.2.2","createdAtMs":1597834893609,"deviceId":"device224","deviceStyle":"iPhone 6 Plus","osType":"7.1.1","singleUseDurationSecs":93,"tenantId":"cake"}]}
System.out.println(json);
Finding: the data is generated correctly, but the controller action never receives it.
Fix: change the URL to:
http://localhost:8080/app_logs_collect_web_war_exploded/coll/index
The console output then shows that an exception is thrown when the data is sent to Kafka; since the Kafka servers on the virtual machines are not running yet, this is expected, and the Kafka send step can be temporarily commented out:
//sendMessage(e);
Debugging of the app_logs_client project is complete.
3. Configure server.properties on the Kafka cluster
delete.topic.enable=true
broker.id=0
log.dirs=/opt/modules/ap/kafka_2.11-0.11.0.0/logs
zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181
num.recovery.threads.per.data.dir=1
num.partitions=1
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
num.io.threads=8
num.network.threads=3
Start the Kafka server on each of the three machines:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /opt/modules/ap/kafka_2.11-0.11.0.0/config/server.properties &
Create the topics:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_startup
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_error
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_event
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_usage
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper Hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_page
Start a console consumer on the topic_app_startup topic:
/opt/modules/ap/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper Hadoop102:2181 --topic topic_app_startup
Issue 1: starting a new consumer throws an exception
Fix: consumer.properties was not configured; add:
zookeeper.connect=Hadoop102:2181,Hadoop103:2181,Hadoop104:2181
Issue 2: the controller fails to send messages
Fix: producer.properties was not configured; add:
bootstrap.servers=Hadoop102:9092,Hadoop103:9092,Hadoop104:9092
Issue 3: stopping Kafka with the kafka-server-stop.sh script fails
Fix: edit the script so it matches this Kafka build: change "kafka.kafka" to "kafka_2.11-0.11.0.0"
After fixing these three issues, the consumer receives the startup-log messages normally.
4. Package the debugged app_logs_collect_web as a WAR and deploy it to Tomcat on hadoop102
Check that the pom file sets the packaging type to war
—Run clean on the project before packaging
Issue 1: app_logs_common.jar cannot be found
Fix: package the app_logs_common jar first and run the install goal
Issue 2: could not copy GeoLite2-City.mmdb ... the requested operation cannot be performed on a file with a user-mapped section open
Fix: stop the running application before packaging; the build then succeeds
Copy the WAR to /usr/tomcat/webapps on Hadoop102
Start Tomcat from /usr/tomcat/bin/
Change the request URL in UploadUtil in the app_logs_client project to
http://hadoop102:8080/app_logs_collect_web/coll/index
5. Flume consumes the five Kafka topics and writes them to HDFS, using an interceptor to separate the storage paths
Note: instead of an interceptor you could run five Flume agents, one per topic, but to keep maintenance cost down a single interceptor class, LogCollInterceptor, adds a logType key to each event header. Its main intercept method:
public class LogCollInterceptor implements Interceptor {
    ...
    /**
     * Modifies events in-place.
     */
    public Event intercept(Event event) {
        // 1. get the headers of the incoming event
        Map<String, String> headers = event.getHeaders();
        // 2. get the JSON body as a byte array and convert it to a string
        byte[] json = event.getBody();
        String jsonStr = new String(json);
        // decide the log type from a field that only appears in that log class
        String logType = "";
        if (jsonStr.contains("pageId")) {                        // pageLog
            logType = "page";
        } else if (jsonStr.contains("eventId")) {                // eventLog
            logType = "event";
        } else if (jsonStr.contains("singleUseDurationSecs")) {  // usageLog
            logType = "usage";
        } else if (jsonStr.contains("errorBrief")) {             // errorLog
            logType = "error";
        } else if (jsonStr.contains("network")) {                // startupLog
            logType = "startup";
        }
        // 3. store the log type in the Flume header
        headers.put("logType", logType);
        // simple debugging: print the JSON and the detected logType
        System.out.println("json:" + jsonStr);
        System.out.println("logType:" + logType);
        return event;
    }
}
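The Flume configuration references the class as LogCollInterceptor$Builder, so the interceptor also needs the standard Interceptor.Builder inner class (besides the initialize/close/list-intercept methods omitted above); a minimal sketch:
// inner builder required by Flume (org.apache.flume.interceptor.Interceptor.Builder)
public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
        return new LogCollInterceptor();
    }

    @Override
    public void configure(Context context) {
        // this interceptor needs no configuration
    }
}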
Package the app_logs_flume project as app_logs_flume_1.0.jar and copy it to
/opt/modules/ap/apache-flume-1.7.0-bin/lib so the interceptor class is on Flume's classpath
Create flume-applog-conf.properties with the Kafka source, the interceptor, and the HDFS sink:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/usr/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
# avoid creating lots of small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# write the output as plain (uncompressed) data files
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
Start the Flume agent:
/opt/modules/ap/apache-flume-1.7.0-bin/bin/flume-ng agent -f /opt/modules/ap/apache-flume-1.7.0-bin/conf/flume-applog-conf.properties -n a1
V. Debugging the Hive and visualization projects
1. Adjust the Hive configuration: add JSON support and turn off output compression
(1) Copy json-serde-1.3.8-jar-with-dependencies.jar into Hive's lib directory
(2) Add the following to /opt/module/hive/conf/hive-site.xml:
<property>
<name>hive.aux.jars.path</name>
<value>/opt/modules/ap/apache-hive-1.2.1-bin/lib/json-serde-1.3.8-jar-with-dependencies.jar
</value>
</property>
<property>
<name>hive.exec.compress.output</name>
<value>false</value>
</property>
2. Create the applogsdb database
drop database applogs_db;
create database applogsdb;
use applogsdb;
3. Create the external partitioned tables
--startup
CREATE external TABLE ext_startup_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,country string,province string,ipAddress string,network string,carrier string,brand string,screenSize string)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
--error
CREATE external TABLE ext_error_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,errorBrief string,errorDetail string)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
--event
CREATE external TABLE ext_event_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,eventId string,eventDurationSecs bigint,paramKeyValueMap Map<string,string>)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
--page
CREATE external TABLE ext_page_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,pageViewCntInSession int,pageId string,visitIndex int,nextPage string,stayDurationSecs bigint)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
--usage
CREATE external TABLE ext_usage_logs(createdAtMs bigint,appId string,tenantId string,deviceId string,appVersion string,appChannel string,appPlatform string,osType string,deviceStyle string,singleUseDurationSecs bigint,singleUploadTraffic bigint,singleDownloadTraffic bigint)PARTITIONED BY (ym string, day string,hm string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
4. Write a scheduled Hive script, hdfstohivebyminute.sh, that loads the data from HDFS into the matching Hive partitions once a minute
#!/bin/bash
systime=`date -d "-3 minute" +%Y%m-%d-%H%M`
ym=`echo ${systime} | awk -F '-' '{print $1}'`
day=`echo ${systime} | awk -F '-' '{print $2}'`
hm=`echo ${systime} | awk -F '-' '{print $3}'`
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/startup/${ym}/${day}/${hm}' into table applogsdb.ext_startup_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/error/${ym}/${day}/${hm}' into table applogsdb.ext_error_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/event/${ym}/${day}/${hm}' into table applogsdb.ext_event_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/usage/${ym}/${day}/${hm}' into table applogsdb.ext_usage_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hive -e "load data inpath '/usr/applogs/page/${ym}/${day}/${hm}' into table applogsdb.ext_page_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
Convert the script to UNIX line endings:
yum install -y dos2unix
dos2unix /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohivebyminute.sh
Add the job to the Linux cron table by appending to /etc/crontab:
* * * * * root source /etc/profile; /opt/modules/ap/apache-hive-1.2.1-bin/hdfstohivebyminute.sh
chmod -R 777 /opt/modules/ap/apache-hive-1.2.1-bin/
source /etc/crontab
service crond start
Check whether data is arriving in Hive:
use applogsdb;
select * from ext_startup_logs;
5. Write the UDF functions
(omitted here)
6. Export the UDF jar, app_logs_hive.jar, to hive/lib
Add the jar in the current Hive session:
hive>add jar /opt/modules/ap/apache-hive-1.2.1-bin/lib/app_logs_hive.jar;
Or register it permanently in hive-site.xml; since hive.aux.jars.path is already set for the JSON SerDe, list both jars in the single property:
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/module/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar,file:///opt/module/hive/lib/app_logs_hive.jar</value>
</property>
Create the functions:
create function getdaybegin AS 'com.atguigu.hive.DayBeginUDF';
create function getweekbegin AS 'com.atguigu.hive.WeekBeginUDF';
create function getmonthbegin AS 'com.atguigu.hive.MonthBeginUDF';
create function formattime AS 'com.atguigu.hive.FormatTimeUDF';
7. Test the statistics SQL (new users today, new users yesterday, and so on)
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
)t ;
... (the remaining metric queries are omitted)
8. Start HiveServer2:
/opt/modules/ap/apache-hive-1.2.1-bin/bin/hiveserver2 &
9. Start the visualization web project