当前位置:   article > 正文

hive on spark

hive on spark

一. 配置

1. hive 回顾

1.1 hive简介

  1. Hive是一个构建于Hadoop顶层的数据仓库工具
  2. 支持大规模数据存储、分析,具有良好的可扩展性
  3. 某种程度上可以看作是用户编程接口,本身不存储和处理数据
  4. 依赖分布式文件系统HDFS存储数据
  5. 依赖分布式并行计算模型MapReduce处理数据
  6. 定义了简单的类似SQL 的查询语言——HiveQL
  7. 用户可以通过编写的HiveQL语句运行MapReduce任务
  8. 可以很容易把原来构建在关系数据库上的数据仓库应用程序移植到Hadoop平台上
    是一个可以提供有效、合理、直观组织和使用数据的分析工具
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    Hive是基于Hadoop的数据仓库工具,将SQL语句转化为MapReduce任务运行

1.2 yum 设置 & 命令(Centos7)

1、/etc/yum.repos.d 有一堆配置文件:
CentOS-Media 使用光盘挂载后调用的文件
CentOS-Base 网后基础的源,一般都用这个
CentOS-Vault 最近新版本的加入的老版本的yum源配置
CentOS-Debuginfo debug包尤其和内核相关的更新和软件安装
在该路径下建一个backup目录,将这些文件移进去,作为备份(仅保留CentOS-Base)

备注:文件中enabled是开启选项,1是开启,0是不开启

2、下载aliyun yum源repo文件
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo

3、相关命令
yum clean all // 清除缓存
yum makecache // 把yum源缓存到本地,加快软件的搜索好安装速度
yum list
yum repolist all // 显示所有仓库包

4、各种源之间还可以设置优先级

1.3 hive 安装

安装之前系统先做镜像!!!

使用mysql作为元数据管理工具。

我的配置:
Node1:hdfs(master/slave) + Spark(Master/slave) + Hive
Node2:hdfs(slave) + Spark(slave) + Hive
Node3:hdfs(slave) + Spark(slave) + MySQL + Hive

MySQL、Hive的安装使用Hadoop课程的软件及方法

1、虚拟机备份(做快照)
2、安装MySQL,创建Hive的用户并授权
3、安装Hive
	修改环境变量
	修改hive-site.xml
		<?xml version="1.0" encoding="UTF-8" standalone="no"?>
		<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
		<configuration>
		  <property>
		    <name>javax.jdo.option.ConnectionURL</name>
		    <value>jdbc:mysql://node3:3306/hive?createDatabaseIfNotExist=true</value>
		    <description>JDBC connect string for a JDBC metastore</description>
		  </property>
		  <property>
		    <name>javax.jdo.option.ConnectionDriverName</name>
		    <value>com.mysql.jdbc.Driver</value>
		    <description>Driver class name for a JDBC metastore</description>
		  </property>
		  <property>
		    <name>javax.jdo.option.ConnectionUserName</name>
		    <value>hive</value>
		    <description>username to use against metastore database</description>
		  </property>
		  <property>
		    <name>javax.jdo.option.ConnectionPassword</name>
		    <value>hive</value>
		    <description>password to use against metastore database</description>
		  </property>
		</configuration>
	拷贝jdbc驱动
	修改hive文件(可执行文件)
	Hive测试
	
4、配置Spark
	将jdbc驱动拷贝到$SPARK_HOME/jars
	将hive-site.xml拷贝到$SPARK_HOME/conf
	测试spark-shell(正常启动,并能访问Hive)
5、配置IDEA(经常被遗忘,经常搞不定)
	配置jdbc(Maven引入依赖;导入jars)
	在IDEA中指定hive-site.xml文件的位置。
	File->Project Structure->Modules->Dependencies->Add(最右侧)->Jars or Directories
	运行测试程序
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

1.5 注意问题

jdbc的驱动程序要拷贝到$HIVE_HOME/lib下
(备注:先启动hdfs)
启动hive

Hive启动会遇到一个错误,错误中有类似下面的信息:
ls: cannot access /opt/modules/spark-2.2.0-bin-hadoop2.7/lib/spark-assembly-*.jar: No such file or directory

修改$HIVE_HOME/bin/hive文件,找到包含有spark-assembly-*.jar的行(116行),改为:
sparkAssemblyPath=`ls ${SPARK_HOME}/jars/*.jar`  vim hive   :set nu 显示行号

说明:低版本spark jar打成了一个包,名字类似于spark-assembly-*.jar,在lib目录中。高版本spark的包在$SPARK_HOME/jars下
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

1.6 hive测试

在hive中创建数据库和表
// 创建数据库sparktest。如果需要删除数据库,删除之前需要先删除其中的表
create database if not exists sparktest;
 
// 显示一下是否创建出了sparktest数据库
hive> show databases;
 
// 在sparktest中创建表useraction,并加载数据。最后检查hdfs中是否有对应的文件。
CREATE EXTERNAL TABLE 
userinfo(userid string, itemid string, behavior_type string, user_geohash string, item_category string, time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
load data local inpath '/home/spark/small_user.csv' into table userinfo;
load data inpath 'data/userinfo.csv' into table userinfo;

如果上述语句执行的很慢,而且有类似错误:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:For direct MetaStore DB connections, we don't support retries at the client level.)
请在mysql中执行:alter database hive character set latin1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

1.7 spark配置

Spark的配置:
1、将jdbc的驱动程序拷贝到$SPARK_HOME/jars目录下(所有节点)
2、将hive-site.xml程序拷贝到$SPARK_HOME/conf 目录下(至少安装hive的节点或程序运行的节点,建议所有节点)

启动Spark-shell,启动过程无报错。运行以下测试:
spark.sql("use sparktest")
spark.sql("select count(*) from userinfo").show
result:300001
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

保证每个节点都有hive,spark
Metadata即元数据:元数据包含用Hive创建的database、table等的元信息。元数据存储在关系型数据库中。如Derby、MySQL等。

Metastore的作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。
有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接 metastore 服务即可。
hive中对metastore的配置包含3部分:

metastore database
metastore server
metastore client

1.8 读取hive数据

import org.apache.spark.sql.SparkSession

object HiveDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Hive Demo")
      .master("spark://node1:7077")
      .enableHiveSupport()	// 支持hive,这个是关键,没有不行!
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addJar("/home/spark/IdeaProjects/MyLastTest/out/artifacts/MyLastTest_jar/MyLastTest.jar")

    spark.sql("use sparktest")
    spark.sql("select * from userinfo").show(false)

    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

二. hive三种模式

在这里插入图片描述

1、内嵌Derby方式

这个是Hive默认的启动模式,一般用于单元测试,这种存储方式有一个缺点:在同一时间只能有一个进程连接使用数据库。
当 hive-site.xml没有配置第三方库时自动使用derby库
执行初始化命令:schematool -dbType derby -initSchema
查看初始化后的信息: schematool -dbType derby -info
配置完成后就可在shell中以CLI的方式访问hive 进行操作验证。

2.Local方式

以本地Mysql数据库为例:创建好用户:hive;database:hive。
配置文件 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置如下:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
  <description>password to use against metastore database</description>
</property>
 
<property>
  <name>hive.metastore.warehouse.dir</name>
  <!-- base hdfs path -->
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

注意: 需要把mysql的驱动包copy到目录 <HIVE_HOME>/lib 中
如果是第一次需要执行初始化命令:
schematool -dbType mysql -initSchema
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3.Remote方式 (远程模式)

仅连接远程的mysql并不能称之为“远程模式”,是否远程指的是 metastore 和 hive 服务是否在同一进程内;
以Mysql数据库为例:创建好用户:hive;database:hive_meta。Remote方式需要分别配置服务端和客户端的配置文件:
服务端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同:

客户端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同 ,
再加上thrift配置找服务端

<!-- thrift://<host_name>:<port> 默认端口是9083 -->
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://master:9083,thrift://slaver1:9083</value>
  <description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

hive metastore 服务端启动命令:

服务端口可以不写会找配置文件的默认9083
1) hive --service metastore -p <port_num>
2)hive --service metastore &
  • 1
  • 2
  • 3

注意客户端中的端口配置需要和启动监听的端口一致。
客户端启动

输入hive
如果不加端口默认启动:hive --service metastore,则默认监听端口是:9083 ,注意客户端中的端口配置需要和启动监听的端口一致。服务端启动正常后,客户端就可以执行hive操作了。

注意:
客户端中配置hive.metastore.uris,如 thrift://master:9083。如果有多个metastore服务器,将URL之间用逗号分隔(不能有空格)
写多个是为了当前面的宕机了会自动配置后面的uris
优先第一个当第一个没有宕机其他的客户端都连接第一个

确认metastore服务启动:
netstat -an | grep 9083
lsof –i:9083

小结:
hive metastore可以配置多个实例,防止单点故障;(推荐)
配置了metastore,启动hive的时候,本地client端就无需实例化hive的metastore,启动速度会加快;

三. spark sql 远程连接(thriftserver – beeline)

ThriftServer是一个JDBC/ODBC接口,用户可以通过JDBC/ODBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个sparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。

注意: 集群模式必须保证每个节点都有metastore_db
metastore_db:在哪启动就在那生成 和 sparkwarehouse
/start-thriftserver.sh --master 类型 
         --hiveconf hive.server2.thrift.port=11000//端口号可改
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

启动后启动beeline
bin/beeline   --hiveconf hive.server2.thrift.port=11000 
--conf  hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"

beeline启动后连接thriftserver 注意:别忘把驱动包复制到spark/lib下
!connect jdbc:hive2://localhost:11000	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

!quit //退出
!help //获取帮助
thriftserver和普通的spark-shell/spark-sql的区别?
spark-shell,spark-sql都是一个spark application
thriftserver,不管你启动多少个客户端(beeline/code),永远都是一个spark application
解决了一个数据共享的问题,多个客户端可以共享数据;
beeline : 使用它可以实现一个节点多个打开spark-sql

web UI 4040查看job
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述
在spark shell 中操作hdfs 上的数据是很方便的,但是操作也未免过于繁琐,幸好spark 还想用户提供另外两种操作 spark sql 的方式

2 .spark-sql

启动方式比较简单但一个节点只能启动一个

/start-thriftserver.sh --master 类型          
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
  • 1
  • 2
  • 3

在这里插入图片描述

3.scala操作spark-SQL

spark.sql(“SQL语句”)

四 .spark on hive(HiveServer 2)

1. HiveServer 2

在这里插入图片描述
HiveServer2(HS2)是一个服务端接口,使远程客户端可以执行对Hive的查询并返回结果。目前基于Thrift RPC的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验证;

HiveServer、HiveServer2都是基于Thrift的。由于HiveServer不能处理多于一个客户端的并发请求,因此在Hive-0.11.0版本中重写了HiveServer代码得到了HiveServer2。

HiveServer2支持多客户端的并发和认证,为开放API客户端如JDBC、ODBC提供了更好的支持。

  1. 正常的hive仅允许使用HiveQL执行查询、更新等操作,并且该方式比较笨拙单一。Hive提供了轻客户端的实现,通过HiveServer或者HiveServer2,客户端可以在不启动CLI的情况下对Hive中的数据进行操作,两者都允许远程客户端使用多种编程语言如Java、Python向Hive提交请求,取回结果;
  2. 可以实现远程访问;
  3. 可以通过命令链接多个hive;

生产环境中使用Hive,建议使用HiveServer2来提供服务,好处很多:

  1. 在应用端不用部署Hadoop和Hive客户端;
  2. 相比hive-cli方式,HiveServer2不用直接将HDFS和Metastore暴露给用户;
  3. 有安全认证机制,并且支持自定义权限校验;
  4. 有HA机制,解决应用端的并发和负载均衡问题;
  5. JDBC方式,可以使用任何语言,方便与应用进行数据交互;
  6. 从2.0开始,HiveServer2提供了WEB UI。

2. beline

beeline是从 Hive 0.11版本引入的,是Hive新的命令行客户端工具;Hive客户端工具后续将使用beeline 替代HiveCLI ,并且后续版本将也会废弃HiveCLI 客户端工具;
beeline方式相当于瘦客户端模式,采用JDBC方式借助于Hive Thrift服务访问Hive数据仓库;
从Hive 0.14版本开始,Beeline使用HiveServer2工作时,它会从HiveServer2输出日志信息到STDERR;

Beeline 要与 HiveServer2 配合使用;需要启动 HiverServer2 (在node3);
hive --service hiveserver2 &
Hiveserver2 &

使用Beeline:
启动beeline;
!connect jdbc:hive2://node3:10000
jdbc:hive2://:/
默认用户名(spark)、密码不验证(hive.server2.authentication缺省值为NONE)
执行SQL命令

检查端口:lsof –i:10000
备注:启动 hiveserver2 后立即检查,看不见任何信息;Beeline连接后检查才能看见。

退出beeline命令行则是!quit, 很多命令都是前面需要加一个感叹号, 但对于登录了后的DDL,DML,则直接运行SQL语句即可,语句后带上一个分号,然后回车执行;

Beeline和其他工具有一些不同,执行查询都是正常的SQL输入,但是如果是一些管理的命令,比如进行连接,中断,退出,执行Beeline命令需要带上“!”,不需要终止符。如:
1、!connect url –连接不同的Hive2服务器
2、!exit –退出shell
3、!help –显示全部命令列表

备注:
beeline在我的机器上可能有两个:$ HIVE_HOME/bin、$SPARK_HOME/bin
在这里插入图片描述在这里插入图片描述

3. Spark Thrift Server

Thrift JDBC/ODBC Server (简称 Spark Thrift Server 或者 STS)是Spark SQL的Apache Hive HiveServer2的端口,通过这个端口可以允许用户使用JDBC/ODBC端口协议来执行SQL查询;

通过使用STS,用户可以用使用其他的BI工具,比如Tableau来连接Spark进行基于大数据量的报表制作;

Thrift Server在启动的时候,启动了一个SparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享SparkSQL应用程序的资源;

Thrift Server启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询;

在配置Thrift Server的时候,通常要配置Thrift Server的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris;

五. Spark Thrift Server配置 & 运行

1、$SPARK_HOME/conf/hive-site.xml(与前面的配置相同)(node2)

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://node3:9083,thrift://node1:9083</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
   </property>
</configuration>
备注:仅配置了hive.metastore.uris的信息,其他均采用默认配置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、启动hive metastore服务(node3 或 node1)

hive --service metastore&

3、启动Spark ThriftServer(node2)/home/xdl/spark-2.3.3-bin-hadoop2.7/conf start-thriftserver.sh

在这里插入图片描述

4、检查日志(node2)在node2中查看日志如下所示:

在这里插入图片描述

5、检查进程(node2)

在这里插入图片描述
9678:启动了SparkSubmit
9741:根据当前配置,在node1、node2、node3上均启动了executor

6、检查端口(缺省端口号是10000,可配置)

在这里插入图片描述
在这里插入图片描述

beeline连接hiveserver2报错。

错误:Error: Could not open client transport with JDBC Uri: jdbc:hive2://s1:10000/hive: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: xxx is not allowed to impersonate anonymous (state=08S01,code=0)

解决方案
在hadoop的配置文件core-site.xml增加如下配置,重启hdfs,其中“xxx”是连接beeline的用户,将“xxx”替换成自己的用户名即可

  • 表示可通过超级代理“xxx”操作hadoop的用户、用户组和主机

    hadoop.proxyuser.xxx.hosts
    *
hadoop.proxyuser.xxx.groups * 参考: https://blog.csdn.net/jiangyonggang1/article/details/87261092

Spark Thrift Server配置 & 运行

Beeline执行连接到RDBMS:
!connect jdbc:mysql://master:3306/metastore hive hive
show databases;

SparkSQL通过jdbc访问hive

import java.sql.DriverManager

object SparkSQLThriftServer {
  def main(args: Array[String]): Unit = {
    // 添加驱动
    val driver =  "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver)

    // 获取connection
    val (url, username, password) = ("jdbc:hive2://master:10000", "lyb", "lyb")
    val connection=  DriverManager.getConnection(url, username, password)

    val sql =  "SELECT count(*) as mycount FROM test1.test"
    // 获取statement
    val statement= connection.prepareStatement(sql)

    // 获取结果
    val res = statement.executeQuery()
    while(res.next()){
      println(s"${res.getString("mycount")}")
    }

    // 关闭资源
    res.close()
    statement.close()
    connection.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

六.UDF

UDF: 自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数,从实现上看就是普通的Scala函数;
为了解决一些复杂的计算,并在SQL函数与Scala函数之间左右逢源
UDF的参数视为数据表的某个列;
书写规范:

1.注册版

  1. import spark.implicits._
  2. def funName(参数:类型)={函数体} //自定义函数
  3. spark.udf.register(“fun1”, funName _ )
    fun1 :是sql中要用的函数
    funName _ :自定义的函数名+空格+下划线
    // 注册函数
    4)val x=spark.sql(“select id, fun1(colname) from tbName ”)

2.非注册版

  1. import org.apache.spark.sql.functions._
    import spark.implicits._
  2. val fun2=udf((参数:类型,length:Int)=>参数.length>length)
  3. val getData=DataFrame类型数据.filter(fun2($ ”参数”,lit(10)))
    $ : 可以接收的数据会当成Column对象($符号来包裹一个字符串表示一个Column)
    当不用注册时要有udf包住自定义函数—>udf函数

3. 案例

import org.apache.spark.sql.{Row, SparkSession}

object UDFDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UDFDemo")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
    val df = spark.createDataFrame(data).toDF("title", "author")
    df.createTempView("books")

    // 定义函数并注册
    def len1(bookTitle: String):Int = bookTitle.length
    spark.udf.register("len1", len1 _)
    // UDF可以在select语句、where语句等多处使用
    spark.sql("select title, author, len1(title) from books").show
    spark.sql("select title, author from books where len1(title)>5").show

    // UDF可以在DataFrame、Dataset的API中使用
    import spark.implicits._
    df.filter("len1(title)>5").show
    // 不能通过编译
    //df.filter(len1($"title")>5).show
    // 能通过编译,但不能执行
    //df.select("len1(title)").show
    // 不能通过编译
    //df.select(len1($"title")).show

    // 如果要在DSL语法中使用$符号包裹字符串表示一个Column,需要用udf方法来接收函数。这种函数无需注册
    import org.apache.spark.sql.functions._
    val len2 = udf((bookTitle: String) => bookTitle.length)
    df.filter(len2($"title")>5).show
    df.select(len2($"title")).show

    // 不使用UDF
    df.map{case Row(title: String, author: String) => (title, author, title.length)}.show

    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

七. UDAF

UDAF :用户自定义聚合函数。函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作(多条数据输入,一条数据输出);类似于在group by之后使用的sum、avg等函数
在这里插入图片描述
在这里插入图片描述

abstract class UserDefinedAggregateFunction extends Serializable{
def inputSchema : StructType
//inputSchema用于定义与DataFrame列有关的输入样式

def bufferSchema : StructType
//bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema;

def dataType : DataFrame
//dataType标明了UDAF函数的返回值类型;

def deterministic : Boolean
//deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果;

def initialize ( buffer : MutableAggregationBuffer) : Unit
//initialize对聚合运算中间结果的初始化;

def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
//update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始;
UDAF的核心计算都发生在update函数中;
update函数的第二个参数input: Row对应的并非DataFrame的行,
而是被inputSchema投影了的行;

def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row):Unit
//merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中;

def evluate ( buffer :Row ): Any                       
//evaluate函数完成对聚合Buffer值的运算,得到最终的结果
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

普通的UDF不支持数据的聚合运算。如当要对销售数据执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。
书写UDAF 先继承UserDefinedAggregateFunction接口
在重写他的方法
def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
// UDAF的核心计算都发生在update函数中。
// 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
// update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
// update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
// 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期

案例

class YearOnYearBasis extends UserDefinedAggregateFunction {
  // UDAF与DataFrame列有关的输入样式
  override def inputSchema: StructType 
  				= new StructType()
				  .add("sales", DoubleType)
				  .add("saledate", StringType)

  // UDAF函数的返回值类型
  override def dataType: DataType = DoubleType

  // 缓存中间结果
  override def bufferSchema: StructType 
  					= new StructType()
  					.add("year2014", DoubleType)
  					.add("year2015", DoubleType)

  // 布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。通常用true
  override def deterministic: Boolean = true

  // initialize就是对聚合运算中间结果的初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0.0
  }

  // UDAF的核心计算都发生在update函数中。
  // 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
  // update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
  // update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
  // 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {	  
    val salenumber = input.getAs[Double](0)
    input.getString(1).take(4) match {
      case "2014" => buffer(0) = buffer.getAs[Double](0) + salenumber
      case "2015" => buffer(1) = buffer.getAs[Double](1) + salenumber
      case _ => println("ERROR!")
    }
  }

  // 合并两个分区的buffer1、buffer2,将最终结果保存在buffer1中
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
  }

  // 取出buffer(缓存的值)进行运算,得到最终结果
  override def evaluate(buffer: Row): Double = {
    println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}")
    if (buffer.getDouble(0) == 0.0) 0.0
    else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
  }
}

object UDAFDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getCanonicalName}")
      .master("local[*]")
      .getOrCreate()

    val sales = Seq(
      (1, "Widget Co",        1000.00, 0.00,    "AZ", "2014-01-02"),
      (2, "Acme Widgets",     2000.00, 500.00,  "CA", "2014-02-01"),
      (3, "Widgetry",         1000.00, 200.00,  "CA", "2015-01-11"),
      (4, "Widgets R Us",     2000.00, 0.0,     "CA", "2015-02-19"),
      (5, "Ye Olde Widgete",  3000.00, 0.0,     "MA", "2015-02-28") )

    val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate")
    salesDF.createTempView("sales")

    val yearOnYear = new YearOnYearBasis
    spark.udf.register("yearOnYear", yearOnYear)
    spark.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales").show()

    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

在这里插入图片描述

八. 从MySQL读取数据

// 读取数据库中的数据
val jdbcDF = spark.read.format("jdbc").
				option("url", "jdbc:mysql://localhost:3306/spark").
				option("driver","com.mysql.jdbc.Driver").
				option("dbtable", "student").
				option("user", "hive").
				option("password", "hive").load()
jdbcDF.show
jdbcDF.printSchema
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

备注:
1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
2、明白每一个参数的意思,一个参数不对整个结果出不来;
3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。
在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/634558
推荐阅读
相关标签
  

闽ICP备14008679号