Skip to content

Commit

Permalink
Remove :type lines now sphinx-autoapi supports typehints (apache#20951
Browse files Browse the repository at this point in the history
)

* Remove `:type` lines now sphinx-autoapi supports typehints

Since we have no updated sphinx-autoapi to a more recent version it
supports showing type hints in the documentation, so we don't need to
have the type hints _and_ the `:type` lines -- which is good, as the
ones in the doc strings are easy to get out of date!

The following settings have been set:

`autodoc_typehints = 'description'` -- show types in description (where
previous `:type` used to show up)

`autodoc_typehints_description_target = 'documented'` -- only link to
types that are documented. (Without this we have some missing return
types that aren't documented, and aren't linked to in our current python
API docs, so this caused a build failure)

`autodoc_typehints_format = 'short'` -- Shorten type hints where
possible, i.e. `StringIO` instead of `io.StringIO`

* Add argument type names to local spelling dictionary

Now that we are using the type hints in the docs, sphinxcontrib-spelling
picks them up as words to be checked, so we have to ignore them.

I've chosen to add the provider specific ones to local dictionary files
rather than the global, as for example, `mgmt` is an error in most
places, but not in some of the Azure provider.
  • Loading branch information
ashb authored Jan 20, 2022
1 parent 4748276 commit 602abe8
Show file tree
Hide file tree
Showing 518 changed files with 143 additions and 9,296 deletions.
10 changes: 0 additions & 10 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,16 @@ def set_state(
on the schedule (but it will as for subdag dag runs if needed).
:param tasks: the iterable of tasks from which to work. task.task.dag needs to be set
:type tasks: list[airflow.models.baseoperator.BaseOperator]
:param dag_run_id: the run_id of the dagrun to start looking from
:type dag_run_id: str
:param execution_date: the execution date from which to start looking(deprecated)
:type execution_date: datetime.datetime
:param upstream: Mark all parents (upstream tasks)
:type upstream: bool
:param downstream: Mark all siblings (downstream tasks) of task_id, including SubDags
:type downstream: bool
:param future: Mark all future tasks on the interval of the dag up until
last execution date.
:type future: bool
:param past: Retroactively mark all tasks starting from start_date of the DAG
:type past: bool
:param state: State to which the tasks need to be set
:type state: str
:param commit: Commit tasks to be altered to the database
:type commit: bool
:param session: database session
:type session: sqlalchemy.orm.session.Session
:return: list of tasks that have been created and updated
"""
if not tasks:
Expand Down
6 changes: 1 addition & 5 deletions airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@


def common_error_handler(exception: BaseException) -> flask.Response:
"""
Used to capture connexion exceptions and add link to the type field
:type exception: Exception
"""
"""Used to capture connexion exceptions and add link to the type field."""
if isinstance(exception, ProblemException):

link = EXCEPTIONS_LINK_MAP.get(exception.status)
Expand Down
6 changes: 0 additions & 6 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,25 +616,19 @@ def as_dict(
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg', 'default', 'env var', or 'cmd'.
:type display_source: bool
:param display_sensitive: If True, the values of options set by env
vars and bash commands will be displayed. If False, those options
are shown as '< hidden >'
:type display_sensitive: bool
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:type raw: bool
:param include_env: Should the value of configuration from AIRFLOW__
environment variables be included or not
:type include_env: bool
:param include_cmds: Should the result of calling any *_cmd config be
set (True, default), or should the _cmd options be left as the
command to run (False)
:type include_cmds: bool
:param include_secret: Should the result of calling any *_secret config be
set (True, default), or should the _secret options be left as the
path to get the secret from (False)
:type include_secret: bool
:rtype: Dict[str, Dict[str, str]]
:return: Dictionary, where the key is the name of the section and the content is
the dictionary with the name of the parameter and its value.
Expand Down
25 changes: 0 additions & 25 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,13 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
:param dag_directory: Directory where DAG definitions are kept. All
files in file_paths should be under this directory
:type dag_directory: str
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:type max_runs: int
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type: pickle_dags: bool
:param async_mode: Whether to start agent in async mode
:type async_mode: bool
"""

def __init__(
Expand Down Expand Up @@ -188,7 +183,6 @@ def send_callback_to_execute(self, request: CallbackRequest) -> None:
Sends information about the callback to be executed by DagFileProcessor.
:param request: Callback request to be executed.
:type request: CallbackRequest
"""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
Expand All @@ -204,9 +198,7 @@ def send_sla_callback_request_to_execute(self, full_filepath: str, dag_id: str)
Sends information about the SLA callback to be executed by DagFileProcessor.
:param full_filepath: DAG File path
:type full_filepath: str
:param dag_id: DAG ID
:type dag_id: str
"""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
Expand Down Expand Up @@ -390,20 +382,13 @@ class DagFileProcessorManager(LoggingMixin):
:param dag_directory: Directory where DAG definitions are kept. All
files in file_paths should be under this directory
:type dag_directory: unicode
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:type max_runs: int
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
:param signal_conn: connection to communicate signal with processor agent.
:type signal_conn: MultiprocessingConnection
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type pickle_dags: bool
:param async_mode: whether to start the manager in async mode
:type async_mode: bool
"""

def __init__(
Expand Down Expand Up @@ -709,7 +694,6 @@ def clear_nonexistent_import_errors(self, session):
Clears import errors for files that no longer exist.
:param session: session for ORM operations
:type session: sqlalchemy.orm.session.Session
"""
query = session.query(errors.ImportError)
if self._file_paths:
Expand All @@ -723,7 +707,6 @@ def _log_file_processing_stats(self, known_file_paths):
:param known_file_paths: a list of file paths that may contain Airflow
DAG definitions
:type known_file_paths: list[unicode]
:return: None
"""
# File Path: Path to the file containing the DAG definition
Expand Down Expand Up @@ -786,7 +769,6 @@ def _log_file_processing_stats(self, known_file_paths):
def get_pid(self, file_path):
"""
:param file_path: the path to the file that's being processed
:type file_path: unicode
:return: the PID of the process processing the given file or None if
the specified file is not being processed
:rtype: int
Expand All @@ -805,7 +787,6 @@ def get_all_pids(self):
def get_last_runtime(self, file_path):
"""
:param file_path: the path to the file that was processed
:type file_path: unicode
:return: the runtime (in seconds) of the process of the last run, or
None if the file was never processed.
:rtype: float
Expand All @@ -816,7 +797,6 @@ def get_last_runtime(self, file_path):
def get_last_dag_count(self, file_path):
"""
:param file_path: the path to the file that was processed
:type file_path: unicode
:return: the number of dags loaded from that file, or None if the file
was never processed.
:rtype: int
Expand All @@ -827,7 +807,6 @@ def get_last_dag_count(self, file_path):
def get_last_error_count(self, file_path):
"""
:param file_path: the path to the file that was processed
:type file_path: unicode
:return: the number of import errors from processing, or None if the file
was never processed.
:rtype: int
Expand All @@ -838,7 +817,6 @@ def get_last_error_count(self, file_path):
def get_last_finish_time(self, file_path):
"""
:param file_path: the path to the file that was processed
:type file_path: unicode
:return: the finish time of the process of the last run, or None if the
file was never processed.
:rtype: datetime
Expand All @@ -849,7 +827,6 @@ def get_last_finish_time(self, file_path):
def get_start_time(self, file_path):
"""
:param file_path: the path to the file that's being processed
:type file_path: unicode
:return: the start time of the process that's processing the
specified file or None if the file is not currently being processed
:rtype: datetime
Expand All @@ -861,7 +838,6 @@ def get_start_time(self, file_path):
def get_run_count(self, file_path):
"""
:param file_path: the path to the file that's being processed
:type file_path: unicode
:return: the number of times the given file has been parsed
:rtype: int
"""
Expand All @@ -873,7 +849,6 @@ def set_file_paths(self, new_file_paths):
Update this with a new set of paths to DAG definition files.
:param new_file_paths: list of paths to DAG definition files
:type new_file_paths: list[unicode]
:return: None
"""
self._file_paths = new_file_paths
Expand Down
21 changes: 0 additions & 21 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,9 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
"""Runs DAG processing in a separate process using DagFileProcessor
:param file_path: a Python file containing Airflow DAG definitions
:type file_path: str
:param pickle_dags: whether to serialize the DAG objects to the DB
:type pickle_dags: bool
:param dag_ids: If specified, only look at these DAG ID's
:type dag_ids: List[str]
:param callback_requests: failure callback to execute
:type callback_requests: List[airflow.utils.callback_requests.CallbackRequest]
"""

# Counter that increments every time an instance of this class is created
Expand Down Expand Up @@ -116,21 +112,14 @@ def _run_file_processor(
Process the given file.
:param result_channel: the connection to use for passing back the result
:type result_channel: multiprocessing.Connection
:param parent_channel: the parent end of the channel to close in the child
:type parent_channel: multiprocessing.Connection
:param file_path: the file to process
:type file_path: str
:param pickle_dags: whether to pickle the DAGs found in the file and
save them to the DB
:type pickle_dags: bool
:param dag_ids: if specified, only examine DAG ID's that are
in this list
:type dag_ids: list[str]
:param thread_name: the name to use for the process that is launched
:type thread_name: str
:param callback_requests: failure callback to execute
:type callback_requests: List[airflow.utils.callback_requests.CallbackRequest]
:return: the process that was launched
:rtype: multiprocessing.Process
"""
Expand Down Expand Up @@ -223,7 +212,6 @@ def terminate(self, sigkill: bool = False) -> None:
Terminate (and then kill) the process launched to process the file.
:param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
:type sigkill: bool
"""
if self._process is None or self._parent_channel is None:
raise AirflowException("Tried to call terminate before starting!")
Expand Down Expand Up @@ -353,9 +341,7 @@ class DagFileProcessor(LoggingMixin):
Returns a tuple of 'number of dags found' and 'the count of import errors'
:param dag_ids: If specified, only look at these DAG ID's
:type dag_ids: List[str]
:param log: Logger to save the processing process
:type log: logging.Logger
"""

UNIT_TEST_MODE: bool = conf.getboolean('core', 'UNIT_TEST_MODE')
Expand Down Expand Up @@ -532,9 +518,7 @@ def update_import_errors(session: Session, dagbag: DagBag) -> None:
Airflow UI so that users know that there are issues parsing DAGs.
:param session: session for ORM operations
:type session: sqlalchemy.orm.session.Session
:param dagbag: DagBag containing DAGs with import errors
:type dagbag: airflow.DagBag
"""
# Clear the errors of the processed files
for dagbag_file in dagbag.file_last_changed:
Expand Down Expand Up @@ -565,7 +549,6 @@ def execute_callbacks(
:param dagbag: Dag Bag of dags
:param callback_requests: failure callbacks to execute
:type callback_requests: List[airflow.utils.callback_requests.CallbackRequest]
:param session: DB session.
"""
for request in callback_requests:
Expand Down Expand Up @@ -628,14 +611,10 @@ def process_file(
6. Record any errors importing the file into ORM
:param file_path: the path to the Python file that should be executed
:type file_path: str
:param callback_requests: failure callback to execute
:type callback_requests: List[airflow.utils.dag_processing.CallbackRequest]
:param pickle_dags: whether serialize the DAGs found in the file and
save them to the db
:type pickle_dags: bool
:param session: Sqlalchemy ORM Session
:type session: Session
:return: number of dags found, count of import errors
:rtype: Tuple[int, int]
"""
Expand Down
Loading

0 comments on commit 602abe8

Please sign in to comment.