Airflow

Reference

Components of Airflow

Airflow 有一些重要的组件

Metastore Database

Contains information about the status of Tasks, DAGs, Variables, Connections, etc.

WebServer

守护进程,使用 gunicorn 服务器(相当于 java 中的 tomcat )处理并发请求,可通过修改{AIRFLOW_HOME}/airflow.cfg文件中workers的值来控制处理并发请求的进程数,workers = 4表示开启 4 个 gunicorn worker(进程)处理 web 请求。webserver 提供以下功能:

  • 中止、恢复、触发任务。
  • 监控正在运行的任务,断点续跑任务。
  • 执行 ad-hoc 命令或 SQL 语句来查询任务的状态,日志等详细信息。
  • 配置连接,包括不限于数据库、ssh 的连接等。

Scheduler

守护进程,调度器,它周期性地轮询任务的调度计划,以确定是否触发任务执行

Executor

执行器,Airflow 本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的几个执行器:

  • Debug Executor: 单进程顺序执行任务,默认执行器,通常只用于测试
  • Celery Executor: 分布式调度任务,生产环境常用。Celery 是一个借助队列机制实现的分布式任务调度框架,它本身无队列功能,需要借助第三方组件,比如 Redis 或者 RabbitMQ。
    • Celery 的任务队列包含两个重要的组件
      1. Broker: 存储要执行的命令列表,需要借助第三方的用于收发消息的消息中间件(Message Broker),如 RabbitMQ、Redis
      2. Result Backend: 存储已完成命令的状态,一般存储到 Database
    • 当调度器executor = CeleryExecutor时,包含两个重要的守护进程:
      1. Celery Worker: 守护进程,通过airflow worker -D启动一个或多个 Celery 的任务队列,负责执行具体的 DAG 任务,默认队列名为default
      2. Celery Flower: 守护进程,通过airflow flower -D启动,消息队列监控工具,用于监控 Celery 消息队列,默认的端口为5555,可以在浏览器地址栏中输入http://127.0.0.1:5555来访问
  • Dask Executor: 动态任务调度,主要用于数据分析
  • Kubernetes Executor

Common Concepts of Airflow

  • DAG:即有向无环图(Directed Acyclic Graph),将所有需要运行的 Tasks 按照依赖关系组织起来,描述的是所有 Tasks 执行顺序。
  • Operator:可以简单理解为一个class,描述了 DAG 中某个的 task 具体要做的事。其中,airflow 内置了很多 operators,如 BashOperator 执行一个 bash 命令,PythonOperator 调用任意的 Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义 Operator,这给用户提供了极大的便利性。Using Operators
  • Task:Task 是 Operator 的一个实例,也就是 DAGs 中的一个 Node。
  • Task Instance:Task的一次运行。Web 界面中可以看到 Task Instance
  • Task Lifecycle:Task从开始到完成整个生命周期各阶段的状态。
  • Task Relationship:DAGs 中的不同 Tasks 之间可以有依赖关系,如Task1 >> Task2,表明 Task2 依赖于 Task1 的结果。
  • Workflow:通过将 DAGs 和 Operators 结合起来,用户就可以创建各种复杂的工作流(workflow)。

Working Principle of Airflow

这里只说 CeleryExecutor 的工作原理。

Install Airflow On ECS

$ sudo pip install --upgrade pip
$ sudo pip install -U setuptools

$ export AIRFLOW_HOME=~/airflow
$ pip install apache-airflow
$ cd $PYTHON_HOME/lib/python2.7/sit-packages/airflow/bin
# 将 airflow 安装到刚刚设置的 AIRFLOW_HOME 目录下
$ ./airflow

$ cp -r $PYTHON_HOME/lib/python2.7/sit-packages/airflow/bin ~/airflow

$ ls -l ~/airflow
total 40
-rw-rw-r-- 1 zhangqiang zhangqiang 36475 Jun 28 18:35 airflow.cfg
drwxrwxr-x 3 zhangqiang zhangqiang    23 Jun 28 18:35 logs
-rw-rw-r-- 1 zhangqiang zhangqiang  2588 Jun 28 18:35 unittests.cfg


# 修改配置文件,所有节点公用一份
$ vim $AIRFLOW_HOME/airflow.cfg
# 配置 Metestore Database 
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
# 配置 Broker - RabbitMQ
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = amqp://{USERNAME}:{PASSWORD}@{RABBITMQ_HOST}:5672/
# 或者配置 Broker - Redis,这里指定使用 Redis db 0
# broker_url = redis://:{PASSWORD}@{REDIS_HOST}:6379/0
# 配置 Result Backend
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

# 安装 mysql 依赖包,CentOS 
$ sudo yum install -y python-devel mysql-devel
# Ubuntu
$ sudo apt-get install -y python-dev python-MySQLdb
$ pip install 'apache-airflow[mysql]'

$ pip install 'apache-airflow[celery]'

$ ./airflow initdb

# Start the web server, default port is 8080
$ ./airflow webserver -D -p 8080

# Start the scheduler
$ ./airflow scheduler -D

# Start Celery Worker, 可以在多个节点运行多个 worker
$ ./airflow worker -D

Install Airflow On Kubernetes

# Optional: 创建 Secrets,注意 --from-literal 参数中的 key 可以通过 secretKey 修改,默认为 mysql-password
$ kubectl create secret generic "airflow-mysql-password" --from-literal=mysql-password=123456
$ kubectl create secret generic "airflow-redis-password" --from-literal=redis-password=123456

$ helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" has been added to your repositories

$ helm search repo airflow
NAME            CHART VERSION   APP VERSION     DESCRIPTION
bitnami/airflow 6.3.5           1.10.10         Apache Airflow is a platform to programmaticall...
stable/airflow  7.2.0           1.10.10         Airflow is a platform to programmatically autho...

$ helm fetch bitnami/airflow

$ tar -zxvf airflow-6.3.5.tgz

$ cd airflow && ls -l
total 107
-rw-r--r-- 1 zhangqiang 197121   462  7月  2 15:42 Chart.yaml
drwxr-xr-x 1 zhangqiang 197121     0  7月  9 23:11 charts
drwxr-xr-x 1 zhangqiang 197121     0  7月  9 23:11 ci
drwxr-xr-x 1 zhangqiang 197121     0  7月  9 23:11 files
# 使用说明,可配置参数介绍
-rw-r--r-- 1 zhangqiang 197121 62104  7月  2 15:42 README.md
-rw-r--r-- 1 zhangqiang 197121   306  7月  2 15:42 requirements.lock
-rw-r--r-- 1 zhangqiang 197121   255  7月  2 15:42 requirements.yaml
drwxr-xr-x 1 zhangqiang 197121     0  7月  9 23:11 templates
# 这里面包含所有可配置的属性,可以直接修改这里面的配置项目,而不是通过参数 + 模板的方式配置
-rw-r--r-- 1 zhangqiang 197121 17578  7月  2 15:42 values.yaml
-rw-r--r-- 1 zhangqiang 197121 15403  7月  2 15:42 values-production.yaml

# 修改配置项
$ vim values.yml

# 从本地安装,或者 helm install --name-template my-airflow -f your_new_values.yaml stable/airflow
$ helm install --name-template my-airflow .

# 更新配置 helm upgrade my-airflow -f your_new_values.yaml .
$ helm upgrade my-airflow .
Release "my-airflow" has been upgraded. Happy Helming!
NAME: my-airflow
LAST DEPLOYED: Thu Jul  9 22:51:51 2020
NAMESPACE: dlink-test
STATUS: deployed
REVISION: 3
TEST SUITE: None
NOTES:
Congratulations. You have just deployed Apache Airflow!

1. Get the Airflow Service URL by running these commands:
   export NODE_PORT=$(kubectl get --namespace dlink-test -o jsonpath="{.spec.ports[0].nodePort}" services my-airflow-web)
   export NODE_IP=$(kubectl get nodes --namespace dlink-test -o jsonpath="{.items[0].status.addresses[0].address}")
   echo http://$NODE_IP:$NODE_PORT/

2. Open Airflow in your web browser

Define DAGs

$ mkdir $AIRFLOW_HOME/dags

# 定义 DAG
$ vim $AIRFLOW_HOME/dags/tutorial.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    dag_id='tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """

"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

# 检查是否有错误,如果命令行没有报错,就表示没太大问题。
$ python $AIRFLOW_HOME/dags/tutorial.py
# 查看生效的 DAGs
$ airflow list_dags -sd $AIRFLOW_HOME/dags

Commands

Usages

$ ./airflow -h
$ ./airflow <command> -h

DAG Test

使用airflow [test] <dag_id> <task_id> <execution_time>测试任务 ,很有用的命令,可以提前测试 DAG 是否有问题

$ airflow test meta-scanner-streaming meta-scanner-streaming-submit 2020-08-03
[2020-08-03 07:30:02,634] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-08-03 07:30:02,635] {dagbag.py:396} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags
[2020-08-03 07:30:03,200] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: meta-scanner-streaming.meta-scanner-streaming-submit 2020-08-03T00:00:00+00:00 [None]>
[2020-08-03 07:30:03,263] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: meta-scanner-streaming.meta-scanner-streaming-submit 2020-08-03T00:00:00+00:00 [None]>
[2020-08-03 07:30:03,263] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-08-03 07:30:03,263] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2020-08-03 07:30:03,264] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-08-03 07:30:03,265] {taskinstance.py:900} INFO - Executing <Task(KubernetesPodOperator): meta-scanner-streaming-submit> on 2020-08-03T00:00:00+00:00
[2020-08-03 07:30:03,320] {taskinstance.py:1145} ERROR - [Errno 2] No such file or directory: '/.kube/config'
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 978, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 197, in execute
    config_file=self.config_file)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 103, in get_kube_client
    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 48, in _get_kube_config
    client_configuration=cfg, config_file=config_file, context=cluster_context)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/refresh_config.py", line 117, in load_kube_config
    config_file, active_context=context, config_persister=None)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/contrib/kubernetes/refresh_config.py", line 99, in _get_kube_config_loader_for_yaml_file
    with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: '/.kube/config'
[2020-08-03 07:30:03,322] {taskinstance.py:1187} INFO - All retries failed; marking task as FAILED.dag_id=meta-scanner-streaming-submit-10, task_id=meta-scanner-streaming-submit-10, execution_date=20200803T000000, start_date=20200803T073003, end_date=20200803T073003
[2020-08-03 07:30:03,646] {email.py:131} INFO - Sent an alert email to ['[email protected]']
[2020-08-03 07:30:04,241] {email.py:131} INFO - Sent an alert email to ['[email protected]']
[2020-08-03 07:30:04,487] {taskinstance.py:1206} ERROR - Failed to send email to: ['[email protected]']
[2020-08-03 07:30:04,488] {taskinstance.py:1207} ERROR - {'[email protected]': (553, b'5.7.1 <[email protected]>: Sender address rejected: not owned by user [email protected]')}
...
# 开始运行任务(这一步也可以在web界面点trigger按钮)
$ ./airflow trigger_dag test_task

# 守护进程运行webserver, 默认端口为8080,也可以通过`-p`来指定
$ ./airflow webserver -D  

# 守护进程运行调度器     
$ ./airflow scheduler -D   

# 守护进程运行调度器    
$ ./airflow worker -D          

# 暂停任务
$ ./airflow pause dag_id      

# 取消暂停,等同于在web管理界面打开off按钮
$ ./airflow unpause dag_id     

# 查看task列表
$ ./airflow list_tasks dag_id

# 清空任务状态
$ ./airflow clear dag_id       

# 运行task
$ ./airflow run dag_id task_id execution_date

qin

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏

打开支付宝扫一扫,即可进行扫码打赏哦