组件
功能
Airflow Webserver
查询元数据以监控和执行DAGs的web界面。
Airflow Scheduler
它检查元数据数据库中的DAG和任务的状态,
在必要时创建新任务,并将任务发送到队列。
Airflow Metadata Database
它包含DAG运行和任务实例的状态.
Airflow Message Broker
它将在队列中存储要运行的任务命令。
Airflow Workers
它们从队列中检索命令,执行命令,并更新元数据。
服务器
结点
服务
DATACENTER01
master1
webserver, scheduler,worker,rabbitmq(选装)
DATACENTER03
master2
webserver,scheduler
DATACENTER04
worker1
worker
DATACENTER05
worker2
worker
因为要用到RabbitMQ,由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。
安装依赖
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel
下载erlang
wget http://erlang.org/download/otp_src_24.0.tar.gz
或
wget https://fossies.org/linux/misc/otp_src_24.0.tar.gz
(比较快)
解压otp_src_24.0.tar.gz
tar -zxvf otp_src_24.0.tar.gz
移动位置
mv otp_src_24.0 /usr/local/
切换目录
cd /usr/local/otp_src_24.0/
创建即将安装的目录
mkdir ../erlang
配置安装路径
./configure --prefix=/usr/local/erlang
如果遇到如下错误,不管
安装
make && make install
查看一下是否安装成功
ll /usr/local/erlang/bin
添加环境变量
echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile
刷新环境变量
source /etc/profile
测试
erl
退出
输入halt().退出
至此 erlang 安装完成
下载
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-generic-unix-3.8.16.tar.xz
由于是tar.xz格式的所以需要用到xz,没有的话就先安装(可选)
yum install -y xz
第一次解压
/bin/xz -d rabbitmq-server-generic-unix-3.8.16.tar.xz
第二次解压
tar -xvf rabbitmq-server-generic-unix-3.8.16.tar
移走
mv rabbitmq_server-3.8.16/ /usr/local/
改名
cd /usr/local/
mv /usr/local/rabbitmq_server-3.7.15 rabbitmq
配置环境变量
echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile
刷新环境变量
source /etc/profile
创建配置目录(未使用单独的配置文件,此步骤可以不用)
mkdir /etc/rabbitmq
启动
rabbitmq-server -detached
停止
rabbitmqctl stop
状态
rabbitmqctl status
开启web插件
rabbitmq-plugins enable rabbitmq_management
访问:15672端口
默认账号密码:guest guest(这个账号只允许本机访问)
查看所有用户
rabbitmqctl list_users
添加一个用户
rabbitmqctl add_user lillcol 123456
配置权限
rabbitmqctl set_permissions -p "/" lillcol ".*" ".*" ".*"
查看用户权限
rabbitmqctl list_user_permissions lillcol
设置tag
rabbitmqctl set_user_tags lillcol administrator
删除用户(安全起见,删除默认用户)
rabbitmqctl delete_user guest
配置好用户之后重启一下rabbit,然后就可以用新账号进行登陆
经测试,3.7.5一下版本安装airflow过程中会出现各种问题,所以需要安装3.7.5,(3.7.5+没测试不知道会不会出问题)
安装编译相关工具
yum -y groupinstall "Development tools"
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
yum install libffi-devel -y
下载安装包解压
wget https://www.python.org/ftp/python/3.7.5/Python-3.7.5.tar.xz
tar -xvJf Python-3.7.5.tar.xz
编译安装python
mkdir /usr/python3.7 #创建编译安装目录
cd Python-3.7.5
./configure --prefix=/usr/python3.7 --enable-optimizations
make && make install
创建软连接
ln -s /usr/python3.7/bin/python3 /usr/bin/python3.7
ln -s /usr/python3.7/bin/pip3 /usr/bin/pip3.7
验证是否成功
python3.7 -V
pip3.7 -V
[root@DATACENTER04 bin]# python3.7 -V
Python 3.7.5
[root@DATACENTER04 bin]# pip3.7 -V
pip 19.2.3 from /usr/python3.7/lib/python3.7/site-packages/pip (python 3.7)
升级pip3.7
安装airflow pip版本过低会导致安装失败
pip3.7 install --upgrade pip==21.1.2
[root@DATACENTER04 bin]# pip3.7 install --upgrade pip==21.1.2
[root@DATACENTER04 bin]# pip3.7 -V
pip 21.1.2 from /usr/python3.7/lib/python3.7/site-packages/pip (python 3.7)
安装gunicorn
pip3.7 install --upgrade pip==21.1.2
此处使用mysql进行元数据管理,要求mysql 5.7+以上版本。
创建airflow_db库
CREATE DATABASE airflow_db CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
注意要用UTF8mb3,UTF8mb4在测试过程中出现错误
修改权限
CREATE USER 'airflow' IDENTIFIED BY 'airflow123';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';
配置apps sudo权限(root)
给apps用户sudo权限,vi /etc/sudoers
,,加入下面语句,否则安装install的时候可能会失败
root ALL=(ALL) ALL
apps ALL=(ALL) NOPASSWD: ALL #加入这一句
配置airflow环境变量(root)
安装完后airflow安装路径默认为:/home/apps/.local/bin
,vi /etc/profile
尾部加入如下内容:
export PATH=$PATH:/usr/python3/bin:/home/apps/.local/bin
source /etc/profile
此处的/home/apps/.local/bin 为~/.local/bin,
根据实际配置PATH=$PATH:~/.local/bin
配置hosts(root),vi /etc/hosts
,加入下面语句
199.232.68.133 raw.githubusercontent.com
配置环境变量(apps)(可选,默认~/airflow)
export AIRFLOW_HOME=~/airflow
配置版本信息(apps)
AIRFLOW_VERSION=2.0.2 # airflow版本
PYTHON_VERSION="$(python3.7 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" # python版本
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" # 约束url
安装airlfow(apps)
执行安装命令,注意要加sudo,否则会有部分缺失,但是没有报错
sudo pip3.7 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" --use-deprecated legacy-resolver [-i https://pypi.douban.com/simple]
如果上面的步骤顺利执行,此时会有airflow命令,并且会创建~/airflow
,进入airflow目录如下
注:上述步骤需要在所有安装结点进行操作
再看一遍安装规划
服务器
结点
服务
DATACENTER01
master1
webserver, scheduler,worker,rabbitmq(选装)
DATACENTER03
master2
webserver,scheduler
DATACENTER04
worker1
worker
DATACENTER05
worker2
worker
此时的架构如下
队列服务及元数据库(Metestore)的高可用
队列服务采用RabbitMQ,已经安装在DATACENTER01,可以通过部署高可用实现队列的高可用,(本案例没有对队列做高可用)
元数据库(Metestore) 高可用
取决于所使用的数据库,此处采用mysql。可以通过部署主从备份实现高可用
配置scheduler高可用
我们可以通过第三方组件 airflow-scheduler-failover-controller 实现 scheduler 的高可用,安装配置步骤如下:
下载failover
gitclone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
//网络不稳定有时候下不下来,可以去找其他资源然后上传服务器安装
安装failover
cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}
sudo pip3.7 install -e . [-i https://pypi.douban.com/simple]
初始化 failover
sudo pip3.7 install -e . [-i https://pypi.douban.com/simple]
//初始化 failover 会向${AIRFLOW_HOME}/airflow.cfg中追加内容
更改${AIRFLOW_HOME}/airflow.cfg配置,4~7 步骤之后的所有步骤可以后面统一操作
scheduler_nodes_in_cluster= DATACENTER01,DATACENTER03
配置DATACENTER01,DATACENTER03之间免密登陆
测试免密登陆
scheduler_failover_controller test_connection
scheduler_failover_controller get_current_host //获取当前的host,可以用于查看安装情况
启动failover
nohup scheduler_failover_controller start >/dev/null 2>&1 &
配置{AIRFLOW_HOME}/airflow.cfg
将一下内容配置进{AIRFLOW_HOME}/airflow.cfg
executor = CeleryExecutor
指定元数据库(metestore)
#sql_alchemy_conn = sqlite:////home/apps/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://airflow:airflow123@10.0.0.1:3306/airflow_db
设置broker,即消息队列,此处使用 RabbitMQ
broker_url = amqp://lillcol:123456@DATACENTER01:5672/
result_backend = db+mysql://airflow:airflow123@10.0.0.1:3306/airflow_db
6.修改时区
default_timezone = Asia/Shanghai
配置web端口(默认8080,因为已被占用此处改为8081)
endpoint_url = http://localhost:8081
base_url = http://localhost:8081
web_server_port = 8081
关闭加载案例(可选)
load_examples = False
将修改后的{AIRFLOW_HOME}/airflow.cfg同步到所有安装airflow的服务器上
(apps@DATACENTER0)
:airflow db init
次步骤会在mysql上创建相关元数据表
创建用户(apps@DATACENTER01):
airflow users create <br />
--username admin <br />
--firstname Peter <br />
--lastname Parker <br />
--role Admin <br />
--email spiderman@superhero.org
Password:123456
启动webserver:
airflow webserver -D
次步骤在
DATACENTER01,DATACENTER03
执行
启动scheduler
#1. 需要先启动scheduler容错插件scheduler_failover_controller,
nohup scheduler_failover_controller start >/dev/null 2>&1 &
#2. 启动scheduler,次步骤只需要在DATACENTER01执行
nohup airflow scheduler >/dev/null 2>&1 &
同一时间只能启动一个scheduler,一旦运行 scheduler 守护进程的机器出现故障,立刻启动另一台机器上的 scheduler 。
启动worker
#1. 确保必要软件已经安装
sudo pip3.7 install pymysql
sudo pip3.7 install celery
sudo pip3.7 install flower
sudo pip3.7 install psycopg2-binary
#2. 先启动flower,在需要启动worker服务器执行,此处在DATACENTER01,DATACENTER04执行
airflow celery flower -D
#3. 启动worker,在需要启动worker服务器执行,此处在DATACENTER01,DATACENTER04执行
airflow celery worker -D
确保worker的8793已经开放,WEB UI查看log的时候无法加载相关日志
登陆web UI
http://DATACENTER01:8081
http://DATACENTER03:8081
账号:Admin 密码:123456
登陆后可以新增其他的用户
配置dags
airflow 的dags默认在{AIRFLOW_HOME}/dags
任务通过scheduler调度,并通过worker执行
所以在所有有启动scheduler和worker的服务器上都要有相同的dags与相关脚本
即我们需要保证所有结点下的{AIRFLOW_HOME}/dags以及依赖的脚本是一致的
如果不一致可能导致两个结果:
取决于当前scheduler上面的{AIRFLOW_HOME}/dags
在对应的woker上找不到执行的dag或相关脚本
比如目前scheduler 运行在DATACENTER03,此时{AIRFLOW_HOME}/dags如下:
[apps@DATACENTER03 dags]$ ll
total 40
-rw-r--r-- 1 apps dolphinscheduler 12513 May 28 15:14 DAG_**_D2.py
-rw-r--r-- 1 apps dolphinscheduler 12512 May 25 17:51 DAG_**_D.py
drwxr-xr-x 2 apps dolphinscheduler 132 Jun 4 18:03 __pycache__
-rw-r--r-- 1 apps dolphinscheduler 1381 Jun 4 16:43 TEST_RUN2.py
-rw-r--r-- 1 apps dolphinscheduler 1380 Jun 1 09:02 TEST_RUN.py
WEB UI如下:
启动任务
观测执行情况
执行任务的(woker)结点为:DATACENTER01
执行任务的(woker)结点为:DATACENTER04
所以我们必须保证所有结点的dags 与 依赖脚本同步
Specified key was too long
[apps@DATACENTER03 airflow]$ airflow db init
…
raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1071, 'Specified key was too long; max key length is 3072 bytes')
[SQL: ALTER TABLE xcom ADD CONSTRAINT pk_xcom PRIMARY KEY (dag_id, task_id, key
, execution_date)]
解决办法:
#创建airflow_db时候指定编码
#CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE DATABASE airflow_db CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
CREATE USER 'airflow' IDENTIFIED BY 'airflow123';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';
explicit_defaults_for_timestamp 错误
MySQL [(none)]> show global variables like '%timestamp%';
+---------------------------------+--------+
| Variable_name | Value |
+---------------------------------+--------+
| explicit_defaults_for_timestamp | OFF |
| log_timestamps | SYSTEM |
+---------------------------------+--------+
2 rows in set (0.02 sec)
MySQL [(none)]> set global explicit_defaults_for_timestamp =1;
Query OK, 0 rows affected (0.00 sec)
MySQL [(none)]> show global variables like '%timestamp%';
+---------------------------------+--------+
| Variable_name | Value |
+---------------------------------+--------+
| explicit_defaults_for_timestamp | ON |
| log_timestamps | SYSTEM |
+---------------------------------+--------+
-bash: airflow: command not found
安装完后没有出现airflow命令以及相关结构,解决办法有两个
卸载apache-airflow,重新安装一次,命令如下:
sudo pip3.7 uninstall apache-airflow==2.0.2
pip3.7 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" --use-deprecated legacy-resolver
将~/.local/bin加入PATH ,(推荐,在airflow安装前配置)
PATH=$PATH:~/.local/bin
No module named 'airflow'
No module named 'airflow'
No module named 'airflow.www'
No module named 'airflow.www.gunicorn_config'
FileNotFoundError: [Errno 2] No such file or directory: 'gunicorn': 'gunicorn'
解决办法:
#创建/usr/python3.7/bin/gunicorn的软连接替换原来的gunicorn,
#可能在``/usr/python3/bin``或``/usr/bin``下,具体看情况操作
1. 删除原来的软连接
sudo rm -rf /usr/python3/bin/gunicorn
2. 创建新的软连接
sudo ln -s /usr/python3.7/bin/gunicorn /usr/python3/bin
airflow webserver启动时,会调用subprocess.Popen创建子进程,webserver使用gunicorn
执行gunicorn启动时,可能是在PATH中找不到该命令报错
也可能是gunicorn的版本过低导致报错
目前的版本至少是
gunicorn (version 19.10.0)
ModuleNotFoundError: No module named 'MySQLdb'
启动worker的时候ModuleNotFoundError: No module named 'MySQLdb'
解决办法安装mysqlclient(python 3.7 要安装mysqlclient):
sudo pip3.7 install mysqlclient
无法读取worker端的log
airlfow日志默认存储在{AIRFLOW_PATH}/logs/{dag}/…下,
此时在读取在web 端读取不到日志可能有两种情况
配置免密登陆,但是执行scheduler_failover_controller test_connection
的时候还是需要输入密码
免密配置问题,可能两个原因:
权限问题
sshd为了安全,对属主的目录和文件权限有所要求。如果权限不对,则ssh的免密码登陆不生效。
要求如下:
#用户目录权限为 755 或者 700,就是不能是77x。
#.ssh目录权限一般为755或者700。
#rsa_id.pub 及authorized_keys权限一般为644
#rsa_id权限必须为600
将目录改成对应的权限即可
防火墙的问题
关闭防火墙测试
systemctl status firewalld
参考文档:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章