4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
/*
 * CTDB_CONTROL_GET_VNNMAP handler: marshal the in-memory vnn map into a
 * wire blob (struct ctdb_vnn_map_wire) allocated on outdata.
 * NOTE(review): this extract elides some original lines (return type,
 * braces, dsize assignment) — see the gaps in the embedded numbering.
 */
44 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
46 struct ctdb_vnn_map_wire *map;
/* this control carries no input payload */
49 CHECK_CONTROL_DATA_SIZE(0);
/* fixed header plus one uint32_t slot per vnn map entry */
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
/* blob is owned by outdata, so the caller's talloc free reclaims it */
60 outdata->dptr = (uint8_t *)map;
/*
 * CTDB_CONTROL_SET_VNNMAP handler: replace the daemon's vnn map with the
 * wire-format map carried in indata. Only legal while recovery is active.
 * NOTE(review): extract elides some original lines (numbering gaps).
 */
66 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
/* the vnn map may only be rewritten during recovery */
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
/* drop the old map and rebuild it from the wire data */
75 talloc_free(ctdb->vnn_map);
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * CTDB_CONTROL_GET_DBMAP handler: build a ctdb_dbid_map_old blob on
 * outdata listing every attached database's id and flag bits.
 * NOTE(review): extract elides some original lines (numbering gaps).
 */
91 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
97 CHECK_CONTROL_DATA_SIZE(0);
/* first pass over db_list: count databases (loop body elided here;
 * presumably it increments len — confirm against the full source) */
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* allocate zeroed so every dbs[i].flags starts at 0 before |= below */
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: record id and translate per-db booleans into flag bits */
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
 * CTDB_CONTROL_GET_NODEMAP handler: serialize the node list into a map
 * owned by outdata; dsize is taken from the talloc allocation size.
 * NOTE(review): extract elides some original lines (numbering gaps).
 */
131 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
133 CHECK_CONTROL_DATA_SIZE(0);
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
138 if (outdata->dptr == NULL) {
/* the helper returns a talloc'd buffer, so its size is queryable */
142 outdata->dsize = talloc_get_size(outdata->dptr);
/*
148 reload the nodes file: re-read it from disk, keep connections to nodes
 whose address is unchanged, and (re)add/connect any new or changed nodes.
 NOTE(review): extract elides some original lines (numbering gaps).
 */
151 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
155 struct ctdb_node **nodes;
157 tmp_ctx = talloc_new(ctdb);
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
163 num_nodes = ctdb->num_nodes;
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* same address as before: discard the freshly-loaded entry and
 * steal back the old node object (preserves its live connection) */
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* deleted slots are skipped (continue path elided in this extract) */
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
181 /* any new or different nodes must be added */
/* transport add/connect failures are fatal: the daemon shuts down */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
192 /* tell the recovery daemon to reload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* frees the stolen old nodes array along with tmp_ctx */
195 talloc_free(tmp_ctx);
/*
201 a traverse function for pulling all relevant records from pulldb
 Shared state passed to traverse_pulldb() below via the void *p arg.
 NOTE(review): struct header/footer lines are elided in this extract.
 */
204 struct ctdb_context *ctdb;
205 struct ctdb_db_context *ctdb_db;
206 struct ctdb_marshall_buffer *pulldata;
/* bytes currently allocated for pulldata (>= bytes used) */
208 uint32_t allocated_len;
/*
 * tdb_traverse_read callback: marshal one record and append it to the
 * growing pulldata blob, growing the allocation in preallocation-sized
 * chunks. Allocation failure is fatal (daemon shuts down).
 * NOTE(review): extract elides some original lines (numbering gaps).
 */
212 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
/* marshalling failed: flag it so the caller can abort the pull */
222 params->failed = true;
/* grow the blob when the next record would not fit; over-allocate by
 * pulldb_preallocation_size to amortize realloc cost */
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
233 params->pulldata->count++;
/* append the marshalled record at the current write offset */
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
/* warn about unusually large records when the tunable is enabled */
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
/*
247 pull a bunch of records from a ltdb, filtering by lmaster
 CTDB_CONTROL_PULL_DB handler: marshals the whole database into outdata.
 Requires the database to be frozen; takes the lockdb mark around the
 traverse. NOTE(review): extract elides some original lines.
 */
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
256 pull = (struct ctdb_pulldb *)indata.dptr;
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
/* pulls are only valid on a frozen database */
264 if (!ctdb_db_frozen(ctdb_db)) {
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
273 reply->db_id = pull->db_id;
/* seed the traverse state: blob starts as the empty marshall header */
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
/* NOTE: "unhealty" typo below is in the runtime log string — left as-is */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* NOTE(review): "¶ms" below is extraction mojibake of "&params" */
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
300 ctdb_lockdb_unmark(ctdb_db);
/* hand the marshalled blob back to the control framework */
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
/* size warnings, gated by tunables */
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
317 push a bunch of records into a ltdb, filtering by rsn
 CTDB_CONTROL_PUSH_DB handler: walk the marshalled records in indata and
 store each one via ctdb_ltdb_store(). Requires a frozen database.
 NOTE(review): extract elides some original lines (numbering gaps).
 */
319 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
321 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
322 struct ctdb_db_context *ctdb_db;
324 struct ctdb_rec_data_old *rec;
/* reject payloads too small to even hold the marshall header */
326 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
327 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
331 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
333 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
337 if (!ctdb_db_frozen(ctdb_db)) {
339 ("rejecting ctdb_control_push_db when not frozen\n"));
343 if (ctdb_lockdb_mark(ctdb_db) != 0) {
344 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* records are packed back-to-back after the marshall header */
348 rec = (struct ctdb_rec_data_old *)&reply->data[0];
350 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
351 reply->count, reply->db_id));
353 for (i=0;i<reply->count;i++) {
355 struct ctdb_ltdb_header *hdr;
/* each record is key bytes followed immediately by data bytes */
357 key.dptr = &rec->data[0];
358 key.dsize = rec->keylen;
359 data.dptr = &rec->data[key.dsize];
360 data.dsize = rec->datalen;
/* data must at least contain the ltdb header */
362 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
363 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
366 hdr = (struct ctdb_ltdb_header *)data.dptr;
367 /* strip off any read only record flags. All readonly records
368 are revoked implicitly by a recovery
370 hdr->flags &= ~CTDB_REC_RO_FLAGS;
/* store only the payload; the header is passed separately */
372 data.dptr += sizeof(*hdr);
373 data.dsize -= sizeof(*hdr);
375 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
377 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
/* advance to the next packed record */
381 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
384 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
385 reply->count, reply->db_id));
/* readonly delegations are invalidated by a push: wipe the tracking db */
387 if (ctdb_db->readonly) {
388 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
/* on wipe failure, drop read-only support entirely */
390 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
391 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
392 ctdb_db->readonly = false;
393 tdb_close(ctdb_db->rottdb);
394 ctdb_db->rottdb = NULL;
/* NOTE(review): redundant — readonly was already set false above */
395 ctdb_db->readonly = false;
/* abort any in-flight revoke children; their destructors clean up */
397 while (ctdb_db->revokechild_active != NULL) {
398 talloc_free(ctdb_db->revokechild_active);
402 ctdb_lockdb_unmark(ctdb_db);
406 ctdb_lockdb_unmark(ctdb_db);
/* Per-request state for the asynchronous SET_RECMODE control: tracks the
 * deferred control, the child process pipe/timer events, and timing.
 * NOTE(review): extract elides some member lines (numbering gaps). */
410 struct ctdb_set_recmode_state {
411 struct ctdb_context *ctdb;
412 struct ctdb_req_control_old *c;
415 struct tevent_timer *te;
416 struct tevent_fd *fde;
/* used by set_recmode_destructor to record reclock latency */
418 struct timeval start_time;
/*
422 called if our set_recmode child times out. this would happen if
423 ctdb_recovery_lock() would block.
 Treated as success: the child failing to report means it could not take
 the lock, which is the desired outcome, so the recmode change proceeds.
 */
425 static void ctdb_set_recmode_timeout(struct tevent_context *ev,
426 struct tevent_timer *te,
427 struct timeval t, void *private_data)
429 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
430 struct ctdb_set_recmode_state);
432 /* we consider this a success, not a failure, as we failed to
433 set the recovery lock which is what we wanted. This can be
434 caused by the cluster filesystem being very slow to
435 arbitrate locks immediately after a node failure.
437 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
438 state->ctdb->recovery_mode = state->recmode;
/* complete the deferred control with status 0 (success) */
439 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
444 /* when we free the recmode state we must kill any child process.
 Also records the elapsed reclock latency and closes both pipe ends
 (close calls elided in this extract — see numbering gaps).
 */
446 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
448 double l = timeval_elapsed(&state->start_time);
450 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
452 if (state->fd[0] != -1) {
455 if (state->fd[1] != -1) {
/* the child has served its purpose; make sure it is gone */
458 ctdb_kill(state->ctdb, state->child, SIGKILL);
462 /* this is called when the client process has completed ctdb_recovery_lock()
463 and has written data back to us through the pipe.
 A byte of 0 means the child could NOT take the lock (expected, good);
 anything else indicates a lock-coherence problem and fails the control.
 */
465 static void set_recmode_handler(struct tevent_context *ev,
466 struct tevent_fd *fde,
467 uint16_t flags, void *private_data)
469 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
470 struct ctdb_set_recmode_state);
474 /* we got a response from our child process so we can abort the
 timeout timer */
477 talloc_free(state->te);
481 /* If, as expected, the child was unable to take the recovery
482 * lock then it will have written 0 into the pipe, so
483 * continue. However, any other value (e.g. 1) indicates that
484 * it was able to take the recovery lock when it should have
485 * been held by the recovery daemon on the recovery master.
487 ret = sys_read(state->fd[0], &c, 1);
488 if (ret != 1 || c != 0) {
489 ctdb_request_control_reply(
490 state->ctdb, state->c, NULL, -1,
491 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
/* child confirmed the lock is held elsewhere: apply the new mode */
496 state->ctdb->recovery_mode = state->recmode;
498 /* release any deferred attach calls from clients */
499 if (state->recmode == CTDB_RECOVERY_NORMAL) {
500 ctdb_process_deferred_attach(state->ctdb);
/* complete the deferred control with success */
503 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
 * Timer callback armed by ctdb_deferred_drop_all_ips(): fires when the
 * node has stayed in recovery too long, and releases all public IPs.
 */
509 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
510 struct timeval t, void *private_data)
512 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
514 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* freeing the context also cancels/owns this timer */
515 talloc_free(ctdb->release_ips_ctx);
516 ctdb->release_ips_ctx = NULL;
518 ctdb_release_all_ips(ctdb);
/*
522 * Set up an event to drop all public ips if we remain in recovery for too
 * long (recovery_drop_all_ips tunable seconds). Re-arms by replacing any
 * previously scheduled instance.
 */
525 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
/* replace any already-pending drop event */
527 if (ctdb->release_ips_ctx != NULL) {
528 talloc_free(ctdb->release_ips_ctx);
530 ctdb->release_ips_ctx = talloc_new(ctdb);
531 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
/* timer is parented on release_ips_ctx so freeing the ctx cancels it */
533 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
534 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
535 ctdb_drop_all_ips_event, ctdb);
/*
540 set the recovery mode
 CTDB_CONTROL_SET_RECMODE handler. Simple transitions are applied
 synchronously; ending recovery with a recovery lock file configured is
 verified asynchronously via a forked child that must FAIL to take the
 lock (it should be held by the recovery master's recovery daemon).
 NOTE(review): extract elides many original lines (numbering gaps).
 */
542 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
543 struct ctdb_req_control_old *c,
544 TDB_DATA indata, bool *async_reply,
545 const char **errormsg)
547 uint32_t recmode = *(uint32_t *)indata.dptr;
549 struct ctdb_set_recmode_state *state;
550 pid_t parent = getpid();
551 struct ctdb_db_context *ctdb_db;
553 /* if we enter recovery but stay in recovery for too long
554 we will eventually drop all our ip addresses
556 if (recmode == CTDB_RECOVERY_NORMAL) {
/* leaving recovery: cancel any pending drop-all-ips event */
557 talloc_free(ctdb->release_ips_ctx);
558 ctdb->release_ips_ctx = NULL;
560 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
561 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
565 if (recmode != ctdb->recovery_mode) {
566 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
567 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
/* any transition other than ACTIVE->NORMAL is applied directly */
570 if (recmode != CTDB_RECOVERY_NORMAL ||
571 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
572 ctdb->recovery_mode = recmode;
576 /* some special handling when ending recovery mode */
/* sanity-check that every db was refreshed to the current generation */
578 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
579 if (ctdb_db->generation != ctdb->vnn_map->generation) {
581 ("Inconsistent DB generation %u for %s\n",
582 ctdb_db->generation, ctdb_db->db_name));
583 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
588 /* force the databases to thaw */
589 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
590 if (ctdb_db_prio_frozen(ctdb, i)) {
591 ctdb_control_thaw(ctdb, i, false);
595 /* release any deferred attach calls from clients */
596 if (recmode == CTDB_RECOVERY_NORMAL) {
597 ctdb_process_deferred_attach(ctdb);
600 if (ctdb->recovery_lock_file == NULL) {
601 /* Not using recovery lock file */
602 ctdb->recovery_mode = recmode;
606 state = talloc(ctdb, struct ctdb_set_recmode_state);
607 CTDB_NO_MEMORY(ctdb, state);
609 state->start_time = timeval_current();
613 /* For the rest of what needs to be done, we need to do this in
614 a child process since
615 1, the call to ctdb_recovery_lock() can block if the cluster
616 filesystem is in the process of recovery.
618 ret = pipe(state->fd);
621 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
625 state->child = ctdb_fork(ctdb);
626 if (state->child == (pid_t)-1) {
/* ---- child process ---- */
633 if (state->child == 0) {
637 prctl_set_comment("ctdb_recmode");
638 debug_extra = talloc_asprintf(NULL, "set_recmode:");
639 /* Daemon should not be able to get the recover lock,
640 * as it should be held by the recovery master */
641 if (ctdb_recovery_lock(ctdb)) {
643 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
644 ctdb->recovery_lock_file));
645 ctdb_recovery_unlock(ctdb);
/* report the result byte to the parent, then linger until it exits */
649 sys_write(state->fd[1], &cc, 1);
650 ctdb_wait_for_process_to_exit(parent);
/* ---- parent process ---- */
654 set_close_on_exec(state->fd[0]);
/* destructor kills the child and records reclock latency on teardown */
658 talloc_set_destructor(state, set_recmode_destructor);
660 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
/* 5s timeout: a hung child is treated as "lock not taken" (success) */
662 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
663 ctdb_set_recmode_timeout, state);
665 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
666 set_recmode_handler, (void *)state);
668 if (state->fde == NULL) {
672 tevent_fd_set_auto_close(state->fde);
/* defer the reply; set_recmode_handler/timeout will complete it */
675 state->recmode = recmode;
676 state->c = talloc_steal(state, c);
/* True when this process currently holds the recovery lock fd. */
684 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
686 return ctdb->recovery_lock_fd != -1;
/*
690 try and get the recovery lock in shared storage - should only work
691 on the recovery master recovery daemon. Anywhere else is a bug
 Opens (creating if needed) the lock file and attempts a non-blocking
 fcntl F_WRLCK write lock. EACCES/EAGAIN (contention) fail silently.
 NOTE(review): extract elides some original lines (numbering gaps).
 */
693 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
697 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
698 O_RDWR|O_CREAT, 0600);
699 if (ctdb->recovery_lock_fd == -1) {
701 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
702 ctdb->recovery_lock_file, strerror(errno)));
/* don't leak the lock fd across exec */
706 set_close_on_exec(ctdb->recovery_lock_fd);
708 lock.l_type = F_WRLCK;
709 lock.l_whence = SEEK_SET;
/* F_SETLK is non-blocking: fails immediately if another holder exists */
714 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
/* preserve errno across close(), which may clobber it */
715 int saved_errno = errno;
716 close(ctdb->recovery_lock_fd);
717 ctdb->recovery_lock_fd = -1;
718 /* Fail silently on these errors, since they indicate
719 * lock contention, but log an error for any other
 error */
721 if (saved_errno != EACCES &&
722 saved_errno != EAGAIN) {
723 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
724 "recovery lock on '%s' - (%s)\n",
725 ctdb->recovery_lock_file,
726 strerror(saved_errno)));
/* Release the recovery lock by closing its fd (fcntl locks are dropped
 * on close); no-op when the lock is not held. */
734 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
736 if (ctdb->recovery_lock_fd != -1) {
737 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
738 close(ctdb->recovery_lock_fd);
739 ctdb->recovery_lock_fd = -1;
/*
744 delete a record as part of the vacuum process
745 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
746 use non-blocking locks
748 return 0 if the record was successfully deleted (i.e. it does not exist
749 when the function returns)
750 or !0 is the record still exists in the tdb after returning.
 NOTE(review): extract elides some original lines (numbering gaps).
 */
752 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
754 TDB_DATA key, data, data2;
755 struct ctdb_ltdb_header *hdr, *hdr2;
757 /* these are really internal tdb functions - but we need them here for
758 non-blocking lock of the freelist */
759 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
760 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* unpack key/data from the packed wire record */
763 key.dsize = rec->keylen;
764 key.dptr = &rec->data[0];
765 data.dsize = rec->datalen;
766 data.dptr = &rec->data[rec->keylen];
/* never vacuum-delete on the lmaster itself */
768 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
769 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* vacuum candidates carry only an ltdb header, no payload */
773 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
774 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
778 hdr = (struct ctdb_ltdb_header *)data.dptr;
780 /* use a non-blocking lock */
781 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* fetch the current local copy under the chainlock */
785 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
786 if (data2.dptr == NULL) {
787 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* local copy smaller than an ltdb header: corrupt — try to purge it.
 * the freelist (-1) lock is needed for a safe delete */
791 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
792 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
793 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
794 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
796 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
797 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
799 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
804 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* local copy is newer than the one we were asked to delete: skip */
806 if (hdr2->rsn > hdr->rsn) {
807 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
808 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
809 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
814 /* do not allow deleting record that have readonly flags set. */
815 if (hdr->flags & CTDB_REC_RO_FLAGS) {
816 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
817 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* same check against the flags of the stored local copy */
821 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
822 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
823 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* never delete while we are the dmaster of the record */
828 if (hdr2->dmaster == ctdb->pnn) {
829 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
830 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* take the freelist lock non-blocking, then delete */
835 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
836 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
841 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
842 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
843 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
844 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success path: release locks in reverse order */
849 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
850 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* Holds the deferred control while a recovery event script runs.
 * NOTE(review): closing brace elided in this extract. */
857 struct recovery_callback_state {
858 struct ctdb_req_control_old *c;
/*
863 called when the 'recovered' event script has finished
 Re-enables monitoring, bumps the recovery counter, replies to the
 deferred END_RECOVERY control, and advances the runstate if this was
 the first recovery. NOTE(review): extract elides some lines.
 */
865 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
867 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
869 ctdb_enable_monitoring(ctdb);
870 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
873 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
/* -ETIME means the event script timed out */
874 if (status == -ETIME) {
/* forward the script status to the waiting control */
879 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
882 gettimeofday(&ctdb->last_recovery_finished, NULL);
/* first successful recovery moves the daemon into STARTUP */
884 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
885 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
/*
890 recovery has finished
 CTDB_CONTROL_END_RECOVERY handler: flush pending trans3 commits, run the
 'recovered' event script, and reply asynchronously from its callback.
 NOTE(review): extract elides some original lines (numbering gaps).
 */
892 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
893 struct ctdb_req_control_old *c,
897 struct recovery_callback_state *state;
899 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
901 ctdb_persistent_finish_trans3_commits(ctdb);
903 state = talloc(ctdb, struct recovery_callback_state);
904 CTDB_NO_MEMORY(ctdb, state);
/* monitoring stays off while the event script runs */
908 ctdb_disable_monitoring(ctdb);
910 ret = ctdb_event_script_callback(ctdb, state,
911 ctdb_end_recovery_callback,
913 CTDB_EVENT_RECOVERED, "%s", "");
/* script failed to start: restore monitoring and fail the control */
916 ctdb_enable_monitoring(ctdb);
918 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
923 /* tell the control that we will be reply asynchronously */
924 state->c = talloc_steal(state, c);
/*
930 called when the 'startrecovery' event script has finished
 Forwards the script's status to the deferred START_RECOVERY control.
 */
932 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
934 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
937 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
940 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
945 run the startrecovery eventscript
 CTDB_CONTROL_START_RECOVERY handler: stamps last_recovery_started,
 disables monitoring, runs the 'startrecovery' event script and replies
 asynchronously from its callback. NOTE(review): extract elides lines.
 */
947 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
948 struct ctdb_req_control_old *c,
952 struct recovery_callback_state *state;
954 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
955 gettimeofday(&ctdb->last_recovery_started, NULL);
957 state = talloc(ctdb, struct recovery_callback_state);
958 CTDB_NO_MEMORY(ctdb, state);
/* own the control so we can reply after the script completes */
960 state->c = talloc_steal(state, c);
962 ctdb_disable_monitoring(ctdb);
964 ret = ctdb_event_script_callback(ctdb, state,
965 ctdb_start_recovery_callback,
967 CTDB_EVENT_START_RECOVERY,
971 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
976 /* tell the control that we will be reply asynchronously */
/*
982 try to delete all these records as part of the vacuuming process
983 and return the records we failed to delete
 CTDB_CONTROL_TRY_DELETE_RECORDS handler: walks the marshalled records,
 attempts delete_tdb_record() on each, and marshals the failures back to
 the lmaster. NOTE(review): extract elides some original lines.
 */
985 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
987 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
988 struct ctdb_db_context *ctdb_db;
990 struct ctdb_rec_data_old *rec;
991 struct ctdb_marshall_buffer *records;
/* payload must at least hold the marshall header */
993 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
994 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
998 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1000 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1005 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1006 reply->count, reply->db_id));
1009 /* create a blob to send back the records we couldn't delete */
1010 records = (struct ctdb_marshall_buffer *)
1011 talloc_zero_size(outdata,
1012 offsetof(struct ctdb_marshall_buffer, data));
1013 if (records == NULL) {
1014 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1017 records->db_id = ctdb_db->db_id;
/* iterate the packed records following the marshall header */
1020 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1021 for (i=0;i<reply->count;i++) {
1024 key.dptr = &rec->data[0];
1025 key.dsize = rec->keylen;
1026 data.dptr = &rec->data[key.dsize];
1027 data.dsize = rec->datalen;
1029 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1030 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1034 /* If we cant delete the record we must add it to the reply
1035 so the lmaster knows it may not purge this record
1037 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1039 struct ctdb_ltdb_header *hdr;
1041 hdr = (struct ctdb_ltdb_header *)data.dptr;
1042 data.dptr += sizeof(*hdr);
1043 data.dsize -= sizeof(*hdr);
1045 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
/* grow the failure blob and append this record verbatim */
1047 old_size = talloc_get_size(records);
1048 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1049 if (records == NULL) {
1050 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1054 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next packed record */
1057 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1061 *outdata = ctdb_marshall_finish(records);
/*
1067 * Store a record as part of the vacuum process:
1068 * This is called from the RECEIVE_RECORD control which
1069 * the lmaster uses to send the current empty copy
1070 * to all nodes for storing, before it lets the other
1071 * nodes delete the records in the second phase with
1072 * the TRY_DELETE_RECORDS control.
1074 * Only store if we are not lmaster or dmaster, and our
1075 * rsn is <= the provided rsn. Use non-blocking locks.
1077 * return 0 if the record was successfully stored.
1078 * return !0 if the record still exists in the tdb after returning.
 * NOTE(review): extract elides some original lines (numbering gaps).
 */
1080 static int store_tdb_record(struct ctdb_context *ctdb,
1081 struct ctdb_db_context *ctdb_db,
1082 struct ctdb_rec_data_old *rec)
1084 TDB_DATA key, data, data2;
1085 struct ctdb_ltdb_header *hdr, *hdr2;
/* unpack key/data from the packed wire record */
1088 key.dsize = rec->keylen;
1089 key.dptr = &rec->data[0];
1090 data.dsize = rec->datalen;
1091 data.dptr = &rec->data[rec->keylen];
1093 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1094 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1095 "where we are lmaster\n"));
/* vacuum records carry only the ltdb header, no payload */
1099 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1100 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1104 hdr = (struct ctdb_ltdb_header *)data.dptr;
1106 /* use a non-blocking lock */
1107 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1108 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1112 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* missing or corrupt local copy: just store the empty copy */
1113 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1114 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1115 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1119 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1124 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* local copy is newer than the proposed one: skip */
1126 if (hdr2->rsn > hdr->rsn) {
1127 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1128 "rsn=%llu - called with rsn=%llu\n",
1129 (unsigned long long)hdr2->rsn,
1130 (unsigned long long)hdr->rsn));
1135 /* do not allow vacuuming of records that have readonly flags set. */
1136 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1137 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* same check against the stored local copy's flags */
1142 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1143 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* never overwrite while we are the record's dmaster */
1149 if (hdr2->dmaster == ctdb->pnn) {
1150 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1151 "where we are the dmaster\n"));
1156 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1157 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
/* common exit: always release the chainlock */
1165 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
1173 * Try to store all these records as part of the vacuuming process
1174 * and return the records we failed to store.
 * CTDB_CONTROL_RECEIVE_RECORDS handler; mirrors try_delete_records but
 * uses store_tdb_record(). NOTE(review): extract elides some lines.
 */
1176 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1177 TDB_DATA indata, TDB_DATA *outdata)
1179 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1180 struct ctdb_db_context *ctdb_db;
1182 struct ctdb_rec_data_old *rec;
1183 struct ctdb_marshall_buffer *records;
/* payload must at least hold the marshall header */
1185 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1187 (__location__ " invalid data in receive_records\n"));
1191 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1193 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1198 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1199 "dbid 0x%x\n", reply->count, reply->db_id));
1201 /* create a blob to send back the records we could not store */
1202 records = (struct ctdb_marshall_buffer *)
1203 talloc_zero_size(outdata,
1204 offsetof(struct ctdb_marshall_buffer, data));
1205 if (records == NULL) {
1206 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1209 records->db_id = ctdb_db->db_id;
/* iterate the packed records following the marshall header */
1211 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1212 for (i=0; i<reply->count; i++) {
1215 key.dptr = &rec->data[0];
1216 key.dsize = rec->keylen;
1217 data.dptr = &rec->data[key.dsize];
1218 data.dsize = rec->datalen;
1220 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1221 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1227 * If we can not store the record we must add it to the reply
1228 * so the lmaster knows it may not purge this record.
1230 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1232 struct ctdb_ltdb_header *hdr;
1234 hdr = (struct ctdb_ltdb_header *)data.dptr;
1235 data.dptr += sizeof(*hdr);
1236 data.dsize -= sizeof(*hdr);
1238 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1239 "record with hash 0x%08x in vacuum "
1240 "via RECEIVE_RECORDS\n",
/* grow the failure blob and append this record verbatim */
1243 old_size = talloc_get_size(records);
1244 records = talloc_realloc_size(outdata, records,
1245 old_size + rec->length);
1246 if (records == NULL) {
1247 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1252 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next packed record */
1255 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1258 *outdata = ctdb_marshall_finish(records);
/*
 * CTDB_CONTROL_GET_CAPABILITIES handler: return this node's capability
 * bitmask as a single uint32_t owned by outdata.
 */
1267 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1269 uint32_t *capabilities = NULL;
1271 capabilities = talloc(outdata, uint32_t);
1272 CTDB_NO_MEMORY(ctdb, capabilities);
1273 *capabilities = ctdb->capabilities;
1275 outdata->dsize = sizeof(uint32_t);
1276 outdata->dptr = (uint8_t *)capabilities;
1281 /* The recovery daemon will ping us at regular intervals.
1282 If we haven't been pinged for a while we assume the recovery
1283 daemon is inoperable and we restart.
 Timer callback; re-arms itself until recd_ping_failcount consecutive
 misses, then restarts the recovery daemon.
 */
1285 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1286 struct tevent_timer *te,
1287 struct timeval t, void *p)
1289 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1290 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1292 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* not at the failure threshold yet: schedule another timeout check */
1294 if (*count < ctdb->tunable.recd_ping_failcount) {
1296 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1297 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1298 ctdb_recd_ping_timeout, ctdb);
1302 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* too many missed pings: bounce the recovery daemon */
1304 ctdb_stop_recoverd(ctdb);
1305 ctdb_start_recoverd(ctdb);
/*
 * CTDB_CONTROL_RECD_PING handler: reset the missed-ping counter and
 * re-arm the watchdog timer (disabled when recd_ping_timeout is 0).
 */
1308 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/* freeing the old counter also cancels any timer parented on it */
1310 talloc_free(ctdb->recd_ping_count);
1312 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1313 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1315 if (ctdb->tunable.recd_ping_timeout != 0) {
1316 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1317 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1318 ctdb_recd_ping_timeout, ctdb);
/*
 * CTDB_CONTROL_SET_RECMASTER handler: record the new recovery master
 * pnn, logging when this node gains or loses the role.
 */
1326 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1328 uint32_t new_recmaster;
/* payload is exactly one uint32_t: the new recmaster pnn */
1330 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1331 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* log role transitions affecting this node */
1333 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1335 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1338 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1340 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1343 ctdb->recovery_master = new_recmaster;
/*
 * CTDB_CONTROL_STOP_NODE handler: disable monitoring and mark this
 * node STOPPED in its own flags.
 */
1348 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1350 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1351 ctdb_disable_monitoring(ctdb);
1352 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1357 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1359 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1360 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;