3 # ctdb ip takeover code
5 # Copyright (C) Martin Schwenke, Ronnie Sahlberg 2010, 2011
7 # Based on original CTDB C code:
9 # Copyright (C) Ronnie Sahlberg 2007
10 # Copyright (C) Andrew Tridgell 2007
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 3 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, see <http://www.gnu.org/licenses/>.
28 # Use optparse since newer argparse not available in RHEL5/EPEL.
29 from optparse import OptionParser
34 # For parsing IP addresses
40 def process_args(extra_options=[]):
43 parser = OptionParser(option_list=extra_options)
45 parser.add_option("--nd",
46 action="store_false", dest="deterministic_public_ips",
48 help="turn off deterministic_public_ips")
49 parser.add_option("--ni",
50 action="store_true", dest="no_ip_failback", default=False,
51 help="turn on no_ip_failback")
52 parser.add_option("-L", "--lcp2",
53 action="store_true", dest="lcp2", default=False,
54 help="use LCP2 IP rebalancing algorithm [default: %default]")
55 parser.add_option("-b", "--balance",
56 action="store_true", dest="balance", default=False,
57 help="show (im)balance information after each event")
58 parser.add_option("-d", "--diff",
59 action="store_true", dest="diff", default=False,
60 help="show IP address movements for each event")
61 parser.add_option("-n", "--no-print",
62 action="store_false", dest="show", default=True,
63 help="don't show IP address layout after each event")
64 parser.add_option("-v", "--verbose",
65 action="count", dest="verbose", default=0,
66 help="print information and actions taken to stdout")
67 parser.add_option("-r", "--retries",
68 action="store", type="int", dest="retries", default=5,
69 help="number of retry loops for rebalancing non-deterministic failback [default: %default]")
70 parser.add_option("-i", "--iterations",
71 action="store", type="int", dest="iterations",
73 help="number of iterations to run in test [default: %default]")
74 parser.add_option("-o", "--odds",
75 action="store", type="int", dest="odds", default=4,
76 help="make the chances of a failover 1 in ODDS [default: %default]")
77 parser.add_option("-A", "--aggressive",
78 action="store_true", dest="aggressive", default=False,
79 help="apply ODDS to try to flip each node [default: %default]")
81 def seed_callback(option, opt, value, parser):
83 parser.add_option("-s", "--seed",
84 action="callback", type="int", callback=seed_callback,
85 help="initial random number seed for random events")
87 parser.add_option("-x", "--exit",
88 action="store_true", dest="exit", default=False,
89 help="exit on the 1st gratuitous IP move or IP imbalance")
90 parser.add_option("-H", "--hard-imbalance-limit",
91 action="store", type="int", dest="hard_limit", default=1,
92 help="exceeding this limit causes termination [default: %default]")
93 parser.add_option("-S", "--soft-imbalance-limit",
94 action="store", type="int", dest="soft_limit", default=1,
95 help="exceeding this limit increments a counter [default: %default]")
97 (options, args) = parser.parse_args()
100 parser.error("too many argumentss")
102 def print_begin(t, delim='='):
109 def verbose_begin(t):
110 if options.verbose > 0:
114 if options.verbose > 0:
117 def verbose_print(t):
118 if options.verbose > 0:
119 if not type(t) == list:
122 print "\n".join([str(i) for i in t])
124 # more than this and we switch to the logging module... :-)
126 if options.verbose > 1:
130 if options.verbose > 1:
134 if options.verbose > 1:
135 if not type(t) == list:
138 print "\n".join([str(i) for i in t])
140 def ip_to_list_of_ints(ip):
141 # Be lazy... but only expose errors in IPv4 addresses, since
142 # they'll be more commonly used. :-)
144 l = socket.inet_pton(socket.AF_INET6, ip)
146 # Pad with leading 0s. This makes IPv4 addresses comparable
147 # with IPv6 but reduces the overall effectiveness of the
148 # algorithm. The alternative would be to treat these
149 # addresses separately while trying to keep all the IPs in
151 l = "".join(itertools.repeat("\0", 12)) + \
152 socket.inet_pton(socket.AF_INET, ip)
154 return map(lambda x: struct.unpack('B', x)[0], l)
156 def ip_distance(ip1, ip2):
157 """Calculate the distance between 2 IPs.
159 This is the length of the longest common prefix between the IPs.
160 It is calculated by XOR-ing the 2 IPs together and counting the
161 number of leading zeroes."""
164 for (o1, o2) in zip(ip_to_list_of_ints(ip1), ip_to_list_of_ints(ip2)):
165 # XOR this pair of octets
167 # count the number of leading zeroes
171 # bin() gives minimal length '0bNNN' string
172 distance += (8 - (len(bin(x)) - 2))
177 def ip_distance_2_sum(ip, ips):
178 """Calculate the IP distance for the given IP relative to IPs.
180 This could be made more efficient by inserting ip_distance_2 into
181 the loop in this function. However, that would result in some
182 loss of clarity and also will not be necessary in a C
187 sum += ip_distance(ip, i) ** 2
191 def imbalance_metric(ips):
192 """Return the imbalance metric for a group of IPs.
194 This is the sum of squares of the IP distances between each pair of IPs."""
196 (h, t) = (ips[0], ips[1:])
197 return ip_distance_2_sum(h, t) + imbalance_metric(t)
202 return float(sum(l))/len(l)
205 def __init__(self, public_addresses):
206 # List of list allows groups of IPs to be passed in. They're
207 # not actually used in the algorithm but are just used by
208 # calculate_imbalance() for checking the simulation. Note
209 # that people can pass in garbage and make this code
210 # fail... but we're all friends here in simulation world...
212 if type(public_addresses[0]) is str:
213 self.public_addresses = set(public_addresses)
217 self.public_addresses = set([i for s in public_addresses for i in s])
218 self.ip_groups = public_addresses
220 self.current_addresses = set()
226 ("*" if len(self.public_addresses) == 0 else \
227 (" " if self.healthy else "#"),
228 sorted(list(self.current_addresses)),
229 " %d" % self.imbalance if options.lcp2 else "")
def can_node_serve_ip(self, ip):
    """Tell whether this node is configured to host the given address.

    Returns True when ip is a member of self.public_addresses and
    False otherwise.  The node's health is not consulted here.
    """
    servable = self.public_addresses
    return ip in servable
def node_ip_coverage(self, ips=None):
    """Count the addresses this node is currently hosting.

    ips -- optional collection of addresses.  When given, only the
           currently hosted addresses that also appear in ips are
           counted.  When None (the default), every address in
           self.current_addresses is counted.

    Returns the count as an int.
    """
    # Identity test: None is a singleton, and "== None" can be
    # hijacked by an argument type that defines its own __eq__.
    if ips is None:
        return len(self.current_addresses)

    # Count matches directly instead of building a throwaway list.
    return sum(1 for a in self.current_addresses if a in ips)
237 def set_imbalance(self, imbalance=-1):
238 """Set the imbalance metric to the given value. If none given
239 then calculate it."""
242 self.imbalance = imbalance
244 self.imbalance = imbalance_metric(list(self.current_addresses))
def get_imbalance(self):
    """Return the imbalance value currently stored on this node.

    This simply reads self.imbalance; nothing is recalculated.
    """
    stored = self.imbalance
    return stored
249 class Cluster(object):
252 self.deterministic_public_ips = options.deterministic_public_ips
253 self.no_ip_failback = options.no_ip_failback
254 self.all_public_ips = set()
258 self.grat_ip_moves = []
260 self.imbalance_groups = []
261 self.imbalance_count = 0
262 self.imbalance_groups_count = itertools.repeat(0)
263 self.imbalance_metric = []
265 self.num_unhealthy = []
270 return "\n".join(["%2d %s" % (i, n) \
271 for (i, n) in enumerate(self.nodes)])
# This is naive.  It assumes that IP groups are indicated by the
# 1st node having IP groups.
def have_ip_groups(self):
    """Report whether the cluster is using IP groups.

    Only the first node in self.nodes is inspected: the answer is
    True when that node's ip_groups is non-empty.  A cluster with no
    nodes has no IP groups, so False is returned rather than raising
    IndexError.
    """
    # Both operands are real bools, so the result is always a bool.
    return bool(self.nodes) and bool(self.nodes[0].ip_groups)
278 def print_statistics(self):
279 print_begin("STATISTICS")
280 print "Events: %6d" % self.events
281 print "Total IP moves: %6d" % sum(self.ip_moves)
282 print "Gratuitous IP moves: %6d" % sum(self.grat_ip_moves)
283 print "Max imbalance: %6d" % max(self.imbalance)
284 if self.have_ip_groups():
285 print "Max group imbalance counts: ", map(max, zip(*self.imbalance_groups))
286 print "Mean imbalance: %f" % mean(self.imbalance)
287 if self.have_ip_groups():
288 print "Mean group imbalances counts: ", map(mean, zip(*self.imbalance_groups))
289 print "Final imbalance: %6d" % self.imbalance[-1]
290 if self.have_ip_groups():
291 print "Final group imbalances: ", self.imbalance_groups[-1]
293 print "Max LCP2 imbalance : %6d" % max(self.imbalance_metric)
294 print "Soft imbalance count: %6d" % self.imbalance_count
295 if self.have_ip_groups():
296 print "Soft imbalance group counts: ", self.imbalance_groups_count
298 print "Final LCP2 imbalance : %6d" % self.imbalance_metric[-1]
299 print "Maximum unhealthy: %6d" % max(self.num_unhealthy)
302 def find_pnn_with_ip(self, ip):
303 for (i, n) in enumerate(self.nodes):
304 if ip in n.current_addresses:
308 def quietly_remove_ip(self, ip):
309 # Remove address from old node.
310 old = self.find_pnn_with_ip(ip)
312 self.nodes[old].current_addresses.remove(ip)
def add_node(self, node):
    """Register a node with the cluster.

    The node is appended to self.nodes, and its public addresses are
    merged in place into the cluster-wide set self.all_public_ips.
    """
    members = self.nodes
    members.append(node)

    # In-place union: the existing all_public_ips set object grows.
    self.all_public_ips |= node.public_addresses
318 def healthy(self, *pnns):
319 verbose_begin("HEALTHY")
322 self.nodes[pnn].healthy = True
327 def unhealthy(self, *pnns):
329 verbose_begin("UNHEALTHY")
332 self.nodes[pnn].healthy = False
337 def do_something_random(self):
339 """Make random node(s) healthy or unhealthy.
341 If options.aggressive is False then: If all nodes are healthy
342 or unhealthy, then invert one of them; otherwise, there's a 1
343 in options.odds chance of making another node unhealthy.
345 If options.aggressive is True then: For each node there is a 1
346 in options.odds chance of flipping the state of that node
347 between healthy and unhealthy."""
349 if not options.aggressive:
350 num_nodes = len(self.nodes)
351 healthy_pnns = [i for (i,n) in enumerate(self.nodes) if n.healthy]
352 num_healthy = len(healthy_pnns)
354 if num_nodes == num_healthy:
355 self.unhealthy(random.randint(0, num_nodes-1))
356 elif num_healthy == 0:
357 self.healthy(random.randint(0, num_nodes-1))
358 elif random.randint(1, options.odds) == 1:
359 self.unhealthy(random.choice(healthy_pnns))
361 all_pnns = range(num_nodes)
362 unhealthy_pnns = sorted(list(set(all_pnns) - set(healthy_pnns)))
363 self.healthy(random.choice(unhealthy_pnns))
365 # We need to make at least one change or we retry...
368 for (pnn, n) in enumerate(self.nodes):
369 if random.randint(1, options.odds) == 1:
376 def random_iterations(self):
378 while i <= options.iterations:
379 verbose_begin("EVENT %d" % i)
381 self.do_something_random()
382 if self.recover() and options.exit:
386 self.print_statistics()
388 def imbalance_for_ips(self, ips):
396 for (i, n) in enumerate(self.nodes):
398 if not n.healthy or not n.can_node_serve_ip(ip):
401 num = n.node_ip_coverage(ips)
403 if maxnode == -1 or num > maxnum:
407 if minnode == -1 or num < minnum:
411 if maxnode == -1 or minnode == -1:
417 imbalance = max([imbalance, i])
422 def calculate_imbalance(self):
424 # First, do all the assigned IPs.
425 assigned = sorted([ip
427 for ip in n.current_addresses])
429 i = self.imbalance_for_ips(assigned)
432 # FIXME? If dealing with IP groups, assume the nodes are all
434 for ips in self.nodes[0].ip_groups:
435 gi = self.imbalance_for_ips(ips)
442 """Calculate differences in IP assignments between self and prev.
444 Gratuitous IP moves (from a healthy node to a healthy node)
445 are prefixed by !!."""
451 for (new, n) in enumerate(self.nodes):
452 for ip in n.current_addresses:
453 old = self.prev.find_pnn_with_ip(ip)
457 self.prev.nodes[new].healthy and \
458 self.nodes[new].healthy and \
459 self.nodes[old].healthy and \
460 self.prev.nodes[old].healthy:
465 details.append("%s %s: %d -> %d" %
466 (prefix, ip, old, new))
468 return (ip_moves, grat_ip_moves, details)
470 def find_takeover_node(self, ip):
474 for (i, n) in enumerate(self.nodes):
478 if not n.can_node_serve_ip(ip):
481 num = n.node_ip_coverage()
492 verbose_print("Could not find node to take over public address %s" % ip)
495 self.nodes[pnn].current_addresses.add(ip)
497 verbose_print("%s -> %d" % (ip, pnn))
def basic_allocate_unassigned(self):
    """Offer every unhosted public IP to find_takeover_node().

    An IP counts as unhosted when it is in self.all_public_ips but in
    no node's current_addresses.  The unhosted IPs are visited in
    sorted order, one find_takeover_node() call per IP.
    """
    # Collect everything that some node is already hosting.
    hosted = set()
    for node in self.nodes:
        hosted.update(node.current_addresses)

    for ip in sorted(self.all_public_ips - hosted):
        self.find_takeover_node(ip)
508 def basic_failback(self, retries_l):
510 assigned = sorted([ip
512 for ip in n.current_addresses])
517 for (i, n) in enumerate(self.nodes):
521 if not n.can_node_serve_ip(ip):
524 num = n.node_ip_coverage()
542 print "Could not find maxnode. May not be able to serve ip", ip
545 #if self.deterministic_public_ips:
548 if maxnum > minnum + 1 and retries_l[0] < options.retries:
549 # Remove the 1st ip from maxnode
550 t = sorted(list(self.nodes[maxnode].current_addresses))
552 verbose_print("%s <- %d" % (realloc, maxnode))
553 self.nodes[maxnode].current_addresses.remove(realloc)
554 # Redo the outer loop.
561 def lcp2_allocate_unassigned(self):
563 # Assign as many unassigned addresses as possible. Keep
564 # selecting the optimal assignment until we don't manage to
566 assigned = set([ip for n in self.nodes for ip in n.current_addresses])
567 unassigned = sorted(list(self.all_public_ips - assigned))
570 while len(unassigned) > 0 and should_loop:
573 debug_begin(" CONSIDERING MOVES (UNASSIGNED)")
579 for ip in unassigned:
580 for dstnode in range(len(self.nodes)):
581 if self.nodes[dstnode].can_node_serve_ip(ip) and \
582 self.nodes[dstnode].healthy:
583 dstdsum = ip_distance_2_sum(ip, self.nodes[dstnode].current_addresses)
584 dstimbl = self.nodes[dstnode].get_imbalance() + dstdsum
585 debug_print(" %s -> %d [+%d]" % \
588 dstimbl - self.nodes[dstnode].get_imbalance()))
590 if (minnode == -1) or (dstdsum < mindsum):
599 self.nodes[minnode].current_addresses.add(minip)
600 self.nodes[minnode].set_imbalance(self.nodes[minnode].get_imbalance() + mindsum)
601 verbose_print("%s -> %d [+%d]" % (minip, minnode, mindsum))
602 unassigned.remove(minip)
604 for ip in unassigned:
605 verbose_print("Could not find node to take over public address %s" % ip)
607 def lcp2_failback(self, targets):
609 # Get the node with the highest imbalance metric.
612 for (pnn, n) in enumerate(self.nodes):
613 b = n.get_imbalance()
614 if (srcnode == -1) or (b > maximbl):
618 # This means that all nodes had 0 or 1 addresses, so can't
623 # We'll need this a few times...
624 ips = self.nodes[srcnode].current_addresses
626 # Find an IP and destination node that best reduces imbalance.
628 debug_begin(" CONSIDERING MOVES FROM %d [%d]" % (srcnode, maximbl))
630 # What is this IP address costing the source node?
631 srcdsum = ip_distance_2_sum(ip, ips - set([ip]))
632 srcimbl = maximbl - srcdsum
634 # Consider what this IP address would cost each potential
635 # destination node. Destination nodes are limited to
636 # those that are newly healthy, since we don't want to
637 # do gratuitous failover of IPs just to make minor
638 # balance improvements.
639 for dstnode in targets:
640 if self.nodes[dstnode].can_node_serve_ip(ip) and \
641 self.nodes[dstnode].healthy:
642 dstdsum = ip_distance_2_sum(ip, self.nodes[dstnode].current_addresses)
643 dstimbl = self.nodes[dstnode].get_imbalance() + dstdsum
644 debug_print(" %d [%d] -> %s -> %d [+%d]" % \
646 srcimbl - self.nodes[srcnode].get_imbalance(),
649 dstimbl - self.nodes[dstnode].get_imbalance()))
651 if (dstimbl < maximbl) and (dstdsum < srcdsum):
653 optimum = (ip, srcnode, srcimbl, dstnode, dstimbl)
655 (x, sn, si, dn, di) = optimum
656 if (srcimbl + dstimbl) < (si + di):
657 optimum = (ip, srcnode, srcimbl, dstnode, dstimbl)
660 if optimum is not None:
661 # We found a move that makes things better...
662 (ip, srcnode, srcimbl, dstnode, dstimbl) = optimum
663 ini_srcimbl = self.nodes[srcnode].get_imbalance()
664 ini_dstimbl = self.nodes[dstnode].get_imbalance()
666 self.nodes[srcnode].current_addresses.remove(ip)
667 self.nodes[srcnode].set_imbalance(srcimbl)
669 self.nodes[dstnode].current_addresses.add(ip)
670 self.nodes[dstnode].set_imbalance(dstimbl)
672 verbose_print("%d [%d] -> %s -> %d [+%d]" % \
674 srcimbl - ini_srcimbl,
677 dstimbl - ini_dstimbl))
684 def ctdb_takeover_run(self):
688 # Don't bother with the num_healthy stuff. It is an
691 # We just keep the allocated IPs in the current_addresses field
692 # of the node. This needs to be readable, not efficient!
694 if self.deterministic_public_ips:
696 addr_list = sorted(list(self.all_public_ips))
697 for (i, ip) in enumerate(addr_list):
698 self.quietly_remove_ip(ip)
699 # Add addresses to new node.
700 pnn = i % len(self.nodes)
701 self.nodes[pnn].current_addresses.add(ip)
702 verbose_print("%s -> %d" % (ip, pnn))
704 # Remove public addresses from unhealthy nodes.
705 for (pnn, n) in enumerate(self.nodes):
707 verbose_print(["%s <- %d" % (ip, pnn)
708 for ip in n.current_addresses])
709 n.current_addresses = set()
711 # If a node can't serve an assigned address then remove it.
713 verbose_print(["%s <- %d" % (ip, pnn)
714 for ip in n.current_addresses - n.public_addresses])
715 n.current_addresses &= n.public_addresses
718 newly_healthy = [pnn for (pnn, n) in enumerate(self.nodes)
719 if len(n.current_addresses) == 0 and n.healthy]
723 # We'll only retry the balancing act up to options.retries
724 # times (for the basic non-deterministic algorithm). This
725 # nonsense gives us a reference on the retries count in
726 # Python. It will be easier in C. :-)
727 # For LCP2 we reassign as many IPs from heavily "loaded" nodes
728 # to nodes that are newly healthy, looping until we fail to
736 self.lcp2_allocate_unassigned()
738 self.basic_allocate_unassigned()
740 if self.no_ip_failback or self.deterministic_public_ips:
744 if len(newly_healthy) == 0:
746 should_loop = self.lcp2_failback(newly_healthy)
748 should_loop = self.basic_failback(retries_l)
752 verbose_begin("TAKEOVER")
754 self.ctdb_takeover_run()
760 if self.prev is not None:
761 (ip_moves, grat_ip_moves, details) = self.diff()
762 self.ip_moves.append(ip_moves)
763 self.grat_ip_moves.append(grat_ip_moves)
767 print "\n".join(details)
770 (imbalance, imbalance_groups) = self.calculate_imbalance()
771 self.imbalance.append(imbalance)
772 self.imbalance_groups.append(imbalance_groups)
774 if imbalance > options.soft_limit:
775 self.imbalance_count += 1
777 # There must be a cleaner way...
779 for (c, i) in zip(self.imbalance_groups_count, imbalance_groups):
780 if i > options.soft_limit:
784 self.imbalance_groups_count = t
786 imbalance_metric = max([n.get_imbalance() for n in self.nodes])
787 self.imbalance_metric.append(imbalance_metric)
789 print_begin("IMBALANCE")
790 print "ALL IPS:", imbalance
791 if self.have_ip_groups():
792 print "IP GROUPS:", imbalance_groups
794 print "LCP2 IMBALANCE:", imbalance_metric
797 num_unhealthy = len(self.nodes) - \
798 len([n for n in self.nodes if n.healthy])
799 self.num_unhealthy.append(num_unhealthy)
807 self.prev = copy.deepcopy(self)
810 return (grat_ip_moves > 0) or \
811 (not self.have_ip_groups() and imbalance > options.hard_limit) or \
812 (self.have_ip_groups() and (max(imbalance_groups) > options.hard_limit))