s4-selftest: Add tests for RPC dnsserver
[rusty/samba.git] / source4 / scripting / python / samba / tests / dcerpc / dnsserver.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Tests for samba.dcerpc.dnsserver"""
21
22 from samba.dcerpc import dnsp, dnsserver
23 from samba.tests import RpcInterfaceTestCase, env_get_var_value
24 from samba.netcmd.dns import ARecord
25
class DnsserverTests(RpcInterfaceTestCase):
    """Tests for the dnsserver RPC interface.

    The server address and the zone name are read with env_get_var_value
    from SERVER_IP and REALM; every test issues its calls over a
    ncacn_ip_tcp connection created in setUp().
    """

    def setUp(self):
        """Read the target from the environment and open the RPC connection."""
        super(DnsserverTests, self).setUp()
        self.server = env_get_var_value("SERVER_IP")
        # The record tests operate on the zone named after the lower-cased realm.
        self.zone = env_get_var_value("REALM").lower()
        self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
                                        self.get_loadparm(),
                                        self.get_credentials())

    def _enum_records(self, zone, name, record_type, select_flags):
        """Run DnssrvEnumRecords2 for `name` in `zone` and return the records.

        The call is made with DNS_CLIENT_VERSION_LONGHORN.  The buffer length
        returned alongside the records is discarded because no test uses it.
        Any RuntimeError raised by the RPC call propagates to the caller.
        """
        _buflen, result = self.conn.DnssrvEnumRecords2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,
            self.server,
            zone,
            name,
            None,
            record_type,
            select_flags,
            None,
            None)
        return result

    def _update_record(self, name, add_rec=None, del_rec=None):
        """Run DnssrvUpdateRecord2 on self.zone for `name`.

        `add_rec` and `del_rec` are record objects (or None); each non-None
        record is wrapped in its own DNS_RPC_RECORD_BUF before the call,
        while None is passed through unchanged.
        """
        add_rec_buf = None
        if add_rec is not None:
            add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
            add_rec_buf.rec = add_rec

        del_rec_buf = None
        if del_rec is not None:
            del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
            del_rec_buf.rec = del_rec

        self.conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                      0,
                                      self.server,
                                      self.zone,
                                      name,
                                      add_rec_buf,
                                      del_rec_buf)

    def _assert_single_a_record(self, name, address):
        """Assert that `name` in self.zone holds exactly one A record `address`."""
        result = self._enum_records(self.zone,
                                    name,
                                    dnsp.DNS_TYPE_A,
                                    dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA)
        self.assertEqual(1, result.count)
        self.assertEqual(1, result.rec[0].wRecordCount)
        self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
        self.assertEqual(address, result.rec[0].records[0].data)

    def test_operation2(self):
        # TODO: placeholder, nothing is exercised here yet (presumably
        # intended for DnssrvOperation2 -- confirm before filling in).
        pass

    def test_query2(self):
        # 'ServerInfo' must come back with the type id that matches the
        # client version used for the query.
        expectations = (
            (dnsserver.DNS_CLIENT_VERSION_W2K,
             dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K),
            (dnsserver.DNS_CLIENT_VERSION_DOTNET,
             dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET),
            (dnsserver.DNS_CLIENT_VERSION_LONGHORN,
             dnsserver.DNSSRV_TYPEID_SERVER_INFO),
        )
        for client_version, expected_typeid in expectations:
            typeid, _result = self.conn.DnssrvQuery2(client_version,
                                                     0,
                                                     self.server,
                                                     None,
                                                     'ServerInfo')
            self.assertEqual(expected_typeid, typeid)

    # NOTE: the method name is misspelled ("oepration"); it is kept as-is so
    # the test ID does not change.
    def test_complexoepration2(self):
        # 'EnumZones' filtered to primary zones must return a zone list.
        client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
        request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
        typeid, zones = self.conn.DnssrvComplexOperation2(
            client_version,
            0,
            self.server,
            None,
            'EnumZones',
            dnsserver.DNSSRV_TYPEID_DWORD,
            request_filter)
        self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
        self.assertEqual(2, zones.dwZoneCount)

    def test_enumrecords2(self):
        # Enumerate the NS records at '.' in the '..RootHints' zone together
        # with their additional data.
        select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
                        dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
        roothints = self._enum_records('..RootHints',
                                       '.',
                                       dnsp.DNS_TYPE_NS,
                                       select_flags)
        self.assertEqual(14, roothints.count)  # 1 NS + 13 A records (a-m)

    def test_updaterecords2(self):
        # Add, replace and delete an A record, checking the zone contents
        # after every step.
        name = 'dummy'
        rec = ARecord('1.2.3.4')
        rec2 = ARecord('5.6.7.8')

        # Add record
        self._update_record(name, add_rec=rec)
        self._assert_single_a_record(name, '1.2.3.4')

        # Update record: the new address is added and the old one deleted
        # in a single call.
        self._update_record(name, add_rec=rec2, del_rec=rec)
        self._assert_single_a_record(name, '5.6.7.8')

        # Delete record
        self._update_record(name, del_rec=rec2)

        # Once the record is gone the enumeration has to fail.  Without the
        # else branch a delete that silently did nothing would still let
        # the test pass.
        try:
            self._enum_records(self.zone,
                               name,
                               dnsp.DNS_TYPE_A,
                               dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA)
        except RuntimeError as e:
            self.assertEqual("(9714, 'WERR_DNS_ERROR_NAME_DOES_NOT_EXIST')",
                             str(e))
        else:
            self.fail("DnssrvEnumRecords2 succeeded for %r after the record "
                      "was deleted" % name)