ip_address=None,
client_computer=None,
new_ip=None,
- new_node=None):
+ new_node=None,
+ forced_response=None):
COMMAND = "UID_WRAPPER_ROOT=1 bin/net witness"
argv = "%s %s" % (COMMAND, subcmd)
if new_node is not None:
argv += " --witness-new-node='%s'" % (new_node)
+ if forced_response:
+ argv += " --witness-forced-response='%s'" % (forced_response)
+
try:
if self.verbose:
print("Calling: %s" % argv)
client_computer=None,
new_ip=None,
new_node=None,
+ forced_response=None,
expected_msg_type=None,
callback=None):
self.open_all_registrations()
ip_address=ip_address,
client_computer=client_computer,
new_ip=new_ip,
- new_node=new_node)
+ new_node=new_node,
+ forced_response=forced_response)
if self.verbose:
print("%s" % plain_res)
plain_lines = plain_res.splitlines()
ip_address=ip_address,
client_computer=client_computer,
new_ip=new_ip,
- new_node=new_node)
+ new_node=new_node,
+ forced_response=forced_response)
num_filters = 0
if apply_to_all:
elif new_node is not None:
num_sub += 1
self.assertEqual(json_message['new_node'], new_node)
+ if forced_response is not None:
+ num_sub += 1
+ forced_response_json = json.loads(str(forced_response))
+ self.assertDictEqual(json_message['json'], forced_response_json)
self.assertEqual(len(json_message.keys()), num_sub)
self.check_combinations(check_force_unregister)
+ def _test_net_witness_force_response(self,
+ msg_type=None,
+ expected_resource_changes=None,
+ expected_ip_lists=None):
+ def check_force_response(regs,
+ apply_to_all=False,
+ registration_idx=None,
+ net_name=None,
+ share_name=None,
+ ip_address=None,
+ client_computer=None):
+ move_types = [
+ witness.WITNESS_NOTIFY_CLIENT_MOVE,
+ witness.WITNESS_NOTIFY_SHARE_MOVE,
+ witness.WITNESS_NOTIFY_IP_CHANGE,
+ ]
+
+ forced_response = '{ '
+ forced_response += '"result": 0, '
+ forced_response += '"response": { '
+ forced_response += '"type": %u, ' % msg_type
+ forced_response += '"messages": [ '
+ if msg_type == witness.WITNESS_NOTIFY_RESOURCE_CHANGE:
+ prefix_d1 = ""
+ for rc in expected_resource_changes:
+ forced_response += prefix_d1
+ forced_response += '{ '
+ prefix_d2 = ""
+ if 'type' in rc:
+ forced_response += prefix_d2
+ forced_response += '"type": %u ' % rc['type']
+ prefix_d2 = ", "
+ if 'name' in rc:
+ forced_response += prefix_d2
+ forced_response += '"name": "%s" ' % rc['name']
+ prefix_d2 = ", "
+ forced_response += '} '
+ prefix_d1 = ", "
+ if msg_type in move_types:
+ prefix_d1 = ""
+ for ip_list in expected_ip_lists:
+ forced_response += prefix_d1
+ forced_response += '['
+ prefix_d2 = ""
+ for ip in ip_list:
+ forced_response += prefix_d2
+ forced_response += '{ '
+ prefix_d3 = ""
+ if 'flags' in ip:
+ forced_response += prefix_d3
+ forced_response += '"flags": %u' % ip['flags']
+ prefix_d3 = ", "
+ if 'ipv4' in ip:
+ forced_response += prefix_d3
+ forced_response += '"ipv4": "%s" ' % ip['ipv4']
+ prefix_d3 = ", "
+ if 'ipv6' in ip:
+ forced_response += prefix_d3
+ forced_response += '"ipv6": "%s" ' % ip['ipv6']
+ prefix_d3 = ", "
+ forced_response += '}'
+ prefix_d2 = ", "
+ forced_response += ']'
+ prefix_d1 = ", "
+ forced_response += ']'
+ forced_response += '}'
+ forced_response += '}'
+
+ def check_forced_response_result(reg):
+ conn = reg['conn']
+ reg_context = reg['context']
+ response = conn.AsyncNotify(reg_context)
+ if msg_type == witness.WITNESS_NOTIFY_RESOURCE_CHANGE:
+ self.assertResourceChanges(response, expected_resource_changes)
+ if msg_type in move_types:
+ self.assertGenericIpLists(response, msg_type, expected_ip_lists)
+
+ return self.check_net_witness_output("force-response",
+ regs,
+ apply_to_all=apply_to_all,
+ registration_idx=registration_idx,
+ net_name=net_name,
+ share_name=share_name,
+ ip_address=ip_address,
+ client_computer=client_computer,
+ forced_response=forced_response,
+ expected_msg_type="FORCE_RESPONSE",
+ callback=check_forced_response_result)
+
+ self.check_combinations(check_force_response)
+
+ def test_net_witness_force_response_resource_changes(self):
+ msg_type = witness.WITNESS_NOTIFY_RESOURCE_CHANGE
+ expected_resource_changes = [
+ {
+ 'type': witness.WITNESS_RESOURCE_STATE_UNAVAILABLE,
+ 'name': "some-resource-name"
+ },
+ {
+ 'type': witness.WITNESS_RESOURCE_STATE_AVAILABLE,
+ 'name': "other-resource-name"
+ },
+ ]
+ self._test_net_witness_force_response(msg_type=msg_type,
+ expected_resource_changes=expected_resource_changes)
+
+ def _test_net_witness_force_response_generic_moves(self, msg_type):
+ expected_flags = 0
+ expected_flags |= witness.WITNESS_IPADDR_V4
+ expected_flags |= witness.WITNESS_IPADDR_ONLINE
+
+ expected_ip_list10 = [
+ {
+ 'flags': expected_flags,
+ 'ipv4': '10.0.10.1',
+ },
+ {
+ 'flags': 0,
+ 'ipv4': '10.0.10.2',
+ 'ipv6': 'fd00:0000:0000:0000:0010:0000:0010:0002',
+ },
+ ]
+ expected_ip_list20 = [
+ {
+ 'flags': expected_flags,
+ 'ipv4': '10.0.20.1',
+ },
+ {
+ 'flags': 0,
+ 'ipv4': '10.0.20.2',
+ 'ipv6': 'fd00:0000:0000:0000:0010:0000:0020:0002',
+ },
+ ]
+
+ expected_ip_lists = [expected_ip_list10, expected_ip_list20]
+ self._test_net_witness_force_response(msg_type=msg_type,
+ expected_ip_lists=expected_ip_lists)
+
+ def test_net_witness_force_response_client_moves(self):
+ msg_type = witness.WITNESS_NOTIFY_CLIENT_MOVE
+ self._test_net_witness_force_response_generic_moves(msg_type)
+
+ def test_net_witness_force_response_share_moves(self):
+ msg_type = witness.WITNESS_NOTIFY_SHARE_MOVE
+ self._test_net_witness_force_response_generic_moves(msg_type)
+
+ def test_net_witness_force_response_ip_changes(self):
+ msg_type = witness.WITNESS_NOTIFY_IP_CHANGE
+ self._test_net_witness_force_response_generic_moves(msg_type)
+
if __name__ == "__main__":
import unittest
unittest.main()