Skip to content

Commit

Permalink
[AIRFLOW-4659] Fix pylint problems for api module (apache#5398)
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk authored Jun 10, 2019
1 parent 201e671 commit 4b7667d
Show file tree
Hide file tree
Showing 27 changed files with 425 additions and 331 deletions.
21 changes: 14 additions & 7 deletions airflow/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,39 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Authentication backend"""

from typing import Any
from importlib import import_module

from airflow.exceptions import AirflowException
from airflow import configuration as conf
from importlib import import_module

from airflow.utils.log.logging_mixin import LoggingMixin

api_auth = None # type: Any

log = LoggingMixin().log
class ApiAuth: # pylint: disable=too-few-public-methods
"""Class to keep module of Authentication API """
def __init__(self):
self.api_auth = None


API_AUTH = ApiAuth()

LOG = LoggingMixin().log


def load_auth():
"""Loads authentication backend"""
auth_backend = 'airflow.api.auth.backend.default'
try:
auth_backend = conf.get("api", "auth_backend")
except conf.AirflowConfigException:
pass

try:
global api_auth
api_auth = import_module(auth_backend)
API_AUTH.api_auth = import_module(auth_backend)
except ImportError as err:
log.critical(
LOG.critical(
"Cannot import %s for API authentication due to: %s",
auth_backend, err
)
Expand Down
9 changes: 5 additions & 4 deletions airflow/api/auth/backend/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Default authentication backend - everything is allowed"""
from functools import wraps

client_auth = None
CLIENT_AUTH = None


def init_app(app):
pass
def init_app(_):
"""Initializes authentication backend"""


def requires_authentication(function):
"""Decorator for functions that require authentication"""
@wraps(function)
def decorated(*args, **kwargs):
return function(*args, **kwargs)
Expand Down
13 changes: 8 additions & 5 deletions airflow/api/auth/backend/deny_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Authentication backend that denies all requests"""
from functools import wraps
from flask import Response

client_auth = None
CLIENT_AUTH = None


def init_app(app):
pass
def init_app(_):
"""Initializes authentication"""


def requires_authentication(function):
"""Decorator for functions that require authentication"""

# noinspection PyUnusedLocal
@wraps(function)
def decorated(*args, **kwargs):
def decorated(*args, **kwargs): # pylint: disable=unused-argument
return Response("Forbidden", 403)

return decorated
67 changes: 40 additions & 27 deletions airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,53 +23,66 @@
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from airflow.utils.log.logging_mixin import LoggingMixin

import kerberos
"""Kerberos authentication module"""
import os

from airflow import configuration as conf
from functools import wraps
from socket import getfqdn

from flask import Response
# noinspection PyProtectedMember
from flask import _request_ctx_stack as stack # type: ignore
from flask import make_response
from flask import request
from flask import g
from functools import wraps

import kerberos

from requests_kerberos import HTTPKerberosAuth
from socket import getfqdn

client_auth = HTTPKerberosAuth(service='airflow')
from airflow import configuration as conf
from airflow.utils.log.logging_mixin import LoggingMixin


# pylint: disable=c-extension-no-member
CLIENT_AUTH = HTTPKerberosAuth(service='airflow')


_SERVICE_NAME = None
LOG = LoggingMixin().log

log = LoggingMixin().log

class KerberosService: # pylint: disable=too-few-public-methods
"""Class to keep information about the Kerberos Service initialized """
def __init__(self):
self.service_name = None


# Stores currently initialized Kerberos Service
_KERBEROS_SERVICE = KerberosService()


def init_app(app):
global _SERVICE_NAME
"""Initializes application with kerberos"""

hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
log.info("Kerberos: hostname %s", hostname)
LOG.info("Kerberos: hostname %s", hostname)

service = 'airflow'

_SERVICE_NAME = "{}@{}".format(service, hostname)
_KERBEROS_SERVICE.service_name = "{}@{}".format(service, hostname)

if 'KRB5_KTNAME' not in os.environ:
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')

try:
log.info("Kerberos init: %s %s", service, hostname)
LOG.info("Kerberos init: %s %s", service, hostname)
principal = kerberos.getServerPrincipalDetails(service, hostname)
except kerberos.KrbError as err:
log.warning("Kerberos: %s", err)
LOG.warning("Kerberos: %s", err)
else:
log.info("Kerberos API: server is %s", principal)
LOG.info("Kerberos API: server is %s", principal)


def _unauthorized():
Expand All @@ -88,18 +101,17 @@ def _gssapi_authenticate(token):
state = None
ctx = stack.top
try:
rc, state = kerberos.authGSSServerInit(_SERVICE_NAME)
if rc != kerberos.AUTH_GSS_COMPLETE:
return_code, state = kerberos.authGSSServerInit(_KERBEROS_SERVICE.service_name)
if return_code != kerberos.AUTH_GSS_COMPLETE:
return None
rc = kerberos.authGSSServerStep(state, token)
if rc == kerberos.AUTH_GSS_COMPLETE:
return_code = kerberos.authGSSServerStep(state, token)
if return_code == kerberos.AUTH_GSS_COMPLETE:
ctx.kerberos_token = kerberos.authGSSServerResponse(state)
ctx.kerberos_user = kerberos.authGSSServerUserName(state)
return rc
elif rc == kerberos.AUTH_GSS_CONTINUE:
return return_code
if return_code == kerberos.AUTH_GSS_CONTINUE:
return kerberos.AUTH_GSS_CONTINUE
else:
return None
return None
except kerberos.GSSError:
return None
finally:
Expand All @@ -108,14 +120,15 @@ def _gssapi_authenticate(token):


def requires_authentication(function):
"""Decorator for functions that require authentication with Kerberos"""
@wraps(function)
def decorated(*args, **kwargs):
header = request.headers.get("Authorization")
if header:
ctx = stack.top
token = ''.join(header.split()[1:])
rc = _gssapi_authenticate(token)
if rc == kerberos.AUTH_GSS_COMPLETE:
return_code = _gssapi_authenticate(token)
if return_code == kerberos.AUTH_GSS_COMPLETE:
g.user = ctx.kerberos_user
response = function(*args, **kwargs)
response = make_response(response)
Expand All @@ -124,7 +137,7 @@ def decorated(*args, **kwargs):
ctx.kerberos_token])

return response
elif rc != kerberos.AUTH_GSS_CONTINUE:
if return_code != kerberos.AUTH_GSS_CONTINUE:
return _forbidden()
return _unauthorized()
return decorated
1 change: 1 addition & 0 deletions airflow/api/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Client for all the API clients."""


class Client:
Expand Down
7 changes: 5 additions & 2 deletions airflow/api/client/json_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""JSON API Client"""

from urllib.parse import urljoin

Expand All @@ -35,11 +36,13 @@ def _request(self, url, method='GET', json=None):
if json is not None:
params['json'] = json

resp = getattr(requests, method.lower())(**params)
resp = getattr(requests, method.lower())(**params) # pylint: disable=not-callable
if not resp.ok:
# It is justified here because there might be many resp types.
# noinspection PyBroadException
try:
data = resp.json()
except Exception:
except Exception: # pylint: disable=broad-except
data = {}
raise IOError(data.get('error', 'Server error'))

Expand Down
23 changes: 12 additions & 11 deletions airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Local client API"""

from airflow.api.client import api_client
from airflow.api.common.experimental import pool
Expand All @@ -27,27 +28,27 @@ class Client(api_client.Client):
"""Local API client implementation."""

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
dr = trigger_dag.trigger_dag(dag_id=dag_id,
run_id=run_id,
conf=conf,
execution_date=execution_date)
return "Created {}".format(dr)
dag_run = trigger_dag.trigger_dag(dag_id=dag_id,
run_id=run_id,
conf=conf,
execution_date=execution_date)
return "Created {}".format(dag_run)

def delete_dag(self, dag_id):
count = delete_dag.delete_dag(dag_id)
return "Removed {} record(s)".format(count)

def get_pool(self, name):
p = pool.get_pool(name=name)
return p.pool, p.slots, p.description
the_pool = pool.get_pool(name=name)
return the_pool.pool, the_pool.slots, the_pool.description

def get_pools(self):
return [(p.pool, p.slots, p.description) for p in pool.get_pools()]

def create_pool(self, name, slots, description):
p = pool.create_pool(name=name, slots=slots, description=description)
return p.pool, p.slots, p.description
the_pool = pool.create_pool(name=name, slots=slots, description=description)
return the_pool.pool, the_pool.slots, the_pool.description

def delete_pool(self, name):
p = pool.delete_pool(name=name)
return p.pool, p.slots, p.description
the_pool = pool.delete_pool(name=name)
return the_pool.pool, the_pool.slots, the_pool.description
29 changes: 29 additions & 0 deletions airflow/api/common/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,32 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Experimental APIs."""
from datetime import datetime
from typing import Optional

from airflow.exceptions import DagNotFound, TaskNotFound, DagRunNotFound
from airflow.models import DagBag, DagModel, DagRun


def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:
"""Checks that DAG exists and in case it is specified that Task exist"""
dagbag = DagBag()
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise DagNotFound(error_message)
dag = dagbag.get_dag(dag_id)
if task_id and not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
raise TaskNotFound(error_message)
return dag


def check_and_get_dagrun(dag: DagModel, execution_date: datetime) -> DagRun:
"""Get DagRun object and check that it exists"""
dagrun = dag.get_dagrun(execution_date=execution_date)
if not dagrun:
error_message = ('Dag Run for date {} not found in dag {}'
.format(execution_date, dag.dag_id))
raise DagRunNotFound(error_message)
return dagrun
Loading

0 comments on commit 4b7667d

Please sign in to comment.