-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathget-broadcast-stats
executable file
·96 lines (77 loc) · 2.98 KB
/
get-broadcast-stats
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
from __future__ import print_function
import json
from requests_oauthlib import OAuth2Session
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
# Load the saved OAuth2 client credentials and token from credentials.json
# in the current working directory.
with open('credentials.json', 'rt') as creds_file:
    credentials = json.load(creds_file)

# Endpoint the session calls to refresh the access token.
TOKEN_URL = "https://auth.aweber.com/oauth2/token"

client_id = credentials['client_id']
client_secret = credentials['client_secret']
token = credentials['token']

# Extra keyword arguments handed to the session for automatic token refresh.
extra = dict(client_id=client_id, client_secret=client_secret, token=token)
def token_updater(token):
    """Save a refreshed token back to credentials.json.

    Rewrites the whole file with the module-level client id and secret plus
    the new ``token``, then prints a notice that the file was updated.
    """
    refreshed = {
        'client_id': client_id,
        'client_secret': client_secret,
        'token': token
    }
    with open('credentials.json', 'wt') as creds_file:
        json.dump(refreshed, creds_file)
    notice = ('Token was refreshed.\n'
              'Updated credentials.json with your new credentials')
    print(notice)
# OAuth2 session used for every API request; it is configured to refresh the
# token against TOKEN_URL and to hand the new token to token_updater.
session = OAuth2Session(
    client_id=client_id,
    token=token,
    auto_refresh_url=TOKEN_URL,
    auto_refresh_kwargs=extra,
    token_updater=token_updater,
)
def get_collection(url):
    """Return all entries of a paginated collection starting at ``url``.

    Requests each page through the module-level ``session``, raises on an
    HTTP error status, and keeps following ``next_collection_link`` until a
    page has no such link.
    """
    entries = []
    page_url = url
    while page_url:
        reply = session.get(page_url)
        reply.raise_for_status()
        page = reply.json()
        entries.extend(page.pop('entries'))
        # a missing or empty next link means this was the last page
        page_url = page.get('next_collection_link') or None
    return entries
def _first_or_exit(entries, description):
    """Return the first item of ``entries`` or exit with a readable message.

    Indexing an empty collection would otherwise abort the script with a bare
    IndexError that does not say which lookup came back empty.
    """
    if not entries:
        raise SystemExit('No {} found; nothing to report.'.format(description))
    return entries[0]


# get all the accounts entries
account_url = 'https://api.aweber.com/1.0/accounts'
accounts = get_collection(account_url)
account = _first_or_exit(accounts, 'accounts')

# get all the list entries for the first account
lists_url = account['lists_collection_link']
lists = get_collection(lists_url)
first_list = _first_or_exit(lists, 'lists')

# get a sent broadcast
params = urlencode({'ws.op': 'find', 'campaign_type': 'b'})
broadcasts_url = '{}?{}'.format(first_list['campaigns_collection_link'], params)
sent_broadcasts = get_collection(broadcasts_url)

# get the first broadcast to retrieve stats
broadcast = _first_or_exit(sent_broadcasts, 'sent broadcasts')
print('Broadcast id: {} Subject: {} Sent At: {}\n'.format(
    broadcast['id'], broadcast['subject'], broadcast['sent_at']))

stats = get_collection(broadcast['stats_collection_link'])

# look at every stat for the first broadcast
for stat in stats:
    # stats entries can be in many formats
    # daily stats are shown over 14 days
    # hourly stats are shown over 24 hours
    # some stats show the top 10 (e.g. webhits_by_link)
    # some stats show total counts (e.g. total_clicks)
    value = stat['value']
    # NOTE(review): this truthiness check also reports a numeric 0 as "N/A";
    # confirm that is intended for total-count stats.
    if not value:
        print('\n{}: N/A'.format(stat['description']))
    elif isinstance(value, list):
        print('\n{}:'.format(stat['description']))
        for item in value:
            print('')  # add an extra newline between each item
            for k, v in item.items():
                print('  {}: {}'.format(k, v))
    else:
        print('\n{}: {}'.format(stat['description'], value))