4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
/*
  Mark-only variant of locking every attached database.

  Logs an error when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN, then walks
  ctdb->db_list calling tdb_lockall_mark() on each database and testing the
  result for non-zero.
  NOTE(review): the values returned from the error branches are not shown in
  this listing - presumably non-zero, since callers test "!= 0".
 */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
/*
  Counterpart of ctdb_lock_all_databases_mark(): remove the lock mark from
  every attached database.

  Logs an error when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN, then walks
  ctdb->db_list calling tdb_lockall_unmark() on each database and testing
  the result for non-zero.
 */
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
/*
  Control handler: return this node's vnn map in wire format.

  Allocates a struct ctdb_vnn_map_wire under outdata, sized for
  ctdb->vnn_map->size uint32_t entries, copies generation, size and the map
  array into it and points outdata->dptr at the result.
  CHECK_CONTROL_DATA_SIZE(0) - presumably rejects a request that carries
  input data; confirm against the macro definition.
 */
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
/*
  Control handler: replace ctdb->vnn_map with the wire-format map in indata.

  An error is logged when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.
  The old map is freed, a new struct ctdb_vnn_map is allocated under ctdb,
  and generation, size and the uint32_t array are copied from the request.
  NOTE(review): map->size comes straight from indata and no check against
  indata.dsize is visible here - confirm the dispatcher validates it.
  NOTE(review): the old map is freed before the replacement is allocated,
  so if either allocation fails the previous map is already gone.
 */
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  Control handler: return the list of attached databases.

  Allocates a zeroed struct ctdb_dbid_map under outdata with room for len
  entries (len is presumably the number of databases counted by the first
  loop over ctdb->db_list - its body is not shown here), then records the
  db_id and persistent flag of every database in list order.
 */
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
/*
  Control handler: return the node map (address, pnn and flags per node).

  Allocates a zeroed struct ctdb_node_map under outdata sized for
  ctdb->num_nodes entries, sets num, and fills each entry from
  ctdb->nodes[i]. The textual node address is converted with parse_ip();
  a result of 0 is logged as a parse failure.
 */
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address, &node_map->nodes[i].addr) == 0) {
167 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
170 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
171 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
178 get an old style ipv4-only nodemap
/*
  Control handler: return the node map in the older ipv4-only layout.

  Same shape as ctdb_control_getnodemap() but the reply is a
  struct ctdb_node_mapv4 and each address is converted with parse_ipv4()
  into the entry's sin field; a result of 0 is logged as a parse failure.
 */
181 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
183 uint32_t i, num_nodes;
184 struct ctdb_node_mapv4 *node_map;
186 CHECK_CONTROL_DATA_SIZE(0);
188 num_nodes = ctdb->num_nodes;
190 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
191 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
192 if (!outdata->dptr) {
193 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
197 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
198 node_map->num = num_nodes;
199 for (i=0; i<num_nodes; i++) {
200 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
201 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
205 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
206 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Timed-event handler that re-reads the nodes file.

  private_data is the struct ctdb_context. The previous node array is
  parked on a temporary talloc context while ctdb_load_nodes_file() builds
  the new one. For each new slot whose index is below the old node count and
  whose address matches the old entry, the freshly loaded node is freed and
  the old node is moved back in. New or different nodes are passed to
  methods->add_node() and methods->connect_node(); a failure of either is
  reported through ctdb_fatal(). The temporary context is freed at the end.
 */
213 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
214 struct timeval t, void *private_data)
217 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
219 struct ctdb_node **nodes;
221 tmp_ctx = talloc_new(ctdb);
223 /* steal the old nodes file for a while */
224 talloc_steal(tmp_ctx, ctdb->nodes);
227 num_nodes = ctdb->num_nodes;
230 /* load the new nodes file */
231 ctdb_load_nodes_file(ctdb);
233 for (i=0; i<ctdb->num_nodes; i++) {
234 /* keep any identical pre-existing nodes and connections */
235 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
236 talloc_free(ctdb->nodes[i]);
237 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
241 /* any new or different nodes must be added */
242 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
243 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
244 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
246 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
247 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
248 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
252 talloc_free(tmp_ctx);
257 reload the nodes file after a short delay (so that we can send the response
/*
  Control handler: schedule ctdb_reload_nodes_event() on ctdb->ev at
  timeval_current_ofs(1,0), so the reload is deferred rather than run
  inline.
 */
261 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
263 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
269 a traverse function for pulling all relevant records from pulldb
/*
  Fields of the state passed to traverse_pulldb(): the daemon context and
  the marshall buffer that traversed records are appended to.
 */
272 struct ctdb_context *ctdb;
273 struct ctdb_marshall_buffer *pulldata;
278 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
280 struct pulldb_data *params = (struct pulldb_data *)p;
281 struct ctdb_rec_data *rec;
283 /* add the record to the blob */
284 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
286 params->failed = true;
289 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
290 if (params->pulldata == NULL) {
291 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
292 rec->length + params->len, params->pulldata->count));
293 params->failed = true;
296 params->pulldata->count++;
297 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
298 params->len += rec->length;
305 pull a bunch of records from a ltdb, filtering by lmaster
307 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
309 struct ctdb_control_pulldb *pull;
310 struct ctdb_db_context *ctdb_db;
311 struct pulldb_data params;
312 struct ctdb_marshall_buffer *reply;
314 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
315 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
319 pull = (struct ctdb_control_pulldb *)indata.dptr;
321 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
323 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
327 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
328 CTDB_NO_MEMORY(ctdb, reply);
330 reply->db_id = pull->db_id;
333 params.pulldata = reply;
334 params.len = offsetof(struct ctdb_marshall_buffer, data);
335 params.failed = false;
337 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
338 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
342 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
343 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
344 ctdb_lock_all_databases_unmark(ctdb);
345 talloc_free(params.pulldata);
349 ctdb_lock_all_databases_unmark(ctdb);
351 outdata->dptr = (uint8_t *)params.pulldata;
352 outdata->dsize = params.len;
358 push a bunch of records into a ltdb, filtering by rsn
/*
  Control handler: store every record of a marshalled buffer into a local
  database.

  indata holds a struct ctdb_marshall_buffer: db_id, count, then count
  packed struct ctdb_rec_data entries, each carrying key bytes followed by
  data bytes that start with a struct ctdb_ltdb_header. The node must be
  frozen and the db_id known. With all databases marked as locked, each
  record is split into key / header / payload and written with
  ctdb_ltdb_store(); the walk advances by rec->length. The databases are
  unmarked on both exits visible at the end of the function.
  NOTE(review): only the fixed buffer header is checked against
  indata.dsize; keylen, datalen and length of each record are used as
  received - confirm the sender is trusted or add bounds checks.
 */
360 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
362 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
363 struct ctdb_db_context *ctdb_db;
365 struct ctdb_rec_data *rec;
367 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
368 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
372 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
373 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
377 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
379 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
383 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
384 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
388 rec = (struct ctdb_rec_data *)&reply->data[0];
390 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
391 reply->count, reply->db_id));
393 for (i=0;i<reply->count;i++) {
395 struct ctdb_ltdb_header *hdr;
397 key.dptr = &rec->data[0];
398 key.dsize = rec->keylen;
399 data.dptr = &rec->data[key.dsize];
400 data.dsize = rec->datalen;
402 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
403 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
406 hdr = (struct ctdb_ltdb_header *)data.dptr;
407 data.dptr += sizeof(*hdr);
408 data.dsize -= sizeof(*hdr);
410 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
412 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
416 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
419 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
420 reply->count, reply->db_id));
422 ctdb_lock_all_databases_unmark(ctdb);
426 ctdb_lock_all_databases_unmark(ctdb);
/*
  tdb traverse callback: force the dmaster field of one record.

  p points at the uint32_t dmaster to set. The record value is treated as
  beginning with a struct ctdb_ltdb_header; records that already carry the
  wanted dmaster are skipped, all others are updated in the fetched buffer
  and written back with tdb_store(TDB_REPLACE). A failed write-back is
  logged.
  NOTE(review): data.dsize is not compared with sizeof(*header) before
  header->dmaster is read.
 */
431 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
433 uint32_t *dmaster = (uint32_t *)p;
434 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
437 /* skip if already correct */
438 if (header->dmaster == *dmaster) {
442 header->dmaster = *dmaster;
444 ret = tdb_store(tdb, key, data, TDB_REPLACE);
446 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
450 /* TODO: add error checking here */
/*
  Control handler: set the dmaster of every record in one database.

  indata is a struct ctdb_control_set_dmaster (db_id, dmaster). The node
  must be frozen and the database must exist. With all databases marked as
  locked, the database is traversed with traverse_setdmaster() and the
  marks are removed again afterwards.
  NOTE(review): the result of tdb_traverse() is not examined.
 */
455 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
457 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
458 struct ctdb_db_context *ctdb_db;
460 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
461 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
465 ctdb_db = find_ctdb_db(ctdb, p->db_id);
467 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
471 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
472 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
476 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
478 ctdb_lock_all_databases_unmark(ctdb);
/*
  State kept by ctdb_control_set_recmode() while a child process tests the
  recovery lock: the daemon context, the control request to answer, the
  timeout event and the fd event watching the pipe from the child.
 */
483 struct ctdb_set_recmode_state {
484 struct ctdb_context *ctdb;
485 struct ctdb_req_control *c;
488 struct timed_event *te;
489 struct fd_event *fde;
494 called if our set_recmode child times out. this would happen if
495 ctdb_recovery_lock() would block.
/*
  Timed-event handler armed by ctdb_control_set_recmode(); private_data is
  the struct ctdb_set_recmode_state. Fires when the child has not reported
  in time: the requested recmode is applied anyway and the pending control
  is answered with status 0.
 */
497 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
498 struct timeval t, void *private_data)
500 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
501 struct ctdb_set_recmode_state);
503 /* we consider this a success, not a failure, as we failed to
504 set the recovery lock which is what we wanted. This can be
505 caused by the cluster filesystem being very slow to
506 arbitrate locks immediately after a node failure.
508 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
509 state->ctdb->recovery_mode = state->recmode;
510 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
515 /* when we free the recmode state we must kill any child process.
/* talloc destructor for the set_recmode state: sends SIGKILL to state->child */
517 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
519 kill(state->child, SIGKILL);
523 /* this is called when the client process has completed ctdb_recovery_lock()
524 and has written data back to us through the pipe.
/*
  fd-event handler for the read end of the pipe to the set_recmode child;
  private_data is the struct ctdb_set_recmode_state.

  Cancels the timeout event, reads one status byte from state->fd[0] and
  answers the pending control: status -1 with an error string when the
  read did not return exactly one byte or the byte is non-zero, otherwise
  the requested recmode is applied and status 0 is returned.
 */
526 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
527 uint16_t flags, void *private_data)
529 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
530 struct ctdb_set_recmode_state);
534 /* we got a response from our child process so we can abort the
537 talloc_free(state->te);
541 /* read the childs status when trying to lock the reclock file.
542 child wrote 0 if everything is fine and 1 if it did manage
543 to lock the file, which would be a problem since that means
544 we got a request to exit from recovery but we could still lock
545 the file which at this time SHOULD be locked by the recovery
546 daemon on the recmaster
548 ret = read(state->fd[0], &c, 1);
549 if (ret != 1 || c != 0) {
550 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
555 state->ctdb->recovery_mode = state->recmode;
557 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed-event handler (private_data is the struct ctdb_context) armed when
  recovery mode is entered: logs that recovery has lasted too long, frees
  and clears ctdb->release_ips_ctx and calls ctdb_release_all_ips().
 */
563 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
564 struct timeval t, void *private_data)
566 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
568 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
569 talloc_free(ctdb->release_ips_ctx);
570 ctdb->release_ips_ctx = NULL;
572 ctdb_release_all_ips(ctdb);
576 set the recovery mode
/*
  Control handler: change the recovery mode.

  indata carries the requested mode as a uint32_t. Steps visible here:
   - maintain ctdb->release_ips_ctx: freed for CTDB_RECOVERY_NORMAL,
     otherwise recreated with a timer that runs ctdb_drop_all_ips_event()
     at timeval_current_ofs(tunable.recovery_drop_all_ips, 0)
   - refuse, setting *errormsg, when the node is not frozen
   - log the transition when the mode actually changes
   - a request other than ACTIVE -> NORMAL just stores the new mode
   - when leaving recovery: thaw if a freeze handle exists and, unless
     tunable.verify_recovery_lock is 0, fork a child that calls
     ctdb_recovery_lock(); the child reports over a pipe and the control
     is answered from set_recmode_handler() or, via a timer set to
     timeval_current_ofs(15, 0), from ctdb_set_recmode_timeout()
  NOTE(review): indata.dptr is dereferenced without a visible size check.
  NOTE(review): state is allocated before the verify_recovery_lock == 0
  shortcut; check that this path does not leave it attached to ctdb.
  NOTE(review): the results of the write() calls in the child are ignored.
 */
578 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
579 struct ctdb_req_control *c,
580 TDB_DATA indata, bool *async_reply,
581 const char **errormsg)
583 uint32_t recmode = *(uint32_t *)indata.dptr;
585 struct ctdb_set_recmode_state *state;
586 pid_t parent = getpid();
588 /* if we enter recovery but stay in recovery for too long
589 we will eventually drop all our ip addresses
591 if (recmode == CTDB_RECOVERY_NORMAL) {
592 talloc_free(ctdb->release_ips_ctx);
593 ctdb->release_ips_ctx = NULL;
595 talloc_free(ctdb->release_ips_ctx);
596 ctdb->release_ips_ctx = talloc_new(ctdb);
597 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
599 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
603 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
604 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
606 (*errormsg) = "Cannot change recovery mode while not frozen";
610 if (recmode != ctdb->recovery_mode) {
611 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
612 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
615 if (recmode != CTDB_RECOVERY_NORMAL ||
616 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
617 ctdb->recovery_mode = recmode;
621 /* some special handling when ending recovery mode */
623 /* force the databased to thaw */
624 if (ctdb->freeze_handle) {
625 ctdb_control_thaw(ctdb);
628 state = talloc(ctdb, struct ctdb_set_recmode_state);
629 CTDB_NO_MEMORY(ctdb, state);
632 if (ctdb->tunable.verify_recovery_lock == 0) {
633 /* dont need to verify the reclock file */
634 ctdb->recovery_mode = recmode;
638 /* For the rest of what needs to be done, we need to do this in
639 a child process since
640 1, the call to ctdb_recovery_lock() can block if the cluster
641 filesystem is in the process of recovery.
643 ret = pipe(state->fd);
646 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
650 state->child = fork();
651 if (state->child == (pid_t)-1) {
658 if (state->child == 0) {
662 /* we should not be able to get the lock on the reclock file,
663 as it should be held by the recovery master
665 if (ctdb_recovery_lock(ctdb, false)) {
666 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
670 write(state->fd[1], &cc, 1);
671 /* make sure we die when our parent dies */
672 while (kill(parent, 0) == 0 || errno != ESRCH) {
674 write(state->fd[1], &cc, 1);
680 talloc_set_destructor(state, set_recmode_destructor);
682 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
683 ctdb_set_recmode_timeout, state);
685 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
686 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
689 if (state->fde == NULL) {
695 state->recmode = recmode;
696 state->c = talloc_steal(state, c);
705 try and get the recovery lock in shared storage - should only work
706 on the recovery master recovery daemon. Anywhere else is a bug
/*
  Try to take a write lock on ctdb->recovery_lock_file.

  Any descriptor already held in ctdb->recovery_lock_fd is closed first;
  the file is then opened (O_RDWR|O_CREAT, mode 0600) and marked
  close-on-exec. The lock is requested with fcntl(F_SETLK), which does not
  wait; on failure the descriptor is closed and reset to -1 and the
  failure is logged. A second close/reset is visible after the success
  path - presumably taken when keep is false; confirm.
 */
708 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
712 if (ctdb->recovery_lock_fd != -1) {
713 close(ctdb->recovery_lock_fd);
715 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
716 if (ctdb->recovery_lock_fd == -1) {
717 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
718 ctdb->recovery_lock_file, strerror(errno)));
722 set_close_on_exec(ctdb->recovery_lock_fd);
724 lock.l_type = F_WRLCK;
725 lock.l_whence = SEEK_SET;
730 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
731 close(ctdb->recovery_lock_fd);
732 ctdb->recovery_lock_fd = -1;
734 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
740 close(ctdb->recovery_lock_fd);
741 ctdb->recovery_lock_fd = -1;
744 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
750 delete a record as part of the vacuum process
751 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
752 use non-blocking locks
754 return 0 if the record was successfully deleted (i.e. it does not exist
755 when the function returns)
756 or !0 if the record still exists in the tdb after returning.
/*
  Vacuum helper: try to delete one record from the local tdb.

  rec carries the key followed by data that must be exactly one
  struct ctdb_ltdb_header. The call is turned down when this node is the
  lmaster for the key or when the data size is wrong. Under a non-blocking
  chain lock the stored record is fetched and then:
   - a stored value shorter than an ltdb header is treated as corrupt and
     deleted if the freelist lock (list -1) can be taken without blocking
   - a stored rsn greater than the supplied rsn, or a stored dmaster equal
     to our pnn, leaves the record alone
   - otherwise the record is deleted under the freelist lock
  Every exit visible after the chain lock is taken releases it again.
 */
758 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
761 struct ctdb_ltdb_header *hdr, *hdr2;
763 /* these are really internal tdb functions - but we need them here for
764 non-blocking lock of the freelist */
765 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
766 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
769 key.dsize = rec->keylen;
770 key.dptr = &rec->data[0];
771 data.dsize = rec->datalen;
772 data.dptr = &rec->data[rec->keylen];
774 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
775 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
779 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
780 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
784 hdr = (struct ctdb_ltdb_header *)data.dptr;
786 /* use a non-blocking lock */
787 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
791 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
792 if (data.dptr == NULL) {
793 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
797 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
798 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
799 tdb_delete(ctdb_db->ltdb->tdb, key);
800 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
801 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
803 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
808 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
810 if (hdr2->rsn > hdr->rsn) {
811 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
812 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
813 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
818 if (hdr2->dmaster == ctdb->pnn) {
819 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
820 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
825 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
826 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
831 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
832 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
833 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
834 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
839 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
840 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  Carries the pending control request to the event-script completion
  callbacks ctdb_end_recovery_callback() and ctdb_start_recovery_callback().
 */
847 struct recovery_callback_state {
848 struct ctdb_req_control *c;
853 called when the 'recovered' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_end_recovery(); p is the struct recovery_callback_state.
  Re-enables monitoring, logs a script failure, answers the pending control
  with the script status and stamps ctdb->last_recovery_finished.
 */
855 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
857 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
859 ctdb_enable_monitoring(ctdb);
862 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
865 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
868 gettimeofday(&ctdb->last_recovery_finished, NULL);
872 recovery has finished
/*
  Control handler: recovery has finished.

  Moves the request c into a recovery_callback_state, disables monitoring
  and starts an event script through ctdb_event_script_callback() with
  tunable.script_timeout as its timeout and ctdb_end_recovery_callback()
  as completion; the reply to the control is sent from that callback.
  On the failure path visible here monitoring is re-enabled and an error
  is logged.
 */
874 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
875 struct ctdb_req_control *c,
879 struct recovery_callback_state *state;
881 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
883 state = talloc(ctdb, struct recovery_callback_state);
884 CTDB_NO_MEMORY(ctdb, state);
886 state->c = talloc_steal(state, c);
888 ctdb_disable_monitoring(ctdb);
890 ret = ctdb_event_script_callback(ctdb,
891 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
893 ctdb_end_recovery_callback,
897 ctdb_enable_monitoring(ctdb);
899 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
904 /* tell the control that we will be reply asynchronously */
910 called when the 'startrecovery' event script has finished
/*
  Completion callback for the "startrecovery" event script; p is the
  struct recovery_callback_state. Logs a script failure and answers the
  pending control with the script status.
 */
912 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
914 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
917 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
920 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
925 run the startrecovery eventscript
/*
  Control handler: run the "startrecovery" event script.

  Stamps ctdb->last_recovery_started, moves the request c into a
  recovery_callback_state, disables monitoring and starts the script
  through ctdb_event_script_callback() with tunable.script_timeout as its
  timeout and ctdb_start_recovery_callback() as completion; the reply to
  the control is sent from that callback.
  NOTE(review): unlike ctdb_control_end_recovery(), no call that re-enables
  monitoring is visible on the failure path - confirm this is intended.
 */
927 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
928 struct ctdb_req_control *c,
932 struct recovery_callback_state *state;
934 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
935 gettimeofday(&ctdb->last_recovery_started, NULL);
937 state = talloc(ctdb, struct recovery_callback_state);
938 CTDB_NO_MEMORY(ctdb, state);
940 state->c = talloc_steal(state, c);
942 ctdb_disable_monitoring(ctdb);
944 ret = ctdb_event_script_callback(ctdb,
945 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
947 ctdb_start_recovery_callback,
948 state, "startrecovery");
951 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
956 /* tell the control that we will be reply asynchronously */
962 try to delete all these records as part of the vacuuming process
963 and return the records we failed to delete
/*
  Control handler for vacuuming: try to delete each record listed in
  indata and hand back the ones that could not be deleted.

  indata is a struct ctdb_marshall_buffer. A reply buffer holding only the
  fixed header is allocated under outdata and tagged with the db_id; for
  every input record on which delete_tdb_record() returns non-zero the
  whole ctdb_rec_data is appended to it, growing the buffer with
  talloc_realloc_size(). outdata finally points at the reply, with dsize
  taken from its talloc size.
  NOTE(review): only the fixed buffer header is checked against
  indata.dsize; the per-record lengths are used as received.
 */
965 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
967 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
968 struct ctdb_db_context *ctdb_db;
970 struct ctdb_rec_data *rec;
971 struct ctdb_marshall_buffer *records;
973 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
974 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
978 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
980 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
985 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
986 reply->count, reply->db_id));
989 /* create a blob to send back the records we couldnt delete */
990 records = (struct ctdb_marshall_buffer *)
991 talloc_zero_size(outdata,
992 offsetof(struct ctdb_marshall_buffer, data));
993 if (records == NULL) {
994 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
997 records->db_id = ctdb_db->db_id;
1000 rec = (struct ctdb_rec_data *)&reply->data[0];
1001 for (i=0;i<reply->count;i++) {
1004 key.dptr = &rec->data[0];
1005 key.dsize = rec->keylen;
1006 data.dptr = &rec->data[key.dsize];
1007 data.dsize = rec->datalen;
1009 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1010 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1014 /* If we cant delete the record we must add it to the reply
1015 so the lmaster knows it may not purge this record
1017 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1019 struct ctdb_ltdb_header *hdr;
1021 hdr = (struct ctdb_ltdb_header *)data.dptr;
1022 data.dptr += sizeof(*hdr);
1023 data.dsize -= sizeof(*hdr);
1025 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1027 old_size = talloc_get_size(records);
1028 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1029 if (records == NULL) {
1030 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1034 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1037 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1041 outdata->dptr = (uint8_t *)records;
1042 outdata->dsize = talloc_get_size(records);
/*
  Control handler: return ctdb->capabilities as a single uint32_t
  allocated under outdata.
 */
1050 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1052 uint32_t *capabilities = NULL;
1054 capabilities = talloc(outdata, uint32_t);
1055 CTDB_NO_MEMORY(ctdb, capabilities);
1056 *capabilities = ctdb->capabilities;
1058 outdata->dsize = sizeof(uint32_t);
1059 outdata->dptr = (uint8_t *)capabilities;
/*
  Timed-event handler for a missed recovery-daemon ping; p is the
  struct ctdb_context and ctdb->recd_ping_count the running miss counter.

  While the counter is below tunable.recd_ping_failcount the timer is
  re-armed on the counter's talloc context. Otherwise the daemon shuts
  itself down: recoverd, keepalive and monitoring are stopped, all IPs are
  released, the transport's shutdown method is called if methods is set,
  and the "shutdown" event script is run.
 */
1064 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1066 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1067 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1069 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Count : %u\n", *count));
1071 if (*count < ctdb->tunable.recd_ping_failcount) {
1073 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1074 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1075 ctdb_recd_ping_timeout, ctdb);
1079 DEBUG(DEBUG_ERR, (__location__ " Final timeout for recovery daemon ping. Shutting down ctdb daemon\n"));
1081 ctdb_stop_recoverd(ctdb);
1082 ctdb_stop_keepalive(ctdb);
1083 ctdb_stop_monitoring(ctdb);
1084 ctdb_release_all_ips(ctdb);
1085 if (ctdb->methods != NULL) {
1086 ctdb->methods->shutdown(ctdb);
1088 ctdb_event_script(ctdb, "shutdown");
1089 DEBUG(DEBUG_ERR, (__location__ " Recovery daemon ping timeout. Daemon has been shut down.\n"));
1093 /* The recovery daemon will ping us at regular intervals.
1094 If we havent been pinged for a while we assume the recovery
1095 daemon is inoperable and we shut down.
/*
  Control handler: the recovery daemon pinged us.

  Replaces ctdb->recd_ping_count with a fresh zeroed counter and, when
  tunable.recd_ping_timeout is non-zero, arms ctdb_recd_ping_timeout() with
  the new counter as the timer's talloc context. Freeing the old counter
  presumably also cancels the timer that was parented to it - confirm
  against the events library.
 */
1097 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1099 talloc_free(ctdb->recd_ping_count);
1101 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1102 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1104 if (ctdb->tunable.recd_ping_timeout != 0) {
1105 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1106 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1107 ctdb_recd_ping_timeout, ctdb);
/*
  Control handler: store the recovery master announced in indata, read as
  a single uint32_t after CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)), into
  ctdb->recovery_master.
 */
1115 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1117 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1119 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];