Skip to content

Commit

Permalink
Update GcsIO initialization to support converting input parameters to PipelineOptions for authentication (apache#23766)
Browse files: browse the repository at this point in the history

fixes apache#23764
  • Loading branch information
lukecwik authored Oct 21, 2022
1 parent a138a4f commit 2e49c7e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/internal/gcp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import logging
import socket
import threading
from typing import Optional

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

# google.auth is only available when Beam is installed with the gcp extra.
try:
Expand Down Expand Up @@ -63,6 +65,8 @@ def set_running_in_gce(worker_executing_project):


def get_service_credentials(pipeline_options):
# type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]

"""For internal use only; no backwards-compatibility guarantees.
Get credentials to access Google services.
Expand Down Expand Up @@ -115,6 +119,7 @@ class _Credentials(object):

@classmethod
def get_service_credentials(cls, pipeline_options):
# type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
with cls._credentials_lock:
if cls._credentials_init:
return cls._credentials
Expand All @@ -134,6 +139,7 @@ def get_service_credentials(cls, pipeline_options):

@staticmethod
def _get_service_credentials(pipeline_options):
# type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
if not _GOOGLE_AUTH_AVAILABLE:
_LOGGER.warning(
'Unable to find default credentials because the google-auth library '
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import time
import traceback
from itertools import islice
from typing import Optional
from typing import Union

import apache_beam
from apache_beam.internal.http_client import get_new_http
Expand All @@ -49,6 +51,7 @@
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry

__all__ = ['GcsIO']
Expand Down Expand Up @@ -158,7 +161,12 @@ class GcsIOError(IOError, retry.PermanentException):
class GcsIO(object):
"""Google Cloud Storage I/O client."""
def __init__(self, storage_client=None, pipeline_options=None):
# type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
if storage_client is None:
if not pipeline_options:
pipeline_options = PipelineOptions()
elif isinstance(pipeline_options, dict):
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
storage_client = storage.StorageV1(
credentials=auth.get_service_credentials(pipeline_options),
get_credentials=False,
Expand Down

0 comments on commit 2e49c7e

Please sign in to comment.