4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
/*
  Walk ctdb->db_list and call tdb_lockall_mark() on the
  ctdb_db->ltdb->tdb handle of every database in the list.

  An error is logged when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.

  NOTE(review): the return statements are not visible in this view.
  Callers in this file treat a non-zero result as failure - confirm that
  both the not-frozen case and a failed tdb_lockall_mark() return non-zero.
 */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
/*
  Counterpart of ctdb_lock_all_databases_mark(): walk ctdb->db_list and
  call tdb_lockall_unmark() on the ctdb_db->ltdb->tdb handle of every
  database in the list.

  An error is logged when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.

  NOTE(review): the return statements are not visible in this view, and
  every caller visible in this file ignores the result - confirm whether
  a failed unmark needs to be reported.
 */
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
/*
  Control handler: copy ctdb->vnn_map into a newly allocated
  struct ctdb_vnn_map_wire (generation, size and the uint32_t map array)
  that is a talloc child of outdata, and point outdata->dptr at it.

  CHECK_CONTROL_DATA_SIZE(0) is applied to the request payload first.

  NOTE(review): the return type, the declaration of len and the
  assignment of outdata->dsize are not visible in this view.
 */
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
/* bytes up to the map member plus one uint32_t per vnn map entry */
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
/*
  Control handler: replace ctdb->vnn_map with a copy of the wire format
  map (struct ctdb_vnn_map_wire) carried in indata.dptr.

  An error is logged when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.
  The previous map is freed before the replacement is allocated, so
  ctdb->vnn_map does not point at the old map while the new one is built.

  NOTE(review): no check that indata.dsize covers the wire header plus
  map->size entries is visible here. map->size is read straight from the
  request payload and then drives both the allocation and the memcpy -
  confirm that the size is validated before this point.
 */
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
/* the array is a talloc child of the map, so freeing the map frees it too */
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  Control handler: return one entry per database in ctdb->db_list.

  The reply is a zero-initialised struct ctdb_dbid_map allocated as a
  talloc child of outdata and sized for len entries of dbs[]. Each entry
  receives the db_id and the persistent flag of the matching database.

  CHECK_CONTROL_DATA_SIZE(0) is applied to the request payload first.

  NOTE(review): the body of the first loop (presumably counting the
  databases into len), the declarations of i and len, and any assignment
  of an entry count in the reply are not visible in this view.
 */
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* bytes up to the dbs member plus one dbs element per database */
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
/*
  Control handler: return the node list as a struct ctdb_node_map.

  The reply is zero-initialised, allocated as a talloc child of outdata
  and sized for ctdb->num_nodes entries. For each node the address string
  is converted with parse_ip() into nodes[i].addr, and pnn and flags are
  copied across. A parse_ip() result of 0 is treated as a failure and is
  logged.

  CHECK_CONTROL_DATA_SIZE(0) is applied to the request payload first.

  NOTE(review): one argument line of the parse_ip() call and the action
  taken after a parse failure are not visible in this view.
 */
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address,
167 NULL, /* TODO: pass in the correct interface here*/
169 &node_map->nodes[i].addr) == 0)
171 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
174 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
175 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
182 get an old style ipv4-only nodemap
/*
  Control handler: return the node list as a struct ctdb_node_mapv4.

  Same shape as ctdb_control_getnodemap() but the reply entries are
  struct ctdb_node_and_flagsv4 and the address string is converted with
  parse_ipv4() into nodes[i].sin. A parse_ipv4() result of 0 is treated
  as a failure and is logged. pnn and flags are copied for every node.

  CHECK_CONTROL_DATA_SIZE(0) is applied to the request payload first.

  NOTE(review): the action taken after a parse failure is not visible in
  this view.
 */
185 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
187 uint32_t i, num_nodes;
188 struct ctdb_node_mapv4 *node_map;
190 CHECK_CONTROL_DATA_SIZE(0);
192 num_nodes = ctdb->num_nodes;
194 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
195 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
196 if (!outdata->dptr) {
197 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
201 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
202 node_map->num = num_nodes;
203 for (i=0; i<num_nodes; i++) {
204 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
205 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
209 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
210 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Timed event callback that reloads the nodes file. private_data is the
  struct ctdb_context.

  Steps visible here:
    - the current ctdb->nodes array is re-parented onto a temporary
      talloc context and the current node count is remembered
    - ctdb_load_nodes_file() builds the new list
    - for every index in the new list, if the index also existed before
      and ctdb_same_address() matches, the newly loaded node is freed and
      the old node object is moved back into ctdb->nodes[i]
    - otherwise methods->add_node() and methods->connect_node() are
      called, and ctdb_fatal() is invoked if either fails
    - the temporary context is freed at the end, which releases whatever
      old nodes were not moved back

  NOTE(review): the assignment of the local nodes pointer (presumably the
  old ctdb->nodes), the declarations of i, num_nodes and tmp_ctx, and the
  statement that skips add/connect for kept nodes are not visible in
  this view.
 */
217 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
218 struct timeval t, void *private_data)
221 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
223 struct ctdb_node **nodes;
225 tmp_ctx = talloc_new(ctdb);
227 /* steal the old nodes file for a while */
228 talloc_steal(tmp_ctx, ctdb->nodes);
231 num_nodes = ctdb->num_nodes;
234 /* load the new nodes file */
235 ctdb_load_nodes_file(ctdb);
237 for (i=0; i<ctdb->num_nodes; i++) {
238 /* keep any identical pre-existing nodes and connections */
239 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
240 talloc_free(ctdb->nodes[i]);
241 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
245 /* any new or different nodes must be added */
246 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
247 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
248 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
250 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
251 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
252 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
256 talloc_free(tmp_ctx);
261 reload the nodes file after a short delay (so that we can send the response
/*
  Control handler: schedule ctdb_reload_nodes_event() on ctdb->ev with a
  timeout of timeval_current_ofs(1,0), using ctdb both as the talloc
  parent of the event and as its private data.

  NOTE(review): the result of event_add_timed() is not checked in the
  visible code, and the return statement is not visible in this view.
 */
265 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
267 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
273 a traverse function for pulling all relevant records from pulldb
/*
  Members of the traverse state handed to traverse_pulldb().
  pulldata is the marshall buffer that the traverse callback grows and
  appends records to.

  NOTE(review): the struct header and the remaining members are not
  visible in this view; traverse_pulldb() and ctdb_control_pull_db()
  also use members named len and failed.
 */
276 struct ctdb_context *ctdb;
277 struct ctdb_marshall_buffer *pulldata;
/*
  tdb traverse callback used by ctdb_control_pull_db(). p is a
  struct pulldb_data.

  Each key/data pair is marshalled with ctdb_marshall_record(), the
  reply buffer params->pulldata is grown by rec->length bytes, the
  marshalled record is copied to the end of the buffer and the record
  count and params->len are advanced. params->failed is set on the
  error paths visible here.

  NOTE(review): when talloc_realloc_size() fails, params->pulldata is
  NULL and the DEBUG call that follows still reads
  params->pulldata->count. That is a NULL dereference on the out of
  memory path, and the previous buffer pointer is lost as well.

  NOTE(review): the check of rec against NULL, the return statements
  and any release of rec are not visible in this view.
 */
282 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
284 struct pulldb_data *params = (struct pulldb_data *)p;
285 struct ctdb_rec_data *rec;
287 /* add the record to the blob */
288 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
290 params->failed = true;
/* grow the reply so that it can hold the bytes gathered so far plus this record */
293 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
294 if (params->pulldata == NULL) {
295 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
296 rec->length + params->len, params->pulldata->count));
297 params->failed = true;
300 params->pulldata->count++;
/* params->len is the current end of the used part of the buffer */
301 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
302 params->len += rec->length;
309 pull a bunch of records from a ltdb, filtering by lmaster
/*
  Control handler: marshal the records of one database into outdata.

  indata.dptr is read as a struct ctdb_control_pulldb whose db_id
  selects the database through find_ctdb_db(). A zeroed
  struct ctdb_marshall_buffer is allocated as a talloc child of outdata
  and tdb_traverse_read() runs traverse_pulldb() over the database
  between ctdb_lock_all_databases_mark() and
  ctdb_lock_all_databases_unmark(). On success outdata points at the
  grown buffer and outdata->dsize is the number of bytes used.

  The request is rejected with a debug message when ctdb->freeze_mode is
  not CTDB_FREEZE_FROZEN.

  NOTE(review): no check of indata.dsize before reading pull->db_id is
  visible here, and params.failed is not inspected in the visible code.
  NOTE(review): the third argument of tdb_traverse_read() is mis-encoded
  in this copy of the source (a paragraph sign followed by ms); it was
  presumably the address of params - confirm against the real file.
 */
311 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
313 struct ctdb_control_pulldb *pull;
314 struct ctdb_db_context *ctdb_db;
315 struct pulldb_data params;
316 struct ctdb_marshall_buffer *reply;
318 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
319 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
323 pull = (struct ctdb_control_pulldb *)indata.dptr;
325 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
327 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
331 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
332 CTDB_NO_MEMORY(ctdb, reply);
334 reply->db_id = pull->db_id;
337 params.pulldata = reply;
/* records are appended directly after the fixed part of the buffer */
338 params.len = offsetof(struct ctdb_marshall_buffer, data);
339 params.failed = false;
341 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
342 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
346 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
347 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
348 ctdb_lock_all_databases_unmark(ctdb);
349 talloc_free(params.pulldata);
353 ctdb_lock_all_databases_unmark(ctdb);
/* the traverse may have reallocated the buffer, so use params.pulldata and not reply */
355 outdata->dptr = (uint8_t *)params.pulldata;
356 outdata->dsize = params.len;
362 push a bunch of records into a ltdb, filtering by rsn
/*
  Control handler: store a marshalled set of records into a database.

  indata.dptr is read as a struct ctdb_marshall_buffer. After checking
  that indata.dsize covers at least the fixed part of that buffer, the
  database is looked up by reply->db_id and the records are walked
  between ctdb_lock_all_databases_mark() and
  ctdb_lock_all_databases_unmark(). For each record the key and data
  are taken from rec->data, the leading struct ctdb_ltdb_header is split
  off the data and the remainder is written with ctdb_ltdb_store().

  The request is rejected with a debug message when ctdb->freeze_mode is
  not CTDB_FREEZE_FROZEN.

  NOTE(review): in the visible code neither reply->count nor
  rec->length, rec->keylen and rec->datalen are checked against
  indata.dsize, so a malformed buffer can move rec past the end of the
  payload - confirm that the sender is trusted or add bounds checks.
  NOTE(review): the two unmark calls at the end presumably belong to the
  success path and to a failure label; the label, the return statements
  and the declarations of i, ret, key and data are not visible in this
  view.
 */
364 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
366 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
367 struct ctdb_db_context *ctdb_db;
369 struct ctdb_rec_data *rec;
371 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
372 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
376 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
377 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
383 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
388 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
392 rec = (struct ctdb_rec_data *)&reply->data[0];
394 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
395 reply->count, reply->db_id));
397 for (i=0;i<reply->count;i++) {
399 struct ctdb_ltdb_header *hdr;
/* rec->data holds the key bytes followed directly by the data bytes */
401 key.dptr = &rec->data[0];
402 key.dsize = rec->keylen;
403 data.dptr = &rec->data[key.dsize];
404 data.dsize = rec->datalen;
406 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
407 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
/* split the ltdb header off the front of the data before storing */
410 hdr = (struct ctdb_ltdb_header *)data.dptr;
411 data.dptr += sizeof(*hdr);
412 data.dsize -= sizeof(*hdr);
414 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
416 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
/* step to the next marshalled record */
420 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
423 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
424 reply->count, reply->db_id));
426 ctdb_lock_all_databases_unmark(ctdb);
430 ctdb_lock_all_databases_unmark(ctdb);
/*
  tdb traverse callback used by ctdb_control_set_dmaster(). p points at
  the uint32_t dmaster value to apply.

  The record data is read as a struct ctdb_ltdb_header. Records whose
  dmaster already equals the requested value are skipped; otherwise the
  header is updated in place and the record is written back with
  tdb_store() using TDB_REPLACE. A failed store is logged.

  NOTE(review): no check that data.dsize is at least the size of the
  header is visible before data.dptr is cast - confirm that records in
  these databases always carry a full header.
  NOTE(review): the return statements and the declaration of ret are not
  visible in this view.
 */
435 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
437 uint32_t *dmaster = (uint32_t *)p;
438 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
441 /* skip if already correct */
442 if (header->dmaster == *dmaster) {
446 header->dmaster = *dmaster;
448 ret = tdb_store(tdb, key, data, TDB_REPLACE);
450 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
454 /* TODO: add error checking here */
/*
  Control handler: set the dmaster of every record in one database.

  indata.dptr is read as a struct ctdb_control_set_dmaster. The database
  is looked up by p->db_id and tdb_traverse() runs traverse_setdmaster()
  with the address of p->dmaster, between
  ctdb_lock_all_databases_mark() and ctdb_lock_all_databases_unmark().

  The request is rejected with a debug message when ctdb->freeze_mode is
  not CTDB_FREEZE_FROZEN.

  NOTE(review): the result of tdb_traverse() is discarded, so a failed
  traverse is not reported by the visible code. No check of indata.dsize
  is visible either.
 */
459 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
461 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
462 struct ctdb_db_context *ctdb_db;
464 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
465 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
469 ctdb_db = find_ctdb_db(ctdb, p->db_id);
471 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
475 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
476 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
480 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
482 ctdb_lock_all_databases_unmark(ctdb);
/*
  State for one asynchronous set_recmode request. It is allocated in
  ctdb_control_set_recmode() and passed as private data to
  ctdb_set_recmode_timeout() and set_recmode_handler().

  NOTE(review): further members used elsewhere in this file (fd, child,
  recmode) are not visible in this view.
 */
487 struct ctdb_set_recmode_state {
488 struct ctdb_context *ctdb;
/* the request being answered; re-parented onto this state with talloc_steal */
489 struct ctdb_req_control *c;
/* timeout event armed in ctdb_control_set_recmode */
492 struct timed_event *te;
/* fd event on the read end of the pipe to the child process */
493 struct fd_event *fde;
498 called if our set_recmode child times out. this would happen if
499 ctdb_recovery_lock() would block.
/*
  Timed event callback armed by ctdb_control_set_recmode(). private_data
  is the struct ctdb_set_recmode_state of the pending request.

  The requested mode is applied to ctdb->recovery_mode and the stored
  request is answered with status 0 through
  ctdb_request_control_reply().

  NOTE(review): whether the state is freed afterwards is not visible in
  this view.
 */
501 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
502 struct timeval t, void *private_data)
504 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
505 struct ctdb_set_recmode_state);
507 /* we consider this a success, not a failure, as we failed to
508 set the recovery lock which is what we wanted. This can be
509 caused by the cluster filesystem being very slow to
510 arbitrate locks immediately after a node failure.
512 DEBUG(DEBUG_NOTICE,(__location__ " set_recmode timeout - allowing recmode set\n"));
513 state->ctdb->recovery_mode = state->recmode;
514 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
519 /* when we free the recmode state we must kill any child process.
/*
  talloc destructor installed on the set_recmode state by
  ctdb_control_set_recmode(). It sends SIGKILL to state->child.

  NOTE(review): the return statement is not visible in this view, and
  the result of kill() is not checked.
 */
521 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
523 kill(state->child, SIGKILL);
527 /* this is called when the client process has completed ctdb_recovery_lock()
528 and has written data back to us through the pipe.
/*
  fd event callback for the read end of the pipe to the set_recmode
  child. private_data is the struct ctdb_set_recmode_state.

  The pending timeout event is freed, then one status byte is read from
  state->fd[0]. If the read does not return exactly one byte, or the
  byte is not 0, the request is answered with status -1 and an error
  string. Otherwise the requested mode is applied to
  ctdb->recovery_mode and the request is answered with status 0.

  NOTE(review): the declarations of c and ret, the early return after
  the failure reply and any release of the state are not visible in
  this view.
 */
530 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
531 uint16_t flags, void *private_data)
533 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
534 struct ctdb_set_recmode_state);
538 /* we got a response from our child process so we can abort the
541 talloc_free(state->te);
545 /* read the childs status when trying to lock the reclock file.
546 child wrote 0 if everything is fine and 1 if it did manage
547 to lock the file, which would be a problem since that means
548 we got a request to exit from recovery but we could still lock
549 the file which at this time SHOULD be locked by the recovery
550 daemon on the recmaster
552 ret = read(state->fd[0], &c, 1);
553 if (ret != 1 || c != 0) {
554 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
559 state->ctdb->recovery_mode = state->recmode;
561 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed event callback armed by ctdb_control_set_recmode(). private_data
  is the struct ctdb_context.

  It frees and clears ctdb->release_ips_ctx, which is the talloc parent
  of this timed event, and then calls ctdb_release_all_ips().
 */
567 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
568 struct timeval t, void *private_data)
570 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572 DEBUG(DEBUG_INFO,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
573 talloc_free(ctdb->release_ips_ctx);
574 ctdb->release_ips_ctx = NULL;
576 ctdb_release_all_ips(ctdb);
580 set the recovery mode
/*
  Control handler: change ctdb->recovery_mode to the uint32_t carried in
  indata.

  Visible behaviour, in order:
    - for CTDB_RECOVERY_NORMAL the release_ips_ctx is freed and cleared.
      The statements that follow recreate release_ips_ctx and arm
      ctdb_drop_all_ips_event() with timeval_current_ofs(5,0);
      presumably they form the else branch - the branch line itself is
      not visible
    - if ctdb->freeze_mode is not CTDB_FREEZE_FROZEN an error is logged
      and *errormsg is set
    - a notice is logged when the requested mode differs from the
      current one
    - unless the request is NORMAL while the current mode is
      CTDB_RECOVERY_ACTIVE, the mode is assigned directly
    - otherwise the databases are thawed if a freeze handle exists and a
      child process is forked. The child calls
      ctdb_recovery_lock(ctdb, false), writes one status byte to the
      pipe and then loops for as long as kill(parent, 0) indicates that
      the parent exists
    - the parent installs set_recmode_destructor() on the state, arms
      ctdb_set_recmode_timeout() with timeval_current_ofs(3, 0),
      registers an fd event on the read end of the pipe and moves the
      request c onto the state with talloc_steal()

  NOTE(review): recmode is read from indata.dptr with no visible check
  of indata.dsize.
  NOTE(review): the result of write() in the child is ignored.
  NOTE(review): the return statements, the setting of *async_reply, the
  cleanup after pipe() or fork() failure, the value assigned to cc and
  the handler argument of event_add_fd() are not visible in this view.
 */
582 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
583 struct ctdb_req_control *c,
584 TDB_DATA indata, bool *async_reply,
585 const char **errormsg)
587 uint32_t recmode = *(uint32_t *)indata.dptr;
589 struct ctdb_set_recmode_state *state;
/* pid of the daemon, captured before fork() so that the child can poll it */
590 pid_t parent = getpid();
592 /* if we enter recovery but stay in recovery for too long
593 we will eventually drop all our ip addresses
595 if (recmode == CTDB_RECOVERY_NORMAL) {
596 talloc_free(ctdb->release_ips_ctx);
597 ctdb->release_ips_ctx = NULL;
599 talloc_free(ctdb->release_ips_ctx);
600 ctdb->release_ips_ctx = talloc_new(ctdb);
601 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
603 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(5,0), ctdb_drop_all_ips_event, ctdb);
607 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
608 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
610 (*errormsg) = "Cannot change recovery mode while not frozen";
614 if (recmode != ctdb->recovery_mode) {
615 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
616 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
619 if (recmode != CTDB_RECOVERY_NORMAL ||
620 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
621 ctdb->recovery_mode = recmode;
625 /* some special handling when ending recovery mode */
627 /* force the databased to thaw */
628 if (ctdb->freeze_handle) {
629 ctdb_control_thaw(ctdb);
632 state = talloc(ctdb, struct ctdb_set_recmode_state);
633 CTDB_NO_MEMORY(ctdb, state);
635 /* For the rest of what needs to be done, we need to do this in
636 a child process since
637 1, the call to ctdb_recovery_lock() can block if the cluster
638 filesystem is in the process of recovery.
639 2, running of the script may take a while.
641 ret = pipe(state->fd);
644 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
648 state->child = fork();
649 if (state->child == (pid_t)-1) {
656 if (state->child == 0) {
660 /* we should not be able to get the lock on the nodes list,
661 as it should be held by the recovery master
663 if (ctdb_recovery_lock(ctdb, false)) {
664 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
668 write(state->fd[1], &cc, 1);
669 /* make sure we die when our parent dies */
670 while (kill(parent, 0) == 0 || errno != ESRCH) {
677 talloc_set_destructor(state, set_recmode_destructor);
679 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(3, 0),
680 ctdb_set_recmode_timeout, state);
682 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
683 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
686 if (state->fde == NULL) {
692 state->recmode = recmode;
693 state->c = talloc_steal(state, c);
702 try and get the recovery lock in shared storage - should only work
703 on the recovery master recovery daemon. Anywhere else is a bug
/*
  Try to take a write lock on ctdb->recovery_lock_file without blocking.

  Any descriptor already stored in ctdb->recovery_lock_fd is closed
  first. The file is then opened with O_RDWR|O_CREAT and mode 0600,
  marked close-on-exec, and an F_WRLCK lock is requested with
  fcntl(F_SETLK). If the open fails an error is logged. If the lock
  cannot be taken the descriptor is closed, recovery_lock_fd is reset to
  -1 and a message is logged. A notice is logged when the lock is
  obtained.

  NOTE(review): the second close of the descriptor near the end is
  presumably guarded by the keep parameter - the condition is not
  visible in this view. The remaining struct flock fields and the return
  statements are not visible either; the caller in this file treats a
  true result as lock obtained.
 */
705 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
709 if (ctdb->recovery_lock_fd != -1) {
710 close(ctdb->recovery_lock_fd);
712 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
713 if (ctdb->recovery_lock_fd == -1) {
714 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
715 ctdb->recovery_lock_file, strerror(errno)));
719 set_close_on_exec(ctdb->recovery_lock_fd);
721 lock.l_type = F_WRLCK;
722 lock.l_whence = SEEK_SET;
/* F_SETLK fails at once instead of waiting when the lock is held elsewhere */
727 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
728 close(ctdb->recovery_lock_fd);
729 ctdb->recovery_lock_fd = -1;
731 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
737 close(ctdb->recovery_lock_fd);
738 ctdb->recovery_lock_fd = -1;
741 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
747 delete a record as part of the vacuum process
748 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
749 use non-blocking locks
751 return 0 if the record was successfully deleted (i.e. it does not exist
752 when the function returns)
753 or !0 if the record still exists in the tdb after returning.
/*
  Try to delete the record described by rec from ctdb_db, using only
  non-blocking locks.

  Sequence visible here:
    - key and data are taken from rec->data (key bytes first, then data)
    - a message is logged when ctdb_lmaster() for the key is this node
    - the supplied data must be exactly one struct ctdb_ltdb_header;
      any other size is logged as a bad record
    - the hash chain is locked with tdb_chainlock_nonblock() and the
      stored record is fetched
    - a stored record shorter than a header is treated as corrupt and
      deleted if the lock on list -1 can be taken without blocking
    - a stored record with a higher rsn than the supplied one, or whose
      dmaster is this node, is left alone
    - otherwise list -1 is locked without blocking and tdb_delete() is
      called
  Every path visible after the chain lock is taken releases it again
  with tdb_chainunlock().

  NOTE(review): the buffer returned by tdb_fetch() is not released in
  any visible line - confirm that it is freed on every path.
  NOTE(review): the return statements and the declarations of key and
  data are not visible in this view.
 */
755 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
758 struct ctdb_ltdb_header *hdr, *hdr2;
760 /* these are really internal tdb functions - but we need them here for
761 non-blocking lock of the freelist */
762 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
763 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
766 key.dsize = rec->keylen;
767 key.dptr = &rec->data[0];
768 data.dsize = rec->datalen;
769 data.dptr = &rec->data[rec->keylen];
771 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
772 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
776 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
777 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
/* hdr is the header supplied by the caller, hdr2 below is the stored one */
781 hdr = (struct ctdb_ltdb_header *)data.dptr;
783 /* use a non-blocking lock */
784 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
788 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
789 if (data.dptr == NULL) {
790 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
794 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
795 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
796 tdb_delete(ctdb_db->ltdb->tdb, key);
797 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
798 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
800 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
805 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
807 if (hdr2->rsn > hdr->rsn) {
808 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
809 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
810 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
815 if (hdr2->dmaster == ctdb->pnn) {
816 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
817 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
822 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
823 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
828 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
829 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
830 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
831 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
836 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
837 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  State passed to the event script callbacks of the start_recovery and
  end_recovery controls. c is the request that the callback answers with
  ctdb_request_control_reply(); the control handlers move it onto the
  state with talloc_steal().
 */
844 struct recovery_callback_state {
845 struct ctdb_req_control *c;
850 called when the 'recovered' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_end_recovery(). p is the struct recovery_callback_state.

  Monitoring is re-enabled, a failure status is logged, the stored
  request is answered with the script status and
  ctdb->last_recovery_finished is set to the current time.

  NOTE(review): the condition guarding the error log and any release of
  the state are not visible in this view.
 */
852 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
854 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
856 ctdb_enable_monitoring(ctdb);
859 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
862 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
865 gettimeofday(&ctdb->last_recovery_finished, NULL);
869 recovery has finished
/*
  Control handler: recovery has finished.

  A struct recovery_callback_state is allocated and takes ownership of
  the request c. Monitoring is disabled and an event script is started
  through ctdb_event_script_callback() with a timeout of
  ctdb->tunable.script_timeout and ctdb_end_recovery_callback() as the
  completion callback. On the failure path visible here monitoring is
  re-enabled and an error is logged.

  NOTE(review): the remaining parameters of this function, some
  arguments of ctdb_event_script_callback() (including the script name),
  the failure condition, the return statements and the setting of the
  async reply flag are not visible in this view.
 */
871 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
872 struct ctdb_req_control *c,
876 struct recovery_callback_state *state;
878 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
880 state = talloc(ctdb, struct recovery_callback_state);
881 CTDB_NO_MEMORY(ctdb, state);
883 state->c = talloc_steal(state, c);
885 ctdb_disable_monitoring(ctdb);
887 ret = ctdb_event_script_callback(ctdb,
888 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
890 ctdb_end_recovery_callback,
894 ctdb_enable_monitoring(ctdb);
896 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
901 /* tell the control that we will be reply asynchronously */
907 called when the 'startrecovery' event script has finished
/*
  Completion callback for the startrecovery event script started by
  ctdb_control_start_recovery(). p is the
  struct recovery_callback_state.

  A failure status is logged and the stored request is answered with the
  script status.

  NOTE(review): monitoring is disabled in ctdb_control_start_recovery()
  but is not re-enabled in any line visible here, unlike
  ctdb_end_recovery_callback() - confirm that this is intended.
  NOTE(review): the condition guarding the error log and any release of
  the state are not visible in this view.
 */
909 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
911 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
914 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
917 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
922 run the startrecovery eventscript
/*
  Control handler: run the startrecovery event script.

  ctdb->last_recovery_started is set to the current time. A
  struct recovery_callback_state is allocated and takes ownership of the
  request c. Monitoring is disabled and the startrecovery script is
  started through ctdb_event_script_callback() with a timeout of
  ctdb->tunable.script_timeout and ctdb_start_recovery_callback() as the
  completion callback. A failure to start the script is logged.

  NOTE(review): the remaining parameters of this function, the failure
  condition, the return statements and the setting of the async reply
  flag are not visible in this view.
 */
924 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
925 struct ctdb_req_control *c,
929 struct recovery_callback_state *state;
931 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
932 gettimeofday(&ctdb->last_recovery_started, NULL);
934 state = talloc(ctdb, struct recovery_callback_state);
935 CTDB_NO_MEMORY(ctdb, state);
937 state->c = talloc_steal(state, c);
939 ctdb_disable_monitoring(ctdb);
941 ret = ctdb_event_script_callback(ctdb,
942 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
944 ctdb_start_recovery_callback,
945 state, "startrecovery");
948 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
953 /* tell the control that we will be reply asynchronously */
959 try to delete all these records as part of the vacuuming process
960 and return the records we failed to delete
/*
  Control handler: try to delete each record listed in indata and
  return the ones that could not be deleted.

  indata.dptr is read as a struct ctdb_marshall_buffer. After checking
  that indata.dsize covers at least the fixed part of that buffer, the
  database is looked up by reply->db_id. A zeroed reply buffer holding
  only the fixed part is allocated as a talloc child of outdata and its
  db_id is set. For each incoming record delete_tdb_record() is called;
  when it returns non-zero the reply buffer is grown by rec->length and
  the whole marshalled record is appended. Finally outdata points at the
  reply buffer and outdata->dsize is its talloc size.

  NOTE(review): in the visible code neither reply->count nor
  rec->length, rec->keylen and rec->datalen are checked against
  indata.dsize, so a malformed buffer can move rec past the end of the
  payload.
  NOTE(review): hdr is computed and data is advanced past the header in
  the failure branch, but no visible line uses the results; no visible
  line increments a record count in the reply buffer either.
  NOTE(review): the return statements and the declarations of i, key,
  data and old_size are not visible in this view.
 */
962 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
964 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
965 struct ctdb_db_context *ctdb_db;
967 struct ctdb_rec_data *rec;
968 struct ctdb_marshall_buffer *records;
970 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
971 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
975 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
977 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
982 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
983 reply->count, reply->db_id));
986 /* create a blob to send back the records we couldnt delete */
987 records = (struct ctdb_marshall_buffer *)
988 talloc_zero_size(outdata,
989 offsetof(struct ctdb_marshall_buffer, data));
990 if (records == NULL) {
991 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
994 records->db_id = ctdb_db->db_id;
997 rec = (struct ctdb_rec_data *)&reply->data[0];
998 for (i=0;i<reply->count;i++) {
/* rec->data holds the key bytes followed directly by the data bytes */
1001 key.dptr = &rec->data[0];
1002 key.dsize = rec->keylen;
1003 data.dptr = &rec->data[key.dsize];
1004 data.dsize = rec->datalen;
1006 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1007 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1011 /* If we cant delete the record we must add it to the reply
1012 so the lmaster knows it may not purge this record
1014 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1016 struct ctdb_ltdb_header *hdr;
1018 hdr = (struct ctdb_ltdb_header *)data.dptr;
1019 data.dptr += sizeof(*hdr);
1020 data.dsize -= sizeof(*hdr);
1022 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1024 old_size = talloc_get_size(records);
1025 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1026 if (records == NULL) {
1027 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1031 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1034 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1038 outdata->dptr = (uint8_t *)records;
1039 outdata->dsize = talloc_get_size(records);
/*
  Control handler: return ctdb->capabilities.

  A single uint32_t is allocated as a talloc child of outdata, filled
  with ctdb->capabilities and handed back through outdata->dptr with
  outdata->dsize set to sizeof(uint32_t).

  NOTE(review): the return statement is not visible in this view.
 */
1047 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1049 uint32_t *capabilities = NULL;
1051 capabilities = talloc(outdata, uint32_t);
1052 CTDB_NO_MEMORY(ctdb, capabilities);
1053 *capabilities = ctdb->capabilities;
1055 outdata->dsize = sizeof(uint32_t);
1056 outdata->dptr = (uint8_t *)capabilities;
/*
  Timed event callback armed by ctdb_control_recd_ping(). p is the
  struct ctdb_context.

  The timeout is logged together with the counter stored in
  ctdb->recd_ping_count. While the counter is below
  ctdb->tunable.recd_ping_failcount the timer is re-armed with
  ctdb->tunable.recd_ping_timeout seconds, using recd_ping_count as the
  talloc parent of the event. The statements after that stop the
  recovery daemon, keepalives and monitoring, release all IPs, call
  methods->shutdown() when methods is set and run the shutdown event
  script.

  NOTE(review): the increment of the counter, the return after
  re-arming and whatever terminates the process after the final log
  message are not visible in this view.
 */
1061 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1063 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1064 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1066 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Count : %u\n", *count));
1068 if (*count < ctdb->tunable.recd_ping_failcount) {
1070 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1071 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1072 ctdb_recd_ping_timeout, ctdb);
1076 DEBUG(DEBUG_ERR, (__location__ " Final timeout for recovery daemon ping. Shutting down ctdb daemon\n"));
1078 ctdb_stop_recoverd(ctdb);
1079 ctdb_stop_keepalive(ctdb);
1080 ctdb_stop_monitoring(ctdb);
1081 ctdb_release_all_ips(ctdb);
1082 if (ctdb->methods != NULL) {
1083 ctdb->methods->shutdown(ctdb);
1085 ctdb_event_script(ctdb, "shutdown");
1086 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Daemon has been shut down.\n"));
1090 /* The recovery daemon will ping us at regular intervals.
1091 If we havent been pinged for a while we assume the recovery
1092 daemon is inoperable and we shut down.
/*
  Control handler: record a ping from the recovery daemon.

  The previous ctdb->recd_ping_count is freed, which also removes any
  timeout event that was allocated as its talloc child, and a new zeroed
  counter is allocated. When ctdb->tunable.recd_ping_timeout is non-zero
  a new ctdb_recd_ping_timeout() event is armed with that timeout, using
  the new counter as its talloc parent.

  NOTE(review): the return statement is not visible in this view.
 */
1094 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1096 talloc_free(ctdb->recd_ping_count);
1098 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1099 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1101 if (ctdb->tunable.recd_ping_timeout != 0) {
1102 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1103 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1104 ctdb_recd_ping_timeout, ctdb);
/*
  Control handler: set ctdb->recovery_master from the request payload.

  CHECK_CONTROL_DATA_SIZE requires the payload to be exactly one
  uint32_t; that value is then stored in ctdb->recovery_master.

  NOTE(review): the return statement is not visible in this view.
 */
1112 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1114 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1116 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];