Skip to content

Commit

Permalink
Split up artifact cache code.
Browse files Browse the repository at this point in the history
(sapling split of 33f6bdd7511db229d81b82da0eb527b36187b742)
  • Loading branch information
Benjy committed May 30, 2013
1 parent ceb2bff commit d3bab89
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 263 deletions.
23 changes: 23 additions & 0 deletions src/python/twitter/pants/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,24 @@
from twitter.common.lang import Compatibility
from twitter.pants.cache.combined_artifact_cache import CombinedArtifactCache
from twitter.pants.cache.file_based_artifact_cache import FileBasedArtifactCache
from twitter.pants.cache.restful_artifact_cache import RESTfulArtifactCache


def create_artifact_cache(context, artifact_root, spec):
  """Returns an artifact cache for the specified spec.

  If spec is a string, it's interpreted as a path or URL prefix to a cache root: an absolute
  path selects a FileBasedArtifactCache, and an http(s) URL selects a RESTfulArtifactCache.
  If spec is a list or tuple of such strings, returns a CombinedArtifactCache delegating to
  one cache per element.

  Raises ValueError if the spec is empty, malformed, or of an unsupported type.
  """
  if not spec:
    raise ValueError('Empty artifact cache spec')
  if isinstance(spec, Compatibility.string):
    if spec.startswith('/'):
      return FileBasedArtifactCache(context, artifact_root, spec)
    elif spec.startswith('http://') or spec.startswith('https://'):
      return RESTfulArtifactCache(context, artifact_root, spec)
    else:
      raise ValueError('Invalid artifact cache spec: %s' % spec)
  elif isinstance(spec, (list, tuple)):
    caches = [ create_artifact_cache(context, artifact_root, x) for x in spec ]
    return CombinedArtifactCache(caches)
  else:
    # Previously this fell through and silently returned None; fail loudly instead.
    raise ValueError('Invalid artifact cache spec type: %s' % type(spec))
240 changes: 0 additions & 240 deletions src/python/twitter/pants/cache/artifact_cache.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,9 @@
# ==================================================================================================
# Copyright 2012 Twitter, Inc.
# --------------------------------------------------------------------------------------------------
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this work except in compliance with the License.
# You may obtain a copy of the License in the LICENSE file, or at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================================

import httplib
import os
import shutil
import urlparse

from twitter.common.contextutil import open_tar, temporary_file, temporary_file_path
from twitter.common.dirutil import safe_mkdir, safe_rmtree
from twitter.common.lang import Compatibility
from twitter.common.quantity import Amount, Data

# Note throughout the distinction between the artifact_root (which is where the artifacts are
# originally built and where the cache restores them to) and the cache root path/URL (which is
# where the artifacts are cached).

def create_artifact_cache(context, artifact_root, spec):
  """Returns an artifact cache for the specified spec.

  A string spec is interpreted as a path or URL prefix to a cache root; a list or tuple of
  strings yields a combined cache delegating to one cache per element.
  """
  if not spec:
    raise ValueError('Empty artifact cache spec')
  if isinstance(spec, (list, tuple)):
    # Build one cache per element and wrap them all in a combined cache.
    return CombinedArtifactCache(
        [create_artifact_cache(context, artifact_root, entry) for entry in spec])
  if isinstance(spec, Compatibility.string):
    if spec.startswith('/'):
      return FileBasedArtifactCache(context, artifact_root, spec)
    if spec.startswith(('http://', 'https://')):
      return RESTfulArtifactCache(context, artifact_root, spec)
    raise ValueError('Invalid artifact cache spec: %s' % spec)


class ArtifactCache(object):
"""A map from cache key to a set of build artifacts.
Expand Down Expand Up @@ -126,200 +83,3 @@ def delete(self, cache_key):
pass


class FileBasedArtifactCache(ArtifactCache):
  """An artifact cache that stores the artifacts in local files."""

  def __init__(self, context, artifact_root, cache_root, copy_fn=None):
    """
    cache_root: The locally cached files are stored under this directory.
    copy_fn: An optional function with the signature copy_fn(absolute_src_path, relative_dst_path)
      that will copy cached files into the desired destination. If unspecified, a simple file
      copy is used.
    """
    ArtifactCache.__init__(self, context, artifact_root)
    self._cache_root = cache_root
    if copy_fn:
      self._copy_fn = copy_fn
    else:
      # Default: plain file copy into the artifact root.
      def default_copy(src, rel_dst):
        shutil.copy(src, os.path.join(self.artifact_root, rel_dst))
      self._copy_fn = default_copy
    safe_mkdir(self._cache_root)

  def try_insert(self, cache_key, build_artifacts):
    cache_dir = self._cache_dir_for_key(cache_key)
    # Start from a clean slate for this key.
    safe_rmtree(cache_dir)
    for artifact in build_artifacts or ():
      rel_path = os.path.relpath(artifact, self.artifact_root)
      if rel_path.startswith('..'):
        raise self.CacheError('Artifact %s is not under artifact root %s' % (artifact,
                                                                             self.artifact_root))
      dest = os.path.join(cache_dir, rel_path)
      safe_mkdir(os.path.dirname(dest))
      # Directories are copied recursively; single files with a plain copy.
      copier = shutil.copytree if os.path.isdir(artifact) else shutil.copy
      copier(artifact, dest)

  def has(self, cache_key):
    return os.path.isdir(self._cache_dir_for_key(cache_key))

  def use_cached_files(self, cache_key):
    cache_dir = self._cache_dir_for_key(cache_key)
    if not os.path.exists(cache_dir):
      return False
    # Re-materialize every cached file relative to the artifact root.
    for dir_name, _, filenames in os.walk(cache_dir):
      for name in filenames:
        src = os.path.join(dir_name, name)
        self._copy_fn(src, os.path.relpath(src, cache_dir))
    return True

  def delete(self, cache_key):
    safe_rmtree(self._cache_dir_for_key(cache_key))

  def _cache_dir_for_key(self, cache_key):
    # Note: it's important to use the id as well as the hash, because two different targets
    # may have the same hash if both have no sources, but we may still want to differentiate them.
    return os.path.join(self._cache_root, cache_key.id, cache_key.hash)


class RESTfulArtifactCache(ArtifactCache):
  """An artifact cache that stores the artifacts on a RESTful service."""

  # Chunk size for streaming a cached tarball out of the HTTP response.
  READ_SIZE = Amount(4, Data.MB).as_(Data.BYTES)

  def __init__(self, context, artifact_root, url_base, compress=True):
    """
    url_base: The prefix for urls on some RESTful service. We must be able to PUT and GET to any
    path under this base.
    compress: Whether to compress the artifacts before storing them.
    """
    ArtifactCache.__init__(self, context, artifact_root)
    parsed_url = urlparse.urlparse(url_base)
    if parsed_url.scheme == 'http':
      self._ssl = False
    elif parsed_url.scheme == 'https':
      self._ssl = True
    else:
      raise ValueError('RESTfulArtifactCache only supports HTTP and HTTPS')
    # Timeout applied to each new HTTP(S) connection (see _connect).
    self._timeout_secs = 2.0
    self._netloc = parsed_url.netloc
    self._path_prefix = parsed_url.path.rstrip('/')
    self.compress = compress

  def try_insert(self, cache_key, build_artifacts):
    # Tars up the artifacts (bz2-compressed when self.compress) and PUTs the tarball.
    with temporary_file_path() as tarfile:
      mode = 'w:bz2' if self.compress else 'w'
      with open_tar(tarfile, mode, dereference=True) as tarout:
        for artifact in build_artifacts:
          # Adds dirs recursively.
          tarout.add(artifact, os.path.relpath(artifact, self.artifact_root))

      with open(tarfile, 'rb') as infile:
        path = self._path_for_key(cache_key)
        # _request returns None only on a 404, which is why the message reports 404;
        # any other error status raises CacheError inside _request itself.
        if not self._request('PUT', path, body=infile):
          raise self.CacheError('Failed to PUT to %s. Error: 404' % self._url_string(path))

  def has(self, cache_key):
    # HEAD yields a response object on 200 and None on 404.
    return self._request('HEAD', self._path_for_key(cache_key)) is not None

  def use_cached_files(self, cache_key):
    # This implementation fetches the appropriate tarball and extracts it.
    path = self._path_for_key(cache_key)
    try:
      # Send an HTTP request for the tarball.
      response = self._request('GET', path)
      if response is None:
        # Cache miss (404).
        return False
      expected_size = int(response.getheader('content-length', -1))
      if expected_size == -1:
        raise self.CacheError('No content-length header in HTTP response')

      done = False
      if self.context:
        self.context.log.info('Reading %d bytes from artifact cache at %s' %
                              (expected_size, self._url_string(path)))
      # Read the data in a loop.
      with temporary_file() as outfile:
        total_bytes = 0
        while not done:
          data = response.read(self.READ_SIZE)
          outfile.write(data)
          # A short read signals the end of the response body.
          if len(data) < self.READ_SIZE:
            done = True
          total_bytes += len(data)
          if self.context:
            self.context.log.debug('Read %d bytes' % total_bytes)
        # Close explicitly so open_tar below can reopen the file by name.
        outfile.close()
        # Check the size.
        if total_bytes != expected_size:
          raise self.CacheError('Read only %d bytes from %d expected' % (total_bytes,
                                                                         expected_size))
        # Extract the tarfile.
        mode = 'r:bz2' if self.compress else 'r'
        with open_tar(outfile.name, mode) as tarfile:
          tarfile.extractall(self.artifact_root)
      return True
    except Exception, e:
      # Any failure is treated as a cache miss so a broken cache never breaks the build.
      if self.context:
        self.context.log.warn('Error while reading from artifact cache: %s' % e)
      return False

  def delete(self, cache_key):
    # Issues a DELETE; a 404 yields None, other error statuses raise via _request.
    path = self._path_for_key(cache_key)
    self._request('DELETE', path)

  def _path_for_key(self, cache_key):
    # Note: it's important to use the id as well as the hash, because two different targets
    # may have the same hash if both have no sources, but we may still want to differentiate them.
    # NOTE(review): the suffix is .tar.bz2 even when compress=False - confirm that's intended.
    return '%s/%s/%s.tar.bz2' % (self._path_prefix, cache_key.id, cache_key.hash)

  def _connect(self):
    # Opens a fresh connection per request (see the reuse TODO in _request).
    if self._ssl:
      return httplib.HTTPSConnection(self._netloc, timeout=self._timeout_secs)
    else:
      return httplib.HTTPConnection(self._netloc, timeout=self._timeout_secs)

  # Returns a response if we get a 200, None if we get a 404 and raises an exception otherwise.
  def _request(self, method, path, body=None):
    if self.context:
      self.context.log.debug('Sending %s request to %s' % (method, self._url_string(path)))
    # TODO(benjy): Keep connection open and reuse?
    conn = self._connect()
    conn.request(method, path, body=body)
    response = conn.getresponse()
    # TODO: Can HEAD return 204? It would be correct, but I've not seen it happen.
    if response.status == 200:
      return response
    elif response.status == 404:
      return None
    else:
      raise self.CacheError('Failed to %s %s. Error: %d %s' % (method, self._url_string(path),
                                                               response.status, response.reason))

  def _url_string(self, path):
    # Reconstructs a full URL; used for logging and error messages only.
    return '%s://%s%s' % (('https' if self._ssl else 'http'), self._netloc, path)


class CombinedArtifactCache(ArtifactCache):
  """An artifact cache that delegates to a list of other caches."""

  def __init__(self, artifact_caches):
    if not artifact_caches:
      raise ValueError('Must provide at least one underlying artifact cache')
    first = artifact_caches[0]
    context = first.context
    artifact_root = first.artifact_root
    # Every delegate must agree with the first one.
    consistent = all(x.context == context and x.artifact_root == artifact_root
                     for x in artifact_caches)
    if not consistent:
      raise ValueError('Combined artifact caches must all have the same artifact root.')
    ArtifactCache.__init__(self, context, artifact_root)
    self._artifact_caches = artifact_caches

  def insert(self, cache_key, build_artifacts):
    # Write-through: every underlying cache gets the artifacts.
    for underlying in self._artifact_caches:
      underlying.insert(cache_key, build_artifacts)

  def has(self, cache_key):
    # A hit in any underlying cache counts as a hit.
    for underlying in self._artifact_caches:
      if underlying.has(cache_key):
        return True
    return False

  def use_cached_files(self, cache_key):
    # Stop at the first cache that successfully restores the files.
    for underlying in self._artifact_caches:
      if underlying.use_cached_files(cache_key):
        return True
    return False

  def delete(self, cache_key):
    # Remove from every underlying cache.
    for underlying in self._artifact_caches:
      underlying.delete(cache_key)
28 changes: 28 additions & 0 deletions src/python/twitter/pants/cache/combined_artifact_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from twitter.pants.cache.artifact_cache import ArtifactCache


class CombinedArtifactCache(ArtifactCache):
  """An artifact cache that delegates to a list of other caches.

  Writes go to all underlying caches; reads succeed if any underlying cache has the artifact.
  """
  def __init__(self, artifact_caches):
    """
    artifact_caches: A non-empty list of caches to delegate to. All must share the same
      context and artifact root.

    Raises ValueError if the list is empty or the caches are inconsistent.
    """
    if not artifact_caches:
      raise ValueError('Must provide at least one underlying artifact cache')
    context = artifact_caches[0].context
    artifact_root = artifact_caches[0].artifact_root
    if any(x.context != context or x.artifact_root != artifact_root for x in artifact_caches):
      # The check compares both context and artifact root, so the error says so.
      raise ValueError('Combined artifact caches must all have the same context and artifact root.')
    ArtifactCache.__init__(self, context, artifact_root)
    self._artifact_caches = artifact_caches

  def insert(self, cache_key, build_artifacts):
    """Inserts the artifacts under cache_key into every underlying cache."""
    for cache in self._artifact_caches:  # Insert into all.
      cache.insert(cache_key, build_artifacts)

  def has(self, cache_key):
    """Returns True if any underlying cache has cache_key (short-circuits on first hit)."""
    return any(cache.has(cache_key) for cache in self._artifact_caches)

  def use_cached_files(self, cache_key):
    """Restores files from the first underlying cache that has them; False if none do."""
    return any(cache.use_cached_files(cache_key) for cache in self._artifact_caches)

  def delete(self, cache_key):
    """Deletes cache_key from every underlying cache."""
    for cache in self._artifact_caches:  # Delete from all.
      cache.delete(cache_key)
58 changes: 58 additions & 0 deletions src/python/twitter/pants/cache/file_based_artifact_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import shutil
from twitter.common.dirutil import safe_mkdir, safe_rmtree
from twitter.pants.cache.artifact_cache import ArtifactCache


class FileBasedArtifactCache(ArtifactCache):
  """An artifact cache that stores the artifacts in local files."""

  def __init__(self, context, artifact_root, cache_root, copy_fn=None):
    """
    cache_root: The locally cached files are stored under this directory.
    copy_fn: An optional function with the signature copy_fn(absolute_src_path, relative_dst_path)
      that will copy cached files into the desired destination. If unspecified, a simple file
      copy is used.
    """
    ArtifactCache.__init__(self, context, artifact_root)
    self._cache_root = cache_root
    # Fall back to a plain copy into the artifact root when no copy_fn is given.
    default_copy = lambda src, rel_dst: shutil.copy(src, os.path.join(self.artifact_root, rel_dst))
    self._copy_fn = copy_fn or default_copy
    safe_mkdir(self._cache_root)

  def try_insert(self, cache_key, build_artifacts):
    cache_dir = self._cache_dir_for_key(cache_key)
    safe_rmtree(cache_dir)  # Clear any stale entry for this key first.
    artifacts = build_artifacts or ()
    for artifact in artifacts:
      rel_path = os.path.relpath(artifact, self.artifact_root)
      if rel_path.startswith('..'):
        raise self.CacheError('Artifact %s is not under artifact root %s' % (artifact,
                                                                             self.artifact_root))
      artifact_dest = os.path.join(cache_dir, rel_path)
      safe_mkdir(os.path.dirname(artifact_dest))
      if os.path.isdir(artifact):
        shutil.copytree(artifact, artifact_dest)
      else:
        shutil.copy(artifact, artifact_dest)

  def has(self, cache_key):
    # An entry exists iff its cache directory does.
    return os.path.isdir(self._cache_dir_for_key(cache_key))

  def use_cached_files(self, cache_key):
    cache_dir = self._cache_dir_for_key(cache_key)
    if not os.path.exists(cache_dir):
      return False
    for dir_name, _, filenames in os.walk(cache_dir):
      for basename in filenames:
        abs_path = os.path.join(dir_name, basename)
        rel_path = os.path.relpath(abs_path, cache_dir)
        self._copy_fn(abs_path, rel_path)
    return True

  def delete(self, cache_key):
    safe_rmtree(self._cache_dir_for_key(cache_key))

  def _cache_dir_for_key(self, cache_key):
    # Note: it's important to use the id as well as the hash, because two different targets
    # may have the same hash if both have no sources, but we may still want to differentiate them.
    return os.path.join(self._cache_root, cache_key.id, cache_key.hash)
Loading

0 comments on commit d3bab89

Please sign in to comment.