当前位置:   article > 正文

电商数仓(一)_电商数仓书写

电商数仓书写

jdk配置命令

  1. 解压JDK到/opt/module目录下
 tar -zxvf jdk-8u144-linux-x64.tar.gz -C /opt/module/
  • 1
  1. 进入/opt/module/jdk1.8.0_144目录
pwd # 查看jdk的路径
  • 1
  1. 打开/etc/profile文件
sudo vi /etc/profile
  • 1
  1. 在profile文件末尾添加JDK路径
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
export PATH=$PATH:$JAVA_HOME/bin

  • 1
  • 2
  • 3
  • 4
  1. 保存退出
:wq
  • 1
  1. 让修改后的文件生效
source /etc/profile
  • 1
  1. 测试JDK是否安装成功
java -version
  • 1
  注意:重启(如果java -version可以用就不用重启)
  • 1
sync
sudo reboot
  • 1
  • 2

hadoop配置

  1. 用SecureCRT工具将hadoop-2.7.2.tar.gz导入到opt目录下面的software文件夹下面
  2. 进入到Hadoop安装包路径下
cd /opt/software/
  • 1
  1. 解压安装文件到/opt/module下面
tar -zxvf hadoop-2.7.2.tar.gz -C /opt/module/
  • 1
  1. 查看是否解压成功
ls /opt/module/
  • 1
  1. 将Hadoop添加到环境变量
    (1)获取Hadoop安装路径
    进入hadoop安装目录
pwd
  • 1

(2)打开/etc/profile文件

 sudo vi /etc/profile
  • 1

在profile文件末尾添加JDK路径:(shitf+g)

##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

  • 1
  • 2
  • 3
  • 4
  • 5

(3)保存后退出

:wq
  • 1

(4)让修改后的文件生效

source /etc/profile
  • 1
  1. 测试是否安装成功
hadoop version
  • 1
  1. 重启(如果Hadoop命令不能用再重启)
sync
sudo reboot
  • 1
  • 2

编写集群分发脚本 xsync

  1. 进入家目录
cd
  • 1
  1. pwd命令显示如下
/home/atguigu
  • 1
  1. 在/home/atguigu目录下创建bin目录,并在bin目录下xsync创建文件,文件内容如下:
mkdir bin
cd bin/
touch xsync
vi xsunc
  • 1
  • 2
  • 3
  • 4

在该文件中编写如下代码

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 获取当前用户名称
user=`whoami`

#5 循环
for((host=103; host<105; host++)); do
        echo ------------------- hadoop$host --------------
        rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 修改脚本 xsync 具有执行权限
chmod 777 xsync
  • 1
  1. 调用脚本形式:xsync 文件名称
xsync /home/atguigu/bin
  • 1

注意:如果将xsync放到/home/atguigu/bin目录下仍然不能实现全局使用,可以将xsync移动到/usr/local/bin目录下。

  1. 下载rxync(三台虚拟机都要执行)
sudo yum install rsync
  • 1

配置hadoop集群

(1)核心配置文件

cd /opt/module/hadoop-2.7.2/etc/hadoop
  • 1
vi core-site.xml
  • 1

在config…中添加

<!-- 指定HDFS中NameNode的地址 -->
<property>
		<name>fs.defaultFS</name>
      <value>hdfs://hadoop102:9000</value>
</property>

<!-- 指定Hadoop运行时产生文件的存储目录 -->
<property>
		<name>hadoop.tmp.dir</name>
		<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

(2)HDFS配置文件

配置hadoop-env.sh

vi hadoop-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
  • 1
  • 2

配置hdfs-site.xml

vi hdfs-site.xml
  • 1

副本数量设置为1,是为了减少损耗,正常应该是3

<property>
		<name>dfs.replication</name>
		<value>1</value>
</property>

<!-- 指定Hadoop辅助名称节点主机配置 -->
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop104:50090</value>
</property>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

(3)YARN配置文件

配置yarn-env.sh

vi yarn-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
  • 1
  • 2

配置yarn-site.xml

vi yarn-site.xml
  • 1

在该文件中增加如下配置

<!-- Reducer获取数据的方式 -->
<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
</property>

<!-- 指定YARN的ResourceManager的地址 -->
<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>hadoop103</value>
</property>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

(4)MapReduce配置文件
配置mapred-env.sh

vi mapred-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
  • 1
  • 2

配置mapred-site.xml

cp mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
  • 1
  • 2

在该文件中增加如下配置

<!-- 指定MR运行在Yarn上 -->
<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
</property>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(5)slaves文件配置

vi slaves
  • 1

文件中只写

hadoop101
hadoop101
hadoop101
  • 1
  • 2
  • 3

免密登录

hadoop102和hadoop103上必须配置免密登录
(1)进入根目录

cd .ssh/
 ssh-keygen -t rsa
  • 1
  • 2

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
(2)将公钥拷贝到要免密登录的目标机器上

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
  • 1
  • 2
  • 3

(3)下载ssh客户端,因为ssh-copy-id命令不可用

sudo yum -y install openssh-clients
  • 1

更改源

  1. 下载wget
 sudo yum -y install wget
  • 1
  1. 更改源
 wget http://mirrors.aliyun.com/centos/6.10/os/x86_64/
  • 1

各种源地址

http://ftp.sjtu.edu.cn/centos/6.10/os/x86_64/
http://mirrors.cqu.edu.cn/CentOS/6.10/os/x86_64/
http://mirrors.163.com/centos/6.10/os/x86_64/
http://mirrors.cn99.com/centos/6.10/os/x86_64/
http://mirror.lzu.edu.cn/centos/6.10/os/x86_64/
http://mirrors.ustc.edu.cn/centos/6.10/os/x86_64/
http://mirrors.aliyun.com/centos/6.10/os/x86_64/
http://mirrors.huaweicloud.com/centos/6.10/os/x86_64/
http://mirrors.bfsu.edu.cn/centos/6.10/os/x86_64/
http://mirrors.neusoft.edu.cn/centos/6.10/os/x86_64/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

集群分发

  1. 在hadoop102上分发hadoop和jdk
xsync hadoop-2.7.2/
xsync jdk1.8.0_144/
  • 1
  • 2
  1. 拷贝文件/etc/profile(因为是root权限不能分发)
sudo scp /etc/profile root@hadoop103:/etc/profile
#拷贝当前虚拟机下的/etc/profile文件到hadoop103虚拟机root用户的/etc/profile目录下
  • 1
  • 2
  1. 在hadoop103,hadoop104上更新 /etc/profile
source /etc/profile
java -version
hadoop version
  • 1
  • 2
  • 3

群起集群

第一次启动时要格式化

bin/hdfs namenode -format
  • 1

在这里插入图片描述

  1. 在hadoop102上群起集群
sbin/start-dfs.sh
  • 1
  1. 在hadoop103上启动yarn
 sbin/start-yarn.sh
  • 1

在这里插入图片描述

集群启动错误解决方案

  1. 查看日志:/home/atguigu/module/hadoop-2.7.2/logs
  2. 如果进入安全模式,可以通过hdfs dfsadmin -safemode leave
  3. 停止所有进程,删除data和log文件夹,然后hdfs namenode -format 来格式化

Zookeeper安装

  1. 解压
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
  • 1
  1. 配置服务器编号

(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData

mkdir zkData
  • 1

(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件

touch myid
  • 1

(3)编辑myid文件

vi myid
  • 1
在文件中添加与server对应的编号:2
  • 1
  1. 配置zoo.cfg文件
    (1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
mv zoo_sample.cfg zoo.cfg
  • 1

(2)打开zoo.cfg文件

vi zoo.cfg
  • 1

修改数据存储路径配置

dataDir=/opt/module/zookeeper-3.4.10/zkData
  • 1

增加如下配置

#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述
(3)进入module目录,进行分发

xsync zookeeper-3.4.10/
  • 1

(4)修改hadoop103,104的myid

cd /opt/module/zookeeper-3.4.10/zkData/
  • 1
  1. zk集群启动
    (1)分别启动Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start

  • 1
  • 2
  • 3
  • 4

(2)查看状态

bin/zkServer.sh status
  • 1

原则上两个flower一个leader

ZK集群启动停止脚本

1)在hadoop102的/home/atguigu/bin目录下创建脚本

cd /home/atguigu/bin/
vi zk.sh
  • 1
  • 2
在脚本中编写如下内容
  • 1
#! /bin/bash

case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
	done
};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2)增加脚本执行权限

chmod 777 zk.sh
  • 1

3)Zookeeper集群启动脚本

[atguigu@hadoop102 module]$ zk.sh start
  • 1

4)Zookeeper集群停止脚本

[atguigu@hadoop102 module]$ zk.sh stop
  • 1

5)Zookeeper集群状态查看脚本


[atguigu@hadoop102 module]$ zk.sh status
  • 1
  • 2

注意在使用前还需要一下配置
把/etc/profile里面的环境变量追加到~/.bashrc目录
即执行以下代码(三台虚拟机都要执行)

cat /etc/profile >> ~/.bashrc
  • 1

集群日志生成启动脚本

1)在/home/atguigu/bin目录下创建脚本lg.sh

[atguigu@hadoop102 bin]$ vi lg.sh
  • 1

2)在脚本中编写如下内容

#! /bin/bash

	for i in hadoop102 hadoop103 
	do
		ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain $1 $2 >/opt/module/test.log &"
	done

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3)修改脚本执行权限

chmod 777 lg.sh
  • 1

4)启动脚本

[atguigu@hadoop102 module]$ lg.sh 
  • 1

集群时间同步修改脚本

1)在/home/atguigu/bin目录下创建脚本dt.sh

[atguigu@hadoop102 bin]$ vim dt.sh
  • 1

2)在脚本中编写如下内容

#!/bin/bash

log_date=$1

for i in hadoop102 hadoop103 hadoop104
do
	ssh -t $i "sudo date -s $log_date"
done

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

说明(ssh -t):https://www.cnblogs.com/kevingrace/p/6110842.html
3)修改脚本执行权限

chmod 777 dt.sh
  • 1

4)启动脚本

dt.sh 2019-2-10
  • 1

集群所有进程查看脚本

1)在/home/atguigu/bin目录下创建脚本xcall.sh

[atguigu@hadoop102 bin]$ vim xcall.sh
  • 1

2)在脚本中编写如下内容

#! /bin/bash

for i in hadoop102 hadoop103 hadoop104
do
        echo --------- $i ----------
        ssh $i "$*"
done

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3)修改脚本执行权限

chmod 777 xcall.sh
  • 1

4)启动脚本

 xcall.sh jps
  • 1

Flume安装

1)将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下
2)解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下

tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
  • 1

3)修改apache-flume-1.7.0-bin的名称为flume

mv apache-flume-1.7.0-bin flume
  • 1

4) 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

  • 1
  • 2
  • 3
  • 4

5)分发flume
进入module目录

xsync flume/
  • 1

Flume的具体配置

(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

vi file-flume-kafka.conf
  • 1

在文件配置如下内容

a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

(2) 分发

 xsync file-flume-kafka.conf 
  • 1

flume拦截器

先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

cd /opt/module/flume/lib/
  • 1
  1. 查看这个jar是否存在
ls | grep interceptor
  • 1
  1. 分发jar包
xsync flume-interceptor-1.0-SNAPSHOT.jar 
  • 1
  1. flume启动
[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
  • 1

日志采集Flume启动停止脚本

1)在/home/atguigu/bin目录下创建脚本f1.sh

[atguigu@hadoop102 bin]$ vim f1.sh
  • 1

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
        done
};;	
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

2)增加脚本执行权限

chmod 777 f1.sh
  • 1

3)f1集群启动脚本

[atguigu@hadoop102 module]$ f1.sh start
  • 1

4)f1集群停止脚本

[atguigu@hadoop102 module]$ f1.sh stop
  • 1

kafka安装

1)解压安装包

 tar -zxvf kafka_2.11-0.11.0.2.tgz -C /opt/module/
kafka_2.11-0.11.0.2/
  • 1
  • 2

2)修改解压后的文件名称

mv kafka_2.11-0.11.0.2/ kafka
  • 1

3)在/opt/module/kafka目录下创建logs文件夹

mkdir logs
  • 1

4)修改配置文件

cd /opt/module/kafka/config/
vi server.properties
  • 1
  • 2

在这里插入图片描述
5)分发kafka
在module目录下执行

xsync kafka/
  • 1

修改配置文件中的broker.id

cd /opt/module/kafka/config/
vi server.properties
  • 1
  • 2
注:broker.id不得重复 分别是1和2
  • 1

6)kafka启动
依次在hadoop102、hadoop103、hadoop104节点上启动kafka

bin/kafka-server-start.sh config/server.properties &
  • 1

9)关闭集群

bin/kafka-server-stop.sh stop
  • 1

Kafka集群启动停止脚本

1)在/home/atguigu/bin目录下创建脚本kf.sh

 vim kf.sh
  • 1
在脚本中填写如下内容
  • 1
#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------启动 $i Kafka-------"
                # 用于KafkaManager监控
                ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------停止 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

说明:启动Kafka时要先开启JMX端口,是用于后续KafkaManager监控。
2)增加脚本执行权限

chmod 777 kf.sh
  • 1

3)kf集群启动脚本

kf.sh start
  • 1

4)kf集群停止脚本

kf.sh stop
  • 1

测试kafka

在hadoop102中

cd /opt/module/kafka/
  • 1

Kafka消费消息

bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic topic_start

  • 1
  • 2
  • 3

启动之后
克隆hadoop102


cd /opt/module/kafka/
  • 1
  • 2

kafka生产消息

bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_start

  • 1
  • 2
  • 3

在克隆的会话中输入 hello
那么原本的kafka就会输出hello
在这里插入图片描述

Kafka Manager安装

cd /opt/software/
mv kafka-manager-1.3.3.22.zip /opt/module/
  • 1
  • 2
  1. 进入module目录,执行
 unzip kafka-manager-1.3.3.22.zip 
  • 1

补充:下载unzip命令

yum install -y unzip zip 
  • 1
  1. 进入到/opt/module/kafka-manager-1.3.3.22/conf目录,在application.conf文件中修改kafka-manager.zkhosts
[atguigu@hadoop102 conf]$ vi application.conf
  • 1

修改为:
在这里插入图片描述
复制这一段

kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
  • 1
  1. 启动KafkaManager
[atguigu@hadoop102 kafka-manager-1.3.3.22]$ 
nohup bin/kafka-manager   -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 &
  • 1
  • 2
  1. 在浏览器中打开
    http://hadoop102:7456
http://hadoop102:7456
  • 1

在这里插入图片描述
可以看到这个界面,选择添加 cluster;
在这里插入图片描述至此,就可以查看整个Kafka集群的状态,包括:Topic的状态、Brokers的状态、Cosumer的状态。
在Kafka的/opt/module/kafka-manager-1.3.3.22/application.home_IS_UNDEFINED 目录下面,可以看到Kafka-Manager的日志。

KafkaManager使用

Kafka Manager启动停止脚本

1)在/home/atguigu/bin目录下创建脚本km.sh

 vi km.sh
  • 1
在脚本中填写如下内容
  • 1
#! /bin/bash

case $1 in
"start"){
        echo " -------- 启动 KafkaManager -------"
        nohup /opt/module/kafka-manager-1.3.3.22/bin/kafka-manager   -Dhttp.port=7456 >start.log 2>&1 &
};;
"stop"){
        echo " -------- 停止 KafkaManager -------"
        ps -ef | grep ProdServerStart | grep -v grep |awk '{print $2}' | xargs kill 
};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2)增加脚本执行权限

[atguigu@hadoop102 bin]$ chmod 777 km.sh
  • 1

3)km集群启动脚本

[atguigu@hadoop102 module]$ km.sh start
  • 1

4)km集群停止脚本

[atguigu@hadoop102 module]$ km.sh stop
  • 1

消费Kafka数据Flume的配置

1)Flume配置分析
在这里插入图片描述
2)Flume的具体配置如下:
(1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[atguigu@hadoop104 bin]$ cd /opt/module/flume/conf/
  • 1
vi kafka-flume-hdfs.conf
  • 1

在文件配置如下内容

## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

在这里插入图片描述

日志消费Flume启动停止脚本

1)在/home/atguigu/bin目录下创建脚本f2.sh

vi f2.sh
  • 1
在脚本中填写如下内容
  • 1
#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2)增加脚本执行权限

chmod 777 f2.sh
  • 1

3)f2集群启动脚本

[atguigu@hadoop102 module]$ f2.sh start
  • 1

4)f2集群停止脚本

[atguigu@hadoop102 module]$ f2.sh stop
  • 1

一阶段截止

在这里插入图片描述

采集通道启动/停止脚本

1)在/home/atguigu/bin目录下创建脚本cluster.sh

vi cluster.sh
  • 1
在脚本中填写如下内容
  • 1
#! /bin/bash

case $1 in
"start"){
	echo " -------- 启动 集群 -------"

	echo " -------- 启动 hadoop集群 -------"
	/opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
	ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

	#启动 Zookeeper集群
	zk.sh start

sleep 4s;

	#启动 Flume采集集群
	f1.sh start

	#启动 Kafka采集集群
	kf.sh start

sleep 6s;

	#启动 Flume消费集群
	f2.sh start

	#启动 KafkaManager
	km.sh start
};;
"stop"){
    echo " -------- 停止 集群 -------"

	#停止 KafkaManager
	km.sh stop

    #停止 Flume消费集群
	f2.sh stop

	#停止 Kafka采集集群
	kf.sh stop

    sleep 6s;

	#停止 Flume采集集群
	f1.sh stop

	#停止 Zookeeper集群
	zk.sh stop

	echo " -------- 停止 hadoop集群 -------"
	ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
	/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

2)增加脚本执行权限

[atguigu@hadoop102 bin]$ chmod 777 cluster.sh
  • 1

3)cluster集群启动脚本

[atguigu@hadoop102 module]$ cluster.sh start
  • 1

4)cluster集群停止脚本

[atguigu@hadoop102 module]$ cluster.sh stop
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/678817
推荐阅读
相关标签
  

闽ICP备14008679号