Skip to content

Commit

Permalink
last date based on stats in the storage rather than last call
Browse files Browse the repository at this point in the history
  • Loading branch information
malleor committed Jan 31, 2017
1 parent 814b7d1 commit cd5db9a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
12 changes: 12 additions & 0 deletions elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,15 @@ def _unpack_response(self, r):
return None
else:
return r.json()

def field_stats(self, index, field):
    """Look up the stored statistics of a single field in an index.

    Sends a GET request to ``<base_url>/<index>/_field_stats?fields=<field>``
    and decodes the reply through ``_unpack_response``.

    Returns the entry for ``field`` found under ``indices -> _all -> fields``
    in the decoded reply, or ``None`` when the reply could not be unpacked
    or holds no entry for that field.
    """
    query = '_field_stats?fields=' + field
    endpoint = '/'.join((self.base_url, index, query))
    payload = self._unpack_response(rq.get(endpoint))
    if payload is None:
        return None

    # per-field entries are nested under indices/_all/fields in the reply
    per_field = payload['indices']['_all']['fields']
    return per_field.get(field)
2 changes: 1 addition & 1 deletion jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def get(self, api, params=None, payload=None):
try:
print ' problem: ', r.json()
except:
print ' problem: ', r.text
print ' problem: ', r.text.encode('utf-8')
return None
else:
return r.json()
Expand Down
68 changes: 37 additions & 31 deletions jira_feeds.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import os
import sys
from datetime import datetime
import time


PROJECT_NAME = os.environ['JIRA_PROJECT']


class CreatedIssuesFeed(object):
def __init__(self):
self.last_called = None
self.index = PROJECT_NAME.lower()
self.type = 'created'

Expand All @@ -27,26 +27,31 @@ def __init__(self):

class Issue(object):
def __init__(self, jira_issue):
self.created = jira_issue['fields']['created']
created_str = jira_issue['fields']['created'][:-5]
created_date = datetime.strptime(created_str, '%Y-%m-%dT%H:%M:%S.%f')
created_epoch = 1000*int(time.mktime(created_date.timetuple()))

self.created = created_epoch
self.issue_key = jira_issue['key']

def __call__(self, jira, storage):
# check the clock
now = datetime.now().strftime('%Y-%m-%d %H:%M')
if self.last_called is None:
print 'dry run at', now
sys.stdout.flush()
self.last_called = now
return

# assert that the storage is ready for receiving docs
if not storage.assert_mapping(self.index, self.mapping):
print 'mapping not set; dropping the feed'
sys.stdout.flush()
return

# check since when should the issues be fetched
stats = storage.field_stats(self.index, 'created')
if stats:
last_created = datetime.fromtimestamp(stats['max_value'] / 1000).strftime('"%Y-%m-%d %H:%M"')
print 'the freshest issue was created', last_created
else:
last_created = "startOfDay()"
print 'could not fetch stats from the storage; using', last_created

# fetch issues
issues = jira.get_issues('project=%s and created>="%s"' % (PROJECT_NAME, self.last_called),
issues = jira.get_issues('project=%s and created>=%s' % (PROJECT_NAME, last_created),
fields=('created',),
verbose=True)
print 'got', len(issues), 'issues'
Expand All @@ -55,27 +60,28 @@ def __call__(self, jira, storage):
# store the issues
erroneous_issues = []
n = len(issues) - 1
BAR_LENGTH = 30
for i, jira_issue in enumerate(issues):
# convert
issue = CreatedIssuesFeed.Issue(jira_issue)

# store
res = storage.put(self.index, self.type, issue.issue_key, issue)
if res is None:
erroneous_issues.append(issue.issue_key)
if n >= 0:
if n == 0:
n = 1
BAR_LENGTH = 30
for i, jira_issue in enumerate(issues):
# convert
issue = CreatedIssuesFeed.Issue(jira_issue)

# log progress
if (i * 100 / n) % 10 == 0:
u = i * BAR_LENGTH / n
v = BAR_LENGTH + 1 - u
print '|' + u * '=' + '>' + v * ' ' + '|'
sys.stdout.flush()
print 'stored', len(issues), 'issues'
if len(erroneous_issues) > 0:
print len(erroneous_issues), 'FAILED:', '\n '.join([''] + erroneous_issues)
sys.stdout.flush()
# store
res = storage.put(self.index, self.type, issue.issue_key, issue)
if res is None:
erroneous_issues.append(issue.issue_key)

self.last_called = now
# log progress
if (i * 100 / n) % 10 == 0:
u = i * BAR_LENGTH / n
v = BAR_LENGTH + 1 - u
print '|' + u * '=' + '>' + v * ' ' + '|'
sys.stdout.flush()
print 'stored', len(issues), 'issues'
if len(erroneous_issues) > 0:
print len(erroneous_issues), 'FAILED:', '\n '.join([''] + erroneous_issues)
sys.stdout.flush()

# def

0 comments on commit cd5db9a

Please sign in to comment.