diff --git a/lib/ldb-samba/pyldb.c b/lib/ldb-samba/pyldb.c index 0cf550bc7a5b..397e11062b6e 100644 --- a/lib/ldb-samba/pyldb.c +++ b/lib/ldb-samba/pyldb.c @@ -30,6 +30,7 @@ #include "lib/ldb-samba/ldif_handlers.h" #include "auth/pyauth.h" #include "source4/dsdb/common/util.h" +#include "lib/ldb/include/ldb_private.h" static PyObject *pyldb_module; @@ -202,6 +203,29 @@ static PyObject *py_ldb_set_session_info(PyObject *self, PyObject *args) Py_RETURN_NONE; } +static PyObject *py_ldb_samba_schema_attribute_add(PyLdbObject *self, + PyObject *args) +{ + char *attribute, *syntax; + const struct ldb_schema_syntax *s; + unsigned int flags; + int ret; + struct ldb_context *ldb_ctx; + + if (!PyArg_ParseTuple(args, "sIs", &attribute, &flags, &syntax)) + return NULL; + + ldb_ctx = pyldb_Ldb_AsLdbContext(self); + + s = ldb_samba_syntax_by_name(ldb_ctx, syntax); + ret = ldb_schema_attribute_add_with_syntax(ldb_ctx, attribute, + flags, s); + + PyErr_LDB_ERROR_IS_ERR_RAISE(py_ldb_error, ret, ldb_ctx); + + Py_RETURN_NONE; +} + static PyObject *py_ldb_register_samba_handlers(PyObject *self) { struct ldb_context *ldb; @@ -237,6 +261,9 @@ static PyMethodDef py_samba_ldb_methods[] = { { "set_session_info", (PyCFunction)py_ldb_set_session_info, METH_VARARGS, "set_session_info(session_info)\n" "Set session info to use when connecting." }, + { "samba_schema_attribute_add", + (PyCFunction)py_ldb_samba_schema_attribute_add, + METH_VARARGS, NULL }, { NULL }, }; @@ -255,6 +282,7 @@ static PyTypeObject PySambaLdb = { .tp_flags = Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE, }; + MODULE_INIT_FUNC(_ldb) { PyObject *m; @@ -283,5 +311,8 @@ MODULE_INIT_FUNC(_ldb) Py_INCREF(&PySambaLdb); PyModule_AddObject(m, "Ldb", (PyObject *)&PySambaLdb); +#define ADD_LDB_STRING(val) PyModule_AddStringConstant(m, #val, LDB_## val) + ADD_LDB_STRING(SYNTAX_SAMBA_INT32); + return m; } diff --git a/lib/ldb-samba/tests/index.py b/lib/ldb-samba/tests/index.py new file mode 100644 index 000000000000..764cb2cf3ac1 --- /dev/null +++ b/lib/ldb-samba/tests/index.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# +# Tests for comparison expressions on indexed keys +# +# Copyright (C) Andrew Bartlett 2019 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +"""Tests for expressions containing comparisons on indexed attributes. + Copied from ldb's index.py""" + +import os +from unittest import TestCase +import sys +from samba import _ldb +import shutil +from ldb import SCOPE_SUBTREE +from samba.tests.subunitrun import TestProgram + +PY3 = sys.version_info > (3, 0) + +TDB_PREFIX = "tdb://" +MDB_PREFIX = "mdb://" + +def tempdir(): + import tempfile + try: + dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp") + except KeyError: + dir_prefix = None + return tempfile.mkdtemp(dir=dir_prefix) + +class LdbBaseTest(TestCase): + def setUp(self): + super(LdbBaseTest, self).setUp() + try: + if self.prefix is None: + self.prefix = TDB_PREFIX + except AttributeError: + self.prefix = TDB_PREFIX + + def tearDown(self): + super(LdbBaseTest, self).tearDown() + + def url(self): + return self.prefix + self.filename + + def flags(self): + if self.prefix == MDB_PREFIX: + return ldb.FLG_NOSYNC + else: + return 0 + + def options(self): + if self.prefix == MDB_PREFIX: + return ['disable_full_db_scan_for_self_test:1'] + else: + return None + +class LdbTDBIndexedComparisonExpressions(LdbBaseTest): + def tearDown(self): + shutil.rmtree(self.testdir) + super(LdbTDBIndexedComparisonExpressions, self).tearDown() + + # Ensure the LDB is closed now, so we close the FD + del(self.l) + + def setUp(self): + super(LdbTDBIndexedComparisonExpressions, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, "indexedcomptest.ldb") + # Note that the maximum key length is set to 54 + # This accounts for the 4 bytes added by the dn formatting + # a leading dn=, and a trailing zero terminator + # + self.l = _ldb.Ldb(self.url(), options=self.options()) + self.l.add({"dn": "@ATTRIBUTES"}) + self.l.add({"dn": "@INDEXLIST", + "@IDXATTR": [b"int32attr"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"]}) + + def test_comparison_expression(self): + self.l.samba_schema_attribute_add("int32attr", 0, + _ldb.SYNTAX_SAMBA_INT32) + + int32_max = 2**31-1 + int32_min = -2**31 + test_nums = list(range(-5, 5)) + test_nums += list(range(int32_max-5, int32_max+1)) + test_nums += list(range(int32_min, int32_min+5)) + test_nums = sorted(test_nums) + + for i in test_nums: + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int32attr": str(i)}) + + def assert_int32_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=SCOPE_SUBTREE, + expression="(int32attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int32attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int32_expr(">=-2") + assert_int32_expr("<=2") + assert_int32_expr(">=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min)) + assert_int32_expr("<=" + str(int32_min+1)) + assert_int32_expr("<=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max)) + assert_int32_expr(">=" + str(int32_max-1)) + assert_int32_expr("=10", "==10") + +# Run the same tests against an lmdb backend +class LdbLMDBIndexedComparisonExpressions(LdbTDBIndexedComparisonExpressions): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(LdbLMDBIndexedComparisonExpressions, self).setUp() + + def tearDown(self): + super(LdbLMDBIndexedComparisonExpressions, self).tearDown() + + +TestProgram(module=__name__, opts=[]) diff --git a/lib/ldb/tests/python/index.py b/lib/ldb/tests/python/index.py index 48dbaccfa56f..4509d94d11d4 100755 --- a/lib/ldb/tests/python/index.py +++ b/lib/ldb/tests/python/index.py @@ -1295,6 +1295,89 @@ def tearDown(self): super(MaxIndexKeyLengthTestsLmdb, self).tearDown() +class OrderedIntegerRangeTests(LdbBaseTest): + + def tearDown(self): + shutil.rmtree(self.testdir) + super(OrderedIntegerRangeTests, self).tearDown() + + # Ensure the LDB is closed now, so we close the FD + del(self.l) + + def setUp(self): + super(OrderedIntegerRangeTests, self).setUp() + self.testdir = tempdir() + self.filename = os.path.join(self.testdir, "ordered_integer_test.ldb") + + self.l = ldb.Ldb(self.url(), + options=self.options()) + self.l.add({"dn": "@ATTRIBUTES", + "int64attr": "ORDERED_INTEGER"}) + self.l.add({"dn": "@INDEXLIST", + "@IDXATTR": [b"int64attr"], + "@IDXONE": [b"1"], + "@IDXGUID": [b"objectUUID"], + "@IDX_DN_GUID": [b"GUID"]}) + + def options(self): + if self.prefix == MDB_PREFIX: + return ['modules:rdn_name', + 'disable_full_db_scan_for_self_test:1'] + else: + return ['modules:rdn_name'] + + def test_comparison_expression(self): + int64_max = 2**63-1 + int64_min = -2**63 + test_nums = list(range(-5, 5)) + test_nums += list(range(int64_max-5, int64_max+1)) + test_nums += list(range(int64_min, int64_min+5)) + test_nums = sorted(test_nums) + + for (i, num) in enumerate(test_nums): + ouuid = 0x0123456789abcdef + i + ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode()) + self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i), + "objectUUID": ouuid_s, + "int64attr": str(num)}) + + def assert_int64_expr(expr, py_expr=None): + res = self.l.search(base="DC=SAMBA,DC=ORG", + scope=ldb.SCOPE_SUBTREE, + expression="(int64attr%s)" % (expr)) + + if not py_expr: + py_expr = expr + expect = [n for n in test_nums if eval(str(n) + py_expr)] + vals = sorted([int(r.get("int64attr")[0]) for r in res]) + self.assertEqual(len(res), len(expect)) + self.assertEqual(set(vals), set(expect)) + self.assertEqual(expect, vals) + + assert_int64_expr(">=-2") + assert_int64_expr("<=2") + assert_int64_expr(">=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min)) + assert_int64_expr("<=" + str(int64_min+1)) + assert_int64_expr("<=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max)) + assert_int64_expr(">=" + str(int64_max-1)) + assert_int64_expr("=10", "==10") + + +# Run the ordered integer range tests against an lmdb backend +class OrderedIntegerRangeTestsLmdb(OrderedIntegerRangeTests): + + def setUp(self): + if os.environ.get('HAVE_LMDB', '1') == '0': + self.skipTest("No lmdb backend") + self.prefix = MDB_PREFIX + super(OrderedIntegerRangeTestsLmdb, self).setUp() + + def tearDown(self): + super(OrderedIntegerRangeTestsLmdb, self).tearDown() + + # Run the index truncation tests against an lmdb backend class RejectSubDBIndex(LdbBaseTest): diff --git a/python/samba/tests/complex_expressions.py b/python/samba/tests/complex_expressions.py index 75ea61f12d55..d88f296be283 100644 --- a/python/samba/tests/complex_expressions.py +++ b/python/samba/tests/complex_expressions.py @@ -189,6 +189,16 @@ def test_int_range(self, field=None): py_expr = "%d <= {%s} <= %d" % (n-1, field, n+1) self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + half_n = int(n/2) + + expr = "(%s<=%s)" % (field, half_n) + py_expr = "{%s} <= %d" % (field, half_n) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + expr = "(%s>=%s)" % (field, half_n) + py_expr = "{%s} >= %d" % (field, half_n) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + # Same test again for largeint and enum def test_largeint_range(self): self.test_int_range(self.largeint_f) @@ -455,6 +465,11 @@ def test_usnchanged(self): py_expr = "{%s} <= %d" % (field, search_num) self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + expr = "(&(%s>=%d)(objectClass=user))" % (field, search_num) + py_expr = "{%s} >= %d" % (field, search_num) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # If we're called independently then import subunit, get host from first # arg and run. Otherwise, subunit ran us so just set host from env. # We always try to run over LDAP rather than direct file, so that diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 7e0ce96c45c8..76655d768f00 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -847,6 +847,7 @@ def planoldpythontestsuite(env, module, name=None, extra_path=[], environ={}, ex plantestsuite_loadlist("samba4.urgent_replication.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/urgent_replication.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.ldap.dirsync.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(samba4srcdir, "dsdb/tests/python/dirsync.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.ldap.match_rules.python", "ad_dc_ntvfs", [python, os.path.join(srcdir(), "lib/ldb-samba/tests/match_rules.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) +plantestsuite("samba4.ldap.index.python", "none", [python, os.path.join(srcdir(), "lib/ldb-samba/tests/index.py")]) plantestsuite_loadlist("samba4.ldap.notification.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/notification.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.ldap.sites.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sites.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])