Skip to content

Commit

Permalink
Added a new blobstore based on OpenStack Swift
Browse files Browse the repository at this point in the history
  • Loading branch information
alex authored and Daniel Farina committed Dec 6, 2013
1 parent a2d4f89 commit ebd482a
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 14 deletions.
11 changes: 8 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ will attempt to resolve them:
Backend Blob Store
------------------

The storage backend is determined by the defined _PREFIX. Prefixes with
the scheme ``s3`` will be directed towards S3, where those with the scheme
``wabs`` will be directed towards Windows Azure Blob Service.
The storage backend is determined by the defined _PREFIX. Prefixes with the
scheme ``s3`` will be directed towards S3, those with the scheme ``wabs`` will
be directed towards Windows Azure Blob Service, and those with the scheme
``swift`` will be directed towards an OpenStack Swift installation.

Example S3 Prefix:

Expand All @@ -93,6 +94,10 @@ Example WABS Prefix:

wabs://some-container/directory/or/whatever

Example OpenStack Swift Prefix:

swift://some-container/directory/or/whatever


Examples
--------
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
gevent>=0.13.1
boto>=2.6.0
azure==0.7.0
python-swiftclient
python-keystoneclient
3 changes: 3 additions & 0 deletions wal_e/blobstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ def get_blobstore(layout):
elif layout.is_wabs:
from wal_e.blobstore import wabs
blobstore = wabs
elif layout.is_swift:
from wal_e.blobstore import swift
blobstore = swift
return blobstore
13 changes: 13 additions & 0 deletions wal_e/blobstore/swift/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from wal_e.blobstore.swift.credentials import Credentials
from wal_e.blobstore.swift.utils import (
uri_put_file, uri_get_file, do_lzop_get, write_and_return_error, SwiftKey
)

# Names re-exported from the credentials and utils submodules imported
# above; this list is what ``from wal_e.blobstore.swift import *`` exposes.
__all__ = [
    "Credentials",
    "uri_put_file",
    "uri_get_file",
    "do_lzop_get",
    "write_and_return_error",
    "SwiftKey",
]
15 changes: 15 additions & 0 deletions wal_e/blobstore/swift/calling_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import swiftclient


def connect(creds):
    """
    Build a swiftclient connection from a credentials object

    ``creds`` must expose ``authurl``, ``user``, ``password``,
    ``tenant_name`` and ``region`` attributes.  The auth version is
    always passed as the string "2".
    """
    connection_options = {
        "authurl": creds.authurl,
        "user": creds.user,
        # swiftclient names the secret "key"; it is the password here
        "key": creds.password,
        "auth_version": "2",
        "tenant_name": creds.tenant_name,
        "os_options": {"region_name": creds.region},
    }
    return swiftclient.Connection(**connection_options)
7 changes: 7 additions & 0 deletions wal_e/blobstore/swift/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class Credentials(object):
    """
    Plain holder for the values needed to open a Swift connection

    Every constructor argument is stored unchanged on an attribute of
    the same name.
    """

    def __init__(self, authurl, user, password, tenant_name, region):
        # Where to authenticate
        self.authurl = authurl
        # Who is authenticating
        self.user, self.password = user, password
        # Scope of the session
        self.tenant_name, self.region = tenant_name, region
126 changes: 126 additions & 0 deletions wal_e/blobstore/swift/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import socket
import traceback
from urlparse import urlparse

import gevent

from wal_e import log_help
from wal_e.blobstore.swift import calling_format
from wal_e.pipeline import get_download_pipeline
from wal_e.piper import PIPE
from wal_e.retries import retry, retry_with_count


logger = log_help.WalELogger(__name__)


class SwiftKey(object):
    """
    Small record describing one stored object

    Carries the object's ``name`` and ``size`` plus an optional
    ``last_modified`` value, which defaults to None.
    """

    def __init__(self, name, size, last_modified=None):
        self.name, self.size = name, size
        self.last_modified = last_modified


def uri_put_file(creds, uri, fp, content_encoding=None):
    """
    Upload the contents of an open file to a Swift URL

    ``creds`` is handed to ``calling_format.connect`` to open the
    connection.  ``uri`` must start with ``swift://``; its network
    location is used as the container name and its path as the object
    name.  ``fp`` must be positioned at offset 0 and is consumed to the
    end.  ``content_encoding`` is sent to Swift as the object's content
    type.

    Returns a SwiftKey holding the object name and the number of bytes
    uploaded.
    """
    assert fp.tell() == 0
    assert uri.startswith('swift://')

    url_tup = urlparse(uri)
    container_name = url_tup.netloc
    conn = calling_format.connect(creds)

    # Hand swiftclient the file object itself rather than fp.read():
    # the payload is then streamed in chunks instead of being held
    # entirely in memory, which matters for large uploads.
    #
    # NOTE(review): the object name keeps the leading '/' from the URL
    # path; uri_get_file in this module does the same, so reads and
    # writes agree -- confirm any listing code expects that too.
    conn.put_object(
        container_name, url_tup.path, fp, content_type=content_encoding
    )

    # fp started at offset 0 (asserted above) and has been read to the
    # end by the upload, so its position is the number of bytes sent.
    return SwiftKey(url_tup.path, fp.tell())


def do_lzop_get(creds, uri, path, decrypt):
    """
    Get and decompress a Swift URL

    The object's content is written straight into a download pipeline
    whose output is the file at ``path``; the compressed version is
    never stored on disk.

    ``creds`` is passed to ``calling_format.connect``, ``uri`` must end
    in ``.lzo``, ``path`` is opened for binary writing, and ``decrypt``
    is forwarded unchanged to ``get_download_pipeline``.

    NOTE: the object body is fetched in full by ``uri_get_file`` (via
    ``write_and_return_error``) before it is written to the pipeline.

    Returns the result of the retry-wrapped ``download`` closure, which
    itself returns True once the pipeline has finished.
    """
    assert uri.endswith('.lzo'), 'Expect an lzop-compressed file'

    def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
        # Logging callback handed to retry_with_count.  exc_tup unpacks
        # into (type, value, traceback); exc_processor_cxt is reported
        # in the log detail as the number of attempts so far.
        def standard_detail_message(prefix=''):
            # .format() binds only to the adjacent string literals, so
            # ``prefix`` is prepended verbatim and never interpolated.
            return (prefix + ' There have been {n} attempts to fetch wal '
                    'file {uri} so far.'.format(n=exc_processor_cxt, uri=uri))
        typ, value, tb = exc_tup
        del exc_tup

        # Screen for certain kinds of known-errors to retry from
        if issubclass(typ, socket.error):
            socketmsg = value[1] if isinstance(value, tuple) else value

            logger.info(
                msg='Retrying fetch because of a socket error',
                detail=standard_detail_message(
                    "The socket error's message is '{0}'."
                    .format(socketmsg)))
        else:
            # For all otherwise untreated exceptions, report them as a
            # warning and retry anyway -- all exceptions that can be
            # justified should be treated and have error messages
            # listed.
            logger.warning(
                msg='retrying WAL file fetch from unexpected exception',
                detail=standard_detail_message(
                    'The exception type is {etype} and its value is '
                    '{evalue} and its traceback is {etraceback}'
                    .format(etype=typ, evalue=value,
                            etraceback=''.join(traceback.format_tb(tb)))))

        # Help Python GC by resolving possible cycles
        del tb

    # NOTE(review): presumably ``retry`` re-invokes download() when it
    # raises and reports each failure through the callback above --
    # confirm against wal_e.retries.
    @retry(retry_with_count(log_wal_fetch_failures_on_error))
    def download():
        # Opening with 'wb' truncates ``path``, so every attempt starts
        # from an empty output file.
        with open(path, 'wb') as decomp_out:
            pipeline = get_download_pipeline(PIPE, decomp_out, decrypt)

            conn = calling_format.connect(creds)

            # Feed the pipeline's stdin from a separate greenlet.
            g = gevent.spawn(write_and_return_error, uri, conn, pipeline.stdin)

            # Raise any exceptions from write_and_return_error
            exc = g.get()
            if exc is not None:
                raise exc

            pipeline.finish()

            logger.info(
                msg='completed download and decompression',
                detail='Downloaded and decompressed "{uri}" to "{path}"'
                .format(uri=uri, path=path))
        return True

    return download()


def uri_get_file(creds, uri, conn=None):
    """
    Return the content of the object named by a Swift URL

    ``uri`` must start with ``swift://``; its network location is used
    as the container name and its path as the object name.  When
    ``conn`` is None a new connection is opened from ``creds``;
    otherwise ``conn`` is used and ``creds`` is ignored.
    """
    assert uri.startswith('swift://')
    parsed = urlparse(uri)

    # Only open a new connection when the caller did not supply one
    connection = calling_format.connect(creds) if conn is None else conn

    # get_object yields a pair; only its second element is returned
    _ignored, body = connection.get_object(parsed.netloc, parsed.path)
    return body


def write_and_return_error(uri, conn, stream):
    """
    Write the object at ``uri`` to ``stream``, returning any error

    The object is fetched over ``conn`` with ``uri_get_file``, written
    to ``stream`` and flushed.  An exception raised along the way is
    returned rather than raised, so a caller running this in a greenlet
    can re-raise it itself.  Returns None on success.  ``stream`` is
    closed in every case.
    """
    try:
        stream.write(uri_get_file(None, uri, conn))
        stream.flush()
    # "except ... as" is valid on Python 2.6+ and Python 3; the old
    # comma form is a syntax error on Python 3.
    except Exception as e:
        return e
    finally:
        stream.close()
25 changes: 21 additions & 4 deletions wal_e/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,18 @@ def build_parser():
def configure_backup_cxt(args):
# Try to find some WAL-E prefix to store data in.
prefix = (args.s3_prefix or args.wabs_prefix
or os.getenv('WALE_S3_PREFIX') or os.getenv('WALE_WABS_PREFIX'))
or os.getenv('WALE_S3_PREFIX') or os.getenv('WALE_WABS_PREFIX')
or os.getenv('WALE_SWIFT_PREFIX'))

if prefix is None:
raise UserException(
msg='no storage prefix defined',
hint=('Either set one of the --wabs-prefix or --s3-prefix '
'options or define one of the WALE_WABS_PREFIX or '
'WALE_S3_PREFIX environment variables.'))
hint=(
'Either set one of the --wabs-prefix or --s3-prefix options or'
' define one of the WALE_WABS_PREFIX, WALE_S3_PREFIX, or '
'WALE_SWIFT_PREFIX environment variables.'
)
)

store = storage.StorageLayout(prefix)

Expand Down Expand Up @@ -413,6 +417,18 @@ def _env_hint(optname):
creds = wabs.Credentials(account_name, access_key)

return WABSBackup(store, creds, gpg_key_id)
elif store.is_swift:
from wal_e.blobstore import swift
from wal_e.operator.swift_operator import SwiftBackup

creds = swift.Credentials(
os.getenv('SWIFT_AUTHURL'),
os.getenv('SWIFT_USER'),
os.getenv('SWIFT_PASSWORD'),
os.getenv('SWIFT_TENANT'),
os.getenv('SWIFT_REGION'),
)
return SwiftBackup(store, creds, gpg_key_id)
else:
raise UserCritical(
msg='no unsupported blob stores should get here',
Expand Down Expand Up @@ -517,6 +533,7 @@ def just_error(*args, **kwargs):
backup_cxt.delete_all(is_dry_run_really)
elif args.delete_subcommand == 'before':
segment_info = extract_segment(args.BEFORE_SEGMENT_EXCLUSIVE)
assert segment_info is not None
backup_cxt.delete_before(is_dry_run_really, segment_info)
else:
assert False, 'Should be rejected by argument parsing.'
Expand Down
14 changes: 14 additions & 0 deletions wal_e/operator/swift_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from wal_e.blobstore.swift import calling_format
from wal_e.operator.backup import Backup
from wal_e.worker.swift import swift_worker


class SwiftBackup(Backup):
    """
    Performs OpenStack Swift uploads of PostgreSQL WAL files and clusters
    """

    def __init__(self, layout, creds, gpg_key_id):
        super(SwiftBackup, self).__init__(layout, creds, gpg_key_id)
        # Attach the Swift worker module and the Swift connection module.
        # NOTE(review): presumably the Backup base class reads these two
        # attributes -- confirm in wal_e.operator.backup.
        self.worker, self.cinfo = swift_worker, calling_format
2 changes: 2 additions & 0 deletions wal_e/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from wal_e.storage.base import VOLUME_REGEXP
from wal_e.storage.base import StorageLayout
from wal_e.storage.base import get_backup_info
from wal_e.storage.base import SegmentNumber


__all__ = [
Expand All @@ -21,4 +22,5 @@
'COMPLETE_BASE_BACKUP_REGEXP',
'VOLUME_REGEXP',
'get_backup_info',
'SegmentNumber',
]
25 changes: 21 additions & 4 deletions wal_e/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def as_an_integer(self):

OBSOLETE_VERSIONS = frozenset(('004', '003', '002', '001', '000'))

SUPPORTED_STORE_SCHEMES = ('s3', 'wabs')
SUPPORTED_STORE_SCHEMES = ('s3', 'wabs', 'swift')


# Exhaustively enumerates all possible metadata about a backup. These
Expand Down Expand Up @@ -127,6 +127,19 @@ class StorageLayout(object):
>>> sl.store_name()
'foo'
Swift:
Without a trailing slash
>>> sl = StorageLayout('swift://foo/bar')
>>> sl.is_swift
True
>>> sl.basebackups()
'bar/basebackups_005/'
>>> sl.wal_directory()
'bar/wal_005/'
>>> sl.store_name()
'foo'
"""

def __init__(self, prefix, version=CURRENT_VERSION):
Expand All @@ -136,9 +149,10 @@ def __init__(self, prefix, version=CURRENT_VERSION):

if url_tup.scheme not in SUPPORTED_STORE_SCHEMES:
raise wal_e.exception.UserException(
msg='bad S3 or Windows Azure Blob Storage URL scheme passed',
detail=('The scheme {0} was passed when "s3" or "wabs" '
'was expected.'.format(url_tup.scheme)))
msg='bad S3, Windows Azure Blob Storage, or OpenStack Swift '
'URL scheme passed',
detail=('The scheme {0} was passed when "s3", "wabs", or '
'"swift" was expected.'.format(url_tup.scheme)))

for scheme in SUPPORTED_STORE_SCHEMES:
setattr(self, 'is_%s' % scheme, scheme == url_tup.scheme)
Expand Down Expand Up @@ -232,4 +246,7 @@ def get_backup_info(layout, **kwargs):
elif layout.is_wabs:
from wal_e.storage.wabs_storage import WABSBackupInfo
bi = WABSBackupInfo(**kwargs)
elif layout.is_swift:
from wal_e.storage.swift_storage import SwiftBackupInfo
bi = SwiftBackupInfo(**kwargs)
return bi
21 changes: 21 additions & 0 deletions wal_e/storage/swift_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import json

from wal_e.blobstore import swift
from wal_e.storage.base import BackupInfo


class SwiftBackupInfo(BackupInfo):
    """
    Backup metadata whose details are read from a JSON object in Swift
    """

    def load_detail(self, conn):
        """
        Populate this object from its base backup sentinel, once

        Does nothing if details were already loaded.  Otherwise the
        sentinel's URL is built from the layout, its content is fetched
        over ``conn`` and decoded as JSON, and every key/value pair is
        set as an attribute on this object.
        """
        if self._details_loaded:
            return

        # Gather the URL pieces in the same order they are interpolated
        scheme = self.layout.scheme
        bucket = self.layout.store_name()
        sentinel_path = self.layout.basebackup_sentinel(self)
        sentinel_uri = "{scheme}://{bucket}/{path}".format(
            scheme=scheme, bucket=bucket, path=sentinel_path)

        payload = swift.uri_get_file(None, sentinel_uri, conn=conn)
        for attr_name, attr_value in json.loads(payload).items():
            setattr(self, attr_name, attr_value)

        self._details_loaded = True
8 changes: 5 additions & 3 deletions wal_e/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,11 @@ def delete_if_qualifies(delete_horizon_segment_number,
wal_key_depth = self.layout.wal_directory().count('/') + 1
for key in self._backup_list(prefix=self.layout.wal_directory()):
key_name = self.layout.key_name(key)
url = '{scm}://{bucket}/{name}'.format(scm=self.layout.scheme,
bucket=key.bucket.name,
name=key_name)
url = '{scm}://{bucket}/{name}'.format(
scm=self.layout.scheme,
bucket=self.layout.store_name(),
name=key_name
)
key_parts = key_name.split('/')
key_depth = len(key_parts)
if key_depth != wal_key_depth:
Expand Down
12 changes: 12 additions & 0 deletions wal_e/worker/swift/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from wal_e.worker.swift.swift_deleter import Deleter
from wal_e.worker.swift.swift_worker import (
TarPartitionLister, BackupFetcher, BackupList, DeleteFromContext
)

# Names re-exported from the swift_deleter and swift_worker submodules
# imported above; this list is what ``from wal_e.worker.swift import *``
# exposes.
__all__ = [
    "Deleter",
    "TarPartitionLister",
    "BackupFetcher",
    "BackupList",
    "DeleteFromContext",
]
Loading

0 comments on commit ebd482a

Please sign in to comment.