Commit 2c3d0fd

Merge remote-tracking branch 'apache/master'

bolkedebruin committed Aug 9, 2016
2 parents 1d67d62 + 9d254a3
Showing 8 changed files with 207 additions and 34 deletions.
157 changes: 142 additions & 15 deletions airflow/bin/cli.py
@@ -33,16 +33,19 @@
import sys
import threading
import traceback
import time
import psutil

import airflow
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.exceptions import AirflowException
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.state import State
from airflow.exceptions import AirflowException
from airflow.www.app import cached_app

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

@@ -500,16 +503,127 @@ def clear(args):
        include_subdags=not args.exclude_subdags)


def restart_workers(gunicorn_master_proc, num_workers_expected):
    """
    Runs forever, monitoring the child processes of @gunicorn_master_proc and
    restarting workers occasionally.

    Each iteration of the loop traverses one edge of this state transition
    diagram, where each state (node) represents
    [ num_ready_workers_running / num_workers_running ]. We expect most time to
    be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.

    The horizontal transition at ? happens after the new worker parses all the
    dags (so it could take a while!)

       V ────────────────────────────────────────────────────────────────────┐
    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ] ────?───> [n + bs / n + bs] ──TTOU─┘
       ^                                                  ^───────────────┘
       │      ┌────────────────v
       └──────┴────── [ [0, n) / n ] <─── start

    We change the number of workers by sending TTIN and TTOU to the gunicorn
    master process, which increases and decreases the number of child workers
    respectively. Gunicorn guarantees that on TTOU workers are terminated
    gracefully and that the oldest worker is terminated.
    """

    def wait_until_true(fn):
        """
        Sleeps until fn is true
        """
        while not fn():
            time.sleep(0.1)

    def get_num_workers_running(gunicorn_master_proc):
        workers = psutil.Process(gunicorn_master_proc.pid).children()
        return len(workers)

    def get_num_ready_workers_running(gunicorn_master_proc):
        workers = psutil.Process(gunicorn_master_proc.pid).children()
        ready_workers = [
            proc for proc in workers
            if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0]
        ]
        return len(ready_workers)

    def start_refresh(gunicorn_master_proc):
        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
        logging.debug('%s doing a refresh of %s workers',
                      state, batch_size)
        sys.stdout.flush()
        sys.stderr.flush()

        excess = 0
        for _ in range(batch_size):
            gunicorn_master_proc.send_signal(signal.SIGTTIN)
            excess += 1
            wait_until_true(lambda: num_workers_expected + excess ==
                            get_num_workers_running(gunicorn_master_proc))

    wait_until_true(lambda: num_workers_expected ==
                    get_num_workers_running(gunicorn_master_proc))

    while True:

        num_workers_running = get_num_workers_running(gunicorn_master_proc)
        num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)

        state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)

        # Whenever some workers are not ready, wait until all workers are ready
        if num_ready_workers_running < num_workers_running:
            logging.debug('%s some workers are starting up, waiting...', state)
            sys.stdout.flush()
            time.sleep(1)

        # Kill a worker gracefully by asking gunicorn to reduce number of workers
        elif num_workers_running > num_workers_expected:
            excess = num_workers_running - num_workers_expected
            logging.debug('%s killing %s workers', state, excess)

            for _ in range(excess):
                gunicorn_master_proc.send_signal(signal.SIGTTOU)
                excess -= 1
                wait_until_true(lambda: num_workers_expected + excess ==
                                get_num_workers_running(gunicorn_master_proc))

        # Start a new worker by asking gunicorn to increase number of workers
        elif num_workers_running == num_workers_expected:
            refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
            logging.debug(
                '%s sleeping for %ss, then starting a refresh...',
                state, refresh_interval
            )
            time.sleep(refresh_interval)
            start_refresh(gunicorn_master_proc)

        else:
            # num_ready_workers_running == num_workers_running < num_workers_expected
            logging.error((
                "%s some workers seem to have died and gunicorn "
                "did not restart them as expected"
            ), state)
            time.sleep(10)
            if len(
                psutil.Process(gunicorn_master_proc.pid).children()
            ) < num_workers_expected:
                start_refresh(gunicorn_master_proc)

def webserver(args):

    print(settings.HEADER)

    from airflow.www.app import cached_app
    app = cached_app(conf)
    access_logfile = args.access_logfile or conf.get('webserver', 'access_logfile')
    error_logfile = args.error_logfile or conf.get('webserver', 'error_logfile')
    workers = args.workers or conf.get('webserver', 'workers')
    num_workers = args.workers or conf.get('webserver', 'workers')
    worker_timeout = (args.worker_timeout or
                      conf.get('webserver', 'webserver_worker_timeout'))

    if args.debug:
        print(
            "Starting the web server on port {0} and host {1}.".format(

@@ -520,7 +634,7 @@ def webserver(args):

        print(
            textwrap.dedent('''\
                Running the Gunicorn Server with:
                Workers: {workers} {args.workerclass}
                Workers: {num_workers} {args.workerclass}
                Host: {args.hostname}:{args.port}
                Timeout: {worker_timeout}
                Logfiles: {access_logfile} {error_logfile}
@@ -529,12 +643,13 @@ def webserver(args):

    run_args = [
        'gunicorn',
        '-w ' + str(args.workers),
        '-k ' + str(args.workerclass),
        '-t ' + str(args.worker_timeout),
        '-b ' + args.hostname + ':' + str(args.port),
        '-n ' + 'airflow-webserver',
        '-p ' + str(pid),
        '-w', str(num_workers),
        '-k', str(args.workerclass),
        '-t', str(worker_timeout),
        '-b', args.hostname + ':' + str(args.port),
        '-n', 'airflow-webserver',
        '-p', str(pid),
        '-c', 'airflow.www.gunicorn_config'
    ]

    if args.access_logfile:

@@ -546,11 +661,23 @@
    if args.daemon:
        run_args += ["-D"]

    module = "airflow.www.app:cached_app()".encode()
    run_args += [module]
    os.execvp(
        'gunicorn', run_args
    )
    run_args += ["airflow.www.app:cached_app()"]

    gunicorn_master_proc = subprocess.Popen(run_args)

    def kill_proc(dummy_signum, dummy_frame):
        gunicorn_master_proc.terminate()
        gunicorn_master_proc.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, kill_proc)
    signal.signal(signal.SIGTERM, kill_proc)

    # These run forever until a SIG{INT, TERM, KILL, ...} signal is sent
    if conf.getint('webserver', 'worker_refresh_interval') > 0:
        restart_workers(gunicorn_master_proc, num_workers)
    else:
        while True:
            time.sleep(1)


def scheduler(args):
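
For context: the refresh loop above only orchestrates gunicorn's own TTIN/TTOU protocol. Below is a minimal standalone sketch of that protocol. It is not part of this commit; the master pid is a hypothetical placeholder, and it assumes a gunicorn master is already running and psutil is installed.

import signal
import psutil

GUNICORN_MASTER_PID = 12345  # hypothetical pid of a running gunicorn master

def count_workers(master_pid):
    # gunicorn worker processes are direct children of the master
    return len(psutil.Process(master_pid).children())

master = psutil.Process(GUNICORN_MASTER_PID)
print('workers before:', count_workers(GUNICORN_MASTER_PID))

# TTIN asks the master to fork one additional worker ...
master.send_signal(signal.SIGTTIN)
# ... and TTOU asks it to retire its oldest worker gracefully.
master.send_signal(signal.SIGTTOU)
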
12 changes: 11 additions & 1 deletion airflow/configuration.py
@@ -121,6 +121,8 @@ def run_command(command):
        'web_server_host': '0.0.0.0',
        'web_server_port': '8080',
        'web_server_worker_timeout': 120,
        'worker_refresh_batch_size': 1,
        'worker_refresh_interval': 30,
        'authenticate': False,
        'filter_by_owner': False,
        'owner_mode': 'user',
@@ -282,9 +284,17 @@ def run_command(command):
# The port on which to run the web server
web_server_port = 8080
# The time the gunicorn webserver waits before timing out on a worker
# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120
# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
# Secret key used to run your flask app
secret_key = temporary_key
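
A short sketch of how these two settings are consumed, mirroring the check in cli.py above; it assumes a standard Airflow configuration is importable:

from airflow import configuration as conf

# worker_refresh_interval <= 0 means the webserver never enters the
# refresh loop; otherwise it refreshes worker_refresh_batch_size
# workers every worker_refresh_interval seconds.
if conf.getint('webserver', 'worker_refresh_interval') > 0:
    print('refreshing %d worker(s) per batch' %
          conf.getint('webserver', 'worker_refresh_batch_size'))
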
2 changes: 1 addition & 1 deletion airflow/models.py
@@ -2919,7 +2919,7 @@ def set_dag_runs_state(
        dates = utils_date_range(start_date, end_date)
        drs = session.query(DagModel).filter_by(dag_id=self.dag_id).all()
        for dr in drs:
            dr.state = State.RUNNING
            dr.state = state

    def clear(
            self, start_date=None, end_date=None,
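
With this one-line fix, set_dag_runs_state honors its state argument instead of forcing every matching DagRun to RUNNING. A hypothetical call, assuming the keyword names implied by the method body above (`dag` is an existing DAG object):

from datetime import datetime
from airflow.utils.state import State

# previously these runs would have been set to RUNNING regardless
dag.set_dag_runs_state(start_date=datetime(2016, 1, 1),
                       end_date=datetime(2016, 2, 1),
                       state=State.FAILED)
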
5 changes: 5 additions & 0 deletions airflow/settings.py
@@ -72,9 +72,14 @@ def timing(cls, stat, dt):
LOGGING_LEVEL = logging.INFO
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

# the prefix prepended to gunicorn worker process titles after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

# can't move this to conf due to ConfigParser interpolation
LOG_FORMAT = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
LOG_FORMAT_WITH_PID = (
'[%(asctime)s] [%(process)d] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
LOG_FORMAT_WITH_THREAD_NAME = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s')
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
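
LOG_FORMAT_WITH_PID adds the emitting process id to every record, which is what makes the multi-worker webserver logs attributable. A minimal standalone sketch using only the stdlib logging module (not Airflow's own configure_logging):

import logging

LOG_FORMAT_WITH_PID = (
    '[%(asctime)s] [%(process)d] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')

logging.basicConfig(format=LOG_FORMAT_WITH_PID, level=logging.DEBUG)
logging.debug('this record now shows which process wrote it')
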
3 changes: 3 additions & 0 deletions airflow/www/app.py
@@ -47,6 +47,9 @@ def create_app(config=None):

app.register_blueprint(routes)

log_format = airflow.settings.LOG_FORMAT_WITH_PID
airflow.settings.configure_logging(log_format=log_format)

with app.app_context():
from airflow.www import views

24 changes: 7 additions & 17 deletions dags/testdruid.py → airflow/www/gunicorn_config.py
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,22 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow.operators.hive_to_druid import HiveToDruidTransfer
from airflow import DAG
from datetime import datetime
import setproctitle
from airflow import settings

args = {
    'owner': 'qi_wang',
    'start_date': datetime(2015, 4, 4),
}

dag = DAG("test_druid", default_args=args)


HiveToDruidTransfer(task_id="load_dummy_test",
                    sql="select * from qi.druid_test_dataset_w_platform_1 \
                        limit 10;",
                    druid_datasource="airflow_test",
                    ts_dim="ds",
                    dag=dag
                    )


def post_worker_init(dummy_worker):
    setproctitle.setproctitle(
        settings.GUNICORN_WORKER_READY_PREFIX + setproctitle.getproctitle()
    )
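
post_worker_init runs inside each freshly forked gunicorn worker, so retitling the process is what lets the monitor in cli.py count "ready" workers through psutil. A hedged sketch of that detection, mirroring get_num_ready_workers_running above (the worker pid is a hypothetical placeholder):

import psutil

GUNICORN_WORKER_READY_PREFIX = "[ready] "  # mirrors airflow.settings

def is_worker_ready(worker_pid):
    # once post_worker_init has run, the ready prefix shows up in the
    # worker's process title (argv[0] as reported by the OS)
    cmdline = psutil.Process(worker_pid).cmdline()
    return bool(cmdline) and GUNICORN_WORKER_READY_PREFIX in cmdline[0]

print(is_worker_ready(12346))  # hypothetical worker pid
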
37 changes: 37 additions & 0 deletions dags/test_dag.py
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

now = datetime.now()
# subtract three hours with timedelta so this also works before 3am,
# where replace(hour=now.hour - 3) would raise ValueError
now_to_the_hour = (now - timedelta(hours=3)).replace(minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour
DAG_NAME = 'test_dag_v1'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': START_DATE,
}
dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)

run_this_1 = DummyOperator(task_id='run_this_1', dag=dag)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag)
run_this_3.set_upstream(run_this_2)


1 change: 1 addition & 0 deletions scripts/ci/requirements.txt
@@ -31,6 +31,7 @@ pyhive
pydruid
PyOpenSSL
PySmbClient
psutil>=4.2.0, <5.0.0
psycopg2
python-dateutil
redis
