4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
/*
 * GET_VNNMAP control handler: marshal the daemon's current vnn map
 * (generation plus an array of `size` uint32 pnn entries) into a
 * wire-format blob allocated on outdata.  Takes no input payload.
 * NOTE(review): the return type, `len` declaration, outdata->dsize
 * assignment and return statement are elided in this listing.
 */
44 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
46 struct ctdb_vnn_map_wire *map;
49 CHECK_CONTROL_DATA_SIZE(0);
/* flexible-array layout: header fields followed by size pnn entries */
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
60 outdata->dptr = (uint8_t *)map;
/*
 * SET_VNNMAP control handler: replace the daemon's vnn map with the
 * wire-format map carried in indata.  Only permitted while recovery
 * is active; the old map is freed and a deep copy of the incoming
 * generation/size/map array is taken under the ctdb context.
 */
66 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
/* reject unless a recovery is in progress */
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
75 talloc_free(ctdb->vnn_map);
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * GET_DBMAP control handler: build a ctdb_dbid_map_old blob on outdata
 * listing every attached database (db_id plus persistent/readonly/
 * sticky flags).  Takes no input payload.
 * NOTE(review): the first loop's body (counting `len`) is elided in
 * this listing.
 */
91 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
97 CHECK_CONTROL_DATA_SIZE(0);
/* first pass: count the attached databases */
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
/* zeroed so the per-db flags below can be OR-ed in */
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: fill in one entry per database */
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
 * GET_NODEMAP control handler: serialise the node list into a node
 * map blob on outdata via ctdb_node_list_to_map().
 */
131 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
133 CHECK_CONTROL_DATA_SIZE(0);
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
138 if (outdata->dptr == NULL) {
/* size comes from the talloc allocation made by the helper */
142 outdata->dsize = talloc_get_size(outdata->dptr);
/*
148 reload the nodes file

 * Re-reads the nodes file, keeping any pre-existing node (and its
 * connection) whose address is unchanged, and registering/connecting
 * any new or changed node with the transport.  Finally notifies the
 * recovery daemon so it reloads its copy too.
 */
151 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
155 struct ctdb_node **nodes;
157 tmp_ctx = talloc_new(ctdb);
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
163 num_nodes = ctdb->num_nodes;
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* deleted slots need no transport setup */
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
181 /* any new or different nodes must be added */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
192 /* tell the recovery daemon to reload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* drops the old nodes array stolen above */
195 talloc_free(tmp_ctx);
/*
201 a traverse function for pulling all relevant records from pulldb

 * State shared with traverse_pulldb(): the growing marshall buffer,
 * its current and allocated lengths, and a failure flag.
 */
204 struct ctdb_context *ctdb;
205 struct ctdb_db_context *ctdb_db;
206 struct ctdb_marshall_buffer *pulldata;
208 uint32_t allocated_len;
/*
 * tdb traverse callback used by ctdb_control_pull_db(): marshal each
 * record and append it to params->pulldata, growing the buffer in
 * pulldb_preallocation_size-sized chunks to amortise reallocs.
 */
212 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
222 params->failed = true;
/* grow with headroom so we don't realloc per record */
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
233 params->pulldata->count++;
/* append the marshalled record at the current end of the blob */
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
/* warn about unusually large records if the tunable is enabled */
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
/*
247 pull a bunch of records from a ltdb, filtering by lmaster

 * PULL_DB control handler: with the database frozen and marked
 * locked, traverse the whole local tdb and marshal every record into
 * one blob on outdata for the recovery master.
 */
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
256 pull = (struct ctdb_pulldb *)indata.dptr;
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
/* pulling is only safe while the db is frozen (no writers) */
264 if (!ctdb_db_frozen(ctdb_db)) {
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
273 reply->db_id = pull->db_id;
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
/* start length = marshall header only; records appended after it */
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* NOTE(review): "¶ms" below looks like mojibake for "&params"
 * (HTML entity &para;) introduced by extraction — confirm against
 * the original source */
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
300 ctdb_lockdb_unmark(ctdb_db);
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
/* optional size warnings controlled by tunables */
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
317 push a bunch of records into a ltdb, filtering by rsn

 * PUSH_DB control handler: with the database frozen and marked
 * locked, walk the marshalled record blob in indata and store each
 * record locally (read-only delegation flags are stripped, since a
 * recovery implicitly revokes all delegations).  For read-only
 * databases the tracking db is wiped and any in-flight revoke
 * children are cancelled.
 */
319 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
321 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
322 struct ctdb_db_context *ctdb_db;
324 struct ctdb_rec_data_old *rec;
/* indata must at least hold the marshall header */
326 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
327 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
331 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
333 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
337 if (!ctdb_db_frozen(ctdb_db)) {
339 ("rejecting ctdb_control_push_db when not frozen\n"));
343 if (ctdb_lockdb_mark(ctdb_db) != 0) {
344 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
348 rec = (struct ctdb_rec_data_old *)&reply->data[0];
350 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
351 reply->count, reply->db_id));
353 for (i=0;i<reply->count;i++) {
355 struct ctdb_ltdb_header *hdr;
/* each marshalled record = key bytes followed by data bytes */
357 key.dptr = &rec->data[0];
358 key.dsize = rec->keylen;
359 data.dptr = &rec->data[key.dsize];
360 data.dsize = rec->datalen;
/* record data must begin with an ltdb header */
362 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
363 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
366 hdr = (struct ctdb_ltdb_header *)data.dptr;
367 /* strip off any read only record flags. All readonly records
368 are revoked implicitely by a recovery
370 hdr->flags &= ~CTDB_REC_RO_FLAGS;
/* skip past the header so ctdb_ltdb_store gets payload only */
372 data.dptr += sizeof(*hdr);
373 data.dsize -= sizeof(*hdr);
375 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
377 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
/* advance to the next marshalled record */
381 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
384 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
385 reply->count, reply->db_id));
387 if (ctdb_db->readonly) {
388 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
390 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
391 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
/* NOTE(review): readonly is set false twice on this error path
 * (here and again below) — the second assignment is redundant */
392 ctdb_db->readonly = false;
393 tdb_close(ctdb_db->rottdb);
394 ctdb_db->rottdb = NULL;
395 ctdb_db->readonly = false;
/* cancel any revoke-child processes still running */
397 while (ctdb_db->revokechild_active != NULL) {
398 talloc_free(ctdb_db->revokechild_active);
402 ctdb_lockdb_unmark(ctdb_db);
406 ctdb_lockdb_unmark(ctdb_db);
/*
 * Per-request state for the asynchronous SET_RECMODE handling: the
 * pending control to reply to, the timeout timer, the pipe fd event
 * used to hear back from the child, and the start time for latency
 * accounting.
 */
410 struct ctdb_set_recmode_state {
411 struct ctdb_context *ctdb;
412 struct ctdb_req_control_old *c;
414 struct tevent_timer *te;
415 struct tevent_fd *fde;
417 struct timeval start_time;
/*
421 called if our set_recmode child times out. this would happen if
422 ctdb_recovery_lock() would block.

 * A hung child means the lock could NOT be stolen from the recovery
 * master, which is the desired outcome, so this path replies success
 * and flips recovery mode back to NORMAL.
 */
424 static void ctdb_set_recmode_timeout(struct tevent_context *ev,
425 struct tevent_timer *te,
426 struct timeval t, void *private_data)
428 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
429 struct ctdb_set_recmode_state);
431 /* we consider this a success, not a failure, as we failed to
432 set the recovery lock which is what we wanted. This can be
433 caused by the cluster filesystem being very slow to
434 arbitrate locks immediately after a node failure.
436 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
437 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
438 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
443 /* when we free the recmode state we must kill any child process.
 * Talloc destructor for ctdb_set_recmode_state: records reclock
 * latency and SIGKILLs the helper child if still running.
445 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
447 double l = timeval_elapsed(&state->start_time);
449 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
/* fd[0] == -1 means the pipe was never opened / already closed */
451 if (state->fd[0] != -1) {
454 ctdb_kill(state->ctdb, state->child, SIGKILL);
458 /* this is called when the client process has completed ctdb_recovery_lock()
459 and has written data back to us through the pipe.
 * Reads the one-byte verdict from the child: 0 means the lock was
 * (correctly) unobtainable, any other value means the daemon stole
 * the recovery lock — a lock-coherence failure that is reported as
 * an error to the waiting control.
461 static void set_recmode_handler(struct tevent_context *ev,
462 struct tevent_fd *fde,
463 uint16_t flags, void *private_data)
465 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
466 struct ctdb_set_recmode_state);
470 /* we got a response from our child process so we can abort the
/* cancel the 5s timeout now that the child answered */
473 talloc_free(state->te);
477 /* If, as expected, the child was unable to take the recovery
478 * lock then it will have written 0 into the pipe, so
479 * continue. However, any other value (e.g. 1) indicates that
480 * it was able to take the recovery lock when it should have
481 * been held by the recovery daemon on the recovery master.
483 ret = sys_read(state->fd[0], &c, 1);
484 if (ret != 1 || c != 0) {
485 ctdb_request_control_reply(
486 state->ctdb, state->c, NULL, -1,
487 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
/* all good: leave recovery */
492 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
494 /* release any deferred attach calls from clients */
495 ctdb_process_deferred_attach(state->ctdb);
497 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
 * Timer callback fired when the node has stayed in recovery too
 * long: release every public IP address this node holds.
 */
503 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
504 struct timeval t, void *private_data)
506 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
508 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* freeing the context also removes this (now-fired) timer */
509 talloc_free(ctdb->release_ips_ctx);
510 ctdb->release_ips_ctx = NULL;
512 ctdb_release_all_ips(ctdb);
516 * Set up an event to drop all public ips if we remain in recovery for too
 *
 * (Re)arms the recovery_drop_all_ips timer; any previously armed
 * timer is discarded first so the countdown restarts from now.
519 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
521 if (ctdb->release_ips_ctx != NULL) {
522 talloc_free(ctdb->release_ips_ctx);
524 ctdb->release_ips_ctx = talloc_new(ctdb);
525 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
/* timer lives on release_ips_ctx so freeing that cancels it */
527 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
528 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
529 ctdb_drop_all_ips_event, ctdb);
/*
534 set the recovery mode

 * SET_RECMODE control handler.  Entering recovery is simple; leaving
 * it additionally verifies DB generations, thaws databases, releases
 * deferred attaches, and — when a recovery lock file is configured —
 * forks a child that verifies the daemon CANNOT take the recovery
 * lock (it should be held by the recovery master).  In that case the
 * reply is delivered asynchronously from set_recmode_handler() or
 * ctdb_set_recmode_timeout().
 */
536 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
537 struct ctdb_req_control_old *c,
538 TDB_DATA indata, bool *async_reply,
539 const char **errormsg)
541 uint32_t recmode = *(uint32_t *)indata.dptr;
543 struct ctdb_set_recmode_state *state;
/* recorded so the child can exit when the daemon dies */
544 pid_t parent = getpid();
545 struct ctdb_db_context *ctdb_db;
547 /* if we enter recovery but stay in recovery for too long
548 we will eventually drop all our ip addresses
550 if (recmode == CTDB_RECOVERY_NORMAL) {
/* leaving recovery: cancel the pending drop-all-ips timer */
551 talloc_free(ctdb->release_ips_ctx);
552 ctdb->release_ips_ctx = NULL;
554 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
555 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
559 if (recmode != ctdb->recovery_mode) {
560 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
561 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
/* simple transitions (anything except ACTIVE->NORMAL) just set
 * the mode */
564 if (recmode != CTDB_RECOVERY_NORMAL ||
565 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
566 ctdb->recovery_mode = recmode;
570 /* From this point: recmode == CTDB_RECOVERY_NORMAL
572 * Therefore, what follows is special handling when setting
573 * recovery mode back to normal */
/* every attached db must be on the current vnn map generation */
575 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
576 if (ctdb_db->generation != ctdb->vnn_map->generation) {
578 ("Inconsistent DB generation %u for %s\n",
579 ctdb_db->generation, ctdb_db->db_name));
580 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
585 /* force the databases to thaw */
586 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
587 if (ctdb_db_prio_frozen(ctdb, i)) {
588 ctdb_control_thaw(ctdb, i, false);
592 /* release any deferred attach calls from clients */
593 if (recmode == CTDB_RECOVERY_NORMAL) {
594 ctdb_process_deferred_attach(ctdb);
597 if (ctdb->recovery_lock_file == NULL) {
598 /* Not using recovery lock file */
599 ctdb->recovery_mode = recmode;
603 state = talloc(ctdb, struct ctdb_set_recmode_state);
604 CTDB_NO_MEMORY(ctdb, state);
606 state->start_time = timeval_current();
610 /* For the rest of what needs to be done, we need to do this in
611 a child process since
612 1, the call to ctdb_recovery_lock() can block if the cluster
613 filesystem is in the process of recovery.
615 ret = pipe(state->fd);
618 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
622 state->child = ctdb_fork(ctdb);
623 if (state->child == (pid_t)-1) {
/* child: try the recovery lock and report via the pipe */
630 if (state->child == 0) {
634 prctl_set_comment("ctdb_recmode");
635 debug_extra = talloc_asprintf(NULL, "set_recmode:");
636 /* Daemon should not be able to get the recover lock,
637 * as it should be held by the recovery master */
638 if (ctdb_recovery_lock(ctdb)) {
640 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
641 ctdb->recovery_lock_file));
642 ctdb_recovery_unlock(ctdb);
/* write one status byte back to the parent */
646 sys_write(state->fd[1], &cc, 1);
/* linger until the parent daemon exits */
647 ctdb_wait_for_process_to_exit(parent);
/* parent: keep the read end, watch it, and arm a timeout */
651 set_close_on_exec(state->fd[0]);
655 talloc_set_destructor(state, set_recmode_destructor);
657 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
659 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
660 ctdb_set_recmode_timeout, state);
662 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
663 set_recmode_handler, (void *)state);
665 if (state->fde == NULL) {
669 tevent_fd_set_auto_close(state->fde);
/* reply later, from the fd/timeout handlers */
672 state->c = talloc_steal(state, c);
/* true when this process currently holds the recovery lock fd */
680 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
682 return ctdb->recovery_lock_fd != -1;
/*
686 try and get the recovery lock in shared storage - should only work
687 on the recovery master recovery daemon. Anywhere else is a bug

 * Opens (creating if needed) the recovery lock file and attempts a
 * non-blocking fcntl write lock.  EACCES/EAGAIN mean another holder
 * has it and are not logged as errors.
 */
689 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
693 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
694 O_RDWR|O_CREAT, 0600);
695 if (ctdb->recovery_lock_fd == -1) {
697 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
698 ctdb->recovery_lock_file, strerror(errno)));
/* don't leak the lock fd into exec'd children */
702 set_close_on_exec(ctdb->recovery_lock_fd);
704 lock.l_type = F_WRLCK;
705 lock.l_whence = SEEK_SET;
/* F_SETLK = non-blocking attempt */
710 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
711 int saved_errno = errno;
712 close(ctdb->recovery_lock_fd);
713 ctdb->recovery_lock_fd = -1;
714 /* Fail silently on these errors, since they indicate
715 * lock contention, but log an error for any other
717 if (saved_errno != EACCES &&
718 saved_errno != EAGAIN) {
719 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
720 "recovery lock on '%s' - (%s)\n",
721 ctdb->recovery_lock_file,
722 strerror(saved_errno)));
/* Release the recovery lock by closing its fd (dropping the fcntl
 * lock) and marking it as not held. */
730 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
732 if (ctdb->recovery_lock_fd != -1) {
733 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
734 close(ctdb->recovery_lock_fd);
735 ctdb->recovery_lock_fd = -1;
/*
740 delete a record as part of the vacuum process
741 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
742 use non-blocking locks

744 return 0 if the record was successfully deleted (i.e. it does not exist
745 when the function returns)
746 or !0 is the record still exists in the tdb after returning.
 */
748 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
750 TDB_DATA key, data, data2;
751 struct ctdb_ltdb_header *hdr, *hdr2;
753 /* these are really internal tdb functions - but we need them here for
754 non-blocking lock of the freelist */
755 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
756 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* rec carries key bytes then data bytes; data here is only the
 * ltdb header the caller sent with the delete request */
759 key.dsize = rec->keylen;
760 key.dptr = &rec->data[0];
761 data.dsize = rec->datalen;
762 data.dptr = &rec->data[rec->keylen];
/* never delete on the lmaster - it owns the authoritative copy */
764 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
765 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
769 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
770 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
774 hdr = (struct ctdb_ltdb_header *)data.dptr;
776 /* use a non-blocking lock */
777 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* fetch the live local copy for comparison; tdb_fetch mallocs */
781 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
782 if (data2.dptr == NULL) {
783 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* a record smaller than an ltdb header is corrupt: delete it
 * outright (needs the freelist lock too) */
787 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
788 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
789 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
790 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
792 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
793 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
795 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
800 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* local copy is newer than the rsn we were asked about: keep it */
802 if (hdr2->rsn > hdr->rsn) {
803 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
804 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
805 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
810 /* do not allow deleting record that have readonly flags set. */
811 if (hdr->flags & CTDB_REC_RO_FLAGS) {
812 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
813 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* same check against the flags of the stored local copy */
817 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
818 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
819 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* never delete while we are the dmaster of the record */
824 if (hdr2->dmaster == ctdb->pnn) {
825 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
826 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* take the freelist lock non-blocking before the real delete */
831 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
832 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
837 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
838 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
839 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
840 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
845 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
846 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* Holds the pending control so the event-script callback can reply
 * to it asynchronously. */
853 struct recovery_callback_state {
854 struct ctdb_req_control_old *c;
/*
859 called when the 'recovered' event script has finished

 * Re-enables monitoring, bumps recovery stats, replies to the
 * waiting END_RECOVERY control with the script status, records the
 * finish time, and advances the runstate after the first recovery.
 */
861 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
863 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
865 ctdb_enable_monitoring(ctdb);
866 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
869 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
/* -ETIME = event script timed out */
870 if (status == -ETIME) {
875 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
878 gettimeofday(&ctdb->last_recovery_finished, NULL);
880 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
881 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
/*
886 recovery has finished

 * END_RECOVERY control handler: flush pending trans3 commits,
 * disable monitoring, and run the 'recovered' event script; the
 * reply is delivered from ctdb_end_recovery_callback().
 */
888 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
889 struct ctdb_req_control_old *c,
893 struct recovery_callback_state *state;
895 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
897 ctdb_persistent_finish_trans3_commits(ctdb);
899 state = talloc(ctdb, struct recovery_callback_state);
900 CTDB_NO_MEMORY(ctdb, state);
904 ctdb_disable_monitoring(ctdb);
906 ret = ctdb_event_script_callback(ctdb, state,
907 ctdb_end_recovery_callback,
909 CTDB_EVENT_RECOVERED, "%s", "");
/* on failure, undo the monitoring disable before erroring out */
912 ctdb_enable_monitoring(ctdb);
914 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
919 /* tell the control that we will be reply asynchronously */
920 state->c = talloc_steal(state, c);
/*
926 called when the 'startrecovery' event script has finished

 * Passes the script's status straight back as the reply to the
 * waiting START_RECOVERY control.
 */
928 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
930 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
933 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
936 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
941 run the startrecovery eventscript

 * START_RECOVERY control handler: record the start time, disable
 * monitoring, and kick off the 'startrecovery' event script; the
 * reply is delivered from ctdb_start_recovery_callback().
 */
943 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
944 struct ctdb_req_control_old *c,
948 struct recovery_callback_state *state;
950 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
951 gettimeofday(&ctdb->last_recovery_started, NULL);
953 state = talloc(ctdb, struct recovery_callback_state);
954 CTDB_NO_MEMORY(ctdb, state);
956 state->c = talloc_steal(state, c);
958 ctdb_disable_monitoring(ctdb);
960 ret = ctdb_event_script_callback(ctdb, state,
961 ctdb_start_recovery_callback,
963 CTDB_EVENT_START_RECOVERY,
967 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
972 /* tell the control that we will be reply asynchronously */
/*
978 try to delete all these records as part of the vacuuming process
979 and return the records we failed to delete

 * TRY_DELETE_RECORDS control handler: walk the marshalled records in
 * indata, attempt delete_tdb_record() on each, and marshal every
 * record that could NOT be deleted into outdata so the lmaster knows
 * it must not purge them.
 */
981 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
983 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
984 struct ctdb_db_context *ctdb_db;
986 struct ctdb_rec_data_old *rec;
987 struct ctdb_marshall_buffer *records;
989 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
990 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
994 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
996 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1001 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1002 reply->count, reply->db_id));
1005 /* create a blob to send back the records we could not delete */
1006 records = (struct ctdb_marshall_buffer *)
1007 talloc_zero_size(outdata,
1008 offsetof(struct ctdb_marshall_buffer, data));
1009 if (records == NULL) {
1010 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1013 records->db_id = ctdb_db->db_id;
1016 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1017 for (i=0;i<reply->count;i++) {
/* marshalled record layout: key bytes then data bytes */
1020 key.dptr = &rec->data[0];
1021 key.dsize = rec->keylen;
1022 data.dptr = &rec->data[key.dsize];
1023 data.dsize = rec->datalen;
1025 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1026 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1030 /* If we cannot delete the record we must add it to the reply
1031 so the lmaster knows it may not purge this record
1033 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1035 struct ctdb_ltdb_header *hdr;
1037 hdr = (struct ctdb_ltdb_header *)data.dptr;
1038 data.dptr += sizeof(*hdr);
1039 data.dsize -= sizeof(*hdr);
1041 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
/* grow the reply blob and append the raw marshalled record */
1043 old_size = talloc_get_size(records);
1044 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1045 if (records == NULL) {
1046 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1050 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next marshalled record */
1053 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1057 *outdata = ctdb_marshall_finish(records);
1063 * Store a record as part of the vacuum process:
1064 * This is called from the RECEIVE_RECORD control which
1065 * the lmaster uses to send the current empty copy
1066 * to all nodes for storing, before it lets the other
1067 * nodes delete the records in the second phase with
1068 * the TRY_DELETE_RECORDS control.
1070 * Only store if we are not lmaster or dmaster, and our
1071 * rsn is <= the provided rsn. Use non-blocking locks.
1073 * return 0 if the record was successfully stored.
1074 * return !0 if the record still exists in the tdb after returning.
1076 static int store_tdb_record(struct ctdb_context *ctdb,
1077 struct ctdb_db_context *ctdb_db,
1078 struct ctdb_rec_data_old *rec)
1080 TDB_DATA key, data, data2;
1081 struct ctdb_ltdb_header *hdr, *hdr2;
/* rec carries key bytes then data (here: just an ltdb header) */
1084 key.dsize = rec->keylen;
1085 key.dptr = &rec->data[0];
1086 data.dsize = rec->datalen;
1087 data.dptr = &rec->data[rec->keylen];
/* never overwrite on the lmaster */
1089 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1090 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1091 "where we are lmaster\n"));
1095 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1096 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1100 hdr = (struct ctdb_ltdb_header *)data.dptr;
1102 /* use a non-blocking lock */
1103 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1104 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
/* missing or corrupt local copy: just store the sent record */
1108 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1109 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1110 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1111 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1115 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1120 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* local copy is newer: keep it */
1122 if (hdr2->rsn > hdr->rsn) {
1123 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1124 "rsn=%llu - called with rsn=%llu\n",
1125 (unsigned long long)hdr2->rsn,
1126 (unsigned long long)hdr->rsn));
1131 /* do not allow vacuuming of records that have readonly flags set. */
1132 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1133 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* same check against the flags of the stored local copy */
1138 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1139 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* never overwrite while we are the dmaster */
1145 if (hdr2->dmaster == ctdb->pnn) {
1146 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1147 "where we are the dmaster\n"));
1152 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1153 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1161 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1169 * Try to store all these records as part of the vacuuming process
1170 * and return the records we failed to store.
 *
 * RECEIVE_RECORDS control handler: mirror of
 * ctdb_control_try_delete_records(), but calling store_tdb_record()
 * per record; records that could not be stored are marshalled back
 * into outdata for the lmaster.
1172 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1173 TDB_DATA indata, TDB_DATA *outdata)
1175 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1176 struct ctdb_db_context *ctdb_db;
1178 struct ctdb_rec_data_old *rec;
1179 struct ctdb_marshall_buffer *records;
1181 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1183 (__location__ " invalid data in receive_records\n"));
1187 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1189 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1194 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1195 "dbid 0x%x\n", reply->count, reply->db_id));
1197 /* create a blob to send back the records we could not store */
1198 records = (struct ctdb_marshall_buffer *)
1199 talloc_zero_size(outdata,
1200 offsetof(struct ctdb_marshall_buffer, data));
1201 if (records == NULL) {
1202 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1205 records->db_id = ctdb_db->db_id;
1207 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1208 for (i=0; i<reply->count; i++) {
/* marshalled record layout: key bytes then data bytes */
1211 key.dptr = &rec->data[0];
1212 key.dsize = rec->keylen;
1213 data.dptr = &rec->data[key.dsize];
1214 data.dsize = rec->datalen;
1216 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1217 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1223 * If we can not store the record we must add it to the reply
1224 * so the lmaster knows it may not purge this record.
1226 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1228 struct ctdb_ltdb_header *hdr;
1230 hdr = (struct ctdb_ltdb_header *)data.dptr;
1231 data.dptr += sizeof(*hdr);
1232 data.dsize -= sizeof(*hdr);
1234 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1235 "record with hash 0x%08x in vacuum "
1236 "via RECEIVE_RECORDS\n",
/* grow the reply blob and append the raw marshalled record */
1239 old_size = talloc_get_size(records);
1240 records = talloc_realloc_size(outdata, records,
1241 old_size + rec->length);
1242 if (records == NULL) {
1243 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1248 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next marshalled record */
1251 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1254 *outdata = ctdb_marshall_finish(records);
/* GET_CAPABILITIES control handler: return this node's capability
 * bitmap as a uint32 blob on outdata. */
1263 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1265 uint32_t *capabilities = NULL;
1267 capabilities = talloc(outdata, uint32_t);
1268 CTDB_NO_MEMORY(ctdb, capabilities);
1269 *capabilities = ctdb->capabilities;
1271 outdata->dsize = sizeof(uint32_t);
1272 outdata->dptr = (uint8_t *)capabilities;
1277 /* The recovery daemon will ping us at regular intervals.
1278 If we haven't been pinged for a while we assume the recovery
1279 daemon is inoperable and we restart.
 * Each miss re-arms the timer and counts up; once the count reaches
 * recd_ping_failcount the recovery daemon is stopped and restarted.
1281 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1282 struct tevent_timer *te,
1283 struct timeval t, void *p)
1285 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1286 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1288 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* not yet at the failure threshold: arm another timeout */
1290 if (*count < ctdb->tunable.recd_ping_failcount) {
1292 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1293 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1294 ctdb_recd_ping_timeout, ctdb);
1298 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1300 ctdb_stop_recoverd(ctdb);
1301 ctdb_start_recoverd(ctdb);
/*
 * RECD_PING control handler: reset the missed-ping counter and
 * (when the timeout tunable is enabled) re-arm the watchdog timer.
 * The counter lives on its own talloc context so freeing it also
 * cancels any pending timer attached to it.
 */
1304 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1306 talloc_free(ctdb->recd_ping_count);
1308 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1309 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1311 if (ctdb->tunable.recd_ping_timeout != 0) {
1312 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1313 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1314 ctdb_recd_ping_timeout, ctdb);
/*
 * SET_RECMASTER control handler: record the new recovery master pnn,
 * logging when this node gains or loses the recmaster role.
 */
1322 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1324 uint32_t new_recmaster;
1326 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1327 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* we were recmaster and the role moved elsewhere */
1329 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1331 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* the role just moved to us */
1334 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1336 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1339 ctdb->recovery_master = new_recmaster;
/* STOP_NODE control handler: disable monitoring and mark this node
 * as administratively stopped. */
1344 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1346 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1347 ctdb_disable_monitoring(ctdb);
1348 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1353 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1355 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1356 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;