first step towards fixing "make test" with the new daemon system
[samba.git] / ctdb / common / ctdb.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28
29 /*
30   choose the transport we will use
31 */
32 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
33 {
34         ctdb->transport = talloc_strdup(ctdb, transport);
35         return 0;
36 }
37
38 /*
39   choose the recovery lock file
40 */
41 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
42 {
43         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
44         return 0;
45 }
46
47 /*
48   choose the logfile location
49 */
50 int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
51 {
52         ctdb->logfile = talloc_strdup(ctdb, logfile);
53         if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
54                 int fd;
55                 close(1);
56                 close(2);
57                 fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
58                 if (fd == -1) {
59                         abort();
60                 }
61                 if (fd != 1) {
62                         dup2(fd, 1);
63                         close(fd);
64                 }
65                 /* also catch stderr of subcommands to the log file */
66                 dup2(1, 2);
67         }
68         return 0;
69 }
70
71
72 /*
73   set some ctdb flags
74 */
75 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
76 {
77         ctdb->flags |= flags;
78 }
79
80 /*
81   clear some ctdb flags
82 */
83 void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
84 {
85         ctdb->flags &= ~flags;
86 }
87
88 /*
89   set max acess count before a dmaster migration
90 */
91 void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
92 {
93         ctdb->max_lacount = count;
94 }
95
96 /*
97   set the directory for the local databases
98 */
99 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
100 {
101         ctdb->db_directory = talloc_strdup(ctdb, dir);
102         if (ctdb->db_directory == NULL) {
103                 return -1;
104         }
105         return 0;
106 }
107
108 /*
109   add a node to the list of active nodes
110 */
111 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
112 {
113         struct ctdb_node *node, **nodep;
114
115         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
116         CTDB_NO_MEMORY(ctdb, nodep);
117
118         ctdb->nodes = nodep;
119         nodep = &ctdb->nodes[ctdb->num_nodes];
120         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
121         CTDB_NO_MEMORY(ctdb, *nodep);
122         node = *nodep;
123
124         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
125                 return -1;
126         }
127         node->ctdb = ctdb;
128         node->name = talloc_asprintf(node, "%s:%u", 
129                                      node->address.address, 
130                                      node->address.port);
131         /* this assumes that the nodes are kept in sorted order, and no gaps */
132         node->vnn = ctdb->num_nodes;
133
134         if (ctdb->address.address &&
135             ctdb_same_address(&ctdb->address, &node->address)) {
136                 ctdb->vnn = node->vnn;
137                 node->flags |= NODE_FLAGS_CONNECTED;
138         }
139
140         ctdb->num_nodes++;
141         node->dead_count = 0;
142
143         return 0;
144 }
145
146 /*
147   setup the node list from a file
148 */
149 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
150 {
151         char **lines;
152         int nlines;
153         int i;
154
155         talloc_free(ctdb->node_list_file);
156         ctdb->node_list_file = talloc_strdup(ctdb, nlist);
157
158         lines = file_lines_load(nlist, &nlines, ctdb);
159         if (lines == NULL) {
160                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
161                 return -1;
162         }
163         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
164                 nlines--;
165         }
166
167         for (i=0;i<nlines;i++) {
168                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
169                         talloc_free(lines);
170                         return -1;
171                 }
172         }
173
174         /* initialize the vnn mapping table now that we have num_nodes setup */
175 /*
176 XXX we currently initialize it to the maximum number of nodes to 
177 XXX make it behave the same way as previously.  
178 XXX Once we have recovery working we should initialize this always to 
179 XXX generation==0 (==invalid) and let the recovery tool populate this 
180 XXX table for the daemons. 
181 */
182         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
183         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
184
185         ctdb->vnn_map->generation = 1;
186         ctdb->vnn_map->size = ctdb->num_nodes;
187         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
188         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
189
190         for(i=0;i<ctdb->vnn_map->size;i++) {
191                 ctdb->vnn_map->map[i] = i;
192         }
193         
194         talloc_free(lines);
195         return 0;
196 }
197
198
199 /*
200   setup the local node address
201 */
202 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
203 {
204         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
205                 return -1;
206         }
207         
208         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
209                                      ctdb->address.address, 
210                                      ctdb->address.port);
211         return 0;
212 }
213
214
215 /*
216   setup the local socket name
217 */
218 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
219 {
220         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
221         return 0;
222 }
223 /*
224   return the vnn of this node
225 */
226 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
227 {
228         return ctdb->vnn;
229 }
230
231 /*
232   return the number of connected nodes
233 */
234 uint32_t ctdb_get_num_connected_nodes(struct ctdb_context *ctdb)
235 {
236         int i;
237         uint32_t count=0;
238         for (i=0;i<ctdb->vnn_map->size;i++) {
239                 if (ctdb->nodes[ctdb->vnn_map->map[i]]->flags & NODE_FLAGS_CONNECTED) {
240                         count++;
241                 }
242         }
243         return count;
244 }
245
246
247 /*
248   called when we need to process a packet. This can be a requeued packet
249   after a lockwait, or a real packet from another node
250 */
251 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
252 {
253         TALLOC_CTX *tmp_ctx;
254
255         /* place the packet as a child of the tmp_ctx. We then use
256            talloc_free() below to free it. If any of the calls want
257            to keep it, then they will steal it somewhere else, and the
258            talloc_free() will only free the tmp_ctx */
259         tmp_ctx = talloc_new(ctdb);
260         talloc_steal(tmp_ctx, hdr);
261
262         DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
263                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
264                  hdr->srcnode, hdr->destnode));
265
266         switch (hdr->operation) {
267         case CTDB_REQ_CALL:
268         case CTDB_REPLY_CALL:
269         case CTDB_REQ_DMASTER:
270         case CTDB_REPLY_DMASTER:
271                 /* for ctdb_call inter-node operations verify that the
272                    remote node that sent us the call is running in the
273                    same generation instance as this node
274                 */
275                 if (ctdb->vnn_map->generation != hdr->generation) {
276                         DEBUG(0,(__location__ " ctdb request %u"
277                                 " length %u from node %u to %u had an"
278                                 " invalid generation id:%u while our"
279                                 " generation id is:%u\n", 
280                                  hdr->reqid, hdr->length, 
281                                  hdr->srcnode, hdr->destnode, 
282                                  hdr->generation, ctdb->vnn_map->generation));
283                         goto done;
284                 }
285         }
286
287         switch (hdr->operation) {
288         case CTDB_REQ_CALL:
289                 ctdb->statistics.node.req_call++;
290                 ctdb_request_call(ctdb, hdr);
291                 break;
292
293         case CTDB_REPLY_CALL:
294                 ctdb->statistics.node.reply_call++;
295                 ctdb_reply_call(ctdb, hdr);
296                 break;
297
298         case CTDB_REPLY_ERROR:
299                 ctdb->statistics.node.reply_error++;
300                 ctdb_reply_error(ctdb, hdr);
301                 break;
302
303         case CTDB_REQ_DMASTER:
304                 ctdb->statistics.node.req_dmaster++;
305                 ctdb_request_dmaster(ctdb, hdr);
306                 break;
307
308         case CTDB_REPLY_DMASTER:
309                 ctdb->statistics.node.reply_dmaster++;
310                 ctdb_reply_dmaster(ctdb, hdr);
311                 break;
312
313         case CTDB_REQ_MESSAGE:
314                 ctdb->statistics.node.req_message++;
315                 ctdb_request_message(ctdb, hdr);
316                 break;
317
318         case CTDB_REQ_FINISHED:
319                 ctdb->statistics.node.req_finished++;
320                 ctdb_request_finished(ctdb, hdr);
321                 break;
322
323         case CTDB_REQ_CONTROL:
324                 ctdb->statistics.node.req_control++;
325                 ctdb_request_control(ctdb, hdr);
326                 break;
327
328         case CTDB_REPLY_CONTROL:
329                 ctdb->statistics.node.reply_control++;
330                 ctdb_reply_control(ctdb, hdr);
331                 break;
332
333         case CTDB_REQ_KEEPALIVE:
334                 ctdb->statistics.keepalive_packets_recv++;
335                 break;
336
337         default:
338                 DEBUG(0,("%s: Packet with unknown operation %u\n", 
339                          __location__, hdr->operation));
340                 break;
341         }
342
343 done:
344         talloc_free(tmp_ctx);
345 }
346
347
348 /*
349   called by the transport layer when a packet comes in
350 */
351 static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
352 {
353         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
354
355         ctdb->statistics.node_packets_recv++;
356
357         /* up the counter for this source node, so we know its alive */
358         if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
359                 /* as a special case, redirected calls don't increment the rx_cnt */
360                 if (hdr->operation != CTDB_REQ_CALL ||
361                     ((struct ctdb_req_call *)hdr)->hopcount == 0) {
362                         ctdb->nodes[hdr->srcnode]->rx_cnt++;
363                 }
364         }
365
366         ctdb_input_pkt(ctdb, hdr);
367 }
368
369
370 /*
371   called by the transport layer when a node is dead
372 */
373 void ctdb_node_dead(struct ctdb_node *node)
374 {
375         if (!(node->flags & NODE_FLAGS_CONNECTED)) {
376                 DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", 
377                          node->ctdb->name, node->name, 
378                          node->ctdb->num_connected));
379                 return;
380         }
381         node->ctdb->num_connected--;
382         node->flags &= ~NODE_FLAGS_CONNECTED;
383         node->rx_cnt = 0;
384         node->dead_count = 0;
385         DEBUG(1,("%s: node %s is dead: %u connected\n", 
386                  node->ctdb->name, node->name, node->ctdb->num_connected));
387         ctdb_daemon_cancel_controls(node->ctdb, node);
388 }
389
390 /*
391   called by the transport layer when a node is connected
392 */
393 void ctdb_node_connected(struct ctdb_node *node)
394 {
395         if (node->flags & NODE_FLAGS_CONNECTED) {
396                 DEBUG(1,("%s: node %s is already marked connected: %u connected\n", 
397                          node->ctdb->name, node->name, 
398                          node->ctdb->num_connected));
399                 return;
400         }
401         node->ctdb->num_connected++;
402         node->dead_count = 0;
403         node->flags |= NODE_FLAGS_CONNECTED;
404         DEBUG(1,("%s: connected to %s - %u connected\n", 
405                  node->ctdb->name, node->name, node->ctdb->num_connected));
406 }
407
408 /*
409   wait for all nodes to be connected
410 */
411 void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
412 {
413         int expected = ctdb->num_nodes - 1;
414         if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
415                 expected++;
416         }
417         while (ctdb->num_connected != expected) {
418                 DEBUG(3,("ctdb_connect_wait: waiting for %u nodes (have %u)\n", 
419                          expected, ctdb->num_connected));
420                 event_loop_once(ctdb->ev);
421         }
422         DEBUG(3,("ctdb_connect_wait: got all %u nodes\n", expected));
423 }
424
425 struct queue_next {
426         struct ctdb_context *ctdb;
427         struct ctdb_req_header *hdr;
428 };
429
430
431 /*
432   trigered when a deferred packet is due
433  */
434 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
435                                struct timeval t, void *private_data)
436 {
437         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
438         ctdb_input_pkt(q->ctdb, q->hdr);
439         talloc_free(q);
440 }       
441
442 /*
443   defer a packet, so it is processed on the next event loop
444   this is used for sending packets to ourselves
445  */
446 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
447 {
448         struct queue_next *q;
449         q = talloc(ctdb, struct queue_next);
450         if (q == NULL) {
451                 DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
452                 return;
453         }
454         q->ctdb = ctdb;
455         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
456         if (q->hdr == NULL) {
457                 DEBUG(0,("Error copying deferred packet to self\n"));
458                 return;
459         }
460 #if 0
461         /* use this to put packets directly into our recv function */
462         ctdb_input_pkt(q->ctdb, q->hdr);
463 #else
464         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
465 #endif
466 }
467
468
469 /*
470   broadcast a packet to all nodes
471 */
472 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
473 {
474         int i;
475         for (i=0;i<ctdb->num_nodes;i++) {
476                 hdr->destnode = ctdb->nodes[i]->vnn;
477                 ctdb_queue_packet(ctdb, hdr);
478         }
479 }
480
481 /*
482   broadcast a packet to all nodes in the current vnnmap
483 */
484 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
485 {
486         int i;
487         for (i=0;i<ctdb->vnn_map->size;i++) {
488                 hdr->destnode = ctdb->vnn_map->map[i];
489                 ctdb_queue_packet(ctdb, hdr);
490         }
491 }
492
493 /*
494   queue a packet or die
495 */
496 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
497 {
498         struct ctdb_node *node;
499
500         switch (hdr->destnode) {
501         case CTDB_BROADCAST_ALL:
502                 ctdb_broadcast_packet_all(ctdb, hdr);
503                 return;
504         case CTDB_BROADCAST_VNNMAP:
505                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
506                 return;
507         }
508
509         ctdb->statistics.node_packets_sent++;
510
511         if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
512                 DEBUG(0,(__location__ " cant send to node %u that does not exist\n", 
513                          hdr->destnode));
514                 return;
515         }
516
517         node = ctdb->nodes[hdr->destnode];
518
519         if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
520                 ctdb_defer_packet(ctdb, hdr);
521         } else {
522                 node->tx_cnt++;
523                 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
524                         ctdb_fatal(ctdb, "Unable to queue packet\n");
525                 }
526         }
527 }
528
529
530 static const struct ctdb_upcalls ctdb_upcalls = {
531         .recv_pkt       = ctdb_recv_pkt,
532         .node_dead      = ctdb_node_dead,
533         .node_connected = ctdb_node_connected
534 };
535
536 /*
537   initialise the ctdb daemon. 
538
539   NOTE: In current code the daemon does not fork. This is for testing purposes only
540   and to simplify the code.
541 */
542 struct ctdb_context *ctdb_init(struct event_context *ev)
543 {
544         struct ctdb_context *ctdb;
545
546         ctdb = talloc_zero(ev, struct ctdb_context);
547         ctdb->ev               = ev;
548         ctdb->recovery_mode    = CTDB_RECOVERY_NORMAL;
549         ctdb->recovery_master  = (uint32_t)-1;
550         ctdb->upcalls          = &ctdb_upcalls;
551         ctdb->idr              = idr_init(ctdb);
552         ctdb->max_lacount      = CTDB_DEFAULT_MAX_LACOUNT;
553         ctdb->seqnum_frequency = CTDB_DEFAULT_SEQNUM_FREQUENCY;
554         ctdb->recovery_lock_fd = -1;
555         ctdb->monitoring_mode  = CTDB_MONITORING_ACTIVE;
556
557         return ctdb;
558 }
559