f99d9d86f39737bed858ebb8ba884c5d2836d98e
[metze/samba/wip.git] / python / samba / tests / auth_log_pass_change.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
"""Tests for the Auth and AuthZ logging of password changes.
"""
# The docstring must be the first statement of the module to be stored as
# __doc__; a __future__ import is still permitted directly after it.
from __future__ import print_function
21
22 import samba.tests
23 from samba.samdb import SamDB
24 from samba.auth import system_session
25 import os
26 import samba.tests.auth_log_base
27 from samba.tests import delete_force
28 from samba.net import Net
29 import samba
30 from subprocess import call
31 from ldb import LdbError
32 from samba.tests.password_test import PasswordCommon
33 from samba.dcerpc.windows_event_ids import (
34     EVT_ID_SUCCESSFUL_LOGON,
35     EVT_ID_UNSUCCESSFUL_LOGON
36 )
37
# Account name and initial password of the test user.  setUp() deletes and
# re-adds this account before every test, so each test starts with the
# password set to USER_PASS.
USER_NAME = "authlogtestuser"
USER_PASS = samba.generate_random_password(32, 32)
40
41
class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
    """Check the auth log messages produced by password change attempts.

    Every test performs a single password change (via ``Net``, via the
    ``net rap`` command, or via an LDAP modify), waits until the log message
    that describes the outcome has been received, and then asserts on the
    total number of messages collected.
    """

    def setUp(self):
        """Connect to the server and (re)create the test user."""
        super(AuthLogPassChangeTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())
        print("ldb %s" % type(self.ldb))

        # Look the base DN up once and reuse it everywhere.
        self.base_dn = self.ldb.domain_dn()
        print("base_dn %s" % self.base_dn)

        # permit password changes during this test
        PasswordCommon.allow_password_changes(self, self.ldb)

        # (Re)adds the test user USER_NAME with password USER_PASS
        user_dn = self._user_dn(USER_NAME)
        delete_force(self.ldb, user_dn)
        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

        # discard any auth log messages for the password setup
        self.discardMessages()

    def tearDown(self):
        """Defer entirely to the base class clean-up."""
        super(AuthLogPassChangeTests, self).tearDown()

    # ------------------------------------------------------------------
    # Private helpers shared by the tests below.
    # ------------------------------------------------------------------

    def _user_dn(self, name):
        """Return the DN of the account ``name`` under ``cn=users``."""
        return "cn=" + name + ",cn=users," + self.base_dn

    @staticmethod
    def _authentication_matcher(status, service, auth_description, event_id):
        """Build a predicate recognising one specific Authentication message.

        The returned callable takes a decoded log message and returns True
        only when it is of type "Authentication" and its status,
        serviceDescription, authDescription and eventId all equal the
        values given here.
        """
        def matches(msg):
            # Only Authentication messages are indexed further; other
            # message types are rejected before the nested lookup.
            if msg["type"] != "Authentication":
                return False
            auth = msg["Authentication"]
            return (auth["status"] == status and
                    auth["serviceDescription"] == service and
                    auth["authDescription"] == auth_description and
                    auth["eventId"] == event_id)
        return matches

    def _samr_matcher(self, status, event_id):
        """Predicate for a "samr_ChangePasswordUser3" result message."""
        return self._authentication_matcher(status,
                                            "SAMR Password Change",
                                            "samr_ChangePasswordUser3",
                                            event_id)

    def _ldap_matcher(self, status, event_id):
        """Predicate for an "LDAP Modify" password change result message."""
        return self._authentication_matcher(status,
                                            "LDAP Password Change",
                                            "LDAP Modify",
                                            event_id)

    def _net(self):
        """Return a ``Net`` bound to the server using fresh credentials."""
        creds = self.insta_creds(template=self.get_credentials())
        lp = self.get_loadparm()
        return Net(creds, lp, server=self.server_ip)

    def _assert_net_change_fails(self, **change_args):
        """Call ``Net.change_password`` and fail the test if it succeeds.

        ``change_args`` are passed straight through as keyword arguments.
        Any exception type counts as the expected failure.
        """
        net = self._net()
        try:
            net.change_password(**change_args)
        except Exception:
            # The concrete exception type is deliberately not checked.
            return
        self.fail("Expected exception not thrown")

    def _ldap_change_password(self, username, old_password, new_password):
        """Issue an LDAP delete/add of userPassword for ``username``."""
        self.ldb.modify_ldif(
            "dn: " + self._user_dn(username) + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + old_password + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

    def _assert_message_count(self, expected, messages):
        """Assert that exactly ``expected`` log messages were collected."""
        self.assertEqual(expected,
                         len(messages),
                         "Did not receive the expected number of messages")

    # ------------------------------------------------------------------
    # Password changes through Net.change_password
    # ------------------------------------------------------------------

    def test_admin_change_password(self):
        isLastExpectedMessage = self._samr_matcher("NT_STATUS_OK",
                                                   EVT_ID_SUCCESSFUL_LOGON)

        self._net().change_password(newpassword="newPassword!!42",
                                    username=USER_NAME,
                                    oldpassword=USER_PASS)

        messages = self.waitForMessages(isLastExpectedMessage)
        print("Received %d messages" % len(messages))
        self._assert_message_count(8, messages)

    def test_admin_change_password_new_password_fails_restriction(self):
        isLastExpectedMessage = self._samr_matcher(
            "NT_STATUS_PASSWORD_RESTRICTION",
            EVT_ID_UNSUCCESSFUL_LOGON)

        self._assert_net_change_fails(newpassword="newPassword",
                                      oldpassword=USER_PASS,
                                      username=USER_NAME)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    def test_admin_change_password_unknown_user(self):
        isLastExpectedMessage = self._samr_matcher("NT_STATUS_NO_SUCH_USER",
                                                   EVT_ID_UNSUCCESSFUL_LOGON)

        self._assert_net_change_fails(newpassword="newPassword!!42",
                                      oldpassword=USER_PASS,
                                      username="badUser")

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    def test_admin_change_password_bad_original_password(self):
        isLastExpectedMessage = self._samr_matcher("NT_STATUS_WRONG_PASSWORD",
                                                   EVT_ID_UNSUCCESSFUL_LOGON)

        self._assert_net_change_fails(newpassword="newPassword!!42",
                                      oldpassword="badPassword",
                                      username=USER_NAME)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    # net rap password changes are broken, but they trigger enough of the
    # server side behaviour to exercise the code paths of interest.
    # if we used the real password it would be too long and does not hash
    # correctly, so we just check it triggers the wrong password path.
    def test_rap_change_password(self):
        isLastExpectedMessage = self._authentication_matcher(
            "NT_STATUS_WRONG_PASSWORD",
            "SAMR Password Change",
            "OemChangePasswordUser2",
            EVT_ID_UNSUCCESSFUL_LOGON)

        username = os.environ["USERNAME"]
        server = os.environ["SERVER"]
        password = os.environ["PASSWORD"]
        server_param = "--server=%s" % server
        creds = "-U%s%%%s" % (username, password)
        # The exit status is intentionally ignored: the command is expected
        # to fail, only the resulting log messages are of interest.
        call(["bin/net", "rap", server_param,
              "password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword",
              server, creds, "--option=client ipc max protocol=nt1"])

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(7, messages)

    # ------------------------------------------------------------------
    # Password changes through LDAP modify
    # ------------------------------------------------------------------

    def test_ldap_change_password(self):
        isLastExpectedMessage = self._ldap_matcher("NT_STATUS_OK",
                                                   EVT_ID_SUCCESSFUL_LOGON)

        new_password = samba.generate_random_password(32, 32)
        self._ldap_change_password(USER_NAME, USER_PASS, new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        print("Received %d messages" % len(messages))
        self._assert_message_count(4, messages)

    #
    # Currently this does not get logged, so we expect to only see the log
    # entries for the underlying ldap bind.
    #
    def test_ldap_change_password_bad_user(self):
        def isLastExpectedMessage(msg):
            return (msg["type"] == "Authorization" and
                    msg["Authorization"]["serviceDescription"] == "LDAP" and
                    msg["Authorization"]["authType"] == "krb5")

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self._ldap_change_password("badUser", USER_PASS, new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        print("Received %d messages" % len(messages))
        self._assert_message_count(3, messages)

    def test_ldap_change_password_bad_original_password(self):
        isLastExpectedMessage = self._ldap_matcher("NT_STATUS_WRONG_PASSWORD",
                                                   EVT_ID_UNSUCCESSFUL_LOGON)

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self._ldap_change_password(USER_NAME, "badPassword", new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        print("Received %d messages" % len(messages))
        self._assert_message_count(4, messages)