Skip to content

Commit

Permalink
[AIRFLOW-4478] Lazily instantiate default resources objects. (apache#…
Browse files Browse the repository at this point in the history
…5259)

Instantiating `Resources` and its child classes takes non-negligible
time when users create many operators. To save time, don't create the 
resources object until it is needed.
  • Loading branch information
jmcarp authored and ashb committed Jul 4, 2019
1 parent dc6909f commit 526c65a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 35 deletions.
3 changes: 2 additions & 1 deletion airflow/contrib/task_runner/cgroup_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.helpers import reap_process_group
from airflow.utils.operator_resources import Resources


class CgroupTaskRunner(BaseTaskRunner):
Expand Down Expand Up @@ -138,7 +139,7 @@ def start(self):

# Get the resource requirements from the task
task = self._task_instance.task
resources = task.resources
resources = task.resources if task.resources is not None else Resources()
cpus = resources.cpus.qty
self._cpu_shares = cpus * 1024
self._mem_mb_limit = resources.ram.qty
Expand Down
46 changes: 23 additions & 23 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,28 @@ class derived from this one results in the creation of a task object,
# Defines the operator level extra links
operator_extra_links = () # type: Iterable[BaseOperatorLink]

_comps = {
'task_id',
'dag_id',
'owner',
'email',
'email_on_retry',
'retry_delay',
'retry_exponential_backoff',
'max_retry_delay',
'start_date',
'schedule_interval',
'depends_on_past',
'wait_for_downstream',
'priority_weight',
'sla',
'execution_timeout',
'on_failure_callback',
'on_success_callback',
'on_retry_callback',
'do_xcom_push',
}

@apply_defaults
def __init__(
self,
Expand Down Expand Up @@ -351,7 +373,7 @@ def __init__(
d=dag.dag_id if dag else "", t=task_id, tr=weight_rule))
self.weight_rule = weight_rule

self.resources = Resources(**(resources or {}))
self.resources = Resources(*resources) if resources is not None else None
self.run_as_user = run_as_user
self.task_concurrency = task_concurrency
self.executor_config = executor_config or {}
Expand Down Expand Up @@ -389,28 +411,6 @@ def __init__(
if outlets:
self._outlets.update(outlets)

self._comps = {
'task_id',
'dag_id',
'owner',
'email',
'email_on_retry',
'retry_delay',
'retry_exponential_backoff',
'max_retry_delay',
'start_date',
'schedule_interval',
'depends_on_past',
'wait_for_downstream',
'priority_weight',
'sla',
'execution_timeout',
'on_failure_callback',
'on_success_callback',
'on_retry_callback',
'do_xcom_push',
}

def __eq__(self, other):
if (type(self) == type(other) and
self.task_id == other.task_id):
Expand Down
22 changes: 11 additions & 11 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ class DAG(BaseDag, LoggingMixin):
:type is_paused_upon_creation: bool or None
"""

_comps = {
'dag_id',
'task_ids',
'parent_dag',
'start_date',
'schedule_interval',
'full_filepath',
'template_searchpath',
'last_loaded',
}

def __init__(
self,
dag_id: str,
Expand Down Expand Up @@ -280,17 +291,6 @@ def __init__(
self._access_control = access_control
self.is_paused_upon_creation = is_paused_upon_creation

self._comps = {
'dag_id',
'task_ids',
'parent_dag',
'start_date',
'schedule_interval',
'full_filepath',
'template_searchpath',
'last_loaded',
}

def __repr__(self):
return "<DAG: {self.dag_id}>".format(self=self)

Expand Down

0 comments on commit 526c65a

Please sign in to comment.