Skip to content

Commit

Permalink
[AIRFLOW-2918] Fix Flake8 violations (apache#3772)
Browse files Browse the repository at this point in the history
- Unused imports
- Wrong import order
- Small indentation fixes
- Remove one letter variables
- Fix noqa annotations
  • Loading branch information
Fokko authored and kaxil committed Aug 20, 2018
1 parent afe857f commit d494d5a
Show file tree
Hide file tree
Showing 54 changed files with 338 additions and 338 deletions.
4 changes: 2 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

import sys

from airflow import configuration as conf
from airflow import settings
# flake8: noqa: F401
from airflow import settings, configuration as conf
from airflow.models import DAG
from flask_admin import BaseView
from importlib import import_module
Expand Down
10 changes: 3 additions & 7 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@
import flask_login

# Need to expose these downstream
# pylint: disable=unused-import
from flask_login import (current_user,
logout_user,
login_required,
login_user)
# pylint: enable=unused-import
# flake8: noqa: F401
from flask_login import current_user, logout_user, login_required, login_user

from flask import url_for, redirect, request

from flask_oauthlib.client import OAuth

from airflow import models, configuration, settings
from airflow import models, configuration
from airflow.configuration import AirflowConfigException
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
10 changes: 3 additions & 7 deletions airflow/contrib/auth/backends/google_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@
import flask_login

# Need to expose these downstream
# pylint: disable=unused-import
from flask_login import (current_user,
logout_user,
login_required,
login_user)
# pylint: enable=unused-import
# flake8: noqa: F401
from flask_login import current_user, logout_user, login_required, login_user

from flask import url_for, redirect, request

from flask_oauthlib.client import OAuth

from airflow import models, configuration, settings
from airflow import models, configuration
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

Expand Down
4 changes: 1 addition & 3 deletions airflow/contrib/auth/backends/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import flask_login
from flask_login import current_user
from flask import flash
from wtforms import (
Form, PasswordField, StringField)
from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired

# pykerberos should be used as it verifies the KDC, the "kerberos" module does not do so
Expand All @@ -32,7 +31,6 @@

from flask import url_for, redirect

from airflow import settings
from airflow import models
from airflow import configuration
from airflow.utils.db import provide_session
Expand Down
7 changes: 3 additions & 4 deletions airflow/contrib/auth/backends/ldap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
from future.utils import native

import flask_login
from flask_login import login_required, current_user, logout_user
from flask_login import login_required, current_user, logout_user # noqa: F401
from flask import flash
from wtforms import (
Form, PasswordField, StringField)
from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired

from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE, BASE
from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE
import ssl

from flask import url_for, redirect
Expand Down
1 change: 1 addition & 0 deletions airflow/contrib/operators/mlengine_prediction_summary.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa: F841
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
Expand Down
7 changes: 2 additions & 5 deletions airflow/default_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
"""

import flask_login
from flask_login import login_required, current_user, logout_user
from flask_login import login_required, current_user, logout_user # noqa: F401

from flask import url_for, redirect

from airflow import settings
from airflow import settings # noqa: F401
from airflow import models
from airflow.utils.db import provide_session

Expand Down Expand Up @@ -64,9 +64,6 @@ def is_superuser(self):
"""Access all the things"""
return True

# models.User = User # hack!
# del User


@login_manager.user_loader
@provide_session
Expand Down
35 changes: 17 additions & 18 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ def manage_slas(self, dag, session=None):
slas = (
session
.query(SlaMiss)
.filter(SlaMiss.notification_sent == False)
.filter(SlaMiss.notification_sent == False) # noqa: E712
.filter(SlaMiss.dag_id == dag.dag_id)
.all()
)
Expand Down Expand Up @@ -707,16 +707,13 @@ def manage_slas(self, dag, session=None):
Blocking tasks:
<pre><code>{blocking_task_list}\n{bug}<code></pre>
""".format(bug=asciiart.bug, **locals())
emails = []
for t in dag.tasks:
if t.email:
if isinstance(t.email, basestring):
l = [t.email]
elif isinstance(t.email, (list, tuple)):
l = t.email
for email in l:
if email not in emails:
emails.append(email)
emails = set()
for task in dag.tasks:
if task.email:
if isinstance(task.email, basestring):
emails.add(task.email)
elif isinstance(task.email, (list, tuple)):
emails |= set(task.email)
if emails and len(slas):
try:
send_email(
Expand Down Expand Up @@ -817,7 +814,7 @@ def create_dag_run(self, dag, session=None):
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
DagRun.external_trigger == False,
DagRun.external_trigger == False, # noqa: E712
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX + '%')
))
Expand Down Expand Up @@ -1088,14 +1085,16 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
DR,
and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
)
.filter(or_(DR.run_id == None, # noqa E711
.filter(or_(DR.run_id == None, # noqa: E711
not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
.outerjoin(DM, DM.dag_id == TI.dag_id)
.filter(or_(DM.dag_id == None, # noqa E711
.filter(or_(DM.dag_id == None, # noqa: E711
not_(DM.is_paused)))
)
if None in states:
ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(states))) # noqa E711
ti_query = ti_query.filter(
or_(TI.state == None, TI.state.in_(states)) # noqa: E711
)
else:
ti_query = ti_query.filter(TI.state.in_(states))

Expand Down Expand Up @@ -1183,8 +1182,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
)
if current_task_concurrency >= task_concurrency_limit:
self.log.info(
"Not executing %s since the number of tasks running or queued from DAG %s"
" is >= to the DAG's task concurrency limit of %s",
"Not executing %s since the number of tasks running or queued "
"from DAG %s is >= to the DAG's task concurrency limit of %s",
task_instance, dag_id, task_concurrency_limit
)
continue
Expand Down Expand Up @@ -1260,7 +1259,7 @@ def _change_state_for_executable_task_instances(self, task_instances,

if None in acceptable_states:
ti_query = ti_query.filter(
or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa E711
or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa: E711
)
else:
ti_query = ti_query.filter(TI.state.in_(acceptable_states))
Expand Down
1 change: 1 addition & 0 deletions airflow/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def run_migrations_online():
with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
run_migrations_offline()
else:
Expand Down
3 changes: 0 additions & 3 deletions airflow/migrations/versions/05f30312d566_merge_heads.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
    """No-op upgrade step.

    This migration exists only to merge divergent Alembic revision heads
    (see the 'merge_heads' filename); it makes no schema changes.
    """
    pass
Expand Down
26 changes: 16 additions & 10 deletions airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
"""

from alembic import op
from sqlalchemy.dialects import mysql
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '0e2a74e0fc9f'
down_revision = 'd2ae31099d61'
branch_labels = None
depends_on = None

from alembic import op
from sqlalchemy.dialects import mysql
import sqlalchemy as sa


def upgrade():
conn = op.get_bind()
Expand Down Expand Up @@ -69,14 +69,16 @@ def upgrade():
op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
Expand Down Expand Up @@ -117,14 +119,16 @@ def upgrade():
op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))

op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))

op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))

op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
Expand Down Expand Up @@ -161,14 +165,16 @@ def downgrade():
op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))

op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
nullable=False)
op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))

op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))

op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
Create Date: 2017-01-25 11:43:51.635667
"""
from alembic import op

# revision identifiers, used by Alembic.
revision = '127d2bf2dfa7'
down_revision = '5e7d17757c7a'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa

def upgrade():
    """Create a non-unique composite index ``dag_id_state`` on
    ``dag_run (dag_id, state)`` to speed up lookups filtering runs by
    DAG and state."""
    op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@
Create Date: 2015-08-18 18:57:51.927315
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision = '1507a7289a2f'
down_revision = 'e3a246e0dc1'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine.reflection import Inspector

connectionhelper = sa.Table(
'connection',
sa.MetaData(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@
Create Date: 2016-02-02 17:20:55.692295
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '1968acfc09e3'
down_revision = 'bba5a7cfc896'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column('variable', sa.Column('is_encrypted', sa.Boolean,default=False))
op.add_column('variable', sa.Column('is_encrypted', sa.Boolean, default=False))


def downgrade():
Expand Down
25 changes: 12 additions & 13 deletions airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,27 @@
"""

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '1b38cef5b76e'
down_revision = '502898887f84'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
op.create_table('dag_run',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=True),
sa.Column('execution_date', sa.DateTime(), nullable=True),
sa.Column('state', sa.String(length=50), nullable=True),
sa.Column('run_id', sa.String(length=250), nullable=True),
sa.Column('external_trigger', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('dag_id', 'execution_date'),
sa.UniqueConstraint('dag_id', 'run_id'),
)
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=True),
sa.Column('execution_date', sa.DateTime(), nullable=True),
sa.Column('state', sa.String(length=50), nullable=True),
sa.Column('run_id', sa.String(length=250), nullable=True),
sa.Column('external_trigger', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('dag_id', 'execution_date'),
sa.UniqueConstraint('dag_id', 'run_id'))


def downgrade():
Expand Down
Loading

0 comments on commit d494d5a

Please sign in to comment.