当前位置:   article > 正文

大数据框架介绍与实操

大数据框架

一、 大数据开源框架汇总简介

功能分类组件
系统平台Hadoop、CDH、HDP
监控管理CM、Hue、Ambari、Dr.Elephant、Ganglia、Zabbix、Eagle
文件系统HDFS、GPFS、Ceph、GlusterFS、Swift 、BeeGFS、Alluxio
资源调度YARN、Mesos
协调框架ZooKeeper 、Etcd、Consul
数据存储HBase、Cassandra、ScyllaDB 、MongoDB、Accumulo 、Redis 、Ignite、Arrow 、Geode、CouchDB、Kudu、CarbonData
数据处理MapReduce、Spark、Flink、Storm、Tez、Samza、Apex、Beam、Heron
查询分析Hive、SparkSQL、Presto、Kylin、Impala、Druid、ElasticSearch、HAWQ、Lucene、Solr、 Phoenix
数据收集Flume、Filebeat、Logstash、Chukwa
数据交换Sqoop 、Kettle、DataX 、NiFi
消息系统Pulsar、Kafka、RocketMQ、ActiveMQ、RabbitMQ
任务调度Azkaban、Oozie、Airflow
数据治理Ranger 、Sentry、Atlas
可视化Kibana 、D3.js、ECharts
数据挖掘Mahout 、MADlib 、Spark ML、TensorFlow、Keras
云平台Amazon S3、GCP、Microsoft Azure

1.1 hadoop

Hadoop核心包含分布式存储HDFS、离线计算引擎MapRduce、资源调度Apache YARN三部分。

1.2 hdfs

HDFS(Hadoop Distributed File System)分布式文件系统,是分布式计算中数据存储管理的基础。是Hadoop Core项目的核心子项目。HDFS是基于流数据模式访问和处理超大文件的需求而开发的,数据在相同节点上以复制的方式进行存储以实现将数据合并计算的目的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。它有很多的优点,但也存在有一些缺点,包括:不适合低延迟数据访问、无法高效存储大量小文件、不支持多用户写入及任意修改文件。HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。

1.3 yarn

YARN:(Yet Another Resource Negotiator)是Hadoop的资源管理和作业调度系统。作为Apache Hadoop的核心组件之一,YARN负责将系统资源分配给在Hadoop集群中运行的各种应用程序,并调度在不同集群节点上执行的任务。YARN是Hadoop2.x 版本中的一个新特性。它的出现其实是为了解决第一代 MapReduce 编程框架的不足,提高集群环境下的资源利用率,这些资源包括内存,磁盘,网络,IO等。YARN的基本思想是将资源管理和作业调度/监视的功能分解为单独的 daemon(守护进程),其拥有一个全局ResourceManager、每个应用程序的ApplicationMaster及每台机器框架代理NodeManager。ResourceManager负责所有应用程序之间资源分配。NodeManager负责Containers,监视其资源使用情况(CPU,内存,磁盘,网络)并将其报告给 ResourceManager。ApplicationMaster负责是协调来自ResourceManager的资源,并与NodeManager一起执行和监视任务。

1.4 mapreduce

Apache Hadoop MapReduce是一个分布式的离线计算框架,用于海量数据的并行运算,是Hadoop数据分析的核心.。MapReduce框架使得编程人员在不会分布式并行编程的情况下,将编写的业务逻辑代码运行在分布式系统上,开发人员可以将绝大部分的工作集中于业务逻辑上的开发,具体的计算只需要交给框架就可以。MapReduce的处理过程分为两个步骤:Map和Reduce。Map阶段对输入的数据进行并行处理,处理结果传给Reduce完成最后的汇总。但由于MR对HDFS的频繁操作(包括计算结果持久化、数据备份、资源下载及Shuffle等)导致磁盘I/O成为系统性能的瓶颈,因此只适用于离线数据处理或批处理,而不能支持对迭代式、交互式、流式数据的处理。

1.5 spark

Apache Spark是通用的一站式计算框架,是专为大规模数据处理而设计的快速通用的计算引擎。2009年诞生于UC Berkeley的AMPLab,2010 年开源,2013年6月成为Apache孵化项目,2014年2月19日成为Apache顶级项目。Spark是基于MapReduce算法实现的分布式计算,拥有 MapReduce 所具有的优点,但不同于 MR 的是,Job中间输出和结果可以保存在内存中,从而不再需要读写 HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的算法中,高效地支持更多计算模式,包括交互式查询和流处理。Spark是MapReduce 的替代方案,是对 Hadoop 的补充,而且兼容 HDFS、Hive,可融入 Hadoop 的生态系统,以弥补MapReduce的不足。Spark是在Scala语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和Scala能够紧密集成,其中的Scala可以像操作本地集合对象一样轻松地操作分布式数据集。Spark通过提供丰富的Scala、Java、Python API、R及交互式Shell来提高可用性。
Spark主要包含几个重要组件:SparkCore批处理、SparkSQL交互式处理、SparkStreaming流处理、Spark Graphx图计算、Spark MLlib机器学习(maching learning),Spark旨在成为运行批处理、数据流处理、交互处理、图形处理和机器学习等应用的一站式平台。目前Spark已经成为大数据领域最热门的技术。

1.6 hbase

hbase是一个分布式的、面向列的NoSQL开源数据库。是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价服务器上搭建起大规模结构化存储集群。初期的目标是弥补MapReduce在实时操作上的缺失,方便用户可随时操作大规模的数据集。
HBase原来是Apache的Hadoop项目的子项目,随着大数据与NoSQL的流行和迅速发展,2010年5月Apache HBase脱离了Hadoop成为Apache基金的顶级项目。HBase利用Hadoop HDFS作为其文件存储系统;HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;HBase利用Zookeeper作为协调服务。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库,另外HBase是基于列的而不是基于行的模式。

1.7 zookeeper

Apache ZooKeeper 是一个开源的分布式协调服务,是Hadoop,HBase和其他分布式框架使用的有组织服务的标准。ZooKeeper是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于ZooKeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。ZooKeeper是以Fast Paxos算法为基础的,Paxos 算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥导致没有一个proposer能提交成功,而Fast Paxos作了一些优化,通过选举产生一个leader (领导者),只有leader才能提交proposer。ZooKeeper使用 ZAB 协议作为其保证数据一致性的核心算法。ZAB(ZooKeeper Atomic Broadcast 原子广播)协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。

1.8 kafaka

Apache Kafka是一个发布/订阅的消息系统,由Scala写成。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。Kafka是一个分布式的、分区的、多复本的日志提交服务。是目前使用最广泛的消息系统。

二、hive数据分析实例

2.1 hive

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL语句的Hive SQL(HQL)查询功能,将SQL语句转换为MapReduce任务进行运行。原理是用熟悉的SQL模型来操作 HDFS 上的数据。可以通过HQL语句快速实现的MapReduce统计,不必开发专门的MapReduce应用。方便的使用 Hive 进行数据仓库的建模和建设,然后使用 SQL 模型针对数据仓库中的数据进行统计和分析。但由于Hive底层默认是转换为MR执行,而MR的shuffle是基于磁盘的,所以只能处理离线分析。目前部分企业使用Hive构建数仓。 Hive 的数据存储在 HDFS 中,大部分的查询由 MapReduce 完成(包含 * 的查询,比如 select * from tab 不会生成 MapRedcue 任务)。

MapReduce是一个分布式的离线计算框架,用于海量数据的并行运算,是Hadoop数据分析的核心。MapReduce框架使得编程人员在不会分布式并行编程的情况下,将编写的业务逻辑代码运行在分布式系统上,开发人员可以将绝大部分的工作集中于业务逻辑上的开发,具体的计算只需要交给框架就可以。MapReduce的处理过程分为两个步骤:Map和Reduce。Map阶段对输入的数据进行并行处理,处理结果传给Reduce完成最后的汇总。
Spark是专为大规模数据处理而设计的快速通用的计算引擎,类似于Hadoop MapReduce的通用并行框架,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于需要迭代的MapReduce的算法。
Pipeline管道。这是HDFS在上传文件写数据过程中采用的一种数据传输方式。
客户端将数据块写入第一个数据节点,第一个数据节点保存数据之后再将块复制到第二个数据节点,后者保存后将其复制到第三个数据节点。数据以管道的方式,顺序的沿着一个方向传输,这样能够充分利用每个机器的带宽,避免网络瓶颈和高延迟时的连接,最小化推送所有数据的延时。
在线性推送模式下,每台机器所有的出口宽带都用于以最快的速度传输数据,而不是在多个接受者之间分配宽带。

ACK应答响应ACK (Acknowledge character)确认字符,在数据通信中,接收方发给发送方的一种传输类控制字符。表示发来的数据已确认接收无误。
在HDFS pipeline管道传输数据的过程中,传输的反方向会进行ACK校验,确保数据传输安全。

核心概念–默认3副本存储策略:
默认副本存储策略是由BlockPlacementPolicyDefault指定。
第一块副本:优先客户端本地,否则随机
第二块副本:不同于第一块副本的不同机架。
第三块副本:第二块副本相同机架不同机器。

2.2 数据仓库

2.2.1 数据中心整体架构

hive本身是数据仓库,数据仓库是为了协助分析报告,支持决策,为需要业务智能的企业提供业务流程的改进和指导,从而节省时间和成本,提高质量,数据仓库是用来做查询分析的数据库,通常不做单条数据的插入、修改和删除,hive作为一个数据仓库的工具,非常适合数据的统计分析。

2.2.2 数据仓库模型规划

ODS(Operation Data Store): 数据源头层,数据仓库源头系统的数据表通常会原封不动的存储一份,这称为ODS层(可理解为原始库),是后续数据仓库加工数据的来源。数据来源:业务库、埋点日志、消息队列。
DWD(Data Warehouse Details ):数据细节层,是业务层与数据仓库的隔离层。主要对ODS数据层做一些数据清洗和规范化的操作。数据清洗:去除空值、脏数据、超过极限范围的。
DWB(Data Warehouse Base):数据基础层,存储的是客观数据,一般用作中间层,可以认为是大量指标的数据层,可理解为知识库字典、常用标准库。
DWS(Data Warehouse Service): 数据服务层,基于DWB上的基础数据,整合汇总成分析某一个主题域的服务数据层,一般是宽表。用于提供后续的业务查询,OLAP分析,数据分发等。
ADS(ApplicationData Service):应用数据服务,该层主要是提供数据产品和数据分析使用的数据,一般会存储在ES、mysql等系统中供线上系统使用。

2.3 Hive 和普通关系数据库的异同:

2.4、hive常用命令及应用实例

show databases; # 查看某个数据库
use 数据库;      # 进入某个数据库
show tables;    # 展示所有表
desc 表名;            # 显示表结构
show partitions 表名; # 显示表名的分区
show create table table_name;   # 显示创建表的结构
alter table table_name rename to new_table_name;#重命名
alter table table_name add columns (newcol1 int comment ‘新增’);增加字段
drop table table_name;#删除表
 

1、创建表:
CREATE TABLE stg_jt_iptv_c3_hw_d(
  `UserID` string, 
  `StartTime` string, 
  `EndTime` string, 
  `ServiceType` string, 
  `ChannelCode` string, 
  `ProgramName` string, 
  `MediaCode` string, 
  `ShiftDuration` string, 
  `Flux` string, 
  `EndReason` string, 
  `ChargeType` string, 
  `Qos` string, 
  `Reserved1` string, 
  `Reserved2` string, 
  `Reserved3` string)
PARTITIONED BY ( 
  `day_key` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'='|', 
  'serialization.format'='|',
  'serialization.encoding'='GBK',
  'serialization.null.format'='')
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs://namenode01:8020/user/hive/warehouse/stg.db/stg_jt_iptv_c3_hw_d';


 CREATE TABLE `dw_jt_iptv_c3_hw_d`( 
  `UserID` string, 
  `StartTime` string, 
  `EndTime` string, 
  `STAYTIME` string, 
  `PERIOD` string,
  `ServiceType` string,
  `ChannelCode` string, 
  `ProgramName` string, 
  `MediaCode` string, 
  `ShiftDuration` string, 
  `Flux` string, 
  `EndReason` string,
  `ChargeType` string, 
  `Qos` string)
PARTITIONED BY ( 
  `day_key` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'='|', 
  'serialization.format'='|') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://namenode01:8020/user/hive/warehouse/dw.db/dw_jt_iptv_c3_hw_d' 

  
2、数据压缩:
use dw
go
INSERT OVERWRITE TABLE dw.dw_jt_iptv_c3_hw_d PARTITION(day_key='$DAY$') 

select UserID,
       concat(substr(StartTime, 1, 4),
              '-',
              substr(StartTime, 5, 2),
              '-',
              substr(StartTime, 7, 2),
              ' ',
              substr(StartTime, 9, 2),
              ':',
              substr(StartTime, 11, 2),
              ':',
              substr(StartTime, 13, 2)),
       concat(substr(EndTime, 1, 4),
              '-',
              substr(EndTime, 5, 2),
              '-',
              substr(EndTime, 7, 2),
              ' ',
              substr(EndTime, 9, 2),
              ':',
              substr(EndTime, 11, 2),
              ':',
              substr(EndTime, 13, 2)),
       round((unix_timestamp(concat(substr(EndTime, 1, 4),
                                    '-',
                                    substr(EndTime, 5, 2),
                                    '-',
                                    substr(EndTime, 7, 2),
                                    ' ',
                                    substr(EndTime, 9, 2),
                                    ':',
                                    substr(EndTime, 11, 2),
                                    ':',
                                    substr(EndTime, 13, 2))) -
             unix_timestamp(concat(substr(StartTime, 1, 4),
                                    '-',
                                    substr(StartTime, 5, 2),
                                    '-',
                                    substr(StartTime, 7, 2),
                                    ' ',
                                    substr(StartTime, 9, 2),
                                    ':',
                                    substr(StartTime, 11, 2),
                                    ':',
                                    substr(StartTime, 13, 2)))) / 60,
             2),
       substr(StartTime, 9, 2),
       ServiceType,
       ChannelCode,
       ProgramName,
       MediaCode,
       ShiftDuration,
       Flux,
       EndReason,
       ChargeType,
       Qos
  from stg.stg_jt_iptv_c3_hw_d
 where day_key= $DAY$
 and (unix_timestamp(concat(substr(EndTime, 1, 4),
                              '-',
                              substr(EndTime, 5, 2),
                              '-',
                              substr(EndTime, 7, 2),
                              ' ',
                              substr(EndTime, 9, 2),
                              ':',
                              substr(EndTime, 11, 2),
                              ':',
                              substr(EndTime, 13, 2))) -
       unix_timestamp(concat(substr(StartTime, 1, 4),
                              '-',
                              substr(StartTime, 5, 2),
                              '-',
                              substr(StartTime, 7, 2),
                              ' ',
                              substr(StartTime, 9, 2),
                              ':',
                              substr(StartTime, 11, 2),
                              ':',
                              substr(StartTime, 13, 2)))) / 60 > 30
3、查询汇总计算:
select count(1) from stg.stg_jt_iptv_c3_hw_d where day_key='20220811';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161

2.5 hive工作原理

通过HiveSQL语句快速实现简单的MapReduce统计,Hive自身可以自动将HiveSQL语句快速转换成MapReduce任务进行运行 。hivesql查询的mapreduce作业转化过程:

2.6、hive元数据

一、存储Hive版本的元数据表(VERSION)
select * from VERSION;
二、Hive数据库相关的元数据表(DBS、DATABASE_PARAMS)
1、DBS 该表保存文件存储的基本信息,如INPUT_FORMAT、OUTPUT_FORMAT、是否压缩等。
2、DATABASE_PARAMS
三、Hive表和视图相关的元数据表
1、TBLS
2、TABLE_PARAMS
3、TBL_PRIVS
四、Hive文件存储信息相关的元数据表
1、SDS
2、SD_PARAMS
3、SERDES
4、SERDE_PARAMS
五、Hive表字段相关的元数据表
1、COLUMNS_V2
六、Hive表分区相关的元数据表
1、PARTITIONS
2、PARTITION_KEYS
3、PARTITION_KEY_VALS
4、PARTITION_PARAMS

2.7、hive使用数据倾斜及常用解决方案

5.1、hive查询信息数据在服务器框展示时一定加上limit查看部分样例数据,如果需要查单大量hive查询的详单数据,使用hive -e导出查询结果。

5.2、什么时数据倾斜:
数据角度:数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。Hadoop中的数据倾斜主要表现在ruduce阶段卡在99.99%,一直99.99%不能结束。我们在做数据运算的时候会涉及到count(distinct)、group by、join等操作,这些都会触发Shuffle动作,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单点问题。

业务角度:数据往往和业务是强相关的,业务的场景直接影响到了数据的分布。比如联通某个新业务订单场景我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了100000%,其余城市的数据量几乎不变。然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。

5.3、数据倾斜解决思路:
5.3.1、业务逻辑,我们从业务逻辑的层面上来优化数据倾斜,比如上面的例子,我们单独对这两个城市来做count,最后和其它城市做整合。
5.3.2、程序层面,比如说在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group 再在外面包一层count。
5.3.3、配置调测。
5.3.4、从业务和数据方面解决数据倾斜:异常数据过滤、数据预处理、对分布不均匀的数据单独计算。

三、hdfs实例:

hadoop dfsadmin -report 这个命令可以快速定位出哪些节点down掉了,HDFS的容量以及使用了多少,以及每个节点的硬盘使用情况。

在HDFS分布式文件中 创建子文件:hadoop fs -mkdir -p /dqzx/zx

查看hive表存储目录:hadoop fs -ls /user/hive/warehouse

hdfs dfs -put /root/Contentviewlog_20220811.log hdfs://namenode01:8020/user/hive/warehouse/stg.db/stg_jt_iptv_c3_hw_d
  • 1
load data local inpath '/root/20220811/Contentviewlog_20220811.log' overwrite into table stg.stg_jt_iptv_c3_hw_d partition (day_key='20220811')
  • 1

查看上传文件:

hdfs dfs -ls hdfs://namenode01:8020/user/hive/warehouse/stg.db/stg_jt_iptv_c3_hw_d
  • 1

四、sparksql实例:

Shark提供了类似于Hive的功能,与Hive不同的是,Shark把SQL语句转换成Spark作业,而不是MapReduce作业。shark仅将物理执行计划从MapReduce作业替换成了Spark作业,也就是通过Hive的HiveSQL解析功能,把HiveSQL翻译成Spark上的RDD操作。Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。



查看线程数:grep 'processor' /proc/cpuinfo | sort -u | wc -l
在本地模拟 N 个线程来运行当前任务
spark-shell --master local[N]

spark.sql("use stg").show(false)
spark.sql("select count(1) from stg.stg_jt_iptv_c3_hw_d where day_key='20220811'").show(false)

spark.sql("select * from stg.stg_jt_iptv_c3_hw_d where day_key='20220811'").show(5)
show展示数据

可以用show() 方法来展示数据,show有以下几种不同的使用方式:
show():显示所有数据
show(n) :显示前n条数据
show(true): 最多显示20个字符,默认为true
show(false): 去除最多显示20个字符的限制
show(n, true):显示前n条并最多显示20个字符
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/647184
推荐阅读
相关标签
  

闽ICP备14008679号