当前位置:   article > 正文

Airflow Python工作流引擎的重要概念介绍_python – airflow

python – airflow

1、Airflow简介

Airflow是一个以编程方式创作,安排和监控工作流程的平台。

当工作流被定义为代码时,它们变得更易于维护,可版本化,可测试和协作。

使用Airflow将工作流作为任务的有向非循环图(DAG)。 Airflow调度程序在遵循指定的依赖项的同时在一组worker上执行您的任务。 丰富的命令行实用程序可以轻松地在DAG上执行复杂的手术。 丰富的用户界面使您可以轻松地可视化生产中运行的管道,监控进度以及在需要时解决问题。

Airflow的英文官网:

https://airflow.apache.org/project.html

Airflow是由Airbnb开源的一款工具,目前在github上已经近一万颗星,从该项目的github地址可获取源码和很多示例文件:

https://github.com/apache/incubator-airflow

https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags

设计原则

  • 动态:Airflow配置为代码(Python),允许动态生成pipeline。 这允许编写动态实例化的pipelines代码。

  • 可扩展:轻松定义自己的opertators,执行程序并扩展库,使其符合适合您环境的抽象级别。

  • 优雅:Airflow精益而明确。 使用强大的Jinja模板引擎将参数化脚本内置于Airflow的核心。

  • 可扩展:Airflow具有模块化体系结构,并使用消息队列来协调任意数量的工作者。 Airflow已准备好扩展到无限远。

在查阅国内使用airflow的相关资料时,看到大部分网友是拿来作为代替crontab的一个高级定时任务管理工具使用,考虑到airflow的调度管理特性,确实也很擅长于做这些。不过airflow的核心价值应该是在于它是一个有向非循环的组织结构。在我们有一些比较复杂的后台工作任务需要进行自动化地处理时,airflow是一个非常好用的任务工作流编排和管理的工具。

2、快速开始

  1. mkdir -p ~/airflow
  2. export AIRFLOW_HOME=~/airflow
  3. export SLUGIFY_USES_TEXT_UNIDECODE=yes
  4. pip install apache-airflow
  5. # initialize the database
  6. airflow initdb
  7. # start the web server, default port is 8080
  8. airflow webserver -p 8080
  9. # start the scheduler
  10. airflow scheduler

通过浏览器访问以下地址:

http://localhost:8080

Airflow服务的配置文件为airflow.cfg,可以通过UI界面配置相关参数。

默认情况下,airflow使用一个sqlite数据库和SequentialExecutor执行器,这种使用方式时将仅支持按顺序得运行任务,只用于学习和实验用途。

我们先看两个演示的任务实例:

  1. # run your first task instance
  2. airflow run example_bash_operator runme_0 2018-09-06
  3. # run a backfill over 2 days
  4. airflow backfill example_bash_operator -s 2018-09-06 -e 2018-09-07

如果需要部署一个用于生产的环境,则按下面两个链接中的信息,安装其他类型的数据库并对配置文件进行变更。

https://airflow.apache.org/installation.html

https://airflow.apache.org/howto/initialize-database.html

3、定义一个DAG

人们有时会将DAG定义文件视为可以进行实际数据处理的地方 - 事实并非如此! 该脚本的目的是定义DAG对象。 它需要快速评估(秒级,而不是几分钟),因为调度程序将定期执行它以反映出变更(如果有的话)。Airflow 只是一个Python脚本,用来定义了Airflow DAG对象。 

下面是一个airflow pipeline的示例:

https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/example_dags/tutorial.py

  1. # -*- coding: utf-8 -*-
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements. See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership. The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License. You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing,
  14. # software distributed under the License is distributed on an
  15. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. # KIND, either express or implied. See the License for the
  17. # specific language governing permissions and limitations
  18. # under the License.
  19. """
  20. ### Tutorial Documentation
  21. Documentation that goes along with the Airflow tutorial located
  22. [here](https://airflow.incubator.apache.org/tutorial.html)
  23. """
  24. # 导入依赖库
  25. import airflow
  26. from airflow import DAG
  27. # 导入特定的执行器
  28. from airflow.operators.bash_operator import BashOperator
  29. from datetime import timedelta
  30. # these args will get passed on to each operator
  31. # you can override them on a per-task basis during operator initialization
  32. # 显式地将一组参数传递给每个任务的构造函数作为默认参数
  33. default_args = {
  34. 'owner': 'airflow',
  35. 'depends_on_past': False,
  36. 'start_date': airflow.utils.dates.days_ago(2),
  37. 'email': ['airflow@example.com'],
  38. 'email_on_failure': False,
  39. 'email_on_retry': False,
  40. 'retries': 1,
  41. 'retry_delay': timedelta(minutes=5),
  42. # 'queue': 'bash_queue',
  43. # 'pool': 'backfill',
  44. # 'priority_weight': 10,
  45. # 'end_date': datetime(2016, 1, 1),
  46. # 'wait_for_downstream': False,
  47. # 'dag': dag,
  48. # 'adhoc':False,
  49. # 'sla': timedelta(hours=2),
  50. # 'execution_timeout': timedelta(seconds=300),
  51. # 'on_failure_callback': some_function,
  52. # 'on_success_callback': some_other_function,
  53. # 'on_retry_callback': another_function,
  54. # 'trigger_rule': u'all_success'
  55. }
  56. # 我们需要一个DAG对象来嵌入我们的任务。 这里我们需要传递一个定义dag_id的字符串,它用作DAG的唯一标识符。 我们还传递我们刚刚定义的默认参数字典,并为DAG定义1天的schedule_interval。
  57. dag = DAG(
  58. 'tutorial',
  59. default_args=default_args,
  60. description='A simple tutorial DAG',
  61. schedule_interval=timedelta(days=1))
  62. # 生成任务。 从operator实例化的对象称为构造器。 第一个参数task_id充当任务的唯一标识符。任务必须包含或继承参数task_id和owner,否则Airflow将引发异常。
  63. # t1, t2 and t3 are examples of tasks created by instantiating operators
  64. t1 = BashOperator(
  65. task_id='print_date',
  66. bash_command='date',
  67. dag=dag)
  68. t1.doc_md = """\
  69. #### Task Documentation
  70. You can document your task using the attributes `doc_md` (markdown),
  71. `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
  72. rendered in the UI's Task Instance Details page.
  73. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
  74. """
  75. dag.doc_md = __doc__
  76. t2 = BashOperator(
  77. task_id='sleep',
  78. depends_on_past=False,
  79. bash_command='sleep 5',
  80. dag=dag)
  81. # Airflow充分利用了Jinja Templating的强大功能,并提供了一组内置参数和宏。 Airflow还支持自定义参数、宏和模板的钩子。下面的示例使用到了最常见的模板变量:{{ ds}}(今天的“日期戳”)
  82. templated_command = """
  83. {% for i in range(5) %}
  84. echo "{{ ds }}"
  85. echo "{{ macros.ds_add(ds, 7)}}"
  86. echo "{{ params.my_param }}"
  87. {% endfor %}
  88. """
  89. t3 = BashOperator(
  90. task_id='templated',
  91. depends_on_past=False,
  92. bash_command=templated_command,
  93. params={'my_param': 'Parameter I passed in'},
  94. dag=dag)
  95. # 配置任务之间的依赖关系,使得任务t2和t3依赖于t1
  96. t2.set_upstream(t1)
  97. t3.set_upstream(t1)

Airflow所支持的更多变理和宏:https://airflow.apache.org/code.html#macros

4、测试

将脚本文件放在指定路径下,执行一次,如果没有报错,说明脚本文件中不存在语法错误。

python ~/airflow/dags/tutorial.py

命令行元数据验证

  1. # print the list of active DAGsairflow list_dags# prints the list of tasks the "tutorial" dag_idairflow list_tasks tutorial# prints the hierarchy of tasks in the tutorial DAGairflow list_tasks tutorial --tree
  2. ## Testing
  3. # testing print_date
  4. airflow test tutorial print_date 2018-09-06
  5. # testing sleep
  6. airflow test tutorial sleep 2018-09-06
  7. # testing templated
  8. airflow test tutorial templated 2018-09-06
  9. ## Backfill
  10. # optional, start a web server in debug mode in the background
  11. airflow webserver --debug &
  12. # start your backfill on a date range
  13. airflow backfill tutorial -s 2018-09-06 -e 2018-09-07

5、重要概念

5.1 DAGs

DAG不关心其组成任务的作用; 它的工作是确保一批任务可以在正确的时间或正确的顺序发生,或可以正确处理任何意外的问题。

DAG在标准Python文件中定义,这些文件放在Airflow的DAG_FOLDER中。 Airflow将执行每个文件中的代码以动态构建DAG对象。 您可以拥有任意数量的DAG,每个DAG都描述任意数量的任务。 通常,每个应该对应于单个逻辑工作流。

搜索DAG时,Airflow将仅考虑字符串“airflow”和“DAG”都出现在.py文件内容中的文件。

Scope

全局DAG和本地DAG

Airflow将加载它可以从DAG文件导入的任何DAG对象。 重要的是,这意味着DAG必须出现在globals()中。 

以下两个DAG,将只会加载dag_1; 另一个只出现在本地范围内。

  1. dag_1 = DAG('this_dag_will_be_discovered')
  2. def my_function():
  3. dag_2 = DAG('but_this_dag_will_not')
  4. my_function()

有时这可以很好地利用。 例如,SubDagOperator的常见模式是定义函数内的子标记,以便Airflow不会尝试将其作为独立的DAG加载。

Default Arguments

如果将default_args字典传递给DAG,它将把它们应用于任何operator。 这使得很容易将公共参数应用于许多operator而无需多次的输入。

  1. default_args = {
  2. 'start_date': datetime(2016, 1, 1),
  3. 'owner': 'Airflow'}
  4. dag = DAG('my_dag', default_args=default_args)
  5. op = DummyOperator(task_id='dummy', dag=dag)
  6. print(op.owner) # Airflow

Context Manager

DAG可用作上下文管理器,以自动将新operator分配给该DAG。

5.2 Operators

虽然DAG描述了如何运行工作流,但是由operator确定实际完成的工作。

operator描述工作流中的单个任务。 operators通常(但并非总是)是原子的,这意味着他们可以独立运行,而不需要与任何其他operators共享资源。 DAG将确保operators以正确的顺序运行; 除了这些依赖项之外,operators通常独立运行。 实际上,它们可能在两台完全不同的机器上运行。

这是一个微妙但非常重要的一点:通常,如果两个operators需要共享信息,如文件名或少量数据,您应该考虑将它们组合到一个运算符中。 如果无法绝对避免,Airflow确实也提供了operators交叉通信的功能,称为XCom,本文档的其他部分对此进行了描述。

Airflow为许多常见任务提供operator支持,包括:

  • BashOperator - executes a bash command

  • PythonOperator - calls an arbitrary Python function

  • EmailOperator - sends an email

  • SimpleHttpOperator - sends an HTTP request

  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command

  • Sensor - waits for a certain time, file, database row, S3 key, etc…

除了这些基本构建块之外,还有许多特定的operators:

DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMysqlOperator, SlackOperator

airflow/contrib/目录则包含了更多由社区构建的operators。 这些operators并不总是像主发行版中那样完整或经过良好测试,但允许用户更轻松地向平台添加新功能。

Airflow operator的使用方法参见:https://airflow.apache.org/howto/operator.html

DAG Assignment

operator不必立即分配给DAG(之前的dag是必需参数)。 但是,一旦将operator分配给DAG,就无法转移或取消分配。 在创建operator时,通过延迟赋值或甚至从其他operator推导,可以显式地完成DAG分配。

  1. dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
  2. # sets the DAG explicitlyexplicit_op = DummyOperator(task_id='op1', dag=dag)
  3. # deferred DAG assignmentdeferred_op = DummyOperator(task_id='op2')deferred_op.dag = dag
  4. # inferred DAG assignment (linked operators must be in the same DAG)inferred_op = DummyOperator(task_id='op3')inferred_op.set_upstream(deferred_op)

Bitshift Composition 位移组合结构

传统上,使用set_upstream()和set_downstream()方法设置operators之间的依赖关系。 在Airflow 1.8中,这可以通过Python bitshift操作符">>"和"<<"来完成。 

例如,以下四个语句在功能上都是等效的:

  1. op1 >> op2
  2. op1.set_downstream(op2)
  3. op2 << op1
  4. op2.set_upstream(op1)

当使用bitshift组合运算符时,关系设置在bitshift运算符指向的方向上。 例如,op1 >> op2表示op1先运行,op2运行第二。 可以组成多个运算符 - 请记住,链从左到右执行,并且始终返回最右边的对象。 例如:

op1 >> op2 >> op3 << op4

上面这行配置的效果相当于下面的依赖关系设置:

  1. op1.set_downstream(op2)
  2. op2.set_downstream(op3)
  3. op3.set_upstream(op4)

bitshift操作符同样也能使用于DAGs,例如:

dag >> op1 >> op2

相当于:

  1. op1.dag = dag
  2. op1.set_downstream(op2)

通过使用bitshift操作符,我们可以把下面多个operators打包成为一个简单的pipeline:

  1. with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
  2. (
  3. DummyOperator(task_id='dummy_1')
  4. >> BashOperator(
  5. task_id='bash_1',
  6. bash_command='echo "HELLO!"')
  7. >> PythonOperator(
  8. task_id='python_1',
  9. python_callable=lambda: print("GOODBYE!"))
  10. )

5.3 Tasks

一旦operator被实例化,它就被称为“任务”。 实例化的过程中会调用抽象运算符时定义特定值,参数化后的任务便成为了DAG中的一个节点。

任务实例表示任务的特定运行时,其特征在于dag、任务和时间点的组合。 任务实例也有一个运行状态,可以是“运行”、“成功”、“失败”、“跳过”、“重试”等。

Workflows

您现在熟悉了Airflow的核心构建模块。 有些概念可能听起来非常相似,但词汇表可以概念化如下:

  • DAG: 描述工作应该发生的顺序

  • Operator: 作为执行某些工作的模板的类

  • Task: operator的参数化实例

  • Task Instance: task的运行时实例

  • 已经被分配了DAG

  • 具有与DAG的特定运行相关联的状态

通过组合DAG、Operators来创建TaskInstances,您可以构建出复杂的工作流程。

6、附加功能

除了核心Airflow对象之外,还有许多更复杂的功能可以实现限制同时访问资源、交叉通信、条件执行等行为。

6.1 Hooks

钩子是外部平台和数据库的接口,如Hive,S3,MySQL,Postgres,HDFS和Pig。 Hooks尽可能实现通用接口,并充当operator的构建块。 他们使用airflow.models.Connection模型来获取主机名和身份验证信息。 钩子将身份验证代码和信息保存在pipeline之外,集中在元数据数据库中。

钩子在Python脚本,Airflow airflow.operators.PythonOperator以及iPython或Jupyter Notebook等交互式环境中使用它们也非常有用。

6.2 Pools

当有太多进程同时需要执行时,某些系统可能会被淹没。 Airflow pools可用于限制任意任务集上的并发执行。 要以在UI(菜单 - >管理 - >pools)中管理pools列表,通过为pools命名并为其分配多个worker slots。 然后在创建任务时(即实例化operators),可以通过使用pools参数将task与现有pools之一相关联。

  1. aggregate_db_message_job = BashOperator(
  2. task_id='aggregate_db_message_job',
  3. execution_timeout=timedelta(hours=3),
  4. pool='ep_data_pipeline_db_msg_agg',
  5. bash_command=aggregate_db_message_job_cmd,
  6. dag=dag)aggregate_db_message_job.set_upstream(wait_for_empty_queue)

pool参数可以与priority_weight结合使用,以定义队列中的优先级,以及在pool中打开的slot时首先执行哪些任务。 默认的priority_weight是1,可以使用任何数字。 在对队列进行排序以评估接下来应该执行哪个任务时,我们使用priority_weight,与来自此任务下游任务的所有priority_weight值相加。 您可以使用它来执行特定的重要任务,并相应地优先处理该任务的整个路径。

当slot被填满时,任务将照常安排。 达到容量后,可运行的任务将排队,其状态将在UI中显示。 当插槽空闲时,排队的任务将根据priority_weight(任务及其后代)开始运行。

请注意,默认情况下,任务不会分配给任何池,并且它们的执行的并行性仅限于执行程序的设置。

6.3 Connections

外部系统的连接信息存储在Airflow元数据数据库中并在UI中进行管理(菜单 - >管理 - >连接)。在那里定义了conn_id,并附加了主机名/登录/密码/schema信息。 Airflow pipelines可以简单地引用集中配置管理中的conn_id,而无需在任何地方硬编码任何此类信息。

可以定义具有相同conn_id的许多连接,并且在这种情况下,并且当hooks使用来自BaseHook的get_connection方法时,Airflow将随机选择一个连接,允许提供一些基本的负载平衡和容错。

Airflow还能够通过操作系统中的环境变量引用连接信息。 但它只支持URI格式。 如果您需要为连接指定extra内容,请使用Web UI。

如果在Airflow元数据数据库和环境变量中都定义了具有相同conn_id的连接,则Airflow将仅引用环境变量中的连接(例如,给定conn_id postgres_master,Airflow将首先在环境变量中搜索AIRFLOW_CONN_POSTGRES_MASTER并直接引用它)。

任何钩子都有一个默认的conn_id,其中使用该钩子的operators不需要显式提供conn_id。 例如,PostgresHook的默认conn_id是postgres_default。

更多的关于Connections的使用方法请参照:https://airflow.apache.org/howto/manage-connections.html

6.4 Queues

使用CeleryExecutor时,可以指定发送任务的celery队列。 queue是BaseOperator的一个属性,因此任何任务都可以分配给任何队列。 环境的默认队列在airflow.cfg的celery - > default_queue中定义。 这定义了未指定任务时分配给的队列,以及Airflow工作程序在启动时侦听的队列。

Workers可以监听一个或多个任务队列。 当工作程序启动时(使用airflow worker命令),可以指定一组由逗号分隔的队列名称(例如,airflow worker -q spark)。 然后,该工作人员将仅接收连接到指定队列的任务。

如果您需要专用的workers,从资源角度来看(例如,一个worker可以毫无问题地执行数千个任务)或者从环境角度(比如您希望worker从Spark群集中运行,这可能非常有用 本身,因为它需要一个非常具体的环境和安全权限)。

6.5 XComs

XComs允许任务间交换消息,允许更细微的控制形式和共享状态。 该名称是“cross-communication”的缩写。 XComs主要由一个key, value和timestamp所定义,但也跟踪创建XCom的task/DAG,以及何时应该可见的属性。 任何可以被pickled的对象都可以用作XCom值,因此用户应该确保使用适当大小的对象。

XComs支持“推”(发送)或“拉”(接收)的方式处理消息。 当任务推送XCom时,它通常可用于其他任务。 任务可以通过调用xcom_push()方法随时推送XComs。 此外,如果任务返回一个值(来自其Operator的execute()方法,或者来自PythonOperator的python_callable函数),则会自动推送包含该值的XCom。

Tasks可以调用xcom_pull()来检索XComs,可选地根据key、source task_ids和source dag_id等条件应用过滤器。 默认情况下,xcom_pull()会过滤出从执行函数返回时被自动赋予XCom的键(与手动推送的XCom相反)。

如果为task_ids传递xcom_pull单个字符串,则返回该任务的最新XCom值; 如果传递了task_ids列表,则返回相应的XCom值列表。

  1. # inside a PythonOperator called 'pushing_task'def push_function():
  2. return value
  3. # inside another PythonOperator where provide_context=Truedef pull_function(**context):
  4. value = context['task_instance'].xcom_pull(task_ids='pushing_task')

也可以直接在模板中pull XCom,这是一个示例:

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

请注意,XCom与变量类似,但专门用于任务间通信而非全局设置。

6.6 Variables

变量是将任意内容或配置作为一个key/value简单键值存储的通用方法。 可以从UI(管理 - >变量),代码或CLI列出,创建,更新和删除变量。 此外,json配置文件可以通过UI批量上传。 虽然pipeline代码定义和大多数常量和变量应该在代码中定义并存储在源代码控制中,但是通过UI可以访问和修改某些变量或配置项会很有用。

  1. from airflow.models import Variable
  2. foo = Variable.get("foo")
  3. bar = Variable.get("bar", deserialize_json=True)

第二个调用假设是json内容,并将其反序列化为bar。 请注意,Variable是sqlalchemy模型,可以这样使用。

你可以在jinja模板中按下面方法引用变量:

echo {{ var.value.<variable_name> }}

或者如果需要从变量反序列化json对象:

echo {{ var.json.<variable_name> }}

7.7 Branching

有时您需要一个工作流分支,或者只根据任意条件走下某条路径,这通常与上游任务中发生的事情有关。 一种方法是使用BranchPythonOperator。

BranchPythonOperator与PythonOperator非常相似,只是它需要一个返回task_id的python_callable。 返回task_id,并跳过所有其他路径。 Python函数返回的task_id必须直接引用BranchPythonOperator任务下游的任务。

请注意,在BranchPythonOperator的下游任务中使用depends_on_past = True,这在逻辑上是不合理的,因为skipped状态将总是

因为他们过去的successes而造成task的堵塞。

如果你想跳过一些任务,请记住你不能有一个空路径,如果是这样,那就做一个虚设任务。

像这样,跳过虚拟任务“branch_false”

6.8 SubDAGs

SubDAG非常适合重复模式。 在使用Airflow时,定义一个返回DAG对象的函数是一个很好的设计模式。

Airbnb在加载数据时使用stage-check-exchange模式。 数据在临时表中暂存,然后对该表执行数据质量检查。 一旦检查全部通过,分区就会移动到生产表中。

再举一个例子,考虑以下DAG:

我们可以将所有并行task-* operators组合到一个SubDAG中,生成的DAG类似于以下内容:

请注意,SubDAG operators应包含返回DAG对象的工厂方法。 这将阻止SubDAG在main UI中被视为单独的DAG。

例如:

  1. #dags/subdag.py
  2. from airflow.models import DAG
  3. from airflow.operators.dummy_operator import DummyOperator
  4. # Dag is returned by a factory method
  5. def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  6. dag = DAG(
  7. '%s.%s' % (parent_dag_name, child_dag_name),
  8. schedule_interval=schedule_interval,
  9. start_date=start_date,
  10. )
  11. dummy_operator = DummyOperator(
  12. task_id='dummy_task',
  13. dag=dag,
  14. )
  15. return dag

然后可以在主DAG文件中引用此SubDAG:

  1. # main_dag.pyfrom datetime import datetime, timedeltafrom airflow.models import DAGfrom airflow.operators.subdag_operator import SubDagOperatorfrom dags.subdag import sub_dag
  2. PARENT_DAG_NAME = 'parent_dag'CHILD_DAG_NAME = 'child_dag'
  3. main_dag = DAG(
  4. dag_id=PARENT_DAG_NAME,
  5. schedule_interval=timedelta(hours=1),
  6. start_date=datetime(2016, 1, 1))
  7. sub_dag = SubDagOperator(
  8. subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
  9. main_dag.schedule_interval),
  10. task_id=CHILD_DAG_NAME,
  11. dag=main_dag,)

您可以从主DAG的Graph视图放大SubDagOperator,以显示SubDAG中包含的任务:

使用SubDAG时的一些其他提示:

  • 按照惯例,SubDAG的dag_id应以其父级和点为前缀。 和在parent.child中一样。
  • 通过将参数传递给SubDAG operator来共享主DAG和SubDAG之间的参数(如上所示)。
  • SubDAG必须有一个计划并启用。 如果SubDAG的时间表设置为None或@once,SubDAG将成功完成而不做任何事情。
  • 清除SubDagOperator也会清除其中的任务状态。
  • 在SubDagOperator上标记成功不会影响其中的任务状态。
  • 避免在SubDAG中的任务中使用depends_on_past=True,因为这可能会造成混淆。
  • 可以为SubDAG指定执行程序。 如果要在进程中运行SubDAG并有效地将其并行性限制为1,则通常使用SequentialExecutor。 使用LocalExecutor可能会有问题,因为它可能会过度订阅你的worker,在单个插槽中运行多个任务。

6.9 SLAs

服务级别协议或任务/DAG应该成功的时间可以在任务级别设置为timedelta。 如果此时一个或多个实例未成功,则会发送警报电子邮件,详细说明错过其SLA的任务列表。 该事件也记录在数据库中,并可在Web UI中浏览,其中可以分析和记录事件。

6.10 Trigger Rules

虽然正常的工作流行为是在所有直接上游任务都成功时触发任务,但Airflow允许更复杂的依赖项设置。

所有operators 都有一个trigger_rule参数,该参数定义生成的任务被触发的规则。 trigger_rule的默认值是all_success,可以定义为“当所有直接上游任务都成功时触发此任务”。 此处描述的所有其他规则都基于直接父任务,并且是在创建任务时可以传递给任何operator:

  • all_success: (default) 所有的父任务都成功。

  • all_failed: 所有的父任务或上游任务都失败。

  • all_done: 所有的父任务都完成。

  • one_failed: 一旦至少一个父任务失败了就会被触发,它不会等待所有父任务完成。

  • one_success:  一旦至少有一个父任务成功了就会被触发,它不会等待所有父任务完成。

  • dummy: 依赖只是为了展示,任意触发。

请注意,这些可以与depends_on_past(boolean)结合使用,当设置为True时,如果任务的先前计划未成功,则不会触发任务。

6.11 Latest Run Only

标准工作流行为涉及为特定日期/时间范围内运行一系列任务。 但是,某些工作流执行的任务与运行时无关,但需要按计划运行,就像标准的cron作业一样。 在这些情况下,暂停期间错过的回填或运行作业会浪费CPU周期。

对于这种情况,您可以使用LatestOnlyOperator跳过在DAG的最近计划运行期间未运行的任务。 如果现在的时间不在其execution_time和下一个计划的execution_time之间,则LatestOnlyOperator将跳过所有直接下游任务及其自身。

必须意识到跳过的任务和触发器规则之间的相互作用。 跳过的任务将通过触发器规则all_success和all_failed级联,但不是all_done,one_failed,one_success和dummy。 如果您希望将LatestOnlyOperator与不级联跳过的触发器规则一起使用,则需要确保LatestOnlyOperator直接位于您要跳过的任务的上游。

例如,考虑以下dag:

  1. #dags/latest_only_with_trigger.pyimport datetime as dt
  2. from airflow.models import DAGfrom airflow.operators.dummy_operator import DummyOperatorfrom airflow.operators.latest_only_operator import LatestOnlyOperatorfrom airflow.utils.trigger_rule import TriggerRule
  3. dag = DAG(
  4. dag_id='latest_only_with_trigger',
  5. schedule_interval=dt.timedelta(hours=4),
  6. start_date=dt.datetime(2016, 9, 20),)
  7. latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
  8. task1 = DummyOperator(task_id='task1', dag=dag)
  9. task1.set_upstream(latest_only)
  10. task2 = DummyOperator(task_id='task2', dag=dag)
  11. task3 = DummyOperator(task_id='task3', dag=dag)
  12. task3.set_upstream([task1, task2])
  13. task4 = DummyOperator(task_id='task4', dag=dag,
  14. trigger_rule=TriggerRule.ALL_DONE)
  15. task4.set_upstream([task1, task2])
  • 在这个dag的情况下,对于除最新运行之外的所有运行,latest_only任务将显示为跳过。 

  • task1直接位于latest_only的下游,并且除了最新的之外将跳过所有运行。

  • task2完全独立于latest_only,将在所有计划的时间段内运行。

  • task3是task1和task2的下游,由于默认的trigger_rule是all_success,因此将从task1接收级联跳过。 

  • task4是task1和task2的下游,但由于其trigger_rule设置为all_done,因此一旦跳过task1(有效的完成状态)并且task2成功,它将立即触发。

6.12 Zombies & Undeads

任务实例也会死掉,这通常是正常生命周期的一部分,但有时会出乎意料。

Zombies僵尸任务的特点是没有心跳(由job定期发出)和数据库中的运行状态。 当工作节点无法访问数据库,Airflow进程在外部被终止或者节点重新启动时,它们可能会发生。 调度程序的进程会定期执行查杀僵尸任务。

Undead进程的特点是存在进程和匹配的心跳,但Airflow不知道此任务在数据库中运行。 这种不匹配通常在数据库状态发生变化时发生,最有可能是通过删除UI中“任务实例”视图中的行。 任务会被指示验证其作为例行心跳的一部分的状态,并在确定它们处于这种“undead”状态时终止自己。

6.13 Cluster Policy

在您的本地airflow配置文件中可以定义一个policy函数,该函数根据其他任务或DAG属性改变任务属性。 它接收单个参数作为对任务对象的引用,并期望改变其属性。

例如,此函数可以在使用特定运算符时应用特定队列属性,或强制执行任务超时策略,确保没有任务运行超过48小时。 以下是airflow_settings.py中的内容示例:

  1. def policy(task):
  2. if task.__class__.__name__ == 'HivePartitionSensor':
  3. task.queue = "sensor_queue"
  4. if task.timeout > timedelta(hours=48):
  5. task.timeout = timedelta(hours=48)

6.14 文档和注释

可以在Web界面中显示的dag和任务对象中添加文档或注释(dag为“Graph View”,任务为“Task Details”)。 Airflow提供了一组特殊的任务属性,用于呈现更为丰富的内容:

请注意,对于dags,doc_md是可使用的唯一注释属性。

如果您的任务是从配置文件动态构建的,则此功能特别有用,它允许您公开Airflow中相关任务的配置信息。

  1. """
  2. ### My great DAG
  3. """
  4. dag = DAG('my_dag', default_args=default_args)
  5. dag.doc_md = __doc__
  6. t = BashOperator("foo", dag=dag)
  7. t.doc_md = """\
  8. #Title"
  9. Here's a [url](www.airbnb.com)
  10. """

此内容将分别在“图表视图”和“任务详细信息”页面中被煊染为markdown格式。

6.15 Jinja Templating

Airflow充分利用了Jinja Templating的强大功能,这可以成为与宏结合使用的强大工具。

例如,假设您希望使用BashOperator将执行日期作为环境变量传递给Bash脚本。

  1. # The execution date as YYYY-MM-DD
  2. date = "{{ ds }}"
  3. t = BashOperator(
  4. task_id='test_env',
  5. bash_command='/tmp/test.sh ',
  6. dag=dag,
  7. env={'EXECUTION_DATE': date})

这里,{{ds}}是一个宏,并且由于BashOperator的env参数是使用Jinja模板化的,因此执行日期将作为Bash脚本中名为EXECUTION_DATE的环境变量提供。

您可以将Jinja模板与文档中标记为“templated”的每个参数一起使用。 模板替换发生在调用operator的pre_execute函数之前。

6.6 Packaged dags

虽然通常会在单个.py文件中指定dags,但有时可能需要将dag及其依赖项打包组合在一起。 

例如,您可能希望将多个dag组合在一起以将它们一起管理版本,或者您可能需要一个额外的模块,默认情况下在您运行airflow的系统上不可用。 为此,您可以创建一个zip文件,其中包含zip文件根目录中的dag,并在目录中解压缩额外的模块。

例如,您可以创建一个如下所示的zip文件:

  1. my_dag1.py
  2. my_dag2.py
  3. package1/__init__.py
  4. package1/functions.py

Airflow将扫描zip文件并尝试加载my_dag1.py和my_dag2.py。 它不会进入子目录,因为它们被认为是潜在的包。

如果您想将模块依赖项添加到DAG,您基本上也会这样做,但是更多的是使用virtualenv和pip。

  1. virtualenv zip_dag
  2. source zip_dag/bin/activate
  3. mkdir zip_dag_contents
  4. cd zip_dag_contents
  5. pip install --install-option="--install-lib=$PWD" my_useful_package
  6. cp ~/my_dag.py .
  7. zip -r zip_dag.zip *

注1:zip文件将插入模块搜索列表(sys.path)的开头,因此它将可用于驻留在同一解释器中的任何其他代码。

注2:打包的dags不能与打开pickling时一起使用。

注3:打包的dag不能包含动态库(例如libz.so),如果模块需要这些库,则需要在系统上使用这些库。 换句话说,只能打包纯python模块。

 

参考资料:

https://airflow.apache.org/concepts.html

系统研究Airbnb开源项目airflow http://www.cnblogs.com/harrychinese/p/airflow.html

Airflow使用入门指南  https://blog.csdn.net/qazplm12_3/article/details/53065654

Airflow实战  http://ju.outofmemory.cn/entry/245373

一个非常好用的data pipeline管理工具 airflow  https://www.jianshu.com/p/59d69981658a

使用 Airflow 替代你的 crontab  https://www.juhe.cn/news/index/id/2365

生产环境使用airflow  https://www.douban.com/note/620024057/

Airflow 中的技巧和陷阱  https://segmentfault.com/a/1190000005078547

airflow 简明指南  https://www.v2ex.com/t/331460

airflow探索篇  https://segmentfault.com/a/1190000012803744

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/130815
推荐阅读
相关标签
  

闽ICP备14008679号