4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
/*
  Walk ctdb->db_list and call tdb_lockall_mark() on the local tdb of every
  database. An error is logged when the daemon's freeze_mode is not
  CTDB_FREEZE_FROZEN.
  NOTE(review): the return statements of this function are not visible in
  this copy - confirm the failure values against the full source.
 */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
/*
  Counterpart of ctdb_lock_all_databases_mark(): walk ctdb->db_list and call
  tdb_lockall_unmark() on the local tdb of every database. An error is
  logged when the daemon's freeze_mode is not CTDB_FREEZE_FROZEN.
  NOTE(review): the return statements of this function are not visible in
  this copy - confirm the failure values against the full source.
 */
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
/*
  Control handler: return the current vnn map in wire format.

  A struct ctdb_vnn_map_wire large enough for ctdb->vnn_map->size entries is
  allocated under outdata, filled with the generation, size and map array of
  ctdb->vnn_map, and handed back through outdata->dptr.
  CHECK_CONTROL_DATA_SIZE(0) is applied first (presumably rejecting any
  non-empty indata - confirm against the macro definition).
 */
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
/*
  Control handler: replace ctdb->vnn_map with the wire-format map carried in
  indata.

  The old map is freed, a new struct ctdb_vnn_map is allocated under ctdb and
  the generation, size and map->size entries are copied from the wire data.
  An error is logged when the daemon is not in CTDB_FREEZE_FROZEN mode.
  NOTE(review): no comparison of indata.dsize against map->size is visible
  here - confirm that the wire data is validated before this point.
 */
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  Control handler: return a struct ctdb_dbid_map describing the databases on
  ctdb->db_list.

  The reply is zero-allocated under outdata with room for len entries; each
  entry receives the database id and its persistent flag. An allocation
  failure is logged at DEBUG_ALERT.
  NOTE(review): the statements that count len and set the number of entries
  in the reply are not visible in this copy.
 */
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
/*
  Control handler: return a struct ctdb_node_map with one
  struct ctdb_node_and_flags entry per node in ctdb->nodes.

  The reply is zero-allocated under outdata. For every node the textual
  address is converted with parse_ip() (a result of 0 is logged as a parse
  failure) and the node's pnn and flags are copied. An allocation failure is
  logged at DEBUG_ALERT.
 */
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address,
167 NULL, /* TODO: pass in the correct interface here*/
169 &node_map->nodes[i].addr) == 0)
171 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
174 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
175 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
182 get an old style ipv4-only nodemap
/*
  Control handler: IPv4-only variant of the node map reply.

  Builds a struct ctdb_node_mapv4 (zero-allocated under outdata) with one
  struct ctdb_node_and_flagsv4 entry per node: the address converted with
  parse_ipv4() into the entry's sin member (a result of 0 is logged as a
  parse failure), plus the node's pnn and flags.
 */
185 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
187 uint32_t i, num_nodes;
188 struct ctdb_node_mapv4 *node_map;
190 CHECK_CONTROL_DATA_SIZE(0);
192 num_nodes = ctdb->num_nodes;
194 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
195 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
196 if (!outdata->dptr) {
197 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
201 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
202 node_map->num = num_nodes;
203 for (i=0; i<num_nodes; i++) {
204 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
205 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
209 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
210 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Timed-event handler that reloads the nodes file.

  The old node array is parked on a temporary talloc context, then
  ctdb_load_nodes_file() builds the new one. For each new slot whose index
  existed before and whose address is unchanged, the freshly loaded node is
  freed and the old node object is moved back in. Every other slot is passed
  to methods->add_node() and methods->connect_node(); a failure of either is
  reported through ctdb_fatal(). The temporary context is freed at the end.
  private_data is the struct ctdb_context.
 */
217 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
218 struct timeval t, void *private_data)
221 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
223 struct ctdb_node **nodes;
225 tmp_ctx = talloc_new(ctdb);
227 /* steal the old nodes file for a while */
228 talloc_steal(tmp_ctx, ctdb->nodes);
231 num_nodes = ctdb->num_nodes;
234 /* load the new nodes file */
235 ctdb_load_nodes_file(ctdb);
237 for (i=0; i<ctdb->num_nodes; i++) {
238 /* keep any identical pre-existing nodes and connections */
239 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
240 talloc_free(ctdb->nodes[i]);
241 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
245 /* any new or different nodes must be added */
246 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
247 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
248 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
250 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
251 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
252 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
256 talloc_free(tmp_ctx);
261 reload the nodes file after a short delay (so that we can send the response
/*
  Control handler: schedule ctdb_reload_nodes_event() on ctdb->ev with a
  delay of timeval_current_ofs(1,0), passing ctdb as its private data.
 */
265 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
267 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
273 a traverse function for pulling all relevant records from pulldb
276 struct ctdb_context *ctdb;
277 struct ctdb_marshall_buffer *pulldata;
282 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
284 struct pulldb_data *params = (struct pulldb_data *)p;
285 struct ctdb_rec_data *rec;
287 /* add the record to the blob */
288 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
290 params->failed = true;
293 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
294 if (params->pulldata == NULL) {
295 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
296 rec->length + params->len, params->pulldata->count));
297 params->failed = true;
300 params->pulldata->count++;
301 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
302 params->len += rec->length;
309 pull a bunch of records from a ltdb, filtering by lmaster
311 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
313 struct ctdb_control_pulldb *pull;
314 struct ctdb_db_context *ctdb_db;
315 struct pulldb_data params;
316 struct ctdb_marshall_buffer *reply;
318 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
319 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
323 pull = (struct ctdb_control_pulldb *)indata.dptr;
325 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
327 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
331 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
332 CTDB_NO_MEMORY(ctdb, reply);
334 reply->db_id = pull->db_id;
337 params.pulldata = reply;
338 params.len = offsetof(struct ctdb_marshall_buffer, data);
339 params.failed = false;
341 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
342 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
346 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
347 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
348 ctdb_lock_all_databases_unmark(ctdb);
349 talloc_free(params.pulldata);
353 ctdb_lock_all_databases_unmark(ctdb);
355 outdata->dptr = (uint8_t *)params.pulldata;
356 outdata->dsize = params.len;
362 push a bunch of records into a ltdb, filtering by rsn
/*
  Control handler: store the marshalled records carried in indata into the
  local tdb of the database named by the buffer's db_id.

  The request is rejected (with a debug message) when the daemon is not
  frozen, and an indata shorter than the marshall buffer header is logged as
  invalid. All databases are marked locked while the records are written.
  Each record is split into key and data; the data must start with a
  struct ctdb_ltdb_header, which is separated from the payload before
  ctdb_ltdb_store() is called.
  NOTE(review): the walk advances by rec->length and no comparison against
  indata.dsize is visible here - confirm the buffer is validated elsewhere.
 */
364 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
366 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
367 struct ctdb_db_context *ctdb_db;
369 struct ctdb_rec_data *rec;
371 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
372 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
376 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
377 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
383 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
388 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
392 rec = (struct ctdb_rec_data *)&reply->data[0];
394 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
395 reply->count, reply->db_id));
397 for (i=0;i<reply->count;i++) {
399 struct ctdb_ltdb_header *hdr;
401 key.dptr = &rec->data[0];
402 key.dsize = rec->keylen;
403 data.dptr = &rec->data[key.dsize];
404 data.dsize = rec->datalen;
406 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
407 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
410 hdr = (struct ctdb_ltdb_header *)data.dptr;
411 data.dptr += sizeof(*hdr);
412 data.dsize -= sizeof(*hdr);
414 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
416 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
420 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
423 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
424 reply->count, reply->db_id));
426 ctdb_lock_all_databases_unmark(ctdb);
430 ctdb_lock_all_databases_unmark(ctdb);
/*
  tdb traverse callback: set the dmaster field of a record's
  struct ctdb_ltdb_header to the uint32_t that p points at and write the
  record back with tdb_store(..., TDB_REPLACE). Records whose dmaster
  already matches are skipped; a failed store is logged at DEBUG_CRIT.
  NOTE(review): data.dsize is not compared against the header size in the
  lines visible here before the header is read.
 */
435 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
437 uint32_t *dmaster = (uint32_t *)p;
438 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
441 /* skip if already correct */
442 if (header->dmaster == *dmaster) {
446 header->dmaster = *dmaster;
448 ret = tdb_store(tdb, key, data, TDB_REPLACE);
450 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
454 /* TODO: add error checking here */
/*
  Control handler: set the dmaster of every record in the database named by
  indata (a struct ctdb_control_set_dmaster) to p->dmaster.

  The request is rejected (with a debug message) when the daemon is not
  frozen. All databases are marked locked around a tdb_traverse() that runs
  traverse_setdmaster() on each record; the traverse result is not
  inspected.
 */
459 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
461 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
462 struct ctdb_db_context *ctdb_db;
464 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
465 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
469 ctdb_db = find_ctdb_db(ctdb, p->db_id);
471 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
475 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
476 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
480 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
482 ctdb_lock_all_databases_unmark(ctdb);
/*
  State kept while a set_recmode request is verified asynchronously: the
  daemon context, the control request that still has to be answered, the
  timeout event and the fd event watching the child's pipe.
  NOTE(review): the handlers in this file also use fd, child and recmode
  members of this struct; their declarations are not visible in this copy.
 */
487 struct ctdb_set_recmode_state {
488 struct ctdb_context *ctdb;
489 struct ctdb_req_control *c;
492 struct timed_event *te;
493 struct fd_event *fde;
498 called if our set_recmode child times out. this would happen if
499 ctdb_recovery_lock() would block.
/*
  Timed-event handler run when the set_recmode child has not reported back
  in time. private_data is the struct ctdb_set_recmode_state. The timeout is
  logged, the requested recovery mode is applied to the daemon and the
  pending control is answered via ctdb_request_control_reply() with 0.
 */
501 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
502 struct timeval t, void *private_data)
504 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
505 struct ctdb_set_recmode_state);
507 /* we consider this a success, not a failure, as we failed to
508 set the recovery lock which is what we wanted. This can be
509 caused by the cluster filesystem being very slow to
510 arbitrate locks immediately after a node failure.
512 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
513 state->ctdb->recovery_mode = state->recmode;
514 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
519 /* when we free the recmode state we must kill any child process.
/*
  Destructor for struct ctdb_set_recmode_state (installed with
  talloc_set_destructor() in ctdb_control_set_recmode): sends SIGKILL to
  the child process recorded in state->child.
 */
521 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
523 kill(state->child, SIGKILL);
527 /* this is called when the client process has completed ctdb_recovery_lock()
528 and has written data back to us through the pipe.
/*
  fd-event handler run when the set_recmode child has written its result to
  the pipe. private_data is the struct ctdb_set_recmode_state.

  The timeout event is freed, then one byte is read from state->fd[0].
  Anything other than a single byte of value 0 answers the pending control
  with -1 and an error message; otherwise the requested recovery mode is
  applied and the control is answered with 0.
 */
530 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
531 uint16_t flags, void *private_data)
533 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
534 struct ctdb_set_recmode_state);
538 /* we got a response from our child process so we can abort the
541 talloc_free(state->te);
545 /* read the childs status when trying to lock the reclock file.
546 child wrote 0 if everything is fine and 1 if it did manage
547 to lock the file, which would be a problem since that means
548 we got a request to exit from recovery but we could still lock
549 the file which at this time SHOULD be locked by the recovery
550 daemon on the recmaster
552 ret = read(state->fd[0], &c, 1);
553 if (ret != 1 || c != 0) {
554 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
559 state->ctdb->recovery_mode = state->recmode;
561 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed-event handler armed by ctdb_control_set_recmode() when recovery mode
  is entered. private_data is the struct ctdb_context. It logs that recovery
  has lasted too long, frees and clears ctdb->release_ips_ctx and calls
  ctdb_release_all_ips().
 */
567 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
568 struct timeval t, void *private_data)
570 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
573 talloc_free(ctdb->release_ips_ctx);
574 ctdb->release_ips_ctx = NULL;
576 ctdb_release_all_ips(ctdb);
580 set the recovery mode
/*
  Control handler: change the daemon's recovery mode to the uint32_t carried
  in indata.

  - For CTDB_RECOVERY_NORMAL the release_ips_ctx is freed; for any other
    mode it is re-created and ctdb_drop_all_ips_event() is armed with the
    recovery_drop_all_ips tunable as delay.
  - When the daemon is not frozen an error is logged and *errormsg is set.
  - When the request is not a switch from ACTIVE to NORMAL the mode is
    assigned directly.
  - When leaving recovery the databases are thawed if a freeze handle
    exists. With tunable verify_recovery_lock == 0 the mode is assigned
    without further checks. Otherwise a child is forked that calls
    ctdb_recovery_lock(ctdb, false) and reports through a pipe; the reply is
    then produced by set_recmode_handler() or, after 15 seconds, by
    ctdb_set_recmode_timeout(), and the state takes ownership of c.
  NOTE(review): state is allocated before the verify_recovery_lock == 0
  shortcut and no talloc_free(state) is visible on that path - check for a
  leak against the full source.
 */
582 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
583 struct ctdb_req_control *c,
584 TDB_DATA indata, bool *async_reply,
585 const char **errormsg)
587 uint32_t recmode = *(uint32_t *)indata.dptr;
589 struct ctdb_set_recmode_state *state;
590 pid_t parent = getpid();
592 /* if we enter recovery but stay in recovery for too long
593 we will eventually drop all our ip addresses
595 if (recmode == CTDB_RECOVERY_NORMAL) {
596 talloc_free(ctdb->release_ips_ctx);
597 ctdb->release_ips_ctx = NULL;
599 talloc_free(ctdb->release_ips_ctx);
600 ctdb->release_ips_ctx = talloc_new(ctdb);
601 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
603 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
607 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
608 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
610 (*errormsg) = "Cannot change recovery mode while not frozen";
614 if (recmode != ctdb->recovery_mode) {
615 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
616 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
619 if (recmode != CTDB_RECOVERY_NORMAL ||
620 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
621 ctdb->recovery_mode = recmode;
625 /* some special handling when ending recovery mode */
627 /* force the databases to thaw */
628 if (ctdb->freeze_handle) {
629 ctdb_control_thaw(ctdb);
632 state = talloc(ctdb, struct ctdb_set_recmode_state);
633 CTDB_NO_MEMORY(ctdb, state);
636 if (ctdb->tunable.verify_recovery_lock == 0) {
637 /* don't need to verify the reclock file */
638 ctdb->recovery_mode = recmode;
642 /* For the rest of what needs to be done, we need to do this in
643 a child process since
644 1, the call to ctdb_recovery_lock() can block if the cluster
645 filesystem is in the process of recovery.
647 ret = pipe(state->fd);
650 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
654 state->child = fork();
655 if (state->child == (pid_t)-1) {
662 if (state->child == 0) {
666 /* we should not be able to get the lock on the reclock file,
667 as it should be held by the recovery master
669 if (ctdb_recovery_lock(ctdb, false)) {
670 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
674 write(state->fd[1], &cc, 1);
675 /* make sure we die when our parent dies */
676 while (kill(parent, 0) == 0 || errno != ESRCH) {
678 write(state->fd[1], &cc, 1);
684 talloc_set_destructor(state, set_recmode_destructor);
686 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
687 ctdb_set_recmode_timeout, state);
689 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
690 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
693 if (state->fde == NULL) {
699 state->recmode = recmode;
700 state->c = talloc_steal(state, c);
709 try and get the recovery lock in shared storage - should only work
710 on the recovery master recovery daemon. Anywhere else is a bug
/*
  Try to take a write lock (F_WRLCK, non-blocking F_SETLK) on
  ctdb->recovery_lock_file.

  Any recovery_lock_fd that is already open is closed first. The file is
  opened O_RDWR|O_CREAT with mode 0600 and marked close-on-exec. When the
  open or the lock fails the problem is logged, and on a lock failure the fd
  is closed and reset to -1. Success is logged at DEBUG_NOTICE.
  NOTE(review): a second close/reset of the fd appears just before the
  success message - presumably guarded by !keep; the condition and the
  return statements are not visible in this copy.
 */
712 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
716 if (ctdb->recovery_lock_fd != -1) {
717 close(ctdb->recovery_lock_fd);
719 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
720 if (ctdb->recovery_lock_fd == -1) {
721 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
722 ctdb->recovery_lock_file, strerror(errno)));
726 set_close_on_exec(ctdb->recovery_lock_fd);
728 lock.l_type = F_WRLCK;
729 lock.l_whence = SEEK_SET;
734 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
735 close(ctdb->recovery_lock_fd);
736 ctdb->recovery_lock_fd = -1;
738 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
744 close(ctdb->recovery_lock_fd);
745 ctdb->recovery_lock_fd = -1;
748 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
754 delete a record as part of the vacuum process
755 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
756 use non-blocking locks
758 return 0 if the record was successfully deleted (i.e. it does not exist
759 when the function returns)
760 or !0 if the record still exists in the tdb after returning.
/*
  Try to delete one record from ctdb_db's local tdb on behalf of the vacuum
  process, using only non-blocking locks.

  rec carries the key followed by a bare struct ctdb_ltdb_header. The
  request is logged and not carried out when this node is the lmaster of
  the key, when the supplied data is not exactly one header, when the
  stored record has a higher rsn than the supplied one, or when this node
  is the record's dmaster. A stored record too small to hold a header is
  deleted as corrupt. The chain lock is taken with
  tdb_chainlock_nonblock() and list -1 is locked with tdb_lock_nonblock()
  around the tdb_delete() calls.
  NOTE(review): the buffer returned by tdb_fetch() is not visibly freed in
  the lines shown here - confirm against the full source.
 */
762 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
765 struct ctdb_ltdb_header *hdr, *hdr2;
767 /* these are really internal tdb functions - but we need them here for
768 non-blocking lock of the freelist */
769 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
770 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
773 key.dsize = rec->keylen;
774 key.dptr = &rec->data[0];
775 data.dsize = rec->datalen;
776 data.dptr = &rec->data[rec->keylen];
778 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
779 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
783 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
784 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
788 hdr = (struct ctdb_ltdb_header *)data.dptr;
790 /* use a non-blocking lock */
791 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
795 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
796 if (data.dptr == NULL) {
797 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
801 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
802 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
803 tdb_delete(ctdb_db->ltdb->tdb, key);
804 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
805 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
807 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
812 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
814 if (hdr2->rsn > hdr->rsn) {
815 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
816 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
817 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
822 if (hdr2->dmaster == ctdb->pnn) {
823 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
824 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
829 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
830 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
835 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
836 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
837 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
838 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
843 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
844 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  State handed to the event-script completion callbacks of the
  start_recovery and end_recovery controls: the control request that is
  answered once the script has finished.
 */
851 struct recovery_callback_state {
852 struct ctdb_req_control *c;
857 called when the 'recovered' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_end_recovery(). p is the struct recovery_callback_state.
  Monitoring is re-enabled, a script failure is logged, the pending control
  is answered with the script's status and the current time is stored in
  ctdb->last_recovery_finished.
 */
859 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
861 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
863 ctdb_enable_monitoring(ctdb);
866 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
869 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
872 gettimeofday(&ctdb->last_recovery_finished, NULL);
876 recovery has finished
/*
  Control handler: recovery has finished.

  A struct recovery_callback_state is allocated under ctdb and takes
  ownership of the request c. Monitoring is disabled and an event script is
  started through ctdb_event_script_callback() with the script_timeout
  tunable and ctdb_end_recovery_callback() as completion handler, which
  sends the reply. On what appears to be the failure path monitoring is
  re-enabled and an error is logged.
 */
878 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
879 struct ctdb_req_control *c,
883 struct recovery_callback_state *state;
885 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
887 state = talloc(ctdb, struct recovery_callback_state);
888 CTDB_NO_MEMORY(ctdb, state);
890 state->c = talloc_steal(state, c);
892 ctdb_disable_monitoring(ctdb);
894 ret = ctdb_event_script_callback(ctdb,
895 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
897 ctdb_end_recovery_callback,
901 ctdb_enable_monitoring(ctdb);
903 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
908 /* tell the control that we will reply asynchronously */
914 called when the 'startrecovery' event script has finished
/*
  Completion callback for the "startrecovery" event script. p is the
  struct recovery_callback_state. A script failure is logged and the
  pending control is answered with the script's status.
 */
916 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
918 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
921 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
924 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
929 run the startrecovery eventscript
/*
  Control handler: run the "startrecovery" event script.

  The current time is stored in ctdb->last_recovery_started. A
  struct recovery_callback_state is allocated under ctdb and takes ownership
  of the request c. Monitoring is disabled and the script is started through
  ctdb_event_script_callback() with the script_timeout tunable;
  ctdb_start_recovery_callback() sends the reply when it completes. A
  failure to start the script is logged.
 */
931 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
932 struct ctdb_req_control *c,
936 struct recovery_callback_state *state;
938 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
939 gettimeofday(&ctdb->last_recovery_started, NULL);
941 state = talloc(ctdb, struct recovery_callback_state);
942 CTDB_NO_MEMORY(ctdb, state);
944 state->c = talloc_steal(state, c);
946 ctdb_disable_monitoring(ctdb);
948 ret = ctdb_event_script_callback(ctdb,
949 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
951 ctdb_start_recovery_callback,
952 state, "startrecovery");
955 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
960 /* tell the control that we will reply asynchronously */
966 try to delete all these records as part of the vacuuming process
967 and return the records we failed to delete
/*
  Control handler used by vacuuming: try to delete every record listed in
  indata and return the ones that could not be deleted.

  indata is a struct ctdb_marshall_buffer; an indata shorter than its header
  is logged as invalid. A reply buffer holding only the header is
  zero-allocated under outdata and given the database id. Each incoming
  record must carry at least a struct ctdb_ltdb_header as data. Records for
  which delete_tdb_record() returns non-zero are appended to the reply by
  growing it with talloc_realloc_size(). The reply and its talloc size are
  returned through outdata.
  NOTE(review): the walk advances by rec->length and no comparison against
  indata.dsize is visible here; the update of the reply's record count is
  also not visible in this copy.
 */
969 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
971 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
972 struct ctdb_db_context *ctdb_db;
974 struct ctdb_rec_data *rec;
975 struct ctdb_marshall_buffer *records;
977 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
978 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
982 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
984 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
989 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
990 reply->count, reply->db_id));
993 /* create a blob to send back the records we couldn't delete */
994 records = (struct ctdb_marshall_buffer *)
995 talloc_zero_size(outdata,
996 offsetof(struct ctdb_marshall_buffer, data));
997 if (records == NULL) {
998 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1001 records->db_id = ctdb_db->db_id;
1004 rec = (struct ctdb_rec_data *)&reply->data[0];
1005 for (i=0;i<reply->count;i++) {
1008 key.dptr = &rec->data[0];
1009 key.dsize = rec->keylen;
1010 data.dptr = &rec->data[key.dsize];
1011 data.dsize = rec->datalen;
1013 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1014 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1018 /* If we cant delete the record we must add it to the reply
1019 so the lmaster knows it may not purge this record
1021 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1023 struct ctdb_ltdb_header *hdr;
1025 hdr = (struct ctdb_ltdb_header *)data.dptr;
1026 data.dptr += sizeof(*hdr);
1027 data.dsize -= sizeof(*hdr);
1029 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1031 old_size = talloc_get_size(records);
1032 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1033 if (records == NULL) {
1034 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1038 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1041 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1045 outdata->dptr = (uint8_t *)records;
1046 outdata->dsize = talloc_get_size(records);
/*
  Control handler: return ctdb->capabilities as a single uint32_t allocated
  under outdata (outdata->dsize is sizeof(uint32_t)).
 */
1054 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1056 uint32_t *capabilities = NULL;
1058 capabilities = talloc(outdata, uint32_t);
1059 CTDB_NO_MEMORY(ctdb, capabilities);
1060 *capabilities = ctdb->capabilities;
1062 outdata->dsize = sizeof(uint32_t);
1063 outdata->dptr = (uint8_t *)capabilities;
/*
  Timed-event handler run when the recovery daemon has not pinged in time.
  p is the struct ctdb_context.

  The timeout and the current counter value are logged. While the counter
  in ctdb->recd_ping_count is below the recd_ping_failcount tunable the
  timer is re-armed with the recd_ping_timeout tunable. Otherwise the daemon
  is shut down: recoverd, keepalive and monitoring are stopped, all IPs are
  released, the transport's shutdown method is called when ctdb->methods is
  set, and the "shutdown" event script is run.
  NOTE(review): the statement that advances the counter and the statement
  that ends the process are not visible in this copy.
 */
1068 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1070 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1071 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1073 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Count : %u\n", *count));
1075 if (*count < ctdb->tunable.recd_ping_failcount) {
1077 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1078 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1079 ctdb_recd_ping_timeout, ctdb);
1083 DEBUG(DEBUG_ERR, (__location__ " Final timeout for recovery daemon ping. Shutting down ctdb daemon\n"));
1085 ctdb_stop_recoverd(ctdb);
1086 ctdb_stop_keepalive(ctdb);
1087 ctdb_stop_monitoring(ctdb);
1088 ctdb_release_all_ips(ctdb);
1089 if (ctdb->methods != NULL) {
1090 ctdb->methods->shutdown(ctdb);
1092 ctdb_event_script(ctdb, "shutdown");
1093 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Daemon has been shut down.\n"));
1097 /* The recovery daemon will ping us at regular intervals.
1098 If we haven't been pinged for a while we assume the recovery
1099 daemon is inoperable and we shut down.
/*
  Control handler for the recovery daemon's ping.

  The old ctdb->recd_ping_count is freed and replaced by a freshly zeroed
  counter. When the recd_ping_timeout tunable is non-zero,
  ctdb_recd_ping_timeout() is armed with that delay, using the new counter
  as the talloc parent of the timed event (so freeing the counter on the
  next ping presumably cancels the pending timeout - confirm against the
  events library).
 */
1101 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1103 talloc_free(ctdb->recd_ping_count);
1105 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1106 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1108 if (ctdb->tunable.recd_ping_timeout != 0) {
1109 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1110 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1111 ctdb_recd_ping_timeout, ctdb);
1119 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1121 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1123 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];