Skip to content

Commit

Permalink
Apply PEP-563 (Postponed Evaluation of Annotations) to non-core airfl…
Browse files Browse the repository at this point in the history
…ow (apache#26289)

This PR applies PEP-563 to all non-core airflow Python files - i.e.
those that we usually do not cherry-pick to v2-* branches. There
will be a follow-up to apply the "core changes" right after with
the intention to be cherry-picked to v2-* branch before 2.4.0
release candidate is ready, in order to make cherry-picking
easier.

This PR is a result of combining some of the 47 PRs reviewed
and approved separately (otherwise it would have been unreviewable)

The history of those PRs can be changed in:
https://github.com/apache/airflow/pulls?q=is%3Apr+label%3Afuture-annotations+is%3Aopen

Relevant discussion: https://lists.apache.org/thread/81fr042s5d3v17v83bpo24tnrr2pp0fp
Lazy consensus call: https://lists.apache.org/thread/l74nvjh8tgbtojllhwkcn7f8mfnlz4jq
  • Loading branch information
potiuk authored Sep 13, 2022
1 parent aa0118b commit 06acf40
Show file tree
Hide file tree
Showing 1,476 changed files with 10,993 additions and 9,529 deletions.
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ repos:
- id: pyupgrade
name: Upgrade Python code automatically
args: ["--py37-plus"]
exclude: ^airflow/_vendor/
# We need to exclude gcs hook from pyupgrade because it has public "list" command which clashes
# with `list` that is used as type
exclude: ^airflow/_vendor/|^airflow/providers/google/cloud/hooks/gcs.py$
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.9.0
hooks:
Expand Down
8 changes: 4 additions & 4 deletions airflow/providers/airbyte/hooks/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from typing import Any, Optional, Union
from typing import Any

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
Expand Down Expand Up @@ -48,9 +50,7 @@ def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str =
super().__init__(http_conn_id=airbyte_conn_id)
self.api_version: str = api_version

def wait_for_job(
self, job_id: Union[str, int], wait_seconds: float = 3, timeout: Optional[float] = 3600
) -> None:
def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: float | None = 3600) -> None:
"""
Helper method which polls a job to check if it finishes.
Expand Down
10 changes: 6 additions & 4 deletions airflow/providers/airbyte/operators/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional, Sequence
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
Expand Down Expand Up @@ -51,10 +53,10 @@ def __init__(
self,
connection_id: str,
airbyte_conn_id: str = "airbyte_default",
asynchronous: Optional[bool] = False,
asynchronous: bool | None = False,
api_version: str = "v1",
wait_seconds: float = 3,
timeout: Optional[float] = 3600,
timeout: float | None = 3600,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -65,7 +67,7 @@ def __init__(
self.wait_seconds = wait_seconds
self.asynchronous = asynchronous

def execute(self, context: 'Context') -> None:
def execute(self, context: Context) -> None:
"""Create Airbyte Job and wait to finish"""
self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job_object = self.hook.submit_sync_connection(connection_id=self.connection_id)
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/airbyte/sensors/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Airbyte Job sensor."""
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -52,7 +54,7 @@ def __init__(
self.airbyte_job_id = airbyte_job_id
self.api_version = api_version

def poke(self, context: 'Context') -> bool:
def poke(self, context: Context) -> bool:
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job = hook.get_job(job_id=self.airbyte_job_id)
status = job.json()['job']['status']
Expand Down
38 changes: 20 additions & 18 deletions airflow/providers/alibaba/cloud/hooks/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import wraps
from inspect import signature
from typing import TYPE_CHECKING, Callable, Optional, TypeVar, cast
from typing import TYPE_CHECKING, Callable, TypeVar, cast
from urllib.parse import urlparse

import oss2
Expand Down Expand Up @@ -88,13 +90,13 @@ class OSSHook(BaseHook):
conn_type = 'oss'
hook_name = 'OSS'

def __init__(self, region: Optional[str] = None, oss_conn_id='oss_default', *args, **kwargs) -> None:
def __init__(self, region: str | None = None, oss_conn_id='oss_default', *args, **kwargs) -> None:
self.oss_conn_id = oss_conn_id
self.oss_conn = self.get_connection(oss_conn_id)
self.region = self.get_default_region() if region is None else region
super().__init__(*args, **kwargs)

def get_conn(self) -> "Connection":
def get_conn(self) -> Connection:
"""Returns connection for the hook."""
return self.oss_conn

Expand All @@ -118,7 +120,7 @@ def parse_oss_url(ossurl: str) -> tuple:

@provide_bucket_name
@unify_bucket_name_and_key
def object_exists(self, key: str, bucket_name: Optional[str] = None) -> bool:
def object_exists(self, key: str, bucket_name: str | None = None) -> bool:
"""
Check if object exists.
Expand All @@ -134,7 +136,7 @@ def object_exists(self, key: str, bucket_name: Optional[str] = None) -> bool:
return False

@provide_bucket_name
def get_bucket(self, bucket_name: Optional[str] = None) -> oss2.api.Bucket:
def get_bucket(self, bucket_name: str | None = None) -> oss2.api.Bucket:
"""
Returns a oss2.Bucket object
Expand All @@ -148,7 +150,7 @@ def get_bucket(self, bucket_name: Optional[str] = None) -> oss2.api.Bucket:

@provide_bucket_name
@unify_bucket_name_and_key
def load_string(self, key: str, content: str, bucket_name: Optional[str] = None) -> None:
def load_string(self, key: str, content: str, bucket_name: str | None = None) -> None:
"""
Loads a string to OSS
Expand All @@ -167,7 +169,7 @@ def upload_local_file(
self,
key: str,
file: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
) -> None:
"""
Upload a local file to OSS
Expand All @@ -187,8 +189,8 @@ def download_file(
self,
key: str,
local_file: str,
bucket_name: Optional[str] = None,
) -> Optional[str]:
bucket_name: str | None = None,
) -> str | None:
"""
Download file from OSS
Expand All @@ -210,7 +212,7 @@ def download_file(
def delete_object(
self,
key: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
) -> None:
"""
Delete object from OSS
Expand All @@ -229,7 +231,7 @@ def delete_object(
def delete_objects(
self,
key: list,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
) -> None:
"""
Delete objects from OSS
Expand All @@ -246,7 +248,7 @@ def delete_objects(
@provide_bucket_name
def delete_bucket(
self,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
) -> None:
"""
Delete bucket from OSS
Expand All @@ -262,7 +264,7 @@ def delete_bucket(
@provide_bucket_name
def create_bucket(
self,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
) -> None:
"""
Create bucket
Expand All @@ -277,7 +279,7 @@ def create_bucket(

@provide_bucket_name
@unify_bucket_name_and_key
def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int) -> None:
def append_string(self, bucket_name: str | None, content: str, key: str, pos: int) -> None:
"""
Append string to a remote existing file
Expand All @@ -295,7 +297,7 @@ def append_string(self, bucket_name: Optional[str], content: str, key: str, pos:

@provide_bucket_name
@unify_bucket_name_and_key
def read_key(self, bucket_name: Optional[str], key: str) -> str:
def read_key(self, bucket_name: str | None, key: str) -> str:
"""
Read oss remote object content with the specified key
Expand All @@ -311,7 +313,7 @@ def read_key(self, bucket_name: Optional[str], key: str) -> str:

@provide_bucket_name
@unify_bucket_name_and_key
def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObjectResult:
def head_key(self, bucket_name: str | None, key: str) -> oss2.models.HeadObjectResult:
"""
Get meta info of the specified remote object
Expand All @@ -327,7 +329,7 @@ def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObje

@provide_bucket_name
@unify_bucket_name_and_key
def key_exist(self, bucket_name: Optional[str], key: str) -> bool:
def key_exist(self, bucket_name: str | None, key: str) -> bool:
"""
Find out whether the specified key exists in the oss remote storage
Expand Down Expand Up @@ -360,7 +362,7 @@ def get_credential(self) -> oss2.auth.Auth:

return oss2.Auth(oss_access_key_id, oss_access_key_secret)

def get_default_region(self) -> Optional[str]:
def get_default_region(self) -> str | None:
extra_config = self.oss_conn.extra_dejson
auth_type = extra_config.get('auth_type', None)
if not auth_type:
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import os
import pathlib
Expand Down
29 changes: 15 additions & 14 deletions airflow/providers/alibaba/cloud/operators/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This module contains Alibaba Cloud OSS operators."""
from typing import TYPE_CHECKING, Optional
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
Expand All @@ -38,7 +39,7 @@ class OSSCreateBucketOperator(BaseOperator):
def __init__(
self,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -47,7 +48,7 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.create_bucket(bucket_name=self.bucket_name)

Expand All @@ -64,7 +65,7 @@ class OSSDeleteBucketOperator(BaseOperator):
def __init__(
self,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -73,7 +74,7 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.delete_bucket(bucket_name=self.bucket_name)

Expand All @@ -94,7 +95,7 @@ def __init__(
key: str,
file: str,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -105,7 +106,7 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.upload_local_file(bucket_name=self.bucket_name, key=self.key, file=self.file)

Expand All @@ -126,7 +127,7 @@ def __init__(
key: str,
file: str,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -137,7 +138,7 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.download_file(bucket_name=self.bucket_name, key=self.key, local_file=self.file)

Expand All @@ -156,7 +157,7 @@ def __init__(
self,
keys: list,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -166,7 +167,7 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.delete_objects(bucket_name=self.bucket_name, key=self.keys)

Expand All @@ -185,7 +186,7 @@ def __init__(
self,
key: str,
region: str,
bucket_name: Optional[str] = None,
bucket_name: str | None = None,
oss_conn_id: str = 'oss_default',
**kwargs,
) -> None:
Expand All @@ -195,6 +196,6 @@ def __init__(
self.region = region
self.bucket_name = bucket_name

def execute(self, context: 'Context'):
def execute(self, context: Context):
oss_hook = OSSHook(oss_conn_id=self.oss_conn_id, region=self.region)
oss_hook.delete_object(bucket_name=self.bucket_name, key=self.key)
Loading

0 comments on commit 06acf40

Please sign in to comment.