Skip to content

Commit

Permalink
Merge pull request #62 from parthsompura/master
Browse files Browse the repository at this point in the history
Added utility to parse and compare the xattrs along with test cases to verify setxattr()
  • Loading branch information
hamano authored Dec 30, 2022
2 parents 7c6ca8d + 5968d8e commit 070a94f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 18 deletions.
53 changes: 35 additions & 18 deletions nosetests_/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import smbc
import settings
import nose
from smbc import xattr
from nose.plugins.skip import SkipTest


Expand All @@ -15,15 +16,16 @@
'SERVER' : smbc.SERVER,
'FILE_SHARE' : smbc.FILE_SHARE,
'PRINTER_SHARE' : smbc.PRINTER_SHARE,
'IPC_SHARE' : smbc.IPC_SHARE,
'IPC_SHARE' : smbc.IPC_SHARE,

smbc.WORKGROUP : 'WORKGROUP',
smbc.SERVER : 'SERVER',
smbc.FILE_SHARE : 'FILE_SHARE',
smbc.PRINTER_SHARE : 'PRINTER_SHARE',
smbc.IPC_SHARE : 'IPC_SHARE'
}


# another map for system errors TODO can you find them in another module?
EINVAL = 22

Expand All @@ -40,21 +42,21 @@ def tearDown():

def touch_file(name):
"""
create a file containing "sample test file" in the test baseurl
create a file containing "sample test file" in the test baseurl
"""
tmpfile_name = baseurl + name
dfile = ctx.open(tmpfile_name, os.O_CREAT | os.O_TRUNC | os.O_WRONLY)
dfile.write("sample test file")
dfile.close
return tmpfile_name


def test_xattr_constants():
assert smbc.XATTR_ACL
assert smbc.XATTR_OWNER
assert smbc.XATTR_ACL
assert smbc.XATTR_OWNER
assert smbc.XATTR_GROUP


def test_xattr_get():
"""
system.nt_sec_desc.<attribute name>
Expand Down Expand Up @@ -101,8 +103,8 @@ def test_xattr_get_error():
try:
ctx.getxattr("UNEXISTENT", smbc.XATTR_OWNER)
assert False, "getxattr should fail with an unexistent file"
except ValueError as e:
(errno,strerror) = e.args
except ValueError as e:
(errno,strerror) = e.args
assert errno == EINVAL # TODO is it possible to trap an unexistent entity error from smbclient?
pass

Expand All @@ -113,23 +115,24 @@ def test_xattr_get_error():
ctx.getxattr(furl, xattr)
assert False, "getxattr should fail with %s" % xattr
except ValueError as e:
(errno,strerror) = e.args
(errno,strerror) = e.args
assert errno == EINVAL # invalid arguments

ctx.open(furl)

@SkipTest
def test_xattr_set():
#raise SkipTest("xattr_set to be implemented")
data = {"revision" : "", "owner" : "", "group" : "", "acl" : {}}
print("test_xattr_put")
furl = touch_file("tmpfile_set.out")
attr_name = smbc.XATTR_ALL
attrs = ctx.getxattr(furl, attr_name)
xattr.parse_sec_desc(attrs, data)
attrs = xattr.convert_acl_hex_to_int(data)
print("attrs(%s): %s" % (attr_name, attrs))
ctx.setxattr(furl, attr_name, attrs, smbc.XATTR_FLAG_REPLACE)
attrs1 = ctx.getxattr(furl, attr_name)
print("attrs1(%s): %s" % (attr_name, attrs1))
assert attrs1 == attrs
assert xattr.compare_xattr(attrs, attrs1)

@SkipTest
def test_xattr_set_2():
Expand All @@ -152,24 +155,38 @@ def test_xattr_set_2():
print("attrs_1(%s): %s" % (attr_name, attrs_1))
assert attrs_1 == attrs_new

def test_xattr_set_3():
data = {"revision" : "", "owner" : "", "group" : "", "acl" : {}}
furl = touch_file("tmpfile_set.out")
attrs_new = "REVISION:1,OWNER:RPOLLI\\babel,GROUP:Unix Group\\babel,ACL:RPOLLI\\babel:0/0/0x001e01ff,ACL:Unix Group\\babel:0/0/0x00120089,ACL:Unix Group\\games:0/0/0x001e01ff,ACL:\\Everyone:0/0/0x00120089"
xattr.parse_sec_desc(attrs_new, data)
attrs_int = xattr.convert_acl_hex_to_int(data)
attr_name = smbc.XATTR_ALL_SID
attrs_0 = ctx.getxattr(furl, attr_name)
print("original attrs(%s)" % attrs_0)
ctx.setxattr(furl, attr_name, attrs_int, smbc.XATTR_FLAG_REPLACE)
attrs_1 = ctx.getxattr(furl, attr_name)
print("attrs_1(%s): %s" % (attr_name, attrs_1))
assert xattr.compare_xattr(attrs_int, attrs_1)

def test_xattr_set_error():
#raise SkipTest("xattr_set to be implemented")
print("test_xattr_set_error")
furl = touch_file("tmpfile_set.out")
attr_name = smbc.XATTR_ALL_SID
attrs_ok = ctx.getxattr(furl, attr_name)
attrs = "BAD_VALUE" # causes segfault
for xa in ["BAD_VALUE", u'REVISION:1,OWNER:RPOLLI\\babel,GROUP:', 0, None]:
for xa in ["BAD_VALUE", u'REVISION:1,OWNER:RPOLLI\\babel,GROUP:', 0, None]:
try:
ctx.setxattr(furl, attr_name, xa, smbc.XATTR_FLAG_REPLACE)
except ValueError as e:
(errno,strerror) = e.args
(errno,strerror) = e.args
assert errno == EINVAL # invalid arguments
print("setxattr(%s) raises %s" % (xa, e))
pass
except TypeError as e:
print("setxattr(%s) raises %s" % (xa, e))
pass
pass

@SkipTest
def test_Workgroup():
Expand All @@ -180,7 +197,7 @@ def test_Workgroup():

@SkipTest
def test_Server():
uri = 'smb://' + settings.WORKGROUP
uri = 'smb://' + settings.WORKGROUP
l_entries = ctx.opendir(uri).getdents()
assert(len(l_entries) > 0)
for entry in l_entries:
Expand Down
57 changes: 57 additions & 0 deletions smbc/xattr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
w=0x00120116
r=0x00120089

ACL_KEYWORDS = {"revision" : "", "owner" : "", "group" : "", "acl" : ""}

class SmbAcl():
revision = None
owner = None
Expand Down Expand Up @@ -50,6 +52,61 @@ def get_target(acl_s):
def get_perm(acl_s):
return acl_s[acl_s.index(":")+1:]

def parse_sec_desc(raw, parsed):
""" Parses the raw security descriptor string into the expected
components and stores them in parsed """

parts = raw.split(",")
for part in parts:
key, value = part.split(":", 1)
key = key.lower().strip()
if key in ACL_KEYWORDS:
if key != "acl":
parsed[key] = value
else:
sid, perms = value.split(":", 1)
if sid not in parsed["acl"]:
parsed["acl"][sid] = []
parsed["acl"][sid].append(perms)

def convert_acl_hex_to_int(data):
""" Converts all ACL masks from hexadecimal to integer since
smbc_setxattr() expects integer format """

attrs_new = ""
if data.get("revision"):
attrs_new += "REVISION:" + data["revision"] + ","
if data.get("owner"):
attrs_new += "OWNER:" + data["owner"] + ","
if data.get("group"):
attrs_new += "GROUP:" + data["group"] + ","
if data.get("acl"):
for key in data["acl"].keys():
for i in range(len(data["acl"][key])):
attrs_new += "ACL:" + key + ":" + data["acl"][key][i].split("0x")[0] + str(int(data["acl"][key][i].split("/")[2], 0)) + ","

return attrs_new[:-1]

def compare_xattr(xattr_converted, xattr_current):
""" Compare the ACLs since the order might get altered by
smbc_getxattr() """

xattr_converted_data = {"revision" : "", "owner" : "", "group" : "", "acl" : {}}
xattr_current_data = {"revision" : "", "owner" : "", "group" : "", "acl" : {}}

parse_sec_desc(xattr_converted, xattr_converted_data)
parse_sec_desc(xattr_current, xattr_current_data)

for acl in xattr_current_data.get("acl"):
if xattr_converted_data.get("acl").get(acl):
for entry in xattr_current_data.get("acl").get(acl):
int_entry = entry.split("0x")[0] + str(int(entry.split("/")[2], 0))
if int_entry not in xattr_converted_data.get("acl").get(acl):
return False
else:
return False
return True

#
#import smbc
#
Expand Down

0 comments on commit 070a94f

Please sign in to comment.