1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import samba.dcerpc.dcerpc
23 import samba.dcerpc.base
24 import samba.dcerpc.epmapper
26 from samba import gensec
27 from samba.credentials import Credentials
28 from samba.tests import TestCase
29 from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
# Base class for tests that speak DCE/RPC at the PDU level: its methods build,
# send, receive and verify individual packets on the socket kept in self.s.
32 class RawDCERPCTest(TestCase):
33 """A raw DCE/RPC Test case."""
# Record a disconnect on stderr, tagged with `reason`.
# NOTE(review): the statements between the def line and the trace below are
# not present in this extract; presumably they close and clear self.s —
# confirm against the full file.
35 def _disconnect(self, reason):
41 sys.stderr.write("disconnect[%s]\n" % reason)
# NOTE(review): the `def` line of this method is not present in this extract.
# The visible statements resolve (self.host, self.tcp_port) for a TCP stream
# socket, create a socket from the first result and connect it.
45 self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
46 socket.SOCK_STREAM, socket.SOL_TCP,
# Entries 0..2 of a getaddrinfo() result are family, socket type and protocol.
48 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
# Entry 4 of a getaddrinfo() result is the socket address to connect to.
50 self.s.connect(self.a[0][4])
# NOTE(review): both handler bodies are missing from this extract.
51 except socket.error as e:
57 except Exception as e:
# NOTE(review): the `def setUp` line is not present in this extract; the
# super() call below shows that these statements belong to setUp.
63 super(RawDCERPCTest, self).setUp()
# Debug output is off by default; the send/recv/generate helpers fall back to
# these flags when their own ndr_print/hexdump argument is None.
64 self.do_ndr_print = False
65 self.do_hexdump = False
# Server address comes from the SERVER environment value.
67 self.host = samba.tests.env_get_var_value('SERVER')
# TARGET_HOSTNAME may be absent (allow_missing=True); fall back to the host.
68 self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
69 if self.target_hostname is None:
70 self.target_hostname = self.host
# self.settings is later passed to gensec.Security.start_client().
74 self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
75 self.settings["target_hostname"] = self.target_hostname
# Create a further RawDCERPCTest instance that copies this one's debug flags,
# target hostname, TCP port and settings.
#
# tcp_port: optional port for the new instance; the visible fallback copies
#           self.tcp_port.
# NOTE(review): the lines between the `if` and the fallback assignment, and
# whatever follows the settings copy (connect/return), are missing from this
# extract. methodName='noop' assumes a method of that name exists on the
# class — not visible here.
82 def second_connection(self, tcp_port=None):
83 c = RawDCERPCTest(methodName='noop')
84 c.do_ndr_print = self.do_ndr_print
85 c.do_hexdump = self.do_hexdump
88 c.target_hostname = self.target_hostname
89 if tcp_port is not None:
92 c.tcp_port = self.tcp_port
# The settings dict is shared by reference, not copied.
94 c.settings = self.settings
# Fill a credentials object `c` with the USERNAME and PASSWORD values from the
# test environment.
# NOTE(review): creation of `c` and the return statement are not present in
# this extract.
99 def get_user_creds(self):
102 username = samba.tests.env_get_var_value('USERNAME')
103 password = samba.tests.env_get_var_value('PASSWORD')
104 c.set_username(username)
105 c.set_password(password)
# NOTE(review): only the def line is present in this extract; presumably this
# returns anonymous credentials — confirm against the full file.
108 def get_anon_creds(self):
# Start a gensec client security context for `creds` and describe it in an
# `auth_context` dict with the keys "auth_type", "auth_level",
# "auth_context_id", "g_auth_level" and "gensec".
#
# creds:        credentials handed to gensec via set_credentials().
# auth_type:    mechanism selector passed to start_mech_by_authtype().
# auth_level:   auth level recorded in the dict.
# g_auth_level: level actually given to gensec; defaults to auth_level.
# NOTE(review): the remaining signature lines (including auth_context_id and
# g_auth_level), the creation of the dict and the return are missing from
# this extract.
113 def get_auth_context_creds(self, creds, auth_type, auth_level,
117 if g_auth_level is None:
118 g_auth_level = auth_level
# self.settings carries lp_ctx and target_hostname.
120 g = gensec.Security.start_client(self.settings)
121 g.set_credentials(creds)
122 g.want_feature(gensec.FEATURE_DCE_STYLE)
123 g.start_mech_by_authtype(auth_type, g_auth_level)
126 auth_context["auth_type"] = auth_type
127 auth_context["auth_level"] = auth_level
128 auth_context["auth_context_id"] = auth_context_id
129 auth_context["g_auth_level"] = g_auth_level
130 auth_context["gensec"] = g
134 def do_generic_bind(self, ctx, auth_context=None,
135 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
136 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
137 assoc_group_id=0, call_id=0,
138 nak_reason=None, alter_fault=None):
141 if auth_context is not None:
143 (finished, to_server) = auth_context["gensec"].update(from_server)
144 self.assertFalse(finished)
146 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
147 auth_level=auth_context["auth_level"],
148 auth_context_id=auth_context["auth_context_id"],
153 req = self.generate_bind(call_id=call_id,
156 assoc_group_id=assoc_group_id,
159 rep = self.recv_pdu()
160 if nak_reason is not None:
161 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
163 self.assertEquals(rep.u.reject_reason, nak_reason)
164 self.assertEquals(rep.u.num_versions, 1)
165 self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
166 self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
167 self.assertEquals(len(rep.u._pad), 3)
168 self.assertEquals(rep.u._pad, '\0' * 3)
170 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
172 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
173 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
174 if assoc_group_id != 0:
175 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
177 self.assertNotEquals(rep.u.assoc_group_id, 0)
178 assoc_group_id = rep.u.assoc_group_id
179 port_str = "%d" % self.tcp_port
180 port_len = len(port_str) + 1
181 mod_len = (2 + port_len) % 4
183 port_pad = 4 - mod_len
186 self.assertEquals(rep.u.secondary_address_size, port_len)
187 self.assertEquals(rep.u.secondary_address, port_str)
188 self.assertEquals(len(rep.u._pad1), port_pad)
189 # sometimes windows sends random bytes
190 # self.assertEquals(rep.u._pad1, '\0' * port_pad)
191 self.assertEquals(rep.u.num_results, 1)
192 self.assertEquals(rep.u.ctx_list[0].result,
193 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
194 self.assertEquals(rep.u.ctx_list[0].reason,
195 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
196 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
198 if auth_context is None:
199 self.assertEquals(rep.auth_length, 0)
200 self.assertEquals(len(rep.u.auth_info), 0)
202 self.assertNotEquals(rep.auth_length, 0)
203 self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
204 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
206 a = self.parse_auth(rep.u.auth_info)
208 from_server = a.credentials
209 (finished, to_server) = auth_context["gensec"].update(from_server)
210 self.assertFalse(finished)
212 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
213 auth_level=auth_context["auth_level"],
214 auth_context_id=auth_context["auth_context_id"],
216 req = self.generate_alter(call_id=call_id,
218 assoc_group_id=0xffffffff -assoc_group_id,
221 rep = self.recv_pdu()
222 if alter_fault is not None:
223 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
224 pfc_flags=req.pfc_flags |
225 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
227 self.assertNotEquals(rep.u.alloc_hint, 0)
228 self.assertEquals(rep.u.context_id, 0)
229 self.assertEquals(rep.u.cancel_count, 0)
230 self.assertEquals(rep.u.flags, 0)
231 self.assertEquals(rep.u.status, alter_fault)
232 self.assertEquals(rep.u.reserved, 0)
233 self.assertEquals(len(rep.u.error_and_verifier), 0)
235 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
236 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
237 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
238 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
239 self.assertEquals(rep.u.secondary_address_size, 0)
240 self.assertEquals(rep.u.secondary_address, '')
241 self.assertEquals(len(rep.u._pad1), 2)
242 # sometimes windows sends random bytes
243 # self.assertEquals(rep.u._pad1, '\0' * 2)
244 self.assertEquals(rep.u.num_results, 1)
245 self.assertEquals(rep.u.ctx_list[0].result,
246 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
247 self.assertEquals(rep.u.ctx_list[0].reason,
248 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
249 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
250 self.assertNotEquals(rep.auth_length, 0)
251 self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
252 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
254 a = self.parse_auth(rep.u.auth_info)
256 from_server = a.credentials
257 (finished, to_server) = auth_context["gensec"].update(from_server)
258 self.assertTrue(finished)
# Build a presentation context for (abstract, transfer) and bind it via
# do_generic_bind().
#
# abstract, transfer: abstract syntax and the single transfer syntax offered.
# object:             forwarded to epmap_reconnect().
# context_id:         id of the new presentation context (default 0xffff).
# epmap:              presumably selects whether epmap_reconnect() runs first;
#                     the guarding `if` is not present in this extract.
# auth_context:       forwarded to do_generic_bind().
# pfc_flags:          defaults to FIRST|LAST.
# NOTE(review): the rest of the signature (including assoc_group_id), the
# remaining bind arguments and the return are missing from this extract.
262 def prepare_presentation(self, abstract, transfer, object=None,
263 context_id=0xffff, epmap=False, auth_context=None,
264 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
265 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
# Updates self.tcp_port to the port the endpoint mapper returned.
269 self.epmap_reconnect(abstract, transfer=transfer, object=object)
# Exactly one transfer syntax is offered in the context.
271 tsf1_list = [transfer]
272 ctx = samba.dcerpc.dcerpc.ctx_list()
273 ctx.context_id = context_id
274 ctx.num_transfer_syntaxes = len(tsf1_list)
275 ctx.abstract_syntax = abstract
276 ctx.transfer_syntaxes = tsf1_list
278 ack = self.do_generic_bind(ctx=ctx,
279 auth_context=auth_context,
281 assoc_group_id=assoc_group_id)
289 def do_single_request(self, call_id, ctx, io,
292 bigendian=False, ndr64=False,
293 allow_remaining=False,
296 fault_pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
297 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
299 fault_context_id=None,
304 if fault_context_id is None:
305 fault_context_id = ctx.context_id
307 if ndr_print is None:
308 ndr_print = self.do_ndr_print
310 hexdump = self.do_hexdump
314 sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
315 stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
317 sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
319 # only used for sig_size calculation
320 stub_in = '\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
323 if auth_context is not None:
324 mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
327 auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
328 stub_in += '\x00' * auth_pad_length
330 if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
331 sig_size = auth_context["gensec"].sig_size(len(stub_in))
335 zero_sig = "\x00" * sig_size
336 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
337 auth_level=auth_context["auth_level"],
338 auth_pad_length=auth_pad_length,
339 auth_context_id=auth_context["auth_context_id"],
344 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
345 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
346 if object is not None:
347 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
349 req = self.generate_request(call_id=call_id,
350 context_id=ctx.context_id,
358 if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
359 req_blob = samba.ndr.ndr_pack(req)
360 ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
361 ofs_sig = len(req_blob) - req.auth_length
362 ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
363 req_data = req_blob[ofs_stub:ofs_trailer]
364 req_whole = req_blob[0:ofs_sig]
365 sig = auth_context["gensec"].sign_packet(req_data, req_whole)
366 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
367 auth_level=auth_context["auth_level"],
368 auth_pad_length=auth_pad_length,
369 auth_context_id=auth_context["auth_context_id"],
371 req = self.generate_request(call_id=call_id,
372 context_id=ctx.context_id,
378 self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
380 (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
384 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
385 pfc_flags=fault_pfc_flags, auth_length=0)
386 self.assertNotEquals(rep.u.alloc_hint, 0)
387 self.assertEquals(rep.u.context_id, fault_context_id)
388 self.assertEquals(rep.u.cancel_count, 0)
389 self.assertEquals(rep.u.flags, 0)
390 self.assertEquals(rep.u.status, fault_status)
391 self.assertEquals(rep.u.reserved, 0)
392 self.assertEquals(len(rep.u.error_and_verifier), 0)
395 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
396 auth_length=sig_size)
397 self.assertNotEquals(rep.u.alloc_hint, 0)
398 self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
399 self.assertEquals(rep.u.cancel_count, 0)
400 self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
403 ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
404 ofs_sig = rep.frag_length - rep.auth_length
405 ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
406 rep_data = rep_blob[ofs_stub:ofs_trailer]
407 rep_whole = rep_blob[0:ofs_sig]
408 rep_sig = rep_blob[ofs_sig:]
409 rep_auth_info_blob = rep_blob[ofs_trailer:]
411 rep_auth_info = self.parse_auth(rep_auth_info_blob)
412 self.assertEquals(rep_auth_info.auth_type, auth_context["auth_type"])
413 self.assertEquals(rep_auth_info.auth_level, auth_context["auth_level"])
414 self.assertLessEqual(rep_auth_info.auth_pad_length, len(rep_data))
415 self.assertEquals(rep_auth_info.auth_reserved, 0)
416 self.assertEquals(rep_auth_info.auth_context_id, auth_context["auth_context_id"])
417 self.assertEquals(rep_auth_info.credentials, rep_sig)
419 if auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
420 auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
422 stub_out = rep_data[0:-rep_auth_info.auth_pad_length]
424 stub_out = rep.u.stub_and_verifier
427 sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
428 ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
429 allow_remaining=allow_remaining)
431 sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
433 def epmap_reconnect(self, abstract, transfer=None, object=None):
434 ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
440 object = samba.dcerpc.misc.GUID()
442 ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
443 transfer, context_id=0)
445 data1 = ndr_pack(abstract)
446 lhs1 = samba.dcerpc.epmapper.epm_lhs()
447 lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
448 lhs1.lhs_data = data1[:18]
449 rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
450 rhs1.unknown = data1[18:]
451 floor1 = samba.dcerpc.epmapper.epm_floor()
454 data2 = ndr_pack(transfer)
455 lhs2 = samba.dcerpc.epmapper.epm_lhs()
456 lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
457 lhs2.lhs_data = data2[:18]
458 rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
459 rhs2.unknown = data1[18:]
460 floor2 = samba.dcerpc.epmapper.epm_floor()
463 lhs3 = samba.dcerpc.epmapper.epm_lhs()
464 lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
466 floor3 = samba.dcerpc.epmapper.epm_floor()
468 floor3.rhs.minor_version = 0
469 lhs4 = samba.dcerpc.epmapper.epm_lhs()
470 lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
472 floor4 = samba.dcerpc.epmapper.epm_floor()
474 floor4.rhs.port = self.tcp_port
475 lhs5 = samba.dcerpc.epmapper.epm_lhs()
476 lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
478 floor5 = samba.dcerpc.epmapper.epm_floor()
480 floor5.rhs.ipaddr = "0.0.0.0"
482 floors = [floor1, floor2, floor3, floor4, floor5]
483 req_tower = samba.dcerpc.epmapper.epm_tower()
484 req_tower.num_floors = len(floors)
485 req_tower.floors = floors
486 req_twr = samba.dcerpc.epmapper.epm_twr_t()
487 req_twr.tower = req_tower
489 epm_map = samba.dcerpc.epmapper.epm_Map()
490 epm_map.in_object = object
491 epm_map.in_map_tower = req_twr
492 epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
493 epm_map.in_max_towers = 4
495 self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
497 self.assertGreaterEqual(epm_map.out_num_towers, 1)
498 rep_twr = epm_map.out_towers[0].twr
499 self.assertIsNotNone(rep_twr)
500 self.assertEqual(rep_twr.tower_length, 75)
501 self.assertEqual(rep_twr.tower.num_floors, 5)
502 self.assertEqual(len(rep_twr.tower.floors), 5)
503 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
504 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
505 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
506 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
508 # reconnect to the given port
509 self._disconnect("epmap_reconnect")
510 self.tcp_port = rep_twr.tower.floors[3].rhs.port
# Pack `req` with ndr_pack() and write it to the socket self.s.
#
# req:       PDU object to send.
# ndr_print: print the PDU to stderr; defaults to self.do_ndr_print.
# hexdump:   hexdump the packed PDU to stderr; defaults to self.do_hexdump.
# NOTE(review): the loop/try scaffolding, the guards around the stderr writes
# and the second except clause are missing from this extract.
513 def send_pdu(self, req, ndr_print=None, hexdump=None):
514 if ndr_print is None:
515 ndr_print = self.do_ndr_print
517 hexdump = self.do_hexdump
519 req_pdu = ndr_pack(req)
521 sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
523 sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
# send() may write only part of the buffer: compare the count against the
# remaining length and keep the unsent tail.
525 sent = self.s.send(req_pdu, 0)
526 if sent == len(req_pdu):
528 req_pdu = req_pdu[sent:]
# Any send failure drops the connection via _disconnect().
529 except socket.error as e:
530 self._disconnect("send_pdu: %s" % e)
533 self._disconnect("send_pdu: %s" % e)
# Read one chunk of at most 0xffff bytes from the socket self.s.
#
# hexdump: hexdump the received bytes to stderr; defaults to self.do_hexdump.
# timeout: optional socket timeout for this read only; the timeout is set
#          back to 10 afterwards.
# NOTE(review): the try line, the return statements and the guard around the
# hexdump write are missing from this extract.
538 def recv_raw(self, hexdump=None, timeout=None):
541 hexdump = self.do_hexdump
543 if timeout is not None:
544 self.s.settimeout(timeout)
545 rep_pdu = self.s.recv(0xffff, 0)
546 self.s.settimeout(10)
# An empty read means the peer closed the connection.
547 if len(rep_pdu) == 0:
548 self._disconnect("recv_raw: EOF")
551 sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
# A timeout is only logged (after resetting the timeout to 10); it does not
# call _disconnect(), unlike the other socket errors below.
552 except socket.timeout as e:
553 self.s.settimeout(10)
554 sys.stderr.write("recv_raw: TIMEOUT\n")
556 except socket.error as e:
557 self._disconnect("recv_raw: %s" % e)
560 self._disconnect("recv_raw: %s" % e)
# Receive one chunk via recv_raw() and decode it as an ncacn_packet.
#
# ndr_print: print the decoded PDU to stderr; defaults to self.do_ndr_print.
# hexdump:   forwarded to recv_raw(); defaults to self.do_hexdump.
# timeout:   forwarded to recv_raw().
# Returns the tuple (decoded PDU, raw bytes).
# NOTE(review): the handling of an empty/None read and the guard around the
# stderr write are missing from this extract.
566 def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
569 if ndr_print is None:
570 ndr_print = self.do_ndr_print
572 hexdump = self.do_hexdump
574 rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
577 rep = ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
579 sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
# The chunk read must be exactly one fragment.
580 self.assertEqual(rep.frag_length, len(rep_pdu))
583 return (rep, rep_pdu)
# Convenience wrapper around recv_pdu_raw() taking the same arguments.
# NOTE(review): the remaining call arguments and the return are missing from
# this extract; presumably only the decoded PDU is returned — confirm.
585 def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
586 (rep, rep_pdu) = self.recv_pdu_raw(ndr_print=ndr_print,
# Build a DCE/RPC auth structure from the given fields.
#
# auth_type, auth_level, auth_pad_length, auth_context_id, auth_blob are
# copied into the matching fields of samba.dcerpc.dcerpc.auth(); auth_blob
# becomes `credentials`. Nothing is built when auth_type is None.
# ndr_print / hexdump: debug output to stderr; default to the instance flags.
# NOTE(review): most of the signature, the packing into `ai` and the return
# are missing from this extract; presumably the packed blob `ai` is
# returned — confirm.
591 def generate_auth(self,
595 auth_context_id=None,
597 ndr_print=None, hexdump=None):
598 if ndr_print is None:
599 ndr_print = self.do_ndr_print
601 hexdump = self.do_hexdump
603 if auth_type is not None:
604 a = samba.dcerpc.dcerpc.auth()
605 a.auth_type = auth_type
606 a.auth_level = auth_level
607 a.auth_pad_length = auth_pad_length
608 a.auth_context_id = auth_context_id
609 a.credentials = auth_blob
613 sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
615 sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
# Decode an auth_info blob into a samba.dcerpc.dcerpc.auth structure.
#
# auth_info: raw bytes of the auth trailer plus credentials.
# ndr_print / hexdump: debug output to stderr; default to the instance flags.
# NOTE(review): the body of the length check and the return are missing from
# this extract; presumably a blob no longer than the trailer yields None and
# otherwise the decoded structure `a` is returned — confirm.
621 def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
622 if ndr_print is None:
623 ndr_print = self.do_ndr_print
625 hexdump = self.do_hexdump
# A blob that is not longer than the fixed trailer carries no credentials.
627 if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
631 sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
632 a = ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
634 sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
# Wrap `payload` in an ncacn_packet header.
#
# ptype, call_id: packet type and call id of the new PDU.
# payload:        type-specific body; an `auth_info` attribute, if present and
#                 non-empty, is used to derive auth_length.
# pfc_flags:      defaults to FIRST|LAST.
# drep:           data representation; defaults to little-endian.
# NOTE(review): `drep` uses a list literal as default, which is shared between
# calls — it must not be mutated. rpc_vers/rpc_vers_minor, the remaining
# header assignments, the packing into `pdu` and the return are on lines
# missing from this extract.
638 def generate_pdu(self, ptype, call_id, payload,
641 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
642 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
643 drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
644 ndr_print=None, hexdump=None):
646 if getattr(payload, 'auth_info', None):
647 ai = payload.auth_info
651 p = samba.dcerpc.dcerpc.ncacn_packet()
652 p.rpc_vers = rpc_vers
653 p.rpc_vers_minor = rpc_vers_minor
655 p.pfc_flags = pfc_flags
# auth_length counts only the credentials, i.e. auth_info minus the trailer.
658 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
659 p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
# frag_length is the size of the packed PDU.
666 p.frag_length = len(pdu)
# Assert that the header of a received PDU `p` matches the expectations.
#
# p:         decoded ncacn_packet; must not be None.
# ptype:     expected packet type.
# call_id:   expected call id.
# pfc_flags: expected flags, default FIRST|LAST.
# drep:      expected data representation, default little-endian.
# The expected auth_length is derived from p.u.auth_info when that is longer
# than the auth trailer, else taken from the auth_length argument when given,
# else 0.
# NOTE(review): `drep` uses a list literal as default, which is shared between
# calls — it must not be mutated. rpc_vers, rpc_vers_minor, auth_length and
# the assignment of `ai` are on lines missing from this extract.
670 def verify_pdu(self, p, ptype, call_id,
673 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
674 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
675 drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
678 self.assertIsNotNone(p, "No valid pdu")
680 if getattr(p.u, 'auth_info', None):
685 self.assertEqual(p.rpc_vers, rpc_vers)
686 self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
687 self.assertEqual(p.ptype, ptype)
688 self.assertEqual(p.pfc_flags, pfc_flags)
689 self.assertEqual(p.drep, drep)
690 self.assertGreaterEqual(p.frag_length,
691 samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
692 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
693 self.assertEqual(p.auth_length,
694 len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
695 elif auth_length is not None:
696 self.assertEqual(p.auth_length, auth_length)
698 self.assertEqual(p.auth_length, 0)
699 self.assertEqual(p.call_id, call_id)
# Build a BIND PDU.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# max_xmit_frag, max_recv_frag, assoc_group_id, ctx_list and auth_info are
# copied into the bind payload; num_contexts is taken from len(ctx_list).
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): part of the signature, the remaining generate_pdu() arguments
# and the return are missing from this extract.
703 def generate_bind(self, call_id,
704 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
705 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
711 ndr_print=None, hexdump=None):
713 b = samba.dcerpc.dcerpc.bind()
714 b.max_xmit_frag = max_xmit_frag
715 b.max_recv_frag = max_recv_frag
716 b.assoc_group_id = assoc_group_id
717 b.num_contexts = len(ctx_list)
718 b.ctx_list = ctx_list
719 b.auth_info = auth_info
721 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
725 ndr_print=ndr_print, hexdump=hexdump)
# Build an ALTER PDU.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# max_xmit_frag, max_recv_frag, assoc_group_id, ctx_list and auth_info are
# copied into the payload; num_contexts is taken from len(ctx_list).
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): part of the signature, the remaining generate_pdu() arguments
# and the return are missing from this extract.
729 def generate_alter(self, call_id,
730 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
731 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
737 ndr_print=None, hexdump=None):
# The payload is created with the bind() structure; only the packet type
# passed to generate_pdu() marks it as an ALTER.
739 a = samba.dcerpc.dcerpc.bind()
740 a.max_xmit_frag = max_xmit_frag
741 a.max_recv_frag = max_recv_frag
742 a.assoc_group_id = assoc_group_id
743 a.num_contexts = len(ctx_list)
744 a.ctx_list = ctx_list
745 a.auth_info = auth_info
747 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
751 ndr_print=ndr_print, hexdump=hexdump)
# Build an AUTH3 PDU whose payload carries only `auth_info`.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): the auth_info parameter line, the remaining generate_pdu()
# arguments and the return are missing from this extract.
755 def generate_auth3(self, call_id,
756 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
757 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
759 ndr_print=None, hexdump=None):
761 a = samba.dcerpc.dcerpc.auth3()
762 a.auth_info = auth_info
764 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
768 ndr_print=ndr_print, hexdump=hexdump)
# Build a REQUEST PDU.
#
# call_id:    call id of the PDU.
# pfc_flags:  defaults to FIRST|LAST.
# alloc_hint: defaults to len(stub) when None.
# context_id: presentation context id of the call.
# stub, auth_info: concatenated into stub_and_verifier.
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): part of the signature, the body of the `object` branch, the
# remaining generate_pdu() arguments and the return are missing from this
# extract.
772 def generate_request(self, call_id,
773 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
774 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
781 ndr_print=None, hexdump=None):
783 if alloc_hint is None:
784 alloc_hint = len(stub)
786 r = samba.dcerpc.dcerpc.request()
787 r.alloc_hint = alloc_hint
788 r.context_id = context_id
790 if object is not None:
792 r.stub_and_verifier = stub + auth_info
794 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
798 ndr_print=ndr_print, hexdump=hexdump)
# The auth data travels inside stub_and_verifier rather than in an
# `auth_info` attribute (which is what generate_pdu() inspects), so
# auth_length is set explicitly here.
800 if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
801 p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
# Build a CO_CANCEL PDU whose payload carries only `auth_info`.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): the auth_info parameter line, the remaining generate_pdu()
# arguments and the return are missing from this extract.
805 def generate_co_cancel(self, call_id,
806 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
807 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
809 ndr_print=None, hexdump=None):
811 c = samba.dcerpc.dcerpc.co_cancel()
812 c.auth_info = auth_info
814 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
818 ndr_print=ndr_print, hexdump=hexdump)
# Build an ORPHANED PDU whose payload carries only `auth_info`.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): the auth_info parameter line, the remaining generate_pdu()
# arguments and the return are missing from this extract.
822 def generate_orphaned(self, call_id,
823 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
824 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
826 ndr_print=None, hexdump=None):
828 o = samba.dcerpc.dcerpc.orphaned()
829 o.auth_info = auth_info
831 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
835 ndr_print=ndr_print, hexdump=hexdump)
# Build a SHUTDOWN PDU; no fields are set on its payload in the visible code.
#
# call_id:   call id of the PDU.
# pfc_flags: defaults to FIRST|LAST.
# ndr_print / hexdump: forwarded to generate_pdu().
# NOTE(review): the remaining generate_pdu() arguments and the return are
# missing from this extract.
839 def generate_shutdown(self, call_id,
840 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
841 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
842 ndr_print=None, hexdump=None):
844 s = samba.dcerpc.dcerpc.shutdown()
846 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
850 ndr_print=ndr_print, hexdump=hexdump)
# Fail with "Not connected" unless a socket object is held in self.s.
854 def assertIsConnected(self):
855 self.assertIsNotNone(self.s, msg="Not connected")
# Fail with "Is connected" unless self.s is None.
858 def assertNotConnected(self):
859 self.assertIsNone(self.s, msg="Is connected")
# Assert that two syntax ids agree in both uuid and if_version.
#
# s1, s2: objects exposing `uuid` and `if_version` attributes.
862 def assertNDRSyntaxEquals(self, s1, s2):
863 self.assertEqual(s1.uuid, s2.uuid)
864 self.assertEqual(s1.if_version, s2.if_version)