s4 python: Add functions to samdb to manipulate version of replPropertyMetaData attribute
authorMatthieu Patou <mat@matws.net>
Tue, 22 Jun 2010 16:03:15 +0000 (20:03 +0400)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 15 Jul 2010 12:08:20 +0000 (22:08 +1000)
This change contains also helpers for attribute id to attribute oid
conversion and from attribute id to attribute name.
It brings also unit tests

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
source4/scripting/python/samba/samdb.py
source4/scripting/python/samba/tests/dsdb.py

index f810926710d2a72849c87cdf65d5cc3e64b579ca..f358747b5196ba1ce86d10ab4d1500fd420a163c 100644 (file)
@@ -28,12 +28,16 @@ import samba
 import ldb
 import time
 import base64
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.dcerpc import drsblobs, misc
 
 __docformat__ = "restructuredText"
 
 class SamDB(samba.Ldb):
     """The SAM database."""
 
+    hash_oid_name = {}
+
     def __init__(self, url=None, lp=None, modules_dir=None, session_info=None,
                  credentials=None, flags=0, options=None, global_schema=True,
                  auto_connect=True, am_rodc=False):
@@ -470,5 +474,104 @@ accountExpires: %u
     def set_schema_from_ldb(self, ldb):
         dsdb._dsdb_set_schema_from_ldb(self, ldb)
 
+    def get_attribute_from_attid(self, attid):
+        """ Get from an attid the associated attribute
+
+           :param attid: The attribute id for searched attribute
+           :return: The name of the attribute associated with this id
+        """
+        if len(self.hash_oid_name.keys()) == 0:
+            self._populate_oid_attid()
+        if self.hash_oid_name.has_key(self.get_oid_from_attid(attid)):
+            return self.hash_oid_name[self.get_oid_from_attid(attid)]
+        else:
+            return None
+
+
+    def _populate_oid_attid(self):
+        """Populate the hash hash_oid_name
+
+           This hash contains the oid of the attribute as a key and
+           its display name as a value
+        """
+        self.hash_oid_name = {}
+        res = self.search(expression="objectClass=attributeSchema",
+                           controls=["search_options:1:2"],
+                           attrs=["attributeID",
+                           "lDAPDisplayName"])
+        if len(res) > 0:
+            for e in res:
+                strDisplay = str(e.get("lDAPDisplayName"))
+                self.hash_oid_name[str(e.get("attributeID"))] = strDisplay
+
+
+    def get_attribute_replmetadata_version(self, dn, att):
+        """ Get the version field trom the replPropertyMetaData for
+            the given field
+
+           :param dn: The on which we want to get the version
+           :param att: The name of the attribute
+           :return: The value of the version field in the replPropertyMetaData
+             for the given attribute. None if the attribute is not replicated
+        """
+
+        res = self.search(expression="dn=%s" % dn,
+                            scope=ldb.SCOPE_SUBTREE,
+                            controls=["search_options:1:2"],
+                            attrs=["replPropertyMetaData"])
+        if len(res) == 0:
+            return None
+
+        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
+                            str(res[0]["replPropertyMetaData"]))
+        ctr = repl.ctr
+        if len(self.hash_oid_name.keys()) == 0:
+            self._populate_oid_attid()
+        for o in ctr.array:
+            # Search for Description
+            att_oid = self.get_oid_from_attid(o.attid)
+            if self.hash_oid_name.has_key(att_oid) and\
+               att.lower() == self.hash_oid_name[att_oid].lower():
+                return o.version
+        return None
+
+
+    def set_attribute_replmetadata_version(self, dn, att, value):
+        res = self.search(expression="dn=%s" % dn,
+                            scope=ldb.SCOPE_SUBTREE,
+                            controls=["search_options:1:2"],
+                            attrs=["replPropertyMetaData"])
+        if len(res) == 0:
+            return None
+
+        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
+                            str(res[0]["replPropertyMetaData"]))
+        ctr = repl.ctr
+        now = samba.unix2nttime(int(time.time()))
+        found = False
+        if len(self.hash_oid_name.keys()) == 0:
+            self._populate_oid_attid()
+        for o in ctr.array:
+            # Search for Description
+            att_oid = self.get_oid_from_attid(o.attid)
+            if self.hash_oid_name.has_key(att_oid) and\
+               att.lower() == self.hash_oid_name[att_oid].lower():
+                found = True
+                seq = self.sequence_number(ldb.SEQ_NEXT)
+                o.version = value
+                o.originating_change_time = now
+                o.originating_invocation_id = misc.GUID(self.get_invocation_id())
+                o.originating_usn = seq
+                o.local_usn = seq
+        if found :
+            replBlob = ndr_pack(repl)
+            msg = ldb.Message()
+            msg.dn = res[0].dn
+            msg["replPropertyMetaData"] = ldb.MessageElement(replBlob,
+                                                ldb.FLAG_MOD_REPLACE,
+                                                "replPropertyMetaData")
+            self.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
+
+
     def write_prefixes_from_schema(self):
         dsdb._dsdb_write_prefixes_from_schema_to_ldb(self)
index d28be82984752cded4e00def5639bf7ec9d56c9a..4a50c96e256678f08eff8e7cc70a09b6d2f71b85 100644 (file)
@@ -25,7 +25,7 @@ from samba.ndr import ndr_unpack, ndr_pack
 from samba.dcerpc import drsblobs
 import ldb
 import os
-import samba.dsdb
+import samba
 
 
 class DsdbTests(TestCase):
@@ -107,3 +107,27 @@ class DsdbTests(TestCase):
         msg.dn = res[0].dn
         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
+
+    def test_ok_get_attribute_from_attid(self):
+        self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
+
+    def test_ko_get_attribute_from_attid(self):
+        self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
+
+    def test_get_attribute_replmetadata_version(self):
+        res = self.samdb.search(expression="cn=Administrator",
+                            scope=ldb.SCOPE_SUBTREE,
+                            attrs=["dn"])
+        self.assertEquals(len(res), 1)
+        dn = str(res[0].dn)
+        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
+
+    def test_set_attribute_replmetadata_version(self):
+        res = self.samdb.search(expression="cn=Administrator",
+                            scope=ldb.SCOPE_SUBTREE,
+                            attrs=["dn"])
+        self.assertEquals(len(res), 1)
+        dn = str(res[0].dn)
+        version = self.samdb.get_attribute_replmetadata_version(dn, "description")
+        self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
+        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)