diff --git a/qiniu/auth.py b/qiniu/auth.py index 7f9f87e5..8da00b8e 100644 --- a/qiniu/auth.py +++ b/qiniu/auth.py @@ -35,6 +35,7 @@ 'persistentNotifyUrl', # 持久化处理结果通知URL 'persistentPipeline', # 持久化处理独享队列 'deleteAfterDays', # 文件多少天后自动删除 + 'fileType', # 文件的存储类型,0为普通存储,1为低频存储 ]) diff --git a/qiniu/services/storage/bucket.py b/qiniu/services/storage/bucket.py index 906a1707..c93d2506 100644 --- a/qiniu/services/storage/bucket.py +++ b/qiniu/services/storage/bucket.py @@ -17,7 +17,7 @@ class BucketManager(object): def __init__(self, auth, zone=None): self.auth = auth - if(zone is None): + if (zone is None): self.zone = config.get_default('default_zone') else: self.zone = zone @@ -68,7 +68,7 @@ def stat(self, bucket, key): """获取文件信息: 获取资源的元信息,但不返回文件内容,具体规格参考: - http://developer.qiniu.com/docs/v6/api/reference/rs/stat.html + https://developer.qiniu.com/kodo/api/1308/stat Args: bucket: 待获取信息资源所在的空间 @@ -81,6 +81,7 @@ def stat(self, bucket, key): "hash": "ljfockr0lOil_bZfyaI2ZY78HWoH", "mimeType": "application/octet-stream", "putTime": 13603956734587420 + "type": 0 } 一个ResponseInfo对象 """ @@ -210,6 +211,20 @@ def change_mime(self, bucket, key, mime): encode_mime = urlsafe_base64_encode(mime) return self.__rs_do('chgm', resource, 'mime/{0}'.format(encode_mime)) + def change_type(self, bucket, key, storage_type): + """修改文件的存储类型 + + 修改文件的存储类型为普通存储或者是低频存储,参考温度: + https://developer.qiniu.com/kodo/api/3710/modify-the-file-type + + Args: + bucket: 待操作资源所在空间 + key: 待操作资源文件名 + storage_type: 待操作资源存储类型,0为普通存储,1为低频存储 + """ + resource = entry(bucket, key) + return self.__rs_do('chtype', resource, 'type/{0}'.format(storage_type)) + def batch(self, operations): """批量操作: @@ -319,4 +334,5 @@ def _one_key_batch(operation, bucket, keys): def _two_key_batch(operation, source_bucket, key_pairs, target_bucket, force='false'): if target_bucket is None: target_bucket = source_bucket - return [_build_op(operation, entry(source_bucket, k), entry(target_bucket, v), 'force/{0}'.format(force)) for k, v in key_pairs.items()] + return [_build_op(operation, entry(source_bucket, k), entry(target_bucket, v), 'force/{0}'.format(force)) for k, v + in key_pairs.items()] diff --git a/test_qiniu.py b/test_qiniu.py index 0c9caf3f..3d77aa55 100644 --- a/test_qiniu.py +++ b/test_qiniu.py @@ -11,7 +11,8 @@ from qiniu import Auth, set_default, etag, PersistentFop, build_op, op_save, Zone from qiniu import put_data, put_file, put_stream -from qiniu import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, build_batch_delete +from qiniu import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, \ + build_batch_delete from qiniu import urlsafe_base64_encode, urlsafe_base64_decode from qiniu.compat import is_py2, is_py3, b @@ -24,6 +25,7 @@ import sys import StringIO import urllib + reload(sys) sys.setdefaultencoding('utf-8') StringIO = StringIO.StringIO @@ -31,6 +33,7 @@ elif is_py3: import io import urllib + StringIO = io.StringIO urlopen = urllib.request.urlopen @@ -51,7 +54,7 @@ def rand_string(length): def create_temp_file(size): t = tempfile.mktemp() f = open(t, 'wb') - f.seek(size-1) + f.seek(size - 1) f.write(b('0')) f.close() return t @@ -69,7 +72,6 @@ def is_travis(): class UtilsTest(unittest.TestCase): - def test_urlsafe(self): a = 'hello\x96' u = urlsafe_base64_encode(a) @@ -77,7 +79,6 @@ def test_urlsafe(self): class AuthTestCase(unittest.TestCase): - def test_token(self): token = dummy_auth.token('test') assert token == 'abcdefghklmnopq:mSNBTR7uS2crJsyFr2Amwv1LaYg=' @@ -129,7 +130,8 @@ def test_prefetch(self): assert ret['key'] == 'python-sdk.html' def test_fetch(self): - ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name, 'fetch.html') + ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name, + 'fetch.html') print(info) assert ret['key'] == 'fetch.html' assert 'hash' in ret @@ -152,7 +154,7 @@ def test_delete(self): assert info.status_code == 612 def test_rename(self): - key = 'renameto'+rand_string(8) + key = 'renameto' + rand_string(8) self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) key2 = key + 'move' ret, info = self.bucket.rename(bucket_name, key, key2) @@ -163,7 +165,7 @@ def test_rename(self): assert ret == {} def test_copy(self): - key = 'copyto'+rand_string(8) + key = 'copyto' + rand_string(8) ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) print(info) assert ret == {} @@ -176,27 +178,24 @@ def test_change_mime(self): print(info) assert ret == {} - def test_copy(self): - key = 'copyto'+rand_string(8) - ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) + def test_change_type(self): + target_key = 'copyto' + rand_string(8) + self.bucket.copy(bucket_name, 'copyfrom', bucket_name, target_key) + ret, info = self.bucket.change_type(bucket_name, target_key, 1) print(info) assert ret == {} - ret, info = self.bucket.delete(bucket_name, key) + ret, info = self.bucket.stat(bucket_name, target_key) print(info) - assert ret == {} + assert 'type' in ret + self.bucket.delete(bucket_name, target_key) def test_copy_force(self): ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true') print(info) assert info.status_code == 200 - def test_change_mime(self): - ret, info = self.bucket.change_mime(bucket_name, 'python-sdk.html', 'text/html') - print(info) - assert ret == {} - def test_batch_copy(self): - key = 'copyto'+rand_string(8) + key = 'copyto' + rand_string(8) ops = build_batch_copy(bucket_name, {'copyfrom': key}, bucket_name) ret, info = self.bucket.batch(ops) print(info) @@ -213,7 +212,7 @@ def test_batch_copy_force(self): assert ret[0]['code'] == 200 def test_batch_move(self): - key = 'moveto'+rand_string(8) + key = 'moveto' + rand_string(8) self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) key2 = key + 'move' ops = build_batch_move(bucket_name, {key: key2}, bucket_name) @@ -225,16 +224,16 @@ def test_batch_move(self): assert ret == {} def test_batch_move_force(self): - ret,info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true') + ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true') print(info) assert info.status_code == 200 - ops = build_batch_move(bucket_name, {'copyfrom':'copyfrom'}, bucket_name,force='true') + ops = build_batch_move(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true') ret, info = self.bucket.batch(ops) print(info) assert ret[0]['code'] == 200 def test_batch_rename(self): - key = 'rename'+rand_string(8) + key = 'rename' + rand_string(8) self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) key2 = key + 'rename' ops = build_batch_move(bucket_name, {key: key2}, bucket_name) @@ -246,10 +245,10 @@ def test_batch_rename(self): assert ret == {} def test_batch_rename_force(self): - ret,info = self.bucket.rename(bucket_name, 'copyfrom', 'copyfrom', force='true') + ret, info = self.bucket.rename(bucket_name, 'copyfrom', 'copyfrom', force='true') print(info) assert info.status_code == 200 - ops = build_batch_rename(bucket_name, {'copyfrom':'copyfrom'}, force='true') + ops = build_batch_rename(bucket_name, {'copyfrom': 'copyfrom'}, force='true') ret, info = self.bucket.batch(ops) print(info) assert ret[0]['code'] == 200 @@ -262,16 +261,15 @@ def test_batch_stat(self): def test_delete_after_days(self): days = '5' - ret, info = self.bucket.delete_after_days(bucket_name,'invaild.html', days) + ret, info = self.bucket.delete_after_days(bucket_name, 'invaild.html', days) assert info.status_code == 612 - key = 'copyto'+rand_string(8) + key = 'copyto' + rand_string(8) ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) ret, info = self.bucket.delete_after_days(bucket_name, key, days) assert info.status_code == 200 class UploaderTestCase(unittest.TestCase): - mime_type = "text/plain" params = {'x:a': 'a'} q = Auth(access_key, secret_key) @@ -400,7 +398,6 @@ def test_putData_without_fname2(self): class ResumableUploaderTestCase(unittest.TestCase): - mime_type = "text/plain" params = {'x:a': 'a'} q = Auth(access_key, secret_key) @@ -411,7 +408,8 @@ def test_put_stream(self): size = os.stat(localfile).st_size with open(localfile, 'rb') as input_stream: token = self.q.upload_token(bucket_name, key) - ret, info = put_stream(token, key, input_stream, os.path.basename(__file__), size, self.params, self.mime_type) + ret, info = put_stream(token, key, input_stream, os.path.basename(__file__), size, self.params, + self.mime_type) print(info) assert ret['key'] == key @@ -438,13 +436,12 @@ def test_retry(self): class DownloadTestCase(unittest.TestCase): - q = Auth(access_key, secret_key) def test_private_url(self): private_bucket = 'private-res' private_key = 'gogopher.jpg' - base_url = 'http://%s/%s' % (private_bucket+'.qiniudn.com', private_key) + base_url = 'http://%s/%s' % (private_bucket + '.qiniudn.com', private_key) private_url = self.q.private_download_url(base_url, expires=3600) print(private_url) r = requests.get(private_url) @@ -469,11 +466,13 @@ def test_zero_size(self): hash = etag("x") assert hash == 'Fto5o-5ea0sNMlW_75VgGJCv2AcJ' remove_temp_file("x") + def test_small_size(self): localfile = create_temp_file(1024 * 1024) hash = etag(localfile) assert hash == 'FnlAdmDasGTQOIgrU1QIZaGDv_1D' remove_temp_file(localfile) + def test_large_size(self): localfile = create_temp_file(4 * 1024 * 1024 + 1) hash = etag(localfile) @@ -489,5 +488,6 @@ def __init__(self, str): def read(self): print(self.str) + if __name__ == '__main__': unittest.main()