赞
踩
Extract-Transform-Load的缩写,用来描述将数据从来源端经过萃取(extract)、转置(transform)、加载(load)至目的端的过程。
先来一张通用架构图:
数据源:数据源可以来自多个不同种类的源,例如数据库,日志文件,系统日志,数据库日志,业务日志等。
数据收集:采集数据,日志等数据文件。常用的采集工具有Flume,Logstash,Filebeat等。
数据缓冲:数据收集的过程是从多个源端采集过来,高并发,数据量大,如果直接处理会造成瓶颈甚至系统崩溃,所以一般需要使用消息队列将请求缓冲,根据系统处理能力按序处理。常用的消息中间件有Kafka,ActiveMQ,RabbitMQ等。
数据清洗:将送过来的数据做过滤,格式化,转换等操作,把各个形式的数据源统一转换为希望的数据格式。
离线存储:存储清洗后的数据,一般数据量都是非常巨大的,传统关系型数据库不利于存储和索引,所以离线存储使用非关系型数据库也就是常说的NOSQL,将数据创建索引后存储,用于数据分析和查询,效率很高。常用的有HBase,MongoDB,Elasticsearch等。
实时存储:业务上需要使用到的数据存入关系型数据库,用于业务上的功能,例如告警,运营统计等。常用的关系型数据库有Oracle,MySQL,SQL server等。
数据采集使用Filebeat, Filebeat是Elastic旗下Beats
系列中负责日志文件采集的产品,它是一个轻量级的日志传输agent,安装在数据源本地,监视日志文件目录和日志文件变化,将指定日志转发给Kafka,
Logstash,Elasticsearch,Redis等 。
这里我们在每个数据源本机安装Filebeat,并转给Kafka。
Kafka是时下比较流行的消息中间件,它的运行需要依赖Zookeeper,这里我们使用Kafka + ZK集群,接收Filebeat的数据。
关于消息中间件的功能这里就不介绍了,可以在另一篇文章《ActiveMQ环境搭建与使用》了解消息中间件的功能和原理,这里主要说一下Kafka如何区别点对点模式(p2p)和订阅模式(topic)。
上图中每一个broker对应Kafka集群中的一个Kafka实例,Filebeat是producer,将请求送给broker,默认情况下Kafka是订阅模式将消息广播给所有的consumer,例如我们部署了两个Logstash做高可用,即为两个consumer,其中一个broker接收到消息log1这个数据后会将log1同时发给两个Logstash处理,这样处理数据岂不是重复了?所以我们需要使用p2p的方式,即log1只有一个consumer可以接收并处理,在Kafka中需要使用consumer group的概念来实现,即给consumer分组,同一个组内的consumer会去竞争消息,所以我们把两个Logstash放入同一个consumer组内,即可实现高可用。
同样,Logstash也是elastic旗下的产品,它是服务器端的数据处理通道,能够同时从多个来源采集数据,转换数据,然后将数据发送到存储中。Logstash具备实时解析和转换数据的功能,数据从源传输到存储的过程中Logstash
过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值,例如:
l 利用 Grok 从非结构化数据中派生出结构
l 从 IP 地址破译出地理坐标
l 将 PII 数据匿名化,完全排除敏感字段
l 简化整体处理,不受数据源、格式或架构的影响
这里我们主要使用Logstash的数据处理功能,将采集到的数据过滤,转换为我们需要的格式和内容,再存储到下游的数据库中。
常用插件
将非结构化事件数据分析到字段中。 这个工具非常适用于系统日志,Apache和其他网络服务器日志,MySQL日志,以及通常为人类而不是计算机消耗的任何日志格式。
举例:
文件中的一行日志为:
2019-03-09 11:50:04.009 INFO
[MockBPServer-PacketRepository-9] [QdbLogger.java:23]
[Lid:127.0.0.1-92-bp1] sessionId: [329]
sequenceNo: [6] request: [ExecutionDynamicParamRequest] byteLength: [99]
我们利用grok将这行日志的时间和日志级别取出来
代码为:
- filter {
-
- grok {
-
- match => {
-
- "message"
- =>
- "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+(?<level>[A-Z]+)(?<text>.*)"
-
-
- }
-
- }
-
- }
代码的含义是用正则表达式匹
配上游发送的数据,如果匹配成功则记录到message这个字段中,并且匹配到< date >和< level >的数据也记录到date和level的字段中。
其中的正则表达式\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}表示为日期格式xxxx-xx-xx xx:xx:xx.xxx
[A-Z]+表示多个大写英文字母
.*表示剩下的所有字符
所以最后的匹配结果为:
- {
-
-
- "date": "2019-03-09 11:50:04.009",
-
-
- "level": "INFO",
-
-
- "text": " [MockBPServer-PacketRepository-9]
- [QdbLogger.java:23] [Lid:127.0.0.1-92-bp1]
- sessionId: [329] sequenceNo: [6] request: [ExecutionDynamicParamRequest]
- byteLength: [99]"
-
- }
从字段解析日期以用作事件的Logstash时间戳,以下配置解析名为logdate的字段以设置Logstash时间戳:
代码:
- filter
- {
-
- date {
-
- match => [ "logdate", "yyyy-MM-DD
- HH:mm:ss" ]
-
- }
-
- }
返回结果:
{“logdate”:“2018-01-01
12:02:03”}
基于分隔符原理解析数据,解决grok解析时消耗过多cpu资源的问题
使用分隔符将非结构化事件数据提取到字段中。 解剖过滤器不使用正则表达式,速度非常快。
但是,如果数据的结构因行而异,grok过滤器更合适。
dissect的应用有一定的局限性:主要适用于每行格式相似且分隔符明确简单的场景
dissect语法比较简单,有一系列字段(field)和分隔符(delimiter)组成
%{}字段
%{}之间是分隔符12
例如,假设日志中包含以下消息:
Apr 26
12:20:02 localhost systemd[1]: Starting system activity accounting tool…1
代码:
- filter
- {
-
- dissect {
-
- mapping => { "message" =>
- "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
-
- }
-
- }
-
- }
结果:
- {
-
- "msg" => "Starting system activity
- accounting tool...",
-
- "src" => "localhost",
-
- "pid" => "1",
-
- "message" => "Apr 26 12:20:02 localhost systemd[1]:
- Starting system activity accounting tool...",
-
- "prog" => "systemd",
-
- "ts" => "Apr 26 12:20:02"
-
- }
说明: Apr 26 12 :20:02
%{ts}
%{+ts} %{+ts} #+代表该匹配值追加到ts字段下
{
“ts”:“Apr 26 12:20:02”
}
two
three one go
%{+order/2}
%{+order/3} %{+order/1} %{+order/4}
#/后面的数字代表拼接的次序
{
“order”: “one two three go”
}
a=1&b=2
%{?key1}=%{&key1}&%{?key2}=%{&key2} #%{?}代表忽略匹配值,但是富裕字段名,用于后续匹配用;%{&}代表将匹配值赋予key1的匹配值
{
“a”:“1”,
“b”:“2” }
dissect可以自动处理空的匹配值
John
Smith,Big Oaks,Wood Lane,Hambledown,Canterbury,CB34RY
%{name},%{addr1},%{addr2},%{addr3},%{city},%{zip}
Jane
Doe,4321 Fifth Avenue,New York,87432
{
“name”:“Jane Doe”,
“addr1”:"4321 Fifth
Avenue",
“addr2”:"",
“addr3”:"",
“city”:“New York”,
“zip”:“87432”
}
#dissect分割后的字段值都是字符串,可以使用convert_datatype属性进行类型转换
- filter{
-
- dissect{
-
- convert_datatype => {age =>"int"
- }
- }
-
- }
可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:
convert
#类型转换
gsub
#字符串替换
split/join/merge
#字符串切割、数组合并为字符串、数组合并为数组
rename
#字段重命名
update/replace
#字段内容更新或替换
remove_field
#删除字段
过滤掉不需要的数据
代码:
- filter {
-
- if ([level] !~ "(ERROR|INFO)"){
-
- drop {}
- }
-
- }
说明:
如果level字段不是ERROR或者INFO 则丢弃
Elastic旗下最核心产品,Elasticsearch是一个分布式、RESTful 风格的搜索和数据分析引擎,能够执行及合并多种类型的搜索(结构化数据、非结构化数据、地理位置、指标),搜索方式随心而变,具体功能和部署使用方法请参考另一篇文章《Elasticsearch环境搭建与使用》
这里我们搭建es集群用来存储清洗后的日志文件。
Elastic旗下产品,一般搭配Logstash和Elasticsearch使用,被称为ELK组合。通过Kibana,你可以对自己的Elasticsearch中的数据进行可视化,并可以自定义展示报表,方便查询和管理es中的数据。
基于docker的搭建(首先请安装docker和docker-compose)
镜像下载
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.2.0
docker pull
docker.elastic.co/kibana/kibana:7.2.0
docker pull docker.elastic.co/logstash/logstash:7.2.0
docker pull
docker.elastic.co/beats/filebeat:7.2.0
docker pull wurstmeister/kafka
配置
宿主机映射存储目录
[root@hbase es]# pwd
/root/Log-mgt/es
[root@hbase es]# mkdir data
[root@hbase es]# chmod 777 data
每一台数据源的主机上都需要配置filebeat agent的配置文件
- filebeat.inputs:
-
- -
- type: log
-
- enabled: true
-
- paths:
-
- - /var/log/*.log
-
- multiline:
-
- pattern:
- "^(([0-9]{4})-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01]))"
-
- negate: true
-
- match: after
首先input部分配置fileb
eat需要采集的信息
type:采集的类型,有如下:
l Log: 读一般日志文件的每一行
l Sdin:读事件
l Container:读docker容器日志,指向日志路径
l Redis:读redis日志
l UDP:读UDP上的事件
l Docker:读docker容器日志,指向容器id
l TCP:读TCP上的事件
l Syslog:读TCP或UDP上的事件
l NetFlow:读NetFlow和IPFIX
Path为日志路径
Multiline指匹配多行,如果不用,放入下游的数据是按每一行为一条,但是很多时候我们更希望一条数据是一个完整的信息,需要包含多行,所以这里需要匹配正则表达式,^(([0-9]{4})-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])) 表示以日期格式xxxx-xx-xx为开头的数据为切分。
- output.kafka:
-
- hosts: ["172.16.23.126:9092"]
-
- topic: "leontest"
-
- ##partition策略必须为random、round_robin或者hash的其中一个
-
- #开启kafka的partition分区
-
- partition.round_robin:
-
- reachable_only: true
-
- compression: gzip
out部分的配置是将采集到的数据发送到下游,这里配置的是Kafka
topic是放入Kafka队列里的消息名,不同的数据源可以配置为不同topic
- input{
-
- kafka {
-
-
- client_id => "beats"
-
-
- bootstrap_servers => "172.16.23.126:9092"
-
-
- consumer_threads => 3
-
-
- group_id => "groupA"
-
-
- topics => ["leontest"]
-
-
- ##数据json化才能读取到message数据
-
-
- codec => "json"
-
-
- }
-
- }
Input部分配置Logstash从哪个上游获取数据,这里配置的Kafka
这里的group_id用于实现Logstash高可用,相同groupid的Logstash竞争同一个topic,及实现p2p队列模式,不同组则为订阅模式。
- filter{
-
-
- grok {
-
-
- match => {
-
-
- "message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+(?<level>[A-Z]+)(?<text>.*)"
-
-
- }
-
-
- }
-
-
- #如果不是error或info删除日志
-
-
- if ([level] !~ "(ERROR|INFO)") {
-
-
- drop {}
-
-
- }
-
-
-
- mutate {
-
- ##不在output中输出以下字段,节省空间
-
-
- remove_field => [
-
- "beat",
-
-
- "@version",
-
-
- "message",
-
-
- "host",
-
-
- "meta",
-
-
- "input",
-
-
- "offset",
-
-
- "source",
-
-
- "fileset",
-
-
- "event",
-
-
- "prospector",
-
-
- "log",
-
-
- "ecs",
-
-
- "agent",
-
-
- "fields",
-
-
- "sort",
-
-
- "_version",
-
-
- "_score"
-
-
- ]
-
-
- }
Filter部分配置Logstash的数据清洗规则
这里我们将不是INFO和ERROR的日志记录丢弃,并且输出日志级
别和日志时间戳,将多余字段删除
- output{
-
- elasticsearch {
-
- codec => "json"
-
- hosts => ["172.16.23.126:9200"]
-
- index => "leontest"
-
- }
-
- }
输出部分的配置我们配置输出到Elasticsearch,输出格式为json
- version:
- '2.2'
-
- services:
-
- #Elasticsearch容器配置
-
- es:
-
- image:
- docker.elastic.co/elasticsearch/elasticsearch:7.2.0
-
- container_name: es
-
- environment:
-
- - node.name=es
-
- - cluster.initial_master_nodes=es
-
- - cluster.name=es-cluster
-
- - "ES_JAVA_OPTS=-Xms512m
- -Xmx512m"
-
- #跨域访问
-
- - http.cors.enabled=true
-
- - http.cors.allow-origin="*"
-
- volumes:
-
- #数据目录映射
-
- -
- /root/Log-mgt/es/data:/usr/share/elasticsearch/data
-
- ports:
-
- - 9200:9200
-
- - 9300:9300
-
- #重启策略,能够使服务保持始终运行,生产环境推荐使用
-
- restart: "always"
-
- #kibana容器配置
-
- kibana:
-
- image:
- docker.elastic.co/kibana/kibana:7.2.0
-
- container_name: kibana
-
- environment:
-
- XPACK_MONITORING_ENABLED:
- "true"
-
-
- XPACK_MONITORING_UI_CONTAINER_ELASTICSEARCH_ENABLED: "true"
-
- ELASTICSEARCH_HOSTS: http://172.16.23.126:9200
-
- CSP_STRICT: "true"
-
- ports:
-
- - 5601:5601
-
- restart: "always"
-
- #logstash容器配置
-
- logstash:
-
- image:
- docker.elastic.co/logstash/logstash:7.2.0
-
- restart: "always"
-
- container_name: logstash
-
- environment:
-
- - log.level=error
-
- - xpack.monitoring.enabled=true
-
- -
- xpack.management.elasticsearch.hosts=http://172.16.23.126:9200
-
- -
- xpack.monitoring.elasticsearch.hosts=http://172.16.23.126:9200
-
- ports:
-
- - 5044:5044
-
- volumes:
-
- #配置文件映射
-
- -
- /root/Log-mgt/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
-
-
-
- #filebeat容器配置
-
- beat:
-
- image:
- docker.elastic.co/beats/filebeat:7.2.0
-
- container_name: filebeat
-
- user: root
-
- environment:
-
- - setup.kibana.host=172.16.23.126:5601
-
- - strict.perms=false
-
- restart: "always"
-
- volumes:
-
- #配置文件映射
-
- -
- /root/Log-mgt/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
-
- #需要采集的文件目录映射
-
- - /root/Log-mgt/filebeat/input:/var/log/
-
- #zookeeper容器配置
-
- zookeeper:
-
- image: zookeeper:latest
-
- container_name: zookeeper2183
-
- ports:
-
- - 2183:2181
-
- restart: always
-
-
-
- #kafka容器配置
-
- kafka:
-
- image: wurstmeister/kafka
-
- container_name: kafka
-
- ports:
-
- - 9092:9092
-
- #依赖zookeeper启动
-
- depends_on:
-
- - zookeeper
-
- restart: always
-
- environment:
-
- KAFKA_BROKER_ID: 1
-
- KAFKA_ADVERTISED_HOST_NAME: 172.16.23.126
-
- KAFKA_ZOOKEEPER_CONNECT:
- 172.16.23.126:2183
启动
docker-compose up -d
效果
容器启动成功
Es集群运行正常
在filebeat采集目录下放入日志,样例为:
访问kibana,查询leontest的index,只找到INFO和ERROR的日志,并且ERROR日志为多行
后续
Logstash输出了日志数据到es中,同时我们也可以将一些日志的关键信息输出到mysql中,用于业务操作,例如发送短信告警。
例如如下配置:
- output {
-
- jdbc
- {
-
- driver_jar_path => "/etc/logstash/jdbc/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47-bin.jar"
-
- driver_class =>
- "com.mysql.jdbc.Driver"
-
- connection_string =>
- "jdbc:mysql://mysql服务器ip:端口/数据库?user=数据库用户名&password=数据库密码"
-
- statement => [ "insert into 数据表 (TIME ,IP,LEVEL) values (?,?,?)","%{@timestamp}"
- ,"%{host}","%{level}" ]
-
- }
-
- }
好了,到此为止我们什么数据都有了,接下来可以开发业务代码去使用这些数据做业务功能啦。
etl技术架构图 相关内容
etl 数据插入数据库 SpringBoot2+Spring Security+Vue+ElementUI+蚂蚁金服AntV MyBatis从入门到精通 使用Java理解程序逻辑-Java入门宝典
2016-07-06 22:39:31 lvzhuyiyi 阅读数 3330
Informatica 9.6 实战课程,将从数据仓库体系架构介绍,服务器搭建配置,实例组件操作,丰富的案例,及项目实战开发过程 来讲述ETL工具Informatica的实际应用,真正完成数据仓库全流程开发应用,ODS层,EDW层,DM层,学完该课程具备ETL软件开发工程师能力水平,能独立完成项目开发工作,从入门到高级项目开发应用,每个实例都载图,课件,教程,数据等,能达到中高级ETL开发能力,本课程的学习要求,一定跟着实例动手实战,操作才能掌握实例的开发步骤,及开发注意事项。
该企业的数据传输平台主要由三部分构成:
1) 一部分是基于Quartz和Docker的调度系统。主要功能是负责调度和运行ETL任务。监视和变换当前任务实例的状态,当实例状态发生变化或者符合变化条件时主动更新数据库和内存中实例状态。
2) 一部分是数据交换工具Wormhole。主要用来多个数据库之间数据的交换,是ETL平台的基础。这个工具我们一般把它集成到Docker镜像中,同时我们还集成了各种脚本任务和Java运行环境的镜像,具体的选择取决于任务的类型。
3) 最后是基于Web的图形化工具。主要用来帮助用户手工配置任务、监控任务运行状态和日志查看进行调试。对任务和实例进行管理。管理操作包括挂起,预跑,置为成功,修改,杀死正在运行的实例等。
图 4.1 ETL传输平台架构
如上图(图4.1)所示,用户通过Web界面来创建和管理任务,把数据存储到数据库中,而Quartz调度系统的定时任务会把状态为新建的任务读出,初始化为实例,接着检查上游实例和并发状况,并申请资源,接下来就是根据负载均衡选择执行机,远程启动执行机上的容器,并把容器和实例映射关系存入数据库中。执行机上Docker守护进程收到启动命令后,还需要从远程仓库中根据任务类型下载镜像,如果是传输任务就下载Wormhole镜像,并设置启动命令为Wormhole启动命令(其他任务类型类似)。容器运行后,有定时任务会向每台执行机查询容器列表,并逐一检查容器的状态,更新实例和容器在数据库中的状态。这样,整个ETL传输平台就正常运转了起来。
调度系统Kepler机制的关键就是调度机上的三个定时任务(如图4.2):Init任务、Ready任务和Running任务。
1) Init定时任务的运行频率是10分钟一次,用途是从数据库中获得所有有效任务检查下一次触发时间是否在,根据任务新建它的实例,并把这些实例保存到实例表中和内存Init队列中。
2) Ready定时任务运行频率是1分钟一次,负责把Init队列中的任务实例,检查运行条件(如前置任务的实例是否运行完成,是否有同时并发运行的实例等)是否满足,然后申请一些共享资源,主要是Hive、Mysql等并发资源,这一切成功后就把实例的状态改为Ready并把它从Init队列移到Ready队列中,并在数据库中更改实例状态为Ready。
3) Running进程的频率是30秒一次,主要如任务是定时通过Zookeeper查询在线的机器以免出现分配新的任务到已下线的机器上,以及选择被占Slot(槽位,对应一个Docker容器用的标准资源量)数最少的机器为任务运行的目标机器。然后根据任务类型确定镜像名称,利用Docker Client API远程创建和启动Docker容器。
另外,调度系统的三个比较重要的组件:资源管理器,负责统计所有任务依赖的共享资源,保证它们的总和不达到上限;状态管理器,维护这各种状态的实例列表;容器调度器,维护当前运行的容器列表和存活的执行机列表。
下面介绍一下Wormhole的架构,这里有两张图:
图4.3所示的是整个Wormhole的架构,但Splitter的作用不是很明显。图4.4说明了Splitter的作用。可以看出传输任务和数据源的关系是一对一,而与输出目的地的关系是一对多。
Wormhole可以在各种数据存储类型之间高速交换数据。整体上是Framework + Plugin架构[15],Framework提供了以行为单位的数据流缓冲机制,分片和多线程读、写机制,读写分离的数据交换队列等高性能数据传输技术。框架为读写、分片、预处理等插件都提供了通用的接口,针对每个数据类型的插件实现类负责处理具体的连接和读写等规则。一个数据传输任务对应一个进程,全部数据传输都在内存中进行。
框架为读写、分片、预处理等插件都提供了通用的接口,这些接口分Reader和Writer两类,如果你需要开发面向某种类型数据源的插件,只需实现这些接口即可。比如数据源是Oracle,数据传输目标是Mysql,那么对应要开发的读写插件就是出 OracleReader和MysqlWriter插件,分片是OracleReaderSplitter和MysqlWriterSplitter。预处理是OracleReaderPeriphery和MysqlWriterPeriphery。把这些类实现相关接口即可,就算是加入了框架中。。
在生产实践中,每个数据输入类型除了要开发Reader和Writer,考虑到传输过程的性能和复杂性,还要为每个Reader和Writer开发一个Splitter和Periphery来做分段读、写和读写预处理工作。
Wormhole的数据交换主要使用了两种技术:读写双缓冲队列与线程池。下面详细介绍。线程池是一种复用线程对象、减少创建线程花销的技术手段[16]。现在线程池技术应用广泛。无论是主流的Web服务器TCP连接、数据库访问连接、文件、邮件之类的连接都有使用。这主要是因为两点:第一,这些服务器访问都有一个共同的特点:访问量大、频率高但每次连接的时间短。第二,线程池相对其他多线程技术也有如下的优点:1)线程数可以预先设定,并在实际使用中控制在预定数的一定范围内。这样就能有效控制创建多个线程带来的内存消耗,同时也减轻了JVM在垃圾回收上的压力。2)复用预先创建或已经存在的线程,提高了资源的利用效率。多个访问连接复用线程,这就大大节省了线程对象创建的时间,并且节省了系统资源,防止资源浪费[17]。3)提高系统响应速度。现在有资料表明,现代服务器在短时间内处理大量访问请求会创建大量线程,线程的创建和销毁时间会成为系统性能的瓶颈。因此复用线程对象能够降低服务器访问的延迟。
在互联网经典TCP协议服务器的请求处理逻辑中,监听TCP连接、数据发送和接收等事件的是主线程,而具体数据的收发则由Handler线程处理。于是就需要一个队列在主线程和各个Handler线程之间交换数据,类似于经典的生产者-消费者模式。这个队列读写都需要加锁,在实际处理过程中实际并发性能并不特别好,如果我们要提高并发性能就要用到双端缓冲队列。
双端缓冲队列是读写分离的两个队列,发送数据的线程把数据插入写队列,而读取数据的线程则从读队列中读取数据[18]。如果读数据时读队列为空且写队列不为空,则交换两个队列,否则阻塞等待。这个过程中有两把锁发挥作用:一是写锁。写锁用于写线程把数据插入写队列时以及读队列和写队列进行交换时。不过队列交换时,读线程必须具有读写两把锁,否则会死锁。二是读锁。读锁只用于读线程从读队列中获取数据时。最后还要读写缓冲队列长度的问题,队列短时,能够保证数据交换的及时性,但如果太短,队列交换频繁会降低并发性能。所以一般数据量大时队列长一些,反之则短一些。另外,双端缓冲队列还有两种实现策略使用于不同场景:
1) 读优先。数据消费者发现读队列为空时尝试交换读写队列。这种情况适合读比写速度慢的情况下。
2) 写优先。数据生产者发现写队列满时尝试交换读写队列。这种情况适合写比读慢的情况下。
开发前台是MVC(模型-视图-控制器)的架构。用户在视图层次配置管理任务,控制层负责处理业务逻辑和转发请求,模型层(数据访问层)进行持久化管理。考虑到系统的灵活性,我们并没把控制层和数据访问层放在同一服务器集群上访问,而是进行的分离,中间采用了该企业的服务框架中间件,进行远程RPC访问。下面我们来看看Galaxy和Galaxy-Halley的后端架构图(如图4.5)。
图 4.5 Galaxy和Galaxy-Halley后端架构图
可以看出,用户在Web上配置和管理自己的任务发起Http请求,请求从Nginx服务器通过负载均衡交替选择服务器,由于没有Session Sticky,所以必须在Web前端存储用户登录信息(内部系统,并不存在安全问题)。Galaxy服务器在处理完逻辑后,就需要向Galaxy-Halley数据库服务发起远程RPC调用,获取或者修改实例信息。
下面我们重点讨论任务代码上线模块。模块的架构如上图(如图4.6)。
用户有些文件必须从本地上传,有些代码则是项目的代码,可以从Github上拉取项目打包结果,发布到执行机上。整个发布流程大概是:用户把文件上传到Galaxy Server或者Galaxy Server拉取Github的打包结果成功后,再把文件上传到Hdfs。利用Zookeeper实现的、个数与执行机个数相当的分布式消息队列发布消息给执行机,运行在执行机上的消息消费程序接收到消息后,立刻到Hdfs上的指定地址下载代码到该任务在执行机上的预订目录,无论下载成功还是失败,立刻往数据库中插入一条消费状态信息。Galaxy通过间隔一段时间轮询数据库来获取整个发布消费过程中各执行机的消费状况,一旦有机器消费失败或者时间超时,则返回失败。
本章主要介绍了平台的整体架构设计和三个模块(调度系统、数据交换工具、开发前台)的架构设计,为下一章的具体实现打好基础。
展开阅读原文
etl技术架构图 相关内容
etl 数据插入数据库 SpringBoot2+Spring Security+Vue+ElementUI+蚂蚁金服AntV MyBatis从入门到精通 使用Java理解程序逻辑-Java入门宝典
2019-05-16 17:44:22 haohaizijhz 阅读数 1677
Informatica 9.6 实战课程,将从数据仓库体系架构介绍,服务器搭建配置,实例组件操作,丰富的案例,及项目实战开发过程 来讲述ETL工具Informatica的实际应用,真正完成数据仓库全流程开发应用,ODS层,EDW层,DM层,学完该课程具备ETL软件开发工程师能力水平,能独立完成项目开发工作,从入门到高级项目开发应用,每个实例都载图,课件,教程,数据等,能达到中高级ETL开发能力,本课程的学习要求,一定跟着实例动手实战,操作才能掌握实例的开发步骤,及开发注意事项。
数据平台层级架构图
一般包含三个层级,ODS层、数据仓库层、数据应用层。
业务系统的操作和日志数据抽取到ODS层,ODS的数据经过ETL过程(抽取Extraction,转化Transformation,加载Loading)进入数据仓库,数据仓库反哺业务,为业务的分析和决策提供支持:反应业务现状,预测业务未来发展趋势,为业务的优化拓展赋能智慧。
直接从业务系统和用户日志中抽取,可与业务系统、日志系统中的数据结构和关系保持一致。
直接在业务系统和日志系统进行查询是会影响业务体统的正常运转,ODS的存在将查询操作和业务系统隔离开来,使分析师和决策者的查询更高效,隔离查询对业务系统运转的影响。
推荐设计流程:业务建模-领域建模-逻辑建模-物理建模
业务建模-领域建模-逻辑建模-物理建模,的建模流程。
业务建模:深入业务现场,进行业务主线的划分,业务流程的整合。
领域建模:即是实体抽象的过程,抽象出各个业务主线涉及到的领域概念和实体。
逻辑建模:找出各个实体之间的关系,此时推荐3NF 建模方法,ER图很有帮助。
物理建模:ER图落地到具体的数据库。
反应企业当前的经营状况、可进行多维度系统分析、数据挖掘的基础,为商业决策提供支持。
数据的应用来自于实际的业务需要,不要为了赶时髦急于上各种高大上的项目。
以上的架构方案最终都是应因为有数据应用的需求才去实施。
不要等到ODS、DW“建好了”,再去想怎么用。首先要有Why。
没有目的和规划的建设是无意义的浪费。
实践的好,也许能成为三体里的星环公司。
展开阅读原文
etl技术架构图 相关内容
etl 数据插入数据库 SpringBoot2+Spring Security+Vue+ElementUI+蚂蚁金服AntV MyBatis从入门到精通 使用Java理解程序逻辑-Java入门宝典
2019-08-19 10:27:03 qq_43147136 阅读数 1854
Informatica 9.6 实战课程,将从数据仓库体系架构介绍,服务器搭建配置,实例组件操作,丰富的案例,及项目实战开发过程 来讲述ETL工具Informatica的实际应用,真正完成数据仓库全流程开发应用,ODS层,EDW层,DM层,学完该课程具备ETL软件开发工程师能力水平,能独立完成项目开发工作,从入门到高级项目开发应用,每个实例都载图,课件,教程,数据等,能达到中高级ETL开发能力,本课程的学习要求,一定跟着实例动手实战,操作才能掌握实例的开发步骤,及开发注意事项。
1、大数据平台网站日志分析系统,项目技术架构图:
2、大数据平台网站日志分析系统,流程图解析,整体流程如下:
ETL即Hive查询的sql;
但是,由于本案例的前提是处理海量数据,因而,流程中各环节所使用的技术则跟传统BI完全不同:
1)数据采集:定制开发采集程序,或使用开源框架Flume
2)数据预处理:定制开发mapreduce程序运行于hadoop集群
3)数据仓库技术:基于hadoop之上的Hive
4)数据导出:基于hadoop的sqoop数据导入导出工具
5)数据可视化:定制开发web程序或使用kettle等产品
6)整个过程的流程调度:hadoop生态圈中的oozie工具或其他类似开源产品
3、在一个完整的大数据处理系统中,除了HDFS+MapReduce+Hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可缺少的辅助系统,而这些辅助工具在Hadoop生态体系中都有便捷的开源框架,如图所示:
4、采集网站的点击流数据分析项目流程图分析
5、流式计算一般架构图
6、Spark和Hadoop之间的关系
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。