[engine] Storage clean ups (pantsbuild#4257)
### Problem

`Storage` is due for cleanup.

1. `lmdb` is no longer useful: everything fits in memory, and we no longer need to pass objects across process boundaries.
2. For the same reason, there is no need to store objects' serialized forms.

### Solution

1. Removed the lmdb storage backend.
2. Replaced the key-to-pickled-object mapping with a key-to-original-object mapping.

We continue to use pickle to compute digests, since the experiment in pantsbuild#2969 showed it to be faster than msgpack or json for our use case. Further improvement may be possible, but we defer that optimization until it is necessary.
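
To make the change concrete, here is a minimal sketch of the new approach (illustrative names only, not the exact engine code): pickle is used solely to compute a content digest, and the original object, not its pickled blob, is what gets stored in memory.

```python
import cPickle as pickle
from hashlib import sha1

# Illustrative-only sketch: a dict keyed by content digest holds the live objects.
_objects = {}

def put(obj, protocol=pickle.HIGHEST_PROTOCOL):
  blob = pickle.dumps(obj, protocol=protocol)  # serialize only to compute the digest
  digest = sha1(blob).digest()                 # content-addressed key
  if digest not in _objects:
    _objects[digest] = obj                     # keep the original object, not the blob
  return digest

def get(digest):
  return _objects.get(digest)                  # no unpickling on the read path
```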
peiyuwang authored Feb 14, 2017
1 parent d3d02b7 commit daf5dc4
Showing 10 changed files with 51 additions and 294 deletions.
1 change: 0 additions & 1 deletion 3rdparty/python/requirements.txt
@@ -6,7 +6,6 @@ docutils>=0.12,<0.13
fasteners==0.14.1
futures==3.0.5
isort==4.2.5
lmdb==0.89
Markdown==2.1.1
mock==1.3.0
packaging==16.8
2 changes: 0 additions & 2 deletions src/python/pants/engine/BUILD
@@ -186,8 +186,6 @@ python_library(
name='storage',
sources=['storage.py'],
dependencies=[
'3rdparty/python:cffi',
'3rdparty/python:lmdb',
':nodes',
':objects',
'src/python/pants/util:dirutil',
5 changes: 0 additions & 5 deletions src/python/pants/engine/engine.py
@@ -120,11 +120,6 @@ def product_request(self, product, subjects):
for computed_product in maybe_list(return_value, expected_type=product):
yield computed_product

def close(self):
"""Shutdown this engine instance, releasing resources it was using."""
self._storage.close()
self._cache.close()

def cache_stats(self):
"""Returns cache stats for the engine."""
return self._cache.get_stats()
8 changes: 0 additions & 8 deletions src/python/pants/engine/objects.py
@@ -143,11 +143,3 @@ def validate(self):
:raises: :class:`ValidationError` if this object is invalid.
"""


class Closable(object):
"""Marks a class that cleans up itself, usually at the end of lifecycle."""

def close(self):
"""A no-op implementation, overwrite to actually clean up resources etc."""
return
239 changes: 18 additions & 221 deletions src/python/pants/engine/storage.py
@@ -7,37 +7,14 @@

import cPickle as pickle
import cStringIO as StringIO
import sys
from abc import abstractmethod
from binascii import hexlify
from collections import Counter
from contextlib import closing
from hashlib import sha1
from struct import Struct as StdlibStruct

import lmdb
import six

from pants.engine.nodes import State
from pants.engine.objects import Closable, SerializationError
from pants.util.dirutil import safe_mkdtemp
from pants.util.meta import AbstractClass


def _unpickle(value):
if isinstance(value, six.binary_type):
# Deserialize string values.
return pickle.loads(value)
# Deserialize values with file interface,
return pickle.load(value)


def _identity(value):
return value


def _copy_bytes(value):
return bytes(value)
from pants.engine.objects import SerializationError


class Key(object):
Expand Down Expand Up @@ -103,11 +80,11 @@ class InvalidKeyError(Exception):
"""Indicate an invalid `Key` entry"""


class Storage(Closable):
"""Stores and creates unique keys for input pickleable objects.
class Storage(object):
"""Stores and creates unique keys for input objects from their contents.
Storage as `Closable`, `close()` can be called either explicitly or through the `with`
statement in a context.
This assumes objects can fit in memory, therefore there is no need to store their
serialized form.
Besides contents indexed by their hashed Keys, a secondary index is also provided
for mappings between Keys. This allows to establish links between contents that
@@ -118,41 +95,19 @@ class Storage(Closable):
into keys, and vice versa are also provided.
"""

LMDB_KEY_MAPPINGS_DB_NAME = b'_key_mappings_'

@classmethod
def create(cls, path=None, in_memory=True, protocol=None):
def create(cls, protocol=None):
"""Create a content addressable Storage backed by a key value store.
:param path: If in_memory=False, the path to store the database in.
:param in_memory: Indicate whether to use the in-memory kvs or an embeded database.
:param protocol: Serialization protocol for pickle, if not provided will use ASCII protocol.
"""
if in_memory:
content, key_mappings = InMemoryDb(), InMemoryDb()
else:
content, key_mappings = Lmdb.create(path=path,
child_databases=[cls.LMDB_KEY_MAPPINGS_DB_NAME])
return Storage(protocol=protocol)

return Storage(content, key_mappings, protocol=protocol)

@classmethod
def clone(cls, storage):
"""Clone a Storage so it can be shared across process boundary."""
if isinstance(storage._contents, InMemoryDb):
contents, key_mappings = storage._contents, storage._key_mappings
else:
contents, key_mappings = Lmdb.create(path=storage._contents.path,
child_databases=[cls.LMDB_KEY_MAPPINGS_DB_NAME])

return Storage(contents, key_mappings, protocol=storage._protocol)

def __init__(self, contents, key_mappings, protocol=None):
def __init__(self, protocol=None):
"""Not for direct use: construct a Storage via either `create` or `clone`."""
self._contents = contents
self._key_mappings = key_mappings
self._objects = dict()
self._key_mappings = dict()
self._protocol = protocol if protocol is not None else pickle.HIGHEST_PROTOCOL
self._memo = dict()

def put(self, obj):
"""Serialize and hash something pickleable, returning a unique key to retrieve it later.
@@ -171,9 +126,8 @@ def put(self, obj):

# Hash the blob and store it if it does not exist.
key = Key.create(blob)
if key not in self._memo:
self._memo[key] = obj
self._contents.put(key.digest, blob)
if key not in self._objects:
self._objects[key] = obj
except Exception as e:
# Unfortunately, pickle can raise things other than PickleError instances. For example it
# will raise ValueError when handed a lambda; so we handle the otherwise overly-broad
@@ -191,10 +145,7 @@ def get(self, key):
if not isinstance(key, Key):
raise InvalidKeyError('Not a valid key: {}'.format(key))

obj = self._memo.get(key)
if obj is not None:
return obj
return self._contents.get(key.digest, _unpickle)
return self._objects.get(key)

def put_state(self, state):
"""Put the components of the State individually in storage, then put the aggregate."""
@@ -213,36 +164,18 @@ def add_mapping(self, from_key, to_key):
Unlike content storage, key mappings allows overwriting existing entries,
meaning a key can be re-mapped to a different key.
"""
self._key_mappings.put(key=from_key.digest,
value=pickle.dumps(to_key, protocol=self._protocol))
if from_key.digest not in self._key_mappings:
self._key_mappings[from_key.digest] = to_key

def get_mapping(self, from_key):
"""Retrieve the mapping Key from a given Key.
Noe is returned if the mapping does not exist.
None is returned if the mapping does not exist.
"""
to_key = self._key_mappings.get(key=from_key.digest)

if to_key is None:
return None

if isinstance(to_key, six.binary_type):
return pickle.loads(to_key)
return pickle.load(to_key)
return self._key_mappings.get(from_key.digest)

def close(self):
self._contents.close()

def _assert_type_matches(self, value, key_type):
"""Ensure the type of deserialized object matches the type from key."""
value_type = type(value)
if key_type and value_type is not key_type:
raise ValueError('Mismatch types, key: {}, value: {}'
.format(key_type, value_type))
return value


class Cache(Closable):
class Cache(object):
"""Cache the State resulting from a given Runnable."""

@classmethod
Expand Down Expand Up @@ -297,10 +230,6 @@ def items(self):
request = self._storage.get(request_key)
yield request, self._storage.get(self._storage.get_mapping(self._storage.put(request)))

def close(self):
# NB: This is a facade above a Storage instance, which is always closed independently.
pass


class CacheStats(Counter):
"""Record cache hits and misses."""
Expand Down Expand Up @@ -333,135 +262,3 @@ def total(self):

def __repr__(self):
return 'hits={}, misses={}, total={}'.format(self.hits, self.misses, self.total)


class KeyValueStore(Closable, AbstractClass):
@abstractmethod
def get(self, key, transform=_identity):
"""Fetch the value for a given key.
:param key: key in bytestring.
:param transform: optional function that is applied on the retrieved value from storage
before it is returned, since the original value may be only valid within the context.
:return: value can be either string-like or file-like, `None` if does not exist.
"""

@abstractmethod
def put(self, key, value, transform=_copy_bytes):
"""Save the value under a key, but only once.
The write once semantics is specifically provided for the content addressable use case.
:param key: key in bytestring.
:param value: value in bytestring.
:param transform: optional function that is applied on the input value before it is
saved to the storage, since the original value may be only valid within the context,
default is to play safe and make a copy.
:return: `True` to indicate the write actually happens, i.e, first write, `False` for
repeated writes of the same key.
"""

@abstractmethod
def items(self):
"""Generator to iterate over items.
For testing purpose.
"""


class InMemoryDb(KeyValueStore):
"""An in-memory implementation of the kvs interface."""

def __init__(self):
self._storage = dict()

def get(self, key, transform=_identity):
return transform(self._storage.get(key))

def put(self, key, value, transform=_copy_bytes):
if key in self._storage:
return False
self._storage[key] = transform(value)
return True

def items(self):
for k in iter(self._storage):
yield k, self._storage.get(k)


class Lmdb(KeyValueStore):
"""A lmdb implementation of the kvs interface."""

# TODO make this more configurable through a subsystem.

# 256GB - some arbitrary maximum size database may grow to.
MAX_DATABASE_SIZE = 256 * 1024 * 1024 * 1024

# writemap will use a writeable memory mapping to directly update storage, therefore
# improves performance. But it may cause filesystems that don’t support sparse files,
# such as OSX, to immediately preallocate map_size = bytes of underlying storage.
# See https://lmdb.readthedocs.org/en/release/#writemap-mode
USE_SPARSE_FILES = sys.platform != 'darwin'

@classmethod
def create(self, path=None, child_databases=None):
"""
:param path: Database directory location, if `None` a temporary location will be provided
and cleaned up upon process exit.
:param child_databases: Optional child database names.
:return: List of Lmdb databases, main database under the path is always created,
plus the child databases requested.
"""
path = path if path is not None else safe_mkdtemp()
child_databases = child_databases or []
env = lmdb.open(path, map_size=self.MAX_DATABASE_SIZE,
metasync=False, sync=False, map_async=True,
writemap=self.USE_SPARSE_FILES,
max_dbs=1+len(child_databases))
instances = [Lmdb(env)]
for child_db in child_databases:
instances.append(Lmdb(env, env.open_db(child_db)))
return tuple(instances)

def __init__(self, env, db=None):
"""Not for direct use, use factory method `create`.
db if None represents the main database.
"""
self._env = env
self._db = db

@property
def path(self):
return self._env.path()

def get(self, key, transform=_identity):
"""Return the value or `None` if the key does not exist.
NB: Memory mapped storage returns a buffer object without copying keys or values, which
is then wrapped with `StringIO` as the more friendly string buffer to allow `pickle.load`
to read, again no copy involved.
"""
with self._env.begin(db=self._db, buffers=True) as txn:
value = txn.get(key)
if value is not None:
return transform(StringIO.StringIO(value))
return None

def put(self, key, value, transform=_identity):
"""Returning True if the key/value are actually written to the storage.
No need to do additional transform since value is to be persisted.
"""
with self._env.begin(db=self._db, buffers=True, write=True) as txn:
return txn.put(key, transform(value), overwrite=False)

def items(self):
with self._env.begin(db=self._db, buffers=True) as txn:
cursor = txn.cursor()
for k, v in cursor:
yield k, v

def close(self):
"""Close the lmdb environment, calling multiple times has no effect."""
self._env.close()
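
For reference, a hedged usage sketch of the simplified `Storage`, assuming only the API visible in the diff above (the stored values are made-up examples):

```python
from pants.engine.storage import Storage

# Create a purely in-memory Storage; the path/in_memory arguments are gone.
storage = Storage.create()

# put() pickles the value only to compute its digest; get() returns the original object.
key = storage.put(('a', 'pickleable', 'tuple'))
assert storage.get(key) == ('a', 'pickleable', 'tuple')

# The secondary index still maps one Key to another, e.g. a request Key to a result Key.
result_key = storage.put(['some', 'result'])
storage.add_mapping(key, result_key)
assert storage.get(storage.get_mapping(key)) == ['some', 'result']
```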
2 changes: 1 addition & 1 deletion src/python/pants/engine/subsystem/native.py
@@ -349,7 +349,7 @@ class ObjectIdMap(object):

def __init__(self):
# Objects indexed by their keys, i.e, content digests
self._objects = Storage.create(in_memory=True)
self._objects = Storage.create()
# Memoized object Ids.
self._id_to_key = dict()
self._key_to_id = dict()
9 changes: 3 additions & 6 deletions tests/python/pants_test/engine/legacy/test_graph.py
@@ -58,12 +58,9 @@ def open_scheduler(self, specs, symbol_table_cls=None):
graph_helper = EngineInitializer.setup_legacy_graph(path_ignore_patterns,
symbol_table_cls=symbol_table_cls,
native=self._native)
try:
graph = graph_helper.create_build_graph(target_roots)[0]
addresses = tuple(graph.inject_specs_closure(target_roots.as_specs()))
yield graph, addresses, graph_helper.scheduler
finally:
graph_helper.engine.close()
graph = graph_helper.create_build_graph(target_roots)[0]
addresses = tuple(graph.inject_specs_closure(target_roots.as_specs()))
yield graph, addresses, graph_helper.scheduler

def test_invalidate_fsnode(self):
with self.open_scheduler(['3rdparty/python::']) as (_, _, scheduler):