Skip to content

Commit

Permalink
[AIRFLOW-276] Gunicorn rolling restart
Browse files Browse the repository at this point in the history
- Tell gunicorn to prepend `[ready]` to worker process name once worker is ready (to serve requests) - in particular this happens after DAGs folder is parsed
- Airflow cli runs gunicorn as a child process instead of `excecvp`-ing over itself
- Airflow cli monitors gunicorn worker processes and restarts them by sending TTIN/TTOU signals to the gunicorn master process
- Fix bug where `conf.get('webserver', 'workers')` and `conf.get('webserver', 'webserver_worker_timeout')` were ignored

- Alternatively, https://github.com/apache/incubator-airflow/pull/1684/files does the same thing but the worker-restart script is provided separately for the user to run

- Start airflow, observe that workers are restarted
- Add new dags to dags folder and check that they show up
- Run `siege` against airflow while server is restarting and confirm that all requests succeed
- Run with configuration set to `batch_size = 0`, `batch_size = 1` and `batch_size = 4`

Closes apache#1685 from zodiac/xuanji_gunicorn_rolling_restart_2
  • Loading branch information
ldct authored and aoen committed Aug 8, 2016
1 parent 62768bc commit 9d254a3
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 16 deletions.
157 changes: 142 additions & 15 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@
import sys
import threading
import traceback
import time
import psutil

import airflow
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.exceptions import AirflowException
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.state import State
from airflow.exceptions import AirflowException
from airflow.www.app import cached_app

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

Expand Down Expand Up @@ -500,16 +503,127 @@ def clear(args):
include_subdags=not args.exclude_subdags)


def restart_workers(gunicorn_master_proc, num_workers_expected):
"""
Runs forever, monitoring the child processes of @gunicorn_master_proc and
restarting workers occasionally.
Each iteration of the loop traverses one edge of this state transition
diagram, where each state (node) represents
[ num_ready_workers_running / num_workers_running ]. We expect most time to
be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.
The horizontal transition at ? happens after the new worker parses all the
dags (so it could take a while!)
V ────────────────────────────────────────────────────────────────────────┐
[n / n] ──TTIN──> [ [n, n+bs) / n + bs ] ────?───> [n + bs / n + bs] ──TTOU─┘
^ ^───────────────┘
│ ┌────────────────v
└──────┴────── [ [0, n) / n ] <─── start
We change the number of workers by sending TTIN and TTOU to the gunicorn
master process, which increases and decreases the number of child workers
respectively. Gunicorn guarantees that on TTOU workers are terminated
gracefully and that the oldest worker is terminated.
"""

def wait_until_true(fn):
"""
Sleeps until fn is true
"""
while not fn():
time.sleep(0.1)

def get_num_workers_running(gunicorn_master_proc):
workers = psutil.Process(gunicorn_master_proc.pid).children()
return len(workers)

def get_num_ready_workers_running(gunicorn_master_proc):
workers = psutil.Process(gunicorn_master_proc.pid).children()
ready_workers = [
proc for proc in workers
if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0]
]
return len(ready_workers)

def start_refresh(gunicorn_master_proc):
batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
logging.debug('%s doing a refresh of %s workers',
state, batch_size)
sys.stdout.flush()
sys.stderr.flush()

excess = 0
for _ in range(batch_size):
gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
wait_until_true(lambda: num_workers_expected + excess ==
get_num_workers_running(gunicorn_master_proc))


wait_until_true(lambda: num_workers_expected ==
get_num_workers_running(gunicorn_master_proc))

while True:

num_workers_running = get_num_workers_running(gunicorn_master_proc)
num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)

state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)

# Whenever some workers are not ready, wait until all workers are ready
if num_ready_workers_running < num_workers_running:
logging.debug('%s some workers are starting up, waiting...', state)
sys.stdout.flush()
time.sleep(1)

# Kill a worker gracefully by asking gunicorn to reduce number of workers
elif num_workers_running > num_workers_expected:
excess = num_workers_running - num_workers_expected
logging.debug('%s killing %s workers', state, excess)

for _ in range(excess):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
excess -= 1
wait_until_true(lambda: num_workers_expected + excess ==
get_num_workers_running(gunicorn_master_proc))

# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
logging.debug(
'%s sleeping for %ss starting doing a refresh...',
state, refresh_interval
)
time.sleep(refresh_interval)
start_refresh(gunicorn_master_proc)

else:
# num_ready_workers_running == num_workers_running < num_workers_expected
logging.error((
"%s some workers seem to have died and gunicorn"
"did not restart them as expected"
), state)
time.sleep(10)
if len(
psutil.Process(gunicorn_master_proc.pid).children()
) < num_workers_expected:
start_refresh(gunicorn_master_proc)


def webserver(args):

print(settings.HEADER)

from airflow.www.app import cached_app
app = cached_app(conf)
access_logfile = args.access_logfile or conf.get('webserver', 'access_logfile')
error_logfile = args.error_logfile or conf.get('webserver', 'error_logfile')
workers = args.workers or conf.get('webserver', 'workers')
num_workers = args.workers or conf.get('webserver', 'workers')
worker_timeout = (args.worker_timeout or
conf.get('webserver', 'webserver_worker_timeout'))

if args.debug:
print(
"Starting the web server on port {0} and host {1}.".format(
Expand All @@ -520,7 +634,7 @@ def webserver(args):
print(
textwrap.dedent('''\
Running the Gunicorn Server with:
Workers: {workers} {args.workerclass}
Workers: {num_workers} {args.workerclass}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
Expand All @@ -529,12 +643,13 @@ def webserver(args):

run_args = [
'gunicorn',
'-w ' + str(args.workers),
'-k ' + str(args.workerclass),
'-t ' + str(args.worker_timeout),
'-b ' + args.hostname + ':' + str(args.port),
'-n ' + 'airflow-webserver',
'-p ' + str(pid),
'-w', str(num_workers),
'-k', str(args.workerclass),
'-t', str(worker_timeout),
'-b', args.hostname + ':' + str(args.port),
'-n', 'airflow-webserver',
'-p', str(pid),
'-c', 'airflow.www.gunicorn_config'
]

if args.access_logfile:
Expand All @@ -546,11 +661,23 @@ def webserver(args):
if args.daemon:
run_args += ["-D"]

module = "airflow.www.app:cached_app()".encode()
run_args += [module]
os.execvp(
'gunicorn', run_args
)
run_args += ["airflow.www.app:cached_app()"]

gunicorn_master_proc = subprocess.Popen(run_args)

def kill_proc(dummy_signum, dummy_frame):
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
sys.exit(0)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
else:
while True: time.sleep(1)


def scheduler(args):
Expand Down
12 changes: 11 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def run_command(command):
'web_server_host': '0.0.0.0',
'web_server_port': '8080',
'web_server_worker_timeout': 120,
'worker_refresh_batch_size': 1,
'worker_refresh_interval': 30,
'authenticate': False,
'filter_by_owner': False,
'owner_mode': 'user',
Expand Down Expand Up @@ -282,9 +284,17 @@ def run_command(command):
# The port on which to run the web server
web_server_port = 8080
# The time the gunicorn webserver waits before timing out on a worker
# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120
# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
# Secret key used to run your flask app
secret_key = temporary_key
Expand Down
5 changes: 5 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ def timing(cls, stat, dt):
LOGGING_LEVEL = logging.INFO
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

# can't move this to conf due to ConfigParser interpolation
LOG_FORMAT = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
LOG_FORMAT_WITH_PID = (
'[%(asctime)s] [%(process)d] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
LOG_FORMAT_WITH_THREAD_NAME = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s')
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def create_app(config=None):

app.register_blueprint(routes)

log_format = airflow.settings.LOG_FORMAT_WITH_PID
airflow.settings.configure_logging(log_format=log_format)

with app.app_context():
from airflow.www import views

Expand Down
23 changes: 23 additions & 0 deletions airflow/www/gunicorn_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import setproctitle
from airflow import settings


def post_worker_init(dummy_worker):
setproctitle.setproctitle(
settings.GUNICORN_WORKER_READY_PREFIX + setproctitle.getproctitle()
)
1 change: 1 addition & 0 deletions scripts/ci/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pyhive
pydruid
PyOpenSSL
PySmbClient
psutil>=4.2.0, <5.0.0
psycopg2
python-dateutil
redis
Expand Down

0 comments on commit 9d254a3

Please sign in to comment.