   Copyright (C) Andrew Tridgell 2007
   Copyright (C) Ronnie Sahlberg 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
44 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
46 struct ctdb_vnn_map_wire *map;
49 CHECK_CONTROL_DATA_SIZE(0);
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
60 outdata->dptr = (uint8_t *)map;
66 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
75 talloc_free(ctdb->vnn_map);
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
91 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
97 CHECK_CONTROL_DATA_SIZE(0);
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
131 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
133 CHECK_CONTROL_DATA_SIZE(0);
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
138 if (outdata->dptr == NULL) {
142 outdata->dsize = talloc_get_size(outdata->dptr);
148 reload the nodes file
151 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
155 struct ctdb_node **nodes;
157 tmp_ctx = talloc_new(ctdb);
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
163 num_nodes = ctdb->num_nodes;
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
181 /* any new or different nodes must be added */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
192 /* tell the recovery daemon to reaload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
195 talloc_free(tmp_ctx);
201 a traverse function for pulling all relevent records from pulldb
204 struct ctdb_context *ctdb;
205 struct ctdb_db_context *ctdb_db;
206 struct ctdb_marshall_buffer *pulldata;
208 uint32_t allocated_len;
212 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
222 params->failed = true;
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
233 params->pulldata->count++;
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
247 pull a bunch of records from a ltdb, filtering by lmaster
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
256 pull = (struct ctdb_pulldb *)indata.dptr;
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
264 if (!ctdb_db_frozen(ctdb_db)) {
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
273 reply->db_id = pull->db_id;
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
300 ctdb_lockdb_unmark(ctdb_db);
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
317 push a bunch of records into a ltdb, filtering by rsn
319 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
321 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
322 struct ctdb_db_context *ctdb_db;
324 struct ctdb_rec_data_old *rec;
326 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
327 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
331 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
333 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
337 if (!ctdb_db_frozen(ctdb_db)) {
339 ("rejecting ctdb_control_push_db when not frozen\n"));
343 if (ctdb_lockdb_mark(ctdb_db) != 0) {
344 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
348 rec = (struct ctdb_rec_data_old *)&reply->data[0];
350 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
351 reply->count, reply->db_id));
353 for (i=0;i<reply->count;i++) {
355 struct ctdb_ltdb_header *hdr;
357 key.dptr = &rec->data[0];
358 key.dsize = rec->keylen;
359 data.dptr = &rec->data[key.dsize];
360 data.dsize = rec->datalen;
362 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
363 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
366 hdr = (struct ctdb_ltdb_header *)data.dptr;
367 /* strip off any read only record flags. All readonly records
368 are revoked implicitely by a recovery
370 hdr->flags &= ~CTDB_REC_RO_FLAGS;
372 data.dptr += sizeof(*hdr);
373 data.dsize -= sizeof(*hdr);
375 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
377 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
381 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
384 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
385 reply->count, reply->db_id));
387 if (ctdb_db->readonly) {
388 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
390 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
391 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
392 ctdb_db->readonly = false;
393 tdb_close(ctdb_db->rottdb);
394 ctdb_db->rottdb = NULL;
395 ctdb_db->readonly = false;
397 while (ctdb_db->revokechild_active != NULL) {
398 talloc_free(ctdb_db->revokechild_active);
402 ctdb_lockdb_unmark(ctdb_db);
406 ctdb_lockdb_unmark(ctdb_db);
410 struct ctdb_set_recmode_state {
411 struct ctdb_context *ctdb;
412 struct ctdb_req_control_old *c;
415 struct tevent_timer *te;
416 struct tevent_fd *fde;
418 struct timeval start_time;
422 called if our set_recmode child times out. this would happen if
423 ctdb_recovery_lock() would block.
425 static void ctdb_set_recmode_timeout(struct tevent_context *ev,
426 struct tevent_timer *te,
427 struct timeval t, void *private_data)
429 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
430 struct ctdb_set_recmode_state);
432 /* we consider this a success, not a failure, as we failed to
433 set the recovery lock which is what we wanted. This can be
434 caused by the cluster filesystem being very slow to
435 arbitrate locks immediately after a node failure.
437 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
438 state->ctdb->recovery_mode = state->recmode;
439 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
444 /* when we free the recmode state we must kill any child process.
446 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
448 double l = timeval_elapsed(&state->start_time);
450 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
452 if (state->fd[0] != -1) {
455 if (state->fd[1] != -1) {
458 ctdb_kill(state->ctdb, state->child, SIGKILL);
462 /* this is called when the client process has completed ctdb_recovery_lock()
463 and has written data back to us through the pipe.
465 static void set_recmode_handler(struct tevent_context *ev,
466 struct tevent_fd *fde,
467 uint16_t flags, void *private_data)
469 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
470 struct ctdb_set_recmode_state);
474 /* we got a response from our child process so we can abort the
477 talloc_free(state->te);
481 /* If, as expected, the child was unable to take the recovery
482 * lock then it will have written 0 into the pipe, so
483 * continue. However, any other value (e.g. 1) indicates that
484 * it was able to take the recovery lock when it should have
485 * been held by the recovery daemon on the recovery master.
487 ret = sys_read(state->fd[0], &c, 1);
488 if (ret != 1 || c != 0) {
489 ctdb_request_control_reply(
490 state->ctdb, state->c, NULL, -1,
491 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
496 state->ctdb->recovery_mode = state->recmode;
498 /* release any deferred attach calls from clients */
499 if (state->recmode == CTDB_RECOVERY_NORMAL) {
500 ctdb_process_deferred_attach(state->ctdb);
503 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
509 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
510 struct timeval t, void *private_data)
512 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
514 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
515 talloc_free(ctdb->release_ips_ctx);
516 ctdb->release_ips_ctx = NULL;
518 ctdb_release_all_ips(ctdb);
522 * Set up an event to drop all public ips if we remain in recovery for too
525 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
527 if (ctdb->release_ips_ctx != NULL) {
528 talloc_free(ctdb->release_ips_ctx);
530 ctdb->release_ips_ctx = talloc_new(ctdb);
531 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
533 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
534 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
535 ctdb_drop_all_ips_event, ctdb);
540 set the recovery mode
542 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
543 struct ctdb_req_control_old *c,
544 TDB_DATA indata, bool *async_reply,
545 const char **errormsg)
547 uint32_t recmode = *(uint32_t *)indata.dptr;
549 struct ctdb_set_recmode_state *state;
550 pid_t parent = getpid();
551 struct ctdb_db_context *ctdb_db;
553 /* if we enter recovery but stay in recovery for too long
554 we will eventually drop all our ip addresses
556 if (recmode == CTDB_RECOVERY_NORMAL) {
557 talloc_free(ctdb->release_ips_ctx);
558 ctdb->release_ips_ctx = NULL;
560 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
561 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
565 if (recmode != ctdb->recovery_mode) {
566 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
567 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
570 if (recmode != CTDB_RECOVERY_NORMAL ||
571 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
572 ctdb->recovery_mode = recmode;
576 /* some special handling when ending recovery mode */
578 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
579 if (ctdb_db->generation != ctdb->vnn_map->generation) {
581 ("Inconsistent DB generation %u for %s\n",
582 ctdb_db->generation, ctdb_db->db_name));
583 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
588 /* force the databases to thaw */
589 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
590 if (ctdb_db_prio_frozen(ctdb, i)) {
591 ctdb_control_thaw(ctdb, i, false);
595 /* release any deferred attach calls from clients */
596 if (recmode == CTDB_RECOVERY_NORMAL) {
597 ctdb_process_deferred_attach(ctdb);
600 if (ctdb->recovery_lock_file == NULL) {
601 /* Not using recovery lock file */
602 ctdb->recovery_mode = recmode;
606 state = talloc(ctdb, struct ctdb_set_recmode_state);
607 CTDB_NO_MEMORY(ctdb, state);
609 state->start_time = timeval_current();
613 /* For the rest of what needs to be done, we need to do this in
614 a child process since
615 1, the call to ctdb_recovery_lock() can block if the cluster
616 filesystem is in the process of recovery.
618 ret = pipe(state->fd);
621 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
625 state->child = ctdb_fork(ctdb);
626 if (state->child == (pid_t)-1) {
633 if (state->child == 0) {
637 prctl_set_comment("ctdb_recmode");
638 debug_extra = talloc_asprintf(NULL, "set_recmode:");
639 /* Daemon should not be able to get the recover lock,
640 * as it should be held by the recovery master */
641 if (ctdb_recovery_lock(ctdb)) {
643 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
644 ctdb->recovery_lock_file));
645 ctdb_recovery_unlock(ctdb);
649 sys_write(state->fd[1], &cc, 1);
650 /* make sure we die when our parent dies */
651 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
653 sys_write(state->fd[1], &cc, 1);
658 set_close_on_exec(state->fd[0]);
662 talloc_set_destructor(state, set_recmode_destructor);
664 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
666 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
667 ctdb_set_recmode_timeout, state);
669 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
670 set_recmode_handler, (void *)state);
672 if (state->fde == NULL) {
676 tevent_fd_set_auto_close(state->fde);
679 state->recmode = recmode;
680 state->c = talloc_steal(state, c);
688 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
690 return ctdb->recovery_lock_fd != -1;
694 try and get the recovery lock in shared storage - should only work
695 on the recovery master recovery daemon. Anywhere else is a bug
697 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
701 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
702 O_RDWR|O_CREAT, 0600);
703 if (ctdb->recovery_lock_fd == -1) {
705 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
706 ctdb->recovery_lock_file, strerror(errno)));
710 set_close_on_exec(ctdb->recovery_lock_fd);
712 lock.l_type = F_WRLCK;
713 lock.l_whence = SEEK_SET;
718 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
719 int saved_errno = errno;
720 close(ctdb->recovery_lock_fd);
721 ctdb->recovery_lock_fd = -1;
722 /* Fail silently on these errors, since they indicate
723 * lock contention, but log an error for any other
725 if (saved_errno != EACCES &&
726 saved_errno != EAGAIN) {
727 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
728 "recovery lock on '%s' - (%s)\n",
729 ctdb->recovery_lock_file,
730 strerror(saved_errno)));
738 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
740 if (ctdb->recovery_lock_fd != -1) {
741 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
742 close(ctdb->recovery_lock_fd);
743 ctdb->recovery_lock_fd = -1;
748 delete a record as part of the vacuum process
749 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
750 use non-blocking locks
752 return 0 if the record was successfully deleted (i.e. it does not exist
753 when the function returns)
754 or !0 is the record still exists in the tdb after returning.
756 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
758 TDB_DATA key, data, data2;
759 struct ctdb_ltdb_header *hdr, *hdr2;
761 /* these are really internal tdb functions - but we need them here for
762 non-blocking lock of the freelist */
763 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
764 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
767 key.dsize = rec->keylen;
768 key.dptr = &rec->data[0];
769 data.dsize = rec->datalen;
770 data.dptr = &rec->data[rec->keylen];
772 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
773 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
777 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
778 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
782 hdr = (struct ctdb_ltdb_header *)data.dptr;
784 /* use a non-blocking lock */
785 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
789 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
790 if (data2.dptr == NULL) {
791 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
795 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
796 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
797 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
798 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
800 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
801 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
803 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
808 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
810 if (hdr2->rsn > hdr->rsn) {
811 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
812 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
813 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
818 /* do not allow deleting record that have readonly flags set. */
819 if (hdr->flags & CTDB_REC_RO_FLAGS) {
820 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
821 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
825 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
826 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
827 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
832 if (hdr2->dmaster == ctdb->pnn) {
833 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
834 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
839 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
840 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
845 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
846 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
847 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
848 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
853 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
854 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* state carried across an event-script callback so we can reply to the
   originating control */
struct recovery_callback_state {
	struct ctdb_req_control_old *c;
};
867 called when the 'recovered' event script has finished
869 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
871 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
873 ctdb_enable_monitoring(ctdb);
874 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
877 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
878 if (status == -ETIME) {
883 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
886 gettimeofday(&ctdb->last_recovery_finished, NULL);
888 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
889 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
894 recovery has finished
896 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
897 struct ctdb_req_control_old *c,
901 struct recovery_callback_state *state;
903 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
905 ctdb_persistent_finish_trans3_commits(ctdb);
907 state = talloc(ctdb, struct recovery_callback_state);
908 CTDB_NO_MEMORY(ctdb, state);
912 ctdb_disable_monitoring(ctdb);
914 ret = ctdb_event_script_callback(ctdb, state,
915 ctdb_end_recovery_callback,
917 CTDB_EVENT_RECOVERED, "%s", "");
920 ctdb_enable_monitoring(ctdb);
922 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
927 /* tell the control that we will be reply asynchronously */
928 state->c = talloc_steal(state, c);
934 called when the 'startrecovery' event script has finished
936 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
938 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
941 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
944 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
949 run the startrecovery eventscript
951 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
952 struct ctdb_req_control_old *c,
956 struct recovery_callback_state *state;
958 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
959 gettimeofday(&ctdb->last_recovery_started, NULL);
961 state = talloc(ctdb, struct recovery_callback_state);
962 CTDB_NO_MEMORY(ctdb, state);
964 state->c = talloc_steal(state, c);
966 ctdb_disable_monitoring(ctdb);
968 ret = ctdb_event_script_callback(ctdb, state,
969 ctdb_start_recovery_callback,
971 CTDB_EVENT_START_RECOVERY,
975 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
980 /* tell the control that we will be reply asynchronously */
986 try to delete all these records as part of the vacuuming process
987 and return the records we failed to delete
989 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
991 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
992 struct ctdb_db_context *ctdb_db;
994 struct ctdb_rec_data_old *rec;
995 struct ctdb_marshall_buffer *records;
997 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
998 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1002 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1004 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1009 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1010 reply->count, reply->db_id));
1013 /* create a blob to send back the records we couldnt delete */
1014 records = (struct ctdb_marshall_buffer *)
1015 talloc_zero_size(outdata,
1016 offsetof(struct ctdb_marshall_buffer, data));
1017 if (records == NULL) {
1018 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1021 records->db_id = ctdb_db->db_id;
1024 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1025 for (i=0;i<reply->count;i++) {
1028 key.dptr = &rec->data[0];
1029 key.dsize = rec->keylen;
1030 data.dptr = &rec->data[key.dsize];
1031 data.dsize = rec->datalen;
1033 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1034 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1038 /* If we cant delete the record we must add it to the reply
1039 so the lmaster knows it may not purge this record
1041 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1043 struct ctdb_ltdb_header *hdr;
1045 hdr = (struct ctdb_ltdb_header *)data.dptr;
1046 data.dptr += sizeof(*hdr);
1047 data.dsize -= sizeof(*hdr);
1049 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1051 old_size = talloc_get_size(records);
1052 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1053 if (records == NULL) {
1054 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1058 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1061 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1065 *outdata = ctdb_marshall_finish(records);
1071 * Store a record as part of the vacuum process:
1072 * This is called from the RECEIVE_RECORD control which
1073 * the lmaster uses to send the current empty copy
1074 * to all nodes for storing, before it lets the other
1075 * nodes delete the records in the second phase with
1076 * the TRY_DELETE_RECORDS control.
1078 * Only store if we are not lmaster or dmaster, and our
1079 * rsn is <= the provided rsn. Use non-blocking locks.
1081 * return 0 if the record was successfully stored.
1082 * return !0 if the record still exists in the tdb after returning.
1084 static int store_tdb_record(struct ctdb_context *ctdb,
1085 struct ctdb_db_context *ctdb_db,
1086 struct ctdb_rec_data_old *rec)
1088 TDB_DATA key, data, data2;
1089 struct ctdb_ltdb_header *hdr, *hdr2;
1092 key.dsize = rec->keylen;
1093 key.dptr = &rec->data[0];
1094 data.dsize = rec->datalen;
1095 data.dptr = &rec->data[rec->keylen];
1097 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1098 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1099 "where we are lmaster\n"));
1103 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1104 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1108 hdr = (struct ctdb_ltdb_header *)data.dptr;
1110 /* use a non-blocking lock */
1111 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1112 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1116 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1117 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1118 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1119 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1123 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1128 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1130 if (hdr2->rsn > hdr->rsn) {
1131 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1132 "rsn=%llu - called with rsn=%llu\n",
1133 (unsigned long long)hdr2->rsn,
1134 (unsigned long long)hdr->rsn));
1139 /* do not allow vacuuming of records that have readonly flags set. */
1140 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1141 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1146 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1147 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1153 if (hdr2->dmaster == ctdb->pnn) {
1154 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1155 "where we are the dmaster\n"));
1160 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1161 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1169 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1177 * Try to store all these records as part of the vacuuming process
1178 * and return the records we failed to store.
1180 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1181 TDB_DATA indata, TDB_DATA *outdata)
1183 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1184 struct ctdb_db_context *ctdb_db;
1186 struct ctdb_rec_data_old *rec;
1187 struct ctdb_marshall_buffer *records;
1189 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1191 (__location__ " invalid data in receive_records\n"));
1195 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1197 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1202 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1203 "dbid 0x%x\n", reply->count, reply->db_id));
1205 /* create a blob to send back the records we could not store */
1206 records = (struct ctdb_marshall_buffer *)
1207 talloc_zero_size(outdata,
1208 offsetof(struct ctdb_marshall_buffer, data));
1209 if (records == NULL) {
1210 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1213 records->db_id = ctdb_db->db_id;
1215 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1216 for (i=0; i<reply->count; i++) {
1219 key.dptr = &rec->data[0];
1220 key.dsize = rec->keylen;
1221 data.dptr = &rec->data[key.dsize];
1222 data.dsize = rec->datalen;
1224 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1225 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1231 * If we can not store the record we must add it to the reply
1232 * so the lmaster knows it may not purge this record.
1234 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1236 struct ctdb_ltdb_header *hdr;
1238 hdr = (struct ctdb_ltdb_header *)data.dptr;
1239 data.dptr += sizeof(*hdr);
1240 data.dsize -= sizeof(*hdr);
1242 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1243 "record with hash 0x%08x in vacuum "
1244 "via RECEIVE_RECORDS\n",
1247 old_size = talloc_get_size(records);
1248 records = talloc_realloc_size(outdata, records,
1249 old_size + rec->length);
1250 if (records == NULL) {
1251 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1256 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1259 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1262 *outdata = ctdb_marshall_finish(records);
1271 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1273 uint32_t *capabilities = NULL;
1275 capabilities = talloc(outdata, uint32_t);
1276 CTDB_NO_MEMORY(ctdb, capabilities);
1277 *capabilities = ctdb->capabilities;
1279 outdata->dsize = sizeof(uint32_t);
1280 outdata->dptr = (uint8_t *)capabilities;
1285 /* The recovery daemon will ping us at regular intervals.
1286 If we havent been pinged for a while we assume the recovery
1287 daemon is inoperable and we restart.
1289 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1290 struct tevent_timer *te,
1291 struct timeval t, void *p)
1293 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1294 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1296 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1298 if (*count < ctdb->tunable.recd_ping_failcount) {
1300 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1301 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1302 ctdb_recd_ping_timeout, ctdb);
1306 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1308 ctdb_stop_recoverd(ctdb);
1309 ctdb_start_recoverd(ctdb);
1312 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1314 talloc_free(ctdb->recd_ping_count);
1316 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1317 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1319 if (ctdb->tunable.recd_ping_timeout != 0) {
1320 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1321 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1322 ctdb_recd_ping_timeout, ctdb);
1330 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1332 uint32_t new_recmaster;
1334 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1335 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1337 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1339 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1342 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1344 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1347 ctdb->recovery_master = new_recmaster;
1352 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1354 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1355 ctdb_disable_monitoring(ctdb);
1356 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1361 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1363 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1364 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;