赞
踩
1、/etc/yum.repos.d 有一堆配置文件:
CentOS-Media 使用光盘挂载后调用的文件
CentOS-Base 网后基础的源,一般都用这个
CentOS-Vault 最近新版本的加入的老版本的yum源配置
CentOS-Debuginfo debug包尤其和内核相关的更新和软件安装
在该路径下建一个backup目录,将这些文件移进去,作为备份(仅保留CentOS-Base)
备注:文件中enabled是开启选项,1是开启,0是不开启
2、下载aliyun yum源repo文件
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
3、相关命令
yum clean all // 清除缓存
yum makecache // 把yum源缓存到本地,加快软件的搜索好安装速度
yum list
yum repolist all // 显示所有仓库包
4、各种源之间还可以设置优先级
安装之前系统先做镜像!!!
使用mysql作为元数据管理工具。
我的配置:
Node1:hdfs(master/slave) + Spark(Master/slave) + Hive
Node2:hdfs(slave) + Spark(slave) + Hive
Node3:hdfs(slave) + Spark(slave) + MySQL + Hive
MySQL、Hive的安装使用Hadoop课程的软件及方法
1、虚拟机备份(做快照) 2、安装MySQL,创建Hive的用户并授权 3、安装Hive 修改环境变量 修改hive-site.xml <?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://node3:3306/hive?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hive</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>hive</value> <description>password to use against metastore database</description> </property> </configuration> 拷贝jdbc驱动 修改hive文件(可执行文件) Hive测试 4、配置Spark 将jdbc驱动拷贝到$SPARK_HOME/jars 将hive-site.xml拷贝到$SPARK_HOME/conf 测试spark-shell(正常启动,并能访问Hive) 5、配置IDEA(经常被遗忘,经常搞不定) 配置jdbc(Maven引入依赖;导入jars) 在IDEA中指定hive-site.xml文件的位置。 File->Project Structure->Modules->Dependencies->Add(最右侧)->Jars or Directories 运行测试程序
jdbc的驱动程序要拷贝到$HIVE_HOME/lib下
(备注:先启动hdfs)
启动hive
Hive启动会遇到一个错误,错误中有类似下面的信息:
ls: cannot access /opt/modules/spark-2.2.0-bin-hadoop2.7/lib/spark-assembly-*.jar: No such file or directory
修改$HIVE_HOME/bin/hive文件,找到包含有spark-assembly-*.jar的行(116行),改为:
sparkAssemblyPath=`ls ${SPARK_HOME}/jars/*.jar` vim hive :set nu 显示行号
说明:低版本spark jar打成了一个包,名字类似于spark-assembly-*.jar,在lib目录中。高版本spark的包在$SPARK_HOME/jars下
在hive中创建数据库和表 // 创建数据库sparktest。如果需要删除数据库,删除之前需要先删除其中的表 create database if not exists sparktest; // 显示一下是否创建出了sparktest数据库 hive> show databases; // 在sparktest中创建表useraction,并加载数据。最后检查hdfs中是否有对应的文件。 CREATE EXTERNAL TABLE userinfo(userid string, itemid string, behavior_type string, user_geohash string, item_category string, time string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; load data local inpath '/home/spark/small_user.csv' into table userinfo; load data inpath 'data/userinfo.csv' into table userinfo; 如果上述语句执行的很慢,而且有类似错误: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:For direct MetaStore DB connections, we don't support retries at the client level.) 请在mysql中执行:alter database hive character set latin1;
Spark的配置:
1、将jdbc的驱动程序拷贝到$SPARK_HOME/jars目录下(所有节点)
2、将hive-site.xml程序拷贝到$SPARK_HOME/conf 目录下(至少安装hive的节点或程序运行的节点,建议所有节点)
启动Spark-shell,启动过程无报错。运行以下测试:
spark.sql("use sparktest")
spark.sql("select count(*) from userinfo").show
result:300001
保证每个节点都有hive,spark
Metadata即元数据:元数据包含用Hive创建的database、table等的元信息。元数据存储在关系型数据库中。如Derby、MySQL等。
Metastore的作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。
有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接 metastore 服务即可。
hive中对metastore的配置包含3部分:
metastore database
metastore server
metastore client
import org.apache.spark.sql.SparkSession object HiveDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("Spark Hive Demo") .master("spark://node1:7077") .enableHiveSupport() // 支持hive,这个是关键,没有不行! .getOrCreate() spark.sparkContext.setLogLevel("WARN") spark.sparkContext.addJar("/home/spark/IdeaProjects/MyLastTest/out/artifacts/MyLastTest_jar/MyLastTest.jar") spark.sql("use sparktest") spark.sql("select * from userinfo").show(false) spark.stop() } }
这个是Hive默认的启动模式,一般用于单元测试,这种存储方式有一个缺点:在同一时间只能有一个进程连接使用数据库。
当 hive-site.xml没有配置第三方库时自动使用derby库
执行初始化命令:schematool -dbType derby -initSchema
查看初始化后的信息: schematool -dbType derby -info
配置完成后就可在shell中以CLI的方式访问hive 进行操作验证。
以本地Mysql数据库为例:创建好用户:hive;database:hive。
配置文件 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置如下:
<property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hive</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>hive</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <!-- base hdfs path --> <value>/user/hive/warehouse</value> <description>location of default database for the warehouse</description> </property> 注意: 需要把mysql的驱动包copy到目录 <HIVE_HOME>/lib 中 如果是第一次需要执行初始化命令: schematool -dbType mysql -initSchema
仅连接远程的mysql并不能称之为“远程模式”,是否远程指的是 metastore 和 hive 服务是否在同一进程内;
以Mysql数据库为例:创建好用户:hive;database:hive_meta。Remote方式需要分别配置服务端和客户端的配置文件:
服务端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同:
客户端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同 ,
再加上thrift配置找服务端
<!-- thrift://<host_name>:<port> 默认端口是9083 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083,thrift://slaver1:9083</value>
<description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
hive metastore 服务端启动命令:
服务端口可以不写会找配置文件的默认9083
1) hive --service metastore -p <port_num>
2)hive --service metastore &
注意客户端中的端口配置需要和启动监听的端口一致。
客户端启动
输入hive
如果不加端口默认启动:hive --service metastore,则默认监听端口是:9083 ,注意客户端中的端口配置需要和启动监听的端口一致。服务端启动正常后,客户端就可以执行hive操作了。
注意:
客户端中配置hive.metastore.uris,如 thrift://master:9083。如果有多个metastore服务器,将URL之间用逗号分隔(不能有空格)
写多个是为了当前面的宕机了会自动配置后面的uris
优先第一个当第一个没有宕机其他的客户端都连接第一个
确认metastore服务启动:
netstat -an | grep 9083
lsof –i:9083
小结:
hive metastore可以配置多个实例,防止单点故障;(推荐)
配置了metastore,启动hive的时候,本地client端就无需实例化hive的metastore,启动速度会加快;
ThriftServer是一个JDBC/ODBC接口,用户可以通过JDBC/ODBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个sparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。
注意: 集群模式必须保证每个节点都有metastore_db
metastore_db:在哪启动就在那生成 和 sparkwarehouse
/start-thriftserver.sh --master 类型
--hiveconf hive.server2.thrift.port=11000//端口号可改
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
启动后启动beeline
bin/beeline --hiveconf hive.server2.thrift.port=11000
--conf hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
beeline启动后连接thriftserver 注意:别忘把驱动包复制到spark/lib下
!connect jdbc:hive2://localhost:11000
!quit //退出
!help //获取帮助
thriftserver和普通的spark-shell/spark-sql的区别?
spark-shell,spark-sql都是一个spark application
thriftserver,不管你启动多少个客户端(beeline/code),永远都是一个spark application
解决了一个数据共享的问题,多个客户端可以共享数据;
beeline : 使用它可以实现一个节点多个打开spark-sql
web UI 4040查看job
在spark shell 中操作hdfs 上的数据是很方便的,但是操作也未免过于繁琐,幸好spark 还想用户提供另外两种操作 spark sql 的方式
启动方式比较简单但一个节点只能启动一个
/start-thriftserver.sh --master 类型
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
spark.sql(“SQL语句”)
HiveServer2(HS2)是一个服务端接口,使远程客户端可以执行对Hive的查询并返回结果。目前基于Thrift RPC的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验证;
HiveServer、HiveServer2都是基于Thrift的。由于HiveServer不能处理多于一个客户端的并发请求,因此在Hive-0.11.0版本中重写了HiveServer代码得到了HiveServer2。
HiveServer2支持多客户端的并发和认证,为开放API客户端如JDBC、ODBC提供了更好的支持。
生产环境中使用Hive,建议使用HiveServer2来提供服务,好处很多:
beeline是从 Hive 0.11版本引入的,是Hive新的命令行客户端工具;Hive客户端工具后续将使用beeline 替代HiveCLI ,并且后续版本将也会废弃HiveCLI 客户端工具;
beeline方式相当于瘦客户端模式,采用JDBC方式借助于Hive Thrift服务访问Hive数据仓库;
从Hive 0.14版本开始,Beeline使用HiveServer2工作时,它会从HiveServer2输出日志信息到STDERR;
Beeline 要与 HiveServer2 配合使用;需要启动 HiverServer2 (在node3);
hive --service hiveserver2 &
Hiveserver2 &
使用Beeline:
启动beeline;
!connect jdbc:hive2://node3:10000
jdbc:hive2://:/
默认用户名(spark)、密码不验证(hive.server2.authentication缺省值为NONE)
执行SQL命令
检查端口:lsof –i:10000
备注:启动 hiveserver2 后立即检查,看不见任何信息;Beeline连接后检查才能看见。
退出beeline命令行则是!quit, 很多命令都是前面需要加一个感叹号, 但对于登录了后的DDL,DML,则直接运行SQL语句即可,语句后带上一个分号,然后回车执行;
Beeline和其他工具有一些不同,执行查询都是正常的SQL输入,但是如果是一些管理的命令,比如进行连接,中断,退出,执行Beeline命令需要带上“!”,不需要终止符。如:
1、!connect url –连接不同的Hive2服务器
2、!exit –退出shell
3、!help –显示全部命令列表
备注:
beeline在我的机器上可能有两个:$ HIVE_HOME/bin、$SPARK_HOME/bin
Thrift JDBC/ODBC Server (简称 Spark Thrift Server 或者 STS)是Spark SQL的Apache Hive HiveServer2的端口,通过这个端口可以允许用户使用JDBC/ODBC端口协议来执行SQL查询;
通过使用STS,用户可以用使用其他的BI工具,比如Tableau来连接Spark进行基于大数据量的报表制作;
Thrift Server在启动的时候,启动了一个SparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享SparkSQL应用程序的资源;
Thrift Server启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询;
在配置Thrift Server的时候,通常要配置Thrift Server的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris;
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083,thrift://node1:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>
备注:仅配置了hive.metastore.uris的信息,其他均采用默认配置
hive --service metastore&
9678:启动了SparkSubmit
9741:根据当前配置,在node1、node2、node3上均启动了executor
错误:Error: Could not open client transport with JDBC Uri: jdbc:hive2://s1:10000/hive: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: xxx is not allowed to impersonate anonymous (state=08S01,code=0)
解决方案
在hadoop的配置文件core-site.xml增加如下配置,重启hdfs,其中“xxx”是连接beeline的用户,将“xxx”替换成自己的用户名即可
Spark Thrift Server配置 & 运行
Beeline执行连接到RDBMS:
!connect jdbc:mysql://master:3306/metastore hive hive
show databases;
import java.sql.DriverManager object SparkSQLThriftServer { def main(args: Array[String]): Unit = { // 添加驱动 val driver = "org.apache.hive.jdbc.HiveDriver" Class.forName(driver) // 获取connection val (url, username, password) = ("jdbc:hive2://master:10000", "lyb", "lyb") val connection= DriverManager.getConnection(url, username, password) val sql = "SELECT count(*) as mycount FROM test1.test" // 获取statement val statement= connection.prepareStatement(sql) // 获取结果 val res = statement.executeQuery() while(res.next()){ println(s"${res.getString("mycount")}") } // 关闭资源 res.close() statement.close() connection.close() } }
UDF: 自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数,从实现上看就是普通的Scala函数;
为了解决一些复杂的计算,并在SQL函数与Scala函数之间左右逢源
UDF的参数视为数据表的某个列;
书写规范:
import org.apache.spark.sql.{Row, SparkSession} object UDFDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("UDFDemo") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6")) val df = spark.createDataFrame(data).toDF("title", "author") df.createTempView("books") // 定义函数并注册 def len1(bookTitle: String):Int = bookTitle.length spark.udf.register("len1", len1 _) // UDF可以在select语句、where语句等多处使用 spark.sql("select title, author, len1(title) from books").show spark.sql("select title, author from books where len1(title)>5").show // UDF可以在DataFrame、Dataset的API中使用 import spark.implicits._ df.filter("len1(title)>5").show // 不能通过编译 //df.filter(len1($"title")>5).show // 能通过编译,但不能执行 //df.select("len1(title)").show // 不能通过编译 //df.select(len1($"title")).show // 如果要在DSL语法中使用$符号包裹字符串表示一个Column,需要用udf方法来接收函数。这种函数无需注册 import org.apache.spark.sql.functions._ val len2 = udf((bookTitle: String) => bookTitle.length) df.filter(len2($"title")>5).show df.select(len2($"title")).show // 不使用UDF df.map{case Row(title: String, author: String) => (title, author, title.length)}.show spark.stop() } }
UDAF :用户自定义聚合函数。函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作(多条数据输入,一条数据输出);类似于在group by之后使用的sum、avg等函数
abstract class UserDefinedAggregateFunction extends Serializable{ def inputSchema : StructType //inputSchema用于定义与DataFrame列有关的输入样式 def bufferSchema : StructType //bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema; def dataType : DataFrame //dataType标明了UDAF函数的返回值类型; def deterministic : Boolean //deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果; def initialize ( buffer : MutableAggregationBuffer) : Unit //initialize对聚合运算中间结果的初始化; def update ( buffer : MutableAggregationBuffer , input :Row) :Unit //update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始; UDAF的核心计算都发生在update函数中; update函数的第二个参数input: Row对应的并非DataFrame的行, 而是被inputSchema投影了的行; def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row):Unit //merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中; def evluate ( buffer :Row ): Any //evaluate函数完成对聚合Buffer值的运算,得到最终的结果 }
普通的UDF不支持数据的聚合运算。如当要对销售数据执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。
书写UDAF 先继承UserDefinedAggregateFunction接口
在重写他的方法
def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
// UDAF的核心计算都发生在update函数中。
// 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
// update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
// update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
// 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
class YearOnYearBasis extends UserDefinedAggregateFunction { // UDAF与DataFrame列有关的输入样式 override def inputSchema: StructType = new StructType() .add("sales", DoubleType) .add("saledate", StringType) // UDAF函数的返回值类型 override def dataType: DataType = DoubleType // 缓存中间结果 override def bufferSchema: StructType = new StructType() .add("year2014", DoubleType) .add("year2015", DoubleType) // 布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。通常用true override def deterministic: Boolean = true // initialize就是对聚合运算中间结果的初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0.0 buffer(1) = 0.0 } // UDAF的核心计算都发生在update函数中。 // 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值) // update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始 // update函数的第二个参数input: Row对应的是被inputSchema投影了的行。 // 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val salenumber = input.getAs[Double](0) input.getString(1).take(4) match { case "2014" => buffer(0) = buffer.getAs[Double](0) + salenumber case "2015" => buffer(1) = buffer.getAs[Double](1) + salenumber case _ => println("ERROR!") } } // 合并两个分区的buffer1、buffer2,将最终结果保存在buffer1中 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0) buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1) } // 取出buffer(缓存的值)进行运算,得到最终结果 override def evaluate(buffer: Row): Double = { println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}") if (buffer.getDouble(0) == 0.0) 0.0 else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0) } } object UDAFDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val spark = SparkSession.builder() .appName(s"${this.getClass.getCanonicalName}") .master("local[*]") .getOrCreate() val sales = Seq( (1, "Widget Co", 1000.00, 0.00, "AZ", "2014-01-02"), (2, "Acme Widgets", 2000.00, 500.00, "CA", "2014-02-01"), (3, "Widgetry", 1000.00, 200.00, "CA", "2015-01-11"), (4, "Widgets R Us", 2000.00, 0.0, "CA", "2015-02-19"), (5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2015-02-28") ) val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate") salesDF.createTempView("sales") val yearOnYear = new YearOnYearBasis spark.udf.register("yearOnYear", yearOnYear) spark.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales").show() spark.stop() } }
// 读取数据库中的数据
val jdbcDF = spark.read.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/spark").
option("driver","com.mysql.jdbc.Driver").
option("dbtable", "student").
option("user", "hive").
option("password", "hive").load()
jdbcDF.show
jdbcDF.printSchema
备注:
1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
2、明白每一个参数的意思,一个参数不对整个结果出不来;
3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。