Skip to content

Commit

Permalink
The resource example file example_upload.txt has been moved to `airflow-system-tests-resources/gcs` (apache#37883)
Browse files Browse the repository at this point in the history

Co-authored-by: tverdokhlib <[email protected]>
  • Loading branch information
moiseenkov and tverdokhlib authored Mar 5, 2024
1 parent 9d90166 commit 4df2d9c
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ There is a possibility to start S3ToGCSOperator asynchronously using deferrable
Transfer Service. By changing parameter ``poll_interval=10`` you can control frequency of polling a transfer
job status.

.. exampleinclude::/../tests/system/providers/google/cloud/gcs/example_s3_to_gcs_async.py
.. exampleinclude::/../tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
:language: python
:start-after: [START howto_transfer_s3togcs_operator_async]
:end-before: [END howto_transfer_s3togcs_operator_async]
Expand Down
20 changes: 11 additions & 9 deletions tests/system/providers/google/cloud/gcs/example_gcs_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import os
from datetime import datetime
from pathlib import Path

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
Expand All @@ -31,17 +30,18 @@
GCSDeleteBucketOperator,
GCSObjectCreateAclEntryOperator,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

DAG_ID = "gcs_acl"

RESOURCES_BUCKET_NAME = "airflow-system-tests-resources"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
FILE_NAME = "example_upload.txt"
UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
UPLOAD_FILE_PATH = f"gcs/{FILE_NAME}"

GCS_ACL_ENTITY = "allUsers"
GCS_ACL_BUCKET_ROLE = "OWNER"
Expand All @@ -62,11 +62,13 @@
resource={"predefined_acl": "public_read_write"},
)

upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
src=UPLOAD_FILE_PATH,
dst=FILE_NAME,
bucket=BUCKET_NAME,
copy_file = GCSToGCSOperator(
task_id="copy_example_gcs_file",
source_bucket=RESOURCES_BUCKET_NAME,
source_object=UPLOAD_FILE_PATH,
destination_bucket=BUCKET_NAME,
destination_object=FILE_NAME,
exact_match=True,
)

# [START howto_operator_gcs_bucket_create_acl_entry_task]
Expand Down Expand Up @@ -95,7 +97,7 @@
(
# TEST SETUP
create_bucket
>> upload_file
>> copy_file
# TEST BODY
>> gcs_bucket_create_acl_entry_task
>> gcs_object_create_acl_entry_task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import os
from datetime import datetime
from pathlib import Path

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
Expand All @@ -35,18 +34,18 @@
GCSListObjectsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

DAG_ID = "gcs_copy_delete"

RESOURCES_BUCKET_NAME = "airflow-system-tests-resources"
BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}"
BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}"
FILE_NAME = "example_upload.txt"
UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
UPLOAD_FILE_PATH = f"gcs/{FILE_NAME}"


with DAG(
Expand All @@ -68,11 +67,13 @@
project_id=PROJECT_ID,
)

upload_file = LocalFilesystemToGCSOperator(
upload_file = GCSToGCSOperator(
task_id="upload_file",
src=UPLOAD_FILE_PATH,
dst=FILE_NAME,
bucket=BUCKET_NAME_SRC,
source_bucket=RESOURCES_BUCKET_NAME,
source_object=UPLOAD_FILE_PATH,
destination_bucket=BUCKET_NAME_SRC,
destination_object=FILE_NAME,
exact_match=True,
)

# [START howto_operator_gcs_list_bucket]
Expand Down
47 changes: 26 additions & 21 deletions tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import os
from datetime import datetime
from pathlib import Path

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
Expand All @@ -34,17 +33,18 @@
GCSObjectUpdateSensor,
GCSUploadSessionCompleteSensor,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

DAG_ID = "gcs_sensor"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
RESOURCES_BUCKET_NAME = "airflow-system-tests-resources"
DESTINATION_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
FILE_NAME = "example_upload.txt"
UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
UPLOAD_FILE_PATH = f"gcs/{FILE_NAME}"


def workaround_in_debug_executor(cls):
Expand Down Expand Up @@ -75,14 +75,14 @@ def mode_setter(self, value):
tags=["gcs", "example"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
task_id="create_bucket", bucket_name=DESTINATION_BUCKET_NAME, project_id=PROJECT_ID
)

workaround_in_debug_executor(GCSUploadSessionCompleteSensor)

# [START howto_sensor_gcs_upload_session_complete_task]
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
Expand All @@ -94,7 +94,7 @@ def mode_setter(self, value):

# [START howto_sensor_gcs_upload_session_async_task]
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
Expand All @@ -107,72 +107,77 @@ def mode_setter(self, value):

# [START howto_sensor_object_update_exists_task]
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
# [END howto_sensor_object_update_exists_task]

# [START howto_sensor_object_update_exists_task_async]
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_update_sensor_task_async", deferrable=True
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
# [END howto_sensor_object_update_exists_task_async]

upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
src=UPLOAD_FILE_PATH,
dst=FILE_NAME,
bucket=BUCKET_NAME,
copy_file = GCSToGCSOperator(
task_id="copy_example_gcs_file",
source_bucket=RESOURCES_BUCKET_NAME,
source_object=UPLOAD_FILE_PATH,
destination_bucket=DESTINATION_BUCKET_NAME,
destination_object=FILE_NAME,
exact_match=True,
)

# [START howto_sensor_object_exists_task]
gcs_object_exists = GCSObjectExistenceSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
# [END howto_sensor_object_exists_task]

# [START howto_sensor_object_exists_task_async]
gcs_object_exists_async = GCSObjectExistenceAsyncSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task_async",
)
# [END howto_sensor_object_exists_task_async]

# [START howto_sensor_object_exists_task_defered]
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
# [END howto_sensor_object_exists_task_defered]

# [START howto_sensor_object_with_prefix_exists_task]
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
# [END howto_sensor_object_with_prefix_exists_task]

# [START howto_sensor_object_with_prefix_exists_task_async]
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=BUCKET_NAME,
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
# [END howto_sensor_object_with_prefix_exists_task_async]

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
task_id="delete_bucket", bucket_name=DESTINATION_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

chain(
# TEST SETUP
create_bucket,
upload_file,
copy_file,
# TEST BODY
[
gcs_object_exists,
Expand Down
25 changes: 15 additions & 10 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
from airflow.settings import Session
Expand All @@ -46,13 +46,14 @@

DAG_ID = "example_gcs_to_gdrive"

RESOURCES_BUCKET_NAME = "airflow-system-tests-resources"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"

TMP_PATH = "tmp"
WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
CURRENT_FOLDER = Path(__file__).parent
LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
LOCAL_PATH = str(Path("gcs"))
FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
FILE_NAME = "example_upload.txt"

Expand Down Expand Up @@ -94,18 +95,22 @@ def create_temp_gcp_connection():
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)

upload_file_1 = LocalFilesystemToGCSOperator(
upload_file_1 = GCSToGCSOperator(
task_id="upload_file_1",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/{FILE_NAME}",
bucket=BUCKET_NAME,
source_bucket=RESOURCES_BUCKET_NAME,
source_object=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
destination_bucket=BUCKET_NAME,
destination_object=f"{TMP_PATH}/{FILE_NAME}",
exact_match=True,
)

upload_file_2 = LocalFilesystemToGCSOperator(
upload_file_2 = GCSToGCSOperator(
task_id="upload_file_2",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/2_{FILE_NAME}",
bucket=BUCKET_NAME,
source_bucket=RESOURCES_BUCKET_NAME,
source_object=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
destination_bucket=BUCKET_NAME,
destination_object=f"{TMP_PATH}/2_{FILE_NAME}",
exact_match=True,
)
# [START howto_operator_gcs_to_gdrive_copy_single_file]
copy_single_file = GCSToGoogleDriveOperator(
Expand Down
15 changes: 9 additions & 6 deletions tests/system/providers/google/cloud/gcs/example_gcs_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@
GCSDeleteBucketOperator,
GCSFileTransformOperator,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

DAG_ID = "gcs_transform"

RESOURCES_BUCKET_NAME = "airflow-system-tests-resources"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"

FILE_NAME = "example_upload.txt"
UPLOAD_FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
UPLOAD_FILE_PATH = f"gcs/{FILE_NAME}"

TRANSFORM_SCRIPT_PATH = str(Path(__file__).parent / "resources" / "transform_script.py")

Expand All @@ -59,11 +60,13 @@
project_id=PROJECT_ID,
)

upload_file = LocalFilesystemToGCSOperator(
upload_file = GCSToGCSOperator(
task_id="upload_file",
src=UPLOAD_FILE_PATH,
dst=FILE_NAME,
bucket=BUCKET_NAME,
source_bucket=RESOURCES_BUCKET_NAME,
source_object=UPLOAD_FILE_PATH,
destination_bucket=BUCKET_NAME,
destination_object=FILE_NAME,
exact_match=True,
)

# [START howto_operator_gcs_transform]
Expand Down
Loading

0 comments on commit 4df2d9c

Please sign in to comment.