Airflow

Reference

Components of Airflow

Airflow 有一些重要的组件

Metastore Database

Contains information about the status of Tasks, DAGs, Variables, Connections, etc.

WebServer

守护进程,使用 gunicorn 服务器(相当于 java 中的 tomcat )处理并发请求,可通过修改{AIRFLOW_HOME}/airflow.cfg文件中workers的值来控制处理并发请求的进程数,workers = 4表示开启 4 个 gunicorn worker(进程)处理 web 请求。webserver 提供以下功能:

  • 中止、恢复、触发任务。
  • 监控正在运行的任务,断点续跑任务。
  • 执行 ad-hoc 命令或 SQL 语句来查询任务的状态,日志等详细信息。
  • 配置连接,包括不限于数据库、ssh 的连接等。

Scheduler

守护进程,调度器,它周期性地轮询任务的调度计划,以确定是否触发任务执行

Executor

执行器,Airflow 本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的几个执行器:

  • Debug Executor: 单进程顺序执行任务,默认执行器,通常只用于测试
  • Celery Executor: 分布式调度任务,生产环境常用。Celery 是一个借助队列机制实现的分布式任务调度框架,它本身无队列功能,需要借助第三方组件,比如 Redis 或者 RabbitMQ。
    • Celery 的任务队列包含两个重要的组件
      1. Broker: 存储要执行的命令列表,需要借助第三方的用于收发消息的消息中间件(Message Broker),如 RabbitMQ、Redis
      2. Result Backend: 存储已完成命令的状态,一般存储到 Database
    • 当调度器executor = CeleryExecutor时,包含两个重要的守护进程:
      1. Celery Worker: 守护进程,通过airflow worker -D启动一个或多个 Celery 的任务队列,负责执行具体的 DAG 任务,默认队列名为default
      2. Celery Flower: 守护进程,通过airflow flower -D启动,消息队列监控工具,用于监控 Celery 消息队列,默认的端口为5555,可以在浏览器地址栏中输入http://127.0.0.1:5555来访问
  • Dask Executor: 动态任务调度,主要用于数据分析
  • Kubernetes Executor

Common Concepts of Airflow

  • DAG:即有向无环图(Directed Acyclic Graph),将所有需要运行的 Tasks 按照依赖关系组织起来,描述的是所有 Tasks 执行顺序。
  • Operator:可以简单理解为一个class,描述了 DAG 中某个的 task 具体要做的事。其中,airflow 内置了很多 operators,如 BashOperator 执行一个 bash 命令,PythonOperator 调用任意的 Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义 Operator,这给用户提供了极大的便利性。Using Operators
  • Task:Task 是 Operator 的一个实例,也就是 DAGs 中的一个 Node。
  • Task Instance:Task的一次运行。Web 界面中可以看到 Task Instance
  • Task Lifecycle
  • Task Relationship:DAGs 中的不同 Tasks 之间可以有依赖关系,如Task1 >> Task2,表明 Task2 依赖于 Task1 的结果。
  • Workflow通过将 DAGs 和 Operators 结合起来,用户就可以创建各种复杂的工作流(workflow)。

Working Principle of Airflow

这里只说 CeleryExecutor 的工作原理。

Install Airflow

$ sudo pip install --upgrade pip
$ sudo pip install -U setuptools

$ export AIRFLOW_HOME=~/airflow
$ pip install apache-airflow
$ cd $PYTHON_HOME/lib/python2.7/sit-packages/airflow/bin
# 将 airflow 安装到刚刚设置的 AIRFLOW_HOME 目录下
$ ./airflow

$ cp -r $PYTHON_HOME/lib/python2.7/sit-packages/airflow/bin ~/airflow

$ ls -l ~/airflow
total 40
-rw-rw-r-- 1 zhangqiang zhangqiang 36475 Jun 28 18:35 airflow.cfg
drwxrwxr-x 3 zhangqiang zhangqiang    23 Jun 28 18:35 logs
-rw-rw-r-- 1 zhangqiang zhangqiang  2588 Jun 28 18:35 unittests.cfg



# 修改配置文件,所有节点公用一份
$ vim $AIRFLOW_HOME/airflow.cfg
# 配置 Metestore Database 
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
# 配置 Broker - RabbitMQ
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = amqp://{USERNAME}:{PASSWORD}@{RABBITMQ_HOST}:5672/
# 或者配置 Broker - Redis,这里指定使用 Redis db 0
# broker_url = redis://:{PASSWORD}@{REDIS_HOST}:6379/0
# 配置 Result Backend
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

# 安装 mysql 依赖包,CentOS 
$ sudo yum install -y python-devel mysql-devel
# Ubuntu
$ sudo apt-get install -y python-dev python-MySQLdb
$ pip install 'apache-airflow[mysql]'

$ pip install 'apache-airflow[celery]'

$ ./airflow initdb

# Start the web server, default port is 8080
$ ./airflow webserver -D -p 8080

# Start the scheduler
$ ./airflow scheduler -D

# Start Celery Worker, 可以在多个节点运行多个 worker
$ ./airflow worker -D

Define DAGs

$ mkdir $AIRFLOW_HOME/dags

# 定义 DAG
$ vim $AIRFLOW_HOME/dags/tutorial.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    dag_id='tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """

"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

# 检查是否有错误,如果命令行没有报错,就表示没太大问题。
$ python $AIRFLOW_HOME/dags/tutorial.py
# 查看生效的 DAGs
$ airflow list_dags -sd $AIRFLOW_HOME/dags

Commands

# Usage
$ ./airflow -h
$ ./airflow <command> -h

# 测试任务 airflow test dag_id task_id execution_time
$ ./airflow test dag_id task_id2019-09-10

# 开始运行任务(这一步也可以在web界面点trigger按钮)
$ ./airflow trigger_dag test_task

# 守护进程运行webserver, 默认端口为8080,也可以通过`-p`来指定
$ ./airflow webserver -D  

# 守护进程运行调度器     
$ ./airflow scheduler -D   

# 守护进程运行调度器    
$ ./airflow worker -D          

# 暂停任务
$ ./airflow pause dag_id      

# 取消暂停,等同于在web管理界面打开off按钮
$ ./airflow unpause dag_id     

# 查看task列表
$ ./airflow list_tasks dag_id

# 清空任务状态
$ ./airflow clear dag_id       

# 运行task
$ ./airflow run dag_id task_id execution_date

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,一毛也是爱

打开支付宝扫一扫,即可进行扫码打赏哦