赞
踩
大数据笔记
Zookeeper的安装及使用
安装步骤如下:
[root@localhost ~]# cd /home/software
[root@localhost software]# tar -xvf zookeeper-3.4.10.tar.gz
[root@localhost software]# cd zookeeper-3.4.10
[root@localhost zookeeper-3.4.10]# cd conf
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost zookeeper-3.4.10]# cd …/bin
开启服务
[root@localhost ~]# cd /home/software/zookeeper-3.4.10/bin
[root@localhost bin]# sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/software/zookeeper-3.4.10/bin/…/conf/zoo.cfg
Starting zookeeper … STARTED
关闭服务
方式一:通过通过指令关闭
[root@localhost bin]# sh zkServer.sh stop
方式二:通过结束进程关闭
[root@localhost bin]# jps
3809 Jps
3713 QuorumPeerMain
[root@localhost bin]# kill -9 3713
开启客户端
记得最后按Enter键
[root@localhost bin]# sh zkCli.sh
Connecting to localhost:2181
…………
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]
退出客户端
[zk: localhost:2181(CONNECTED) 0] quit
查看根节点下有什么节点
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
查看zookeeper节点下有什么节点
[zk: localhost:2181(CONNECTED) 2] ls /zookeeper
[quota]
知识点:
1.zk有一个根节点/
2.每一个节点都可以有自己的子节点
3.每一个节点成为znode节点
4.多个znode节点最终形成的成为znode树
5.每一个znode节点都可以存储数据,存储的数据是字节形式
6.zk为了提供快速的数据访问,会将数据维系到内存中
7.zk为了防止数据丢失,也会在磁盘上进行落地存储,存储的路径可以通过配置文件来指定
8.zk的所有操作指令,比如查看、创建节点、更新、删除都是基于路径来操作的。
9.zk中的路径是具有全局唯一性,所以可以基于这个特性来实现命名服务
创建节点
有如下四种类型:
①创建普通节点(注意:必须写初始数据,否则创建不成功)
也叫持久节点(编号0):可以挂载子节点
[zk: localhost:2181(CONNECTED) 3] create /park01 hello1805
Created /park01
[zk: localhost:2181(CONNECTED) 5] ls /
[park01, zookeeper]
可以创建空的初始数据
[zk: localhost:2181(CONNECTED) 8] create /park01/node02 ‘’
Created /park01/node02
②创建临时节点(编号1,不可以挂载子节点。一旦宕机或客户端下线,临时节点被删除)
[zk: localhost:2181(CONNECTED) 22] create -e /park02 ‘’
③创建顺序节点(编号2,会在指定的路径后跟上递增的顺序号)
[zk: localhost:2181(CONNECTED) 0] create -s /node hello
Created /node0000000004
[zk: localhost:2181(CONNECTED) 1] create -s /node hello
Created /node0000000005
④创建临时顺序节点(编号3)
[zk: localhost:2181(CONNECTED) 3] create -e -s /node hello
获取指定节点信息
[zk: localhost:2181(CONNECTED) 11] get /park02
cZxid = 0x7 //创建事务编号,(增删改)编号依次递增
ctime = Sat Oct 13 08:00:05 PDT 2018 //创建时间
mZxid = 0x7 //修改事务编号,编号递增
mtime = Sat Oct 13 08:00:05 PDT 2018 //修改时间
pZxid = 0x7 //最大事务id
cversion = 0 //创建版本
dataVersion = 0 //数据版本,每修改一次,数据版本+1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] get /park01
hello1805
cZxid = 0x4
ctime = Sat Oct 13 07:56:35 PDT 2018
mZxid = 0x4
mtime = Sat Oct 13 07:56:35 PDT 2018
pZxid = 0x6
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 2
修改节点信息
[zk: localhost:2181(CONNECTED) 13] set /park01 you
cZxid = 0x4
ctime = Sat Oct 13 07:56:35 PDT 2018
mZxid = 0x8
mtime = Sat Oct 13 08:02:39 PDT 2018
pZxid = 0x6
cversion = 2
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 2
再次查看数据,查看变化
[zk: localhost:2181(CONNECTED) 14] get /park01
you
cZxid = 0x4
ctime = Sat Oct 13 07:56:35 PDT 2018
mZxid = 0x8
mtime = Sat Oct 13 08:02:39 PDT 2018
pZxid = 0x6
cversion = 2
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 2
事物id全局递增
删除节点
[zk: localhost:2181(CONNECTED) 15] delete /park02
包含子节点不能直接删除,可以用rmr指令递归删除
[zk: localhost:2181(CONNECTED) 20] rmr /park01
分布式存在的问题
1.需要设置管理节点
2.如果管理节点只有一个,那么就存在单点问题
3.为了解决单点问题,引入管理集群,就会产生多个管理者的情况
4.需要在管理集群中选出leader
5.确定一套选举机制
6.管理节点之间实现信息的共享
7.需要一套机制确定一个唯一的leader
8.分布式锁:死锁,活锁(没有把持资源,占用CPU)
集群配置
1.克隆出另外2台虚拟机
2.创建存放数据文件的目录
[root@localhost zookeeper-3.4.10]# mkdir tmp
3.创建文件myid,写入server的id(1或2或3……)
[root@localhost zookeeper-3.4.10]# cd tmp
[root@localhost tmp]# vim myid
4.修改配置文件
[root@localhost bin]# vim …/conf/zoo.cfg
注意:服务器编号唯一。
5.依次开启三台虚拟机的Zookeeper服务,一般第二个开启的为leader,其它为follower。可以通过sh zkServer.sh status命令查看主从状态。
6.通过sh zkCli.sh命令进入客户端。在任一台虚拟机进行的操作,其他集群虚拟机同步生效。
注意:
①提示started并不表示真的启动成功了,查看status状态可以确认是否启动成功。
②如果集群只启动一个节点,默认是失败的,至少启动一半以上的节点才能查看状态。
查看日志
Zookeeper日志是二进制格式,无法直接查看。解决办法如下:
进入存放日志的目录
[root@localhost lib]# cd …/tmp/version-2
查看日志文件
[root@localhost version-2]# ls
acceptedEpoch currentEpoch log.100000001 snapshot.100000000
需要先把slf4j-api-1.6.1.jar和zookeeper-3.4.10.jar复制到存放日志的version-2目录下
[root@localhost version-2]# java -classpath .:slf4j-api-1.6.1.jar:zookeeper-3.4.10.jar org.apache.zookeeper.server.LogFormatter ./log.100000001
nc的安装和使用
两种安装方法:
1.直接yum获取安装
[root@localhost software]# yum install nc
2.下载rpm安装包后安装
[root@localhost software]# rpm -ivh nc-1.84-24.el6.x86_64.rpm
常用指令
查看端口2181(Zookeeper)配置信息
[root@localhost software]# echo conf|nc 127.0.0.1 2181
clientPort=2181
dataDir=/home/software/zookeeper-3.4.10/tmp/version-2
dataLogDir=/home/software/zookeeper-3.4.10/tmp/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=3
initLimit=10
syncLimit=5
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
查看2181端口状态
[root@localhost software]# echo stat|nc 192.168.157.130 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
/192.168.157.132:446430
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x300000002
Mode: follower
Node count: 5
测试是否启动了该Server,若回复imok表示已经启动
[root@localhost software]# echo ruok|nc 127.0.0.1 2181
关掉server
[root@localhost software]# echo kill | nc 127.0.0.1 2181
Hadoop的安装使用
安装前准备工作
1.关闭防火墙
service iptables stop
chkconfig iptables off
2.配置主机名
注意:安装hadoop的集群主机名不能重复!不能有下划线!不然会找不到主机,无法启动!
vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=hadoop01
source /etc/sysconfig/network
3.配置Hosts
vi /etc/hosts
填入以下内容
127.0.0.1 hadoop01
192.168.157.131 hadoop01
4.因使用过程会多次要求输入密码,所以先设置免密登录。
生成自己的公钥和私钥,生成的公私钥将自动存放在/root/.ssh目录下。
[root@localhost software]# ssh-keygen
把生成的公钥copy到本机
[root@localhost software]# ssh-copy-id root@192.168.157.131
安装步骤
5.获取安装包并解压
[root@localhost software]# tar -xvf hadoop-2.7.1_64bit.tar.gz
6.创建目录存放数据
mkdir /home/software/hadoop-2.7.1/tmp
7.修改多处配置文件
[root@localhost software]# cd hadoop-2.7.1/etc/hadoop
[root@localhost hadoop]# vim hadoop-env.sh
修改第25行 export JAVA_HOME=/home/software/jdk1.8.0_65
[root@localhost hadoop]# vim core-site.xml
修改最后两行内容如下:
fs.defaultFS
hdfs://hadoop01:9000
hadoop.tmp.dir
/home/software/hadoop-2.7.1/tmp
[root@localhost hadoop]# vim hdfs-site.xml
最后两行修改如下:
dfs.replication
1
8.复制并修改配置文件mapred-site.xml
[root@localhost hadoop]# cp mapred-site.xml.template mapred-site.xml
[root@localhost hadoop]# vim mapred-site.xml
最后两行修改如下:
mapreduce.framework.name
yarn
[root@localhost hadoop]# vim yarn-site.xml
最后两行修改如下:
<property>
<!--指定yarn的老大resourcemanager的地址-->
<name>yarn.resourcemanager.hostname</name>
<value>hadoop01</value>
</property>
<property>
<!--NodeManager获取数据的方式-->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
如果访问不了有可能是服务器50070端口被关闭了。通过如下方式打开50070端口:
service iptables status #查询防火墙状态
service iptables start #开启防火墙
iptables -I INPUT -p tcp --dport 80 -j ACCEPT #开通特定端口
iptables -I INPUT -p tcp --dport 80 -j DROP #关闭特定端口
service iptables save #保存配置
service iptables restart #重启防火墙
如果多次格式化namenode,那么可能会导致datanode无法启动
通过java代码连接Hadoop
1.在eclipse的安装目录plugins下放置插件hadoop-eclipse-plugin-2.7.1.jar,重启eclipse
2.解压hadoop-2.7.1.tar.gz及hadoop-common-2.7.1-bin-master
3.打开工具栏Window/Prefences,点开Hadoop Map/Reduce一项,填入路径“D:\Java_Developer\hadoop-2.7.1”。
4.右键“我的电脑”→属性→高级系统设置→环境变量→新建→变量名:HADOOP_HOME 变量值:D:\Java_Developer\hadoop-common-2.7.1-bin-master
5.新建→变量值:HADOOP_USER_NAME 变量值:root→更改Path变量
6.新增参数 %HADOOP_HOME%\bin→重启电脑使生效
7.开启hadoop视图步骤:打开工具栏Window→Show View→others→搜索Map/Reduce Locations
8.连接主机到eclipse·
9.最后可以在Project视图看到DFS Location一栏
MapReduce分布式计算框架
MR自定义分区机制
一、自定义分区实现过程
在某些需求中,如果MR默认的分区机制无法完成功能,可以自定义分区规则
实现过程:
1.写一个类集成Partitioner
2.在job设置Partitioner
3.通常还需要修改Reducer的数量
二、自定义分区案例
1.开发自定义分区
2.开发Mapper
3.开发Reducer
4.开发Driver
三、MR的排序
在MR执行的过程中,存在分组排序的过程,可以利用这个过程,实现对海量数据的排序。只要合理的设置k2、k3就可以利用这个机制实现对海量数据的排序。
案例:计算利润
四、Combiner合并
在MR执行的过程中,如果Map中产生的数据有大量键是重复的,则可以在Map段对数据进行合并,从而减少数据量,减少shuffle过程中,网络中传输的数据量,磁盘中读写的数据量,从而提升效率。这样的机制就称之为MR中的Combiner机制。
合理的利用Combiner可以有效的提升程序的性能。但Combiner不是万能的,在使用Combiner的时候,一定要保证,无论是否有Combiner,无论Combiner执行多少次。都应该保证不影响最终的执行结果。
Shuffle
Buffer环形缓冲区,默认100MB,溢出比0.8.
Hadoop调优
简单总结如下:
1.禁止格式化namenode
2.调大心跳机制时间
3.手动合并元文件
4.调大文件上传下载缓存大小(4M)
5.spilt溢出缓冲区调大
6.调整map完成率
7.调整merge文件合并因子
8.压缩map输出文件
9.启动map任务的推测执行机制(慢任务)
flume的安装和使用
1.解压安装包到指定目录
cd /home/software
tar-xvf apache-flume-1.6.0-bin.tar.gz
2.进入conf目录
cd apache-flume-1.6.0-bin/conf
3.根据需要新增配置文件
配置详解:
#配置Agent a1 的组件
a1.sources=r1
a1.channels=c1 (可以配置多个,以空格隔开,名字自己定)
a1.sinks=s1 (可以配置多个,以空格隔开,名字自己定)
#描述/配置a1的r1
a1.sources.r1.type=netcat (netcat表示通过指定端口来访问)
a1.sources.r1.bind=0.0.0.0 (表示本机)
a1.sources.r1.port=44444 (指定的端口,此端口不固定,但是不要起冲突)
#描述a1的s1
a1.sinks.s1.type=logger (表示数据汇聚点的类型是logger日志)
#描述a1的c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000 (表示事件event存储在信道中的最大数量)
a1.channels.c1.transactionCapacity=100 (每个事务中的最大事件数)
#位channel 绑定 source和sink
a1.sources.r1.channels=c1 (一个source是可以对应多个通道的)
a1.sinks.s1.channel=c1 (一个sink,只能对应一个通道)
例 vim avro.conf
# 将agent组件起名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4444 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4.根据指定的配置文件,来启动flume
[root@hadoop01 conf]# …/bin/flume-ng agent -n a1 -c ./ -f ./avro.conf -Dflume.root.logger=INFO,console
启动成功提示如下:
串联flume
1.配置上游虚拟机hadoop01
vim AvroSink.conf
# 将agent组件起名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source a1.sources.r1.type = netcat a1.sources.r1.hostname = 0.0.0.0 a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.157.131 a1.sinks.k1.port = 8888 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
2.配置下游虚拟机hadoop02
[root@hadoop02 conf]# vim avro.conf
# 将agent组件起名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3.先启动下游flume
[root@hadoop02 conf]# …/bin/flume-ng agent -n a1 -c ./ -f ./avro.conf -Dflume.root.logger=INFO,console
4.再启动上游flume
[root@hadoop02 conf]# …/bin/flume-ng agent -n a1 -c ./ -f ./AvroSink.conf -Dflume.root.logger=INFO,console
5.上游nc测试
[root@hadoop01 conf]# nc hadoop01 8888
mygod
OK
6.下游收到消息
2018-11-15 02:26:10,035 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 6D 79 67 6F 64 mygod }
扇出配置
1.创建上游hadoop01(IP地址:192.168.157.130)配置文件
vim shanchu.conf
#命名Agent a1的组件
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#描述/配置Source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 8888 #描述Sink #配置第一台扇出的虚拟机 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.157.131 a1.sinks.k1.port = 8888 #配置第二台扇出的虚拟机 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 192.168.157.132 a1.sinks.k2.port = 8888 #描述内存Channel a1.channels.c1.type = memory a1.channels.c2.type = memory #为Channle绑定Source和Sink a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
2.创建两台下游配置文件
[root@hadoop02 conf]# vim avro.conf
# 将agent组件起名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3.先启动下游虚拟机,再启动上游虚拟机
4.上游测试
[root@hadoop01 conf]# nc hadoop01 8888
test for shanchu
OK
5.两台下游虚拟机均收到信息
Event: { headers:{} body: 74 65 73 74 20 66 6F 72 20 73 68 61 6E 63 68 75 test for shanchu }
Hive的安装和使用
安装步骤:
1.获取安装包
2.解压
[root@hadoop02 software]# tar -xvf apache-hive-1.2.0-bin.tar.gz
3.启动hadoop
[root@hadoop02 software]# start-all.sh
4.进入bin目录执行hive命令
[root@hadoop02 software]# cd apache-hive-1.2.0-bin/bin
[root@hadoop02 bin]# hive
Logging initialized using configuration in jar:file:/home/software/apache-hive-1.2.0-bin/lib/hive-common-1.2.0.jar!/hive-log4j.properties
hive>
常用命令:
需先进入客户端 hive>
$show databases;
执行后发现默认有一个库default
$show tables;
发现没有任何表,证明不use其他库时,默认就是default库。
$create database tedu;
发现在hdfs中多出了/user/hive/warehouse/tedu.db目录
结论1:hive中的数据库对应hdfs中/user/hive/warehouse目录下以.db结尾的目录。
$use tedu;
$create table student (id int,name string);
$show tables;
$desc student;
$show create table student;
发现正确创建出来了表。
发现在hdfs中多出了/user/hive/warehouse/tedu.db/sutdent目录
结论2:hive中的表对应hdfs/user/hive/warehouse/[db目录]中的一个目录
$load data local inpath ‘…/mydata/student.txt’ into table student;
发现/user/hive/warehouse/tedu.db/sutdent下多出了文件
$select * from student;
发现查出的数据不正确,原因是建表时没有指定分隔符。默认的分隔符是空格。
$create table student2 (id int,name string) row format delimited fields terminated by ‘\t’;
$load data local inpath ‘…/mydata/student.txt’ into table student2;
$select * from student2;
发现正确查询出了数据。
结论3:hive中的数据对应当前hive表对应的hdfs目录中的文件中的数据。
$select count(*) from student;
发现执行了mapreduce作业,最终现实了结果
结论4:hive会将命令转换为mapreduce执行。
$use default;
$create table teacher(id int,name string);
发现在hive对应的目录下多出了 tedu.db 文件夹,其中包含user文件夹。
结论5:hive默认的default数据库直接对应/user/hive/warehouse目录,在default库中创建的表直接会在该目录下创建对应目录。
创建外部表的命令:
进入hive,执行:
create external table stu (id int,name string) row format delimited fields terminated by ’ ’ location ‘/目录路径’
示例
将从表t3查询到的数据插入表t2中。注意:要求t2表列数与查询结果列数一致。
INSERT OVERWRITE TABLE t2 SELECT t3.c2, count(1) FROM t3 WHERE t3.c1 <= 20 GROUP BY t3.c2
将从表t3查询到的数据写入到HDFS系统/output_dir目录下(目录不存在时自动创建),自动生成查询结果文件000000_0
INSERT OVERWRITE DIRECTORY ‘/output_dir’ SELECT t3.c2, avg(t3.c1) FROM t3 WHERE t3.c1 > 20 AND t3.c1 <= 30 GROUP BY t3.c2
将从表t3查询到的数据写入到本地文件(当前主机)系统/home/dir目录下(目录不存在时自动创建),自动生成查询结果文件000000_0
INSERT OVERWRITE LOCAL DIRECTORY ‘/home/dir’ SELECT t3.c2, sum(t3.c1) FROM t3 WHERE t3.c1 > 30 GROUP BY t3.c2;
从本地导入数据到表st。注意:如果表st原本有数据会被全部覆盖。
load data local inpath ‘/home/software/database/students.txt’ overwrite into table st;
查询与“刘晨”在同一个系学习的学生
hive> select s1.Sname from student s1 left semi join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
注意比较:
1.左表信息全部显示,右表未符号条件的显示NULL
select * from student s1 left join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
2.左表未符号条件的显示NULL,右表信息全部显示
select * from student s1 right join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
查询结果如下:
NULL NULL NULL NULL NULL 95001 李勇 男 20 CS
95002 刘晨 女 19 IS 95002 刘晨 女 19 IS
95004 张立 男 19 IS 95002 刘晨 女 19 IS
95017 王风娟 女 18 IS 95002 刘晨 女 19 IS
95018 王一 女 19 IS 95002 刘晨 女 19 IS
95019 邢小丽 女 19 IS 95002 刘晨 女 19 IS
95020 赵钱 男 21 IS 95002 刘晨 女 19 IS
NULL NULL NULL NULL NULL 95003 王敏 女 22 MA
NULL NULL NULL NULL NULL 95004 张立 男 19 IS
NULL NULL NULL NULL NULL 95005 刘刚 男 18 MA
NULL NULL NULL NULL NULL 95006 孙庆 男 23 CS
NULL NULL NULL NULL NULL 95007 易思玲 女 19 MA
NULL NULL NULL NULL NULL 95008 李娜 女 18 CS
NULL NULL NULL NULL NULL 95009 梦圆圆 女 18 MA
NULL NULL NULL NULL NULL 95010 孔小涛 男 19 CS
NULL NULL NULL NULL NULL 95011 包小柏 男 18 MA
NULL NULL NULL NULL NULL 95012 孙花 女 20 CS
NULL NULL NULL NULL NULL 95013 冯伟 男 21 CS
NULL NULL NULL NULL NULL 95014 王小丽 女 19 CS
NULL NULL NULL NULL NULL 95015 王君 男 18 MA
NULL NULL NULL NULL NULL 95016 钱国 男 21 MA
NULL NULL NULL NULL NULL 95017 王风娟 女 18 IS
NULL NULL NULL NULL NULL 95018 王一 女 19 IS
NULL NULL NULL NULL NULL 95019 邢小丽 女 19 IS
NULL NULL NULL NULL NULL 95020 赵钱 男 21 IS
NULL NULL NULL NULL NULL 95021 周二 男 17 MA
NULL NULL NULL NULL NULL 95022 郑明 男 20 MA
3.仅显示符合条件部分,两个表拼接在一起。
select * from student s1 inner join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
查询结果如下:
95002 刘晨 女 19 IS 95002 刘晨 女 19 IS
95004 张立 男 19 IS 95002 刘晨 女 19 IS
95017 王风娟 女 18 IS 95002 刘晨 女 19 IS
95018 王一 女 19 IS 95002 刘晨 女 19 IS
95019 邢小丽 女 19 IS 95002 刘晨 女 19 IS
95020 赵钱 男 21 IS 95002 刘晨 女 19 IS
4.仅显示符合条件的左表
select * from student s1 left semi join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
查询结果如下:
95002 刘晨 女 19 IS
95004 张立 男 19 IS
95017 王风娟 女 18 IS
95018 王一 女 19 IS
95019 邢小丽 女 19 IS
95020 赵钱 男 21 IS
5.系统报错
select s1.Sname from student s1 right semi join student s2 on s1.Sdept=s2.Sdept and s2.Sname=‘刘晨’;
Sqoop
安装步骤:
1.准备sqoop安装包,官网地址:http://sqoop.apache.org
2.配置jdk环境变量和Hadoop的环境变量。因为sqoop在使用是会去找环境变量对应的路径,从而完整工作。
3.sqoop解压即可使用(前提是环境变量都配好了)
4.需要将要连接的数据库的驱动包加入sqoop的lib目录下(本例中用的是mysql数据库)
5.利用指令操作sqoop
Sqoop基础指令(在Sqoop的bin目录下执行下列指令)
说明 指令示例
查看mysql所有数据库 sh sqoop list-databases --connect jdbc:mysql://hadoop02:3306/ -username root -password root
查看指定数据库下的所有表 sh sqoop list-tables --connect jdbc:mysql://hadoop02:3306/hive -username root -password root
关系型数据库==>hdfs
sh sqoop import -help(查看import的帮助指令) 实现步骤:
1.现在mysql数据库的test数据下建立一张tabx表,并插入测试数据
建表:create table tabx (id int,name varchar(20));
插入:insert into tabx (id,name) values (1,‘aaa’),(2,‘bbb’),(3,‘ccc’),(1,‘ddd’),(2,‘eee’),(3,‘fff’);
2.进入到sqoop的bin目录下,执行导入语句
导入:(会在hadoop下自动创建文件)
sh sqoop import --connect jdbc:mysql://192.168.157.130:3306/test --username root --password root --table tabx --target-dir ‘/sqoop/tabx’ --fields-terminated-by ‘|’ -m 1;
hdfs==>关系型数据库 执行:sh sqoop export --connect jdbc:mysql://192.168.157.130:3306/test --username root --password root --export-dir ‘/sqoop/tabx/part-m-00000’ --table taby -m 1 --fields-terminated-by ‘|’
注:sqoop只能导出数据,不能自动建表。所以在导出之前,要现在mysql数据库里建好对应的表
zebra项目
1.查询流量排名前十
select
hourid,
appType,
sum(totalTraffic) as tt
from
D_H_HTTP_APPTYPE
where
hourid=’2018-10-31’
group by
hourid,appType
order by
tt desc
limit
10;
3.指定日期指定应用大类中应用小类流量排名前十
select
hourid,
appType
appSubType,
totalTraffic
from
D_H_HTTP_APPTYPE
where
hourid=’2018-10-31’
and
appType=1
order by
totalTraffic desc
limit 10;
Hbase
NoSQL=not only structured Query Language非关系型数据
半结构化数据,按列进行存储
安装配置:
1.获取安装包解压
tar -xvf hbase-0.98.17-hadoop2-bin.tar.gz
2.进入conf目录修改配置文件
cd /home/software/hbase-0.98.17-hadoop2/conf
vim hbase-site.xml
hbase.rootdir
hdfs://hadoop01:9000/hbase
dfs.replication
1
hbase.cluster.distributed
true
hbase.zookeeper.quorum
hadoop01:2181,hadoop02:2181,hadoop03:2181
[root@hadoop03 conf]# vim hbase-env.sh
修改Page 27 export JAVA_HOME=/home/software/jdk1.8.0_65
修改Page 122 export HBASE_MANAGES_ZK=false
[root@hadoop01 conf]# vim regionservers
hadoop01
hadoop02
hadoop03
复制配置好的目录到其他虚拟机(前提:这三台虚拟机处于互通状态)
scp -r hbase-0.98.17-hadoop2 root@hadoop02:/home/software
scp -r hbase-0.98.17-hadoop2 root@hadoop03:/home/software
额外配置
检查主机名
cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=hadoop02
修改hosts文件
vim /etc/hosts
127.0.0.1 localhost
::1 localhost
192.168.157.130 hadoop01
192.168.157.131 hadoop02
192.168.157.132 hadoop03
设置环境变量
作用:使hbase命令在任何文件夹下都能直接执行
vim /etc/profile
追加内容如下:
export HB_HOME=/home/software/hbase-0.98.17-hadoop2
export PATH=
P
A
T
H
:
.
/
:
PATH:./:
PATH:./:JAVA_HOME/bin:
N
O
D
E
H
O
M
E
/
b
i
n
:
NODE_HOME/bin:
NODEHOME/bin:HADOOP_HOME/sbin:
H
A
D
O
O
P
H
O
M
E
/
b
i
n
:
HADOOP_HOME/bin:
HADOOPHOME/bin:ZK_HOME/bin:$HB_HOME/bin
开启服务
1.启动hadoop服务start-all.sh
2.开启Zookeeper服务
[root@localhost ~]# cd /home/software/zookeeper-3.4.10/bin
[root@localhost bin]# sh zkServer.sh start
3.先进入bin目录
cd /home/software/hbase-0.98.17-hadoop2/bin
4.执行start-hbase.sh命令
[root@hadoop01 bin]# start-hbase.sh
starting master, logging to /home/software/hbase-0.98.17-hadoop2/bin/…/logs/hbase-root-master-hadoop01.out
hadoop01: starting regionserver, logging to /home/software/hbase-0.98.17-hadoop2/bin/…/logs/hbase-root-regionserver-hadoop01.out
hadoop02: starting regionserver, logging to /home/software/hbase-0.98.17-hadoop2/bin/…/logs/hbase-root-regionserver-hadoop02.out
localhost: starting regionserver, logging to /home/software/hbase-0.98.17-hadoop2/bin/…/logs/hbase-root-regionserver-hadoop01.out
hadoop03: starting regionserver, logging to /home/software/hbase-0.98.17-hadoop2/bin/…/logs/hbase-root-regionserver-hadoop03.out
5.查看状态,发现多了两个进程HMaster和HRegionServer
[root@hadoop01 bin]# jps
22642 HMaster
22804 HRegionServer
12405 NodeManager
12134 SecondaryNameNode
22920 Jps
11849 NameNode
11949 DataNode
12302 ResourceManager
关闭服务
stop-hbase.sh
通过shell脚步访问
[root@hadoop01 bin]# hbase shell
2018-10-31 15:18:35,375 INFO [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter ‘help’ for list of supported commands.
Type “exit” to leave the HBase Shell
Version 0.98.17-hadoop2, rd5f8300c082a75ce8edbbe08b66f077e7d663a4a, Fri Jan 15 22:46:43 PST 2016
hbase(main):001:0>
查看状态
若服务有问题会一堆报错信息。
hbase(main):001:0> status
显示如下信息表示已成功开启
1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load
操作指令:
shell命令 描述
alter 修改列族(column family)模式
count 统计表中行的数量
create 创建表
describe 显示表相关的详细信息
delete 删除指定对象的值(可以为表,行,列对应的值,另外也可以指定时间戳的值)
deleteall 删除指定行的所有元素值
disable 使表无效
drop 删除表
enable 使表有效
exists 测试表是否存在
exit 退出hbase shell
get 获取行或单元(cell)的值
incr 增加指定表,行或列的值
list 列出hbase中存在的所有表
put 向指向的表单元添加值
tools 列出hbase所支持的工具
scan 通过对表的扫描来获取对用的值
status 返回hbase集群的状态信息
shutdown 关闭hbase集群(与exit不同)
truncate 重新创建指定表
version 返回hbase版本信息
注意:shutdown与exit之间的不同:shutdown表示关闭hbase服务,必须重新启动hbase才可以恢复,exit只是退出hbase shell,退出之后完全可以重新进入。
例:
create ‘MyTable’, ‘my’
put ‘MyTable’, ‘row1’, ‘my:1’, ‘中’
put ‘MyTable’, ‘row2’, ‘my:2’, ‘国’
put ‘MyTable’, ‘row3’, ‘my:3’, ‘人’
put ‘MyTable’, ‘row4’, ‘my:4’, ‘好’
创建新表
使用create命令来创建一个新的表。在创建的时候,必须指定表名和列族名。
hbase(main):001:0> create ‘test’, ‘cf’
0 row(s) in 0.4170 seconds
=> Hbase::Table - test
列举表信息
使用list命令:
hbase(main):002:0> list ‘test’
TABLE
test1 row(s) in 0.0180 seconds
=> [“test”]
获取表描述
hbase(main):020:0> desc ‘test’
Table test is ENABLED
test
COLUMN FAMILIES DESCRIPTION
{NAME => ‘cf’, BLOOMFILTER => ‘ROW’, VERSIONS => ‘1’, IN_MEMORY => ‘false’, KEEP_DELETE
D_CELLS => ‘FALSE’, DATA_BLOCK_ENCODING => ‘NONE’, TTL => ‘FOREVER’, COMPRESSION => ‘NO
NE’, MIN_VERSIONS => ‘0’, BLOCKCACHE => ‘true’, BLOCKSIZE => ‘65536’, REPLICATION_SCOPE
=> ‘0’}
1 row(s) in 0.3400 seconds
删除表
使用drop命令实现删除表的功能:
hbase(main):011:0> drop ‘test’
0 row(s) in 0.1370 seconds
检查表是否存在
hbase(main):021:0>exists ‘member’
Table member doesexist 0 row(s) in 0.1610seconds
数据管理(DML)操作
向表中插入数据
使用put命令,将数据插入表中:
hbase(main):003:0> put ‘test’, ‘row1’, ‘cf:a’, ‘value1’
hbase(main):004:0> put ‘test’, ‘row2’, ‘cf:b’, ‘value2’
hbase(main):005:0> put ‘test’, ‘row3’, ‘cf:c’, ‘value3’
可以看到,在本例中,一共插入了三条数据,一次一条。第一次插入到row1行,cf/:列,插入值为value1。所有列在HBase中有一个列族前缀。本例中的cf,后面跟着一个冒号还有一个列限定后缀,本例中是a。
一次性扫描全表数据
一种获取HBase数据的方法是扫描,使用scan命令来扫描表的数据。可以限制限制扫描的范围,在本例中,获取的是所有的数据。
hbase(main):006:0> scan ‘test’
ROW COLUMN+CELL
row1 column=cf:a, timestamp=1421762485768, value=value1
row2 column=cf:b, timestamp=1421762491785, value=value2
row3 column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds
获取一个行数据
使用get命令来获得某一行的数据:
hbase(main):007:0> get ‘test’, ‘row1’
COLUMN CELL
cf:a timestamp=1421762485768, value=value1
1 row(s) in 0.0350 seconds
更新一条数据
使用put命令,会覆盖原来的信息。
hbase(main):021:0> put ‘test’, ‘row3’, ‘cf:c’, 'winyar’0 row(s) in 0.0210seconds
hbase(main):024:0> get ‘test’, ‘row3’
COLUMN CELL
cf:c timestamp=1541029802520, value=winyar
1 row(s) in 0.0510 seconds
禁用一个表
如果你想要删除一个表或是修改它的设置,或者是其它的情况,都需要首先禁用该表。使用disable命令禁用表,enable命令重新启用表。
hbase(main):008:0> disable ‘test’
0 row(s) in 1.1820 seconds
hbase(main):009:0> enable ‘test’
0 row(s) in 0.1770 seconds
Kafka
安装与配置
1.下载Kafka安装包并解压:
tar -zxvf kafka_2.9.2-0.8.1.1.tgz
3.修改配置文件
[root@hadoop01 software]# cd kafka_2.11-1.0.0/config
[root@hadoop01 config]# vim server.properties
修改Page 21 broker.id=0
修改Page 31 listeners=PLAINTEXT://localhost:9092
修改Page 60 log.dirs=/home/software/kafka_2.11-1.0.0/logs
修改Page 123 zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
在其下增加如下内容:
delete.topic.enable=true
advertised.host.name=192.168.157.130
advertised.port=9092
[root@hadoop01 config]# vim producer.properties
修改Page 21 bootstrap.servers=hadoop01:9092
[root@hadoop01 config]# vim zookeeper.properties
修改Page 16 dataDir=/home/software/kafka_2.11-1.0.0/data
4.在kafka_2.11-1.0.0目录下创建logs和data目录
[root@hadoop01 kafka_2.11-1.0.0]# mkdir logs
[root@hadoop01 kafka_2.11-1.0.0]# mkdir data
5.发送kafka文件夹到其他集群主机
[root@hadoop01 software]# scp -r kafka_2.11-1.0.0 root@hadoop02:/home/software/
[root@hadoop01 software]# scp -r kafka_2.11-1.0.0 root@hadoop03:/home/software/
6.修改其他主机的server.properties
[root@hadoop01 ~]# vim /home/software/kafka_2.11-1.0.0/config/server.properties
修改Page 21 broker.id=1
另一台修改为2。注意:id不能相同
7.删除其他主机的logs文件夹再重新创建一个。
[root@hadoop03 kafka_2.11-1.0.0]# rm -rf logs
[root@hadoop03 kafka_2.11-1.0.0]# mkdir logs
8.开启服务
①需先在各个机器上开启Zookeeper
[root@hadoop01 ~] sh zkServer.sh start
②进入bin目录
[root@hadoop01 ~]# cd /home/software/kafka_2.11-1.0.0/bin
③各个机器执行开启服务命令,指向配置文件
[root@hadoop01 bin]# kafka-server-start.sh …/config/server.properties
常用命令
创建topic:
kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:
kafka-topics.sh --list --zookeeper hadoop01:2181
查看主题信息
1 kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic test
启动命令行kafka生产者
1 kafka-console-producer.sh --broker-list hadoop01:9092 --topic test
启动命令行kafka消费者
1 kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic test --from-beginning
实验:高可靠性验证
1 kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic test
Storm
Storm的并发级别
Storm作为一个分布式实时计算系统,是可以通过在不同级别并发来提升程序的效率的。
Storm的并发体现在如下的四个级别:
1.Node服务器级别:配置在Storm集群中的一个服务器,会执行Topology的一部分运输,一个Storm集群中包含一个或者多个Node。
2.Worker进程级别:只一个Node上相互独立运作的jvm进程,每一个Node可以配置运行一个或多个worker。一个Topology会分配到一个或者多个worker运行。
3.Executor线程级别:指一个worker的jvm中运行的java线程。多个task可以指派给同一个executor来执行。除非是明确指定,Storm默认会给每个executor分配一个task。
4.Task任务
task是spout和bolt的实力,他们的nextTuple()和execute()方法会被executors线程调用执行。多个task可以指派给同一个executor来执行。除非是明确指定,Storm默认会给每个executor分配一个task。
Storm的默认并发度为1.
安装和配置
[root@hadoop01 software]# tar -xvf apache-storm-0.9.3.tar.gz
[root@hadoop01 software]# cd apache-storm-0.9.3/conf
[root@hadoop01 conf]# vim storm.yaml
修改Page 18 storm.zookeeper.servers:
修改Page 19 - “hadoop01”
修改Page 20 - “hadoop02”
修改Page 21 - “hadoop03”
修改Page 22 nimbus.host: “hadoop01”
新增Page 23 supervisor.slots.ports: “4”
新增Page 24 storm.local.dir: “/home/software/apache-storm-0.9.3/data”
复制配置好后的storm文件到其他集群主机
[root@hadoop01 software]# scp -r apache-storm-0.9.3 root@hadoop02:/home/software
[root@hadoop01 software]# scp -r apache-storm-0.9.3 root@hadoop03:/home/software
使用命令
需先启动Zookeeper
sh zkServer.sh start
[root@hadoop01 ~]# cd /home/software/apache-storm-0.9.3/bin
storm nimbus 启动nimbus守护进程
storm supervisor 启动supervisor守护进程
storm ui 启动stormui的守护进程,从而可以通过webUI界面来监控storm运行过程(浏览器访问http://hadoop01:8080)
storm drpc 启动一个DRPC服务守护进程
Spark
安装步骤
1.spark运行前提:安装有jdk、scala环境
2.上传和解压Spark安装包
tar -xvf spark-2.0.1-bin-hadoop2.7.tgz
3.进入Spark安装目录下的conf目录,创建spark-env.sh文件并修改
cd /home/software/spark-2.0.1-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
#服务器IP地址
SPARK_LOCAL_IP=hadoop01
集群模式配置:
4.修改spark-env.sh文件增加如下内容
#spark的shuffle中间过程会产生一些临时文件,此项指定的是其存放目录,不配置默认是在 /tmp目录下
SPARK_LOCAL_DIRS=/home/software/spark/tmp
export JAVA_HOME=/home/software/jdk1.8
5.在conf目录下,编辑slaves文件
vim slave
hadoop01
hadoop02
hadoop03
5.远程发送配置好的spark安装文件到其他主机
cd /home/software
scp -r spark-2.0.1-bin-hadoop2.7 hadoop02:/home/software/
6.修改其他主机配置文件spark-env.sh的服务器IP地址
SPARK_LOCAL_IP=hadoop02
Spark单机模式启动
在bin目录下执行:sh spark-shell --master=local
简便操作指令:spark-shell
启动成功后可以通过web页面访问,http://192.168.157.130:4040
Spark集群模式启动
1)如果你想让 01 虚拟机变为master节点,则进入01 的spark安装目录的sbin目录
执行: sh start-all.sh
2)通过jps查看各机器进程,
01:Master +Worker
02:Worker
03:Worker
3)通过浏览器访问管理界面http://192.168.234.11:8080
4)通过spark shell 连接spark集群
进入spark的bin目录
执行:sh spark-shell.sh --master spark://192.168.157.130:7077
6)在集群中读取文件:
sc.textFile(“/root/work/words.txt”)
默认读取本机数据 这种方式需要在集群的每台机器上的对应位置上都一份该文件 浪费磁盘
7)所以应该通过hdfs存储数据
sc.textFile(“hdfs://hadoop01:9000/mydata/words.txt”);
注:可以在spark-env.sh 中配置选项 HADOOP_CONF_DIR 配置为hadoop的etc/hadoop的地址 使默认访问的是hdfs的路径
注:如果修改默认地址是hdfs地址 则如果想要访问文件系统中的文件 需要指明协议为file 例如 sc.text(“file:///xxx/xx”)
API操作
1)创建spark的项目
在scala中创建项目,导入spark相关的jar包
2)开发spark相关代码
代码示例:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCountDriver {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("spark://hadoop01:7077").setAppName("wordcount")
val sc=new SparkContext(conf)
val data=sc.textFile("hdfs://hadoop01:9000/words.txt", 2)
val result=data.flatMap { x => x.split(" ") }.map { x => (x,1) }.reduceByKey(_+_)
result.saveAsTextFile("hdfs://hadoop01:9000/wcresult")
}
}
3)将写好的项目打成jar,上传到服务器,进入bin目录执行:
spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar
大数据知识点
Fast Paxos与Paxos的区别
Fast Paxos比Paxos更贴近工程。从一般的Client/Server来考虑,Client其实承担了Proposer和Learner的作用,而Server则扮演Acceptor的角色。
Fast Paxos的主要不同:
1.决策实施阶段.让Proposer直接提交到Acceptor,可以把通信步骤减少到2个。减少了先提交到acceptor的leader过程;
2.在Classic Paxos中,Acceptor投票的value都是Leader选择好的,所以不存在同一Round中投票多个Value的场景,从而保证了一致性。但在Fast Round中因为允许多个Proposer同时提交不同的Value到Acceptor,这将导致在Fast Round中没有任何value被作为最终决议,这也称为“冲突”(Collision)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。