linked attribute tests: fix logic for add test
[samba.git] / source4 / dsdb / tests / python / linked_attributes.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import optparse
5 import sys
6 import os
7 import itertools
8
9 sys.path.insert(0, "bin/python")
10 import samba
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 import ldb
17 from samba.samdb import SamDB
18 from samba.dcerpc import misc
19
# Command line: the shared Samba option groups are registered first,
# followed by the switches that only this script understands.
parser = optparse.OptionParser("linked_attributes.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

parser.add_option('--delete-in-setup',
                  action='store_true',
                  help="cleanup in setup")

parser.add_option('--no-cleanup',
                  action='store_true',
                  help="don't cleanup in teardown")

parser.add_option('--no-reveal-internals',
                  action='store_true',
                  help="Only use windows compatible ldap controls")

opts, args = parser.parse_args()

# The database to test against is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

# Globals read by the test case when it opens its database connection.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
49
50
class LATestException(Exception):
    """Exception type specific to the linked attribute tests."""
53
54
55 class LATests(samba.tests.TestCase):
56
57     def setUp(self):
58         super(LATests, self).setUp()
59         self.samdb = SamDB(host, credentials=creds,
60                            session_info=system_session(lp), lp=lp)
61
62         self.base_dn = self.samdb.domain_dn()
63         self.ou = "OU=la,%s" % self.base_dn
64         if opts.delete_in_setup:
65             try:
66                 self.samdb.delete(self.ou, ['tree_delete:1'])
67             except ldb.LdbError, e:
68                 print "tried deleting %s, got error %s" % (self.ou, e)
69         self.samdb.add({'objectclass': 'organizationalUnit',
70                         'dn': self.ou})
71
72     def tearDown(self):
73         super(LATests, self).tearDown()
74         if not opts.no_cleanup:
75             self.samdb.delete(self.ou, ['tree_delete:1'])
76
77     def add_object(self, cn, objectclass, more_attrs={}):
78         dn = "CN=%s,%s" % (cn, self.ou)
79         attrs = {'cn': cn,
80                  'objectclass': objectclass,
81                  'dn': dn}
82         attrs.update(more_attrs)
83         self.samdb.add(attrs)
84
85         return dn
86
87     def add_objects(self, n, objectclass, prefix=None, more_attrs={}):
88         if prefix is None:
89             prefix = objectclass
90         dns = []
91         for i in range(n):
92             dns.append(self.add_object("%s%d" % (prefix, i + 1),
93                                        objectclass,
94                                        more_attrs=more_attrs))
95         return dns
96
97     def add_linked_attribute(self, src, dest, attr='member',
98                              controls=None):
99         m = ldb.Message()
100         m.dn = ldb.Dn(self.samdb, src)
101         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
102         self.samdb.modify(m, controls=controls)
103
104     def remove_linked_attribute(self, src, dest, attr='member',
105                                 controls=None):
106         m = ldb.Message()
107         m.dn = ldb.Dn(self.samdb, src)
108         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
109         self.samdb.modify(m, controls=controls)
110
111     def replace_linked_attribute(self, src, dest, attr='member',
112                                  controls=None):
113         m = ldb.Message()
114         m.dn = ldb.Dn(self.samdb, src)
115         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_REPLACE, attr)
116         self.samdb.modify(m, controls=controls)
117
118     def attr_search(self, obj, attr, scope=ldb.SCOPE_BASE, **controls):
119         if opts.no_reveal_internals:
120             if 'reveal_internals' in controls:
121                 del controls['reveal_internals']
122
123         controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
124
125         res = self.samdb.search(obj,
126                                 scope=scope,
127                                 attrs=[attr],
128                                 controls=controls)
129         return res
130
131     def assert_links(self, obj, expected, attr, msg='', **kwargs):
132         res = self.attr_search(obj, attr, **kwargs)
133
134         if len(expected) == 0:
135             if attr in res[0]:
136                 self.fail("found attr '%s' in %s" % (attr, res[0]))
137             return
138
139         try:
140             results = list([x[attr] for x in res][0])
141         except KeyError:
142             self.fail("missing attr '%s' on %s" % (attr, obj))
143
144         expected = sorted(expected)
145         results = sorted(results)
146
147         if expected != results:
148             print msg
149             print "expected %s" % expected
150             print "received %s" % results
151
152         self.assertEqual(results, expected)
153
154     def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
155         self.assert_links(obj, expected, attr=attr,
156                           msg='back links do not match', **kwargs)
157
158     def assert_forward_links(self, obj, expected, attr='member', **kwargs):
159         self.assert_links(obj, expected, attr=attr,
160                           msg='forward links do not match', **kwargs)
161
162     def get_object_guid(self, dn):
163         res = self.samdb.search(dn,
164                                 scope=ldb.SCOPE_BASE,
165                                 attrs=['objectGUID'])
166         return str(misc.GUID(res[0]['objectGUID'][0]))
167
168     def _test_la_backlinks(self, reveal=False):
169         tag = 'backlinks'
170         kwargs = {}
171         if reveal:
172             tag += '_reveal'
173             kwargs = {'reveal_internals': 0}
174
175         u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
176         g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
177
178         self.add_linked_attribute(g1, u1)
179         self.add_linked_attribute(g2, u1)
180         self.add_linked_attribute(g2, u2)
181
182         self.assert_back_links(u1, [g1, g2], **kwargs)
183         self.assert_back_links(u2, [g2], **kwargs)
184
185     def test_la_backlinks(self):
186         self._test_la_backlinks()
187
188     def test_la_backlinks_reveal(self):
189         if opts.no_reveal_internals:
190             print 'skipping because --no-reveal-internals'
191             return
192         self._test_la_backlinks(True)
193
194     def _test_la_backlinks_delete_group(self, reveal=False):
195         tag = 'del_group'
196         kwargs = {}
197         if reveal:
198             tag += '_reveal'
199             kwargs = {'reveal_internals': 0}
200
201         u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
202         g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
203
204         self.add_linked_attribute(g1, u1)
205         self.add_linked_attribute(g2, u1)
206         self.add_linked_attribute(g2, u2)
207
208         self.samdb.delete(g2, ['tree_delete:1'])
209
210         self.assert_back_links(u1, [g1], **kwargs)
211         self.assert_back_links(u2, set(), **kwargs)
212
213     def test_la_backlinks_delete_group(self):
214         self._test_la_backlinks_delete_group()
215
216     def test_la_backlinks_delete_group_reveal(self):
217         if opts.no_reveal_internals:
218             print 'skipping because --no-reveal-internals'
219             return
220         self._test_la_backlinks_delete_group(True)
221
222     def test_links_all_delete_group(self):
223         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
224         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
225         g2guid = self.get_object_guid(g2)
226
227         self.add_linked_attribute(g1, u1)
228         self.add_linked_attribute(g2, u1)
229         self.add_linked_attribute(g2, u2)
230
231         self.samdb.delete(g2)
232         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
233                                show_deactivated_link=0)
234         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
235                                show_deactivated_link=0)
236         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
237                                   show_deactivated_link=0)
238         self.assert_forward_links('<GUID=%s>' % g2guid,
239                                   [], show_deleted=1, show_recycled=1,
240                                   show_deactivated_link=0)
241
242     def test_links_all_delete_group_reveal(self):
243         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
244         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
245         g2guid = self.get_object_guid(g2)
246
247         self.add_linked_attribute(g1, u1)
248         self.add_linked_attribute(g2, u1)
249         self.add_linked_attribute(g2, u2)
250
251         self.samdb.delete(g2)
252         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
253                                show_deactivated_link=0,
254                                reveal_internals=0)
255         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
256                                show_deactivated_link=0,
257                                reveal_internals=0)
258         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
259                                   show_deactivated_link=0,
260                                   reveal_internals=0)
261         self.assert_forward_links('<GUID=%s>' % g2guid,
262                                   [], show_deleted=1, show_recycled=1,
263                                   show_deactivated_link=0,
264                                   reveal_internals=0)
265
266     def test_la_links_delete_link(self):
267         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
268         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
269
270         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
271                                 attrs=['uSNChanged'])
272         old_usn1 = int(res[0]['uSNChanged'][0])
273
274         self.add_linked_attribute(g1, u1)
275
276         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
277                                 attrs=['uSNChanged'])
278         new_usn1 = int(res[0]['uSNChanged'][0])
279
280         self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")
281
282         self.add_linked_attribute(g2, u1)
283         self.add_linked_attribute(g2, u2)
284
285         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
286                                 attrs=['uSNChanged'])
287         old_usn2 = int(res[0]['uSNChanged'][0])
288
289         self.remove_linked_attribute(g2, u1)
290
291         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
292                                 attrs=['uSNChanged'])
293         new_usn2 = int(res[0]['uSNChanged'][0])
294
295         self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")
296
297         self.assert_forward_links(g1, [u1])
298         self.assert_forward_links(g2, [u2])
299
300         self.add_linked_attribute(g2, u1)
301         self.assert_forward_links(g2, [u1, u2])
302         self.remove_linked_attribute(g2, u2)
303         self.assert_forward_links(g2, [u1])
304         self.remove_linked_attribute(g2, u1)
305         self.assert_forward_links(g2, [])
306         self.remove_linked_attribute(g1, [])
307         self.assert_forward_links(g1, [])
308
309         # removing a duplicate link in the same message should fail
310         self.add_linked_attribute(g2, [u1, u2])
311         self.assertRaises(ldb.LdbError,
312                           self.remove_linked_attribute,g2, [u1, u1])
313
314     def _test_la_links_delete_link_reveal(self):
315         u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
316         g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
317
318         self.add_linked_attribute(g1, u1)
319         self.add_linked_attribute(g2, u1)
320         self.add_linked_attribute(g2, u2)
321
322         self.remove_linked_attribute(g2, u1)
323
324         self.assert_forward_links(g2, [u1, u2], show_deleted=1,
325                                   show_recycled=1,
326                                   show_deactivated_link=0,
327                                   reveal_internals=0
328         )
329
330     def test_la_links_delete_link_reveal(self):
331         if opts.no_reveal_internals:
332             print 'skipping because --no-reveal-internals'
333             return
334         self._test_la_links_delete_link_reveal()
335
336     def test_la_links_delete_user(self):
337         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
338         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
339
340         self.add_linked_attribute(g1, u1)
341         self.add_linked_attribute(g2, u1)
342         self.add_linked_attribute(g2, u2)
343
344         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
345                                 attrs=['uSNChanged'])
346         old_usn1 = int(res[0]['uSNChanged'][0])
347
348         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
349                                 attrs=['uSNChanged'])
350         old_usn2 = int(res[0]['uSNChanged'][0])
351
352         self.samdb.delete(u1)
353
354         self.assert_forward_links(g1, [])
355         self.assert_forward_links(g2, [u2])
356
357         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
358                                 attrs=['uSNChanged'])
359         new_usn1 = int(res[0]['uSNChanged'][0])
360
361         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
362                                 attrs=['uSNChanged'])
363         new_usn2 = int(res[0]['uSNChanged'][0])
364
365         # Assert the USN on the alternate object is unchanged
366         self.assertEqual(old_usn1, new_usn1)
367         self.assertEqual(old_usn2, new_usn2)
368
369     def test_la_links_delete_user_reveal(self):
370         u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
371         g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
372
373         self.add_linked_attribute(g1, u1)
374         self.add_linked_attribute(g2, u1)
375         self.add_linked_attribute(g2, u2)
376
377         self.samdb.delete(u1)
378
379         self.assert_forward_links(g2, [u2],
380                                   show_deleted=1, show_recycled=1,
381                                   show_deactivated_link=0,
382                                   reveal_internals=0)
383         self.assert_forward_links(g1, [],
384                                   show_deleted=1, show_recycled=1,
385                                   show_deactivated_link=0,
386                                   reveal_internals=0)
387
388     def test_multiple_links(self):
389         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
390         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_multiple_links')
391
392         self.add_linked_attribute(g1, [u1, u2, u3, u4])
393         self.add_linked_attribute(g2, [u3, u1])
394         self.add_linked_attribute(g3, u2)
395
396         try:
397             # adding u2 twice should be an error
398             self.add_linked_attribute(g2, [u1, u2, u3, u2])
399         except ldb.LdbError as (num, msg):
400             if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
401                 self.fail("adding duplicate values, expected "
402                           "ERR_ENTRY_ALREADY_EXISTS, (%d) "
403                           "got %d" % (ldb.ERR_ENTRY_ALREADY_EXISTS, num))
404         else:
405             self.fail("adding duplicate values succeed when it shouldn't")
406
407         self.assert_forward_links(g1, [u1, u2, u3, u4])
408         self.assert_forward_links(g2, [u3, u1])
409         self.assert_forward_links(g3, [u2])
410         self.assert_back_links(u1, [g2, g1])
411         self.assert_back_links(u2, [g3, g1])
412         self.assert_back_links(u3, [g2, g1])
413         self.assert_back_links(u4, [g1])
414
415         self.remove_linked_attribute(g2, [u1, u3])
416         self.remove_linked_attribute(g1, [u1, u3])
417
418         self.assert_forward_links(g1, [u2, u4])
419         self.assert_forward_links(g2, [])
420         self.assert_forward_links(g3, [u2])
421         self.assert_back_links(u1, [])
422         self.assert_back_links(u2, [g3, g1])
423         self.assert_back_links(u3, [])
424         self.assert_back_links(u4, [g1])
425
426         self.add_linked_attribute(g1, [u1, u3])
427         self.add_linked_attribute(g2, [u3, u1])
428         self.add_linked_attribute(g3, [u1, u3])
429
430         self.assert_forward_links(g1, [u1, u2, u3, u4])
431         self.assert_forward_links(g2, [u1, u3])
432         self.assert_forward_links(g3, [u1, u2, u3])
433         self.assert_back_links(u1, [g1, g2, g3])
434         self.assert_back_links(u2, [g3, g1])
435         self.assert_back_links(u3, [g3, g2, g1])
436         self.assert_back_links(u4, [g1])
437
438     def test_la_links_replace(self):
439         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
440         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')
441
442         self.add_linked_attribute(g1, [u1, u2])
443         self.add_linked_attribute(g2, [u1, u3])
444         self.add_linked_attribute(g3, u1)
445
446         self.replace_linked_attribute(g1, [u2])
447         self.replace_linked_attribute(g2, [u2, u3])
448         self.replace_linked_attribute(g3, [u1, u3])
449         self.replace_linked_attribute(g4, [u4])
450
451         self.assert_forward_links(g1, [u2])
452         self.assert_forward_links(g2, [u3, u2])
453         self.assert_forward_links(g3, [u3, u1])
454         self.assert_forward_links(g4, [u4])
455         self.assert_back_links(u1, [g3])
456         self.assert_back_links(u2, [g1, g2])
457         self.assert_back_links(u3, [g2, g3])
458         self.assert_back_links(u4, [g4])
459
460         self.replace_linked_attribute(g1, [u1, u2, u3])
461         self.replace_linked_attribute(g2, [u1])
462         self.replace_linked_attribute(g3, [u2])
463         self.replace_linked_attribute(g4, [])
464
465         self.assert_forward_links(g1, [u1, u2, u3])
466         self.assert_forward_links(g2, [u1])
467         self.assert_forward_links(g3, [u2])
468         self.assert_forward_links(g4, [])
469         self.assert_back_links(u1, [g1, g2])
470         self.assert_back_links(u2, [g1, g3])
471         self.assert_back_links(u3, [g1])
472         self.assert_back_links(u4, [])
473
474         try:
475             # adding u2 twice should be an error
476             self.replace_linked_attribute(g2, [u1, u2, u3, u2])
477         except ldb.LdbError as (num, msg):
478             if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
479                 self.fail("adding duplicate values, expected "
480                           "ERR_ENTRY_ALREADY_EXISTS, (%d) "
481                           "got %d" % (ldb.ERR_ENTRY_ALREADY_EXISTS, num))
482         else:
483             self.fail("replacing duplicate values succeeded when it shouldn't")
484
485     def test_la_links_replace2(self):
486         users = self.add_objects(12, 'user', 'u_replace2')
487         g1, = self.add_objects(1, 'group', 'g_replace2')
488
489         self.add_linked_attribute(g1, users[:6])
490         self.assert_forward_links(g1, users[:6])
491         self.replace_linked_attribute(g1, users)
492         self.assert_forward_links(g1, users)
493         self.replace_linked_attribute(g1, users[6:])
494         self.assert_forward_links(g1, users[6:])
495         self.remove_linked_attribute(g1, users[6:9])
496         self.assert_forward_links(g1, users[9:])
497         self.remove_linked_attribute(g1, users[9:])
498         self.assert_forward_links(g1, [])
499
500     def test_la_links_permutations(self):
501         """Make sure the order in which we add links doesn't matter."""
502         users = self.add_objects(3, 'user', 'u_permutations')
503         groups = self.add_objects(6, 'group', 'g_permutations')
504
505         for g, p in zip(groups, itertools.permutations(users)):
506             self.add_linked_attribute(g, p)
507
508         # everyone should be in every group
509         for g in groups:
510             self.assert_forward_links(g, users)
511
512         for u in users:
513             self.assert_back_links(u, groups)
514
515         for g, p in zip(groups[::-1], itertools.permutations(users)):
516             self.replace_linked_attribute(g, p)
517
518         for g in groups:
519             self.assert_forward_links(g, users)
520
521         for u in users:
522             self.assert_back_links(u, groups)
523
524         for g, p in zip(groups, itertools.permutations(users)):
525             self.remove_linked_attribute(g, p)
526
527         for g in groups:
528             self.assert_forward_links(g, [])
529
530         for u in users:
531             self.assert_back_links(u, [])
532
533     def test_la_links_relaxed(self):
534         """Check that the relax control doesn't mess with linked attributes."""
535         relax_control = ['relax:0']
536
537         users = self.add_objects(10, 'user', 'u_relax')
538         groups = self.add_objects(3, 'group', 'g_relax',
539                                   more_attrs={'member': users[:2]})
540         g_relax1, g_relax2, g_uptight = groups
541
542         # g_relax1 has all users added at once
543         # g_relax2 gets them one at a time in reverse order
544         # g_uptight never relaxes
545
546         self.add_linked_attribute(g_relax1, users[2:5], controls=relax_control)
547
548         for u in reversed(users[2:5]):
549             self.add_linked_attribute(g_relax2, u, controls=relax_control)
550             self.add_linked_attribute(g_uptight, u)
551
552         for g in groups:
553             self.assert_forward_links(g, users[:5])
554
555             self.add_linked_attribute(g, users[5:7])
556             self.assert_forward_links(g, users[:7])
557
558             for u in users[7:]:
559                 self.add_linked_attribute(g, u)
560
561             self.assert_forward_links(g, users)
562
563         for u in users:
564             self.assert_back_links(u, groups)
565
566         # try some replacement permutations
567         import random
568         random.seed(1)
569         users2 = users[:]
570         for i in range(5):
571             random.shuffle(users2)
572             self.replace_linked_attribute(g_relax1, users2,
573                                           controls=relax_control)
574
575             self.assert_forward_links(g_relax1, users)
576
577         for i in range(5):
578             random.shuffle(users2)
579             self.remove_linked_attribute(g_relax2, users2,
580                                          controls=relax_control)
581             self.remove_linked_attribute(g_uptight, users2)
582
583             self.replace_linked_attribute(g_relax1, [], controls=relax_control)
584
585             random.shuffle(users2)
586             self.add_linked_attribute(g_relax2, users2,
587                                       controls=relax_control)
588             self.add_linked_attribute(g_uptight, users2)
589             self.replace_linked_attribute(g_relax1, users2,
590                                           controls=relax_control)
591
592             self.assert_forward_links(g_relax1, users)
593             self.assert_forward_links(g_relax2, users)
594             self.assert_forward_links(g_uptight, users)
595
596         for u in users:
597             self.assert_back_links(u, groups)
598
599     def test_add_all_at_once(self):
600         """All these other tests are creating linked attributes after the
601         objects are there. We want to test creating them all at once
602         using LDIF.
603         """
604         users = self.add_objects(7, 'user', 'u_all_at_once')
605         g1, g3 = self.add_objects(2, 'group', 'g_all_at_once',
606                                   more_attrs={'member': users})
607         (g2,) = self.add_objects(1, 'group', 'g_all_at_once2',
608                                  more_attrs={'member': users[:5]})
609
610         try:
611             self.add_objects(1, 'group', 'g_with_duplicate_links',
612                              more_attrs={'member': users[:5] + users[1:2]})
613         except ldb.LdbError as (num, msg):
614             if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
615                 self.fail("adding duplicate values, expected "
616                           "ERR_ENTRY_ALREADY_EXISTS, (%d) "
617                           "got %d" % (ldb.ERR_ENTRY_ALREADY_EXISTS, num))
618
619         self.assert_forward_links(g1, users)
620         self.assert_forward_links(g2, users[:5])
621         self.assert_forward_links(g3, users)
622         for u in users[:5]:
623             self.assert_back_links(u, [g1, g2, g3])
624         for u in users[5:]:
625             self.assert_back_links(u, [g1, g3])
626
627         self.remove_linked_attribute(g2, users[0])
628         self.remove_linked_attribute(g2, users[1])
629         self.add_linked_attribute(g2, users[1])
630         self.add_linked_attribute(g2, users[5])
631         self.add_linked_attribute(g2, users[6])
632
633         self.assert_forward_links(g1, users)
634         self.assert_forward_links(g2, users[1:])
635
636         for u in users[1:]:
637             self.remove_linked_attribute(g2, u)
638         self.remove_linked_attribute(g1, users)
639
640         for u in users:
641             self.samdb.delete(u)
642
643         self.assert_forward_links(g1, [])
644         self.assert_forward_links(g2, [])
645         self.assert_forward_links(g3, [])
646
647     def test_one_way_attributes(self):
648         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
649                                   'e_one_way')
650         guid = self.get_object_guid(e2)
651
652         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
653         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
654
655         self.samdb.delete(e2)
656
657         res = self.samdb.search("<GUID=%s>" % guid,
658                                 scope=ldb.SCOPE_BASE,
659                                 controls=['show_deleted:1',
660                                           'show_recycled:1'])
661
662         new_dn = str(res[0].dn)
663         self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
664         self.assert_forward_links(e1, [new_dn],
665                                   attr='addressBookRoots',
666                                   show_deactivated_link=0)
667
668     def test_one_way_attributes_delete_link(self):
669         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
670                                   'e_one_way')
671         guid = self.get_object_guid(e2)
672
673         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
674         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
675
676         self.remove_linked_attribute(e1, e2, attr="addressBookRoots")
677
678         self.assert_forward_links(e1, [], attr='addressBookRoots')
679         self.assert_forward_links(e1, [], attr='addressBookRoots',
680                                   show_deactivated_link=0)
681
682     def test_pretend_one_way_attributes(self):
683         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
684                                   'e_one_way')
685         guid = self.get_object_guid(e2)
686
687         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
688         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
689
690         self.samdb.delete(e2)
691         res = self.samdb.search("<GUID=%s>" % guid,
692                                 scope=ldb.SCOPE_BASE,
693                                 controls=['show_deleted:1',
694                                           'show_recycled:1'])
695
696         new_dn = str(res[0].dn)
697
698         self.assert_forward_links(e1, [], attr='addressBookRoots2')
699         self.assert_forward_links(e1, [], attr='addressBookRoots2',
700                                   show_deactivated_link=0)
701
702     def test_pretend_one_way_attributes_delete_link(self):
703         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
704                                   'e_one_way')
705         guid = self.get_object_guid(e2)
706
707         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
708         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
709
710         self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")
711
712         self.assert_forward_links(e1, [], attr='addressBookRoots2')
713         self.assert_forward_links(e1, [], attr='addressBookRoots2',
714                                   show_deactivated_link=0)
715
# A host argument without a URL scheme is opened as a tdb file when a file
# of that name exists, and as an ldap server otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)