Skip to content

Commit

Permalink
s4-dsdb-tests: Some tests for deleted objects undelete operation
Browse files Browse the repository at this point in the history
Based on MS-ADTS 3.1.1.5.3.7.2

Signed-off-by: Nadezhda Ivanova <[email protected]>
Reviewed-by: Andrew Bartlett <[email protected]>
Reviewed-by: Garming Sam <[email protected]>

Change-Id: I650b315601fce574f9302435f812d1dd4b177e68
  • Loading branch information
nivanova-symas authored and abartlet committed Feb 3, 2015
1 parent bba753b commit b881da6
Showing 1 changed file with 198 additions and 5 deletions.
203 changes: 198 additions & 5 deletions source4/dsdb/tests/python/deletetest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import samba.getopt as options

from samba.auth import system_session
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
from ldb import ERR_UNWILLING_TO_PERFORM
from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
from samba.samdb import SamDB
from samba.tests import delete_force

Expand All @@ -39,13 +39,13 @@
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

class BasicDeleteTests(samba.tests.TestCase):
class BaseDeleteTests(samba.tests.TestCase):

def GUID_string(self, guid):
return self.ldb.schema_format_value("objectGUID", guid)

def setUp(self):
super(BasicDeleteTests, self).setUp()
super(BaseDeleteTests, self).setUp()
self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)

self.base_dn = self.ldb.domain_dn()
Expand All @@ -69,6 +69,12 @@ def search_dn(self,dn):
self.assertEquals(len(res), 1)
return res[0]


class BasicDeleteTests(BaseDeleteTests):

def setUp(self):
super(BasicDeleteTests, self).setUp()

def del_attr_values(self, delObj):
print "Checking attributes for %s" % delObj["dn"]

Expand Down Expand Up @@ -373,6 +379,193 @@ def test_all(self):
self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))

class BasicUndeleteTests(BaseDeleteTests):

def setUp(self):
super(BasicUndeleteTests, self).setUp()

def enable_recycle_bin(self):
msg = Message()
msg.dn = Dn(ldb, "")
msg["enableOptionalFeature"] = MessageElement(
"CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
FLAG_MOD_ADD, "enableOptionalFeature")
try:
ldb.modify(msg)
except LdbError, (num, _):
self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

def get_ldb_connection(self, target_username, target_password):
creds_tmp = Credentials()
creds_tmp.set_username(target_username)
creds_tmp.set_password(target_password)
creds_tmp.set_domain(creds.get_domain())
creds_tmp.set_realm(creds.get_realm())
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target

def undelete_deleted(self, olddn, newdn, samldb):
msg = Message()
msg.dn = Dn(ldb, olddn)
msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
res = samldb.modify(msg, ["show_deleted:1"])

def undelete_deleted_with_mod(self, olddn, newdn):
msg = Message()
msg.dn = Dn(ldb, olddn)
msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
res = ldb.modify(msg, ["show_deleted:1"])


def test_undelete(self):
print "Testing standard undelete operation"
usr1="cn=testuser,cn=users," + self.base_dn
delete_force(self.ldb, usr1)
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objLive1 = self.search_dn(usr1)
guid1=objLive1["objectGUID"][0]
ldb.delete(usr1)
objDeleted1 = self.search_guid(guid1)
self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
objLive2 = self.search_dn(usr1)
self.assertEqual(str(objLive2.dn),str(objLive1.dn))
delete_force(self.ldb, usr1)

def test_rename(self):
print "Testing attempt to rename deleted object"
usr1="cn=testuser,cn=users," + self.base_dn
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objLive1 = self.search_dn(usr1)
guid1=objLive1["objectGUID"][0]
ldb.delete(usr1)
objDeleted1 = self.search_guid(guid1)
#just to make sure we get the correct error if the show deleted is missing
try:
ldb.rename(str(objDeleted1.dn), usr1)
self.fail()
except LdbError, (num, _):
self.assertEquals(num,ERR_NO_SUCH_OBJECT)

try:
ldb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
self.fail()
except LdbError, (num, _):
self.assertEquals(num,ERR_UNWILLING_TO_PERFORM)

def test_undelete_with_mod(self):
print "Testing standard undelete operation with modification of additional attributes"
usr1="cn=testuser,cn=users," + self.base_dn
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objLive1 = self.search_dn(usr1)
guid1=objLive1["objectGUID"][0]
ldb.delete(usr1)
objDeleted1 = self.search_guid(guid1)
self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
objLive2 = self.search_dn(usr1)
self.assertEqual(objLive2["url"][0],"www.samba.org")
delete_force(self.ldb, usr1)

def test_undelete_newuser(self):
print "Testing undelete user with a different dn"
usr1="cn=testuser,cn=users," + self.base_dn
usr2="cn=testuser2,cn=users," + self.base_dn
delete_force(self.ldb, usr1)
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objLive1 = self.search_dn(usr1)
guid1=objLive1["objectGUID"][0]
ldb.delete(usr1)
objDeleted1 = self.search_guid(guid1)
self.undelete_deleted(str(objDeleted1.dn), usr2, ldb)
objLive2 = self.search_dn(usr2)
delete_force(self.ldb, usr1)
delete_force(self.ldb, usr2)

def test_undelete_existing(self):
print "Testing undelete user after a user with the same dn has been created"
usr1="cn=testuser,cn=users," + self.base_dn
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objLive1 = self.search_dn(usr1)
guid1=objLive1["objectGUID"][0]
ldb.delete(usr1)
ldb.add({
"dn": usr1,
"objectclass": "user",
"description": "test user description",
"samaccountname": "testuser"})
objDeleted1 = self.search_guid(guid1)
try:
self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
self.fail()
except LdbError, (num, _):
self.assertEquals(num,ERR_ENTRY_ALREADY_EXISTS)

def test_undelete_cross_nc(self):
print "Cross NC undelete"
c1 = "cn=ldaptestcontainer," + self.base_dn;
c2 = "cn=ldaptestcontainer2," + self.configuration_dn
c3 = "cn=ldaptestcontainer," + self.configuration_dn
c4 = "cn=ldaptestcontainer2," + self.base_dn;
ldb.add({
"dn": c1,
"objectclass": "container"})
ldb.add({
"dn": c2,
"objectclass": "container"})
objLive1 = self.search_dn(c1)
objLive2 = self.search_dn(c2)
guid1=objLive1["objectGUID"][0]
guid2=objLive2["objectGUID"][0]
ldb.delete(c1)
ldb.delete(c2)
objDeleted1 = self.search_guid(guid1)
objDeleted2 = self.search_guid(guid2)
#try to undelete from base dn to config
try:
self.undelete_deleted(str(objDeleted1.dn), c3, ldb)
self.fail()
except LdbError, (num, _):
self.assertEquals(num, ERR_OPERATIONS_ERROR)
#try to undelete from config to base dn
try:
self.undelete_deleted(str(objDeleted2.dn), c4, ldb)
self.fail()
except LdbError, (num, _):
self.assertEquals(num, ERR_OPERATIONS_ERROR)
#assert undeletion will work in same nc
self.undelete_deleted(str(objDeleted1.dn), c4, ldb)
self.undelete_deleted(str(objDeleted2.dn), c3, ldb)
delete_force(self.ldb, c3)
delete_force(self.ldb, c4)



if not "://" in host:
if os.path.isfile(host):
host = "tdb://%s" % host
Expand Down

0 comments on commit b881da6

Please sign in to comment.