1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba.tests.samba_tool.base import SambaToolCmdTest
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.common import get_bytes
31 from samba.common import get_string
32 from samba.tests import env_loadparm
35 class UserCmdTestCase(SambaToolCmdTest):
36 """Tests for samba-tool user subcommands"""
41 super(UserCmdTestCase, self).setUp()
42 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
45 # Modify the default template homedir
46 lp = self.get_loadparm()
47 self.template_homedir = lp.get('template homedir')
48 lp.set('template homedir', '/home/test/%D/%U')
51 self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
52 self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
53 self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
54 self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
55 self.users.append(self._randomPosixUser({"name": "posixuser1"}))
56 self.users.append(self._randomPosixUser({"name": "posixuser2"}))
57 self.users.append(self._randomPosixUser({"name": "posixuser3"}))
58 self.users.append(self._randomPosixUser({"name": "posixuser4"}))
59 self.users.append(self._randomUnixUser({"name": "unixuser1"}))
60 self.users.append(self._randomUnixUser({"name": "unixuser2"}))
61 self.users.append(self._randomUnixUser({"name": "unixuser3"}))
62 self.users.append(self._randomUnixUser({"name": "unixuser4"}))
64 # setup the 12 users and ensure they are correct
65 for user in self.users:
66 (result, out, err) = user["createUserFn"](user)
68 self.assertCmdSuccess(result, out, err)
69 self.assertEqual(err, "", "Shouldn't be any error messages")
70 if 'unix' in user["name"]:
71 self.assertIn("Modified User '%s' successfully" % user["name"],
74 self.assertIn("User '%s' added successfully" % user["name"],
77 user["checkUserFn"](user)
80 super(UserCmdTestCase, self).tearDown()
81 # clean up all the left over users, just in case
82 for user in self.users:
83 if self._find_user(user["name"]):
84 self.runsubcmd("user", "delete", user["name"])
86 # second run of this test
87 # the cache is still there and '--cache-ldb-initialize'
89 cachedb = lp.private_path("user-syncpasswords-cache.ldb")
90 if os.path.exists(cachedb):
92 lp.set('template homedir', self.template_homedir)
94 def test_newuser(self):
95 # try to add all the users again, this should fail
96 for user in self.users:
97 (result, out, err) = self._create_user(user)
98 self.assertCmdFail(result, "Ensure that create user fails")
99 self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
101 # try to delete all the 4 users we just added
102 for user in self.users:
103 (result, out, err) = self.runsubcmd("user", "delete", user["name"])
104 self.assertCmdSuccess(result, out, err, "Can we delete users")
105 found = self._find_user(user["name"])
106 self.assertIsNone(found)
108 # test adding users with --use-username-as-cn
109 for user in self.users:
110 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
111 "--use-username-as-cn",
112 "--surname=%s" % user["surname"],
113 "--given-name=%s" % user["given-name"],
114 "--job-title=%s" % user["job-title"],
115 "--department=%s" % user["department"],
116 "--description=%s" % user["description"],
117 "--company=%s" % user["company"],
118 "-H", "ldap://%s" % os.environ["DC_SERVER"],
119 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
121 self.assertCmdSuccess(result, out, err)
122 self.assertEqual(err, "", "Shouldn't be any error messages")
123 self.assertIn("User '%s' added successfully" % user["name"], out)
125 found = self._find_user(user["name"])
127 self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
128 self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
130 def _verify_supplementalCredentials(self, ldif,
133 msgs = self.samdb.parse_ldif(ldif)
134 (changetype, obj) = next(msgs)
136 self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
137 sc_blob = obj["supplementalCredentials"][0]
138 sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
140 self.assertGreaterEqual(sc.sub.num_packages,
141 min_packages, "min_packages check")
142 self.assertLessEqual(sc.sub.num_packages,
143 max_packages, "max_packages check")
145 if max_packages == 0:
148 def find_package(packages, name, start_idx=0):
149 for i in range(start_idx, len(packages)):
150 if packages[i].name == name:
151 return (i, packages[i])
154 # The ordering is this
156 # Primary:Kerberos-Newer-Keys (optional)
159 # Primary:CLEARTEXT (optional)
160 # Primary:SambaGPG (optional)
162 # And the 'Packages' package is insert before the last
166 (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
167 self.assertIsNotNone(pp, "Packages required")
168 self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
169 "Packages needs to be at num_packages - 1")
171 (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
173 if knidx is not None:
174 self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
179 (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
181 self.assertIsNotNone(pp, "Primary:Kerberos required")
182 self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
187 (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
189 self.assertIsNotNone(pp, "Primary:WDigest required")
190 self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
195 (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
198 self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
203 (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
206 self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
211 self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
213 def test_setpassword(self):
214 expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1")))
216 for user in self.users:
217 newpasswd = self.random_password(16)
218 (result, out, err) = self.runsubcmd("user", "setpassword",
220 "--newpassword=%s" % newpasswd,
221 "-H", "ldap://%s" % os.environ["DC_SERVER"],
222 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
223 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
224 self.assertEqual(err, "", "setpassword with url")
225 self.assertMatch(out, "Changed password OK", "setpassword with url")
227 attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
228 (result, out, err) = self.runsubcmd("user", "syncpasswords",
229 "--cache-ldb-initialize",
230 "--attributes=%s" % attributes,
231 "--decrypt-samba-gpg")
232 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
233 self.assertEqual(err, "", "getpassword without url")
235 "objectClass": {"value": "userSyncPasswords"},
238 "dirsyncAttribute": {},
239 "dirsyncControl": {"value": "dirsync:1:0:0"},
240 "passwordAttribute": {},
241 "decryptSambaGPG": {},
244 for a in cache_attrs.keys():
245 v = cache_attrs[a].get("value", "")
246 self.assertMatch(out, "%s: %s" % (a, v),
247 "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
249 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
250 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
251 self.assertEqual(err, "", "syncpasswords --no-wait")
252 self.assertMatch(out, "dirsync_loop(): results 0",
253 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
254 for user in self.users:
255 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
256 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
258 for user in self.users:
259 newpasswd = self.random_password(16)
260 creds = credentials.Credentials()
261 creds.set_anonymous()
262 creds.set_password(newpasswd)
263 unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
264 virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
265 virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
267 (result, out, err) = self.runsubcmd("user", "setpassword",
269 "--newpassword=%s" % newpasswd)
270 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
271 self.assertEqual(err, "", "setpassword without url")
272 self.assertMatch(out, "Changed password OK", "setpassword without url")
274 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
275 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
276 self.assertEqual(err, "", "syncpasswords --no-wait")
277 self.assertMatch(out, "dirsync_loop(): results 0",
278 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
279 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
280 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
281 self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
282 "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
284 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
285 "getpassword unicodePwd: out[%s]" % out)
287 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
288 self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
289 "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
290 self.assertMatch(out, "supplementalCredentials:: ",
291 "getpassword supplementalCredentials: out[%s]" % out)
292 if "virtualSambaGPG:: " in out:
293 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
294 "getpassword virtualClearTextUTF8: out[%s]" % out)
295 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
296 "getpassword virtualClearTextUTF16: out[%s]" % out)
297 self.assertMatch(out, "virtualSSHA: ",
298 "getpassword virtualSSHA: out[%s]" % out)
300 (result, out, err) = self.runsubcmd("user", "getpassword",
302 "--attributes=%s" % attributes,
303 "--decrypt-samba-gpg")
304 self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
305 self.assertEqual(err, "", "getpassword without url")
306 self.assertMatch(out, "Got password OK", "getpassword without url")
307 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
308 "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
310 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
311 "getpassword unicodePwd: out[%s]" % out)
313 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
314 self.assertMatch(out, "supplementalCredentials:: ",
315 "getpassword supplementalCredentials: out[%s]" % out)
316 self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
317 if "virtualSambaGPG:: " in out:
318 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
319 "getpassword virtualClearTextUTF8: out[%s]" % out)
320 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
321 "getpassword virtualClearTextUTF16: out[%s]" % out)
322 self.assertMatch(out, "virtualSSHA: ",
323 "getpassword virtualSSHA: out[%s]" % out)
325 for user in self.users:
326 newpasswd = self.random_password(16)
327 (result, out, err) = self.runsubcmd("user", "setpassword",
329 "--newpassword=%s" % newpasswd,
330 "--must-change-at-next-login",
331 "-H", "ldap://%s" % os.environ["DC_SERVER"],
332 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
333 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
334 self.assertEqual(err, "", "setpassword with forced change")
335 self.assertMatch(out, "Changed password OK", "setpassword with forced change")
337 def test_setexpiry(self):
338 for user in self.users:
339 twodays = time.time() + (2 * 24 * 60 * 60)
341 (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
343 "-H", "ldap://%s" % os.environ["DC_SERVER"],
344 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
345 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
346 self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
348 found = self._find_user(user["name"])
350 expires = nttime2unix(int("%s" % found.get("accountExpires")))
351 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
353 # TODO: renable this after the filter case is sorted out
354 if "filters are broken, bail now":
357 # now run the expiration based on a filter
358 fourdays = time.time() + (4 * 24 * 60 * 60)
359 (result, out, err) = self.runsubcmd("user", "setexpiry",
360 "--filter", "(&(objectClass=user)(company=comp2))",
362 "-H", "ldap://%s" % os.environ["DC_SERVER"],
363 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
364 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
366 for user in self.users:
367 found = self._find_user(user["name"])
368 if ("%s" % found.get("company")) == "comp2":
369 expires = nttime2unix(int("%s" % found.get("accountExpires")))
370 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
372 expires = nttime2unix(int("%s" % found.get("accountExpires")))
373 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
376 (result, out, err) = self.runsubcmd("user", "list",
377 "-H", "ldap://%s" % os.environ["DC_SERVER"],
378 "-U%s%%%s" % (os.environ["DC_USERNAME"],
379 os.environ["DC_PASSWORD"]))
380 self.assertCmdSuccess(result, out, err, "Error running list")
382 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
383 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
385 userlist = self.samdb.search(base=self.samdb.domain_dn(),
386 scope=ldb.SCOPE_SUBTREE,
387 expression=search_filter,
388 attrs=["samaccountname"])
390 self.assertTrue(len(userlist) > 0, "no users found in samdb")
392 for userobj in userlist:
393 name = str(userobj.get("samaccountname", idx=0))
394 found = self.assertMatch(out, name,
395 "user '%s' not found" % name)
398 def test_list_base_dn(self):
400 (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
401 "-H", "ldap://%s" % os.environ["DC_SERVER"],
402 "-U%s%%%s" % (os.environ["DC_USERNAME"],
403 os.environ["DC_PASSWORD"]))
404 self.assertCmdSuccess(result, out, err, "Error running list")
406 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
407 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
409 userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
410 scope=ldb.SCOPE_SUBTREE,
411 expression=search_filter,
412 attrs=["samaccountname"])
414 self.assertTrue(len(userlist) > 0, "no users found in samdb")
416 for userobj in userlist:
417 name = str(userobj.get("samaccountname", idx=0))
418 found = self.assertMatch(out, name,
419 "user '%s' not found" % name)
421 def test_list_full_dn(self):
422 (result, out, err) = self.runsubcmd("user", "list", "--full-dn",
423 "-H", "ldap://%s" % os.environ["DC_SERVER"],
424 "-U%s%%%s" % (os.environ["DC_USERNAME"],
425 os.environ["DC_PASSWORD"]))
426 self.assertCmdSuccess(result, out, err, "Error running list")
428 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
429 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
431 userlist = self.samdb.search(base=self.samdb.domain_dn(),
432 scope=ldb.SCOPE_SUBTREE,
433 expression=search_filter,
436 self.assertTrue(len(userlist) > 0, "no users found in samdb")
438 for userobj in userlist:
439 name = str(userobj.get("dn", idx=0))
440 found = self.assertMatch(out, name,
441 "user '%s' not found" % name)
443 def test_list_hide_expired(self):
444 expire_username = "expireUser"
445 expire_user = self._randomUser({"name": expire_username})
446 self._create_user(expire_user)
448 (result, out, err) = self.runsubcmd(
453 "ldap://%s" % os.environ["DC_SERVER"],
454 "-U%s%%%s" % (os.environ["DC_USERNAME"],
455 os.environ["DC_PASSWORD"]))
456 self.assertCmdSuccess(result, out, err, "Error running list")
457 self.assertTrue(expire_username in out,
458 "user '%s' not found" % expire_username)
460 # user will be expired one second ago
461 self.samdb.setexpiry(
462 "(sAMAccountname=%s)" % expire_username,
466 (result, out, err) = self.runsubcmd(
471 "ldap://%s" % os.environ["DC_SERVER"],
472 "-U%s%%%s" % (os.environ["DC_USERNAME"],
473 os.environ["DC_PASSWORD"]))
474 self.assertCmdSuccess(result, out, err, "Error running list")
475 self.assertFalse(expire_username in out,
476 "user '%s' found" % expire_username)
478 self.samdb.deleteuser(expire_username)
480 def test_list_hide_disabled(self):
481 disable_username = "disableUser"
482 disable_user = self._randomUser({"name": disable_username})
483 self._create_user(disable_user)
485 (result, out, err) = self.runsubcmd(
490 "ldap://%s" % os.environ["DC_SERVER"],
491 "-U%s%%%s" % (os.environ["DC_USERNAME"],
492 os.environ["DC_PASSWORD"]))
493 self.assertCmdSuccess(result, out, err, "Error running list")
494 self.assertTrue(disable_username in out,
495 "user '%s' not found" % disable_username)
497 self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
499 (result, out, err) = self.runsubcmd(
504 "ldap://%s" % os.environ["DC_SERVER"],
505 "-U%s%%%s" % (os.environ["DC_USERNAME"],
506 os.environ["DC_PASSWORD"]))
507 self.assertCmdSuccess(result, out, err, "Error running list")
508 self.assertFalse(disable_username in out,
509 "user '%s' found" % disable_username)
511 self.samdb.deleteuser(disable_username)
514 for user in self.users:
515 (result, out, err) = self.runsubcmd(
516 "user", "show", user["name"],
517 "--attributes=sAMAccountName,company",
518 "-H", "ldap://%s" % os.environ["DC_SERVER"],
519 "-U%s%%%s" % (os.environ["DC_USERNAME"],
520 os.environ["DC_PASSWORD"]))
521 self.assertCmdSuccess(result, out, err, "Error running show")
523 expected_out = """dn: CN=%s %s,CN=Users,%s
527 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
528 user["company"], user["name"])
530 self.assertEqual(out, expected_out,
531 "Unexpected show output for user '%s'" %
535 "name", # test that invalid values are just ignored
542 "lastLogonTimestamp",
544 "msDS-UserPasswordExpiryTimeComputed",
549 for ta in time_attrs:
551 for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
552 attrs.append("%s;format=%s" % (ta, fm))
554 (result, out, err) = self.runsubcmd(
555 "user", "show", user["name"],
556 "--attributes=%s" % ",".join(attrs),
557 "-H", "ldap://%s" % os.environ["DC_SERVER"],
558 "-U%s%%%s" % (os.environ["DC_USERNAME"],
559 os.environ["DC_PASSWORD"]))
560 self.assertCmdSuccess(result, out, err, "Error running show")
562 self.assertIn(";format=GeneralizedTime", out)
563 self.assertIn(";format=UnixTime", out)
564 self.assertIn(";format=TimeSpec", out)
566 self.assertIn("name: ", out)
567 self.assertNotIn("name;format=GeneralizedTime: ", out)
568 self.assertNotIn("name;format=UnixTime: ", out)
569 self.assertNotIn("name;format=TimeSpec: ", out)
571 self.assertIn("whenCreated: 20", out)
572 self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
573 self.assertIn("whenCreated;format=UnixTime: 1", out)
574 self.assertIn("whenCreated;format=TimeSpec: 1", out)
576 self.assertIn("whenChanged: 20", out)
577 self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
578 self.assertIn("whenChanged;format=UnixTime: 1", out)
579 self.assertIn("whenChanged;format=TimeSpec: 1", out)
581 self.assertIn("accountExpires: 9223372036854775807", out)
582 self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
583 self.assertNotIn("accountExpires;format=UnixTime: ", out)
584 self.assertNotIn("accountExpires;format=TimeSpec: ", out)
586 self.assertIn("badPasswordTime: 0", out)
587 self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
588 self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
589 self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
591 self.assertIn("lastLogoff: 0", out)
592 self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
593 self.assertNotIn("lastLogoff;format=UnixTime: ", out)
594 self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
596 self.assertIn("lastLogon: 0", out)
597 self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
598 self.assertNotIn("lastLogon;format=UnixTime: ", out)
599 self.assertNotIn("lastLogon;format=TimeSpec: ", out)
601 # If a specified attribute is not available on a user object
602 # it's silently omitted.
603 self.assertNotIn("lastLogonTimestamp:", out)
604 self.assertNotIn("lockoutTime:", out)
606 self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
607 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
608 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
609 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
611 self.assertIn("pwdLastSet: 1", out)
612 self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
613 self.assertIn("pwdLastSet;format=UnixTime: 1", out)
614 self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
616 out_msgs = self.samdb.parse_ldif(out)
617 out_msg = next(out_msgs)[1]
619 self.assertIn("whenCreated", out_msg)
620 when_created_str = str(out_msg["whenCreated"][0])
621 self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
622 self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
623 when_created_time = ldb.string_to_time(when_created_str)
624 self.assertIn("whenCreated;format=UnixTime", out_msg)
625 self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
626 self.assertIn("whenCreated;format=TimeSpec", out_msg)
627 self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
628 "%d.000000000" % (when_created_time))
630 self.assertIn("whenChanged", out_msg)
631 when_changed_str = str(out_msg["whenChanged"][0])
632 self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
633 self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
634 when_changed_time = ldb.string_to_time(when_changed_str)
635 self.assertIn("whenChanged;format=UnixTime", out_msg)
636 self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
637 self.assertIn("whenChanged;format=TimeSpec", out_msg)
638 self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
639 "%d.000000000" % (when_changed_time))
641 self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
642 pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
643 pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
644 self.assertIn("pwdLastSet;format=UnixTime", out_msg)
645 self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
646 self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
647 self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
648 self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
650 # assert that the pwd has been set in the minute after user creation
651 self.assertGreaterEqual(pwd_last_set_time, when_created_time)
652 self.assertLess(pwd_last_set_time, when_created_time + 60)
654 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
655 pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
656 pwd_expires_time = ldb.string_to_time(pwd_expires_str)
657 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
658 self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
659 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
660 self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
661 self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
663 # assert that the pwd expires after it was set
664 self.assertGreater(pwd_expires_time, pwd_last_set_time)
667 full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
668 (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
669 self.assertCmdSuccess(result, out, err)
670 self.assertEqual(err, "", "There shouldn't be any error message")
671 self.assertIn('Added ou "%s"' % full_ou_dn, out)
673 for user in self.users:
674 (result, out, err) = self.runsubcmd(
675 "user", "move", user["name"], full_ou_dn)
676 self.assertCmdSuccess(result, out, err, "Error running move")
677 self.assertIn('Moved user "%s" into "%s"' %
678 (user["name"], full_ou_dn), out)
680 # Should fail as users objects are in OU
681 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
682 self.assertCmdFail(result)
683 self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
684 "(it has %d children)!") % len(self.users), err)
686 for user in self.users:
687 new_dn = "CN=Users,%s" % self.samdb.domain_dn()
688 (result, out, err) = self.runsubcmd(
689 "user", "move", user["name"], new_dn)
690 self.assertCmdSuccess(result, out, err, "Error running move")
691 self.assertIn('Moved user "%s" into "%s"' %
692 (user["name"], new_dn), out)
694 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
695 self.assertCmdSuccess(result, out, err,
696 "Failed to delete ou '%s'" % full_ou_dn)
698 def test_rename_surname_initials_givenname(self):
699 """rename the existing surname and given name and add missing
700 initials, then remove them, for all users"""
701 for user in self.users:
702 new_givenname = "new_given_name_of_" + user["name"]
704 new_surname = "new_surname_of_" + user["name"]
705 found = self._find_user(user["name"])
706 old_cn = str(found.get("cn"))
708 # rename given name, initials and surname
709 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
710 "--surname=%s" % new_surname,
711 "--initials=%s" % new_initials,
712 "--given-name=%s" % new_givenname)
713 self.assertCmdSuccess(result, out, err)
714 self.assertEqual(err, "", "Shouldn't be any error messages")
715 self.assertIn('successfully', out)
717 found = self._find_user(user["name"])
718 self.assertEqual("%s" % found.get("givenName"), new_givenname)
719 self.assertEqual("%s" % found.get("initials"), new_initials)
720 self.assertEqual("%s" % found.get("sn"), new_surname)
721 self.assertEqual("%s" % found.get("name"),
722 "%s %s. %s" % (new_givenname, new_initials, new_surname))
723 self.assertEqual("%s" % found.get("cn"),
724 "%s %s. %s" % (new_givenname, new_initials, new_surname))
726 # remove given name, initials and surname
727 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
731 self.assertCmdSuccess(result, out, err)
732 self.assertEqual(err, "", "Shouldn't be any error messages")
733 self.assertIn('successfully', out)
735 found = self._find_user(user["name"])
736 self.assertEqual(found.get("givenName"), None)
737 self.assertEqual(found.get("initials"), None)
738 self.assertEqual(found.get("sn"), None)
739 self.assertEqual("%s" % found.get("cn"), user["name"])
741 # reset changes (initials are removed)
742 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
743 "--surname=%(surname)s" % user,
744 "--given-name=%(given-name)s" % user)
745 self.assertCmdSuccess(result, out, err)
748 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
749 "--force-new-cn=%s" % old_cn)
751 def test_rename_cn_samaccountname(self):
752 """rename and try to remove the cn and the samaccount of all users"""
753 for user in self.users:
754 new_cn = "new_cn_of_" + user["name"]
755 new_samaccountname = "new_samaccount_of_" + user["name"]
756 new_surname = "new_surname_of_" + user["name"]
759 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
760 "--samaccountname=%s"
761 % new_samaccountname,
762 "--force-new-cn=%s" % new_cn)
763 self.assertCmdSuccess(result, out, err)
764 self.assertEqual(err, "", "Shouldn't be any error messages")
765 self.assertIn('successfully', out)
767 found = self._find_user(new_samaccountname)
768 self.assertEqual("%s" % found.get("cn"), new_cn)
769 self.assertEqual("%s" % found.get("sAMAccountName"),
772 # changing the surname has no effect to the cn
773 (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
774 "--surname=%s" % new_surname)
775 self.assertCmdSuccess(result, out, err)
777 found = self._find_user(new_samaccountname)
778 self.assertEqual("%s" % found.get("cn"), new_cn)
780 # trying to remove cn (throws an error)
781 (result, out, err) = self.runsubcmd("user", "rename",
784 self.assertCmdFail(result)
785 self.assertIn('Failed to rename user', err)
786 self.assertIn("delete protected attribute", err)
788 # trying to remove the samccountname (throws an error)
789 (result, out, err) = self.runsubcmd("user", "rename",
792 self.assertCmdFail(result)
793 self.assertIn('Failed to rename user', err)
794 self.assertIn('delete protected attribute', err)
796 # reset changes (cn must be the name)
797 (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
798 "--samaccountname=%(name)s"
800 "--force-new-cn=%(name)s" % user)
801 self.assertCmdSuccess(result, out, err)
803 def test_rename_standard_cn(self):
804 """reset the cn of all users to the standard"""
805 for user in self.users:
806 new_cn = "new_cn_of_" + user["name"]
807 new_givenname = "new_given_name_of_" + user["name"]
809 new_surname = "new_surname_of_" + user["name"]
812 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
813 "--force-new-cn=%s" % new_cn)
814 self.assertCmdSuccess(result, out, err)
816 # remove given name, initials and surname
817 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
821 self.assertCmdSuccess(result, out, err)
823 # reset the CN (no given name, initials or surname --> samaccountname)
824 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
827 self.assertCmdSuccess(result, out, err)
828 self.assertEqual(err, "", "Shouldn't be any error messages")
829 self.assertIn('successfully', out)
831 found = self._find_user(user["name"])
832 self.assertEqual("%s" % found.get("cn"), user["name"])
834 # set given name, initials and surname and set different cn
835 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
836 "--force-new-cn=%s" % new_cn,
837 "--surname=%s" % new_surname,
838 "--initials=%s" % new_initials,
839 "--given-name=%s" % new_givenname)
840 self.assertCmdSuccess(result, out, err)
842 # reset the CN (given name, initials or surname are given --> given name)
843 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
846 self.assertCmdSuccess(result, out, err)
847 self.assertEqual(err, "", "Shouldn't be any error messages")
848 self.assertIn('successfully', out)
850 found = self._find_user(user["name"])
851 self.assertEqual("%s" % found.get("cn"),
852 "%s %s. %s" % (new_givenname, new_initials, new_surname))
855 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
858 "--surname=%(surname)s" % user,
859 "--given-name=%(given-name)s" % user)
860 self.assertCmdSuccess(result, out, err)
862 def test_rename_mailaddress_displayname(self):
863 for user in self.users:
864 new_mail = "new_mailaddress_of_" + user["name"]
865 new_displayname = "new displayname of " + user["name"]
867 # change mail and displayname
868 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
873 self.assertCmdSuccess(result, out, err)
874 self.assertEqual(err, "", "Shouldn't be any error messages")
875 self.assertIn('successfully', out)
877 found = self._find_user(user["name"])
878 self.assertEqual("%s" % found.get("mail"), new_mail)
879 self.assertEqual("%s" % found.get("displayName"), new_displayname)
881 # remove mail and displayname
882 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
885 self.assertCmdSuccess(result, out, err)
886 self.assertEqual(err, "", "Shouldn't be any error messages")
887 self.assertIn('successfully', out)
889 found = self._find_user(user["name"])
890 self.assertEqual(found.get("mail"), None)
891 self.assertEqual(found.get("displayName"), None)
893 def test_rename_upn(self):
894 """rename upn of all users"""
895 for user in self.users:
896 found = self._find_user(user["name"])
897 old_upn = "%s" % found.get("userPrincipalName")
898 valid_suffix = old_upn.split('@')[1] # samba.example.com
900 valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
901 invalid_new_upn = "%s@invalid.suffix" + user["name"]
903 # trying to set invalid upn
904 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
907 self.assertCmdFail(result)
908 self.assertIn('is not a valid upn', err)
911 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
914 self.assertCmdSuccess(result, out, err)
915 self.assertEqual(err, "", "Shouldn't be any error messages")
916 self.assertIn('successfully', out)
918 found = self._find_user(user["name"])
919 self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)
921 # trying to remove upn
922 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
924 self.assertCmdFail(result)
925 self.assertIn('is not a valid upn', err)
928 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
929 "--upn=%s" % old_upn)
930 self.assertCmdSuccess(result, out, err)
932 def test_getpwent(self):
936 self.skipTest("Skipping getpwent test, no 'pwd' module available")
939 # get the current user's data for the test
942 u = pwd.getpwuid(uid)
944 self.skipTest("Skipping getpwent test, current EUID not found in NSS")
948 # samba-tool user create command didn't support users with empty gecos if none is
949 # specified on the command line and the user hasn't one in the passwd file it
950 # will fail, so let's add some contents
953 if (gecos is None or len(gecos) == 0):
955 user = self._randomPosixUser({
963 # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
964 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
965 "--surname=%s" % user["surname"],
966 "--given-name=%s" % user["given-name"],
967 "--job-title=%s" % user["job-title"],
968 "--department=%s" % user["department"],
969 "--description=%s" % user["description"],
970 "--company=%s" % user["company"],
971 "--gecos=%s" % user["gecos"],
972 "--rfc2307-from-nss",
973 "-H", "ldap://%s" % os.environ["DC_SERVER"],
974 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
976 self.assertCmdSuccess(result, out, err)
977 self.assertEqual(err, "", "Shouldn't be any error messages")
978 self.assertIn("User '%s' added successfully" % user["name"], out)
980 self._check_posix_user(user)
981 self.runsubcmd("user", "delete", user["name"])
983 # Check if overriding the attributes from NSS with explicit values works
985 # get a user with all random posix attributes
986 user = self._randomPosixUser({"name": u[0]})
987 # create a user with posix attributes from nss but override all of them with the
988 # random ones just obtained
989 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
990 "--surname=%s" % user["surname"],
991 "--given-name=%s" % user["given-name"],
992 "--job-title=%s" % user["job-title"],
993 "--department=%s" % user["department"],
994 "--description=%s" % user["description"],
995 "--company=%s" % user["company"],
996 "--rfc2307-from-nss",
997 "--gecos=%s" % user["gecos"],
998 "--login-shell=%s" % user["loginShell"],
999 "--uid=%s" % user["uid"],
1000 "--uid-number=%s" % user["uidNumber"],
1001 "--gid-number=%s" % user["gidNumber"],
1002 "-H", "ldap://%s" % os.environ["DC_SERVER"],
1003 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1005 self.assertCmdSuccess(result, out, err)
1006 self.assertEqual(err, "", "Shouldn't be any error messages")
1007 self.assertIn("User '%s' added successfully" % user["name"], out)
1009 self._check_posix_user(user)
1010 self.runsubcmd("user", "delete", user["name"])
1012 # Test: samba-tool user unlock
1013 # This test does not verify that the command unlocks the user, it just
1014 # tests the command itself. The unlock test, which unlocks locked users,
1015 # is located in the 'samba4.ldap.password_lockout' test in
1016 # source4/dsdb/tests/python/password_lockout.py
1017 def test_unlock(self):
1019 # try to unlock a nonexistent user, this should fail
1020 nonexistentusername = "userdoesnotexist"
1021 (result, out, err) = self.runsubcmd(
1022 "user", "unlock", nonexistentusername)
1023 self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
1024 self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
1025 self.assertIn("Unable to find user", err)
1027 # try to unlock with insufficient permissions, this should fail
1028 unprivileged_username = "unprivilegedunlockuser"
1029 unlocktest_username = "usertounlock"
1031 self.runsubcmd("user", "add", unprivileged_username, "Passw0rd")
1032 self.runsubcmd("user", "add", unlocktest_username, "Passw0rd")
1034 (result, out, err) = self.runsubcmd(
1035 "user", "unlock", unlocktest_username,
1036 "-H", "ldap://%s" % os.environ["DC_SERVER"],
1037 "-U%s%%%s" % (unprivileged_username,
1039 self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1040 self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
1041 self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
1043 self.runsubcmd("user", "delete", unprivileged_username)
1044 self.runsubcmd("user", "delete", unlocktest_username)
1046 # run unlock against test users
1047 for user in self.users:
1048 (result, out, err) = self.runsubcmd(
1049 "user", "unlock", user["name"])
1050 self.assertCmdSuccess(result, out, err, "Error running user unlock")
1051 self.assertEqual(err, "", "Shouldn't be any error messages")
1053 def _randomUser(self, base={}):
1054 """create a user with random attribute values, you can specify base attributes"""
1056 "name": self.randomName(),
1057 "password": self.random_password(16),
1058 "surname": self.randomName(),
1059 "given-name": self.randomName(),
1060 "job-title": self.randomName(),
1061 "department": self.randomName(),
1062 "company": self.randomName(),
1063 "description": self.randomName(count=100),
1064 "createUserFn": self._create_user,
1065 "checkUserFn": self._check_user,
1070 def _randomPosixUser(self, base={}):
1071 """create a user with random attribute values and additional RFC2307
1072 attributes, you can specify base attributes"""
1073 user = self._randomUser({})
1076 "uid": self.randomName(),
1077 "loginShell": self.randomName(),
1078 "gecos": self.randomName(),
1079 "uidNumber": self.randomXid(),
1080 "gidNumber": self.randomXid(),
1081 "createUserFn": self._create_posix_user,
1082 "checkUserFn": self._check_posix_user,
1084 user.update(posixAttributes)
1088 def _randomUnixUser(self, base={}):
1089 """create a user with random attribute values and additional RFC2307
1090 attributes, you can specify base attributes"""
1091 user = self._randomUser({})
1094 "uidNumber": self.randomXid(),
1095 "gidNumber": self.randomXid(),
1096 "uid": self.randomName(),
1097 "loginShell": self.randomName(),
1098 "gecos": self.randomName(),
1099 "createUserFn": self._create_unix_user,
1100 "checkUserFn": self._check_unix_user,
1102 user.update(posixAttributes)
1106 def _check_user(self, user):
1107 """ check if a user from SamDB has the same attributes as its template """
1108 found = self._find_user(user["name"])
1110 self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
1111 self.assertEqual("%s" % found.get("title"), user["job-title"])
1112 self.assertEqual("%s" % found.get("company"), user["company"])
1113 self.assertEqual("%s" % found.get("description"), user["description"])
1114 self.assertEqual("%s" % found.get("department"), user["department"])
1116 def _check_posix_user(self, user):
1117 """ check if a posix_user from SamDB has the same attributes as its template """
1118 found = self._find_user(user["name"])
1120 self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1121 self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1122 self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
1123 self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
1124 self.assertEqual("%s" % found.get("uid"), user["uid"])
1125 self._check_user(user)
1127 def _check_unix_user(self, user):
1128 """ check if a unix_user from SamDB has the same attributes as its
1130 found = self._find_user(user["name"])
1132 self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1133 self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1134 self.assertEqual("%s" % found.get("uidNumber"), "%s" %
1136 self.assertEqual("%s" % found.get("gidNumber"), "%s" %
1138 self.assertEqual("%s" % found.get("uid"), user["uid"])
1139 self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
1140 self._check_user(user)
1142 def _create_user(self, user):
1143 return self.runsubcmd("user", "add", user["name"], user["password"],
1144 "--surname=%s" % user["surname"],
1145 "--given-name=%s" % user["given-name"],
1146 "--job-title=%s" % user["job-title"],
1147 "--department=%s" % user["department"],
1148 "--description=%s" % user["description"],
1149 "--company=%s" % user["company"],
1150 "-H", "ldap://%s" % os.environ["DC_SERVER"],
1151 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1153 def _create_posix_user(self, user):
1154 """ create a new user with RFC2307 attributes """
1155 return self.runsubcmd("user", "create", user["name"], user["password"],
1156 "--surname=%s" % user["surname"],
1157 "--given-name=%s" % user["given-name"],
1158 "--job-title=%s" % user["job-title"],
1159 "--department=%s" % user["department"],
1160 "--description=%s" % user["description"],
1161 "--company=%s" % user["company"],
1162 "--gecos=%s" % user["gecos"],
1163 "--login-shell=%s" % user["loginShell"],
1164 "--uid=%s" % user["uid"],
1165 "--uid-number=%s" % user["uidNumber"],
1166 "--gid-number=%s" % user["gidNumber"],
1167 "-H", "ldap://%s" % os.environ["DC_SERVER"],
1168 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1170 def _create_unix_user(self, user):
1171 """ Add RFC2307 attributes to a user"""
1172 self._create_user(user)
1173 return self.runsubcmd("user", "addunixattrs", user["name"],
1174 "%s" % user["uidNumber"],
1175 "--gid-number=%s" % user["gidNumber"],
1176 "--gecos=%s" % user["gecos"],
1177 "--login-shell=%s" % user["loginShell"],
1178 "--uid=%s" % user["uid"],
1179 "-H", "ldap://%s" % os.environ["DC_SERVER"],
1180 "-U%s%%%s" % (os.environ["DC_USERNAME"],
1181 os.environ["DC_PASSWORD"]))
1183 def _find_user(self, name):
1184 search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
1185 userlist = self.samdb.search(base=self.samdb.domain_dn(),
1186 scope=ldb.SCOPE_SUBTREE,
1187 expression=search_filter)