# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
+# Copyright (C) Stefan Metzmacher 2014,2015
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from samba import param
from samba.samdb import SamDB
from samba import credentials
+import samba.ndr
+import samba.dcerpc.dcerpc
+import samba.dcerpc.base
+import samba.dcerpc.epmapper
+import socket
+import struct
import subprocess
import sys
import tempfile
class RpcInterfaceTestCase(TestCase):
"""DCE/RPC Test case."""
+class RawDCERPCTest(TestCase):
+ """A raw DCE/RPC Test case."""
+
+ def _disconnect(self, reason):
+ if self.s is None:
+ return
+ self.s.close()
+ self.s = None
+ if self.do_hexdump:
+ sys.stderr.write("disconnect[%s]\n" % reason)
+
+ def connect(self):
+ try:
+ self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, socket.SOL_TCP,
+ 0)
+ self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
+ self.s.settimeout(10)
+ self.s.connect(self.a[0][4])
+ except socket.error as e:
+ self.s.close()
+ raise
+ except IOError as e:
+ self.s.close()
+ raise
+ except Exception as e:
+ raise
+ finally:
+ pass
+
+ def setUp(self):
+ super(RawDCERPCTest, self).setUp()
+ self.do_ndr_print = False
+ self.do_hexdump = False
+
+ self.host = samba.tests.env_get_var_value('SERVER')
+ self.tcp_port = 135
+
+ self.settings = {}
+ self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
+ self.settings["target_hostname"] = self.host
+
+ self.connect()
+
+ def epmap_reconnect(self, abstract):
+ ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
+
+ tsf0_list = [ndr32]
+ ctx0 = samba.dcerpc.dcerpc.ctx_list()
+ ctx0.context_id = 1
+ ctx0.num_transfer_syntaxes = len(tsf0_list)
+ ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax()
+ ctx0.transfer_syntaxes = tsf0_list
+
+ req = self.generate_bind(call_id=0, ctx_list=[ctx0])
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK,
+ req.call_id, auth_length=0)
+ self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEqual(rep.u.secondary_address_size, 4)
+ self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEqual(len(rep.u._pad1), 2)
+ self.assertEqual(rep.u._pad1, '\0' * 2)
+ self.assertEqual(rep.u.num_results, 1)
+ self.assertEqual(rep.u.ctx_list[0].result,
+ samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEqual(rep.u.ctx_list[0].reason,
+ samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertEqual(rep.u.auth_info, '\0' * 0)
+
+ # And now try a request
+ data1 = samba.ndr.ndr_pack(abstract)
+ lhs1 = samba.dcerpc.epmapper.epm_lhs()
+ lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
+ lhs1.lhs_data = data1[:18]
+ rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
+ rhs1.unknown = data1[18:]
+ floor1 = samba.dcerpc.epmapper.epm_floor()
+ floor1.lhs = lhs1
+ floor1.rhs = rhs1
+ data2 = samba.ndr.ndr_pack(ndr32)
+ lhs2 = samba.dcerpc.epmapper.epm_lhs()
+ lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
+ lhs2.lhs_data = data2[:18]
+ rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
+ rhs2.unknown = data1[18:]
+ floor2 = samba.dcerpc.epmapper.epm_floor()
+ floor2.lhs = lhs2
+ floor2.rhs = rhs2
+ lhs3 = samba.dcerpc.epmapper.epm_lhs()
+ lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
+ lhs3.lhs_data = ""
+ floor3 = samba.dcerpc.epmapper.epm_floor()
+ floor3.lhs = lhs3
+ floor3.rhs.minor_version = 0
+ lhs4 = samba.dcerpc.epmapper.epm_lhs()
+ lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
+ lhs4.lhs_data = ""
+ floor4 = samba.dcerpc.epmapper.epm_floor()
+ floor4.lhs = lhs4
+ floor4.rhs.port = self.tcp_port
+ lhs5 = samba.dcerpc.epmapper.epm_lhs()
+ lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
+ lhs5.lhs_data = ""
+ floor5 = samba.dcerpc.epmapper.epm_floor()
+ floor5.lhs = lhs5
+ floor5.rhs.ipaddr = "0.0.0.0"
+
+ floors = [floor1,floor2,floor3,floor4,floor5]
+ req_tower = samba.dcerpc.epmapper.epm_tower()
+ req_tower.num_floors = len(floors)
+ req_tower.floors = floors
+ req_twr = samba.dcerpc.epmapper.epm_twr_t()
+ req_twr.tower = req_tower
+
+ pack_twr = samba.ndr.ndr_pack(req_twr)
+
+ # object
+ stub = "\x01\x00\x00\x00"
+ stub += "\x00" * 16
+ # tower
+ stub += "\x02\x00\x00\x00"
+ stub += pack_twr
+ # padding?
+ stub += "\x00" * 1
+ # handle
+ stub += "\x00" * 20
+ # max_towers
+ stub += "\x04\x00\x00\x00"
+
+ # we do an epm_Map() request
+ req = self.generate_request(call_id = 1,
+ context_id=ctx0.context_id,
+ opnum=3,
+ stub=stub)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE,
+ req.call_id, auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, req.u.context_id)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ num_towers = struct.unpack_from("<I", rep.u.stub_and_verifier, 20)
+ (array_max, array_ofs, array_cnt) = struct.unpack_from("<III", rep.u.stub_and_verifier, 24)
+ status = struct.unpack_from("<I", rep.u.stub_and_verifier, len(rep.u.stub_and_verifier) - 4)
+ self.assertEqual(status[0], 0)
+ self.assertGreaterEqual(num_towers[0], 1)
+ self.assertEqual(array_max, 4)
+ self.assertEqual(array_ofs, 0)
+ self.assertGreaterEqual(array_cnt, 1)
+
+ unpack_twr = rep.u.stub_and_verifier[(36 + 4 * array_cnt):-4]
+ rep_twr = samba.ndr.ndr_unpack(samba.dcerpc.epmapper.epm_twr_t, unpack_twr, allow_remaining=True)
+ self.assertEqual(rep_twr.tower_length, 75)
+ self.assertEqual(rep_twr.tower.num_floors, 5)
+ self.assertEqual(len(rep_twr.tower.floors), 5)
+ self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
+ samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
+ self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
+ samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
+
+ # reconnect to the given port
+ self._disconnect("epmap_reconnect")
+ self.tcp_port = rep_twr.tower.floors[3].rhs.port
+ self.connect()
+
+ def send_pdu(self, req, ndr_print=None, hexdump=None):
+ if ndr_print is None:
+ ndr_print = self.do_ndr_print
+ if hexdump is None:
+ hexdump = self.do_hexdump
+ try:
+ req_pdu = samba.ndr.ndr_pack(req)
+ if ndr_print:
+ sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
+ if hexdump:
+ sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
+ while True:
+ sent = self.s.send(req_pdu, 0)
+ if sent == len(req_pdu):
+ break
+ req_pdu = req_pdu[sent:]
+ except socket.error as e:
+ self._disconnect("send_pdu: %s" % e)
+ raise
+ except IOError as e:
+ self._disconnect("send_pdu: %s" % e)
+ raise
+ finally:
+ pass
+
+ def recv_raw(self, hexdump=None, timeout=None):
+ rep_pdu = None
+ if hexdump is None:
+ hexdump = self.do_hexdump
+ try:
+ if timeout is not None:
+ self.s.settimeout(timeout)
+ rep_pdu = self.s.recv(0xffff, 0)
+ self.s.settimeout(10)
+ if len(rep_pdu) == 0:
+ self._disconnect("recv_raw: EOF")
+ return None
+ if hexdump:
+ sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
+ except socket.timeout as e:
+ self.s.settimeout(10)
+ sys.stderr.write("recv_raw: TIMEOUT\n")
+ pass
+ except socket.error as e:
+ self._disconnect("recv_raw: %s" % e)
+ raise
+ except IOError as e:
+ self._disconnect("recv_raw: %s" % e)
+ raise
+ finally:
+ pass
+ return rep_pdu
+
+ def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
+ rep = None
+ if ndr_print is None:
+ ndr_print = self.do_ndr_print
+ if hexdump is None:
+ hexdump = self.do_hexdump
+ try:
+ rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
+ if rep_pdu is None:
+ return None
+ rep = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
+ if ndr_print:
+ sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
+ self.assertEqual(rep.frag_length, len(rep_pdu))
+ finally:
+ pass
+ return rep
+
+ def generate_auth(self,
+ auth_type=None,
+ auth_level=None,
+ auth_pad_length=0,
+ auth_context_id=None,
+ auth_blob=None,
+ ndr_print=None, hexdump=None):
+ if ndr_print is None:
+ ndr_print = self.do_ndr_print
+ if hexdump is None:
+ hexdump = self.do_hexdump
+
+ if auth_type is not None:
+ a = samba.dcerpc.dcerpc.auth()
+ a.auth_type = auth_type
+ a.auth_level = auth_level
+ a.auth_pad_length = auth_pad_length
+ a.auth_context_id= auth_context_id
+ a.credentials = auth_blob
+
+ ai = samba.ndr.ndr_pack(a)
+ if ndr_print:
+ sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
+ if hexdump:
+ sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
+ else:
+ ai = ""
+
+ return ai
+
+ def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
+ if ndr_print is None:
+ ndr_print = self.do_ndr_print
+ if hexdump is None:
+ hexdump = self.do_hexdump
+
+ if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
+ return None
+
+ if hexdump:
+ sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
+ a = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
+ if ndr_print:
+ sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
+
+ return a
+
+ def generate_pdu(self, ptype, call_id, payload,
+ rpc_vers=5,
+ rpc_vers_minor=0,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
+ ndr_print=None, hexdump=None):
+
+ if getattr(payload, 'auth_info', None):
+ ai = payload.auth_info
+ else:
+ ai = ""
+
+ p = samba.dcerpc.dcerpc.ncacn_packet()
+ p.rpc_vers = rpc_vers
+ p.rpc_vers_minor = rpc_vers_minor
+ p.ptype = ptype
+ p.pfc_flags = pfc_flags
+ p.drep = drep
+ p.frag_length = 0
+ if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
+ p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ else:
+ p.auth_length = 0
+ p.call_id = call_id
+ p.u = payload
+
+ pdu = samba.ndr.ndr_pack(p)
+ p.frag_length = len(pdu)
+
+ return p
+
+ def verify_pdu(self, p, ptype, call_id,
+ rpc_vers=5,
+ rpc_vers_minor=0,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
+ auth_length=None):
+
+ self.assertIsNotNone(p, "No valid pdu")
+
+ if getattr(p.u, 'auth_info', None):
+ ai = p.u.auth_info
+ else:
+ ai = ""
+
+ self.assertEqual(p.rpc_vers, rpc_vers)
+ self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
+ self.assertEqual(p.ptype, ptype)
+ self.assertEqual(p.pfc_flags, pfc_flags)
+ self.assertEqual(p.drep, drep)
+ self.assertGreaterEqual(p.frag_length,
+ samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
+ if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
+ self.assertEqual(p.auth_length,
+ len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+ elif auth_length is not None:
+ self.assertEqual(p.auth_length, auth_length)
+ else:
+ self.assertEqual(p.auth_length, 0)
+ self.assertEqual(p.call_id, call_id)
+
+ return
+
+ def generate_bind(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ max_xmit_frag=5840,
+ max_recv_frag=5840,
+ assoc_group_id=0,
+ ctx_list=[],
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ b = samba.dcerpc.dcerpc.bind()
+ b.max_xmit_frag = max_xmit_frag
+ b.max_recv_frag = max_recv_frag
+ b.assoc_group_id = assoc_group_id
+ b.num_contexts = len(ctx_list)
+ b.ctx_list = ctx_list
+ b.auth_info = auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=b,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def generate_alter(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ max_xmit_frag=5840,
+ max_recv_frag=5840,
+ assoc_group_id=0,
+ ctx_list=[],
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ a = samba.dcerpc.dcerpc.bind()
+ a.max_xmit_frag = max_xmit_frag
+ a.max_recv_frag = max_recv_frag
+ a.assoc_group_id = assoc_group_id
+ a.num_contexts = len(ctx_list)
+ a.ctx_list = ctx_list
+ a.auth_info = auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=a,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def generate_auth3(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ a = samba.dcerpc.dcerpc.auth3()
+ a.auth_info = auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=a,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def generate_request(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ alloc_hint=None,
+ context_id=None,
+ opnum=None,
+ object=None,
+ stub=None,
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ if alloc_hint is None:
+ alloc_hint = len(stub)
+
+ r = samba.dcerpc.dcerpc.request()
+ r.alloc_hint = alloc_hint
+ r.context_id = context_id
+ r.opnum = opnum
+ if object is not None:
+ r.object = object
+ r.stub_and_verifier = stub + auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=r,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
+ p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+
+ return p
+
+ def generate_co_cancel(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ c = samba.dcerpc.dcerpc.co_cancel()
+ c.auth_info = auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=c,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def generate_orphaned(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ auth_info="",
+ ndr_print=None, hexdump=None):
+
+ o = samba.dcerpc.dcerpc.orphaned()
+ o.auth_info = auth_info
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=o,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def generate_shutdown(self, call_id,
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ ndr_print=None, hexdump=None):
+
+ s = samba.dcerpc.dcerpc.shutdown()
+
+ p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
+ pfc_flags=pfc_flags,
+ call_id=call_id,
+ payload=s,
+ ndr_print=ndr_print, hexdump=hexdump)
+
+ return p
+
+ def assertIsConnected(self):
+ self.assertIsNotNone(self.s, msg="Not connected")
+ return
+
+ def assertNotConnected(self):
+ self.assertIsNone(self.s, msg="Is connected")
+ return
+
+ def assertNDRSyntaxEquals(self, s1, s2):
+ self.assertEqual(s1.uuid, s2.uuid)
+ self.assertEqual(s1.if_version, s2.if_version)
+ return
class ValidNetbiosNameTests(TestCase):