From d3bab89545085fb45bd070029ab1dacbedb6057e Mon Sep 17 00:00:00 2001 From: Benjy Date: Thu, 30 May 2013 14:21:52 -0700 Subject: [PATCH] Split up artifact cache code. (sapling split of 33f6bdd7511db229d81b82da0eb527b36187b742) --- src/python/twitter/pants/cache/__init__.py | 23 ++ .../twitter/pants/cache/artifact_cache.py | 240 ------------------ .../pants/cache/combined_artifact_cache.py | 28 ++ .../pants/cache/file_based_artifact_cache.py | 58 +++++ .../pants/cache/restful_artifact_cache.py | 124 +++++++++ .../twitter/pants/python/python_chroot.py | 2 +- src/python/twitter/pants/tasks/__init__.py | 2 +- .../pants/cache/test_artifact_cache.py | 27 +- 8 files changed, 241 insertions(+), 263 deletions(-) create mode 100644 src/python/twitter/pants/cache/combined_artifact_cache.py create mode 100644 src/python/twitter/pants/cache/file_based_artifact_cache.py create mode 100644 src/python/twitter/pants/cache/restful_artifact_cache.py diff --git a/src/python/twitter/pants/cache/__init__.py b/src/python/twitter/pants/cache/__init__.py index 8b137891791..ded16c1c1cc 100644 --- a/src/python/twitter/pants/cache/__init__.py +++ b/src/python/twitter/pants/cache/__init__.py @@ -1 +1,24 @@ +from twitter.common.lang import Compatibility +from twitter.pants.cache.combined_artifact_cache import CombinedArtifactCache +from twitter.pants.cache.file_based_artifact_cache import FileBasedArtifactCache +from twitter.pants.cache.restful_artifact_cache import RESTfulArtifactCache + +def create_artifact_cache(context, artifact_root, spec): + """Returns an artifact cache for the specified spec. + + If spec is a string, it's interpreted as a path or URL prefix to a cache root. If it's a list of + strings, it returns an appropriate combined cache. 
+ """ + if not spec: + raise ValueError('Empty artifact cache spec') + if isinstance(spec, Compatibility.string): + if spec.startswith('/'): + return FileBasedArtifactCache(context, artifact_root, spec) + elif spec.startswith('http://') or spec.startswith('https://'): + return RESTfulArtifactCache(context, artifact_root, spec) + else: + raise ValueError('Invalid artifact cache spec: %s' % spec) + elif isinstance(spec, (list, tuple)): + caches = [ create_artifact_cache(context, artifact_root, x) for x in spec ] + return CombinedArtifactCache(caches) diff --git a/src/python/twitter/pants/cache/artifact_cache.py b/src/python/twitter/pants/cache/artifact_cache.py index 7467965bddf..7b62cc7b247 100644 --- a/src/python/twitter/pants/cache/artifact_cache.py +++ b/src/python/twitter/pants/cache/artifact_cache.py @@ -1,52 +1,9 @@ -# ================================================================================================== -# Copyright 2012 Twitter, Inc. -# -------------------------------------------------------------------------------------------------- -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this work except in compliance with the License. -# You may obtain a copy of the License in the LICENSE file, or at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ================================================================================================== - -import httplib import os -import shutil -import urlparse - -from twitter.common.contextutil import open_tar, temporary_file, temporary_file_path -from twitter.common.dirutil import safe_mkdir, safe_rmtree -from twitter.common.lang import Compatibility -from twitter.common.quantity import Amount, Data # Note throughout the distinction between the artifact_root (which is where the artifacts are # originally built and where the cache restores them to) and the cache root path/URL (which is # where the artifacts are cached). -def create_artifact_cache(context, artifact_root, spec): - """Returns an artifact cache for the specified spec. - - If config is a string, it's interpreted as a path or URL prefix to a cache root. If it's a list of - strings, it returns an appropriate combined cache. - """ - if not spec: - raise ValueError('Empty artifact cache spec') - if isinstance(spec, Compatibility.string): - if spec.startswith('/'): - return FileBasedArtifactCache(context, artifact_root, spec) - elif spec.startswith('http://') or spec.startswith('https://'): - return RESTfulArtifactCache(context, artifact_root, spec) - else: - raise ValueError('Invalid artifact cache spec: %s' % spec) - elif isinstance(spec, (list, tuple)): - caches = [ create_artifact_cache(context, artifact_root, x) for x in spec ] - return CombinedArtifactCache(caches) - class ArtifactCache(object): """A map from cache key to a set of build artifacts. @@ -126,200 +83,3 @@ def delete(self, cache_key): pass -class FileBasedArtifactCache(ArtifactCache): - """An artifact cache that stores the artifacts in local files.""" - def __init__(self, context, artifact_root, cache_root, copy_fn=None): - """ - cache_root: The locally cached files are stored under this directory. 
- copy_fn: An optional function with the signature copy_fn(absolute_src_path, relative_dst_path) that - will copy cached files into the desired destination. If unspecified, a simple file copy is used. - """ - ArtifactCache.__init__(self, context, artifact_root) - self._cache_root = cache_root - self._copy_fn = copy_fn or ( - lambda src, rel_dst: shutil.copy(src, os.path.join(self.artifact_root, rel_dst))) - safe_mkdir(self._cache_root) - - def try_insert(self, cache_key, build_artifacts): - cache_dir = self._cache_dir_for_key(cache_key) - safe_rmtree(cache_dir) - for artifact in build_artifacts or (): - rel_path = os.path.relpath(artifact, self.artifact_root) - - if rel_path.startswith('..'): - raise self.CacheError('Artifact %s is not under artifact root %s' % (artifact, - self.artifact_root)) - - artifact_dest = os.path.join(cache_dir, rel_path) - safe_mkdir(os.path.dirname(artifact_dest)) - if os.path.isdir(artifact): - shutil.copytree(artifact, artifact_dest) - else: - shutil.copy(artifact, artifact_dest) - - def has(self, cache_key): - return os.path.isdir(self._cache_dir_for_key(cache_key)) - - def use_cached_files(self, cache_key): - cache_dir = self._cache_dir_for_key(cache_key) - if not os.path.exists(cache_dir): - return False - for dir_name, _, filenames in os.walk(cache_dir): - for filename in filenames: - filename = os.path.join(dir_name, filename) - relative_filename = os.path.relpath(filename, cache_dir) - self._copy_fn(filename, relative_filename) - return True - - def delete(self, cache_key): - safe_rmtree(self._cache_dir_for_key(cache_key)) - - def _cache_dir_for_key(self, cache_key): - # Note: it's important to use the id as well as the hash, because two different targets - # may have the same hash if both have no sources, but we may still want to differentiate them. 
- return os.path.join(self._cache_root, cache_key.id, cache_key.hash) - - -class RESTfulArtifactCache(ArtifactCache): - """An artifact cache that stores the artifacts on a RESTful service.""" - - READ_SIZE = Amount(4, Data.MB).as_(Data.BYTES) - - def __init__(self, context, artifact_root, url_base, compress=True): - """ - url_base: The prefix for urls on some RESTful service. We must be able to PUT and GET to any - path under this base. - compress: Whether to compress the artifacts before storing them. - """ - ArtifactCache.__init__(self, context, artifact_root) - parsed_url = urlparse.urlparse(url_base) - if parsed_url.scheme == 'http': - self._ssl = False - elif parsed_url.scheme == 'https': - self._ssl = True - else: - raise ValueError('RESTfulArtifactCache only supports HTTP and HTTPS') - self._timeout_secs = 2.0 - self._netloc = parsed_url.netloc - self._path_prefix = parsed_url.path.rstrip('/') - self.compress = compress - - def try_insert(self, cache_key, build_artifacts): - with temporary_file_path() as tarfile: - mode = 'w:bz2' if self.compress else 'w' - with open_tar(tarfile, mode, dereference=True) as tarout: - for artifact in build_artifacts: - # Adds dirs recursively. - tarout.add(artifact, os.path.relpath(artifact, self.artifact_root)) - - with open(tarfile, 'rb') as infile: - path = self._path_for_key(cache_key) - if not self._request('PUT', path, body=infile): - raise self.CacheError('Failed to PUT to %s. Error: 404' % self._url_string(path)) - - def has(self, cache_key): - return self._request('HEAD', self._path_for_key(cache_key)) is not None - - def use_cached_files(self, cache_key): - # This implementation fetches the appropriate tarball and extracts it. - path = self._path_for_key(cache_key) - try: - # Send an HTTP request for the tarball. 
- response = self._request('GET', path) - if response is None: - return False - expected_size = int(response.getheader('content-length', -1)) - if expected_size == -1: - raise self.CacheError('No content-length header in HTTP response') - - done = False - if self.context: - self.context.log.info('Reading %d bytes from artifact cache at %s' % - (expected_size, self._url_string(path))) - # Read the data in a loop. - with temporary_file() as outfile: - total_bytes = 0 - while not done: - data = response.read(self.READ_SIZE) - outfile.write(data) - if len(data) < self.READ_SIZE: - done = True - total_bytes += len(data) - if self.context: - self.context.log.debug('Read %d bytes' % total_bytes) - outfile.close() - # Check the size. - if total_bytes != expected_size: - raise self.CacheError('Read only %d bytes from %d expected' % (total_bytes, - expected_size)) - # Extract the tarfile. - mode = 'r:bz2' if self.compress else 'r' - with open_tar(outfile.name, mode) as tarfile: - tarfile.extractall(self.artifact_root) - return True - except Exception, e: - if self.context: - self.context.log.warn('Error while reading from artifact cache: %s' % e) - return False - - def delete(self, cache_key): - path = self._path_for_key(cache_key) - self._request('DELETE', path) - - def _path_for_key(self, cache_key): - # Note: it's important to use the id as well as the hash, because two different targets - # may have the same hash if both have no sources, but we may still want to differentiate them. - return '%s/%s/%s.tar.bz2' % (self._path_prefix, cache_key.id, cache_key.hash) - - def _connect(self): - if self._ssl: - return httplib.HTTPSConnection(self._netloc, timeout=self._timeout_secs) - else: - return httplib.HTTPConnection(self._netloc, timeout=self._timeout_secs) - - # Returns a response if we get a 200, None if we get a 404 and raises an exception otherwise. 
- def _request(self, method, path, body=None): - if self.context: - self.context.log.debug('Sending %s request to %s' % (method, self._url_string(path))) - # TODO(benjy): Keep connection open and reuse? - conn = self._connect() - conn.request(method, path, body=body) - response = conn.getresponse() - # TODO: Can HEAD return 204? It would be correct, but I've not seen it happen. - if response.status == 200: - return response - elif response.status == 404: - return None - else: - raise self.CacheError('Failed to %s %s. Error: %d %s' % (method, self._url_string(path), - response.status, response.reason)) - - def _url_string(self, path): - return '%s://%s%s' % (('https' if self._ssl else 'http'), self._netloc, path) - - -class CombinedArtifactCache(ArtifactCache): - """An artifact cache that delegates to a list of other caches.""" - def __init__(self, artifact_caches): - if not artifact_caches: - raise ValueError('Must provide at least one underlying artifact cache') - context = artifact_caches[0].context - artifact_root = artifact_caches[0].artifact_root - if any(x.context != context or x.artifact_root != artifact_root for x in artifact_caches): - raise ValueError('Combined artifact caches must all have the same artifact root.') - ArtifactCache.__init__(self, context, artifact_root) - self._artifact_caches = artifact_caches - - def insert(self, cache_key, build_artifacts): - for cache in self._artifact_caches: # Insert into all. - cache.insert(cache_key, build_artifacts) - - def has(self, cache_key): - return any(cache.has(cache_key) for cache in self._artifact_caches) - - def use_cached_files(self, cache_key): - return any(cache.use_cached_files(cache_key) for cache in self._artifact_caches) - - def delete(self, cache_key): - for cache in self._artifact_caches: # Delete from all. 
- cache.delete(cache_key) diff --git a/src/python/twitter/pants/cache/combined_artifact_cache.py b/src/python/twitter/pants/cache/combined_artifact_cache.py new file mode 100644 index 00000000000..bd30b842c51 --- /dev/null +++ b/src/python/twitter/pants/cache/combined_artifact_cache.py @@ -0,0 +1,28 @@ +from twitter.pants.cache.artifact_cache import ArtifactCache + + +class CombinedArtifactCache(ArtifactCache): + """An artifact cache that delegates to a list of other caches.""" + def __init__(self, artifact_caches): + if not artifact_caches: + raise ValueError('Must provide at least one underlying artifact cache') + context = artifact_caches[0].context + artifact_root = artifact_caches[0].artifact_root + if any(x.context != context or x.artifact_root != artifact_root for x in artifact_caches): + raise ValueError('Combined artifact caches must all have the same artifact root.') + ArtifactCache.__init__(self, context, artifact_root) + self._artifact_caches = artifact_caches + + def insert(self, cache_key, build_artifacts): + for cache in self._artifact_caches: # Insert into all. + cache.insert(cache_key, build_artifacts) + + def has(self, cache_key): + return any(cache.has(cache_key) for cache in self._artifact_caches) + + def use_cached_files(self, cache_key): + return any(cache.use_cached_files(cache_key) for cache in self._artifact_caches) + + def delete(self, cache_key): + for cache in self._artifact_caches: # Delete from all. 
+ cache.delete(cache_key) diff --git a/src/python/twitter/pants/cache/file_based_artifact_cache.py b/src/python/twitter/pants/cache/file_based_artifact_cache.py new file mode 100644 index 00000000000..f90dbcf30b3 --- /dev/null +++ b/src/python/twitter/pants/cache/file_based_artifact_cache.py @@ -0,0 +1,58 @@ +import os +import shutil +from twitter.common.dirutil import safe_mkdir, safe_rmtree +from twitter.pants.cache.artifact_cache import ArtifactCache + + +class FileBasedArtifactCache(ArtifactCache): + """An artifact cache that stores the artifacts in local files.""" + def __init__(self, context, artifact_root, cache_root, copy_fn=None): + """ + cache_root: The locally cached files are stored under this directory. + copy_fn: An optional function with the signature copy_fn(absolute_src_path, relative_dst_path) that + will copy cached files into the desired destination. If unspecified, a simple file copy is used. + """ + ArtifactCache.__init__(self, context, artifact_root) + self._cache_root = cache_root + self._copy_fn = copy_fn or ( + lambda src, rel_dst: shutil.copy(src, os.path.join(self.artifact_root, rel_dst))) + safe_mkdir(self._cache_root) + + def try_insert(self, cache_key, build_artifacts): + cache_dir = self._cache_dir_for_key(cache_key) + safe_rmtree(cache_dir) + for artifact in build_artifacts or (): + rel_path = os.path.relpath(artifact, self.artifact_root) + + if rel_path.startswith('..'): + raise self.CacheError('Artifact %s is not under artifact root %s' % (artifact, + self.artifact_root)) + + artifact_dest = os.path.join(cache_dir, rel_path) + safe_mkdir(os.path.dirname(artifact_dest)) + if os.path.isdir(artifact): + shutil.copytree(artifact, artifact_dest) + else: + shutil.copy(artifact, artifact_dest) + + def has(self, cache_key): + return os.path.isdir(self._cache_dir_for_key(cache_key)) + + def use_cached_files(self, cache_key): + cache_dir = self._cache_dir_for_key(cache_key) + if not os.path.exists(cache_dir): + return False + for dir_name, 
_, filenames in os.walk(cache_dir): + for filename in filenames: + filename = os.path.join(dir_name, filename) + relative_filename = os.path.relpath(filename, cache_dir) + self._copy_fn(filename, relative_filename) + return True + + def delete(self, cache_key): + safe_rmtree(self._cache_dir_for_key(cache_key)) + + def _cache_dir_for_key(self, cache_key): + # Note: it's important to use the id as well as the hash, because two different targets + # may have the same hash if both have no sources, but we may still want to differentiate them. + return os.path.join(self._cache_root, cache_key.id, cache_key.hash) diff --git a/src/python/twitter/pants/cache/restful_artifact_cache.py b/src/python/twitter/pants/cache/restful_artifact_cache.py new file mode 100644 index 00000000000..bed0b552479 --- /dev/null +++ b/src/python/twitter/pants/cache/restful_artifact_cache.py @@ -0,0 +1,124 @@ +import httplib +import os +import urlparse +from twitter.common.contextutil import temporary_file_path, open_tar, temporary_file +from twitter.common.quantity import Amount, Data +from twitter.pants.cache.artifact_cache import ArtifactCache + + +class RESTfulArtifactCache(ArtifactCache): + """An artifact cache that stores the artifacts on a RESTful service.""" + + READ_SIZE = Amount(4, Data.MB).as_(Data.BYTES) + + def __init__(self, context, artifact_root, url_base, compress=True): + """ + url_base: The prefix for urls on some RESTful service. We must be able to PUT and GET to any + path under this base. + compress: Whether to compress the artifacts before storing them. 
+ """ + ArtifactCache.__init__(self, context, artifact_root) + parsed_url = urlparse.urlparse(url_base) + if parsed_url.scheme == 'http': + self._ssl = False + elif parsed_url.scheme == 'https': + self._ssl = True + else: + raise ValueError('RESTfulArtifactCache only supports HTTP and HTTPS') + self._timeout_secs = 2.0 + self._netloc = parsed_url.netloc + self._path_prefix = parsed_url.path.rstrip('/') + self.compress = compress + + def try_insert(self, cache_key, build_artifacts): + with temporary_file_path() as tarfile: + mode = 'w:bz2' if self.compress else 'w' + with open_tar(tarfile, mode, dereference=True) as tarout: + for artifact in build_artifacts: + # Adds dirs recursively. + tarout.add(artifact, os.path.relpath(artifact, self.artifact_root)) + + with open(tarfile, 'rb') as infile: + path = self._path_for_key(cache_key) + if not self._request('PUT', path, body=infile): + raise self.CacheError('Failed to PUT to %s. Error: 404' % self._url_string(path)) + + def has(self, cache_key): + return self._request('HEAD', self._path_for_key(cache_key)) is not None + + def use_cached_files(self, cache_key): + # This implementation fetches the appropriate tarball and extracts it. + path = self._path_for_key(cache_key) + try: + # Send an HTTP request for the tarball. + response = self._request('GET', path) + if response is None: + return False + expected_size = int(response.getheader('content-length', -1)) + if expected_size == -1: + raise self.CacheError('No content-length header in HTTP response') + + done = False + if self.context: + self.context.log.info('Reading %d bytes from artifact cache at %s' % + (expected_size, self._url_string(path))) + # Read the data in a loop. 
+ with temporary_file() as outfile: + total_bytes = 0 + while not done: + data = response.read(self.READ_SIZE) + outfile.write(data) + if len(data) < self.READ_SIZE: + done = True + total_bytes += len(data) + if self.context: + self.context.log.debug('Read %d bytes' % total_bytes) + outfile.close() + # Check the size. + if total_bytes != expected_size: + raise self.CacheError('Read only %d bytes from %d expected' % (total_bytes, + expected_size)) + # Extract the tarfile. + mode = 'r:bz2' if self.compress else 'r' + with open_tar(outfile.name, mode) as tarfile: + tarfile.extractall(self.artifact_root) + return True + except Exception, e: + if self.context: + self.context.log.warn('Error while reading from artifact cache: %s' % e) + return False + + def delete(self, cache_key): + path = self._path_for_key(cache_key) + self._request('DELETE', path) + + def _path_for_key(self, cache_key): + # Note: it's important to use the id as well as the hash, because two different targets + # may have the same hash if both have no sources, but we may still want to differentiate them. + return '%s/%s/%s.tar.bz2' % (self._path_prefix, cache_key.id, cache_key.hash) + + def _connect(self): + if self._ssl: + return httplib.HTTPSConnection(self._netloc, timeout=self._timeout_secs) + else: + return httplib.HTTPConnection(self._netloc, timeout=self._timeout_secs) + + # Returns a response if we get a 200, None if we get a 404 and raises an exception otherwise. + def _request(self, method, path, body=None): + if self.context: + self.context.log.debug('Sending %s request to %s' % (method, self._url_string(path))) + # TODO(benjy): Keep connection open and reuse? + conn = self._connect() + conn.request(method, path, body=body) + response = conn.getresponse() + # TODO: Can HEAD return 204? It would be correct, but I've not seen it happen. + if response.status == 200: + return response + elif response.status == 404: + return None + else: + raise self.CacheError('Failed to %s %s. 
Error: %d %s' % (method, self._url_string(path), + response.status, response.reason)) + + def _url_string(self, path): + return '%s://%s%s' % (('https' if self._ssl else 'http'), self._netloc, path) diff --git a/src/python/twitter/pants/python/python_chroot.py b/src/python/twitter/pants/python/python_chroot.py index b8b42d06d5c..9ff195080ce 100644 --- a/src/python/twitter/pants/python/python_chroot.py +++ b/src/python/twitter/pants/python/python_chroot.py @@ -15,6 +15,7 @@ # ================================================================================================== from __future__ import print_function +from twitter.pants.cache.file_based_artifact_cache import FileBasedArtifactCache __author__ = 'Brian Wickman' @@ -31,7 +32,6 @@ from twitter.pants import is_concrete from twitter.pants.base import Config -from twitter.pants.cache.artifact_cache import FileBasedArtifactCache from twitter.pants.base.build_invalidator import CacheKeyGenerator from twitter.pants.targets import ( PythonAntlrLibrary, diff --git a/src/python/twitter/pants/tasks/__init__.py b/src/python/twitter/pants/tasks/__init__.py index 414f703a894..bcace10c32b 100644 --- a/src/python/twitter/pants/tasks/__init__.py +++ b/src/python/twitter/pants/tasks/__init__.py @@ -22,8 +22,8 @@ from multiprocessing.pool import ThreadPool from twitter.common.collections.orderedset import OrderedSet +from twitter.pants.cache import create_artifact_cache -from twitter.pants.cache.artifact_cache import create_artifact_cache from twitter.pants.base.hash_utils import hash_file from twitter.pants.base.build_invalidator import CacheKeyGenerator from twitter.pants.reporting.reporting_utils import items_to_report_element diff --git a/tests/python/twitter/pants/cache/test_artifact_cache.py b/tests/python/twitter/pants/cache/test_artifact_cache.py index 5a92f22e231..96dfab95549 100644 --- a/tests/python/twitter/pants/cache/test_artifact_cache.py +++ b/tests/python/twitter/pants/cache/test_artifact_cache.py @@ -1,19 +1,3 
@@ -# ================================================================================================== -# Copyright 2011 Twitter, Inc. -# -------------------------------------------------------------------------------------------------- -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this work except in compliance with the License. -# You may obtain a copy of the License in the LICENSE file, or at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ================================================================================================== - import SimpleHTTPServer import SocketServer @@ -23,16 +7,17 @@ from twitter.common.contextutil import pushd, temporary_dir, temporary_file from twitter.common.dirutil import safe_mkdir +from twitter.pants.cache import create_artifact_cache -from twitter.pants.cache.artifact_cache import ( - CombinedArtifactCache, - FileBasedArtifactCache, - RESTfulArtifactCache, - create_artifact_cache) from twitter.pants.base.build_invalidator import CacheKey # A very trivial server that serves files under the cwd. +from twitter.pants.cache.combined_artifact_cache import CombinedArtifactCache +from twitter.pants.cache.file_based_artifact_cache import FileBasedArtifactCache +from twitter.pants.cache.restful_artifact_cache import RESTfulArtifactCache + + class SimpleRESTHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): def __init__(self, request, client_address, server): # The base class implements GET and HEAD.