4563bb2d9a3d32e0a2c51b7a46361576b098b6b2
[palcantara/samba-autobuild/.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb
27         )
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.common import get_bytes
31 from samba.common import get_string
32 from samba.tests import env_loadparm
33
34
35 class UserCmdTestCase(SambaToolCmdTest):
36     """Tests for samba-tool user subcommands"""
37     users = []
38     samdb = None
39
40     def setUp(self):
41         super(UserCmdTestCase, self).setUp()
42         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
44
45         # Modify the default template homedir
46         lp = self.get_loadparm()
47         self.template_homedir = lp.get('template homedir')
48         lp.set('template homedir', '/home/test/%D/%U')
49
50         self.users = []
51         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
52         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
53         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
54         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
55         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
56         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
57         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
58         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
59         self.users.append(self._randomUnixUser({"name": "unixuser1"}))
60         self.users.append(self._randomUnixUser({"name": "unixuser2"}))
61         self.users.append(self._randomUnixUser({"name": "unixuser3"}))
62         self.users.append(self._randomUnixUser({"name": "unixuser4"}))
63
64         # setup the 12 users and ensure they are correct
65         for user in self.users:
66             (result, out, err) = user["createUserFn"](user)
67
68             self.assertCmdSuccess(result, out, err)
69             self.assertEqual(err, "", "Shouldn't be any error messages")
70             if 'unix' in user["name"]:
71                 self.assertIn("Modified User '%s' successfully" % user["name"],
72                               out)
73             else:
74                 self.assertIn("User '%s' added successfully" % user["name"],
75                               out)
76
77             user["checkUserFn"](user)
78
79     def tearDown(self):
80         super(UserCmdTestCase, self).tearDown()
81         # clean up all the left over users, just in case
82         for user in self.users:
83             if self._find_user(user["name"]):
84                 self.runsubcmd("user", "delete", user["name"])
85         lp = env_loadparm()
86         # second run of this test
87         # the cache is still there and '--cache-ldb-initialize'
88         # will fail
89         cachedb = lp.private_path("user-syncpasswords-cache.ldb")
90         if os.path.exists(cachedb):
91             os.remove(cachedb)
92         lp.set('template homedir', self.template_homedir)
93
94     def test_newuser(self):
95         # try to add all the users again, this should fail
96         for user in self.users:
97             (result, out, err) = self._create_user(user)
98             self.assertCmdFail(result, "Ensure that create user fails")
99             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
100
101         # try to delete all the 4 users we just added
102         for user in self.users:
103             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
104             self.assertCmdSuccess(result, out, err, "Can we delete users")
105             found = self._find_user(user["name"])
106             self.assertIsNone(found)
107
108         # test adding users with --use-username-as-cn
109         for user in self.users:
110             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
111                                                 "--use-username-as-cn",
112                                                 "--surname=%s" % user["surname"],
113                                                 "--given-name=%s" % user["given-name"],
114                                                 "--job-title=%s" % user["job-title"],
115                                                 "--department=%s" % user["department"],
116                                                 "--description=%s" % user["description"],
117                                                 "--company=%s" % user["company"],
118                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
119                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
120
121             self.assertCmdSuccess(result, out, err)
122             self.assertEqual(err, "", "Shouldn't be any error messages")
123             self.assertIn("User '%s' added successfully" % user["name"], out)
124
125             found = self._find_user(user["name"])
126
127             self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
128             self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
129
130     def _verify_supplementalCredentials(self, ldif,
131                                         min_packages=3,
132                                         max_packages=6):
133         msgs = self.samdb.parse_ldif(ldif)
134         (changetype, obj) = next(msgs)
135
136         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
137         sc_blob = obj["supplementalCredentials"][0]
138         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
139
140         self.assertGreaterEqual(sc.sub.num_packages,
141                                 min_packages, "min_packages check")
142         self.assertLessEqual(sc.sub.num_packages,
143                              max_packages, "max_packages check")
144
145         if max_packages == 0:
146             return
147
148         def find_package(packages, name, start_idx=0):
149             for i in range(start_idx, len(packages)):
150                 if packages[i].name == name:
151                     return (i, packages[i])
152             return (None, None)
153
154         # The ordering is this
155         #
156         # Primary:Kerberos-Newer-Keys (optional)
157         # Primary:Kerberos
158         # Primary:WDigest
159         # Primary:CLEARTEXT (optional)
160         # Primary:SambaGPG (optional)
161         #
162         # And the 'Packages' package is insert before the last
163         # other package.
164
165         nidx = 0
166         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
167         self.assertIsNotNone(pp, "Packages required")
168         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
169                          "Packages needs to be at num_packages - 1")
170
171         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
172                                     start_idx=nidx)
173         if knidx is not None:
174             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
175             nidx = nidx + 1
176             if nidx == pidx:
177                 nidx = nidx + 1
178
179         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
180                                   start_idx=nidx)
181         self.assertIsNotNone(pp, "Primary:Kerberos required")
182         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
183         nidx = nidx + 1
184         if nidx == pidx:
185             nidx = nidx + 1
186
187         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
188                                   start_idx=nidx)
189         self.assertIsNotNone(pp, "Primary:WDigest required")
190         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
191         nidx = nidx + 1
192         if nidx == pidx:
193             nidx = nidx + 1
194
195         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
196                                   start_idx=nidx)
197         if cidx is not None:
198             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
199             nidx = nidx + 1
200             if nidx == pidx:
201                 nidx = nidx + 1
202
203         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
204                                   start_idx=nidx)
205         if gidx is not None:
206             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
207             nidx = nidx + 1
208             if nidx == pidx:
209                 nidx = nidx + 1
210
211         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
212
213     def test_setpassword(self):
214         expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1")))
215
216         for user in self.users:
217             newpasswd = self.random_password(16)
218             (result, out, err) = self.runsubcmd("user", "setpassword",
219                                                 user["name"],
220                                                 "--newpassword=%s" % newpasswd,
221                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
222                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
223             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
224             self.assertEqual(err, "", "setpassword with url")
225             self.assertMatch(out, "Changed password OK", "setpassword with url")
226
227         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
228         (result, out, err) = self.runsubcmd("user", "syncpasswords",
229                                             "--cache-ldb-initialize",
230                                             "--attributes=%s" % attributes,
231                                             "--decrypt-samba-gpg")
232         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
233         self.assertEqual(err, "", "getpassword without url")
234         cache_attrs = {
235             "objectClass": {"value": "userSyncPasswords"},
236             "samdbUrl": {},
237             "dirsyncFilter": {},
238             "dirsyncAttribute": {},
239             "dirsyncControl": {"value": "dirsync:1:0:0"},
240             "passwordAttribute": {},
241             "decryptSambaGPG": {},
242             "currentTime": {},
243         }
244         for a in cache_attrs.keys():
245             v = cache_attrs[a].get("value", "")
246             self.assertMatch(out, "%s: %s" % (a, v),
247                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
248
249         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
250         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
251         self.assertEqual(err, "", "syncpasswords --no-wait")
252         self.assertMatch(out, "dirsync_loop(): results 0",
253                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
254         for user in self.users:
255             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
256                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
257
258         for user in self.users:
259             newpasswd = self.random_password(16)
260             creds = credentials.Credentials()
261             creds.set_anonymous()
262             creds.set_password(newpasswd)
263             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
264             virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
265             virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
266
267             (result, out, err) = self.runsubcmd("user", "setpassword",
268                                                 user["name"],
269                                                 "--newpassword=%s" % newpasswd)
270             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
271             self.assertEqual(err, "", "setpassword without url")
272             self.assertMatch(out, "Changed password OK", "setpassword without url")
273
274             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
275             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
276             self.assertEqual(err, "", "syncpasswords --no-wait")
277             self.assertMatch(out, "dirsync_loop(): results 0",
278                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
279             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
280                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
281             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
282                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
283             if expect_nt_hash:
284                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
285                                  "getpassword unicodePwd: out[%s]" % out)
286             else:
287                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
288             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
289                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
290             self.assertMatch(out, "supplementalCredentials:: ",
291                              "getpassword supplementalCredentials: out[%s]" % out)
292             if "virtualSambaGPG:: " in out:
293                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
294                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
295                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
296                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
297                 self.assertMatch(out, "virtualSSHA: ",
298                                  "getpassword virtualSSHA: out[%s]" % out)
299
300             (result, out, err) = self.runsubcmd("user", "getpassword",
301                                                 user["name"],
302                                                 "--attributes=%s" % attributes,
303                                                 "--decrypt-samba-gpg")
304             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
305             self.assertEqual(err, "", "getpassword without url")
306             self.assertMatch(out, "Got password OK", "getpassword without url")
307             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
308                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
309             if expect_nt_hash:
310                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
311                                  "getpassword unicodePwd: out[%s]" % out)
312             else:
313                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
314             self.assertMatch(out, "supplementalCredentials:: ",
315                              "getpassword supplementalCredentials: out[%s]" % out)
316             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
317             if "virtualSambaGPG:: " in out:
318                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
319                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
320                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
321                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
322                 self.assertMatch(out, "virtualSSHA: ",
323                                  "getpassword virtualSSHA: out[%s]" % out)
324
325         for user in self.users:
326             newpasswd = self.random_password(16)
327             (result, out, err) = self.runsubcmd("user", "setpassword",
328                                                 user["name"],
329                                                 "--newpassword=%s" % newpasswd,
330                                                 "--must-change-at-next-login",
331                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
332                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
333             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
334             self.assertEqual(err, "", "setpassword with forced change")
335             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
336
337     def test_setexpiry(self):
338         for user in self.users:
339             twodays = time.time() + (2 * 24 * 60 * 60)
340
341             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
342                                                 "--days=2",
343                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
344                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
345             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
346             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
347
348             found = self._find_user(user["name"])
349
350             expires = nttime2unix(int("%s" % found.get("accountExpires")))
351             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
352
353         # TODO: renable this after the filter case is sorted out
354         if "filters are broken, bail now":
355             return
356
357         # now run the expiration based on a filter
358         fourdays = time.time() + (4 * 24 * 60 * 60)
359         (result, out, err) = self.runsubcmd("user", "setexpiry",
360                                             "--filter", "(&(objectClass=user)(company=comp2))",
361                                             "--days=4",
362                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
363                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
364         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
365
366         for user in self.users:
367             found = self._find_user(user["name"])
368             if ("%s" % found.get("company")) == "comp2":
369                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
370                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
371             else:
372                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
373                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
374
375     def test_list(self):
376         (result, out, err) = self.runsubcmd("user", "list",
377                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
378                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
379                                                           os.environ["DC_PASSWORD"]))
380         self.assertCmdSuccess(result, out, err, "Error running list")
381
382         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
383                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
384
385         userlist = self.samdb.search(base=self.samdb.domain_dn(),
386                                      scope=ldb.SCOPE_SUBTREE,
387                                      expression=search_filter,
388                                      attrs=["samaccountname"])
389
390         self.assertTrue(len(userlist) > 0, "no users found in samdb")
391
392         for userobj in userlist:
393             name = str(userobj.get("samaccountname", idx=0))
394             found = self.assertMatch(out, name,
395                                      "user '%s' not found" % name)
396
397
398     def test_list_base_dn(self):
399         base_dn = "CN=Users"
400         (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
401                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
402                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
403                                                           os.environ["DC_PASSWORD"]))
404         self.assertCmdSuccess(result, out, err, "Error running list")
405
406         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
407                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
408
409         userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
410                                      scope=ldb.SCOPE_SUBTREE,
411                                      expression=search_filter,
412                                      attrs=["samaccountname"])
413
414         self.assertTrue(len(userlist) > 0, "no users found in samdb")
415
416         for userobj in userlist:
417             name = str(userobj.get("samaccountname", idx=0))
418             found = self.assertMatch(out, name,
419                                      "user '%s' not found" % name)
420
421     def test_list_full_dn(self):
422         (result, out, err) = self.runsubcmd("user", "list", "--full-dn",
423                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
424                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
425                                                           os.environ["DC_PASSWORD"]))
426         self.assertCmdSuccess(result, out, err, "Error running list")
427
428         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
429                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
430
431         userlist = self.samdb.search(base=self.samdb.domain_dn(),
432                                      scope=ldb.SCOPE_SUBTREE,
433                                      expression=search_filter,
434                                      attrs=["dn"])
435
436         self.assertTrue(len(userlist) > 0, "no users found in samdb")
437
438         for userobj in userlist:
439             name = str(userobj.get("dn", idx=0))
440             found = self.assertMatch(out, name,
441                                      "user '%s' not found" % name)
442
443     def test_list_hide_expired(self):
444         expire_username = "expireUser"
445         expire_user = self._randomUser({"name": expire_username})
446         self._create_user(expire_user)
447
448         (result, out, err) = self.runsubcmd(
449             "user",
450             "list",
451             "--hide-expired",
452             "-H",
453             "ldap://%s" % os.environ["DC_SERVER"],
454             "-U%s%%%s" % (os.environ["DC_USERNAME"],
455                           os.environ["DC_PASSWORD"]))
456         self.assertCmdSuccess(result, out, err, "Error running list")
457         self.assertTrue(expire_username in out,
458                         "user '%s' not found" % expire_username)
459
460         # user will be expired one second ago
461         self.samdb.setexpiry(
462             "(sAMAccountname=%s)" % expire_username,
463             -1,
464             False)
465
466         (result, out, err) = self.runsubcmd(
467             "user",
468             "list",
469             "--hide-expired",
470             "-H",
471             "ldap://%s" % os.environ["DC_SERVER"],
472             "-U%s%%%s" % (os.environ["DC_USERNAME"],
473                           os.environ["DC_PASSWORD"]))
474         self.assertCmdSuccess(result, out, err, "Error running list")
475         self.assertFalse(expire_username in out,
476                          "user '%s' found" % expire_username)
477
478         self.samdb.deleteuser(expire_username)
479
480     def test_list_hide_disabled(self):
481         disable_username = "disableUser"
482         disable_user = self._randomUser({"name": disable_username})
483         self._create_user(disable_user)
484
485         (result, out, err) = self.runsubcmd(
486             "user",
487             "list",
488             "--hide-disabled",
489             "-H",
490             "ldap://%s" % os.environ["DC_SERVER"],
491             "-U%s%%%s" % (os.environ["DC_USERNAME"],
492                           os.environ["DC_PASSWORD"]))
493         self.assertCmdSuccess(result, out, err, "Error running list")
494         self.assertTrue(disable_username in out,
495                         "user '%s' not found" % disable_username)
496
497         self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
498
499         (result, out, err) = self.runsubcmd(
500             "user",
501             "list",
502             "--hide-disabled",
503             "-H",
504             "ldap://%s" % os.environ["DC_SERVER"],
505             "-U%s%%%s" % (os.environ["DC_USERNAME"],
506                           os.environ["DC_PASSWORD"]))
507         self.assertCmdSuccess(result, out, err, "Error running list")
508         self.assertFalse(disable_username in out,
509                          "user '%s' found" % disable_username)
510
511         self.samdb.deleteuser(disable_username)
512
513     def test_show(self):
514         for user in self.users:
515             (result, out, err) = self.runsubcmd(
516                 "user", "show", user["name"],
517                 "--attributes=sAMAccountName,company",
518                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
519                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
520                               os.environ["DC_PASSWORD"]))
521             self.assertCmdSuccess(result, out, err, "Error running show")
522
523             expected_out = """dn: CN=%s %s,CN=Users,%s
524 company: %s
525 sAMAccountName: %s
526
527 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
528                 user["company"], user["name"])
529
530             self.assertEqual(out, expected_out,
531                              "Unexpected show output for user '%s'" %
532                              user["name"])
533
534             time_attrs = [
535                 "name", # test that invalid values are just ignored
536                 "whenCreated",
537                 "whenChanged",
538                 "accountExpires",
539                 "badPasswordTime",
540                 "lastLogoff",
541                 "lastLogon",
542                 "lastLogonTimestamp",
543                 "lockoutTime",
544                 "msDS-UserPasswordExpiryTimeComputed",
545                 "pwdLastSet",
546                 ]
547
548             attrs = []
549             for ta in time_attrs:
550                 attrs.append(ta)
551                 for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
552                     attrs.append("%s;format=%s" % (ta, fm))
553
554             (result, out, err) = self.runsubcmd(
555                 "user", "show", user["name"],
556                 "--attributes=%s" % ",".join(attrs),
557                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
558                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
559                               os.environ["DC_PASSWORD"]))
560             self.assertCmdSuccess(result, out, err, "Error running show")
561
562             self.assertIn(";format=GeneralizedTime", out)
563             self.assertIn(";format=UnixTime", out)
564             self.assertIn(";format=TimeSpec", out)
565
566             self.assertIn("name: ", out)
567             self.assertNotIn("name;format=GeneralizedTime: ", out)
568             self.assertNotIn("name;format=UnixTime: ", out)
569             self.assertNotIn("name;format=TimeSpec: ", out)
570
571             self.assertIn("whenCreated: 20", out)
572             self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
573             self.assertIn("whenCreated;format=UnixTime: 1", out)
574             self.assertIn("whenCreated;format=TimeSpec: 1", out)
575
576             self.assertIn("whenChanged: 20", out)
577             self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
578             self.assertIn("whenChanged;format=UnixTime: 1", out)
579             self.assertIn("whenChanged;format=TimeSpec: 1", out)
580
581             self.assertIn("accountExpires: 9223372036854775807", out)
582             self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
583             self.assertNotIn("accountExpires;format=UnixTime: ", out)
584             self.assertNotIn("accountExpires;format=TimeSpec: ", out)
585
586             self.assertIn("badPasswordTime: 0", out)
587             self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
588             self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
589             self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
590
591             self.assertIn("lastLogoff: 0", out)
592             self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
593             self.assertNotIn("lastLogoff;format=UnixTime: ", out)
594             self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
595
596             self.assertIn("lastLogon: 0", out)
597             self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
598             self.assertNotIn("lastLogon;format=UnixTime: ", out)
599             self.assertNotIn("lastLogon;format=TimeSpec: ", out)
600
601             # If a specified attribute is not available on a user object
602             # it's silently omitted.
603             self.assertNotIn("lastLogonTimestamp:", out)
604             self.assertNotIn("lockoutTime:", out)
605
606             self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
607             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
608             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
609             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
610
611             self.assertIn("pwdLastSet: 1", out)
612             self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
613             self.assertIn("pwdLastSet;format=UnixTime: 1", out)
614             self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
615
616             out_msgs = self.samdb.parse_ldif(out)
617             out_msg = next(out_msgs)[1]
618
619             self.assertIn("whenCreated", out_msg)
620             when_created_str = str(out_msg["whenCreated"][0])
621             self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
622             self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
623             when_created_time = ldb.string_to_time(when_created_str)
624             self.assertIn("whenCreated;format=UnixTime", out_msg)
625             self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
626             self.assertIn("whenCreated;format=TimeSpec", out_msg)
627             self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
628                              "%d.000000000" % (when_created_time))
629
630             self.assertIn("whenChanged", out_msg)
631             when_changed_str = str(out_msg["whenChanged"][0])
632             self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
633             self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
634             when_changed_time = ldb.string_to_time(when_changed_str)
635             self.assertIn("whenChanged;format=UnixTime", out_msg)
636             self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
637             self.assertIn("whenChanged;format=TimeSpec", out_msg)
638             self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
639                              "%d.000000000" % (when_changed_time))
640
641             self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
642             pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
643             pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
644             self.assertIn("pwdLastSet;format=UnixTime", out_msg)
645             self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
646             self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
647             self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
648             self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
649
650             # assert that the pwd has been set in the minute after user creation
651             self.assertGreaterEqual(pwd_last_set_time, when_created_time)
652             self.assertLess(pwd_last_set_time, when_created_time + 60)
653
654             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
655             pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
656             pwd_expires_time = ldb.string_to_time(pwd_expires_str)
657             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
658             self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
659             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
660             self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
661             self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
662
663             # assert that the pwd expires after it was set
664             self.assertGreater(pwd_expires_time, pwd_last_set_time)
665
666     def test_move(self):
667         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
668         (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
669         self.assertCmdSuccess(result, out, err)
670         self.assertEqual(err, "", "There shouldn't be any error message")
671         self.assertIn('Added ou "%s"' % full_ou_dn, out)
672
673         for user in self.users:
674             (result, out, err) = self.runsubcmd(
675                 "user", "move", user["name"], full_ou_dn)
676             self.assertCmdSuccess(result, out, err, "Error running move")
677             self.assertIn('Moved user "%s" into "%s"' %
678                           (user["name"], full_ou_dn), out)
679
680         # Should fail as users objects are in OU
681         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
682         self.assertCmdFail(result)
683         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
684                        "(it has %d children)!") % len(self.users), err)
685
686         for user in self.users:
687             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
688             (result, out, err) = self.runsubcmd(
689                 "user", "move", user["name"], new_dn)
690             self.assertCmdSuccess(result, out, err, "Error running move")
691             self.assertIn('Moved user "%s" into "%s"' %
692                           (user["name"], new_dn), out)
693
694         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
695         self.assertCmdSuccess(result, out, err,
696                               "Failed to delete ou '%s'" % full_ou_dn)
697
698     def test_rename_surname_initials_givenname(self):
699         """rename the existing surname and given name and add missing
700         initials, then remove them, for all users"""
701         for user in self.users:
702             new_givenname = "new_given_name_of_" + user["name"]
703             new_initials = "A"
704             new_surname = "new_surname_of_" + user["name"]
705             found = self._find_user(user["name"])
706             old_cn = str(found.get("cn"))
707
708             # rename given name, initials and surname
709             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
710                                                 "--surname=%s" % new_surname,
711                                                 "--initials=%s" % new_initials,
712                                                 "--given-name=%s" % new_givenname)
713             self.assertCmdSuccess(result, out, err)
714             self.assertEqual(err, "", "Shouldn't be any error messages")
715             self.assertIn('successfully', out)
716
717             found = self._find_user(user["name"])
718             self.assertEqual("%s" % found.get("givenName"), new_givenname)
719             self.assertEqual("%s" % found.get("initials"), new_initials)
720             self.assertEqual("%s" % found.get("sn"), new_surname)
721             self.assertEqual("%s" % found.get("name"),
722                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
723             self.assertEqual("%s" % found.get("cn"),
724                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
725
726             # remove given name, initials and surname
727             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
728                                                 "--surname=",
729                                                 "--initials=",
730                                                 "--given-name=")
731             self.assertCmdSuccess(result, out, err)
732             self.assertEqual(err, "", "Shouldn't be any error messages")
733             self.assertIn('successfully', out)
734
735             found = self._find_user(user["name"])
736             self.assertEqual(found.get("givenName"), None)
737             self.assertEqual(found.get("initials"), None)
738             self.assertEqual(found.get("sn"), None)
739             self.assertEqual("%s" % found.get("cn"), user["name"])
740
741             # reset changes (initials are removed)
742             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
743                                                 "--surname=%(surname)s" % user,
744                                                 "--given-name=%(given-name)s" % user)
745             self.assertCmdSuccess(result, out, err)
746
747             if old_cn:
748                 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
749                                                 "--force-new-cn=%s" % old_cn)
750
751     def test_rename_cn_samaccountname(self):
752         """rename and try to remove the cn and the samaccount of all users"""
753         for user in self.users:
754             new_cn = "new_cn_of_" + user["name"]
755             new_samaccountname = "new_samaccount_of_" + user["name"]
756             new_surname = "new_surname_of_" + user["name"]
757
758             # rename cn
759             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
760                                                 "--samaccountname=%s"
761                                                  % new_samaccountname,
762                                                 "--force-new-cn=%s" % new_cn)
763             self.assertCmdSuccess(result, out, err)
764             self.assertEqual(err, "", "Shouldn't be any error messages")
765             self.assertIn('successfully', out)
766
767             found = self._find_user(new_samaccountname)
768             self.assertEqual("%s" % found.get("cn"), new_cn)
769             self.assertEqual("%s" % found.get("sAMAccountName"),
770                              new_samaccountname)
771
772             # changing the surname has no effect to the cn
773             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
774                                                 "--surname=%s" % new_surname)
775             self.assertCmdSuccess(result, out, err)
776
777             found = self._find_user(new_samaccountname)
778             self.assertEqual("%s" % found.get("cn"), new_cn)
779
780             # trying to remove cn (throws an error)
781             (result, out, err) = self.runsubcmd("user", "rename",
782                                                 new_samaccountname,
783                                                 "--force-new-cn=")
784             self.assertCmdFail(result)
785             self.assertIn('Failed to rename user', err)
786             self.assertIn("delete protected attribute", err)
787
788             # trying to remove the samccountname (throws an error)
789             (result, out, err) = self.runsubcmd("user", "rename",
790                                                 new_samaccountname,
791                                                 "--samaccountname=")
792             self.assertCmdFail(result)
793             self.assertIn('Failed to rename user', err)
794             self.assertIn('delete protected attribute', err)
795
796             # reset changes (cn must be the name)
797             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
798                                                 "--samaccountname=%(name)s"
799                                                   % user,
800                                                 "--force-new-cn=%(name)s" % user)
801             self.assertCmdSuccess(result, out, err)
802
803     def test_rename_standard_cn(self):
804         """reset the cn of all users to the standard"""
805         for user in self.users:
806             new_cn = "new_cn_of_" + user["name"]
807             new_givenname = "new_given_name_of_" + user["name"]
808             new_initials = "A"
809             new_surname = "new_surname_of_" + user["name"]
810
811             # set different cn
812             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
813                                                 "--force-new-cn=%s" % new_cn)
814             self.assertCmdSuccess(result, out, err)
815
816             # remove given name, initials and surname
817             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
818                                                 "--surname=",
819                                                 "--initials=",
820                                                 "--given-name=")
821             self.assertCmdSuccess(result, out, err)
822
823             # reset the CN (no given name, initials or surname --> samaccountname)
824             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
825                                                 "--reset-cn")
826
827             self.assertCmdSuccess(result, out, err)
828             self.assertEqual(err, "", "Shouldn't be any error messages")
829             self.assertIn('successfully', out)
830
831             found = self._find_user(user["name"])
832             self.assertEqual("%s" % found.get("cn"), user["name"])
833
834             # set given name, initials and surname and set different cn
835             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
836                                                 "--force-new-cn=%s" % new_cn,
837                                                 "--surname=%s" % new_surname,
838                                                 "--initials=%s" % new_initials,
839                                                 "--given-name=%s" % new_givenname)
840             self.assertCmdSuccess(result, out, err)
841
842             # reset the CN (given name, initials or surname are given --> given name)
843             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
844                                                 "--reset-cn")
845
846             self.assertCmdSuccess(result, out, err)
847             self.assertEqual(err, "", "Shouldn't be any error messages")
848             self.assertIn('successfully', out)
849
850             found = self._find_user(user["name"])
851             self.assertEqual("%s" % found.get("cn"),
852                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
853
854             # reset changes
855             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
856                                                 "--reset-cn",
857                                                 "--initials=",
858                                                 "--surname=%(surname)s" % user,
859                                                 "--given-name=%(given-name)s" % user)
860             self.assertCmdSuccess(result, out, err)
861
862     def test_rename_mailaddress_displayname(self):
863         for user in self.users:
864             new_mail = "new_mailaddress_of_" + user["name"]
865             new_displayname = "new displayname of " + user["name"]
866
867             # change mail and displayname
868             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
869                                                 "--mail-address=%s"
870                                                   % new_mail,
871                                                 "--display-name=%s"
872                                                   % new_displayname)
873             self.assertCmdSuccess(result, out, err)
874             self.assertEqual(err, "", "Shouldn't be any error messages")
875             self.assertIn('successfully', out)
876
877             found = self._find_user(user["name"])
878             self.assertEqual("%s" % found.get("mail"), new_mail)
879             self.assertEqual("%s" % found.get("displayName"), new_displayname)
880
881             # remove mail and displayname
882             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
883                                                 "--mail-address=",
884                                                 "--display-name=")
885             self.assertCmdSuccess(result, out, err)
886             self.assertEqual(err, "", "Shouldn't be any error messages")
887             self.assertIn('successfully', out)
888
889             found = self._find_user(user["name"])
890             self.assertEqual(found.get("mail"), None)
891             self.assertEqual(found.get("displayName"), None)
892
893     def test_rename_upn(self):
894         """rename upn of all users"""
895         for user in self.users:
896             found = self._find_user(user["name"])
897             old_upn = "%s" % found.get("userPrincipalName")
898             valid_suffix = old_upn.split('@')[1]   # samba.example.com
899
900             valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
901             invalid_new_upn = "%s@invalid.suffix" + user["name"]
902
903             # trying to set invalid upn
904             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
905                                                 "--upn=%s"
906                                                   % invalid_new_upn)
907             self.assertCmdFail(result)
908             self.assertIn('is not a valid upn', err)
909
910             # set valid upn
911             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
912                                                 "--upn=%s"
913                                                   % valid_new_upn)
914             self.assertCmdSuccess(result, out, err)
915             self.assertEqual(err, "", "Shouldn't be any error messages")
916             self.assertIn('successfully', out)
917
918             found = self._find_user(user["name"])
919             self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)
920
921             # trying to remove upn
922             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
923                                                 "--upn=%s")
924             self.assertCmdFail(result)
925             self.assertIn('is not a valid upn', err)
926
927             # reset upn
928             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
929                                                 "--upn=%s" % old_upn)
930             self.assertCmdSuccess(result, out, err)
931
932     def test_getpwent(self):
933         try:
934             import pwd
935         except ImportError:
936             self.skipTest("Skipping getpwent test, no 'pwd' module available")
937             return
938
939         # get the current user's data for the test
940         uid = os.geteuid()
941         try:
942             u = pwd.getpwuid(uid)
943         except KeyError:
944             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
945             return
946
947
948 # samba-tool user create command didn't support users with empty gecos if none is
949 # specified on the command line and the user hasn't one in the passwd file it
950 # will fail, so let's add some contents
951
952         gecos = u[4]
953         if (gecos is None or len(gecos) == 0):
954             gecos = "Foo GECOS"
955         user = self._randomPosixUser({
956                         "name": u[0],
957                         "uid": u[0],
958                         "uidNumber": u[2],
959                         "gidNumber": u[3],
960                         "gecos": gecos,
961                         "loginShell": u[6],
962                         })
963         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
964         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
965                                             "--surname=%s" % user["surname"],
966                                             "--given-name=%s" % user["given-name"],
967                                             "--job-title=%s" % user["job-title"],
968                                             "--department=%s" % user["department"],
969                                             "--description=%s" % user["description"],
970                                             "--company=%s" % user["company"],
971                                             "--gecos=%s" % user["gecos"],
972                                             "--rfc2307-from-nss",
973                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
974                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
975
976         self.assertCmdSuccess(result, out, err)
977         self.assertEqual(err, "", "Shouldn't be any error messages")
978         self.assertIn("User '%s' added successfully" % user["name"], out)
979
980         self._check_posix_user(user)
981         self.runsubcmd("user", "delete", user["name"])
982
983         # Check if overriding the attributes from NSS with explicit values works
984         #
985         # get a user with all random posix attributes
986         user = self._randomPosixUser({"name": u[0]})
987         # create a user with posix attributes from nss but override all of them with the
988         # random ones just obtained
989         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
990                                             "--surname=%s" % user["surname"],
991                                             "--given-name=%s" % user["given-name"],
992                                             "--job-title=%s" % user["job-title"],
993                                             "--department=%s" % user["department"],
994                                             "--description=%s" % user["description"],
995                                             "--company=%s" % user["company"],
996                                             "--rfc2307-from-nss",
997                                             "--gecos=%s" % user["gecos"],
998                                             "--login-shell=%s" % user["loginShell"],
999                                             "--uid=%s" % user["uid"],
1000                                             "--uid-number=%s" % user["uidNumber"],
1001                                             "--gid-number=%s" % user["gidNumber"],
1002                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1003                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1004
1005         self.assertCmdSuccess(result, out, err)
1006         self.assertEqual(err, "", "Shouldn't be any error messages")
1007         self.assertIn("User '%s' added successfully" % user["name"], out)
1008
1009         self._check_posix_user(user)
1010         self.runsubcmd("user", "delete", user["name"])
1011
1012     # Test: samba-tool user unlock
1013     # This test does not verify that the command unlocks the user, it just
1014     # tests the command itself. The unlock test, which unlocks locked users,
1015     # is located in the 'samba4.ldap.password_lockout' test in
1016     # source4/dsdb/tests/python/password_lockout.py
1017     def test_unlock(self):
1018
1019         # try to unlock a nonexistent user, this should fail
1020         nonexistentusername = "userdoesnotexist"
1021         (result, out, err) = self.runsubcmd(
1022             "user", "unlock", nonexistentusername)
1023         self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
1024         self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
1025         self.assertIn("Unable to find user", err)
1026
1027         # try to unlock with insufficient permissions, this should fail
1028         unprivileged_username = "unprivilegedunlockuser"
1029         unlocktest_username = "usertounlock"
1030
1031         self.runsubcmd("user", "add", unprivileged_username, "Passw0rd")
1032         self.runsubcmd("user", "add", unlocktest_username, "Passw0rd")
1033
1034         (result, out, err) = self.runsubcmd(
1035             "user", "unlock", unlocktest_username,
1036             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1037             "-U%s%%%s" % (unprivileged_username,
1038                           "Passw0rd"))
1039         self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1040         self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
1041         self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
1042
1043         self.runsubcmd("user", "delete", unprivileged_username)
1044         self.runsubcmd("user", "delete", unlocktest_username)
1045
1046         # run unlock against test users
1047         for user in self.users:
1048             (result, out, err) = self.runsubcmd(
1049                 "user", "unlock", user["name"])
1050             self.assertCmdSuccess(result, out, err, "Error running user unlock")
1051             self.assertEqual(err, "", "Shouldn't be any error messages")
1052
1053     def _randomUser(self, base={}):
1054         """create a user with random attribute values, you can specify base attributes"""
1055         user = {
1056             "name": self.randomName(),
1057             "password": self.random_password(16),
1058             "surname": self.randomName(),
1059             "given-name": self.randomName(),
1060             "job-title": self.randomName(),
1061             "department": self.randomName(),
1062             "company": self.randomName(),
1063             "description": self.randomName(count=100),
1064             "createUserFn": self._create_user,
1065             "checkUserFn": self._check_user,
1066         }
1067         user.update(base)
1068         return user
1069
1070     def _randomPosixUser(self, base={}):
1071         """create a user with random attribute values and additional RFC2307
1072         attributes, you can specify base attributes"""
1073         user = self._randomUser({})
1074         user.update(base)
1075         posixAttributes = {
1076             "uid": self.randomName(),
1077             "loginShell": self.randomName(),
1078             "gecos": self.randomName(),
1079             "uidNumber": self.randomXid(),
1080             "gidNumber": self.randomXid(),
1081             "createUserFn": self._create_posix_user,
1082             "checkUserFn": self._check_posix_user,
1083         }
1084         user.update(posixAttributes)
1085         user.update(base)
1086         return user
1087
1088     def _randomUnixUser(self, base={}):
1089         """create a user with random attribute values and additional RFC2307
1090         attributes, you can specify base attributes"""
1091         user = self._randomUser({})
1092         user.update(base)
1093         posixAttributes = {
1094             "uidNumber": self.randomXid(),
1095             "gidNumber": self.randomXid(),
1096             "uid": self.randomName(),
1097             "loginShell": self.randomName(),
1098             "gecos": self.randomName(),
1099             "createUserFn": self._create_unix_user,
1100             "checkUserFn": self._check_unix_user,
1101         }
1102         user.update(posixAttributes)
1103         user.update(base)
1104         return user
1105
1106     def _check_user(self, user):
1107         """ check if a user from SamDB has the same attributes as its template """
1108         found = self._find_user(user["name"])
1109
1110         self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
1111         self.assertEqual("%s" % found.get("title"), user["job-title"])
1112         self.assertEqual("%s" % found.get("company"), user["company"])
1113         self.assertEqual("%s" % found.get("description"), user["description"])
1114         self.assertEqual("%s" % found.get("department"), user["department"])
1115
1116     def _check_posix_user(self, user):
1117         """ check if a posix_user from SamDB has the same attributes as its template """
1118         found = self._find_user(user["name"])
1119
1120         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1121         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1122         self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
1123         self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
1124         self.assertEqual("%s" % found.get("uid"), user["uid"])
1125         self._check_user(user)
1126
1127     def _check_unix_user(self, user):
1128         """ check if a unix_user from SamDB has the same attributes as its
1129 template """
1130         found = self._find_user(user["name"])
1131
1132         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1133         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1134         self.assertEqual("%s" % found.get("uidNumber"), "%s" %
1135                           user["uidNumber"])
1136         self.assertEqual("%s" % found.get("gidNumber"), "%s" %
1137                           user["gidNumber"])
1138         self.assertEqual("%s" % found.get("uid"), user["uid"])
1139         self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
1140         self._check_user(user)
1141
1142     def _create_user(self, user):
1143         return self.runsubcmd("user", "add", user["name"], user["password"],
1144                               "--surname=%s" % user["surname"],
1145                               "--given-name=%s" % user["given-name"],
1146                               "--job-title=%s" % user["job-title"],
1147                               "--department=%s" % user["department"],
1148                               "--description=%s" % user["description"],
1149                               "--company=%s" % user["company"],
1150                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1151                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1152
1153     def _create_posix_user(self, user):
1154         """ create a new user with RFC2307 attributes """
1155         return self.runsubcmd("user", "create", user["name"], user["password"],
1156                               "--surname=%s" % user["surname"],
1157                               "--given-name=%s" % user["given-name"],
1158                               "--job-title=%s" % user["job-title"],
1159                               "--department=%s" % user["department"],
1160                               "--description=%s" % user["description"],
1161                               "--company=%s" % user["company"],
1162                               "--gecos=%s" % user["gecos"],
1163                               "--login-shell=%s" % user["loginShell"],
1164                               "--uid=%s" % user["uid"],
1165                               "--uid-number=%s" % user["uidNumber"],
1166                               "--gid-number=%s" % user["gidNumber"],
1167                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1168                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1169
1170     def _create_unix_user(self, user):
1171         """ Add RFC2307 attributes to a user"""
1172         self._create_user(user)
1173         return self.runsubcmd("user", "addunixattrs", user["name"],
1174                               "%s" % user["uidNumber"],
1175                               "--gid-number=%s" % user["gidNumber"],
1176                               "--gecos=%s" % user["gecos"],
1177                               "--login-shell=%s" % user["loginShell"],
1178                               "--uid=%s" % user["uid"],
1179                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1180                               "-U%s%%%s" % (os.environ["DC_USERNAME"],
1181                                             os.environ["DC_PASSWORD"]))
1182
1183     def _find_user(self, name):
1184         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
1185         userlist = self.samdb.search(base=self.samdb.domain_dn(),
1186                                      scope=ldb.SCOPE_SUBTREE,
1187                                      expression=search_filter)
1188         if userlist:
1189             return userlist[0]
1190         else:
1191             return None