tools/ctdb: Remove setdbseqnum command
[ctdb.git] / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "lib/util/dlinklist.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26
27 /*
28   choose the transport we will use
29 */
30 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
31 {
32         ctdb->transport = talloc_strdup(ctdb, transport);
33         CTDB_NO_MEMORY(ctdb, ctdb->transport);
34
35         return 0;
36 }
37
38 /*
39   Check whether an ip is a valid node ip
40   Returns the node id for this ip address or -1
41 */
42 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
43 {
44         int nodeid;
45
46         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
47                 if (ctdb->nodes[nodeid]->flags & NODE_FLAGS_DELETED) {
48                         continue;
49                 }
50                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
51                         return nodeid;
52                 }
53         }
54
55         return -1;
56 }
57
58 /*
59   choose the recovery lock file
60 */
61 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
62 {
63         if (ctdb->recovery_lock_file != NULL) {
64                 talloc_free(ctdb->recovery_lock_file);
65                 ctdb->recovery_lock_file = NULL;
66         }
67
68         if (file == NULL) {
69                 DEBUG(DEBUG_ALERT,("Recovery lock file set to \"\". Disabling recovery lock checking\n"));
70                 ctdb->tunable.verify_recovery_lock = 0;
71                 return 0;
72         }
73
74         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
75         CTDB_NO_MEMORY(ctdb, ctdb->recovery_lock_file);
76
77         return 0;
78 }
79
80 /*
81   set the directory for the local databases
82 */
83 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
84 {
85         ctdb->db_directory = talloc_strdup(ctdb, dir);
86         if (ctdb->db_directory == NULL) {
87                 return -1;
88         }
89         return 0;
90 }
91
92 /*
93   set the directory for the persistent databases
94 */
95 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
96 {
97         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
98         if (ctdb->db_directory_persistent == NULL) {
99                 return -1;
100         }
101         return 0;
102 }
103
104 /*
105   set the directory for internal state databases
106 */
107 int ctdb_set_tdb_dir_state(struct ctdb_context *ctdb, const char *dir)
108 {
109         ctdb->db_directory_state = talloc_strdup(ctdb, dir);
110         if (ctdb->db_directory_state == NULL) {
111                 return -1;
112         }
113         return 0;
114 }
115
116 /*
117   add a node to the list of nodes
118 */
119 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
120 {
121         struct ctdb_node *node, **nodep;
122
123         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
124         CTDB_NO_MEMORY(ctdb, nodep);
125
126         ctdb->nodes = nodep;
127         nodep = &ctdb->nodes[ctdb->num_nodes];
128         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
129         CTDB_NO_MEMORY(ctdb, *nodep);
130         node = *nodep;
131
132         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
133                 return -1;
134         }
135         node->ctdb = ctdb;
136         node->name = talloc_asprintf(node, "%s:%u", 
137                                      node->address.address, 
138                                      node->address.port);
139         /* this assumes that the nodes are kept in sorted order, and no gaps */
140         node->pnn = ctdb->num_nodes;
141
142         /* nodes start out disconnected and unhealthy */
143         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
144
145         if (ctdb->address.address &&
146             ctdb_same_address(&ctdb->address, &node->address)) {
147                 /* for automatic binding to interfaces, see tcp_connect.c */
148                 ctdb->pnn = node->pnn;
149         }
150
151         ctdb->num_nodes++;
152         node->dead_count = 0;
153
154         return 0;
155 }
156
157 /*
158   add an entry for a "deleted" node to the list of nodes.
159   a "deleted" node is a node that is commented out from the nodes file.
160   this is used to prevent that subsequent nodes in the nodes list
161   change their pnn value if a node is "delete" by commenting it out and then
162   using "ctdb reloadnodes" at runtime.
163 */
164 static int ctdb_add_deleted_node(struct ctdb_context *ctdb)
165 {
166         struct ctdb_node *node, **nodep;
167
168         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
169         CTDB_NO_MEMORY(ctdb, nodep);
170
171         ctdb->nodes = nodep;
172         nodep = &ctdb->nodes[ctdb->num_nodes];
173         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
174         CTDB_NO_MEMORY(ctdb, *nodep);
175         node = *nodep;
176         
177         if (ctdb_parse_address(ctdb, node, "0.0.0.0", &node->address) != 0) {
178                 DEBUG(DEBUG_ERR,("Failed to setup deleted node %d\n", ctdb->num_nodes));
179                 return -1;
180         }
181         node->ctdb = ctdb;
182         node->name = talloc_strdup(node, "0.0.0.0:0");
183
184         /* this assumes that the nodes are kept in sorted order, and no gaps */
185         node->pnn = ctdb->num_nodes;
186
187         /* this node is permanently deleted/disconnected */
188         node->flags = NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED;
189
190         ctdb->num_nodes++;
191         node->dead_count = 0;
192
193         return 0;
194 }
195
196
197 /*
198   setup the node list from a file
199 */
200 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
201 {
202         char **lines;
203         int nlines;
204         int i, j, num_present;
205
206         talloc_free(ctdb->nodes);
207         ctdb->nodes     = NULL;
208         ctdb->num_nodes = 0;
209
210         lines = file_lines_load(nlist, &nlines, ctdb);
211         if (lines == NULL) {
212                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
213                 return -1;
214         }
215         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
216                 nlines--;
217         }
218
219         num_present = 0;
220         for (i=0; i < nlines; i++) {
221                 char *node;
222
223                 node = lines[i];
224                 /* strip leading spaces */
225                 while((*node == ' ') || (*node == '\t')) {
226                         node++;
227                 }
228                 if (*node == '#') {
229                         if (ctdb_add_deleted_node(ctdb) != 0) {
230                                 talloc_free(lines);
231                                 return -1;
232                         }
233                         continue;
234                 }
235                 if (strcmp(node, "") == 0) {
236                         continue;
237                 }
238                 if (ctdb_add_node(ctdb, node) != 0) {
239                         talloc_free(lines);
240                         return -1;
241                 }
242                 num_present++;
243         }
244
245         /* initialize the vnn mapping table now that we have the nodes list,
246            skipping any deleted nodes
247         */
248         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
249         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
250
251         ctdb->vnn_map->generation = INVALID_GENERATION;
252         ctdb->vnn_map->size = num_present;
253         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
254         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
255
256         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
257                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
258                         continue;
259                 }
260                 ctdb->vnn_map->map[j] = i;
261                 j++;
262         }
263         
264         talloc_free(lines);
265         return 0;
266 }
267
268
269 /*
270   setup the local node address
271 */
272 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
273 {
274         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
275                 return -1;
276         }
277         
278         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
279                                      ctdb->address.address, 
280                                      ctdb->address.port);
281         return 0;
282 }
283
284
285 /*
286   return the number of active nodes
287 */
288 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
289 {
290         int i;
291         uint32_t count=0;
292         for (i=0; i < ctdb->num_nodes; i++) {
293                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_INACTIVE)) {
294                         count++;
295                 }
296         }
297         return count;
298 }
299
300
301 /*
302   called when we need to process a packet. This can be a requeued packet
303   after a lockwait, or a real packet from another node
304 */
305 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
306 {
307         TALLOC_CTX *tmp_ctx;
308
309         /* place the packet as a child of the tmp_ctx. We then use
310            talloc_free() below to free it. If any of the calls want
311            to keep it, then they will steal it somewhere else, and the
312            talloc_free() will only free the tmp_ctx */
313         tmp_ctx = talloc_new(ctdb);
314         talloc_steal(tmp_ctx, hdr);
315
316         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
317                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
318                  hdr->srcnode, hdr->destnode));
319
320         switch (hdr->operation) {
321         case CTDB_REQ_CALL:
322         case CTDB_REPLY_CALL:
323         case CTDB_REQ_DMASTER:
324         case CTDB_REPLY_DMASTER:
325                 /* we dont allow these calls when banned */
326                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
327                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
328                                 " request %u"
329                                 " length %u from node %u to %u while node"
330                                 " is banned\n",
331                                  hdr->operation, hdr->reqid,
332                                  hdr->length, 
333                                  hdr->srcnode, hdr->destnode));
334                         goto done;
335                 }
336
337                 /* for ctdb_call inter-node operations verify that the
338                    remote node that sent us the call is running in the
339                    same generation instance as this node
340                 */
341                 if (ctdb->vnn_map->generation != hdr->generation) {
342                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
343                                 " request %u"
344                                 " length %u from node %u to %u had an"
345                                 " invalid generation id:%u while our"
346                                 " generation id is:%u\n", 
347                                  hdr->operation, hdr->reqid,
348                                  hdr->length, 
349                                  hdr->srcnode, hdr->destnode, 
350                                  hdr->generation, ctdb->vnn_map->generation));
351                         goto done;
352                 }
353         }
354
355         switch (hdr->operation) {
356         case CTDB_REQ_CALL:
357                 CTDB_INCREMENT_STAT(ctdb, node.req_call);
358                 ctdb_request_call(ctdb, hdr);
359                 break;
360
361         case CTDB_REPLY_CALL:
362                 CTDB_INCREMENT_STAT(ctdb, node.reply_call);
363                 ctdb_reply_call(ctdb, hdr);
364                 break;
365
366         case CTDB_REPLY_ERROR:
367                 CTDB_INCREMENT_STAT(ctdb, node.reply_error);
368                 ctdb_reply_error(ctdb, hdr);
369                 break;
370
371         case CTDB_REQ_DMASTER:
372                 CTDB_INCREMENT_STAT(ctdb, node.req_dmaster);
373                 ctdb_request_dmaster(ctdb, hdr);
374                 break;
375
376         case CTDB_REPLY_DMASTER:
377                 CTDB_INCREMENT_STAT(ctdb, node.reply_dmaster);
378                 ctdb_reply_dmaster(ctdb, hdr);
379                 break;
380
381         case CTDB_REQ_MESSAGE:
382                 CTDB_INCREMENT_STAT(ctdb, node.req_message);
383                 ctdb_request_message(ctdb, hdr);
384                 break;
385
386         case CTDB_REQ_CONTROL:
387                 CTDB_INCREMENT_STAT(ctdb, node.req_control);
388                 ctdb_request_control(ctdb, hdr);
389                 break;
390
391         case CTDB_REPLY_CONTROL:
392                 CTDB_INCREMENT_STAT(ctdb, node.reply_control);
393                 ctdb_reply_control(ctdb, hdr);
394                 break;
395
396         case CTDB_REQ_KEEPALIVE:
397                 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_recv);
398                 break;
399
400         default:
401                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
402                          __location__, hdr->operation));
403                 break;
404         }
405
406 done:
407         talloc_free(tmp_ctx);
408 }
409
410
411 /*
412   called by the transport layer when a node is dead
413 */
414 void ctdb_node_dead(struct ctdb_node *node)
415 {
416         if (node->flags & NODE_FLAGS_DISCONNECTED) {
417                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
418                          node->ctdb->name, node->name, 
419                          node->ctdb->num_connected));
420                 return;
421         }
422         node->ctdb->num_connected--;
423         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
424         node->rx_cnt = 0;
425         node->dead_count = 0;
426
427         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
428                  node->ctdb->name, node->name, node->ctdb->num_connected));
429         ctdb_daemon_cancel_controls(node->ctdb, node);
430
431         if (node->ctdb->methods == NULL) {
432                 DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
433                 return;
434         }
435
436         node->ctdb->methods->restart(node);
437 }
438
439 /*
440   called by the transport layer when a node is connected
441 */
442 void ctdb_node_connected(struct ctdb_node *node)
443 {
444         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
445                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
446                          node->ctdb->name, node->name, 
447                          node->ctdb->num_connected));
448                 return;
449         }
450         node->ctdb->num_connected++;
451         node->dead_count = 0;
452         node->flags &= ~NODE_FLAGS_DISCONNECTED;
453         node->flags |= NODE_FLAGS_UNHEALTHY;
454         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
455                  node->ctdb->name, node->name, node->ctdb->num_connected));
456 }
457
458 struct queue_next {
459         struct ctdb_context *ctdb;
460         struct ctdb_req_header *hdr;
461 };
462
463
464 /*
465   triggered when a deferred packet is due
466  */
467 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
468                                struct timeval t, void *private_data)
469 {
470         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
471         ctdb_input_pkt(q->ctdb, q->hdr);
472         talloc_free(q);
473 }       
474
475 /*
476   defer a packet, so it is processed on the next event loop
477   this is used for sending packets to ourselves
478  */
479 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
480 {
481         struct queue_next *q;
482         q = talloc(ctdb, struct queue_next);
483         if (q == NULL) {
484                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
485                 return;
486         }
487         q->ctdb = ctdb;
488         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
489         if (q->hdr == NULL) {
490                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
491                 return;
492         }
493 #if 0
494         /* use this to put packets directly into our recv function */
495         ctdb_input_pkt(q->ctdb, q->hdr);
496 #else
497         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
498 #endif
499 }
500
501
502 /*
503   broadcast a packet to all nodes
504 */
505 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
506                                       struct ctdb_req_header *hdr)
507 {
508         int i;
509         for (i=0; i < ctdb->num_nodes; i++) {
510                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
511                         continue;
512                 }
513                 hdr->destnode = ctdb->nodes[i]->pnn;
514                 ctdb_queue_packet(ctdb, hdr);
515         }
516 }
517
518 /*
519   broadcast a packet to all nodes in the current vnnmap
520 */
521 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
522                                          struct ctdb_req_header *hdr)
523 {
524         int i;
525         for (i=0;i<ctdb->vnn_map->size;i++) {
526                 hdr->destnode = ctdb->vnn_map->map[i];
527                 ctdb_queue_packet(ctdb, hdr);
528         }
529 }
530
531 /*
532   broadcast a packet to all connected nodes
533 */
534 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
535                                             struct ctdb_req_header *hdr)
536 {
537         int i;
538         for (i=0; i < ctdb->num_nodes; i++) {
539                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
540                         continue;
541                 }
542                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
543                         hdr->destnode = ctdb->nodes[i]->pnn;
544                         ctdb_queue_packet(ctdb, hdr);
545                 }
546         }
547 }
548
549 /*
550   queue a packet or die
551 */
552 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
553 {
554         struct ctdb_node *node;
555
556         switch (hdr->destnode) {
557         case CTDB_BROADCAST_ALL:
558                 ctdb_broadcast_packet_all(ctdb, hdr);
559                 return;
560         case CTDB_BROADCAST_VNNMAP:
561                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
562                 return;
563         case CTDB_BROADCAST_CONNECTED:
564                 ctdb_broadcast_packet_connected(ctdb, hdr);
565                 return;
566         }
567
568         CTDB_INCREMENT_STAT(ctdb, node_packets_sent);
569
570         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
571                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
572                          hdr->destnode));
573                 return;
574         }
575
576         node = ctdb->nodes[hdr->destnode];
577
578         if (node->flags & NODE_FLAGS_DELETED) {
579                 DEBUG(DEBUG_ERR, (__location__ " Can not queue packet to DELETED node %d\n", hdr->destnode));
580                 return;
581         }
582
583         if (node->pnn == ctdb->pnn) {
584                 ctdb_defer_packet(ctdb, hdr);
585                 return;
586         }
587
588         if (ctdb->methods == NULL) {
589                 DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. "
590                                     "Transport is DOWN\n"));
591                 return;
592         }
593
594         node->tx_cnt++;
595         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
596                 ctdb_fatal(ctdb, "Unable to queue packet\n");
597         }
598 }
599
600
601
602
603 /*
604   a valgrind hack to allow us to get opcode specific backtraces
605   very ugly, and relies on no compiler optimisation!
606 */
607 void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
608 {
609         switch (opcode) {
610 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
611                 DO_OP(1);
612                 DO_OP(2);
613                 DO_OP(3);
614                 DO_OP(4);
615                 DO_OP(5);
616                 DO_OP(6);
617                 DO_OP(7);
618                 DO_OP(8);
619                 DO_OP(9);
620                 DO_OP(10);
621                 DO_OP(11);
622                 DO_OP(12);
623                 DO_OP(13);
624                 DO_OP(14);
625                 DO_OP(15);
626                 DO_OP(16);
627                 DO_OP(17);
628                 DO_OP(18);
629                 DO_OP(19);
630                 DO_OP(20);
631                 DO_OP(21);
632                 DO_OP(22);
633                 DO_OP(23);
634                 DO_OP(24);
635                 DO_OP(25);
636                 DO_OP(26);
637                 DO_OP(27);
638                 DO_OP(28);
639                 DO_OP(29);
640                 DO_OP(30);
641                 DO_OP(31);
642                 DO_OP(32);
643                 DO_OP(33);
644                 DO_OP(34);
645                 DO_OP(35);
646                 DO_OP(36);
647                 DO_OP(37);
648                 DO_OP(38);
649                 DO_OP(39);
650                 DO_OP(40);
651                 DO_OP(41);
652                 DO_OP(42);
653                 DO_OP(43);
654                 DO_OP(44);
655                 DO_OP(45);
656                 DO_OP(46);
657                 DO_OP(47);
658                 DO_OP(48);
659                 DO_OP(49);
660                 DO_OP(50);
661                 DO_OP(51);
662                 DO_OP(52);
663                 DO_OP(53);
664                 DO_OP(54);
665                 DO_OP(55);
666                 DO_OP(56);
667                 DO_OP(57);
668                 DO_OP(58);
669                 DO_OP(59);
670                 DO_OP(60);
671                 DO_OP(61);
672                 DO_OP(62);
673                 DO_OP(63);
674                 DO_OP(64);
675                 DO_OP(65);
676                 DO_OP(66);
677                 DO_OP(67);
678                 DO_OP(68);
679                 DO_OP(69);
680                 DO_OP(70);
681                 DO_OP(71);
682                 DO_OP(72);
683                 DO_OP(73);
684                 DO_OP(74);
685                 DO_OP(75);
686                 DO_OP(76);
687                 DO_OP(77);
688                 DO_OP(78);
689                 DO_OP(79);
690                 DO_OP(80);
691                 DO_OP(81);
692                 DO_OP(82);
693                 DO_OP(83);
694                 DO_OP(84);
695                 DO_OP(85);
696                 DO_OP(86);
697                 DO_OP(87);
698                 DO_OP(88);
699                 DO_OP(89);
700                 DO_OP(90);
701                 DO_OP(91);
702                 DO_OP(92);
703                 DO_OP(93);
704                 DO_OP(94);
705                 DO_OP(95);
706                 DO_OP(96);
707                 DO_OP(97);
708                 DO_OP(98);
709                 DO_OP(99);
710                 DO_OP(100);
711         default: 
712                 ctdb_queue_packet(ctdb, hdr);
713                 break;
714         }
715 }