
In earlier posts we covered the basic Airflow architecture and how to deploy Airflow in containers. Today we will look at how to build a robust distributed scheduling cluster with Airflow and Celery.
1 Cluster environment

The cluster again runs on Ubuntu 20.04.3 LTS, and this time we prepare three servers with identical specs for testing. In the previous article [1] we already installed all Airflow components on the Bigdata1 server (follow the link if you have not read it yet), so now we only need to install the worker component on the other two nodes.
| Component | Bigdata1 (A) | Bigdata2 (B) | Bigdata3 (C) |
|---|---|---|---|
| Webserver | √ | | |
| Scheduler | √ | | |
| Worker | √ | √ | √ |
The docker-compose.yml from the previous article did not separate the deployment files from the data directories, which makes later maintenance awkward. So we can stop the services and split the database and data directories away from the deployment files:

- Deployment files (docker-compose.yaml / .env): under /apps/airflow
- MySQL data and configuration files: under /data/mysql
- Airflow data directory: under /data/airflow

Splitting things up like this makes unified management much easier later on.
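A minimal sketch of that reshuffle on Bigdata1, assuming the old deployment lives in ~/airflow and the MySQL data was kept in ./mysql (both paths are assumptions, adjust them to your environment):

```bash
cd ~/airflow && docker-compose down      # stop the old stack first

mkdir -p /apps/airflow /data/airflow /data/mysql

mv ~/airflow/docker-compose.yaml ~/airflow/.env /apps/airflow/   # deployment files
mv ~/airflow/dags ~/airflow/plugins /data/airflow/               # airflow data
mv ~/airflow/mysql/* /data/mysql/                                # MySQL data and config (assumed location)
```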
2 Deploying the worker service

Preparation:
```bash
mkdir -pv /data/airflow/{dags,plugins}
mkdir -pv /apps/airflow
mkdir -pv /logs/airflow
```
The worker's deployment file (docker-compose.yaml):
```yaml
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@${MYSQL_HOST}:3306/airflow  # change to your MySQL user and password
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@${MYSQL_HOST}:3306/airflow       # change to your MySQL user and password
    AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@${REDIS_HOST}:7480/0                           # change to your Redis password
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /data/airflow/dags:/opt/airflow/dags
    - /logs/airflow:/opt/airflow/logs
    - /data/airflow/plugins:/opt/airflow/plugins
    - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:0"

services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    hostname: bigdata-20-194  # set the container hostname (a different one per node) so the worker is easy to identify in Flower
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
```
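The connection strings above expect docker-compose to resolve MYSQL_HOST and REDIS_HOST at deploy time, typically from the .env file sitting next to docker-compose.yaml. A minimal sketch of such a file; the host values are placeholders, not values from this deployment:

```bash
# /apps/airflow/.env -- read automatically by docker-compose
AIRFLOW_UID=1000                          # UID of the user that owns the data directories (also written in the init step below)
AIRFLOW_IMAGE_NAME=apache/airflow:2.2.3   # image referenced by x-airflow-common
MYSQL_HOST=192.168.0.10                   # placeholder: your MySQL server
REDIS_HOST=192.168.0.10                   # placeholder: your Redis server
```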
Run the initialization check to verify that the environment meets the requirements:
```bash
cd /apps/airflow/
echo -e "AIRFLOW_UID=$(id -u)" > .env  # AIRFLOW_UID must be the UID of a regular user with permission to create the persistent directories
docker-compose up airflow-init
```
If the database already exists, the initialization check does not touch the existing data. Next, start the airflow-worker service:
```bash
docker-compose up -d
```
Then install the airflow-worker service on the bigdata3 node in the same way. Once deployment is complete, you can check the broker's status in Flower.
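A quick way to confirm that the new workers have registered with the broker is to ping them through Celery from inside any worker container (the service name follows the compose file above); Flower, assumed to be running on Bigdata1 from the earlier deployment, shows the same information on its default port 5555:

```bash
# Ping every Celery worker connected to the broker.
docker-compose exec airflow-worker \
  celery --app airflow.executors.celery_executor.app inspect ping

# Or open Flower in a browser (default port 5555):
#   http://<Bigdata1-IP>:5555
```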
3 Persisting the configuration file

In most cases, an Airflow cluster with multiple worker nodes needs a persisted Airflow configuration file that is kept in sync across all nodes. So we modify the volumes of x-airflow-common in docker-compose.yaml to mount airflow.cfg into the containers; you can copy the configuration file out of a running container first and then edit it.
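A sketch of copying the generated default config out of a running container on Bigdata1; the container name here is an assumption, check `docker ps` for the real one:

```bash
# Copy the default airflow.cfg out of a running container, then edit it.
docker cp airflow_airflow-webserver_1:/opt/airflow/airflow.cfg /data/airflow/airflow.cfg

# The extra mount already present in x-airflow-common makes the edited file visible to every service:
#   - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
```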
When first setting things up, we need to write the values of some environment variables from the docker-compose file into airflow.cfg, for example:
```ini
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
# set the default time zone
default_timezone = Asia/Shanghai
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
load_default_connections = True
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key =
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
default_task_retries = 0
default_task_weight_rule = downstream
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
max_db_retries = 3
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
google_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793

[metrics]
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =

[secrets]
backend =
backend_kwargs =

[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

[debug]
fail_fast = False

[api]
enable_experimental_api = False
auth_backend = airflow.api.auth.backend.deny_all
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =

[lineage]
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False

[hive]
default_hive_mapred_queue =

[webserver]
# custom Airflow domain
base_url = https://devopsman.cn/airflow
# default time zone for the UI
default_ui_timezone = Asia/Shanghai
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key = emEfndkf3QWZ5zVLE1kVMg==
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = True
expose_stacktrace = True
dag_default_view = tree
dag_orientation = LR
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
auto_refresh_interval = 3

[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True

# mail server settings
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]
sentry_on = false
sentry_dsn =

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_umask = 0o077
broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
task_publish_max_retries = 3
worker_precheck = False

[celery_broker_transport_options]

[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15

[triggerer]
default_capacity = 1000

[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True

[github_enterprise]
api_rev = v3

[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes]
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_queued_check_interval = 60
worker_pods_pending_timeout_batch_size = 100

[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
```
After you finish editing the file, restart the services:

```bash
docker-compose restart
```
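After the restart, one way to confirm that the containers actually pick up the edited values is the `airflow config get-value` subcommand of the Airflow 2.x CLI, for example:

```bash
docker-compose exec airflow-worker airflow config get-value core default_timezone   # expect Asia/Shanghai
docker-compose exec airflow-worker airflow config get-value webserver base_url
```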
4 Data synchronization

Because the cluster uses three worker nodes, any configuration change made on one node has to be propagated to the others, and the DAGs and plugins directories also need to be synchronized in real time: once the scheduler dispatches a task to a node, the task fails if the corresponding DAG file cannot be found there. We therefore use lsyncd for real-time data synchronization:
```bash
apt-get install lsyncd -y
```
Set up public-key SSH access between the nodes:
```bash
# Generate a key pair named airflow-sync and copy the public key to the other two nodes.
ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync
for ip in 100 200; do ssh-copy-id -i ~/.ssh/airflow-sync.pub -p 12022 ${USERNAME}@192.168.0.$ip; done
```
After that we can log in to the other nodes with the private key.
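A quick check that the key-based login works before wiring it into lsyncd (port and user mirror the commands above):

```bash
for ip in 100 200; do
  ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.$ip hostname
done
```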
Edit the sync configuration file. For more lsyncd configuration options, see the official documentation [2]:
```lua
settings {
    logfile = "/var/log/lsyncd.log",        -- log file
    statusFile = "/var/log/lsyncd.status",  -- sync status information
    pidfile = "/var/run/lsyncd.pid",
    statusInterval = 1,
    nodaemon = false,                       -- run as a daemon
    inotifyMode = "CloseWrite",
    maxProcesses = 1,
    maxDelays = 1,
}

sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.100:/data/airflow",
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        --delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}

sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.200:/data/airflow",
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        --delete = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}
```
See the official documentation for what each of these parameters means. Here we define the ssh command through rsync's rsh option, which handles setups with security measures such as private keys and custom ports; alternatively, you can configure passwordless access and use default.rsync or default.rsyncssh instead.
Set up systemd to manage the lsyncd service:
```bash
cat << EOF > /etc/systemd/system/lsyncd.service
[Unit]
Description=lsyncd
ConditionFileIsExecutable=/usr/bin/lsyncd
After=network-online.target
Wants=network-online.target

[Service]
StartLimitBurst=10
ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
Restart=on-failure
RestartSec=120
EnvironmentFile=-/etc/sysconfig/aliyun
KillMode=process

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now lsyncd.service  # start the service and enable it at boot
```
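To verify the whole chain, a simple test is to drop a file into the watched directory on Bigdata1 and confirm it appears on another node shortly afterwards (paths, port, and key mirror the configuration above):

```bash
systemctl status lsyncd --no-pager                 # should be active (running)
touch /data/airflow/dags/sync_test.txt             # create a test file on Bigdata1
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 ls /data/airflow/dags/
```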
This takes care of keeping the data (dags, plugins, airflow.cfg) in sync. Later, in a CI/CD workflow, you only need to upload DAG files to the Bigdata1 node and the other two nodes will pick them up automatically. If something goes wrong, check the lsyncd logs to debug:

```bash
lsyncd -log all /etc/lsyncd.conf
tail -f /var/log/lsyncd.log
```
5 Reverse proxy [3]

If you need to run Airflow behind a reverse proxy, for example at https://lab.mycompany.com/myorg/airflow/, you can do so with the following configuration.
Set base_url in airflow.cfg:
```ini
base_url = http://my_host/myorg/airflow
enable_proxy_fix = True
```
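If you prefer to keep these two options in docker-compose.yaml rather than in airflow.cfg, the same settings can be expressed with Airflow's `AIRFLOW__<SECTION>__<KEY>` environment-variable convention, for example:

```bash
# Same options as environment variables; in docker-compose.yaml they would go under the environment: block.
export AIRFLOW__WEBSERVER__BASE_URL="http://my_host/myorg/airflow"
export AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX="True"
```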
Nginx configuration:
```nginx
server {
    listen 80;
    server_name lab.mycompany.com;

    location /myorg/airflow/ {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```
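Once Nginx has reloaded, a minimal smoke test against the webserver's /health endpoint should return HTTP 200; the hostname here is the example one above:

```bash
curl -I http://lab.mycompany.com/myorg/airflow/health
```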
At this point the installation of the Airflow distributed scheduling cluster is basically complete.
If you have read this far, you are probably already using Airflow or at least interested in it, so here is some extra Airflow learning material:
https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1
References
[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ
[2]lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/
[3]airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html