Commit 1393935
==============
Author: Paul Egan
Committed: Feb 18, 2013
Parent: a599d10

Showing 6 changed files with 433 additions and 3 deletions.

LICENSE (new file)
------------------
@@ -0,0 +1,22 @@

Copyright (c) 2013, rockpack ltd
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

README
------
@@ -1,4 +1,58 @@

s3yum-updater
=============
Daemon script for s3-hosted yum repositories that updates repodata on rpm
package upload.

This daemon script can be used to keep an s3-hosted yum repository updated
when new rpm packages are uploaded. It is equivalent to running `createrepo`
followed by `s3cmd sync`, but only a temporary copy of the repo metadata is
needed locally, so there is no need to keep a full clone of the repository
and all its packages. This is also very useful if packages are uploaded by
many users or systems: a single `repoupdate-daemon` ensures all new packages
are added to the repository metadata, avoiding issues with concurrent
updates.

The upload of a new package to s3 should be handled by whatever client is
used to build the rpm, e.g. a CI system like Jenkins. The daemon listens for
SNS notifications on an SQS queue, which tell it the paths of the new rpm
files. The daemon then downloads the repodata, updates it, and uploads it
again.

You can use the included `publish-packages` script to upload rpms to s3 and
notify the update daemon.

By default the daemon is configured to keep only the last two versions of
each package.
|
||
Install | ||
------- | ||
|
||
You can use the included spec file to build an rpm and then `yum localinstall` | ||
it. | ||
|
||
Configure | ||
--------- | ||
|
||
Create an s3 bucket to host the yum repository. Create an SNS topic and an SQS | ||
queue that is subscribed to it. | ||
|
||
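
For example, the one-time setup can be scripted with boto (the same library
the daemon uses). This is a minimal sketch; the bucket, topic, and queue
names are just examples:

    import boto
    import boto.sns
    import boto.sqs

    # Bucket that will hold the packages and repodata
    boto.connect_s3().create_bucket('packages.example.com')

    # Topic that publish-packages will notify
    sns = boto.sns.connect_to_region('us-east-1')
    topic = sns.create_topic('packages-new')
    topic_arn = (topic['CreateTopicResponse']
                      ['CreateTopicResult']['TopicArn'])

    # Queue the daemon will poll; subscribe_sqs_queue also grants the
    # topic permission to deliver messages to the queue
    sqs = boto.sqs.connect_to_region('us-east-1')
    queue = sqs.create_queue('packages-new')
    sns.subscribe_sqs_queue(topic_arn, queue)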

Override default options:

    echo 'OPTIONS="$OPTIONS -b mybucket -q myqueue"' >/etc/sysconfig/repoupdate-daemon

The daemon uses the standard boto configuration to access the AWS
credentials: IAM role, environment variables, or boto config file.
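
If you use a boto config file, a minimal example (with placeholder
credentials) looks like this:

    [Credentials]
    aws_access_key_id = <access-key-id>
    aws_secret_access_key = <secret-access-key>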

Run
---

    service repoupdate-daemon start

Test
----

    publish-packages --bucket mybucket --sns-topic mytopic *.rpm

---

Related Tools
-------------

* https://github.com/seporaitis/yum-s3-iam
* https://wiki.jenkins-ci.org/display/JENKINS/S3+Plugin
* https://wiki.jenkins-ci.org/display/JENKINS/Amazon+SNS+Notifier

publish-packages (new file)
---------------------------
@@ -0,0 +1,21 @@

#!/usr/bin/env python
"""Script to upload packages to s3 and notify repoupdate-daemon."""
import os
import optparse
import boto
import boto.sns

parser = optparse.OptionParser()
parser.add_option('--bucket', default='packages.example.com')
parser.add_option('--repopath', default='development/x86_64')
parser.add_option('--region', default='us-east-1')
parser.add_option('--sns-topic', default='arn:aws:sns:us-east-1:123:packages-new')
options, args = parser.parse_args()

sns = boto.sns.connect_to_region(options.region)
bucket = boto.connect_s3().get_bucket(options.bucket)
for rpmfile in args:
    # Upload each rpm under the repo path, then notify the daemon via SNS
    # with the filename as the message body and the repo path as the subject
    filename = os.path.split(rpmfile)[1]
    key = bucket.new_key(os.path.join(options.repopath, filename))
    key.set_contents_from_filename(rpmfile)
    sns.publish(options.sns_topic, filename, options.repopath)
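
For reference, when SNS delivers this notification to the SQS queue, the
message body the daemon receives is a JSON envelope. Abridged here to the
two fields the daemon actually reads (the full envelope is produced by AWS
and carries additional metadata):

    import json

    raw_body = '''{
        "Type": "Notification",
        "Subject": "development/x86_64",
        "Message": "mypackage-1.0-1.x86_64.rpm"
    }'''

    body = json.loads(raw_body)
    print body.get('Subject'), body['Message']  # repo path, rpm filename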

repoupdate-daemon init script (new file)
----------------------------------------
@@ -0,0 +1,61 @@

#!/bin/bash

# repoupdate-daemon - daemon for serialising metadata updates on an s3-hosted yum repository
#
# chkconfig: - 85 15
# description: daemon for serialising metadata updates on an s3-hosted yum repository
# processname: repoupdate-daemon

. /etc/rc.d/init.d/functions

NAME=repoupdate-daemon
USER=nobody
BIN=/usr/bin/$NAME
LOG=/var/log/$NAME.log
PID=/var/run/$NAME/pid
RETVAL=0

QUEUE_NAME="packages-new"

# Flags passed through to repoupdate-daemon itself: -q queue name,
# -d daemonize, -U run as user, -P pidfile, -l logfile, -v verbose
OPTIONS="-q $QUEUE_NAME -d -U $USER -P $PID -l $LOG -v"

[ -r /etc/sysconfig/$NAME ] && . /etc/sysconfig/$NAME

start() {
    touch $LOG
    mkdir -p `dirname $PID`
    chown $USER $LOG `dirname $PID`

    echo -n "Starting $NAME: "
    daemon $BIN $OPTIONS
    RETVAL=$?
    echo
    return $RETVAL
}

stop() {
    echo -n "Stopping $NAME: "
    killproc -p $PID $BIN
    RETVAL=$?
    echo
    return $RETVAL
}

case "$1" in
    start|stop)
        $1
        ;;
    restart)
        stop && start
        ;;
    status)
        status -p $PID $NAME
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|status}" >&2
        exit 2
        ;;
esac

exit $RETVAL

repoupdate-daemon (new file)
----------------------------
@@ -0,0 +1,218 @@

#!/usr/bin/env python
"""Daemon for serialising metadata updates on an s3-hosted yum repository.

Listens on SQS for SNS messages that specify new packages published to s3.
After waiting a while and grouping any additional messages, this script will
update the yum repodata to include all the new packages.

Assuming you have an SQS queue subscribed to an SNS topic, you can upload
a package and notify this daemon by specifying the rpm filename in the SNS
message body (and optionally give the base repository path in the subject):

>>> bucket = boto.connect_s3().get_bucket('bucket')
>>> bucket.new_key('repo/path/mypackage.rpm').set_contents_from_string('...')
>>> boto.connect_sns().publish('TOPIC', 'mypackage.rpm', 'repo/path')
"""
import os
import time
import urlparse
import tempfile
import shutil
import optparse
import logging
import collections
import yum
import createrepo
import boto
import boto.sqs
import boto.sqs.message
from boto.sqs.jsonmessage import json


# Hack for creating s3 urls
urlparse.uses_relative.append('s3')
urlparse.uses_netloc.append('s3')


class LoggerCallback(object):
    def errorlog(self, message):
        logging.error(message)

    def log(self, message):
        message = message.strip()
        if message:
            logging.debug(message)


class S3Grabber(object):
    def __init__(self, baseurl):
        base = urlparse.urlsplit(baseurl)
        self.baseurl = baseurl
        self.basepath = base.path.lstrip('/')
        self.bucket = boto.connect_s3().get_bucket(base.netloc)

    def urlgrab(self, url, filename, **kwargs):
        if url.startswith(self.baseurl):
            url = url[len(self.baseurl):].lstrip('/')
        key = self.bucket.get_key(os.path.join(self.basepath, url))
        if not key:
            raise createrepo.grabber.URLGrabError(14, '%s not found' % url)
        logging.info('downloading: %s', key.name)
        key.get_contents_to_filename(filename)
        return filename

    def syncdir(self, dir, url):
        """Copy all files in dir to url, removing any existing keys."""
        base = os.path.join(self.basepath, url)
        existing_keys = list(self.bucket.list(base))
        new_keys = []
        for filename in sorted(os.listdir(dir)):
            key = self.bucket.new_key(os.path.join(base, filename))
            key.set_contents_from_filename(os.path.join(dir, filename))
            new_keys.append(key.name)
            logging.info('uploading: %s', key.name)
        for key in existing_keys:
            if key.name not in new_keys:
                key.delete()
                logging.info('removing: %s', key.name)


def update_repodata(repopath, rpmfiles, options):
    tmpdir = tempfile.mkdtemp()
    s3base = urlparse.urlunsplit(('s3', options.bucket, repopath, '', ''))
    s3grabber = S3Grabber(s3base)

    # Set up temporary repo that will fetch repodata from s3
    yumbase = yum.YumBase()
    yumbase.preconf.disabled_plugins = '*'
    yumbase.conf.cachedir = os.path.join(tmpdir, 'cache')
    yumbase.repos.disableRepo('*')
    repo = yumbase.add_enable_repo('s3')
    repo._grab = s3grabber
    # Ensure that missing base path doesn't cause trouble
    repo._sack = yum.sqlitesack.YumSqlitePackageSack(
        createrepo.readMetadata.CreaterepoPkgOld)

    # Create metadata generator
    mdconf = createrepo.MetaDataConfig()
    mdconf.directory = tmpdir
    mdgen = createrepo.MetaDataGenerator(mdconf, LoggerCallback())
    mdgen.tempdir = tmpdir
    mdgen._grabber = s3grabber

    # Combine existing package sack with new rpm file list
    new_packages = []
    for rpmfile in rpmfiles:
        newpkg = mdgen.read_in_package(os.path.join(s3base, rpmfile))
        newpkg._baseurl = ''  # don't leave s3 base urls in primary metadata
        older_pkgs = yumbase.pkgSack.searchNevra(name=newpkg.name)
        # Remove older versions of this package (or if it's the same version)
        for i, older in enumerate(reversed(older_pkgs), 1):
            if i > options.keep or older.pkgtup == newpkg.pkgtup:
                yumbase.pkgSack.delPackage(older)
                logging.info('ignoring: %s', older.ui_nevra)
        new_packages.append(newpkg)
    mdconf.pkglist = list(yumbase.pkgSack) + new_packages

    # Write out new metadata to tmpdir
    mdgen.doPkgMetadata()
    mdgen.doRepoMetadata()
    mdgen.doFinalMove()

    # Replace metadata on s3
    s3grabber.syncdir(os.path.join(tmpdir, 'repodata'), 'repodata')

    shutil.rmtree(tmpdir)


def main(options, args):
    loglevel = ('WARNING', 'INFO', 'DEBUG')[min(2, options.verbose)]
    logging.basicConfig(
        filename=options.logfile,
        level=logging.getLevelName(loglevel),
        format='%(asctime)s %(levelname)s %(message)s',
    )

    if args and not options.sqs_name:
        return update_repodata(options.repopath, args, options)

    conn = boto.sqs.connect_to_region(options.region)
    queue = conn.get_queue(options.sqs_name)
    queue.set_message_class(boto.sqs.message.RawMessage)
    messages = []
    delay_count = 0
    # Keep messages hidden long enough to cover the delay rounds plus the
    # update itself: with the defaults this is (2 + 2) * 60 = 240 seconds
    visibility_timeout = ((options.process_delay_count + 2) *
                          options.queue_check_interval)
    logging.debug('sqs visibility_timeout: %d', visibility_timeout)

    while True:
        new_messages = queue.get_messages(10, visibility_timeout)
        if new_messages:
            messages.extend(new_messages)
            # Immediately check for more messages
            continue
        if messages:
            if delay_count < options.process_delay_count:
                logging.debug('Delaying processing: %d < %d', delay_count,
                              options.process_delay_count)
                delay_count += 1
            else:
                pkgmap = collections.defaultdict(list)
                for message in messages:
                    body = json.loads(message.get_body())
                    repopath = str(body.get('Subject', options.repopath))
                    pkgmap[repopath].append(str(body['Message']))
                for repopath, rpmfiles in pkgmap.items():
                    logging.info('updating: %s: %r', repopath, rpmfiles)
                    try:
                        update_repodata(repopath, set(rpmfiles), options)
                    except Exception:
                        # sqs messages will be deleted even on failure
                        logging.exception('update failed: %s', repopath)
                # Reset:
                for message in messages:
                    message.delete()
                messages = []
                delay_count = 0
        logging.debug('sleeping %ds...', options.queue_check_interval)
        try:
            time.sleep(options.queue_check_interval)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-b', '--bucket', default='packages.example.com')
    parser.add_option('-p', '--repopath', default='development/x86_64')
    parser.add_option('-r', '--region', default='us-east-1')
    parser.add_option('-q', '--sqs-name')
    parser.add_option('-k', '--keep', type='int', default=2)
    parser.add_option('-v', '--verbose', action='count', default=0)
    parser.add_option('-l', '--logfile')
    parser.add_option('-d', '--daemon', action='store_true')
    parser.add_option('-P', '--pidfile')
    parser.add_option('-U', '--user')
    parser.add_option('--queue-check-interval', type='int', default=60)
    parser.add_option('--process-delay-count', type='int', default=2)
    options, args = parser.parse_args()

    if not options.sqs_name and not args:
        parser.error("Must specify SQS queue name or rpm file args")
    if options.sqs_name and args:
        parser.error("Don't give file args when specifying an SQS queue")

    if options.daemon:
        import daemon
        daemon_args = {}
        if options.pidfile:
            from daemon.pidlockfile import PIDLockFile
            daemon_args['pidfile'] = PIDLockFile(options.pidfile)
        if options.user:
            import pwd
            user = pwd.getpwnam(options.user)
            daemon_args['uid'] = user.pw_uid
            daemon_args['gid'] = user.pw_gid
        with daemon.DaemonContext(**daemon_args):
            main(options, args)
    else:
        main(options, args)
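
Besides running as an SQS-driven daemon, the script also performs a one-off
update when given rpm filenames directly and no queue. A minimal sketch of
the equivalent call from Python, assuming the script were importable as a
module (it is installed as an executable, so the `repoupdate_daemon` module
name here is hypothetical):

    import optparse
    from repoupdate_daemon import update_repodata

    # update_repodata only reads options.bucket and options.keep
    options = optparse.Values({'bucket': 'packages.example.com', 'keep': 2})
    update_repodata('development/x86_64',
                    ['mypackage-1.0-1.x86_64.rpm'], options)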