赞
踩
本项目需求运行在CentOS7下Hadoop10单机环境:
Windows11 开发环境:
本项目使用一台linux虚拟机,主机名Hadoop10,CentOS7系统。
虚拟机服务器jdk选用1.8,包括完整生态的hadoop spark;
本地主机为win11,scala2.12.17,在idea集成环境中编写。
sh脚本执行jar生成用户数据和日志行为数据:“/opt/app/genlog.sh”
jar文件位置:“/opt/app/log-generator-jar-with-dependencies.jar”
[root@hadoop10 app]# sh genlog.sh user
generating user data ...
[root@hadoop10 app]# sh genlog.sh log
generating log data ...
这里上传2023-06-21的日期,以后的需求中需要更多的日期数据,需要重复进行该小节的步骤生成数据。请注意,由于本次的课程项目是在模拟实际生产环境,因此直到上传到HDFS才可以算作是T-1日的log数据完成。
[root@hadoop10 sparkdw]# hdfs dfs -mkdir -p /logdata/app/2023-06-21
[root@hadoop10 sparkdw]# hdfs dfs -put /root/moni_data/app.access.log.2023-06-21 /logdata/app/2023-06-21
本节内容将ODS层数据完成构建在Hive数据库中,Hive提前启动。本节提供Hive数据库的搭建,ODS层代码,以及多种导入数据的方式,包括使用Shell脚本导入数据库的方式,使用Shell脚本是为了方便程序能够上调度。
create database ods; create table ods.app_event_log( account string ,appid string ,appversion string ,carrier string ,deviceid string ,devicetype string ,eventid string ,ip string ,latitude double ,longitude double ,nettype string ,osname string ,osversion string ,properties map<string,string> ,releasechannel string ,resolution string ,sessionid string ,`timestamp` bigint )partitioned by(dt string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE; load data inpath '/logdata/app/2023-06-22' into table ods.app_event_log partition(dt='2023-06-22') select *from ods.app_event_log where dt='2023-06-22'
方式一:直接在Hive中导入HDFS中的原始日志数据,导入成功截图:
这是方拾贰:
使用shell脚本"01.导入数据到ODS.sh":
#! /bin/bash #可以传递一个日期参数,如果没有写就使用T-1日期 #sh 01.导入数据到ODS.sh 2023-06-22 #load data inpath '/logdata/app/2023-06-22' into table ods.app_event_log partition(dt='2023-06-22') #sh 01.导入数据到ODS.sh #load data inpath '/logdata/app/2023-06-25' into table ods.app_event_log partition(dt='2023-06-25') dt=$1 if [ "x"$1 == "x" ] then dt=$(date -d "1 days ago" +%Y-%m-%d) fi echo " 执行日期 ---------- $dt ----------------- " #name=zs #echo "姓名:'$name'" #echo '姓名:"$name"' #hive -e "sql" hive -e "load data inpath '/logdata/app/$dt' into table ods.app_event_log partition(dt='$dt')"
此脚本的好处是可以将生成日志日期给一个变量指定。
[root@hadoop10 sparkdw]# sh 01.导入数据到ODS.sh 2023-06-21
DWD层将进行初步的数据清洗。由于数据清洗是经常性的,每天都要执行的代码应该写入shell脚本,本章小节会使用shell脚本调用scala程序,将jar包放在服务器上使用yarn模式运行,在spark上进行数据清洗工作。其中编写了两个Spark工具类,用于测试和生产环境中更方便的读取配置文件和传日期变量。
开始数据库insert操作之前需要在hive中创建临时表,如下:
create database tmp; create table if not exists tmp.event_log_washed( account string ,appid string ,appversion string ,carrier string ,deviceid string ,devicetype string ,eventid string ,ip string ,latitude double ,longitude double ,nettype string ,osname string ,osversion string ,properties map<string,string> ,releasechannel string ,resolution string ,sessionid string ,`timestamp` bigint )partitioned by(dt string) STORED AS orc TBLPROPERTIES ('orc.compress'='SNAPPY'); select * from tmp.event_log_washed;
数据清洗代码编写开始之前需要新建一个maven项目,项目名见下面代码。
注意:此依赖是本章节所有项目的完整依赖,包括本地模式和优化后的服务器提交yarn模式。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>spark-dw</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-core</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.12.1</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.12.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.12.4</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> </dependencies> <build> <finalName>spark-dw</finalName> <extensions> <extension> <groupId>org.apache.maven.wagon</groupId> <artifactId>wagon-ssh</artifactId> <version>2.8</version> </extension> </extensions> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>4.4.0</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-assembly-plugin </artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>wagon-maven-plugin</artifactId> <version>1.0</version> <configuration> <!--上传的本地jar的位置--> <fromFile>target/${project.build.finalName}.jar</fromFile> <!--远程拷贝的地址--> <url>scp://root:root@hadoop10:/opt/app</url> </configuration> </plugin> </plugins> </build> </project>
在spark scala编写数据清洗代码:
还有一个resources目录下的配置文件:hive-site,这一节忘了给了,在下一小节中给出了,是关于hive元数据的。
package com.yh.ods_etl import org.apache.spark.sql.SparkSession object AppLogWash_01 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName("AppLogWash_01") .enableHiveSupport().getOrCreate(); /** * 过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录 * 过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达) * ods.app_event_log * tmp.event_log_washed */ spark.sql( """ | |insert overwrite table tmp.event_log_washed |partition(dt='2023-06-22') |select | account | ,appid | ,appversion | ,carrier | ,deviceid | ,devicetype | ,eventid | ,ip | ,latitude | ,longitude | ,nettype | ,osname | ,osversion | ,properties | ,releasechannel | ,resolution | ,sessionid | ,`timestamp` |from ods.app_event_log |where dt = '2023-06-22' |and deviceid is not null and trim(deviceid) != '' |and eventid is not null and trim(eventid) != '' |and sessionid is not null and trim(sessionid) != '' |and properties is not null and size(properties) > 0 |and from_unixtime(cast(substring(`timestamp`,1,10) as bigint),'yyyy-MM-dd') = '2023-06-22' | |""".stripMargin) spark.stop() } }
此为在spark中的local模式下运行,仅用于测试环境,在生产环境中需要打包进服务器。
运行spark scala代码后查看hive表:
开始之前需要将刚才local模式中插入的数据清空,以便于测试:
select * from tmp.event_log_washed where dt='2023-06-22'
alter table tmp.event_log_washed drop partition(dt='2023-06-22')
AppLogWash_01.scala:
注意:这个scala文件中主函数传入了一个参数,是为了便于在服务器提交时写日期参数,如果在spoark中测试时,可以在edit configuration中填写,如图:
package com.yh.ods_etl import com.yh.utils.SparkUtils object AppLogWash_01 { def main(args: Array[String]): Unit = { if(args.length == 0){ println("缺失参数") System.exit(0) } val spark = SparkUtils.getSparkSession("AppLogWash_01")//传参数APPname /** * 过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录 * 过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达) * ods.app_event_log * tmp.event_log_washed */ val dt = args(0) spark.sql( s""" | |insert overwrite table tmp.event_log_washed |partition(dt='${dt}') |select | account | ,appid | ,appversion | ,carrier | ,deviceid | ,devicetype | ,eventid | ,ip | ,latitude | ,longitude | ,nettype | ,osname | ,osversion | ,properties | ,releasechannel | ,resolution | ,sessionid | ,`timestamp` |from ods.app_event_log |where dt = '${dt}' |and deviceid is not null and trim(deviceid) != '' |and eventid is not null and trim(eventid) != '' |and sessionid is not null and trim(sessionid) != '' |and properties is not null and size(properties) > 0 |and from_unixtime(cast(substring(`timestamp`,1,10) as bigint),'yyyy-MM-dd') = '${dt}' | |""".stripMargin) spark.stop() } }
这三个配置文件结合下一小节中的两个工具类食用。
application.properties:
此文件为修改local模式和yarn模式的配置开启关闭,需要手动操作。
#local.run为true代表在本地测试,否则在集群测试,打包之前改为false
local.run=false
common-version-info.properties:
version=2.7.6
hive-site.xml:
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- mysql数据库在hive体系架构中的作用? 答案:存储元数据 元数据:描述数据的数据 spark整合hive开发,读取hive表需要连接mysql数据库获取元数据信息 spark(hive cli) 连接hadoop10已经启动的metastore metastore连接mysql数据库 --> <property> <name>hive.metastore.uris</name> <value>thrift://hadoop10:9083</value> </property> </configuration>
SparkUtils.scala:
切换模式yarn和local模式
package com.yh.utils import org.apache.spark.sql.SparkSession object SparkUtils { def getSparkSession(appName:String) ={ var spark:SparkSession = null if(ConfigUtils.flag){ spark = SparkSession.builder() .master("local[*]") .appName(appName) .enableHiveSupport().getOrCreate() }else{ spark = SparkSession.builder() .master("yarn") .appName(appName) .enableHiveSupport().getOrCreate() } spark } }
ConfigUtils.scala:
通过配置文件确定当前模式是local还是yarn
package com.yh.utils
import com.typesafe.config.{Config, ConfigFactory}
object ConfigUtils {
//1.加载application.properties配置文件
val config: Config = ConfigFactory.load()
//2.根据key获取value
val flag = config.getBoolean("local.run")
}
在这里需要注意的是,经过本地测试后,上传到服务器的jar包一定要是yarn模式的,通过前面我们编写的传参主函数传入日期。jar包的名字可以通过配置finalname修改。我们所使用的依赖也需要全部打包进jar包上传到服务器,使用插件可以完成将所有依赖打包的操作。上述事项只需要复制前文中我整理好的依赖管理即可正常打包使用,完整jar包大约293M。
打包成功截图:
jar包上传到服务器
服务器提交命令:
[root@hadoop10 app]# spark-submit --master yarn --class com.yh.ods_etl.AppLogWash_01 /opt/app/spark-dw-jar-with-dependencies.jar 2023-06-22
23/06/26 06:43:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/26 06:43:50 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/06/26 06:44:27 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
提交成功,查看完成的数据:
编写shell运行:
[root@hadoop10 sparkdw]# sh 02.数据清洗.sh 2023-06-22
执行日期 ---------- 2023-06-22 -----------------
23/06/26 06:52:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/26 06:52:43 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/06/26 06:53:19 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
02.数据清洗.sh
#! /bin/bash dt=$1 if [ "x"$1 == "x" ] then dt=$(date -d "1 days ago" +%Y-%m-%d) fi echo " 执行日期 ---------- $dt ----------------- " spark-submit \ --master yarn \ --class com.yh.ods_etl.AppLogWash_01 \ --conf spark.defalut.parallelism=10 \ --driver-memory 1G \ --driver-cores 2 \ --executor-memory 2G \ --num-executors 3 \ --executor-cores 2 \ --queue abc \ /opt/app/spark-dw-jar-with-dependencies.jar $dt
1.解决了23/06/26 09:14:23 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, faploading libraries under SPARK_HOME.
这个警告问题。这个警告信息表明在提交Spark应用程序时,没有明确指定spark.yarn.jars或spark.yarn.archive参数,导致Spark将依赖的库文件上传到SPARK_HOME目录下。
2.解决了每次都要打包上传大文件依赖的问题,将--jars /opt/app/spark-dw-jar-with-dependencies.jar
进行指定。
#! /bin/bash dt=$1 if [ "x"$1 == "x" ] then dt=$(date -d "1 days ago" +%Y-%m-%d) fi echo " 执行日期 ---------- $dt ----conf spark.defalut.parallelism=10 \--------------- " spark-submit \ --master yarn \ --class com.yh.ods_etl.AppLogWash_01 \ --conf spark.yarn.jars=local:/opt/installs/spark3.2.0/jars/* \ --driver-memory 1G \ --driver-cores 2 \ --executor-memory 2G \ --num-executors 3 \ --executor-cores 2 \ --queue abc \ --jars /opt/app/spark-dw-jar-with-dependencies.jar \ /opt/app/spark-dw.jar $dt
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。