python:tests/rpcd_witness_samba_only: add tests for 'net witness {client,share}-move'
authorStefan Metzmacher <metze@samba.org>
Mon, 15 Jan 2024 13:20:00 +0000 (14:20 +0100)
committerStefan Metzmacher <metze@samba.org>
Fri, 26 Jan 2024 17:00:33 +0000 (17:00 +0000)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Günther Deschner <gd@samba.org>
python/samba/tests/blackbox/rpcd_witness_samba_only.py

index 3467e988385d3863d15f66f24239a47f6b429c76..ca6b6210f506d7918d3410fb13352a8eb1407e2e 100755 (executable)
@@ -171,17 +171,23 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
 
     def call_net_witness_subcmd(self, subcmd,
                                 as_json=False,
+                                apply_to_all=False,
                                 registration=None,
                                 net_name=None,
                                 share_name=None,
                                 ip_address=None,
-                                client_computer=None):
+                                client_computer=None,
+                                new_ip=None,
+                                new_node=None):
         COMMAND = "UID_WRAPPER_ROOT=1 bin/net witness"
 
         argv = "%s %s" % (COMMAND, subcmd)
         if as_json:
             argv += " --json"
 
+        if apply_to_all:
+            argv += " --witness-apply-to-all"
+
         if registration is not None:
             argv += " --witness-registration='%s'" % (
                     registration.uuid)
@@ -198,6 +204,12 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
         if client_computer is not None:
             argv += " --witness-client-computer-name='%s'" % (client_computer)
 
+        if new_ip is not None:
+            argv += " --witness-new-ip='%s'" % (new_ip)
+
+        if new_node is not None:
+            argv += " --witness-new-node='%s'" % (new_node)
+
         try:
             if self.verbose:
                 print("Calling: %s" % argv)
@@ -312,6 +324,40 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
         expected_resource_changes = [expected_resource_change]
         self.assertResourceChanges(response, expected_resource_changes)
 
+    def assertGenericIpLists(self, response, expected_type, expected_ip_lists):
+        self.assertIsNotNone(response)
+        self.assertEqual(response.type, expected_type)
+        self.assertEqual(response.num, len(expected_ip_lists))
+        self.assertEqual(len(response.messages), len(expected_ip_lists))
+        for li in range(0, len(expected_ip_lists)):
+
+            expected_ip_list = expected_ip_lists[li]
+            ip_list = response.messages[li]
+            self.assertIsNotNone(ip_list)
+            self.assertEqual(ip_list.num, len(expected_ip_list))
+
+            for i in range(0, len(expected_ip_list)):
+                ip_info = ip_list.addr[i]
+
+                expected_flags = 0
+                expected_flags |= witness.WITNESS_IPADDR_V4
+                expected_flags |= witness.WITNESS_IPADDR_ONLINE
+                expected_flags = expected_ip_list[i].get('flags', expected_flags)
+
+                expected_ipv4 = '0.0.0.0'
+                expected_ipv4 = expected_ip_list[i].get('ipv4', expected_ipv4)
+
+                expected_ipv6 = '0000:0000:0000:0000:0000:0000:0000:0000'
+                expected_ipv6 = expected_ip_list[i].get('ipv6', expected_ipv6)
+
+                self.assertEqual(ip_info.flags, expected_flags)
+
+                self.assertIsNotNone(ip_info.ipv4)
+                self.assertEqual(ip_info.ipv4, expected_ipv4)
+
+                self.assertIsNotNone(ip_info.ipv6)
+                self.assertEqual(ip_info.ipv6, expected_ipv6)
+
     @classmethod
     def _define_ResourceChangeCTDB_tests(cls, conn_idx, monitor_idx, ndr64=False):
         if ndr64:
@@ -716,11 +762,16 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
     def check_net_witness_output(self,
                                  cmd,
                                  regs,
+                                 apply_to_all=False,
                                  registration_idx=None,
                                  net_name=None,
                                  share_name=None,
                                  ip_address=None,
-                                 client_computer=None):
+                                 client_computer=None,
+                                 new_ip=None,
+                                 new_node=None,
+                                 expected_msg_type=None,
+                                 callback=None):
         self.open_all_registrations()
         if registration_idx is not None:
             registration = regs[registration_idx]['context']
@@ -729,17 +780,24 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
             registration = None
 
         plain_res = self.call_net_witness_subcmd(cmd,
+                                                 apply_to_all=apply_to_all,
                                                  registration=registration,
                                                  net_name=net_name,
                                                  share_name=share_name,
                                                  ip_address=ip_address,
-                                                 client_computer=client_computer)
+                                                 client_computer=client_computer,
+                                                 new_ip=new_ip,
+                                                 new_node=new_node)
         if self.verbose:
             print("%s" % plain_res)
         plain_lines = plain_res.splitlines()
 
         num_headlines = 2
+        if expected_msg_type:
+            num_headlines += 1
         self.assertEqual(len(plain_lines), num_headlines+len(regs))
+        if expected_msg_type:
+            self.assertIn(expected_msg_type, plain_lines[0])
         plain_lines = plain_lines[num_headlines:]
         self.assertEqual(len(plain_lines), len(regs))
 
@@ -765,6 +823,9 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
                 self.assertEqual(line, expected_line)
             self.assertIsNotNone(line)
 
+            if callback is not None:
+                callback(reg)
+
         self.close_all_registrations()
 
         self.open_all_registrations()
@@ -776,13 +837,18 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
 
         json_res = self.call_net_witness_subcmd(cmd,
                                                 as_json=True,
+                                                apply_to_all=apply_to_all,
                                                 registration=registration,
                                                 net_name=net_name,
                                                 share_name=share_name,
                                                 ip_address=ip_address,
-                                                client_computer=client_computer)
+                                                client_computer=client_computer,
+                                                new_ip=new_ip,
+                                                new_node=new_node)
 
         num_filters = 0
+        if apply_to_all:
+            num_filters += 1
         if registration:
             num_filters += 1
         if net_name:
@@ -795,14 +861,21 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
             num_filters += 1
 
         num_toplevel = 2
+        if expected_msg_type:
+            num_toplevel += 1
 
         self.assertIn('filters', json_res);
+        if expected_msg_type:
+            self.assertIn('message', json_res);
         self.assertIn('registrations', json_res);
         self.assertEqual(len(json_res.keys()), num_toplevel)
 
         json_filters = json_res['filters']
         self.assertEqual(len(json_filters.keys()), num_filters)
 
+        if apply_to_all:
+            self.assertTrue(json_filters['--witness-apply-to-all'])
+
         if registration:
             self.assertEqual(json_filters['--witness-registration'],
                              str(registration.uuid))
@@ -818,6 +891,22 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
         if client_computer:
             self.assertEqual(json_filters['--witness-client-computer-name'],
                              client_computer)
+        if expected_msg_type:
+            json_message = json_res['message']
+            num_sub = 1
+            self.assertEqual(json_message['type'], expected_msg_type);
+
+            if new_ip is not None:
+                num_sub += 1
+                self.assertEqual(json_message['new_ip'], new_ip);
+            elif new_node == -1:
+                num_sub += 1
+                self.assertTrue(json_message['all_nodes'])
+            elif new_node is not None:
+                num_sub += 1
+                self.assertEqual(json_message['new_node'], new_node)
+
+            self.assertEqual(len(json_message.keys()), num_sub)
 
         json_regs = json_res['registrations']
         self.assertEqual(len(json_regs.keys()), len(regs))
@@ -829,6 +918,9 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
             json_reg = json_regs[str(reg_uuid)]
             self.assertJsonReg(json_reg, reg)
 
+            if callback is not None:
+                callback(reg)
+
         self.close_all_registrations()
 
     def check_combinations(self, check_func, only_shares=False):
@@ -846,6 +938,10 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
             else:
                 no_share_name_regs.append(reg)
 
+        if only_shares:
+            all_regs = all_share_name_regs
+            no_share_name_regs = []
+
         ip_address_regs = {}
         computer_name_regs = {}
         for reg in all_regs:
@@ -864,7 +960,8 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
         all_computer_names = '|'.join(computer_name_regs.keys())
         common_computer_name = self.max_common_prefix(computer_name_regs.keys())
 
-        check_func(all_regs)
+        check_func(all_regs,
+                   apply_to_all=True)
         check_func(all_regs,
                    net_name=self.server_hostname)
         check_func(all_regs,
@@ -916,13 +1013,17 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
 
     def test_net_witness_list(self):
         def check_list(regs,
+                       apply_to_all=False,
                        registration_idx=None,
                        net_name=None,
                        share_name=None,
                        ip_address=None,
                        client_computer=None):
+            # --witness-apply-to-all is not needed for 'list'
+            apply_to_all = None
             return self.check_net_witness_output('list',
                                                  regs,
+                                                 apply_to_all=apply_to_all,
                                                  registration_idx=registration_idx,
                                                  net_name=net_name,
                                                  share_name=share_name,
@@ -931,6 +1032,112 @@ class RpcdWitnessSambaTests(BlackboxTestCase):
 
         self.check_combinations(check_list)
 
+    def _test_net_witness_generic_move(self,
+                                       move_cmd,
+                                       msg_type_prefix,
+                                       msg_type):
+        def _check_generic_move(regs,
+                                apply_to_all=False,
+                                registration_idx=None,
+                                net_name=None,
+                                share_name=None,
+                                ip_address=None,
+                                client_computer=None,
+                                new_ip=None,
+                                new_node=None):
+
+            if new_ip:
+                expected_msg_type = "%s_IPV4" % msg_type_prefix
+            else:
+                expected_msg_type = "%s_NODE" % msg_type_prefix
+
+            expected_ip_list = []
+            if new_ip:
+                ip = { 'ipv4': str(new_ip), }
+                expected_ip_list.append(ip)
+            if new_node == -1:
+                for node_idx in range(0, len(self.nodes)):
+                    node = self.nodes[node_idx]
+                    ip = { 'ipv4': str(node['ip']), }
+                    expected_ip_list.append(ip)
+            elif new_node is not None:
+                node = self.nodes[new_node]
+                ip = { 'ipv4': str(node['ip']), }
+                expected_ip_list.append(ip)
+
+            expected_ip_lists = [expected_ip_list]
+
+            def check_generic_move_response(reg):
+                conn = reg['conn']
+                reg_context = reg['context']
+                response = conn.AsyncNotify(reg_context)
+                self.assertGenericIpLists(response, msg_type, expected_ip_lists)
+
+            return self.check_net_witness_output(move_cmd,
+                                                 regs,
+                                                 apply_to_all=apply_to_all,
+                                                 registration_idx=registration_idx,
+                                                 net_name=net_name,
+                                                 share_name=share_name,
+                                                 ip_address=ip_address,
+                                                 client_computer=client_computer,
+                                                 new_ip=new_ip,
+                                                 new_node=new_node,
+                                                 expected_msg_type=expected_msg_type,
+                                                 callback=check_generic_move_response)
+
+        def check_generic_move(regs,
+                               apply_to_all=False,
+                               registration_idx=None,
+                               net_name=None,
+                               share_name=None,
+                               ip_address=None,
+                               client_computer=None):
+            _check_generic_move(regs,
+                                apply_to_all=apply_to_all,
+                                registration_idx=registration_idx,
+                                net_name=net_name,
+                                share_name=share_name,
+                                ip_address=ip_address,
+                                client_computer=client_computer,
+                                new_node=-1)
+
+            for node_idx in range(0, len(self.nodes)):
+                node = self.nodes[node_idx]
+
+                _check_generic_move(regs,
+                                    apply_to_all=apply_to_all,
+                                    registration_idx=registration_idx,
+                                    net_name=net_name,
+                                    share_name=share_name,
+                                    ip_address=ip_address,
+                                    client_computer=client_computer,
+                                    new_node=node_idx)
+                _check_generic_move(regs,
+                                    apply_to_all=apply_to_all,
+                                    registration_idx=registration_idx,
+                                    net_name=net_name,
+                                    share_name=share_name,
+                                    ip_address=ip_address,
+                                    client_computer=client_computer,
+                                    new_ip=node['ip'])
+
+        if msg_type == witness.WITNESS_NOTIFY_CLIENT_MOVE:
+            only_shares = False
+        elif msg_type == witness.WITNESS_NOTIFY_SHARE_MOVE:
+            only_shares = True
+
+        self.check_combinations(check_generic_move, only_shares=only_shares)
+
+    def test_net_witness_client_move(self):
+        self._test_net_witness_generic_move('client-move',
+                                            'CLIENT_MOVE_TO',
+                                            witness.WITNESS_NOTIFY_CLIENT_MOVE)
+    def test_net_witness_share_move(self):
+        self._test_net_witness_generic_move('share-move',
+                                            'SHARE_MOVE_TO',
+                                            witness.WITNESS_NOTIFY_SHARE_MOVE)
+
 if __name__ == "__main__":
     import unittest
     unittest.main()