Move python modules from source4/scripting/python/ to python/.
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import ldb
21 from samba.tests.samba_tool.base import SambaToolCmdTest
22 from samba import (
23         nttime2unix,
24         dsdb
25         )
26
class UserCmdTestCase(SambaToolCmdTest):
    """Tests for samba-tool user subcommands.

    Each test starts with eight freshly created accounts: four plain users
    and four users that additionally carry RFC2307 (POSIX) attributes.  The
    server and credentials used for the explicit "-H"/"-U" invocations are
    read from the DC_SERVER, DC_USERNAME and DC_PASSWORD environment
    variables.
    """
    # Class-level placeholders only; setUp() rebinds both on the instance,
    # so the list below is never shared between tests.
    users = []
    samdb = None

    def setUp(self):
        """Open a SamDB connection and create the eight fixture users."""
        super(UserCmdTestCase, self).setUp()
        self.samdb = self.getSamDB(*self._dc_connection_args())
        self.users = [
            self._randomUser({"name": "sambatool1", "company": "comp1"}),
            self._randomUser({"name": "sambatool2", "company": "comp1"}),
            self._randomUser({"name": "sambatool3", "company": "comp2"}),
            self._randomUser({"name": "sambatool4", "company": "comp2"}),
            self._randomPosixUser({"name": "posixuser1"}),
            self._randomPosixUser({"name": "posixuser2"}),
            self._randomPosixUser({"name": "posixuser3"}),
            self._randomPosixUser({"name": "posixuser4"}),
        ]

        # setup the 8 users and ensure they are correct
        for user in self.users:
            (result, out, err) = user["createUserFn"](user)
            self._assert_user_created(result, out, err, user)
            user["checkUserFn"](user)

    def tearDown(self):
        """Delete every registered user that still exists in the directory."""
        super(UserCmdTestCase, self).tearDown()
        # clean up all the left over users, just in case
        for user in self.users:
            if self._find_user(user["name"]):
                self.runsubcmd("user", "delete", user["name"])

    def test_newuser(self):
        """Check duplicate creation, deletion and --use-username-as-cn."""
        # try to add all the users again, this should fail
        for user in self.users:
            (result, out, err) = self._create_user(user)
            self.assertCmdFail(result, "Ensure that create user fails")
            self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)

        # try to delete all the 8 users we just added
        for user in self.users:
            (result, out, err) = self.runsubcmd("user", "delete",
                                                user["name"])
            self.assertCmdSuccess(result, "Can we delete users")
            found = self._find_user(user["name"])
            self.assertIsNone(found)

        # test adding users with --use-username-as-cn
        for user in self.users:
            args = ["user", "add", user["name"], user["password"],
                    "--use-username-as-cn"]
            args += self._user_attribute_args(user)
            args += self._dc_connection_args()
            (result, out, err) = self.runsubcmd(*args)

            self._assert_user_created(result, out, err, user)

            found = self._find_user(user["name"])

            # With --use-username-as-cn both cn and name must equal the
            # account name rather than "<given-name> <surname>".
            self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
            self.assertEqual("%s" % found.get("name"), "%(name)s" % user)

    def test_setpassword(self):
        """Set a random password with a URL, without one, and forced."""
        for user in self.users:
            self._assert_setpassword(user, "setpassword with url",
                                     self._dc_connection_args())

        for user in self.users:
            self._assert_setpassword(user, "setpassword without url", [])

        for user in self.users:
            self._assert_setpassword(
                user, "setpassword with forced change",
                ["--must-change-at-next-login"] + self._dc_connection_args())

    def test_setexpiry(self):
        """Set a two day expiry per user and verify accountExpires."""
        twodays = time.time() + (2 * 24 * 60 * 60)

        for user in self.users:
            (result, out, err) = self.runsubcmd(
                "user", "setexpiry", user["name"], "--days=2",
                *self._dc_connection_args())
            self.assertCmdSuccess(result, "Can we run setexpiry with names")
            self.assertIn(
                "Expiry for user '%s' set to 2 days." % user["name"], out)

        for user in self.users:
            found = self._find_user(user["name"])

            expires = nttime2unix(int("%s" % found.get("accountExpires")))
            self.assertWithin(
                expires, twodays, 5,
                "Ensure account expires is within 5 seconds of the expected time")

        # TODO: re-enable this after the filter case is sorted out
        # A non-empty string is always true, so everything below this guard
        # is deliberately disabled and never runs.
        if "filters are broken, bail now":
            return

        # now run the expiration based on a filter
        fourdays = time.time() + (4 * 24 * 60 * 60)
        (result, out, err) = self.runsubcmd(
            "user", "setexpiry",
            "--filter", "(&(objectClass=user)(company=comp2))",
            "--days=4",
            *self._dc_connection_args())
        self.assertCmdSuccess(result, "Can we run setexpiry with a filter")

        for user in self.users:
            found = self._find_user(user["name"])
            # Only the comp2 accounts match the filter; the others keep
            # the two day expiry set above.
            if ("%s" % found.get("company")) == "comp2":
                expected = fourdays
            else:
                expected = twodays
            expires = nttime2unix(int("%s" % found.get("accountExpires")))
            self.assertWithin(
                expires, expected, 5,
                "Ensure account expires is within 5 seconds of the expected time")

    def test_list(self):
        """Check that "user list" prints every normal account in SamDB."""
        (result, out, err) = self.runsubcmd("user", "list",
                                            *self._dc_connection_args())
        self.assertCmdSuccess(result, "Error running list")

        search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
                         (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))

        userlist = self.samdb.search(base=self.samdb.domain_dn(),
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression=search_filter,
                                     attrs=["samaccountname"])

        self.assertTrue(len(userlist) > 0, "no users found in samdb")

        for userobj in userlist:
            name = userobj.get("samaccountname", idx=0)
            self.assertMatch(out, name, "user '%s' not found" % name)

    def test_getpwent(self):
        """Check --rfc2307-from-nss, with and without explicit overrides.

        The account is named after the user running the test so that the
        values returned by pwd.getpwuid() can be compared with what ends up
        in the directory.  The test is skipped when the pwd module or the
        passwd entry for the effective UID is unavailable.
        """
        try:
            import pwd
        except ImportError:
            self.skipTest("Skipping getpwent test, no 'pwd' module available")
            # Defensive: only reached if skipTest() does not raise.
            return

        # get the current user's data for the test
        uid = os.geteuid()
        try:
            u = pwd.getpwuid(uid)
        except KeyError:
            self.skipTest("Skipping getpwent test, current EUID not found in NSS")
            # Defensive: only reached if skipTest() does not raise.
            return

        user = self._randomPosixUser({
            "name": u.pw_name,
            "uid": u.pw_name,
            "uidNumber": u.pw_uid,
            "gidNumber": u.pw_gid,
            "gecos": u.pw_gecos,
            "loginShell": u.pw_shell,
        })
        # check if --rfc2307-from-nss sets the same values as we got from
        # pwd.getpwuid()
        args = ["user", "add", user["name"], user["password"]]
        args += self._user_attribute_args(user)
        args += ["--rfc2307-from-nss"]
        args += self._dc_connection_args()
        (result, out, err) = self.runsubcmd(*args)

        self.assertCmdSuccess(result)
        # Register the account only once we know this test created it, so
        # tearDown() removes it if a later assertion fails but never
        # deletes an account that existed beforehand.
        self.users.append(user)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn("User '%s' created successfully" % user["name"], out)

        self._check_posix_user(user)
        self.runsubcmd("user", "delete", user["name"])

        # Check if overriding the attributes from NSS with explicit values
        # works
        #
        # get a user with all random posix attributes
        user = self._randomPosixUser({"name": u.pw_name})
        # create a user with posix attributes from nss but override all of
        # them with the random ones just obtained
        args = ["user", "add", user["name"], user["password"]]
        args += self._user_attribute_args(user)
        args += ["--rfc2307-from-nss"]
        args += self._posix_attribute_args(user)
        args += self._dc_connection_args()
        (result, out, err) = self.runsubcmd(*args)

        self.assertCmdSuccess(result)
        # Same reasoning as above: register only after a successful create.
        self.users.append(user)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn("User '%s' created successfully" % user["name"], out)

        self._check_posix_user(user)
        self.runsubcmd("user", "delete", user["name"])

    def _dc_connection_args(self):
        """Return the "-H"/"-U" arguments built from the DC_* environment.

        Raises KeyError if DC_SERVER, DC_USERNAME or DC_PASSWORD is unset.
        """
        return [
            "-H", "ldap://%s" % os.environ["DC_SERVER"],
            "-U%s%%%s" % (os.environ["DC_USERNAME"],
                          os.environ["DC_PASSWORD"]),
        ]

    def _user_attribute_args(self, user):
        """Return the descriptive attribute options for a user template."""
        return [
            "--surname=%s" % user["surname"],
            "--given-name=%s" % user["given-name"],
            "--job-title=%s" % user["job-title"],
            "--department=%s" % user["department"],
            "--description=%s" % user["description"],
            "--company=%s" % user["company"],
        ]

    def _posix_attribute_args(self, user):
        """Return the RFC2307 attribute options for a posix user template."""
        return [
            "--gecos=%s" % user["gecos"],
            "--login-shell=%s" % user["loginShell"],
            "--uid=%s" % user["uid"],
            "--uid-number=%s" % user["uidNumber"],
            "--gid-number=%s" % user["gidNumber"],
        ]

    def _assert_user_created(self, result, out, err, user):
        """Assert that a create command succeeded without error output."""
        self.assertCmdSuccess(result)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn("User '%s' created successfully" % user["name"], out)

    def _assert_setpassword(self, user, description, extra_args):
        """Run "user setpassword" with a random password and check output.

        extra_args is appended after the --newpassword option; description
        is used as the failure message of both assertions.
        """
        newpasswd = self.randomPass()
        args = ["user", "setpassword", user["name"],
                "--newpassword=%s" % newpasswd]
        args += extra_args
        (result, out, err) = self.runsubcmd(*args)
        # NOTE: the exit status is intentionally not asserted here; the
        # assertCmdSuccess() check was already disabled in the original test.
        self.assertEqual(err, "", description)
        self.assertMatch(out, "Changed password OK", description)

    def _randomUser(self, base=None):
        """Create a user template with random attribute values.

        Entries in base, if given, override the random values.
        """
        user = {
            "name": self.randomName(),
            "password": self.randomPass(),
            "surname": self.randomName(),
            "given-name": self.randomName(),
            "job-title": self.randomName(),
            "department": self.randomName(),
            "company": self.randomName(),
            "description": self.randomName(count=100),
            "createUserFn": self._create_user,
            "checkUserFn": self._check_user,
        }
        if base is not None:
            user.update(base)
        return user

    def _randomPosixUser(self, base=None):
        """Create a user template with random values and RFC2307 attributes.

        Entries in base, if given, override both the generic and the posix
        values.
        """
        user = self._randomUser()
        user.update({
            "uid": self.randomName(),
            "loginShell": self.randomName(),
            "gecos": self.randomName(),
            "uidNumber": self.randomXid(),
            "gidNumber": self.randomXid(),
            "createUserFn": self._create_posix_user,
            "checkUserFn": self._check_posix_user,
        })
        # base is applied last so that it wins over every generated value.
        if base is not None:
            user.update(base)
        return user

    def _check_user(self, user):
        """Check if a user from SamDB has the same attributes as its template."""
        found = self._find_user(user["name"])

        self.assertEqual("%s" % found.get("name"),
                         "%(given-name)s %(surname)s" % user)
        self.assertEqual("%s" % found.get("title"), user["job-title"])
        self.assertEqual("%s" % found.get("company"), user["company"])
        self.assertEqual("%s" % found.get("description"), user["description"])
        self.assertEqual("%s" % found.get("department"), user["department"])

    def _check_posix_user(self, user):
        """Check if a posix user from SamDB has the same attributes as its template."""
        found = self._find_user(user["name"])

        self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
        self.assertEqual("%s" % found.get("gecos"), user["gecos"])
        # The numeric ids may be ints in the template, so compare as strings.
        self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
        self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
        self.assertEqual("%s" % found.get("uid"), user["uid"])
        self._check_user(user)

    def _create_user(self, user):
        """Create a new user via "user add"; return (result, out, err)."""
        args = ["user", "add", user["name"], user["password"]]
        args += self._user_attribute_args(user)
        args += self._dc_connection_args()
        return self.runsubcmd(*args)

    def _create_posix_user(self, user):
        """Create a new user with RFC2307 attributes via "user create"."""
        args = ["user", "create", user["name"], user["password"]]
        args += self._user_attribute_args(user)
        args += self._posix_attribute_args(user)
        args += self._dc_connection_args()
        return self.runsubcmd(*args)

    def _find_user(self, name):
        """Return the first SamDB entry whose sAMAccountName is name, or None."""
        search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (
            ldb.binary_encode(name),
            "CN=Person,CN=Schema,CN=Configuration",
            self.samdb.domain_dn())
        userlist = self.samdb.search(base=self.samdb.domain_dn(),
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression=search_filter, attrs=[])
        if userlist:
            return userlist[0]
        return None
361         else:
362             return None