# kaggle_gcp.py
# Forked from Kaggle/docker-python.
import os
from google.auth import credentials
from google.auth.exceptions import RefreshError
from google.cloud import bigquery
from google.cloud.exceptions import Forbidden
from google.cloud.bigquery._http import Connection
from kaggle_secrets import UserSecretsClient
def get_integrations():
    """Build a KernelIntegrations from the KAGGLE_KERNEL_INTEGRATIONS env var.

    The variable is read as a colon-separated list of integration names and
    each name is lower-cased before being registered. When the variable is
    not set, an empty KernelIntegrations is returned.
    """
    result = KernelIntegrations()
    raw_value = os.getenv("KAGGLE_KERNEL_INTEGRATIONS")
    if raw_value is not None:
        for name in raw_value.split(':'):
            result.add_integration(name.lower())
    return result
class KernelIntegrations():
    """Registry of integration names that have been added to it."""

    def __init__(self):
        # Maps integration name -> True for every registered integration.
        self.integrations = dict()

    def add_integration(self, integration_name):
        """Register `integration_name`; registering it again is a no-op."""
        self.integrations.update({integration_name: True})

    def has_bigquery(self):
        """Return True when the 'bigquery' integration has been registered."""
        return 'bigquery' in self.integrations
class KaggleKernelCredentials(credentials.Credentials):
    """Credentials that authenticate with the Kernel's connected OAuth account.

    Example usage:
        client = bigquery.Client(project='ANOTHER_PROJECT',
                                 credentials=KaggleKernelCredentials())
    """

    def refresh(self, request):
        """Fetch a fresh BigQuery access token through UserSecretsClient.

        On success, `self.token` and `self.expiry` are replaced with the pair
        returned by `get_bigquery_access_token()`. `request` is not used.

        Raises:
            RefreshError: if obtaining the token fails for any reason; the
                underlying exception is chained as the cause.
        """
        try:
            secrets_client = UserSecretsClient()
            self.token, self.expiry = secrets_client.get_bigquery_access_token()
        except Exception as err:
            bigquery_enabled = get_integrations().has_bigquery()
            if not bigquery_enabled:
                # Print a setup hint only when the BigQuery integration is
                # not listed for this kernel.
                print('Please ensure you have selected a BigQuery account in the Kernels Settings sidebar.')
            raise RefreshError('Unable to refresh access token.') from err
class _DataProxyConnection(Connection):
    """Custom Connection class used to proxy the BigQuery client to Kaggle's data proxy."""

    # Read once, when the class body is executed (i.e. at import time).
    API_BASE_URL = os.getenv("KAGGLE_DATA_PROXY_URL")

    def __init__(self, client):
        """Create a connection for `client` that sends the data proxy token header.

        The token is read from the KAGGLE_DATA_PROXY_TOKEN environment variable.
        """
        super().__init__(client)
        # Work on a per-instance copy of the headers before adding the token.
        # NOTE(review): `_EXTRA_HEADERS` appears to be a class-level dict
        # inherited from the parent Connection; mutating it in place would
        # attach the proxy token to every other Connection as well.
        self._EXTRA_HEADERS = dict(self._EXTRA_HEADERS)
        self._EXTRA_HEADERS["X-KAGGLE-PROXY-DATA"] = os.getenv(
            "KAGGLE_DATA_PROXY_TOKEN")

    def api_request(self, *args, **kwargs):
        """Wrap Connection.api_request in order to handle errors gracefully.

        All arguments are forwarded unchanged to the parent implementation.

        Returns:
            Whatever the parent `api_request` returns.

        Raises:
            Forbidden: re-raised unchanged after printing a hint about
                selecting a BigQuery account.
        """
        try:
            # The result must be returned: callers consume the API response.
            return super().api_request(*args, **kwargs)
        except Forbidden:
            print("Permission denied using Kaggle's public BigQuery integration. "
                  "Did you mean to select a BigQuery account in the Kernels Settings sidebar?")
            raise
class PublicBigqueryClient(bigquery.client.Client):
    """BigQuery client that routes requests through Kaggle's Data Proxy to provide free access to Public Datasets.

    Example usage:
        from kaggle import PublicBigqueryClient
        client = PublicBigqueryClient()
    """

    def __init__(self, *args, **kwargs):
        """Create a client bound to the data proxy project with anonymous credentials.

        `project` (from the KAGGLE_DATA_PROXY_PROJECT environment variable)
        and `credentials` are always supplied here; any extra positional and
        keyword arguments are forwarded to the parent constructor.
        """
        proxy_credentials = credentials.AnonymousCredentials()
        # Replace `refresh` on this credentials instance with a no-op.
        proxy_credentials.refresh = lambda *_: None
        super().__init__(
            *args,
            project=os.getenv("KAGGLE_DATA_PROXY_PROJECT"),
            credentials=proxy_credentials,
            **kwargs
        )
        # TODO: Remove this once https://github.com/googleapis/google-cloud-python/issues/7122 is implemented.
        self._connection = _DataProxyConnection(self)