tests/auth_log_winbind: Expect an empty remote address
[bbaumbach/samba-autobuild/.git] / python / samba / tests / auth_log_winbind.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """
19     auth logging tests that exercise winbind
20 """
21
22 import json
23 import os
24 import time
25
26 from samba.auth import system_session
27 from samba.credentials import Credentials
28 from samba.common import get_string, get_bytes
29 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
30 from samba.dsdb import UF_NORMAL_ACCOUNT
31 from samba.messaging import Messaging
32 from samba.param import LoadParm
33 from samba.samdb import SamDB
34 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
35 from samba.tests.auth_log_base import AuthLogTestBase
36
37 USER_NAME = "WBALU"
38
39
40 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
41
42     #
43     # Helper function to watch for authentication messages on the
44     # Domain Controller.
45     #
46     def dc_watcher(self):
47
48         (r1, w1) = os.pipe()
49         pid = os.fork()
50         if pid != 0:
51             # Parent process return the result socket to the caller.
52             return r1
53
54         # Load the lp context for the Domain Controller, rather than the
55         # member server.
56         config_file = os.environ["DC_SERVERCONFFILE"]
57         lp_ctx = LoadParm()
58         lp_ctx.load(config_file)
59
60         #
61         # Is the message a SamLogon authentication?
62         def is_sam_logon(m):
63             if m is None:
64                 return False
65             msg = json.loads(m)
66             return (
67                 msg["type"] == "Authentication" and
68                 msg["Authentication"]["serviceDescription"] == "SamLogon")
69
70         #
71         # Handler function for received authentication messages.
72         def message_handler(context, msgType, src, message):
73             # Print the message to help debugging the tests.
74             # as it's a JSON message it does not look like a sub-unit message.
75             print(message)
76             self.dc_msgs.append(message)
77
78         # Set up a messaging context to listen for authentication events on
79         # the domain controller.
80         msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
81         msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
82         msg_handler_and_context = (message_handler, None)
83         msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
84
85         # Wait for the SamLogon message.
86         # As there could be other SamLogon's in progress we need to collect
87         # all the SamLogons and let the caller match them to the session.
88         self.dc_msgs = []
89         start_time = time.time()
90         while (time.time() - start_time < 1):
91             msg_ctx.loop_once(0.1)
92
93         # Only interested in SamLogon messages, filter out the rest
94         msgs = list(filter(is_sam_logon, self.dc_msgs))
95         if msgs:
96             for m in msgs:
97                 os.write(w1, get_bytes(m+"\n"))
98         else:
99             os.write(w1, get_bytes("None\n"))
100         os.close(w1)
101
102         msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
103         msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
104
105         os._exit(0)
106
107     # Remove any DCE/RPC ncacn_np messages
108     # these only get triggered once per session, and stripping them out
109     # avoids ordering dependencies in the tests
110     #
111     def filter_messages(self, messages):
112         def keep(msg):
113             if (msg["type"] == "Authorization" and
114                 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
115                 msg["Authorization"]["authType"] == "ncacn_np"):
116                     return False
117             else:
118                 return True
119
120         return list(filter(keep, messages))
121
122     def setUp(self):
123         super(AuthLogTestsWinbind, self).setUp()
124         self.domain = os.environ["DOMAIN"]
125         self.host = os.environ["SERVER"]
126         self.dc = os.environ["DC_SERVER"]
127         self.lp = self.get_loadparm()
128         self.credentials = self.get_credentials()
129         self.session = system_session()
130
131         self.ldb = SamDB(
132             url="ldap://{0}".format(self.dc),
133             session_info=self.session,
134             credentials=self.credentials,
135             lp=self.lp)
136         self.create_user_account()
137
138         self.remoteAddress = ''
139
140     def tearDown(self):
141         super(AuthLogTestsWinbind, self).tearDown()
142         delete_force(self.ldb, self.user_dn)
143
144     #
145     # Create a test user account
146     def create_user_account(self):
147         self.user_pass = self.random_password()
148         self.user_name = USER_NAME
149         self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
150
151         # remove the account if it exists, this will happen if a previous test
152         # run failed
153         delete_force(self.ldb, self.user_dn)
154
155         utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
156         self.ldb.add({
157            "dn": self.user_dn,
158            "objectclass": "user",
159            "sAMAccountName": "%s" % self.user_name,
160            "userAccountControl": str(UF_NORMAL_ACCOUNT),
161            "unicodePwd": utf16pw})
162
163         self.user_creds = Credentials()
164         self.user_creds.guess(self.get_loadparm())
165         self.user_creds.set_password(self.user_pass)
166         self.user_creds.set_username(self.user_name)
167         self.user_creds.set_workstation(self.server)
168
169     #
170     # Check that the domain server received a SamLogon request for the
171     # current logon.
172     #
173     def check_domain_server_authentication(self, pipe, logon_id, description):
174
175         messages = os.read(pipe, 8192)
176         messages = get_string(messages)
177         if len(messages) == 0 or messages == "None":
178             self.fail("No Domain server authentication message")
179
180         #
181         # Look for the SamLogon request matching logon_id
182         msg = None
183         for message in messages.split("\n"):
184             msg = json.loads(get_string(message))
185             if logon_id == msg["Authentication"]["logonId"]:
186                 break
187             msg = None
188
189         if msg is None:
190             self.fail("No Domain server authentication message")
191
192         #
193         # Validate that message contains the expected data
194         #
195         self.assertEqual("Authentication", msg["type"])
196         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
197         self.assertEqual("SamLogon",
198                           msg["Authentication"]["serviceDescription"])
199         self.assertEqual(description,
200                           msg["Authentication"]["authDescription"])
201
202     def test_ntlm_auth(self):
203
204         def isLastExpectedMessage(msg):
205             DESC = "PAM_AUTH, ntlm_auth"
206             return (
207                 msg["type"] == "Authentication" and
208                 msg["Authentication"]["serviceDescription"] == "winbind" and
209                 msg["Authentication"]["authDescription"] is not None and
210                 msg["Authentication"]["authDescription"].startswith(DESC))
211
212         pipe = self.dc_watcher()
213         COMMAND = "bin/ntlm_auth"
214         self.check_run("{0} --username={1} --password={2}".format(
215             COMMAND,
216             self.credentials.get_username(),
217             self.credentials.get_password()),
218             msg="ntlm_auth failed")
219
220         messages = self.waitForMessages(isLastExpectedMessage)
221         messages = self.filter_messages(messages)
222         expected_messages = 1
223         self.assertEqual(expected_messages,
224                           len(messages),
225                           "Did not receive the expected number of messages")
226
227         # Check the first message it should be an Authentication
228         msg = messages[0]
229         self.assertEqual("Authentication", msg["type"])
230         self.assertTrue(
231             msg["Authentication"]["authDescription"].startswith(
232                 "PAM_AUTH, ntlm_auth,"))
233         self.assertEqual("winbind",
234                           msg["Authentication"]["serviceDescription"])
235         self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
236         # Logon type should be NetworkCleartext
237         self.assertEqual(8, msg["Authentication"]["logonType"])
238         # Event code should be Successful logon
239         self.assertEqual(4624, msg["Authentication"]["eventId"])
240         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
241         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
242         self.assertEqual(self.domain, msg["Authentication"]["clientDomain"])
243         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
244         self.assertEqual(self.credentials.get_username(),
245                           msg["Authentication"]["clientAccount"])
246         self.assertEqual(self.credentials.get_domain(),
247                           msg["Authentication"]["clientDomain"])
248         self.assertTrue(msg["Authentication"]["workstation"] is None)
249
250         logon_id = msg["Authentication"]["logonId"]
251
252         #
253         # Now check the Domain server authentication message
254         #
255         self.check_domain_server_authentication(pipe, logon_id, "interactive")
256
257     def test_wbinfo(self):
258         def isLastExpectedMessage(msg):
259             DESC = "NTLM_AUTH, wbinfo"
260             return (
261                 msg["type"] == "Authentication" and
262                 msg["Authentication"]["serviceDescription"] == "winbind" and
263                 msg["Authentication"]["authDescription"] is not None and
264                 msg["Authentication"]["authDescription"].startswith(DESC))
265
266         pipe = self.dc_watcher()
267         COMMAND = "bin/wbinfo"
268         try:
269             self.check_run("{0} -a {1}%{2}".format(
270                 COMMAND,
271                 self.credentials.get_username(),
272                 self.credentials.get_password()),
273                 msg="ntlm_auth failed")
274         except BlackboxProcessError:
275             pass
276
277         messages = self.waitForMessages(isLastExpectedMessage)
278         messages = self.filter_messages(messages)
279         expected_messages = 3
280         self.assertEqual(expected_messages,
281                           len(messages),
282                           "Did not receive the expected number of messages")
283
284         # The 1st message should be an Authentication against the local
285         # password database
286         msg = messages[0]
287         self.assertEqual("Authentication", msg["type"])
288         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
289             "PASSDB, wbinfo,"))
290         self.assertEqual("winbind",
291                           msg["Authentication"]["serviceDescription"])
292         # Logon type should be Interactive
293         self.assertEqual(2, msg["Authentication"]["logonType"])
294         # Event code should be Unsuccessful logon
295         self.assertEqual(4625, msg["Authentication"]["eventId"])
296         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
297         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
298         self.assertEqual('', msg["Authentication"]["clientDomain"])
299         # This is what the existing winbind implementation returns.
300         self.assertEqual("NT_STATUS_NO_SUCH_USER",
301                           msg["Authentication"]["status"])
302         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
303         self.assertEqual(self.credentials.get_username(),
304                           msg["Authentication"]["clientAccount"])
305         self.assertEqual("", msg["Authentication"]["clientDomain"])
306
307         logon_id = msg["Authentication"]["logonId"]
308
309         # The 2nd message should be a PAM_AUTH with the same logon id as the
310         # 1st message
311         msg = messages[1]
312         self.assertEqual("Authentication", msg["type"])
313         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
314             "PAM_AUTH"))
315         self.assertEqual("winbind",
316                           msg["Authentication"]["serviceDescription"])
317         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
318         # Logon type should be NetworkCleartext
319         self.assertEqual(8, msg["Authentication"]["logonType"])
320         # Event code should be Unsuccessful logon
321         self.assertEqual(4625, msg["Authentication"]["eventId"])
322         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
323         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
324         self.assertEqual('', msg["Authentication"]["clientDomain"])
325         # This is what the existing winbind implementation returns.
326         self.assertEqual("NT_STATUS_NO_SUCH_USER",
327                           msg["Authentication"]["status"])
328         self.assertEqual(self.credentials.get_username(),
329                           msg["Authentication"]["clientAccount"])
330         self.assertEqual("", msg["Authentication"]["clientDomain"])
331
332         # The 3rd message should be an NTLM_AUTH
333         msg = messages[2]
334         self.assertEqual("Authentication", msg["type"])
335         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
336             "NTLM_AUTH, wbinfo,"))
337         self.assertEqual("winbind",
338                           msg["Authentication"]["serviceDescription"])
339         # Logon type should be Network
340         self.assertEqual(3, msg["Authentication"]["logonType"])
341         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
342         # Event code should be successful logon
343         self.assertEqual(4624, msg["Authentication"]["eventId"])
344         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
345         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
346         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
347         self.assertEqual(self.credentials.get_username(),
348                           msg["Authentication"]["clientAccount"])
349         self.assertEqual(self.credentials.get_domain(),
350                           msg["Authentication"]["clientDomain"])
351
352         logon_id = msg["Authentication"]["logonId"]
353
354         #
355         # Now check the Domain server authentication message
356         #
357         self.check_domain_server_authentication(pipe, logon_id, "network")
358
359     def test_wbinfo_ntlmv1(self):
360         def isLastExpectedMessage(msg):
361             DESC = "NTLM_AUTH, wbinfo"
362             return (
363                 msg["type"] == "Authentication" and
364                 msg["Authentication"]["serviceDescription"] == "winbind" and
365                 msg["Authentication"]["authDescription"] is not None and
366                 msg["Authentication"]["authDescription"].startswith(DESC))
367
368         pipe = self.dc_watcher()
369         COMMAND = "bin/wbinfo"
370         try:
371             self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
372                 COMMAND,
373                 self.credentials.get_username(),
374                 self.credentials.get_password()),
375                 msg="ntlm_auth failed")
376         except BlackboxProcessError:
377             pass
378
379         messages = self.waitForMessages(isLastExpectedMessage)
380         messages = self.filter_messages(messages)
381         expected_messages = 3
382         self.assertEqual(expected_messages,
383                           len(messages),
384                           "Did not receive the expected number of messages")
385
386         # The 1st message should be an Authentication against the local
387         # password database
388         msg = messages[0]
389         self.assertEqual("Authentication", msg["type"])
390         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
391             "PASSDB, wbinfo,"))
392         self.assertEqual("winbind",
393                           msg["Authentication"]["serviceDescription"])
394         # Logon type should be Interactive
395         self.assertEqual(2, msg["Authentication"]["logonType"])
396         # Event code should be Unsuccessful logon
397         self.assertEqual(4625, msg["Authentication"]["eventId"])
398         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
399         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
400         self.assertEqual('', msg["Authentication"]["clientDomain"])
401         # This is what the existing winbind implementation returns.
402         self.assertEqual("NT_STATUS_NO_SUCH_USER",
403                           msg["Authentication"]["status"])
404         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
405         self.assertEqual(self.credentials.get_username(),
406                           msg["Authentication"]["clientAccount"])
407         self.assertEqual("", msg["Authentication"]["clientDomain"])
408
409         logon_id = msg["Authentication"]["logonId"]
410
411         # The 2nd message should be a PAM_AUTH with the same logon id as the
412         # 1st message
413         msg = messages[1]
414         self.assertEqual("Authentication", msg["type"])
415         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
416             "PAM_AUTH"))
417         self.assertEqual("winbind",
418                           msg["Authentication"]["serviceDescription"])
419         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
420         self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
421         # Logon type should be NetworkCleartext
422         self.assertEqual(8, msg["Authentication"]["logonType"])
423         # Event code should be Unsuccessful logon
424         self.assertEqual(4625, msg["Authentication"]["eventId"])
425         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
426         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
427         self.assertEqual('', msg["Authentication"]["clientDomain"])
428         # This is what the existing winbind implementation returns.
429         self.assertEqual("NT_STATUS_NO_SUCH_USER",
430                           msg["Authentication"]["status"])
431         self.assertEqual(self.credentials.get_username(),
432                           msg["Authentication"]["clientAccount"])
433         self.assertEqual("", msg["Authentication"]["clientDomain"])
434
435         # The 3rd message should be an NTLM_AUTH
436         msg = messages[2]
437         self.assertEqual("Authentication", msg["type"])
438         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
439             "NTLM_AUTH, wbinfo,"))
440         self.assertEqual("winbind",
441                           msg["Authentication"]["serviceDescription"])
442         self.assertEqual("NTLMv1",
443                           msg["Authentication"]["passwordType"])
444         # Logon type should be Network
445         self.assertEqual(3, msg["Authentication"]["logonType"])
446         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
447         # Event code should be successful logon
448         self.assertEqual(4624, msg["Authentication"]["eventId"])
449         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
450         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
451         self.assertEqual(self.credentials.get_username(),
452                           msg["Authentication"]["clientAccount"])
453         self.assertEqual(self.credentials.get_domain(),
454                           msg["Authentication"]["clientDomain"])
455
456         logon_id = msg["Authentication"]["logonId"]
457         #
458         # Now check the Domain server authentication message
459         #
460         self.check_domain_server_authentication(pipe, logon_id, "network")