This article rebuilds a Hive-based e-commerce data analysis project on Spark, and uses the rebuild to walk through the full Spark On Hive workflow in detail.
"Spark On X" means reading data from data source X, computing on it in Spark, and then loading the results into a database or data warehouse. The system the data is read from and the system the results are written to do not have to be the same.
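As a minimal sketch of this flow (the table names, host and credentials below are placeholders, not values from this project), reading from Hive, computing in Spark and writing the result to MySQL might look like this:

// Minimal Spark On Hive -> MySQL sketch; all names and connection details are illustrative.
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkOnXSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark_on_x_sketch")
      .master("local[*]")
      .enableHiveSupport()                       // the "X" we read from: Hive
      .getOrCreate()

    // Compute in Spark
    val result = spark.table("some_db.some_table").groupBy("some_col").count()

    // Write the result to a different store (MySQL here)
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "******")
    result.write.mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://some_host:3306/some_db", "some_result_table", props)

    spark.stop()
  }
}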
First, create an empty Maven project. After it is created, check a series of settings to make sure the JDK version is consistent everywhere, and set up the Scala development environment. For details, see the following articles:
Maven工程配置与常见问题解决指南
and
Scala01 —— Scala基础
2.1 In a Spark On Hive project, two core configuration files are needed: pom.xml and log4j.properties.
The pom.xml declares the project coordinates and the required dependencies (Spark, Hadoop, Hive, the MySQL connector, the HBase client, and Jackson):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ybg</groupId>
    <artifactId>warehouse_ebs_2</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.1.2</spark.version>
        <spark.scala.version>2.12</spark.scala.version>
        <hadoop.version>3.1.3</hadoop.version>
        <mysql.version>8.0.33</mysql.version>
        <hive.version>3.1.2</hive.version>
        <hbase.version>2.3.5</hbase.version>
        <jackson.version>2.10.0</jackson.version>
    </properties>

    <dependencies>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- HBase client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!-- jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>
</project>
The log4j.properties file configures the behavior of the logging system, including controlling how log messages are output and producing a rolling event log.
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# ----------------------- rolling event log configuration -----------------------
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.logfile.DatePattern='.'yyyy-MM-dd
log4j.appender.logfile.append=true
# ---------------------------------------------------------------
log4j.appender.logfile.File=log/spark_first.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2.2 Core configuration files of the big data components
The resources directory of the project must contain the configuration files of the core big data components running on the virtual machine, so that the calls Spark On Hive makes to those services work correctly. The two scenarios are:
Spark On Hive
Spark On MySQL
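Which files are required depends on the services involved. For Spark On Hive, the key file is a hive-site.xml that points Spark at the Hive metastore; the sketch below is a minimal example in which the metastore host, port and warehouse path are assumptions to be replaced with your own cluster's values:

<!-- Minimal hive-site.xml sketch; host, port and paths are assumptions for illustration. -->
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://single01:9083</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/hive312/warehouse</value>
  </property>
</configuration>

For Spark On MySQL, the connection details are supplied in code through the MySQLConfigFactory shown further below, so no extra resource file is strictly required for it.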
The configuration items handled by SparkFactory are summarized in the following table:
[SparkFactory configuration table]
The Validator utility performs basic validity checks on configuration values:

package core

object Validator {
  /**
   * Data validation
   *
   * @param title subject of the check
   * @param value value to be validated
   * @param regex if the value is a string that must follow a specific pattern, a regular expression for further format validation
   */
  def check(title: String, value: Any, regex: String = null): Unit = {
    if (null == value) {
      throw new RuntimeException(s"value for $title null pointer exception")
    }
    if (value.isInstanceOf[String]) {
      if (value.toString.isEmpty) {
        throw new RuntimeException(s"value for $title empty string exception")
      }
      if (null != regex && !value.toString.matches(regex)) {
        throw new RuntimeException(s"value for $title not match $regex exception")
      }
    }
  }
}
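For example (a hedged illustration, not part of the original project), the first two calls below pass while the third throws because the value does not match the supplied pattern:

// Hypothetical Validator.check usage
Validator.check("spark.app.name", "ebs_01", "^\\w+$")       // passes
Validator.check("spark.master", "local[*]")                  // passes: non-null, non-empty, no regex given
Validator.check("spark.driver.memory", "2x", "[1-9]\\d*g")   // throws RuntimeException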
The SparkFactory class creates and configures SparkSession instances in a factory style: a series of set and check calls ensure that every configuration item is valid and correct, and the result is a fully configured SparkSession instance.
package core

import core.SparkFactory.Builder
import core.Validator.check
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkFactory {
  def build(): Builder = {
    new Builder {
      val conf = new SparkConf()

      /**
       * First check that the configuration item name is valid,
       * then check that its value is valid.
       *
       * @param item       name of the configuration item
       * @param value      value of the configuration item
       * @param regexValue regular expression the value must match
       */
      private def set(item: String, value: String, regexValue: String = null) = {
        check("name_of_config_item", item, "^spark\\..*")
        check(item, value, regexValue)
        conf.set(item, value)
      }

      // Base
      private def setBaseAppName(appName: String) = {
        set("spark.app.name", appName, "^\\w+$")
      }
      private def setBaseMaster(master: String) = {
        set("spark.master", master,
          "local(\\[(\\*|[1-9][0-9]*)])?|spark://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}|yarn")
      }
      private def setBaseDeployMode(deployMode: String) = {
        set("spark.submit.deployMode", deployMode, "client|cluster")
      }
      private def setBaseEventLogEnabled(eventLogEnabled: Boolean) = {
        set("spark.eventLog.enabled", s"$eventLogEnabled")
      }
      override def baseConfig(appName: String, master: String = "local[*]", deployMode: String = "client",
                              eventLogEnabled: Boolean = false): Builder = {
        setBaseAppName(appName)
        setBaseMaster(master)
        setBaseDeployMode(deployMode)
        setBaseEventLogEnabled(eventLogEnabled)
        this
      }

      // Driver
      private def setDriverMemory(memoryGB: Int) = {
        set("spark.driver.memory", s"${memoryGB}g", "[1-9]\\d*g")
      }
      private def setDriverCoreNum(coreNum: Int) = {
        set("spark.driver.cores", s"${coreNum}", "[1-9]\\d*")
      }
      private def setDriverMaxResultGB(maxRstGB: Int) = {
        set("spark.driver.maxResultSize", s"${maxRstGB}g", "[1-9]\\d*g")
      }
      private def setDriverHost(driverHost: String) = {
        set("spark.driver.host", driverHost, "localhost|[a-z]\\w+")
      }
      override def optimizeDriver(memoryGB: Int = 2, coreNum: Int = 1, maxRstGB: Int = 1,
                                  driverHost: String = "localhost"): Builder = {
        setDriverMemory(memoryGB)
        setDriverCoreNum(coreNum)
        // Upper limit on the total serialized size of all partitions collected by a single Spark action
        setDriverMaxResultGB(maxRstGB)
        // In standalone mode the driver host must be set so that executors and the master can reach the driver
        if (conf.get("spark.master").startsWith("spark://")) {
          setDriverHost(driverHost)
        }
        this
      }

      // Executor
      private def setExecutorMemory(memoryGB: Int) = {
        set("spark.executor.memory", s"${memoryGB}g", "[1-9]\\d*g")
      }
      private def setExecutorCoreNum(coreNum: Int) = {
        set("spark.executor.cores", s"${coreNum}", "[1-9]\\d*")
      }
      override def optimizeExecutor(memoryGB: Int = 1, coreNum: Int = 1): Builder = {
        setExecutorMemory(memoryGB)
        // Under YARN the executor gets a single core by default;
        // in other modes the requested number of cores is set explicitly.
        if (!conf.get("spark.master").equals("yarn")) {
          setExecutorCoreNum(coreNum)
        }
        this
      }

      // Limit
      private def setLimitMaxCores(maxCores: Int) = {
        set("spark.cores.max", s"${maxCores}", "[1-9]\\d*")
      }
      private def setLimitMaxTaskFailure(maxTaskFailure: Int) = {
        set("spark.task.maxFailures", s"${maxTaskFailure}", "[1-9]\\d*")
      }
      private def setLimitMaxLocalWaitS(maxLocalWaitS: Int) = {
        set("spark.locality.wait", s"${maxLocalWaitS}s", "[1-9]\\d*s")
      }
      override def optimizeLimit(maxCores: Int = 4, maxTaskFailure: Int = 3, maxLocalWaitS: Int = 30): Builder = {
        if (conf.get("spark.master").startsWith("spark://")) {
          setLimitMaxCores(maxCores)
        }
        // Maximum number of failures allowed for a single task; beyond this the job is aborted
        setLimitMaxTaskFailure(maxTaskFailure)
        // Maximum wait for data-local scheduling; consider increasing it for large jobs
        setLimitMaxLocalWaitS(maxLocalWaitS)
        this
      }

      // Serializer
      override def optimizeSerializer(serde: String = "org.apache.spark.serializer.JavaSerializer",
                                      clas: Array[Class[_]] = null): Builder = {
        // Serializer for objects that are sent over the network or cached.
        // Defaults to JavaSerializer; KryoSerializer is recommended for better speed.
        // When using KryoSerializer, all custom entity (case) classes must be registered with the configuration.
        set("spark.serializer", serde, "([a-z]+\\.)+[A-Z]\\w*")
        if (serde.equals("org.apache.spark.serializer.KryoSerializer")) {
          conf.registerKryoClasses(clas)
        }
        this
      }

      // Net
      private def setNetTimeout(netTimeoutS: Int) = {
        set("spark.network.timeout", s"${netTimeoutS}s", "[1-9]\\d*s")
      }
      private def setNetSchedulerMode(schedulerMode: String) = {
        set("spark.scheduler.mode", schedulerMode, "FAIR|FIFO")
      }
      override def optimizeNetAbout(netTimeOusS: Int = 120, schedulerMode: String = "FAIR"): Builder = {
        // Timeout threshold shared by all network interactions
        setNetTimeout(netTimeOusS)
        // FAIR is recommended when several users share the cluster
        setNetSchedulerMode(schedulerMode)
        this
      }

      // Dynamic
      private def setDynamicEnabled(dynamicEnabled: Boolean) = {
        set("spark.dynamicAllocation.enabled", s"${dynamicEnabled}")
      }
      private def setDynamicInitialExecutors(initialExecutors: Int) = {
        set("spark.dynamicAllocation.initialExecutors", s"${initialExecutors}", "[1-9]\\d*")
      }
      private def setDynamicMinExecutors(minExecutors: Int) = {
        set("spark.dynamicAllocation.minExecutors", s"${minExecutors}", "[1-9]\\d*")
      }
      private def setDynamicMaxExecutors(maxExecutors: Int) = {
        set("spark.dynamicAllocation.maxExecutors", s"${maxExecutors}", "[1-9]\\d*")
      }
      override def optimizeDynamicAllocation(dynamicEnabled: Boolean = false, initialExecutors: Int = 3,
                                             minExecutors: Int = 0, maxExecutors: Int = 6): Builder = {
        // Dynamically allocate executors according to the workload of the application
        setDynamicEnabled(dynamicEnabled)
        if (dynamicEnabled) {
          setDynamicInitialExecutors(initialExecutors)
          setDynamicMinExecutors(minExecutors)
          setDynamicMaxExecutors(maxExecutors)
        }
        this
      }

      // Shuffle
      def setShuffleParallelism(parallelism: Int = 3) = {
        set("spark.default.parallelism", s"${parallelism}", "[1-9]\\d*")
      }
      def setShuffleCompressEnabled(shuffleCompressEnabled: Boolean = false) = {
        set("spark.shuffle.compress", s"${shuffleCompressEnabled}")
      }
      def setShuffleMaxSizePerReducer(maxSizeMB: Int = 128) = {
        // Reducer-side fetch buffer size (spark.reducer.maxSizeInFlight)
        set("spark.reducer.maxSizeInFlight", s"${maxSizeMB}m", "[1-9]\\d*m")
      }
      def setShuffleServiceEnabled(shuffleServiceEnabled: Boolean = true) = {
        set("spark.shuffle.service.enabled", s"${shuffleServiceEnabled}")
      }
      override def optimizeShuffle(parallelism: Int = 3, shuffleCompressEnabled: Boolean = false,
                                   maxSizeMB: Int = 48, shuffleServiceEnabled: Boolean = false): Builder = {
        // Used as the default partition count when the user does not specify one
        setShuffleParallelism(parallelism)
        // Whether to compress map-side shuffle output; recommended in production when data volumes are large
        setShuffleCompressEnabled(shuffleCompressEnabled)
        // Reducer-side fetch buffer size; can be increased in production when servers have plenty of memory
        setShuffleMaxSizePerReducer(maxSizeMB)
        // Enable an external shuffle service that stores the intermediate data produced by executors
        setShuffleServiceEnabled(shuffleServiceEnabled)
        this
      }

      // Speculation
      def setSpeculationEnabled(speculationEnabled: Boolean) = {
        set("spark.speculation", s"${speculationEnabled}")
      }
      def setSpeculationInterval(interval: Int) = {
        set("spark.speculation.interval", s"${interval}s", "[1-9]\\d*s")
      }
      def setSpeculationQuantile(quantile: Float) = {
        set("spark.speculation.quantile", s"${quantile}", "0?\\.\\d+")
      }
      override def optimizeSpeculation(speculationEnabled: Boolean = false, interval: Int = 5,
                                       quantile: Float = 0.75F): Builder = {
        // Whether to enable speculative execution, which relaunches slow tasks within a stage
        setSpeculationEnabled(speculationEnabled)
        // How often Spark checks for tasks to speculate
        setSpeculationInterval(interval)
        // Quantile threshold beyond which a task is considered slow
        setSpeculationQuantile(quantile)
        this
      }

      // Warehouse
      override def warehouseDir(hdfs: String): Builder = {
        set("spark.sql.warehouse.dir", hdfs, "hdfs://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}(/\\w+)+")
        this
      }

      override def end(): SparkSession = {
        SparkSession
          .builder()
          .config(conf)
          .enableHiveSupport()
          .getOrCreate()
      }
    }
  }
}

object SparkFactory {
  def apply(): SparkFactory = new SparkFactory()

  trait Builder {
    // Provide default values wherever possible

    /**
     * Basic configuration.
     *
     * @param appName
     * @param master          local mode by default
     * @param deployMode      client mode by default
     * @param eventLogEnabled enable in production, disable while testing
     * @return
     */
    def baseConfig(appName: String, master: String = "local[*]", deployMode: String = "client",
                   eventLogEnabled: Boolean = false): Builder

    /**
     * Driver-side tuning.
     *
     * @param memoryGB   driver memory
     * @param coreNum    number of driver cores
     * @param maxRstGB   maximum total size of results collected to the driver
     * @param driverHost host on which the driver runs; other nodes in the cluster communicate with the driver through this address
     * @return
     */
    def optimizeDriver(memoryGB: Int = 2, coreNum: Int = 1, maxRstGB: Int = 1, driverHost: String = "localhost"): Builder

    def optimizeExecutor(memoryGB: Int = 1, coreNum: Int = 1): Builder

    /**
     * Overall limits.
     *
     * @param maxCores       maximum number of cores available to the application as a whole
     * @param maxTaskFailure maximum number of failures allowed for a single task
     * @param maxLocalWaitS  fault tolerance: maximum time to wait for data-local reading before switching to another replica
     * @return
     */
    def optimizeLimit(maxCores: Int = 4, maxTaskFailure: Int = 3, maxLocalWaitS: Int = 30): Builder

    /**
     * Java serialization by default; Kryo serialization is recommended when speed matters.
     * All custom types must be registered with Spark before they can be serialized.
     *
     * @param serde fully qualified class name of the serializer
     * @param clas  custom types; not required by default, Class[_] means the type is unknown
     * @return Builder
     */
    def optimizeSerializer(serde: String = "org.apache.spark.serializer.JavaSerializer", clas: Array[Class[_]] = null): Builder

    /**
     * In Spark's official configuration this network timeout acts as the default for several other timeout settings.
     *
     * @param netTimeOusS   time after which a network operation is considered timed out
     * @param schedulerMode many jobs may run at the same time, hence FAIR scheduling
     * @return
     */
    def optimizeNetAbout(netTimeOusS: Int = 180, schedulerMode: String = "FAIR"): Builder

    /**
     * Dynamic allocation: allocate executors on demand,
     * similar to configuring the idle-thread limits of a thread pool.
     *
     * @param dynamicEnabled   whether dynamic allocation is enabled
     * @param initialExecutors initial number of executors
     * @param minExecutors     minimum number of executors
     * @param maxExecutors     maximum number of executors
     * @return
     */
    def optimizeDynamicAllocation(dynamicEnabled: Boolean = false, initialExecutors: Int = 3,
                                  minExecutors: Int = 0, maxExecutors: Int = 6): Builder

    /**
     * Default partition count, used only when no partition count is specified.
     * Keep the parallelism in line with the initial number of executors to avoid extra overhead.
     *
     * @param parallelism
     * @param shuffleCompressEnabled
     * @param maxSizeMB
     * @param shuffleServiceEnabled
     * @return
     */
    def optimizeShuffle(parallelism: Int = 3, shuffleCompressEnabled: Boolean = false,
                        maxSizeMB: Int = 128, shuffleServiceEnabled: Boolean = true): Builder

    /**
     * Speculative execution: tasks that run much longer than their peers in a stage are relaunched speculatively.
     *
     * @param enabled
     * @param interval how often Spark checks task runtimes, in seconds
     * @param quantile if a task runs longer than the given quantile of task runtimes (e.g. 75% of them), it is considered slow and speculated
     */
    def optimizeSpeculation(enabled: Boolean = false, interval: Int = 15, quantile: Float = 0.75f): Builder

    def warehouseDir(hdfs: String): Builder

    def end(): SparkSession
  }
}
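As a minimal usage sketch (the complete chain appears in the Test object further below), only baseConfig and end are strictly required; every optimize* step falls back to its defaults when omitted:

// Minimal SparkFactory usage; all optimize* steps keep their default values.
val spark = SparkFactory()
  .build()
  .baseConfig("ebs_01")
  .end()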
MySQLConfigFactory builds the JDBC connection properties for Spark On MySQL in the same Setter/Getter builder style:

package core

import core.MySQLConfigFactory.{Getter, Setter}
import core.Validator.check

import java.util.Properties

class MySQLConfigFactory {
  def build(): Setter = {
    new Setter {
      val conf = new Properties()

      override def setDriver(driverCla: String): Setter = {
        check("name_of_mysql_driver_class", driverCla, "com\\.mysql(\\.cj)?\\.jdbc\\.Driver")
        conf.setProperty("driver", driverCla)
        this
      }
      override def setUrl(url: String): Setter = {
        check("url_to_connect_mysql", url,
          "jdbc:mysql://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}/[a-z]\\w+(\\?.+)?")
        conf.setProperty("url", url)
        this
      }
      override def setUser(user: String): Setter = {
        check("user_to_connect_mysql", user)
        conf.setProperty("user", user)
        this
      }
      override def setPassword(password: String): Setter = {
        check("password_to_connect_mysql", password)
        conf.setProperty("password", password)
        this
      }
      override def finish(): Getter = {
        new Getter {
          override def getUrl: String = conf.getProperty("url")
          override def getConf: Properties = conf
        }
      }
    }
  }
}

object MySQLConfigFactory {
  def apply(): MySQLConfigFactory = new MySQLConfigFactory()

  trait Getter {
    def getUrl: String
    def getConf: Properties
  }

  trait Setter {
    def setDriver(driverCla: String): Setter
    def setUrl(url: String): Setter
    def setUser(user: String): Setter
    def setPassword(password: String): Setter
    def finish(): Getter
  }
}
Finally, a small Test object verifies both access paths: Spark On Hive through SparkFactory and Spark On MySQL through MySQLConfigFactory.

package test

import core.{MySQLConfigFactory, SparkFactory}
import org.apache.spark.sql.SparkSession

object Test {
  def main(args: Array[String]): Unit = {
    // Spark On Hive
    val spark: SparkSession = SparkFactory()
      .build()
      .baseConfig("ebs_01")
      .optimizeDriver()
      .optimizeExecutor()
      .optimizeLimit()
      .optimizeSerializer()
      .optimizeNetAbout()
      .optimizeDynamicAllocation()
      .optimizeShuffle()
      .optimizeSpeculation()
      .warehouseDir("hdfs://single01:9000/hive312/warehouse")
      .end()

    spark.table("yb12211.transaction")
      .show(10)

    // Spark On MySQL
    val getter: MySQLConfigFactory.Getter = MySQLConfigFactory().build()
      .setDriver("com.mysql.cj.jdbc.Driver")
      .setUrl("jdbc:mysql://single01:3306/yb12211?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true")
      .setUser("root")
      .setPassword("123456")
      .finish()

    // Read a MySQL table through JDBC
    spark.read.jdbc(getter.getUrl, "test_table1_for_hbase_import", getter.getConf)

    spark.stop()
  }
}
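To close the Spark On X loop described at the beginning (results written back to MySQL), a write along the following lines could be added before spark.stop(). This is a hedged sketch: the grouping column and target table name are placeholders, not part of the original project.

// Hypothetical write-back: persist a computed result to MySQL through the same Getter.
import org.apache.spark.sql.SaveMode

val result = spark.table("yb12211.transaction")
  .groupBy("some_group_col")   // placeholder column, for illustration only
  .count()

result.write
  .mode(SaveMode.Overwrite)
  .jdbc(getter.getUrl, "transaction_agg_result", getter.getConf)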