Reference
Components of Airflow
Airflow has several important components:
Metastore Database
Contains information about the status of Tasks, DAGs, Variables, Connections, etc.
WebServer
A daemon process. It uses a gunicorn server (roughly the equivalent of Tomcat in the Java world) to handle concurrent requests; the number of processes serving web requests is controlled by the `workers` value in `{AIRFLOW_HOME}/airflow.cfg`, e.g. `workers = 4` starts 4 gunicorn workers (processes). See the airflow.cfg excerpt after the list below. The webserver provides the following functions:
- Stop, resume, and trigger tasks.
- Monitor running tasks and resume interrupted tasks from where they left off.
- Run ad-hoc commands or SQL statements to query task status, logs, and other details.
- Configure connections, including but not limited to database and SSH connections.
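For reference, a minimal airflow.cfg excerpt for the setting mentioned above (the `[webserver]` section header is part of the stock 1.10 config):
[webserver]
# Number of gunicorn workers (processes) used to serve the UI
workers = 4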
Scheduler
A daemon process, the scheduler. It periodically polls the schedule of each DAG/task to determine whether a task should be triggered for execution.
Executor
The executor. Airflow itself is an umbrella platform that supports multiple pluggable components, so there are several options to choose from. The most important executors are:
- Sequential/Debug Executor: runs tasks sequentially in a single process; the default executor (SequentialExecutor) is usually only used for testing.
- Celery Executor: distributed task scheduling, commonly used in production. Celery is a distributed task-scheduling framework built on top of a queue; it has no queue implementation of its own and relies on a third-party component such as Redis or RabbitMQ.
  - Celery's task queue involves two important components:
    - Broker: stores the list of commands to execute; requires a third-party message middleware (Message Broker) such as RabbitMQ or Redis.
    - Result Backend: stores the state of completed commands, usually in a database.
  - When the executor is set to `executor = CeleryExecutor`, two additional daemons are involved:
    - Celery Worker: a daemon started with `airflow worker -D`; it starts one or more Celery task queues and is responsible for executing the actual DAG tasks. The default queue name is `default`.
    - Celery Flower: a daemon started with `airflow flower -D`; a monitoring tool for the Celery message queue. The default port is 5555, so it can be accessed in a browser at http://127.0.0.1:5555.
- Dask Executor: dynamic task scheduling, mainly used for data analysis.
- Kubernetes Executor: runs each task instance in its own Kubernetes pod.
Common Concepts of Airflow
- DAG: a Directed Acyclic Graph; it organizes all the Tasks that need to run according to their dependencies and describes the execution order of all Tasks.
- Operator: can be thought of as a class describing what a specific task in a DAG does. Airflow ships with many built-in operators, e.g. BashOperator runs a bash command, PythonOperator calls an arbitrary Python function, EmailOperator sends email, the HTTP operator (SimpleHttpOperator) sends HTTP requests, and the SQL operators execute SQL statements. Users can also define their own Operators, which is very convenient. See "Using Operators".
- Task: a Task is an instance of an Operator, i.e. a node in a DAG.
- Task Instance: a single run of a Task; Task Instances can be inspected in the web UI.
- Task Lifecycle: the states a Task goes through over its lifecycle, from start to completion.
- Task Relationship: Tasks within a DAG can depend on each other, e.g. `Task1 >> Task2` means Task2 depends on the result of Task1.
- Workflow: by combining DAGs and Operators, users can build arbitrarily complex workflows (see the minimal sketch below).
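A minimal sketch that ties these concepts together, assuming the Airflow 1.10 import paths used elsewhere in this document (the DAG id and task ids are made up for illustration):

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# A DAG groups Tasks and defines their execution order
dag = DAG(
    dag_id='concepts_demo',  # hypothetical DAG id
    start_date=days_ago(1),
    schedule_interval=timedelta(days=1),
)

# Each Operator instance is a Task (a node in the DAG)
task1 = BashOperator(task_id='task1', bash_command='echo hello', dag=dag)
task2 = BashOperator(task_id='task2', bash_command='echo world', dag=dag)

# Task Relationship: task2 depends on task1
task1 >> task2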
Working Principle of Airflow
This section only covers how the CeleryExecutor works: the scheduler pushes the commands for runnable task instances onto the broker (RabbitMQ/Redis); Celery workers pull those commands off the queue and execute the corresponding DAG tasks; task state is written to the result backend and the metastore database, from which the webserver reads and displays status.
Install Airflow On ECS
$ sudo pip install --upgrade pip
$ sudo pip install -U setuptools
$ export AIRFLOW_HOME=~/airflow
$ pip install apache-airflow
$ cd $PYTHON_HOME/lib/python2.7/site-packages/airflow/bin
# Initialize airflow into the AIRFLOW_HOME directory set above
$ ./airflow
$ cp -r $PYTHON_HOME/lib/python2.7/site-packages/airflow/bin ~/airflow
$ ls -l ~/airflow
total 40
-rw-rw-r-- 1 zhangqiang zhangqiang 36475 Jun 28 18:35 airflow.cfg
drwxrwxr-x 3 zhangqiang zhangqiang 23 Jun 28 18:35 logs
-rw-rw-r-- 1 zhangqiang zhangqiang 2588 Jun 28 18:35 unittests.cfg
# Edit the config file; all nodes share the same copy
$ vim $AIRFLOW_HOME/airflow.cfg
# Configure the Metastore Database
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
# Configure the Broker - RabbitMQ
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = amqp://{USERNAME}:{PASSWORD}@{RABBITMQ_HOST}:5672/
# Or configure the Broker with Redis; here Redis db 0 is used
# broker_url = redis://:{PASSWORD}@{REDIS_HOST}:6379/0
# Configure the Result Backend
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
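# Set the executor to CeleryExecutor (see the Components section above);
# this is required for the worker/flower daemons used below
executor = CeleryExecutor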
# Install the MySQL client dependencies (CentOS)
$ sudo yum install -y python-devel mysql-devel
# Ubuntu
$ sudo apt-get install -y python-dev python-mysqldb
$ pip install 'apache-airflow[mysql]'
$ pip install 'apache-airflow[celery]'
$ ./airflow initdb
# Start the web server, default port is 8080
$ ./airflow webserver -D -p 8080
# Start the scheduler
$ ./airflow scheduler -D
# Start a Celery Worker; multiple workers can run across multiple nodes
$ ./airflow worker -D
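# Optional: start Flower, the Celery queue monitoring UI mentioned above (default port 5555)
$ ./airflow flower -D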
Install Airflow On Kubernetes
# Optional: create Secrets; note that the key in --from-literal can be changed via secretKey, the default is mysql-password
$ kubectl create secret generic "airflow-mysql-password" --from-literal=mysql-password=123456
$ kubectl create secret generic "airflow-redis-password" --from-literal=redis-password=123456
$ helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" has been added to your repositories
$ helm search repo airflow
NAME CHART VERSION APP VERSION DESCRIPTION
bitnami/airflow 6.3.5 1.10.10 Apache Airflow is a platform to programmaticall...
stable/airflow 7.2.0 1.10.10 Airflow is a platform to programmatically autho...
$ helm fetch bitnami/airflow
$ tar -zxvf airflow-6.3.5.tgz
$ cd airflow && ls -l
total 107
-rw-r--r-- 1 zhangqiang 197121 462 7月 2 15:42 Chart.yaml
drwxr-xr-x 1 zhangqiang 197121 0 7月 9 23:11 charts
drwxr-xr-x 1 zhangqiang 197121 0 7月 9 23:11 ci
drwxr-xr-x 1 zhangqiang 197121 0 7月 9 23:11 files
# Usage instructions and description of the configurable parameters
-rw-r--r-- 1 zhangqiang 197121 62104 7月 2 15:42 README.md
-rw-r--r-- 1 zhangqiang 197121 306 7月 2 15:42 requirements.lock
-rw-r--r-- 1 zhangqiang 197121 255 7月 2 15:42 requirements.yaml
drwxr-xr-x 1 zhangqiang 197121 0 7月 9 23:11 templates
# Contains all configurable properties; you can edit the settings here directly instead of configuring via parameters on top of the templates
-rw-r--r-- 1 zhangqiang 197121 17578 7月 2 15:42 values.yaml
-rw-r--r-- 1 zhangqiang 197121 15403 7月 2 15:42 values-production.yaml
# Edit the configuration
$ vim values.yaml
# Install from the local chart, or: helm install --name-template my-airflow -f your_new_values.yaml stable/airflow
$ helm install --name-template my-airflow .
# To update the configuration: helm upgrade my-airflow -f your_new_values.yaml .
$ helm upgrade my-airflow .
Release "my-airflow" has been upgraded. Happy Helming!
NAME: my-airflow
LAST DEPLOYED: Thu Jul 9 22:51:51 2020
NAMESPACE: dlink-test
STATUS: deployed
REVISION: 3
TEST SUITE: None
NOTES:
Congratulations. You have just deployed Apache Airflow!
1. Get the Airflow Service URL by running these commands:
export NODE_PORT=$(kubectl get --namespace dlink-test -o jsonpath="{.spec.ports[0].nodePort}" services my-airflow-web)
export NODE_IP=$(kubectl get nodes --namespace dlink-test -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT/
2. Open Airflow in your web browser
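# Optional sanity check (not part of the chart NOTES): list the release's pods
$ kubectl get pods --namespace dlink-test | grep my-airflow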
Define DAGs
$ mkdir $AIRFLOW_HOME/dags
# Define the DAG
$ vim $AIRFLOW_HOME/dags/tutorial.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
dag_id='tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
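# doc_md renders Markdown in the UI; dag.doc_md picks up this module's docstring
# (None here, since this file has no module-level docstring)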
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# Jinja-templated bash command (body taken from the official Airflow tutorial)
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
t1 >> [t2, t3]
# Check for errors; if this command reports nothing, the DAG is basically fine.
$ python $AIRFLOW_HOME/dags/tutorial.py
# List the DAGs that are picked up
$ airflow list_dags -sd $AIRFLOW_HOME/dags
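# List the task hierarchy of the tutorial DAG (as also shown in the official Airflow tutorial)
$ airflow list_tasks tutorial --tree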
Commands
Usages
$ ./airflow -h
$ ./airflow <command> -h
DAG Test
Use `airflow test <dag_id> <task_id> <execution_date>` to test a single task. This is a very useful command for checking whether a DAG has problems before scheduling it, for example:
$ airflow test meta-scanner-streaming meta-scanner-streaming-submit 2020-08-03
[2020-08-03 07:30:02,634] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-08-03 07:30:02,635] {dagbag.py:396} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags
[2020-08-03 07:30:03,200] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: meta-scanner-streaming.meta-scanner-streaming-submit 2020-08-03T00:00:00+00:00 [None]>
[2020-08-03 07:30:03,263] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: meta-scanner-streaming.meta-scanner-streaming-submit 2020-08-03T00:00:00+00:00 [None]>
[2020-08-03 07:30:03,263] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-08-03 07:30:03,263] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2020-08-03 07:30:03,264] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-08-03 07:30:03,265] {taskinstance.py:900} INFO - Executing <Task(KubernetesPodOperator): meta-scanner-streaming-submit> on 2020-08-03T00:00:00+00:00
[2020-08-03 07:30:03,320] {taskinstance.py:1145} ERROR - [Errno 2] No such file or directory: '/.kube/config'
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 978, in _run_raw_task
result = task_copy.execute(context=context)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 197, in execute
config_file=self.config_file)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 103, in get_kube_client
client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 48, in _get_kube_config
client_configuration=cfg, config_file=config_file, context=cluster_context)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/refresh_config.py", line 117, in load_kube_config
config_file, active_context=context, config_persister=None)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/refresh_config.py", line 99, in _get_kube_config_loader_for_yaml_file
with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: '/.kube/config'
[2020-08-03 07:30:03,322] {taskinstance.py:1187} INFO - All retries failed; marking task as FAILED.dag_id=meta-scanner-streaming-submit-10, task_id=meta-scanner-streaming-submit-10, execution_date=20200803T000000, start_date=20200803T073003, end_date=20200803T073003
[2020-08-03 07:30:03,646] {email.py:131} INFO - Sent an alert email to ['[email protected]']
[2020-08-03 07:30:04,241] {email.py:131} INFO - Sent an alert email to ['[email protected]']
[2020-08-03 07:30:04,487] {taskinstance.py:1206} ERROR - Failed to send email to: ['[email protected]']
[2020-08-03 07:30:04,488] {taskinstance.py:1207} ERROR - {'[email protected]': (553, b'5.7.1 <[email protected]>: Sender address rejected: not owned by user [email protected]')}
...
# Trigger a DAG run (this can also be done by clicking the trigger button in the web UI)
$ ./airflow trigger_dag test_task
# Run the webserver as a daemon; the default port is 8080, it can also be set with `-p`
$ ./airflow webserver -D
# Run the scheduler as a daemon
$ ./airflow scheduler -D
# Run a Celery worker as a daemon
$ ./airflow worker -D
# Pause a DAG
$ ./airflow pause dag_id
# Unpause a DAG, equivalent to switching it on in the web UI
$ ./airflow unpause dag_id
# List the tasks of a DAG
$ ./airflow list_tasks dag_id
# Clear task states for a DAG
$ ./airflow clear dag_id
# Run a single task
$ ./airflow run dag_id task_id execution_date
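# Backfill a DAG over a date range (the dates here are only illustrative)
$ ./airflow backfill tutorial -s 2020-08-01 -e 2020-08-03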