Airflow 工作原理(未完成)

前言

使用 airflow 一年,出现了以下几个问题,虽然不是非常致命,却时常影响效率,需要重启解决。因此这次梳理一下 airflow。

注意以下阅读的是 1.8.0 的源码。安装包的名字由 airflow 变成了 apache-airflow,原来的安装包不支持后面的升级了。目前(2018年04月22日)发布的稳定版本是 1.9.0 ,正在开发的版本是 1.10.0。

试图解决一下几个问题:

  1. 监测类的任务,每五分钟启动一次 DAG,产生了过多且难以使用的 log。
  2. 较重的 task instance 会失效,状态停留在 running,阻塞的之后的任务运行。

此外,本次试图解答一下几个问题:

  1. airflow 的正常运行需要哪几个部分?如何协调的?
  2. airflow 的 log 应当如何管理?
  3. 如何优化以下 SQL 的查询速度?
1
SELECT dag_run.dag_id AS dag_run_dag_id, dag_run.state AS dag_run_state, count('*') AS count_1 FROM dag_run WHERE dag_run.dag_id != dag_run.dag_id GROUP BY dag_run.dag_id, dag_run.state

基本概念

JOB:最上层的工作。分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 有由 schedule 创建,BackfillJob 由 backfill 创建,LocalTaskJob 由前面两种 Job 创建。

DAG:有向无环图,用来表示工作流。

DAG Run:工作流实例,表示某个工作流的一次运行。

Task:任务,工作流的基本组成部分。

Task Instance:任务实例,表示某个任务的一次运行。

airflow 的正常运行需要哪几个部分?如何协调的?

首先,所有的后台服务都是通过托管 airflow {serve_name} 这样的指令启动的。因此有必要先看一下这个命令中的参数是如何映射到指定函数的参数中的。

1
2
3
parser = CLIFactory.get_parser()
args = parser.parse_args()
args.func(args)

CLIFactory 中有两个重要的映射,args 将命令行参数名映射到 默认值、帮助信息等,subparsers_dict 将目标函数的函数名映射到该目标函数相关的说明中。

CLIFactory.get_parser() 返回了一个 argparse.ArgumentParser 的实例 parser ,包含了所有相关参数信息,通过 parser.parse_args() 方法进行参数校验,将命令行参数转化为一个仅用于存储属性的对象实例 args 中。

这一段可作为 argparse 的典型用法学习,文档 可供参考。对于小命令行工具,click 更值得推荐。

webserver

首先,为了提供 UI 界面,airflow 是有一个 webserver 的服务,用于展示内部各个 DAG 和 TASK 的执行状态,还可以做一部分配置管理和任务管理的工作。其实就是用 gunicorn 启动了 airflow 中的 application。

启动服务的指令定义在 airflow.bin.cli.webserver,涉及运行参数的代码如下,仅供理解,不能运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
run_args = [
'gunicorn',
'-w', str(num_workers),
'-k', str(args.workerclass),
'-t', str(worker_timeout),
'-b', args.hostname + ':' + str(args.port),
'-n', 'airflow-webserver',
'-p', str(pid),
'-c', 'python:airflow.www.gunicorn_config',
]

if args.access_logfile:
run_args += ['--access-logfile', str(args.access_logfile)]

if args.error_logfile:
run_args += ['--error-logfile', str(args.error_logfile)]

if args.daemon:
run_args += ['-D']

if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

webserver_module = 'www_rbac' if settings.RBAC else 'www'
run_args += ["airflow." + webserver_module + ".app:cached_app()"]

调用的接口 app 入口在 airflow.www.app.cached_app,是使用纯 flask 写前后端不分离的网站,airflow.www.app.create_app 中绑定了各个视图页,如需学习 flask 可深入研究。

scheduler

调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。

job._execute_helper(processor_manager) 内封装了主要的定时调度逻辑。

airflow.utils.dag_processing.DagFileProcessorManager#heartbeat 被周期性调用,将完成的 processor 从当前处理队列弹出,根据最大并发参数 max_threads 将待处理的文件加入当前处理队列。 返回值为 list[SimpleDag] ,即已经成功结束的处理。

核心问题:scheduler 是如何做定时的,如何做并发控制的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
self.max_threads = conf.getint('scheduler', 'max_threads')
# How often to scan the DAGs directory for new files. Default to 5 minutes.
self.dag_dir_list_interval = conf.getint('scheduler',
'dag_dir_list_interval')
# How often to print out DAG file processing stats to the log. Default to
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler',
'print_stats_interval')
# Parse and schedule each file no faster than this interval. Default
# to 3 minutes.
self.file_process_interval = file_process_interval

self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler',
'run_duration')

每隔 dag_dir_list_interval 重读 dags 文件夹。

max_threads 在 processor_manager.heartbeat() 中限制并发的 processor.start() 数,每个 processor 对应着一个 dag 文件地址。

parallelism 在 executor.heartbeat() 中限制并发的,executor.start() 后 executor 内生成了一个 multiprocessing.JoinableQueue() 的 queue 以及 worker 列表,worker 都执行了 start() 方法。

executor.queued_tasks 中添加入队的 tis,等到 executor.heartbeat() 时使用 executor.execute_async 方法将该命令放入QueuedLocalWorker 实例中,然后在通过 worker.execute_work(key, command) 中执行 airflow run 指令。

1 个 SchedulerJob 实例,对应 1 个 DefaultExecutor 实例,1 个 DagFileProcessorManager 实例,parallelism 个 QueuedLocalWorker 实例。

这一段可作为 multiprocessing 的典型用法学习,文档 可供参考。

airflow 的 log 级别应当如何管理?

LEVEL:1.8 版本的 loglevel 写死为 LOGGING_LEVEL = logging.INFO,目前不可配置。后续将会升级,使其可配置。

如何优化以下 SQL 的查询速度?

升级版本,或者根据新版本的 model 自己加索引。这是最省事的办法。

subdag

subdag 是一个特殊的 DAG,主要的 task 有一个 log,记录了使用 backfill 指令运行的子DAG。而因此会单独创建一个 BackfillJob 运行所有的这些子DAG,并且其中的每个子 DAG 的子任务都有自己的 log。

配置方法

将schedule 的 heartrate 延长,减少数据库读写。

对每个小时进行的 DAG 错峰执行,平摊开销。

其他收获

  1. 使用 sqlalchemy 的 polymorphic 特性实现模型多态
1
2
3
4
__mapper_args__ = {
'polymorphic_on': job_type,
'polymorphic_identity': 'BaseJob'
}
  1. 使用 sqlalchemy 的 table_args 创建索引
1
2
3
__table_args__ = (
Index('job_type_heart', job_type, latest_heartbeat),
)
  1. 使用 sqlalchemy 的 sqlalchemy.orm.session.make_transientsession.merge(model) 来将这个类实例与会话的关系断开或重建。

简单总结

airflow 目前还是处于高速开发中,当前版本的坑很多,版本升级也不是向后兼容的,变动很大。如果已经在生产环境使用了 airflow,因为一些原因想升级,需要对目标版本做充分的测试和了解。

坚持原创技术分享,您的支持将鼓励我继续创作!