def call_net_witness_subcmd(self, subcmd,
as_json=False,
+ apply_to_all=False,
registration=None,
net_name=None,
share_name=None,
ip_address=None,
- client_computer=None):
+ client_computer=None,
+ new_ip=None,
+ new_node=None):
COMMAND = "UID_WRAPPER_ROOT=1 bin/net witness"
argv = "%s %s" % (COMMAND, subcmd)
if as_json:
argv += " --json"
+ if apply_to_all:
+ argv += " --witness-apply-to-all"
+
if registration is not None:
argv += " --witness-registration='%s'" % (
registration.uuid)
if client_computer is not None:
argv += " --witness-client-computer-name='%s'" % (client_computer)
+ if new_ip is not None:
+ argv += " --witness-new-ip='%s'" % (new_ip)
+
+ if new_node is not None:
+ argv += " --witness-new-node='%s'" % (new_node)
+
try:
if self.verbose:
print("Calling: %s" % argv)
expected_resource_changes = [expected_resource_change]
self.assertResourceChanges(response, expected_resource_changes)
+ def assertGenericIpLists(self, response, expected_type, expected_ip_lists):
+ self.assertIsNotNone(response)
+ self.assertEqual(response.type, expected_type)
+ self.assertEqual(response.num, len(expected_ip_lists))
+ self.assertEqual(len(response.messages), len(expected_ip_lists))
+ for li in range(0, len(expected_ip_lists)):
+
+ expected_ip_list = expected_ip_lists[li]
+ ip_list = response.messages[li]
+ self.assertIsNotNone(ip_list)
+ self.assertEqual(ip_list.num, len(expected_ip_list))
+
+ for i in range(0, len(expected_ip_list)):
+ ip_info = ip_list.addr[i]
+
+ expected_flags = 0
+ expected_flags |= witness.WITNESS_IPADDR_V4
+ expected_flags |= witness.WITNESS_IPADDR_ONLINE
+ expected_flags = expected_ip_list[i].get('flags', expected_flags)
+
+ expected_ipv4 = '0.0.0.0'
+ expected_ipv4 = expected_ip_list[i].get('ipv4', expected_ipv4)
+
+ expected_ipv6 = '0000:0000:0000:0000:0000:0000:0000:0000'
+ expected_ipv6 = expected_ip_list[i].get('ipv6', expected_ipv6)
+
+ self.assertEqual(ip_info.flags, expected_flags)
+
+ self.assertIsNotNone(ip_info.ipv4)
+ self.assertEqual(ip_info.ipv4, expected_ipv4)
+
+ self.assertIsNotNone(ip_info.ipv6)
+ self.assertEqual(ip_info.ipv6, expected_ipv6)
+
@classmethod
def _define_ResourceChangeCTDB_tests(cls, conn_idx, monitor_idx, ndr64=False):
if ndr64:
def check_net_witness_output(self,
cmd,
regs,
+ apply_to_all=False,
registration_idx=None,
net_name=None,
share_name=None,
ip_address=None,
- client_computer=None):
+ client_computer=None,
+ new_ip=None,
+ new_node=None,
+ expected_msg_type=None,
+ callback=None):
self.open_all_registrations()
if registration_idx is not None:
registration = regs[registration_idx]['context']
registration = None
plain_res = self.call_net_witness_subcmd(cmd,
+ apply_to_all=apply_to_all,
registration=registration,
net_name=net_name,
share_name=share_name,
ip_address=ip_address,
- client_computer=client_computer)
+ client_computer=client_computer,
+ new_ip=new_ip,
+ new_node=new_node)
if self.verbose:
print("%s" % plain_res)
plain_lines = plain_res.splitlines()
num_headlines = 2
+ if expected_msg_type:
+ num_headlines += 1
self.assertEqual(len(plain_lines), num_headlines+len(regs))
+ if expected_msg_type:
+ self.assertIn(expected_msg_type, plain_lines[0])
plain_lines = plain_lines[num_headlines:]
self.assertEqual(len(plain_lines), len(regs))
self.assertEqual(line, expected_line)
self.assertIsNotNone(line)
+ if callback is not None:
+ callback(reg)
+
self.close_all_registrations()
self.open_all_registrations()
json_res = self.call_net_witness_subcmd(cmd,
as_json=True,
+ apply_to_all=apply_to_all,
registration=registration,
net_name=net_name,
share_name=share_name,
ip_address=ip_address,
- client_computer=client_computer)
+ client_computer=client_computer,
+ new_ip=new_ip,
+ new_node=new_node)
num_filters = 0
+ if apply_to_all:
+ num_filters += 1
if registration:
num_filters += 1
if net_name:
num_filters += 1
num_toplevel = 2
+ if expected_msg_type:
+ num_toplevel += 1
self.assertIn('filters', json_res);
+ if expected_msg_type:
+ self.assertIn('message', json_res);
self.assertIn('registrations', json_res);
self.assertEqual(len(json_res.keys()), num_toplevel)
json_filters = json_res['filters']
self.assertEqual(len(json_filters.keys()), num_filters)
+ if apply_to_all:
+ self.assertTrue(json_filters['--witness-apply-to-all'])
+
if registration:
self.assertEqual(json_filters['--witness-registration'],
str(registration.uuid))
if client_computer:
self.assertEqual(json_filters['--witness-client-computer-name'],
client_computer)
+ if expected_msg_type:
+ json_message = json_res['message']
+ num_sub = 1
+ self.assertEqual(json_message['type'], expected_msg_type);
+
+ if new_ip is not None:
+ num_sub += 1
+ self.assertEqual(json_message['new_ip'], new_ip);
+ elif new_node == -1:
+ num_sub += 1
+ self.assertTrue(json_message['all_nodes'])
+ elif new_node is not None:
+ num_sub += 1
+ self.assertEqual(json_message['new_node'], new_node)
+
+ self.assertEqual(len(json_message.keys()), num_sub)
json_regs = json_res['registrations']
self.assertEqual(len(json_regs.keys()), len(regs))
json_reg = json_regs[str(reg_uuid)]
self.assertJsonReg(json_reg, reg)
+ if callback is not None:
+ callback(reg)
+
self.close_all_registrations()
def check_combinations(self, check_func, only_shares=False):
else:
no_share_name_regs.append(reg)
+ if only_shares:
+ all_regs = all_share_name_regs
+ no_share_name_regs = []
+
ip_address_regs = {}
computer_name_regs = {}
for reg in all_regs:
all_computer_names = '|'.join(computer_name_regs.keys())
common_computer_name = self.max_common_prefix(computer_name_regs.keys())
- check_func(all_regs)
+ check_func(all_regs,
+ apply_to_all=True)
check_func(all_regs,
net_name=self.server_hostname)
check_func(all_regs,
def test_net_witness_list(self):
def check_list(regs,
+ apply_to_all=False,
registration_idx=None,
net_name=None,
share_name=None,
ip_address=None,
client_computer=None):
+ # --witness-apply-to-all is not needed for 'list'
+ apply_to_all = None
return self.check_net_witness_output('list',
regs,
+ apply_to_all=apply_to_all,
registration_idx=registration_idx,
net_name=net_name,
share_name=share_name,
self.check_combinations(check_list)
+ def _test_net_witness_generic_move(self,
+ move_cmd,
+ msg_type_prefix,
+ msg_type):
+ def _check_generic_move(regs,
+ apply_to_all=False,
+ registration_idx=None,
+ net_name=None,
+ share_name=None,
+ ip_address=None,
+ client_computer=None,
+ new_ip=None,
+ new_node=None):
+
+ if new_ip:
+ expected_msg_type = "%s_IPV4" % msg_type_prefix
+ else:
+ expected_msg_type = "%s_NODE" % msg_type_prefix
+
+ expected_ip_list = []
+ if new_ip:
+ ip = { 'ipv4': str(new_ip), }
+ expected_ip_list.append(ip)
+ if new_node == -1:
+ for node_idx in range(0, len(self.nodes)):
+ node = self.nodes[node_idx]
+ ip = { 'ipv4': str(node['ip']), }
+ expected_ip_list.append(ip)
+ elif new_node is not None:
+ node = self.nodes[new_node]
+ ip = { 'ipv4': str(node['ip']), }
+ expected_ip_list.append(ip)
+
+ expected_ip_lists = [expected_ip_list]
+
+ def check_generic_move_response(reg):
+ conn = reg['conn']
+ reg_context = reg['context']
+ response = conn.AsyncNotify(reg_context)
+ self.assertGenericIpLists(response, msg_type, expected_ip_lists)
+
+ return self.check_net_witness_output(move_cmd,
+ regs,
+ apply_to_all=apply_to_all,
+ registration_idx=registration_idx,
+ net_name=net_name,
+ share_name=share_name,
+ ip_address=ip_address,
+ client_computer=client_computer,
+ new_ip=new_ip,
+ new_node=new_node,
+ expected_msg_type=expected_msg_type,
+ callback=check_generic_move_response)
+
+ def check_generic_move(regs,
+ apply_to_all=False,
+ registration_idx=None,
+ net_name=None,
+ share_name=None,
+ ip_address=None,
+ client_computer=None):
+ _check_generic_move(regs,
+ apply_to_all=apply_to_all,
+ registration_idx=registration_idx,
+ net_name=net_name,
+ share_name=share_name,
+ ip_address=ip_address,
+ client_computer=client_computer,
+ new_node=-1)
+
+ for node_idx in range(0, len(self.nodes)):
+ node = self.nodes[node_idx]
+
+ _check_generic_move(regs,
+ apply_to_all=apply_to_all,
+ registration_idx=registration_idx,
+ net_name=net_name,
+ share_name=share_name,
+ ip_address=ip_address,
+ client_computer=client_computer,
+ new_node=node_idx)
+ _check_generic_move(regs,
+ apply_to_all=apply_to_all,
+ registration_idx=registration_idx,
+ net_name=net_name,
+ share_name=share_name,
+ ip_address=ip_address,
+ client_computer=client_computer,
+ new_ip=node['ip'])
+
+ if msg_type == witness.WITNESS_NOTIFY_CLIENT_MOVE:
+ only_shares = False
+ elif msg_type == witness.WITNESS_NOTIFY_SHARE_MOVE:
+ only_shares = True
+
+ self.check_combinations(check_generic_move, only_shares=only_shares)
+
+ def test_net_witness_client_move(self):
+ self._test_net_witness_generic_move('client-move',
+ 'CLIENT_MOVE_TO',
+ witness.WITNESS_NOTIFY_CLIENT_MOVE)
+ def test_net_witness_share_move(self):
+ self._test_net_witness_generic_move('share-move',
+ 'SHARE_MOVE_TO',
+ witness.WITNESS_NOTIFY_SHARE_MOVE)
+
if __name__ == "__main__":
import unittest
unittest.main()