Skip to content

Commit

Permalink
[AIRFLOW-341][operators] Add resource requirement attributes to opera…
Browse files Browse the repository at this point in the history
…tors

This PR adds optional resource requirements for tasks for use with
resource managers such as Yarn and Mesos.

Considerations:
- I chose to force users to encapsulate resources in a resources object
e.g. Resources(cpu=1) instead of just cpu=1 in their dag attributes.
This creates the pain of having to import Resources for almost every
DAG. I think this is kind of important for scoping/namespacing which we
should start doing.
- Once resources are used by executors we need to add documentation for
these new resources (and examples)

Testing Done:
- New/existing unit tests

plypaul artwr mistercrunch jlowin bolkedebruin criccomini

Closes apache#1669 from aoen/ddavydov/ddavydov/augment_tasks_with_resources
  • Loading branch information
aoen committed Jul 19, 2016
1 parent 5d90d13 commit 348f25f
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 1 deletion.
10 changes: 9 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ def run_command(command):
'non_pooled_task_slot_count': 128,
},
'operators': {
'default_owner': 'airflow'
'default_owner': 'airflow',
'default_cpus': 1,
'default_ram': 512,
'default_disk': 512,
'default_gpus': 0,
},
'webserver': {
'base_url': 'http://localhost:8080',
Expand Down Expand Up @@ -255,6 +259,10 @@ def run_command(command):
# The default owner assigned to each new operator, unless
# provided explicitly or passed via `default_args`
default_owner = Airflow
default_cpus: 1,
default_ram: 512,
default_disk: 512,
default_gpu: 0,
[webserver]
Expand Down
6 changes: 6 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.utils.helpers import (
as_tuple, is_container, is_in, validate_key, pprinttable)
from airflow.utils.logging import LoggingMixin
from airflow.utils.operator_resources import Resources
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.trigger_rule import TriggerRule
Expand Down Expand Up @@ -1796,6 +1797,9 @@ class derived from this one results in the creation of a task object,
using the constants defined in the static class
``airflow.utils.TriggerRule``
:type trigger_rule: str
:param resources: A map of resource parameter names (the argument names of the
Resources constructor) to their values.
:type resources: dict
"""

# For derived classes to define which fields will get jinjaified
Expand Down Expand Up @@ -1836,6 +1840,7 @@ def __init__(
on_success_callback=None,
on_retry_callback=None,
trigger_rule=TriggerRule.ALL_SUCCESS,
resources=None,
*args,
**kwargs):

Expand Down Expand Up @@ -1898,6 +1903,7 @@ def __init__(
self.params = params or {} # Available in templates!
self.adhoc = adhoc
self.priority_weight = priority_weight
self.resources = Resources(**(resources or {}))

# Private attributes
self._upstream_task_ids = []
Expand Down
121 changes: 121 additions & 0 deletions airflow/utils/operator_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import configuration
from airflow.exceptions import AirflowException

# Constants for resources (megabytes are the base unit)
MB = 1
GB = 1024 * MB
TB = 1024 * GB
PB = 1024 * TB
EB = 1024 * PB


class Resource(object):
"""
Represents a resource requirement in an execution environment for an operator.
:param name: Name of the resource
:type name: string
:param units_str: The string representing the units of a resource (e.g. MB for a CPU
resource) to be used for display purposes
:type units_str: string
:param qty: The number of units of the specified resource that are required for
execution of the operator.
:type qty: long
"""
def __init__(self, name, units_str, qty):
if qty < 0:
raise AirflowException(
'Received resource quantity {} for resource {} but resource quantity '
'must be non-negative.'.format(qty, name))

self._name = name
self._units_str = units_str
self._qty = qty

def __eq__(self, other):
return self.__dict__ == other.__dict__

def __repr__(self):
return str(self.__dict__)

@property
def name(self):
return self._name

@property
def units_str(self):
return self._units_str

@property
def qty(self):
return self._qty


class CpuResource(Resource):
def __init__(self, qty):
super(CpuResource, self).__init__('CPU', 'core(s)', qty)


class RamResource(Resource):
def __init__(self, qty):
super(RamResource, self).__init__('RAM', 'MB', qty)


class DiskResource(Resource):
def __init__(self, qty):
super(DiskResource, self).__init__('Disk', 'MB', qty)


class GpuResource(Resource):
def __init__(self, qty):
super(GpuResource, self).__init__('GPU', 'gpu(s)', qty)


class Resources(object):
"""
The resources required by an operator. Resources that are not specified will use the
default values from the airflow config.
:param cpus: The number of cpu cores that are required
:type cpus: long
:param ram: The amount of RAM required
:type ram: long
:param disk: The amount of disk space required
:type disk: long
:param gpus: The number of gpu units that are required
:type gpus: long
"""
def __init__(self, cpus=None, ram=None, disk=None, gpus=None):
if cpus is None:
cpus = configuration.getint('operators', 'default_cpus')
if ram is None:
ram = configuration.getint('operators', 'default_ram')
if disk is None:
disk = configuration.getint('operators', 'default_disk')
if gpus is None:
gpus = configuration.getint('operators', 'default_gpus')

self.cpus = CpuResource(cpus)
self.ram = RamResource(ram)
self.disk = DiskResource(disk)
self.gpus = GpuResource(gpus)

def __eq__(self, other):
return self.__dict__ == other.__dict__

def __repr__(self):
return str(self.__dict__)
35 changes: 35 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import unittest

import airflow.utils.logging
from airflow import configuration
from airflow.exceptions import AirflowException
from airflow.utils.operator_resources import Resources


class LogUtilsTest(unittest.TestCase):
Expand Down Expand Up @@ -54,3 +56,36 @@ def test_gcs_url_parse(self):
self.assertEqual(
glog.parse_gcs_url('gs://bucket/'),
('bucket', ''))


class OperatorResourcesTest(unittest.TestCase):
def test_all_resources_specified(self):
resources = Resources(cpus=1, ram=2, disk=3, gpus=4)
self.assertEqual(resources.cpus.qty, 1)
self.assertEqual(resources.ram.qty, 2)
self.assertEqual(resources.disk.qty, 3)
self.assertEqual(resources.gpus.qty, 4)

def test_some_resources_specified(self):
resources = Resources(cpus=0, disk=1)
self.assertEqual(resources.cpus.qty, 0)
self.assertEqual(resources.ram.qty,
configuration.defaults['operators']['default_ram'])
self.assertEqual(resources.disk.qty, 1)
self.assertEqual(resources.gpus.qty,
configuration.defaults['operators']['default_gpus'])

def test_no_resources_specified(self):
resources = Resources()
self.assertEqual(resources.cpus.qty,
configuration.defaults['operators']['default_cpus'])
self.assertEqual(resources.ram.qty,
configuration.defaults['operators']['default_ram'])
self.assertEqual(resources.disk.qty,
configuration.defaults['operators']['default_disk'])
self.assertEqual(resources.gpus.qty,
configuration.defaults['operators']['default_gpus'])

def test_negative_resource_qty(self):
with self.assertRaises(AirflowException):
Resources(cpus=-1)

0 comments on commit 348f25f

Please sign in to comment.