-
Notifications
You must be signed in to change notification settings - Fork 1
/
auth.py
145 lines (115 loc) · 4.74 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os.path
import config
import json
import util
from flask import Flask, redirect, request, abort
import requests
import logging
import time
class HomeAuthStore:
def __init__(self, dir, filename):
self._dir = dir
self._filename = os.path.join(dir, filename)
@property
def _full_path(self):
return os.path.join(self._filename)
def load_auth(self):
if not os.path.exists(self._full_path):
return None
with open(self._full_path, 'r') as infile:
return json.load(infile)
def save_auth(self, auth):
if not os.path.exists(self._dir):
os.makedirs(self._dir)
with open(self._full_path, 'w') as outfile:
json.dump(auth, outfile)
class Authorizer:
AUTH_URL_TEMPLATE = 'https://www.strava.com/oauth/authorize?client_id={}'\
'&response_type=code&redirect_uri=http://localhost:{}/token'\
'&scope=activity:read_all,activity:write,profile:read_all'
TOKEN_URL = 'https://www.strava.com/oauth/token'
KEYS = ('expires_at', 'refresh_token', 'access_token')
def __init__(self, port, store=HomeAuthStore(
config.get_strava_cli_dir(), "auth.json")):
self._port = port
self._store = store
def index(self):
return redirect(Authorizer.AUTH_URL_TEMPLATE.format(
self._client_id, self._port), 302)
def get_token(self):
if 'error' in request.args:
return abort(401)
if 'code' in request.args:
refresher = TokenRefresher(self._client_id, self._client_secret)
auth = refresher.refresh(code=request.args['code'])
if auth:
self._store.save_auth(auth)
return "Token successfully stored"
return "An error occurred retrieving access token; "\
"please check your client id and client secret."
abort(404)
def authorize(self, client_id, client_secret):
self._client_id = client_id
self._client_secret = client_secret
app = Flask(__name__)
app.add_url_rule('/', 'index', self.index)
app.add_url_rule('/token', 'get_token', self.get_token)
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
print("Navigate to http://localhost:{} using your web browser and "
"perform the authorization flow.\n"
"Press CTRL-C to abort".format(self._port))
app.run(port=self._port)
class TokenRefresher:
TOKEN_URL = 'https://www.strava.com/oauth/token'
REFRESH_TOKEN = 'refresh_token'
ACCESS_TOKEN = 'access_token'
EXPIRES_AT = 'expires_at'
CLIENT_ID = 'client_id'
CLIENT_SECRET = 'client_secret'
GRANT_TYPE = 'grant_type'
CODE = 'code'
AUTHORIZATION_CODE = 'authorization_code'
KEYS = (EXPIRES_AT, REFRESH_TOKEN, ACCESS_TOKEN)
def __init__(self, client_id, client_secret):
self._client_id = client_id
self._client_secret = client_secret
def refresh(self, code=None, refresh_token=None):
if code and refresh_token:
raise ValueError("Choose one between code and refresh_token")
data = {}
data[TokenRefresher.CLIENT_ID] = self._client_id
data[TokenRefresher.CLIENT_SECRET] = self._client_secret
data[TokenRefresher.GRANT_TYPE] = TokenRefresher.AUTHORIZATION_CODE if code else TokenRefresher.REFRESH_TOKEN
if refresh_token:
data[TokenRefresher.REFRESH_TOKEN] = refresh_token
if code:
data[TokenRefresher.CODE] = code
response = requests.post(TokenRefresher.TOKEN_URL, data=data)
d = response.json()
if TokenRefresher.ACCESS_TOKEN in d:
auth = {k: d[k] for k in TokenRefresher.KEYS}
auth[TokenRefresher.CLIENT_ID] = self._client_id
auth[TokenRefresher.CLIENT_SECRET] = self._client_secret
return auth
return None
class AccessTokenProvider:
def __init__(self, store):
self._store = store
def get_access_token(self):
auth = self._store.load_auth()
if time.time() < auth[TokenRefresher.EXPIRES_AT]:
return auth[TokenRefresher.ACCESS_TOKEN]
refresher = TokenRefresher(auth[TokenRefresher.CLIENT_ID], auth[TokenRefresher.CLIENT_SECRET])
auth = refresher.refresh(
refresh_token=auth[TokenRefresher.REFRESH_TOKEN])
if not auth:
return None
self._store.save_auth(auth)
return auth[TokenRefresher.ACCESS_TOKEN]
def auth_store():
return HomeAuthStore(config.get_strava_cli_dir(), "auth.json")
def authorizer(port):
return Authorizer(port, auth_store())
def access_token_provider():
return AccessTokenProvider(auth_store())