Skip to content

Commit

Permalink
[AIRFLOW-1160] Update Spark parameters for Mesos
Browse files Browse the repository at this point in the history
Closes apache#2265 from cameres/master
  • Loading branch information
cameres authored and bolkedebruin committed May 1, 2017
1 parent 48135ad commit 2e3f07f
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 4 deletions.
8 changes: 7 additions & 1 deletion airflow/contrib/hooks/spark_sql_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class SparkSqlHook(BaseHook):
:type conf: str (format: PROP=VALUE)
:param conn_id: connection_id string
:type conn_id: str
:param executor_cores: Number of cores per executor
:param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
:type total_executor_cores: int
:param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
Expand All @@ -52,6 +54,7 @@ def __init__(self,
sql,
conf=None,
conn_id='spark_sql_default',
total_executor_cores=None,
executor_cores=None,
executor_memory=None,
keytab=None,
Expand All @@ -64,6 +67,7 @@ def __init__(self,
self._sql = sql
self._conf = conf
self._conn = self.get_connection(conn_id)
self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
Expand All @@ -89,6 +93,8 @@ def _prepare_command(self, cmd):
if self._conf:
for conf_el in self._conf.split(","):
connection_cmd += ["--conf", conf_el]
if self._total_executor_cores:
connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
if self._executor_cores:
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
Expand Down
8 changes: 7 additions & 1 deletion airflow/contrib/hooks/spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class SparkSubmitHook(BaseHook):
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
:param executor_cores: Number of cores per executor (Default: 2)
:param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
:type total_executor_cores: int
:param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
Expand All @@ -69,6 +71,7 @@ def __init__(self,
py_files=None,
jars=None,
java_class=None,
total_executor_cores=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
Expand All @@ -84,6 +87,7 @@ def __init__(self,
self._py_files = py_files
self._jars = jars
self._java_class = java_class
self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
Expand Down Expand Up @@ -163,6 +167,8 @@ def _build_command(self, application):
connection_cmd += ["--jars", self._jars]
if self._num_executors:
connection_cmd += ["--num-executors", str(self._num_executors)]
if self._total_executor_cores:
connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
if self._executor_cores:
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
Expand Down
7 changes: 6 additions & 1 deletion airflow/contrib/operators/spark_sql_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class SparkSqlOperator(BaseOperator):
:type conf: str (format: PROP=VALUE)
:param conn_id: connection_id string
:type conn_id: str
:param executor_cores: Number of cores per executor
:param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
:type total_executor_cores: int
:param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
Expand All @@ -52,6 +54,7 @@ def __init__(self,
sql,
conf=None,
conn_id='spark_sql_default',
total_executor_cores=None,
executor_cores=None,
executor_memory=None,
keytab=None,
Expand All @@ -65,6 +68,7 @@ def __init__(self,
self._sql = sql
self._conf = conf
self._conn_id = conn_id
self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
Expand All @@ -81,6 +85,7 @@ def execute(self, context):
self._hook = SparkSqlHook(sql=self._sql,
conf=self._conf,
conn_id=self._conn_id,
total_executor_cores=self._total_executor_cores,
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
keytab=self._keytab,
Expand Down
7 changes: 6 additions & 1 deletion airflow/contrib/operators/spark_submit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class SparkSubmitOperator(BaseOperator):
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
:param executor_cores: Number of cores per executor (Default: 2)
:param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
:type total_executor_cores: int
:param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(self,
py_files=None,
jars=None,
java_class=None,
total_executor_cores=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
Expand All @@ -89,6 +92,7 @@ def __init__(self,
self._py_files = py_files
self._jars = jars
self._java_class = java_class
self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
Expand All @@ -112,6 +116,7 @@ def execute(self, context):
py_files=self._py_files,
jars=self._jars,
java_class=self._java_class,
total_executor_cores=self._total_executor_cores,
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
driver_memory=self._driver_memory,
Expand Down
2 changes: 2 additions & 0 deletions tests/contrib/hooks/test_spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class TestSparkSubmitHook(unittest.TestCase):
'files': 'hive-site.xml',
'py_files': 'sample_library.py',
'jars': 'parquet.jar',
'total_executor_cores': 4,
'executor_cores': 4,
'executor_memory': '22g',
'keytab': 'privileged_user.keytab',
Expand Down Expand Up @@ -94,6 +95,7 @@ def test_build_command(self):
assert "--files {}".format(self._config['files']) in cmd
assert "--py-files {}".format(self._config['py_files']) in cmd
assert "--jars {}".format(self._config['jars']) in cmd
assert "--total-executor-cores {}".format(self._config['total_executor_cores']) in cmd
assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
assert "--keytab {}".format(self._config['keytab']) in cmd
Expand Down
2 changes: 2 additions & 0 deletions tests/contrib/operators/test_spark_submit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
'files': 'hive-site.xml',
'py_files': 'sample_library.py',
'jars': 'parquet.jar',
'total_executor_cores': 4,
'executor_cores': 4,
'executor_memory': '22g',
'keytab': 'privileged_user.keytab',
Expand Down Expand Up @@ -75,6 +76,7 @@ def test_execute(self, conn_id='spark_default'):
self.assertEqual(self._config['files'], operator._files)
self.assertEqual(self._config['py_files'], operator._py_files)
self.assertEqual(self._config['jars'], operator._jars)
self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores)
self.assertEqual(self._config['executor_cores'], operator._executor_cores)
self.assertEqual(self._config['executor_memory'], operator._executor_memory)
self.assertEqual(self._config['keytab'], operator._keytab)
Expand Down

0 comments on commit 2e3f07f

Please sign in to comment.