Skip to content

Commit

Permalink
Merge pull request conda#1343 from conda/signed-packages
Browse files Browse the repository at this point in the history
Signed packages
  • Loading branch information
ilanschnell committed Jun 16, 2015
2 parents 7a15c90 + d23500f commit 972ad5d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 2 deletions.
14 changes: 14 additions & 0 deletions conda/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,20 @@ def fetch_pkg(info, dst_dir=None, session=None):
path = join(dst_dir, fn)

download(url, path, session=session, md5=info['md5'], urlstxt=True)
if info.get('sig'):
from conda.signature import verify, SignatureError

fn2 = fn + '.sig'
url = (info['channel'] if info['sig'] == '.' else
info['sig'].rstrip('/') + '/') + fn2
log.debug("signature url=%r" % url)
download(url, join(dst_dir, fn2), session=session)
try:
if verify(path):
return
except SignatureError as e:
sys.exit(str(e))
sys.exit("Error: Signature for '%s' is invalid." % (basename(path)))


def download(url, dst_path, session=None, md5=None, urlstxt=False,
Expand Down
87 changes: 87 additions & 0 deletions conda/signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import sys
import base64
import hashlib
from os.path import abspath, expanduser, isfile, join

from conda.compat import PY3

try:
from Crypto.PublicKey import RSA
except ImportError:
sys.exit("""\
Error: could not import Crypto (required for signature verification).
Run the following command:
$ conda install -n root pycrypto
""")

KEYS = {}
KEYS_DIR = abspath(expanduser('~/.conda/keys'))


def hash_file(path):
h = hashlib.new('sha256')
with open(path, 'rb') as fi:
while True:
chunk = fi.read(262144) # process chunks of 256KB
if not chunk:
break
h.update(chunk)
return h.digest()


def sig2ascii(i):
"""
Given a positive integer `i`, return a base64 encoded string
representation of the value.
"""
if i < 0:
raise ValueError('positive integer expected, got: %r' % i)
ret = []
while i:
i, r = divmod(i, 256)
ret.append(r)
if PY3:
s = bytes(n for n in ret[::-1])
else:
s = ''.join(chr(n) for n in ret[::-1])
return base64.b64encode(s).decode('utf-8')


def ascii2sig(s):
"""
Given the base64 encoded string representation of an integer (returned
by sig2ascii), return the integer value.
"""
res = 0
for c in base64.b64decode(s):
res *= 256
res += (c if PY3 else ord(c))
return res


class SignatureError(Exception):
pass


def verify(path):
"""
Verify the file `path`, with signature `path`.sig, against the key
found under ~/.conda/keys/<key_name>.pub. This function returns:
- True, if the signature is valid
- False, if the signature is invalid
It raises SignatureError when the signature file, or the public key
does not exist.
"""
sig_path = path + '.sig'
if not isfile(sig_path):
raise SignatureError("signature does not exist: %s" % sig_path)
with open(sig_path) as fi:
key_name, sig = fi.read().split()
if key_name not in KEYS:
key_path = join(KEYS_DIR, '%s.pub' % key_name)
if not isfile(key_path):
raise SignatureError("public key does not exist: %s" % key_path)
KEYS[key_name] = RSA.importKey(open(key_path).read())
key = KEYS[key_name]
return key.verify(hash_file(path), (ascii2sig(sig),))
4 changes: 2 additions & 2 deletions conda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def try_write(dir_path):


def hashsum_file(path, mode='md5'):
h = hashlib.new(mode)
with open(path, 'rb') as fi:
h = hashlib.new(mode)
while True:
chunk = fi.read(262144)
chunk = fi.read(262144) # process chunks of 256KB
if not chunk:
break
h.update(chunk)
Expand Down
15 changes: 15 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

from conda.fetch import cache_fn_url
from conda.signature import ascii2sig, sig2ascii


class TestMisc(unittest.TestCase):
Expand All @@ -9,6 +10,20 @@ def test_cache_fn_url(self):
url = "http://repo.continuum.io/pkgs/pro/osx-64/"
self.assertEqual(cache_fn_url(url), '7618c8b6.json')

def test_ascii2sig(self):
self.assertEqual(sig2ascii(1234), 'BNI=')
self.assertRaises(ValueError, sig2ascii, -1)
self.assertRaises(ValueError, sig2ascii, -12345)
self.assertRaises(TypeError, sig2ascii, 2.0)
self.assertRaises(TypeError, sig2ascii, 'a')

def test_sig2ascii(self):
self.assertEqual(ascii2sig('BNI='), 1234)

def test_sig2ascii2sig(self):
for i in range(10000):
self.assertEqual(ascii2sig(sig2ascii(i)), i)


if __name__ == '__main__':
unittest.main()

0 comments on commit 972ad5d

Please sign in to comment.