Skip to content

Commit

Permalink
Merge "Allow smaller segments in static large objects"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Jan 23, 2016
2 parents fcdb2fa + 7f636a5 commit 222649d
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 153 deletions.
9 changes: 6 additions & 3 deletions etc/proxy-server.conf-sample
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,17 @@ use = egg:swift#bulk
use = egg:swift#slo
# max_manifest_segments = 1000
# max_manifest_size = 2097152
# min_segment_size = 1048576
# Start rate-limiting SLO segment serving after the Nth segment of a
#
# Rate limiting applies only to segments smaller than this size (bytes).
# rate_limit_under_size = 1048576
#
# Start rate-limiting SLO segment serving after the Nth small segment of a
# segmented object.
# rate_limit_after_segment = 10
#
# Once segment rate-limiting kicks in for an object, limit segments served
# to N per second. 0 means no rate-limiting.
# rate_limit_segments_per_sec = 0
# rate_limit_segments_per_sec = 1
#
# Time limit on GET requests (seconds)
# max_get_time = 86400
Expand Down
59 changes: 32 additions & 27 deletions swift/common/middleware/slo.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@
"range": "1048576-2097151"}, ...]
The number of object segments is limited to a configurable amount, default
1000. Each segment, except for the final one, must be at least 1 megabyte
(configurable). On upload, the middleware will head every segment passed in to
verify:
1000. Each segment must be at least 1 byte. On upload, the middleware will
head every segment passed in to verify:
1. the segment exists (i.e. the HEAD was successful);
2. the segment meets minimum size requirements (if not the last segment);
2. the segment meets minimum size requirements;
3. if the user provided a non-null etag, the etag matches;
4. if the user provided a non-null size_bytes, the size_bytes matches; and
5. if the user provided a range, it is a singular, syntactically correct range
Expand Down Expand Up @@ -121,8 +120,9 @@
.. note::
The minimum sized range is min_segment_size, which by
default is 1048576 (1MB).
The minimum sized range is 1 byte. This is the same as the minimum
segment size.
-------------------------
Expand Down Expand Up @@ -221,7 +221,7 @@
ACCEPTABLE_FORMATS, Bulk


DEFAULT_MIN_SEGMENT_SIZE = 1024 * 1024 # 1 MiB
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB
DEFAULT_MAX_MANIFEST_SEGMENTS = 1000
DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB

Expand All @@ -231,7 +231,7 @@
ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS


def parse_and_validate_input(req_body, req_path, min_segment_size):
def parse_and_validate_input(req_body, req_path):
"""
Given a request body, parses it and returns a list of dictionaries.
Expand Down Expand Up @@ -269,7 +269,6 @@ def parse_and_validate_input(req_body, req_path, min_segment_size):
vrs, account, _junk = split_path(req_path, 3, 3, True)

errors = []
num_segs = len(parsed_data)
for seg_index, seg_dict in enumerate(parsed_data):
if not isinstance(seg_dict, dict):
errors.append("Index %d: not a JSON object" % seg_index)
Expand Down Expand Up @@ -315,10 +314,10 @@ def parse_and_validate_input(req_body, req_path, min_segment_size):
except (TypeError, ValueError):
errors.append("Index %d: invalid size_bytes" % seg_index)
continue
if (seg_size < min_segment_size and seg_index < num_segs - 1):
errors.append("Index %d: too small; each segment, except "
"the last, must be at least %d bytes."
% (seg_index, min_segment_size))
if seg_size < 1:
errors.append("Index %d: too small; each segment must be "
"at least 1 byte."
% (seg_index,))
continue

obj_path = '/'.join(['', vrs, account, seg_dict['path'].lstrip('/')])
Expand Down Expand Up @@ -662,10 +661,17 @@ def _manifest_get_response(self, req, content_length, response_headers,
plain_listing_iter = self._segment_listing_iterator(
req, ver, account, segments)

def is_small_segment((seg_dict, start_byte, end_byte)):
start = 0 if start_byte is None else start_byte
end = int(seg_dict['bytes']) - 1 if end_byte is None else end_byte
is_small = (end - start + 1) < self.slo.rate_limit_under_size
return is_small

ratelimited_listing_iter = RateLimitedIterator(
plain_listing_iter,
self.slo.rate_limit_segments_per_sec,
limit_after=self.slo.rate_limit_after_segment)
limit_after=self.slo.rate_limit_after_segment,
ratelimit_if=is_small_segment)

# self._segment_listing_iterator gives us 3-tuples of (segment dict,
# start byte, end byte), but SegmentedIterable wants (obj path, etag,
Expand Down Expand Up @@ -716,20 +722,21 @@ class StaticLargeObject(object):
:param conf: The configuration dict for the middleware.
"""

def __init__(self, app, conf, min_segment_size=DEFAULT_MIN_SEGMENT_SIZE,
def __init__(self, app, conf,
max_manifest_segments=DEFAULT_MAX_MANIFEST_SEGMENTS,
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE):
self.conf = conf
self.app = app
self.logger = get_logger(conf, log_route='slo')
self.max_manifest_segments = max_manifest_segments
self.max_manifest_size = max_manifest_size
self.min_segment_size = min_segment_size
self.max_get_time = int(self.conf.get('max_get_time', 86400))
self.rate_limit_under_size = int(self.conf.get(
'rate_limit_under_size', DEFAULT_RATE_LIMIT_UNDER_SIZE))
self.rate_limit_after_segment = int(self.conf.get(
'rate_limit_after_segment', '10'))
self.rate_limit_segments_per_sec = int(self.conf.get(
'rate_limit_segments_per_sec', '0'))
'rate_limit_segments_per_sec', '1'))
self.bulk_deleter = Bulk(app, {}, logger=self.logger)

def handle_multipart_get_or_head(self, req, start_response):
Expand Down Expand Up @@ -783,7 +790,7 @@ def handle_multipart_put(self, req, start_response):
raise HTTPLengthRequired(request=req)
parsed_data = parse_and_validate_input(
req.body_file.read(self.max_manifest_size),
req.path, self.min_segment_size)
req.path)
problem_segments = []

if len(parsed_data) > self.max_manifest_segments:
Expand Down Expand Up @@ -812,6 +819,7 @@ def handle_multipart_put(self, req, start_response):
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT')

if obj_path != last_obj_path:
last_obj_path = obj_path
head_seg_resp = \
Expand Down Expand Up @@ -840,12 +848,10 @@ def handle_multipart_put(self, req, start_response):
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]

if segment_length < self.min_segment_size and \
index < len(parsed_data) - 1:
if segment_length < 1:
problem_segments.append(
[quote(obj_name),
'Too small; each segment, except the last, must be '
'at least %d bytes.' % self.min_segment_size])
'Too small; each segment must be at least 1 byte.'])
total_size += segment_length
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
Expand Down Expand Up @@ -1045,18 +1051,17 @@ def filter_factory(global_conf, **local_conf):
DEFAULT_MAX_MANIFEST_SEGMENTS))
max_manifest_size = int(conf.get('max_manifest_size',
DEFAULT_MAX_MANIFEST_SIZE))
min_segment_size = int(conf.get('min_segment_size',
DEFAULT_MIN_SEGMENT_SIZE))

register_swift_info('slo',
max_manifest_segments=max_manifest_segments,
max_manifest_size=max_manifest_size,
min_segment_size=min_segment_size)
# this used to be configurable; report it as 1 for
# clients that might still care
min_segment_size=1)

def slo_filter(app):
return StaticLargeObject(
app, conf,
max_manifest_segments=max_manifest_segments,
max_manifest_size=max_manifest_size,
min_segment_size=min_segment_size)
max_manifest_size=max_manifest_size)
return slo_filter
19 changes: 12 additions & 7 deletions swift/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,22 +1044,27 @@ class RateLimitedIterator(object):
this many elements; default is 0 (rate limit
immediately)
"""
def __init__(self, iterable, elements_per_second, limit_after=0):
def __init__(self, iterable, elements_per_second, limit_after=0,
ratelimit_if=lambda _junk: True):
self.iterator = iter(iterable)
self.elements_per_second = elements_per_second
self.limit_after = limit_after
self.running_time = 0
self.ratelimit_if = ratelimit_if

def __iter__(self):
return self

def next(self):
if self.limit_after > 0:
self.limit_after -= 1
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
return next(self.iterator)
next_value = next(self.iterator)

if self.ratelimit_if(next_value):
if self.limit_after > 0:
self.limit_after -= 1
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
return next_value


class GreenthreadSafeIterator(object):
Expand Down
9 changes: 5 additions & 4 deletions test/unit/common/middleware/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self):
self.container_ring = FakeRing()
self.get_object_ring = lambda policy_index: FakeRing()

def _get_response(self, method, path):
def _find_response(self, method, path):
resp = self._responses[(method, path)]
if isinstance(resp, list):
try:
Expand Down Expand Up @@ -84,16 +84,17 @@ def __call__(self, env, start_response):
self.swift_sources.append(env.get('swift.source'))

try:
resp_class, raw_headers, body = self._get_response(method, path)
resp_class, raw_headers, body = self._find_response(method, path)
headers = swob.HeaderKeyDict(raw_headers)
except KeyError:
if (env.get('QUERY_STRING')
and (method, env['PATH_INFO']) in self._responses):
resp_class, raw_headers, body = self._get_response(
resp_class, raw_headers, body = self._find_response(
method, env['PATH_INFO'])
headers = swob.HeaderKeyDict(raw_headers)
elif method == 'HEAD' and ('GET', path) in self._responses:
resp_class, raw_headers, body = self._get_response('GET', path)
resp_class, raw_headers, body = self._find_response(
'GET', path)
body = None
headers = swob.HeaderKeyDict(raw_headers)
elif method == 'GET' and obj and path in self.uploaded:
Expand Down
Loading

0 comments on commit 222649d

Please sign in to comment.