Skip to content

Commit

Permalink
Add CloudRunServiceHook and CloudRunCreateServiceOperator (apache…
Browse files Browse the repository at this point in the history
…#40008)

* init cloud_run service hook & operator

* init docs

* added example_cloud_run_service

* remove parent init

Co-authored-by: Shahar Epstein <[email protected]>

* apply reveiw - more clearer

* typo

* add ENV_ID in system tests

* validate inputs

* add deleteOperator & test

* typo

* modified second review

* 404/409 case in operator

* pass static checks

* typo

* modified service creation in hook & operator

* fixed by reviews in tests

* modified docs

* fixed test code to success

* modified hook

* add exceptions

* modified test codes

* static checks

---------

Co-authored-by: Shahar Epstein <[email protected]>
  • Loading branch information
jx2lee and shahar1 authored Aug 10, 2024
1 parent 6b810b8 commit 0aad0c8
Show file tree
Hide file tree
Showing 6 changed files with 676 additions and 4 deletions.
124 changes: 124 additions & 0 deletions airflow/providers/google/cloud/hooks/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@

from google.cloud.run_v2 import (
CreateJobRequest,
CreateServiceRequest,
DeleteJobRequest,
DeleteServiceRequest,
GetJobRequest,
GetServiceRequest,
Job,
JobsAsyncClient,
JobsClient,
ListJobsRequest,
RunJobRequest,
Service,
ServicesAsyncClient,
ServicesClient,
UpdateJobRequest,
)
from google.longrunning import operations_pb2 # type: ignore[attr-defined]
Expand All @@ -39,6 +45,7 @@

if TYPE_CHECKING:
from google.api_core import operation
from google.api_core.operation_async import AsyncOperation
from google.cloud.run_v2.services.jobs import pagers


Expand Down Expand Up @@ -183,3 +190,120 @@ async def get_operation(self, operation_name: str) -> operations_pb2.Operation:
return await self.get_conn().get_operation(
operations_pb2.GetOperationRequest(name=operation_name), timeout=120
)


class CloudRunServiceHook(GoogleBaseHook):
    """
    Synchronous hook for managing Google Cloud Run services.

    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
    ):
        # The client is created on first use by get_conn().
        self._client: ServicesClient | None = None
        super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)

    def get_conn(self):
        """Return the cached ``ServicesClient``, building it on the first call."""
        if self._client is not None:
            return self._client

        self._client = ServicesClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
        return self._client

    @GoogleBaseHook.fallback_to_default_project_id
    def get_service(self, service_name: str, region: str, project_id: str = PROVIDE_PROJECT_ID):
        """
        Fetch a Cloud Run service by name.

        :param service_name: The name of the service to look up.
        :param region: The region the service belongs to.
        :param project_id: The project the service belongs to.
        :return: Whatever the client's ``get_service`` call returns for the request.
        """
        resource_name = f"projects/{project_id}/locations/{region}/services/{service_name}"
        request = GetServiceRequest(name=resource_name)
        return self.get_conn().get_service(request)

    @GoogleBaseHook.fallback_to_default_project_id
    def create_service(
        self, service_name: str, service: Service | dict, region: str, project_id: str = PROVIDE_PROJECT_ID
    ) -> Service:
        """
        Create a Cloud Run service and return the result of the create operation.

        :param service_name: The ID to give the new service.
        :param service: The service definition, either a ``Service`` or a dict
            that is converted to one.
        :param region: The region to create the service in.
        :param project_id: The project to create the service in.
        :return: The value of ``result()`` on the operation returned by the client.
        """
        # Dict definitions are normalised into a Service message before being sent.
        service_message = Service(service) if isinstance(service, dict) else service

        request = CreateServiceRequest(
            parent=f"projects/{project_id}/locations/{region}",
            service=service_message,
            service_id=service_name,
        )

        return self.get_conn().create_service(request).result()

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_service(self, service_name: str, region: str, project_id: str = PROVIDE_PROJECT_ID) -> Service:
        """
        Delete a Cloud Run service and return the result of the delete operation.

        :param service_name: The name of the service to delete.
        :param region: The region the service belongs to.
        :param project_id: The project the service belongs to.
        :return: The value of ``result()`` on the operation returned by the client.
        """
        resource_name = f"projects/{project_id}/locations/{region}/services/{service_name}"
        request = DeleteServiceRequest(name=resource_name)

        return self.get_conn().delete_service(request).result()


class CloudRunServiceAsyncHook(GoogleBaseHook):
    """
    Async hook for the Google Cloud Run services.

    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
    ):
        # get_conn() stores a ServicesAsyncClient here, so annotate it as such
        # (the original annotation named the synchronous ServicesClient).
        self._client: ServicesAsyncClient | None = None
        super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)

    def get_conn(self) -> ServicesAsyncClient:
        """Return the cached ``ServicesAsyncClient``, building it on the first call."""
        if self._client is None:
            self._client = ServicesAsyncClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)

        return self._client

    @GoogleBaseHook.fallback_to_default_project_id
    async def create_service(
        self, service_name: str, service: Service | dict, region: str, project_id: str = PROVIDE_PROJECT_ID
    ) -> AsyncOperation:
        """
        Submit a request to create a Cloud Run service.

        The operation's result is not awaited here; the operation object
        returned by the client is handed back to the caller.

        :param service_name: The ID to give the new service.
        :param service: The service definition, either a ``Service`` or a dict
            that is converted to one.
        :param region: The region to create the service in.
        :param project_id: The project to create the service in.
        :return: The operation returned by the async client's ``create_service``.
        """
        if isinstance(service, dict):
            service = Service(service)

        create_request = CreateServiceRequest(
            parent=f"projects/{project_id}/locations/{region}",
            service=service,
            service_id=service_name,
        )

        return await self.get_conn().create_service(create_request)

    @GoogleBaseHook.fallback_to_default_project_id
    async def delete_service(
        self, service_name: str, region: str, project_id: str = PROVIDE_PROJECT_ID
    ) -> AsyncOperation:
        """
        Submit a request to delete a Cloud Run service.

        The operation's result is not awaited here; the operation object
        returned by the client is handed back to the caller.

        :param service_name: The name of the service to delete.
        :param region: The region the service belongs to.
        :param project_id: The project the service belongs to.
        :return: The operation returned by the async client's ``delete_service``.
        """
        delete_request = DeleteServiceRequest(
            name=f"projects/{project_id}/locations/{region}/services/{service_name}"
        )

        return await self.get_conn().delete_service(delete_request)
145 changes: 143 additions & 2 deletions airflow/providers/google/cloud/operators/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

from typing import TYPE_CHECKING, Any, Sequence

from google.cloud.run_v2 import Job
import google.cloud.exceptions
from google.api_core.exceptions import AlreadyExists
from google.cloud.run_v2 import Job, Service

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_run import CloudRunHook
from airflow.providers.google.cloud.hooks.cloud_run import CloudRunHook, CloudRunServiceHook
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.cloud_run import CloudRunJobFinishedTrigger, RunJobStatus

Expand Down Expand Up @@ -353,3 +355,142 @@ def _wait_for_operation(self, operation: operation.Operation):
except Exception:
error = operation.exception(timeout=self.timeout_seconds)
raise AirflowException(error)


class CloudRunCreateServiceOperator(GoogleCloudBaseOperator):
    """
    Creates a Cloud Run Service. Pushes the service's dictionary representation to xcom.

    If a service with the same name already exists, the existing service is
    fetched and returned instead of failing.

    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
    :param region: Required. The ID of the Google Cloud region that the service belongs to.
    :param service_name: Required. The name of the service to create.
    :param service: The service descriptor containing the configuration of the service to submit.
    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("project_id", "region", "gcp_conn_id", "impersonation_chain", "service_name")

    def __init__(
        self,
        project_id: str,
        region: str,
        service_name: str,
        service: dict | Service,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.project_id = project_id
        self.region = region
        self.service = service
        self.service_name = service_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self._validate_inputs()

    def _validate_inputs(self):
        """
        Ensure ``project_id``, ``region`` and ``service_name`` are all set.

        :raises AirflowException: If any of the three attributes is falsy; the
            message lists the missing ones.
        """
        missing_fields = [k for k in ["project_id", "region", "service_name"] if not getattr(self, k)]
        # A non-empty list is exactly the "any required field is falsy" condition.
        if missing_fields:
            raise AirflowException(
                f"Required parameters are missing: {missing_fields}. These parameters be passed either as "
                "keyword parameter or as extra field in Airflow connection definition. Both are not set!"
            )

    def execute(self, context: Context):
        """
        Create the service, or fetch it when it already exists.

        :param context: The Airflow task context (unused here).
        :return: The dictionary representation of the created or existing service.
        :raises google.cloud.exceptions.GoogleCloudError: Re-raised after logging
            when creation fails for a reason other than the service already existing.
        """
        hook: CloudRunServiceHook = CloudRunServiceHook(
            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
        )

        try:
            service = hook.create_service(
                service=self.service,
                service_name=self.service_name,
                region=self.region,
                project_id=self.project_id,
            )
        except AlreadyExists:
            self.log.info(
                "Already existed Cloud run service, service_name=%s, region=%s",
                self.service_name,
                self.region,
            )
            # Fall through to the shared return so this path also yields a dict
            # rather than a raw Service message.
            service = hook.get_service(
                service_name=self.service_name, region=self.region, project_id=self.project_id
            )
        except google.cloud.exceptions.GoogleCloudError:
            self.log.error("An error occurred. Exiting.")
            raise

        return Service.to_dict(service)


class CloudRunDeleteServiceOperator(GoogleCloudBaseOperator):
    """
    Deletes a Cloud Run Service. Pushes the deleted service's dictionary representation to xcom.

    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
    :param region: Required. The ID of the Google Cloud region that the service belongs to.
    :param service_name: Required. The name of the service to delete.
    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("project_id", "region", "gcp_conn_id", "impersonation_chain", "service_name")

    def __init__(
        self,
        project_id: str,
        region: str,
        service_name: str,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.project_id = project_id
        self.region = region
        self.service_name = service_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self._validate_inputs()

    def _validate_inputs(self):
        """
        Ensure ``project_id``, ``region`` and ``service_name`` are all set.

        :raises AirflowException: If any of the three attributes is falsy; the
            message lists the missing ones.
        """
        missing_fields = [k for k in ["project_id", "region", "service_name"] if not getattr(self, k)]
        # A non-empty list is exactly the "any required field is falsy" condition.
        if missing_fields:
            raise AirflowException(
                f"Required parameters are missing: {missing_fields}. These parameters be passed either as "
                "keyword parameter or as extra field in Airflow connection definition. Both are not set!"
            )

    def execute(self, context: Context):
        """
        Delete the service and return its dictionary representation.

        :param context: The Airflow task context (unused here).
        :return: The dictionary representation of the deleted service.
        :raises google.cloud.exceptions.NotFound: Re-raised after logging when
            the delete call reports that the service does not exist.
        """
        hook: CloudRunServiceHook = CloudRunServiceHook(
            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
        )

        try:
            service = hook.delete_service(
                service_name=self.service_name,
                region=self.region,
                project_id=self.project_id,
            )
        except google.cloud.exceptions.NotFound:
            self.log.error("An error occurred. Not Found.")
            raise

        return Service.to_dict(service)
42 changes: 42 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,48 @@ You can create a Cloud Run Job with any of these configurations :

Note that this operator only creates the job without executing it. The Job's dictionary representation is pushed to XCom.

Create a service
---------------------

Before you create a service in Cloud Run, you need to define it.
For more information about the Service object fields, visit `Google Cloud Run Service description <https://cloud.google.com/run/docs/reference/rpc/google.cloud.run.v2#google.cloud.run.v2.Service>`__.

A simple service configuration can look as follows:

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run_service.py
:language: python
:dedent: 0
:start-after: [START howto_operator_cloud_run_service_creation]
:end-before: [END howto_operator_cloud_run_service_creation]


With this configuration we can create the service:
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunCreateServiceOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run_service.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloud_run_create_service]
:end-before: [END howto_operator_cloud_run_create_service]


Note that this operator only creates the service without executing it. The Service's dictionary representation is pushed to XCom.

Delete a service
---------------------

With this configuration we can delete the service:
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunDeleteServiceOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run_service.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloud_run_delete_service]
:end-before: [END howto_operator_cloud_run_delete_service]


Note that this operator waits for the service to be deleted, and the deleted Service's dictionary representation is pushed to XCom.

Execute a job
---------------------

Expand Down
Loading

0 comments on commit 0aad0c8

Please sign in to comment.