forked from fabric8-analytics/fabric8-analytics-scaler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: sqs_status.py
executable file
·84 lines (63 loc) · 2.5 KB
/
sqs_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
import argparse
import os
import sys
import math
import logging
import boto3
# Connection settings for SQS: credentials come from the environment, the region is fixed.
conn_args = dict(
    aws_access_key_id=os.environ.get('AWS_SQS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SQS_SECRET_ACCESS_KEY'),
    region_name='us-east-1',
)
# SQS resource handle shared by the functions below; created when the module is loaded.
sqs = boto3.resource('sqs', **conn_args)

# Values used when MAX_REPLICAS / DEFAULT_REPLICAS / SCALE_COEF are not set in the environment.
default_max_replicas = 6
default_min_replicas = 1
default_scale_coef = 100

# Emit warnings and above; per-queue failures are reported through this logger.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('sqs_status')


def get_number_of_messages(queues_str):
    """Return approximate number of messages in given queue(s).

    Every queue name is looked up as ``<prefix>_<queue>``, where the prefix is the value
    of the ``DEPLOYMENT_PREFIX`` environment variable. A queue that cannot be checked is
    logged as a warning and skipped, so it adds nothing to the total.

    :param queues_str: a comma-separated list of queues to check, without deployment prefix;
                       whitespace around names and empty entries are ignored
    :return: approximate number of messages in given queues (0 if no queue could be checked)
    """
    # The prefix is identical for all queues, so read it once instead of once per queue.
    # NOTE: if DEPLOYMENT_PREFIX is unset, the formatted name starts with the literal "None_".
    deployment_prefix = os.environ.get('DEPLOYMENT_PREFIX')

    # Skip empty entries (empty input, trailing or doubled commas) rather than asking SQS
    # for a queue called "<prefix>_", which could only fail and produce a noisy warning.
    stripped_names = (name.strip() for name in (queues_str or '').split(','))
    queue_names = [name for name in stripped_names if name]

    total_count = 0
    for queue_name in queue_names:
        full_queue_name = '{p}_{q}'.format(p=deployment_prefix, q=queue_name)
        try:
            queue = sqs.get_queue_by_name(QueueName=full_queue_name)
            total_count += int(queue.attributes.get('ApproximateNumberOfMessages') or 0)
        except Exception:
            # Deliberately best effort: one queue that cannot be checked must not stop
            # the remaining queues from being counted.
            logger.warning('Unable to check queue: %s', full_queue_name, exc_info=True)
    return total_count


def get_number_of_replicas(msg_count):
    """Return recommended number of replicas for given message count.

    One replica is recommended per ``SCALE_COEF`` messages, rounded up. The result is then
    limited from above by ``MAX_REPLICAS`` and from below by ``DEFAULT_REPLICAS``. All three
    settings are read from the environment; a variable that is unset or blank falls back to
    the corresponding module-level default.

    The examples below assume that none of the three variables is set.

    :param msg_count: number of messages in the given queue
    :return: recommended number of replicas
    :raises ValueError: if a setting is present but is not an integer
    :raises ZeroDivisionError: if ``SCALE_COEF`` is 0

    >>> get_number_of_replicas(1000)
    6
    >>> get_number_of_replicas(999)
    6
    >>> get_number_of_replicas(0)
    1
    >>> get_number_of_replicas(101)
    2
    """
    # A blank value (e.g. MAX_REPLICAS='') is treated like an unset variable instead of
    # failing in int('').
    raw_max = os.environ.get('MAX_REPLICAS', '').strip()
    raw_min = os.environ.get('DEFAULT_REPLICAS', '').strip()
    raw_coef = os.environ.get('SCALE_COEF', '').strip()

    max_replicas = int(raw_max) if raw_max else default_max_replicas
    min_replicas = int(raw_min) if raw_min else default_min_replicas
    scale_coef = int(raw_coef) if raw_coef else default_scale_coef

    # Ceiling division in integer arithmetic: going through float division loses precision
    # for very large counts and can round the ratio down to a whole number.
    wanted = -(-msg_count // scale_coef)

    # Cap at the maximum first, then enforce the minimum.
    return max(min_replicas, min(int(wanted), max_replicas))


if __name__ == '__main__':
    # Command-line entry point: prints "<message count> <recommended replicas>"
    # for the queue(s) given with -q/--queue.
    cli = argparse.ArgumentParser(
        description='Report recommended number of worker replicas for given queue.')
    cli.add_argument('-q', '--queue', help='queue to check')
    options = cli.parse_args()

    if options.queue:
        count = get_number_of_messages(options.queue)
        replicas = get_number_of_replicas(count)
        print(count, replicas)
    else:
        # No queue given: show the usage line and exit with a failure status.
        print(cli.format_usage())
        sys.exit(1)