6834746f69a771faec755f3eb1afec95d7862918
[ddiss/samba.git] / source4 / scripting / python / samba / tests / dcerpc / dnsserver.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Tests for samba.dcerpc.dnsserver"""
21
22 from samba.dcerpc import dnsp, dnsserver
23 from samba.tests import RpcInterfaceTestCase, env_get_var_value
24 from samba.netcmd.dns import ARecord
25
26 class DnsserverTests(RpcInterfaceTestCase):
27
28     def setUp(self):
29         super(DnsserverTests, self).setUp()
30         self.server = env_get_var_value("SERVER_IP")
31         self.zone = env_get_var_value("REALM").lower()
32         self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
33                                         self.get_loadparm(),
34                                         self.get_credentials())
35
36     def test_operation2(self):
37         pass
38
39
40     def test_query2(self):
41         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
42                                                 0,
43                                                 self.server,
44                                                 None,
45                                                 'ServerInfo')
46         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
47
48         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
49                                                 0,
50                                                 self.server,
51                                                 None,
52                                                 'ServerInfo')
53         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
54
55         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
56                                                 0,
57                                                 self.server,
58                                                 None,
59                                                 'ServerInfo')
60         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
61
62
63     def test_complexoperation2(self):
64         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
65         request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
66                             dnsserver.DNS_ZONE_REQUEST_PRIMARY)
67         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
68                                                             0,
69                                                             self.server,
70                                                             None,
71                                                             'EnumZones',
72                                                             dnsserver.DNSSRV_TYPEID_DWORD,
73                                                             request_filter)
74         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
75         self.assertEquals(2, zones.dwZoneCount)
76
77         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
78                             dnsserver.DNS_ZONE_REQUEST_PRIMARY)
79         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
80                                                             0,
81                                                             self.server,
82                                                             None,
83                                                             'EnumZones',
84                                                             dnsserver.DNSSRV_TYPEID_DWORD,
85                                                             request_filter)
86         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
87         self.assertEquals(0, zones.dwZoneCount)
88
89
90     def test_enumrecords2(self):
91         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
92         record_type = dnsp.DNS_TYPE_NS
93         select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
94                         dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
95         buflen, roothints = self.conn.DnssrvEnumRecords2(client_version,
96                                                             0,
97                                                             self.server,
98                                                             '..RootHints',
99                                                             '.',
100                                                             None,
101                                                             record_type,
102                                                             select_flags,
103                                                             None,
104                                                             None)
105         self.assertEquals(14, roothints.count)  # 1 NS + 13 A records (a-m)
106
107
108     def test_updaterecords2(self):
109         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
110         record_type = dnsp.DNS_TYPE_A
111         select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
112
113         name = 'dummy'
114         rec = ARecord('1.2.3.4')
115         rec2 = ARecord('5.6.7.8')
116
117         # Add record
118         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
119         add_rec_buf.rec = rec
120         self.conn.DnssrvUpdateRecord2(client_version,
121                                         0,
122                                         self.server,
123                                         self.zone,
124                                         name,
125                                         add_rec_buf,
126                                         None)
127
128         buflen, result = self.conn.DnssrvEnumRecords2(client_version,
129                                                         0,
130                                                         self.server,
131                                                         self.zone,
132                                                         name,
133                                                         None,
134                                                         record_type,
135                                                         select_flags,
136                                                         None,
137                                                         None)
138         self.assertEquals(1, result.count)
139         self.assertEquals(1, result.rec[0].wRecordCount)
140         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
141         self.assertEquals('1.2.3.4', result.rec[0].records[0].data)
142
143         # Update record
144         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
145         add_rec_buf.rec = rec2
146         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
147         del_rec_buf.rec = rec
148         self.conn.DnssrvUpdateRecord2(client_version,
149                                         0,
150                                         self.server,
151                                         self.zone,
152                                         name,
153                                         add_rec_buf,
154                                         del_rec_buf)
155
156         buflen, result = self.conn.DnssrvEnumRecords2(client_version,
157                                                         0,
158                                                         self.server,
159                                                         self.zone,
160                                                         name,
161                                                         None,
162                                                         record_type,
163                                                         select_flags,
164                                                         None,
165                                                         None)
166         self.assertEquals(1, result.count)
167         self.assertEquals(1, result.rec[0].wRecordCount)
168         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
169         self.assertEquals('5.6.7.8', result.rec[0].records[0].data)
170
171         # Delete record
172         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
173         del_rec_buf.rec = rec2
174         self.conn.DnssrvUpdateRecord2(client_version,
175                                         0,
176                                         self.server,
177                                         self.zone,
178                                         name,
179                                         None,
180                                         del_rec_buf)
181
182         self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
183                                         client_version,
184                                         0,
185                                         self.server,
186                                         self.zone,
187                                         name,
188                                         None,
189                                         record_type,
190                                         select_flags,
191                                         None,
192                                         None)