4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
/*
 * Walk ctdb->db_list and call tdb_lockall_mark() on the local tdb of every
 * attached database. An error is logged when ctdb->freeze_mode is not
 * CTDB_FREEZE_FROZEN. Callers in this file treat a non-zero result as failure.
 * NOTE(review): the return statements are not visible in this listing --
 * confirm the exact return values against the full file.
 */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
/*
 * Counterpart of ctdb_lock_all_databases_mark(): walk ctdb->db_list and call
 * tdb_lockall_unmark() on the local tdb of every attached database. An error
 * is logged when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.
 * Callers visible in this file do not check the result.
 * NOTE(review): the return statements are not visible in this listing.
 */
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
/*
 * Control handler: hand the current vnn map back to the requester.
 *
 * Applies CHECK_CONTROL_DATA_SIZE(0) to the request (presumably rejecting
 * any non-empty payload -- the macro is defined elsewhere). A
 * struct ctdb_vnn_map_wire large enough for ctdb->vnn_map->size entries is
 * allocated as a talloc child of outdata; generation, size and the map array
 * are copied into it and the buffer is returned through outdata->dptr.
 * NOTE(review): the declaration of 'len', the assignment of outdata->dsize
 * and the return statement are not visible in this listing.
 */
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
/*
 * Control handler: replace ctdb->vnn_map with the map carried in indata.
 *
 * indata.dptr is interpreted as a struct ctdb_vnn_map_wire. The request is
 * refused (error logged) unless ctdb->freeze_mode is CTDB_FREEZE_FROZEN.
 * The previous map is freed, a new struct ctdb_vnn_map is allocated under
 * ctdb, and generation, size and the map array are copied from the wire
 * structure.
 * NOTE(review): the old map is freed before the new allocations are checked,
 * so an allocation failure appears to leave ctdb->vnn_map NULL or partially
 * built. Consider building the new map first and swapping on success.
 * NOTE(review): no check of map->size against indata.dsize is visible here;
 * verify whether the caller validates the payload length.
 */
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * Control handler: return the list of attached databases.
 *
 * Applies CHECK_CONTROL_DATA_SIZE(0) to the request. The reply is a zeroed
 * struct ctdb_dbid_map with 'len' entries, allocated as a talloc child of
 * outdata; for each database in ctdb->db_list the entry records db_id and
 * the persistent flag.
 * NOTE(review): the body of the first loop is not visible in this listing;
 * presumably it counts the databases into 'len' -- confirm. The assignment
 * of the entry count and the return statements are likewise not visible.
 */
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
/*
 * Control handler: return the node map (address, pnn and flags per node).
 *
 * Applies CHECK_CONTROL_DATA_SIZE(0) to the request. The reply is a zeroed
 * struct ctdb_node_map with ctdb->num_nodes entries, allocated as a talloc
 * child of outdata. Each node's textual address is converted with
 * parse_ip(); a result of 0 from parse_ip() is logged as a parse failure.
 * NOTE(review): what happens after a failed allocation or a failed parse
 * (return value) is not visible in this listing.
 */
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address, &node_map->nodes[i].addr) == 0) {
167 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
170 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
171 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
178 get an old style ipv4-only nodemap
/*
 * Control handler: same reply as ctdb_control_getnodemap() but using the
 * struct ctdb_node_mapv4 layout, whose entries hold a 'sin' address filled
 * in by parse_ipv4() (called with 0 as its second argument).
 *
 * Applies CHECK_CONTROL_DATA_SIZE(0) to the request; the reply buffer is a
 * zeroed talloc child of outdata with ctdb->num_nodes entries. A result of
 * 0 from parse_ipv4() is logged as a parse failure.
 * NOTE(review): the return statements are not visible in this listing.
 */
181 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
183 uint32_t i, num_nodes;
184 struct ctdb_node_mapv4 *node_map;
186 CHECK_CONTROL_DATA_SIZE(0);
188 num_nodes = ctdb->num_nodes;
190 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
191 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
192 if (!outdata->dptr) {
193 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
197 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
198 node_map->num = num_nodes;
199 for (i=0; i<num_nodes; i++) {
200 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
201 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
205 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
206 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
 * Timed-event callback (scheduled by ctdb_control_reload_nodes_file()).
 * private_data is the struct ctdb_context.
 *
 * Re-reads the nodes file with ctdb_load_nodes_file(), registers every
 * node with the transport via ctdb->methods->add_node() -- calling
 * ctdb_fatal() if that fails -- and finally calls ctdb->methods->start().
 */
213 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
214 struct timeval t, void *private_data)
218 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
220 ctdb_load_nodes_file(ctdb);
222 for (i=0; i<ctdb->num_nodes; i++) {
223 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
224 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
225 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
228 ctdb->methods->start(ctdb);
234 reload the nodes file after a short delay (so that we can send the response
/*
 * Control handler: schedule ctdb_reload_nodes_event() on ctdb->ev using
 * timeval_current_ofs(1,0) as the trigger time, with ctdb as both the
 * talloc parent of the event and its private data.
 * NOTE(review): the result of event_add_timed() is not checked here.
 */
238 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
240 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
246 a traverse function for pulling all relevant records from pulldb
/*
 * Members of struct pulldb_data, the state passed to traverse_pulldb().
 * 'pulldata' is the marshall buffer being grown record by record. The
 * traverse code also uses 'len' (bytes used so far) and 'failed' members.
 * NOTE(review): the struct header line and the 'len'/'failed' declarations
 * are not visible in this listing.
 */
249 struct ctdb_context *ctdb;
250 struct ctdb_marshall_buffer *pulldata;
255 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
257 struct pulldb_data *params = (struct pulldb_data *)p;
258 struct ctdb_rec_data *rec;
260 /* add the record to the blob */
261 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
263 params->failed = true;
266 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
267 if (params->pulldata == NULL) {
268 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
269 rec->length + params->len, params->pulldata->count));
270 params->failed = true;
273 params->pulldata->count++;
274 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
275 params->len += rec->length;
282 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * Control handler: marshall the records of one local database into outdata.
 *
 * indata.dptr is interpreted as a struct ctdb_control_pulldb naming the
 * database (db_id). The request is rejected unless ctdb->freeze_mode is
 * CTDB_FREEZE_FROZEN, and an unknown db_id is logged as an error. A zeroed
 * struct ctdb_marshall_buffer is allocated under outdata and filled by
 * traversing the tdb with traverse_pulldb() between
 * ctdb_lock_all_databases_mark() and ctdb_lock_all_databases_unmark().
 * If the traverse returns -1 the buffer is freed; otherwise the buffer and
 * its length are returned through outdata.
 * NOTE(review): no check of indata.dsize is visible here, and params.failed
 * is not visibly consulted after the traverse -- confirm in the full file.
 */
284 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
286 struct ctdb_control_pulldb *pull;
287 struct ctdb_db_context *ctdb_db;
288 struct pulldb_data params;
289 struct ctdb_marshall_buffer *reply;
291 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
292 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
296 pull = (struct ctdb_control_pulldb *)indata.dptr;
298 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
300 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
304 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
305 CTDB_NO_MEMORY(ctdb, reply);
307 reply->db_id = pull->db_id;
310 params.pulldata = reply;
311 params.len = offsetof(struct ctdb_marshall_buffer, data);
312 params.failed = false;
314 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
315 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
319 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
320 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
321 ctdb_lock_all_databases_unmark(ctdb);
322 talloc_free(params.pulldata);
326 ctdb_lock_all_databases_unmark(ctdb);
328 outdata->dptr = (uint8_t *)params.pulldata;
329 outdata->dsize = params.len;
335 push a bunch of records into a ltdb, filtering by rsn
/*
 * Control handler: store a marshalled batch of records into a local database.
 *
 * indata is interpreted as a struct ctdb_marshall_buffer. The request is
 * rejected unless ctdb->freeze_mode is CTDB_FREEZE_FROZEN, when indata is
 * shorter than the fixed buffer header, or when db_id is unknown. Between
 * ctdb_lock_all_databases_mark() and ctdb_lock_all_databases_unmark() each
 * of the reply->count records is split into key and data, the leading
 * struct ctdb_ltdb_header is peeled off the data, and the record is written
 * with ctdb_ltdb_store(). Records are walked by advancing rec->length bytes.
 * NOTE(review): the walk is driven only by reply->count and rec->length; no
 * bound against indata.dsize is visible here, so a malformed buffer could
 * be read past its end -- confirm and consider validating.
 * NOTE(review): the two unmark calls at the end presumably belong to the
 * success path and a failure label; the label and returns are not visible.
 */
337 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
339 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
340 struct ctdb_db_context *ctdb_db;
342 struct ctdb_rec_data *rec;
344 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
345 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
349 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
350 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
354 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
356 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
360 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
361 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
365 rec = (struct ctdb_rec_data *)&reply->data[0];
367 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
368 reply->count, reply->db_id));
370 for (i=0;i<reply->count;i++) {
372 struct ctdb_ltdb_header *hdr;
374 key.dptr = &rec->data[0];
375 key.dsize = rec->keylen;
376 data.dptr = &rec->data[key.dsize];
377 data.dsize = rec->datalen;
379 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
380 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
383 hdr = (struct ctdb_ltdb_header *)data.dptr;
384 data.dptr += sizeof(*hdr);
385 data.dsize -= sizeof(*hdr);
387 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
389 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
393 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
396 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
397 reply->count, reply->db_id));
399 ctdb_lock_all_databases_unmark(ctdb);
403 ctdb_lock_all_databases_unmark(ctdb);
/*
 * tdb traverse callback used by ctdb_control_set_dmaster(): 'p' points to
 * the new dmaster value (uint32_t).
 *
 * The record data is interpreted as starting with a struct ctdb_ltdb_header.
 * Records whose header already carries the requested dmaster are skipped;
 * otherwise the header is updated in place and the record is written back
 * with tdb_store(..., TDB_REPLACE). A failed store is logged.
 * NOTE(review): no check that data.dsize covers the header is visible here,
 * and the return statements are not visible in this listing.
 */
408 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
410 uint32_t *dmaster = (uint32_t *)p;
411 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
414 /* skip if already correct */
415 if (header->dmaster == *dmaster) {
419 header->dmaster = *dmaster;
421 ret = tdb_store(tdb, key, data, TDB_REPLACE);
423 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
427 /* TODO: add error checking here */
/*
 * Control handler: set the dmaster field of every record in one database.
 *
 * indata.dptr is interpreted as a struct ctdb_control_set_dmaster giving
 * db_id and the new dmaster. The request is rejected unless
 * ctdb->freeze_mode is CTDB_FREEZE_FROZEN, and an unknown db_id is logged.
 * The database is traversed with traverse_setdmaster() between
 * ctdb_lock_all_databases_mark() and ctdb_lock_all_databases_unmark().
 * NOTE(review): the result of tdb_traverse() is ignored, so a failed
 * rewrite is not reported to the requester.
 */
432 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
434 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
435 struct ctdb_db_context *ctdb_db;
437 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
438 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
442 ctdb_db = find_ctdb_db(ctdb, p->db_id);
444 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
448 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
449 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
453 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
455 ctdb_lock_all_databases_unmark(ctdb);
/*
 * State for an asynchronous set_recmode request: the daemon context, the
 * control request to reply to ('c'), the timeout event ('te') and the fd
 * event on the pipe from the child ('fde'). The code in this file also uses
 * 'fd' (pipe descriptors), 'child' (pid) and 'recmode' members.
 * NOTE(review): those three declarations are not visible in this listing.
 */
460 struct ctdb_set_recmode_state {
461 struct ctdb_context *ctdb;
462 struct ctdb_req_control *c;
465 struct timed_event *te;
466 struct fd_event *fde;
471 called if our set_recmode child times out. this would happen if
472 ctdb_recovery_lock() would block.
/*
 * Timed-event callback armed by ctdb_control_set_recmode(); private_data is
 * the struct ctdb_set_recmode_state. Treats the timeout as success: applies
 * state->recmode to the daemon's recovery_mode and sends a status 0 reply
 * for the pending control.
 */
474 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
475 struct timeval t, void *private_data)
477 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
478 struct ctdb_set_recmode_state);
480 /* we consider this a success, not a failure, as we failed to
481 set the recovery lock which is what we wanted. This can be
482 caused by the cluster filesystem being very slow to
483 arbitrate locks immediately after a node failure.
485 DEBUG(DEBUG_NOTICE,(__location__ " set_recmode timeout - allowing recmode set\n"));
486 state->ctdb->recovery_mode = state->recmode;
487 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
492 /* when we free the recmode state we must kill any child process.
/*
 * talloc destructor for struct ctdb_set_recmode_state (installed in
 * ctdb_control_set_recmode()): sends SIGKILL to state->child.
 */
494 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
496 kill(state->child, SIGKILL);
500 /* this is called when the child process has completed ctdb_recovery_lock()
501 and has written data back to us through the pipe.
/*
 * fd-event callback on the read end of the set_recmode pipe; private_data
 * is the struct ctdb_set_recmode_state.
 *
 * Cancels the timeout event, reads one status byte from state->fd[0], and
 * replies to the pending control: status -1 with an error message if the
 * read did not return exactly one byte or the byte is non-zero, otherwise
 * the new recovery mode is applied and status 0 is sent.
 */
503 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
504 uint16_t flags, void *private_data)
506 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
507 struct ctdb_set_recmode_state);
511 /* we got a response from our child process so we can abort the
514 talloc_free(state->te);
518 /* read the childs status when trying to lock the reclock file.
519 child wrote 0 if everything is fine and 1 if it did manage
520 to lock the file, which would be a problem since that means
521 we got a request to exit from recovery but we could still lock
522 the file which at this time SHOULD be locked by the recovery
523 daemon on the recmaster
525 ret = read(state->fd[0], &c, 1);
526 if (ret != 1 || c != 0) {
527 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
532 state->ctdb->recovery_mode = state->recmode;
534 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
 * Timed-event callback armed by ctdb_control_set_recmode(); private_data is
 * the struct ctdb_context. Frees and clears ctdb->release_ips_ctx, then
 * calls ctdb_release_all_ips().
 */
540 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
541 struct timeval t, void *private_data)
543 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
545 DEBUG(DEBUG_INFO,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
546 talloc_free(ctdb->release_ips_ctx);
547 ctdb->release_ips_ctx = NULL;
549 ctdb_release_all_ips(ctdb);
553 set the recovery mode
/*
 * Control handler: change the daemon's recovery mode.
 *
 * The requested mode is read as a uint32_t from indata.dptr.
 *
 * - ctdb->release_ips_ctx is freed when the mode is CTDB_RECOVERY_NORMAL;
 *   the lines after that re-create it and arm ctdb_drop_all_ips_event()
 *   with timeval_current_ofs(5,0) (presumably the else branch; the branch
 *   line is not visible in this listing).
 * - The request is refused, with *errormsg set, unless ctdb->freeze_mode is
 *   CTDB_FREEZE_FROZEN.
 * - Unless the request is NORMAL while the current mode is ACTIVE, the mode
 *   is applied directly.
 * - When ending recovery: thaw via ctdb_control_thaw() if a freeze handle
 *   exists, allocate a struct ctdb_set_recmode_state, create a pipe and
 *   fork. The child calls ctdb_recovery_lock(ctdb, false), writes one
 *   status byte to the pipe and then loops while the parent pid still
 *   exists. The parent installs set_recmode_destructor(), arms
 *   ctdb_set_recmode_timeout() with timeval_current_ofs(3, 0), watches the
 *   pipe's read end, records the requested mode and takes ownership of the
 *   control request 'c' for the later reply.
 *
 * NOTE(review): the result of write() in the child is ignored. The cleanup
 * on pipe/fork/event failure, the setting of *async_reply and all return
 * statements are not visible in this listing -- confirm in the full file.
 */
555 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
556 struct ctdb_req_control *c,
557 TDB_DATA indata, bool *async_reply,
558 const char **errormsg)
560 uint32_t recmode = *(uint32_t *)indata.dptr;
562 struct ctdb_set_recmode_state *state;
563 pid_t parent = getpid();
565 /* if we enter recovery but stay in recovery for too long
566 we will eventually drop all our ip addresses
568 if (recmode == CTDB_RECOVERY_NORMAL) {
569 talloc_free(ctdb->release_ips_ctx);
570 ctdb->release_ips_ctx = NULL;
572 talloc_free(ctdb->release_ips_ctx);
573 ctdb->release_ips_ctx = talloc_new(ctdb);
574 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
576 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(5,0), ctdb_drop_all_ips_event, ctdb);
580 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
581 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
583 (*errormsg) = "Cannot change recovery mode while not frozen";
587 if (recmode != ctdb->recovery_mode) {
588 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
589 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
592 if (recmode != CTDB_RECOVERY_NORMAL ||
593 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
594 ctdb->recovery_mode = recmode;
598 /* some special handling when ending recovery mode */
600 /* force the databased to thaw */
601 if (ctdb->freeze_handle) {
602 ctdb_control_thaw(ctdb);
605 state = talloc(ctdb, struct ctdb_set_recmode_state);
606 CTDB_NO_MEMORY(ctdb, state);
608 /* For the rest of what needs to be done, we need to do this in
609 a child process since
610 1, the call to ctdb_recovery_lock() can block if the cluster
611 filesystem is in the process of recovery.
612 2, running of the script may take a while.
614 ret = pipe(state->fd);
617 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
621 state->child = fork();
622 if (state->child == (pid_t)-1) {
629 if (state->child == 0) {
633 /* we should not be able to get the lock on the nodes list,
634 as it should be held by the recovery master
636 if (ctdb_recovery_lock(ctdb, false)) {
637 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
641 write(state->fd[1], &cc, 1);
642 /* make sure we die when our parent dies */
643 while (kill(parent, 0) == 0 || errno != ESRCH) {
650 talloc_set_destructor(state, set_recmode_destructor);
652 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(3, 0),
653 ctdb_set_recmode_timeout, state);
655 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
656 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
659 if (state->fde == NULL) {
665 state->recmode = recmode;
666 state->c = talloc_steal(state, c);
675 try and get the recovery lock in shared storage - should only work
676 on the recovery master recovery daemon. Anywhere else is a bug
/*
 * Try to take a write lock on ctdb->recovery_lock_file without blocking.
 *
 * Any descriptor already held in ctdb->recovery_lock_fd is closed first.
 * The file is opened O_RDWR|O_CREAT with mode 0600 and marked
 * close-on-exec, then an F_WRLCK lock is requested with fcntl(F_SETLK).
 * On lock failure the descriptor is closed, recovery_lock_fd is reset to -1
 * and the failure is logged.
 * NOTE(review): the second close/reset pair presumably runs only when
 * 'keep' is false (the condition is not visible in this listing); the
 * remaining struct flock fields and the return statements are likewise not
 * visible. Callers in this file treat a true result as "lock obtained".
 */
678 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
682 if (ctdb->recovery_lock_fd != -1) {
683 close(ctdb->recovery_lock_fd);
685 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
686 if (ctdb->recovery_lock_fd == -1) {
687 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
688 ctdb->recovery_lock_file, strerror(errno)));
692 set_close_on_exec(ctdb->recovery_lock_fd);
694 lock.l_type = F_WRLCK;
695 lock.l_whence = SEEK_SET;
700 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
701 close(ctdb->recovery_lock_fd);
702 ctdb->recovery_lock_fd = -1;
704 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
710 close(ctdb->recovery_lock_fd);
711 ctdb->recovery_lock_fd = -1;
714 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
720 delete a record as part of the vacuum process
721 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
722 use non-blocking locks
724 return 0 if the record was successfully deleted (i.e. it does not exist
725 when the function returns)
726 or !0 if the record still exists in the tdb after returning.
/*
 * Try to delete one marshalled record from ctdb_db's local tdb.
 *
 * Steps, in order:
 *  1. Split 'rec' into key and data; the data is expected to be exactly one
 *     struct ctdb_ltdb_header (anything else is logged as "Bad record size").
 *  2. Refuse when this node is the lmaster for the key.
 *  3. Take the chain lock for the key without blocking, then fetch the
 *     stored record.
 *  4. A stored record shorter than a header is treated as corrupt and is
 *     deleted if the lock on list -1 can be taken without blocking.
 *  5. Skip when the stored rsn is greater than the supplied rsn, or when
 *     this node is the dmaster of the stored record.
 *  6. Otherwise take the list -1 lock without blocking, delete the record,
 *     and release both locks.
 * NOTE(review): the buffer returned by tdb_fetch() is not visibly freed in
 * this listing, and the return statements are not visible -- confirm both
 * against the full file.
 */
728 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
731 struct ctdb_ltdb_header *hdr, *hdr2;
733 /* these are really internal tdb functions - but we need them here for
734 non-blocking lock of the freelist */
735 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
736 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
739 key.dsize = rec->keylen;
740 key.dptr = &rec->data[0];
741 data.dsize = rec->datalen;
742 data.dptr = &rec->data[rec->keylen];
744 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
745 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
749 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
750 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
754 hdr = (struct ctdb_ltdb_header *)data.dptr;
756 /* use a non-blocking lock */
757 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
761 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
762 if (data.dptr == NULL) {
763 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
767 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
768 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
769 tdb_delete(ctdb_db->ltdb->tdb, key);
770 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
771 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
773 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
778 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
780 if (hdr2->rsn > hdr->rsn) {
781 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
782 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
783 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
788 if (hdr2->dmaster == ctdb->pnn) {
789 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
790 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
795 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
796 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
801 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
802 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
803 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
804 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
809 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
810 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
 * State handed to the start/end recovery event-script callbacks: holds the
 * control request ('c') that is answered once the script has finished.
 */
817 struct recovery_callback_state {
818 struct ctdb_req_control *c;
823 called when the 'recovered' event script has finished
/*
 * Completion callback for the event script started by
 * ctdb_control_end_recovery(); 'p' is the struct recovery_callback_state.
 * Re-enables monitoring, replies to the pending control with the script's
 * status, and records the time in ctdb->last_recovery_finished.
 * NOTE(review): the condition guarding the failure log line and the
 * release of 'state' are not visible in this listing.
 */
825 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
827 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
829 ctdb_enable_monitoring(ctdb);
832 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
835 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
838 gettimeofday(&ctdb->last_recovery_finished, NULL);
842 recovery has finished
/*
 * Control handler: run an event script at the end of recovery and reply
 * asynchronously.
 *
 * Allocates a struct recovery_callback_state that takes ownership of the
 * control request 'c', disables monitoring, and starts the script via
 * ctdb_event_script_callback() with ctdb->tunable.script_timeout as the
 * timeout and ctdb_end_recovery_callback() as the completion handler.
 * If starting the script fails, monitoring is re-enabled and the failure is
 * logged.
 * NOTE(review): the remaining parameters, the script name argument and the
 * return statements are not visible in this listing.
 */
844 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
845 struct ctdb_req_control *c,
849 struct recovery_callback_state *state;
851 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
853 state = talloc(ctdb, struct recovery_callback_state);
854 CTDB_NO_MEMORY(ctdb, state);
856 state->c = talloc_steal(state, c);
858 ctdb_disable_monitoring(ctdb);
860 ret = ctdb_event_script_callback(ctdb,
861 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
863 ctdb_end_recovery_callback,
867 ctdb_enable_monitoring(ctdb);
869 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
874 /* tell the control that we will be reply asynchronously */
880 called when the 'startrecovery' event script has finished
/*
 * Completion callback for the "startrecovery" event script; 'p' is the
 * struct recovery_callback_state. Replies to the pending control with the
 * script's status.
 * NOTE(review): the condition guarding the failure log line and the
 * release of 'state' are not visible in this listing.
 */
882 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
884 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
887 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
890 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
895 run the startrecovery eventscript
/*
 * Control handler: run the "startrecovery" event script and reply
 * asynchronously.
 *
 * Records the current time in ctdb->last_recovery_started, allocates a
 * struct recovery_callback_state that takes ownership of the control
 * request 'c', disables monitoring, and starts the script via
 * ctdb_event_script_callback() with ctdb->tunable.script_timeout as the
 * timeout and ctdb_start_recovery_callback() as the completion handler.
 * NOTE(review): unlike ctdb_control_end_recovery(), no call re-enabling
 * monitoring is visible on the failure path or in the completion callback
 * -- confirm against the full file. The remaining parameters and the return
 * statements are not visible in this listing.
 */
897 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
898 struct ctdb_req_control *c,
902 struct recovery_callback_state *state;
904 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
905 gettimeofday(&ctdb->last_recovery_started, NULL);
907 state = talloc(ctdb, struct recovery_callback_state);
908 CTDB_NO_MEMORY(ctdb, state);
910 state->c = talloc_steal(state, c);
912 ctdb_disable_monitoring(ctdb);
914 ret = ctdb_event_script_callback(ctdb,
915 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
917 ctdb_start_recovery_callback,
918 state, "startrecovery");
921 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
926 /* tell the control that we will be reply asynchronously */
932 try to delete all these records as part of the vacuuming process
933 and return the records we failed to delete
/*
 * Control handler: try to delete a marshalled batch of records and return
 * the ones that could not be deleted.
 *
 * indata is interpreted as a struct ctdb_marshall_buffer; it is rejected
 * when shorter than the fixed buffer header or when db_id is unknown. A
 * reply buffer holding only the header is allocated under outdata and
 * tagged with the database id. Each of the reply->count input records is
 * passed to delete_tdb_record(); when that returns non-zero the whole
 * record is appended to the reply buffer, which is grown with
 * talloc_realloc_size() by rec->length bytes. The final buffer and its
 * talloc size are returned through outdata.
 * NOTE(review): the walk over the input is driven only by reply->count and
 * rec->length; no bound against indata.dsize is visible here. The update of
 * the reply's record count and the return statements are not visible in
 * this listing.
 */
935 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
937 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
938 struct ctdb_db_context *ctdb_db;
940 struct ctdb_rec_data *rec;
941 struct ctdb_marshall_buffer *records;
943 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
944 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
948 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
950 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
955 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
956 reply->count, reply->db_id));
959 /* create a blob to send back the records we couldnt delete */
960 records = (struct ctdb_marshall_buffer *)
961 talloc_zero_size(outdata,
962 offsetof(struct ctdb_marshall_buffer, data));
963 if (records == NULL) {
964 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
967 records->db_id = ctdb_db->db_id;
970 rec = (struct ctdb_rec_data *)&reply->data[0];
971 for (i=0;i<reply->count;i++) {
974 key.dptr = &rec->data[0];
975 key.dsize = rec->keylen;
976 data.dptr = &rec->data[key.dsize];
977 data.dsize = rec->datalen;
979 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
980 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
984 /* If we cant delete the record we must add it to the reply
985 so the lmaster knows it may not purge this record
987 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
989 struct ctdb_ltdb_header *hdr;
991 hdr = (struct ctdb_ltdb_header *)data.dptr;
992 data.dptr += sizeof(*hdr);
993 data.dsize -= sizeof(*hdr);
995 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
997 old_size = talloc_get_size(records);
998 records = talloc_realloc_size(outdata, records, old_size + rec->length);
999 if (records == NULL) {
1000 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1004 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1007 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1011 outdata->dptr = (uint8_t *)records;
1012 outdata->dsize = talloc_get_size(records);
/*
 * Control handler: return ctdb->capabilities as a single uint32_t allocated
 * as a talloc child of outdata.
 * NOTE(review): the return statement is not visible in this listing.
 */
1020 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1022 uint32_t *capabilities = NULL;
1024 capabilities = talloc(outdata, uint32_t);
1025 CTDB_NO_MEMORY(ctdb, capabilities);
1026 *capabilities = ctdb->capabilities;
1028 outdata->dsize = sizeof(uint32_t);
1029 outdata->dptr = (uint8_t *)capabilities;
/*
 * Timed-event callback armed by ctdb_control_recd_ping(); 'p' is the
 * struct ctdb_context. Fires when no recovery daemon ping arrived in time.
 *
 * While the counter in ctdb->recd_ping_count is below
 * ctdb->tunable.recd_ping_failcount the timer is re-armed for another
 * recd_ping_timeout interval. Otherwise the daemon is shut down: recovery
 * daemon, keepalive and monitoring are stopped, all IPs are released, the
 * transport's shutdown method is called if methods is set, and the
 * "shutdown" event script is run.
 * NOTE(review): the counter increment, the early return after re-arming and
 * the final exit call are not visible in this listing.
 */
1034 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1036 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1037 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1039 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Count : %u\n", *count));
1041 if (*count < ctdb->tunable.recd_ping_failcount) {
1043 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1044 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1045 ctdb_recd_ping_timeout, ctdb);
1049 DEBUG(DEBUG_ERR, (__location__ " Final timeout for recovery daemon ping. Shutting down ctdb daemon\n"));
1051 ctdb_stop_recoverd(ctdb);
1052 ctdb_stop_keepalive(ctdb);
1053 ctdb_stop_monitoring(ctdb);
1054 ctdb_release_all_ips(ctdb);
1055 if (ctdb->methods != NULL) {
1056 ctdb->methods->shutdown(ctdb);
1058 ctdb_event_script(ctdb, "shutdown");
1059 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Daemon has been shut down.\n"));
1063 /* The recovery daemon will ping us at regular intervals.
1064 If we haven't been pinged for a while we assume the recovery
1065 daemon is inoperable and we shut down.
/*
 * Control handler for a ping from the recovery daemon.
 *
 * Frees the previous ctdb->recd_ping_count (which also owns any timer armed
 * on it here) and allocates a fresh zeroed counter. When
 * ctdb->tunable.recd_ping_timeout is non-zero, ctdb_recd_ping_timeout() is
 * armed for that interval with the new counter as talloc parent.
 * NOTE(review): the return statement is not visible in this listing.
 */
1067 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1069 talloc_free(ctdb->recd_ping_count);
1071 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1072 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1074 if (ctdb->tunable.recd_ping_timeout != 0) {
1075 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1076 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1077 ctdb_recd_ping_timeout, ctdb);
/*
 * Control handler: set ctdb->recovery_master from the uint32_t at the start
 * of indata. CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)) is applied to the
 * request first, and a notice is logged when ctdb->freeze_mode is not
 * CTDB_FREEZE_FROZEN.
 * NOTE(review): whether the not-frozen case returns early, and the return
 * statements themselves, are not visible in this listing.
 */
1085 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1087 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1088 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
1089 DEBUG(DEBUG_NOTICE,("Attempt to set recmaster when not frozen\n"));
1092 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];