4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
/*
  GETVNNMAP control: hand back a copy of ctdb->vnn_map in wire format
  (struct ctdb_vnn_map_wire). The request must carry no payload. The reply
  buffer is allocated with outdata as its talloc context.
  NOTE(review): the embedded line numbers of this listing skip (33, 35-36,
  46-47, ...), so the return type, the declaration of 'len' and the return
  path are not visible here.
*/
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 struct ctdb_vnn_map_wire *map;
37 CHECK_CONTROL_DATA_SIZE(0);
/* wire header (everything before 'map') plus one uint32_t per map entry */
39 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
40 map = talloc_size(outdata, len);
/* presumably bails out when the allocation failed - macro body not shown */
41 CTDB_NO_MEMORY(ctdb, map);
43 map->generation = ctdb->vnn_map->generation;
44 map->size = ctdb->vnn_map->size;
/* map->size was just taken from ctdb->vnn_map->size, so every entry is copied */
45 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
48 outdata->dptr = (uint8_t *)map;
/*
  SETVNNMAP control: replace ctdb->vnn_map with the generation, size and
  entries carried in indata (interpreted as struct ctdb_vnn_map_wire).
  An error is logged when ctdb_db_all_frozen(ctdb) is false.
  NOTE(review): the listing omits lines (embedded numbers jump 59 -> 63), so
  what follows the "not frozen" log is not visible - presumably an error
  return.
  NOTE(review): in the visible lines map->size comes straight from the wire
  and is never compared with indata.dsize before the memcpy; confirm that a
  size check exists in the elided lines or in the caller.
*/
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
58 if (!ctdb_db_all_frozen(ctdb)) {
59 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
/* the old map is dropped first; the replacement is built from scratch */
63 talloc_free(ctdb->vnn_map);
65 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
66 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
68 ctdb->vnn_map->generation = map->generation;
69 ctdb->vnn_map->size = map->size;
/* the entry array uses the new map as talloc context */
70 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
71 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
73 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  GETDBMAP control: report db_id and flags for every database on
  ctdb->db_list as a struct ctdb_dbid_map allocated under outdata.
  The request must carry no payload.
  NOTE(review): short lines are missing from this listing; the body of the
  first loop is not visible - presumably it counts the databases into 'len',
  which sizes the reply below.
*/
79 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
82 struct ctdb_db_context *ctdb_db;
83 struct ctdb_dbid_map *dbid_map;
85 CHECK_CONTROL_DATA_SIZE(0);
88 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* map header (everything before 'dbs') plus one dbs[] element per database */
93 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
/* zero-filled, so each dbs[i].flags starts at 0 before bits are OR-ed in */
94 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
96 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
100 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
/* second pass: fill one entry per database, in list order */
102 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
103 dbid_map->dbs[i].dbid = ctdb_db->db_id;
104 if (ctdb_db->persistent != 0) {
105 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
107 if (ctdb_db->readonly != 0) {
108 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
110 if (ctdb_db->sticky != 0) {
111 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
  GETNODEMAP control: the reply is whatever ctdb_node_list_to_map() builds
  from ctdb->nodes; its length is read back from the talloc allocation.
  The request must carry no payload.
  NOTE(review): the remaining arguments of ctdb_node_list_to_map() and the
  handling of a NULL result are on lines missing from this listing.
*/
119 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
121 CHECK_CONTROL_DATA_SIZE(0);
123 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
126 if (outdata->dptr == NULL) {
/* the reply size is the size of the talloc chunk returned above */
130 outdata->dsize = talloc_get_size(outdata->dptr);
136 reload the nodes file
/*
  RELOAD_NODES_FILE control: re-read the nodes file while keeping the node
  objects (and, per the comment below, their connections) whose address at
  the same index is unchanged.
  Visible steps: park the old node array on a temporary talloc context, load
  the new file, then per index either take the old node back or add and
  connect the new one; finally notify CTDB_SRVID_RELOAD_NODES on our own pnn
  and free the temporary context.
  NOTE(review): short lines are missing from this listing; the assignment of
  the local 'nodes' (presumably the old ctdb->nodes) and the statements that
  follow the reuse / NODE_FLAGS_DELETED checks (presumably 'continue') are
  not visible.
*/
139 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
143 struct ctdb_node **nodes;
145 tmp_ctx = talloc_new(ctdb);
147 /* steal the old nodes file for a while */
148 talloc_steal(tmp_ctx, ctdb->nodes);
/* remember the old node count: only indexes below it can be compared */
151 num_nodes = ctdb->num_nodes;
154 /* load the new nodes file */
155 ctdb_load_nodes_file(ctdb);
157 for (i=0; i<ctdb->num_nodes; i++) {
158 /* keep any identical pre-existing nodes and connections */
159 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* drop the freshly loaded node and move the old one under the new array */
160 talloc_free(ctdb->nodes[i]);
161 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
165 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
169 /* any new or different nodes must be added */
170 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
171 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
172 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
174 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
175 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
176 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
180 /* tell the recovery daemon to reaload the nodes file too */
181 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* releases whatever is still parented to tmp_ctx (old nodes not taken back) */
183 talloc_free(tmp_ctx);
189 a traverse function for pulling all relevant records from pulldb
/*
  Members of the traversal state that traverse_pulldb() receives through its
  'void *p' argument (it casts p to struct pulldb_data).
  NOTE(review): the struct's opening line and its short members are missing
  from this listing; traverse_pulldb() also uses 'len' and 'failed'.
*/
/* daemon context; its tunables are read during the traverse */
192 struct ctdb_context *ctdb;
/* database being pulled; its name is used in log messages */
193 struct ctdb_db_context *ctdb_db;
/* marshall buffer that records are appended to; may be reallocated */
194 struct ctdb_marshall_buffer *pulldata;
/* bytes currently allocated for pulldata */
196 uint32_t allocated_len;
/*
  tdb traverse callback used by ctdb_control_pull_db(): marshalls one
  key/data pair and appends it to params->pulldata, growing that buffer when
  the record does not fit.
  p is the struct pulldb_data set up by the caller.
  NOTE(review): short lines are missing from this listing, so the NULL check
  on 'rec', the return values and any release of 'rec' are not visible.
*/
200 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
202 struct pulldb_data *params = (struct pulldb_data *)p;
203 struct ctdb_rec_data *rec;
204 struct ctdb_context *ctdb = params->ctdb;
205 struct ctdb_db_context *ctdb_db = params->ctdb_db;
207 /* add the record to the blob */
208 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
210 params->failed = true;
/* grow when the record would reach the end of the current allocation */
213 if (params->len + rec->length >= params->allocated_len) {
/* over-allocate by the pulldb_preallocation_size tunable to limit reallocs */
214 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
215 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
217 if (params->pulldata == NULL) {
/* the logged size is len + record length, not the allocated_len requested */
218 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
219 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
221 params->pulldata->count++;
/* append the marshalled record at the current end of the blob */
222 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
223 params->len += rec->length;
/* optional warning, enabled by a non-zero db_record_size_warn tunable */
225 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
226 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
235 pull a bunch of records from a ltdb, filtering by lmaster
/*
  PULL_DB control: marshall every record of the local tdb of the requested
  database into one ctdb_marshall_buffer and return it in outdata.
  The request is rejected (debug log) unless the database's priority level
  is frozen; the database is marked as locked for the duration of the
  traverse and unmarked afterwards on both the failure and success paths.
  Optional warnings are logged when the record count or reply size exceed
  the db_record_count_warn / db_size_warn tunables.
  NOTE(review): short lines are missing from this listing (embedded numbers
  skip), so the NULL check on ctdb_db, the return statements and the
  assignment of params.ctdb are not visible.
  NOTE(review): no check of indata.dsize is visible before 'pull' is
  dereferenced - confirm the caller validates the size.
  Fix: the traverse call had its private-data argument mangled into a
  mis-encoded token; it is restored to the address of 'params'.
*/
237 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
239 struct ctdb_control_pulldb *pull;
240 struct ctdb_db_context *ctdb_db;
241 struct pulldb_data params;
242 struct ctdb_marshall_buffer *reply;
244 pull = (struct ctdb_control_pulldb *)indata.dptr;
246 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
248 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
252 if (!ctdb_db_prio_frozen(ctdb, ctdb_db->priority)) {
253 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
/* start with an empty, zeroed marshall header allocated under outdata */
257 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
258 CTDB_NO_MEMORY(ctdb, reply);
260 reply->db_id = pull->db_id;
263 params.ctdb_db = ctdb_db;
264 params.pulldata = reply;
/* the blob currently holds only the header; records are appended after it */
265 params.len = offsetof(struct ctdb_marshall_buffer, data);
266 params.allocated_len = params.len;
267 params.failed = false;
269 if (ctdb_db->unhealthy_reason) {
270 /* this is just a warning, as the tdb should be empty anyway */
271 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
272 ctdb_db->db_name, ctdb_db->unhealthy_reason));
275 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
276 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
/* traverse_pulldb() appends each record to params.pulldata */
280 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
281 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
282 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/* params.pulldata, not 'reply': the callback may have reallocated the buffer */
283 talloc_free(params.pulldata);
287 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
289 outdata->dptr = (uint8_t *)params.pulldata;
290 outdata->dsize = params.len;
292 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
293 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
295 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
296 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
304 push a bunch of records into a ltdb, filtering by rsn
/*
  PUSH_DB control: store every record of the marshall buffer in indata into
  the local tdb of the database named by reply->db_id.
  Visible flow: reject input shorter than the marshall header, look up the
  database, require its priority level to be frozen, mark the database as
  locked, then walk reply->count records. For each record the read-only
  flags are cleared in its ltdb header and the header and payload are handed
  to ctdb_ltdb_store(). Afterwards, for a database with read-only support,
  the tracking tdb (rottdb) is wiped - on failure it is closed and read-only
  support is switched off - and every revokechild_active entry is freed.
  The lock mark is removed on two paths (presumably failure and success).
  NOTE(review): short lines are missing from this listing (returns, labels,
  closing braces, NULL check on ctdb_db), so exact control flow is not
  visible.
  NOTE(review): the walk advances by rec->length and trusts rec->keylen and
  rec->datalen; no comparison against indata.dsize is visible.
  NOTE(review): 'ctdb_db->readonly = false' appears twice in the wipe-failure
  branch; one of them looks redundant.
*/
306 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
308 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
309 struct ctdb_db_context *ctdb_db;
311 struct ctdb_rec_data *rec;
313 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
314 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
318 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
320 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
324 if (!ctdb_db_prio_frozen(ctdb, ctdb_db->priority)) {
325 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
329 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
330 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
/* records are packed back to back starting at reply->data */
334 rec = (struct ctdb_rec_data *)&reply->data[0];
336 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
337 reply->count, reply->db_id));
339 for (i=0;i<reply->count;i++) {
341 struct ctdb_ltdb_header *hdr;
/* within a record the key comes first, immediately followed by the data */
343 key.dptr = &rec->data[0];
344 key.dsize = rec->keylen;
345 data.dptr = &rec->data[key.dsize];
346 data.dsize = rec->datalen;
/* every pushed record must at least contain an ltdb header */
348 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
349 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
352 hdr = (struct ctdb_ltdb_header *)data.dptr;
353 /* strip off any read only record flags. All readonly records
354 are revoked implicitely by a recovery
356 hdr->flags &= ~CTDB_REC_RO_FLAGS;
358 data.dptr += sizeof(*hdr);
359 data.dsize -= sizeof(*hdr);
361 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
363 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
367 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
370 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
371 reply->count, reply->db_id));
373 if (ctdb_db->readonly) {
374 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
376 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
377 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
378 ctdb_db->readonly = false;
379 tdb_close(ctdb_db->rottdb);
380 ctdb_db->rottdb = NULL;
381 ctdb_db->readonly = false;
383 while (ctdb_db->revokechild_active != NULL) {
384 talloc_free(ctdb_db->revokechild_active);
388 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
392 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/*
  State of one asynchronous SET_RECMODE request while a child process probes
  the recovery lock (allocated in ctdb_control_set_recmode()).
  NOTE(review): short members are missing from this listing; the code in
  this file also uses 'recmode', 'fd[0]'/'fd[1]' and 'child' on this struct.
*/
396 struct ctdb_set_recmode_state {
397 struct ctdb_context *ctdb;
/* the control request that gets the deferred reply */
398 struct ctdb_req_control *c;
/* timeout event armed for the child's answer */
401 struct timed_event *te;
/* fd event on the read end of the pipe to the child */
402 struct fd_event *fde;
/* set when the state is created; the destructor reports the elapsed time */
404 struct timeval start_time;
408 called if our set_recmode child times out. this would happen if
409 ctdb_recovery_lock() would block.
/*
  Timed-event callback armed by ctdb_control_set_recmode(): fires when the
  lock-probing child has not answered in time. The requested recovery mode
  is applied anyway and the pending control is answered with status 0.
  private_data is the struct ctdb_set_recmode_state of the request.
  NOTE(review): lines are missing from this listing after the reply
  (presumably the state is freed there) - not visible.
*/
411 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
412 struct timeval t, void *private_data)
414 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
415 struct ctdb_set_recmode_state);
417 /* we consider this a success, not a failure, as we failed to
418 set the recovery lock which is what we wanted. This can be
419 caused by the cluster filesystem being very slow to
420 arbitrate locks immediately after a node failure.
422 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
423 state->ctdb->recovery_mode = state->recmode;
424 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
429 /* when we free the recmode state we must kill any child process.
/*
  talloc destructor for struct ctdb_set_recmode_state (installed with
  talloc_set_destructor() in ctdb_control_set_recmode()): records how long
  the request took as reclock latency, deals with both pipe descriptors that
  are still valid and sends SIGKILL to the child.
  NOTE(review): the bodies of the two fd checks are on lines missing from
  this listing - presumably they close the descriptors.
*/
431 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
/* time elapsed since state->start_time */
433 double l = timeval_elapsed(&state->start_time);
435 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
437 if (state->fd[0] != -1) {
440 if (state->fd[1] != -1) {
443 ctdb_kill(state->ctdb, state->child, SIGKILL);
447 /* this is called when the client process has completed ctdb_recovery_lock()
448 and has written data back to us through the pipe.
/*
  fd-event callback on the read end of the pipe to the lock-probing child.
  It cancels the timeout event, reads one byte from the child and then:
   - if the read did not return exactly one byte, or the byte is non-zero,
     answers the pending control with status -1 and an error message;
   - otherwise applies the requested recovery mode, runs
     ctdb_process_deferred_attach() when the new mode is
     CTDB_RECOVERY_NORMAL, and answers the control with status 0.
  private_data is the struct ctdb_set_recmode_state of the request.
  NOTE(review): short lines are missing from this listing (declarations of
  'c' and 'ret', returns, presumably the release of 'state') - not visible.
*/
450 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
451 uint16_t flags, void *private_data)
453 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
454 struct ctdb_set_recmode_state);
458 /* we got a response from our child process so we can abort the
461 talloc_free(state->te);
465 /* If, as expected, the child was unable to take the recovery
466 * lock then it will have written 0 into the pipe, so
467 * continue. However, any other value (e.g. 1) indicates that
468 * it was able to take the recovery lock when it should have
469 * been held by the recovery daemon on the recovery master.
471 ret = sys_read(state->fd[0], &c, 1);
472 if (ret != 1 || c != 0) {
473 ctdb_request_control_reply(
474 state->ctdb, state->c, NULL, -1,
475 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
480 state->ctdb->recovery_mode = state->recmode;
482 /* release any deferred attach calls from clients */
483 if (state->recmode == CTDB_RECOVERY_NORMAL) {
484 ctdb_process_deferred_attach(state->ctdb);
487 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed-event callback armed by ctdb_deferred_drop_all_ips(): the node has
  stayed in recovery mode for too long, so the timer context is released and
  all IPs are released via ctdb_release_all_ips().
  private_data is the struct ctdb_context.
*/
493 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
494 struct timeval t, void *private_data)
496 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
498 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* the event was allocated on release_ips_ctx; clear the stale pointer too */
499 talloc_free(ctdb->release_ips_ctx);
500 ctdb->release_ips_ctx = NULL;
502 ctdb_release_all_ips(ctdb);
506 * Set up an event to drop all public ips if we remain in recovery for too
/*
  (Re)arm the timer that calls ctdb_drop_all_ips_event() after
  tunable.recovery_drop_all_ips seconds. Any previously armed timer is
  discarded by freeing its talloc context first.
  NOTE(review): the return statement is on a line missing from this listing;
  the caller treats a non-zero result as failure.
*/
509 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
511 if (ctdb->release_ips_ctx != NULL) {
512 talloc_free(ctdb->release_ips_ctx);
/* fresh context to own the timer event */
514 ctdb->release_ips_ctx = talloc_new(ctdb);
515 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
517 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
522 set the recovery mode
/*
  SET_RECMODE control. The visible lines do the following, in order:
   1. Read the requested mode from the first uint32_t of indata.
   2. When the request is CTDB_RECOVERY_NORMAL, free the "drop all IPs"
      timer context; ctdb_deferred_drop_all_ips() is also called
      (presumably in an 'else' branch on an elided line) and a failure is
      logged.
   3. Log a notice when the mode actually changes.
   4. Unless this is a switch from ACTIVE to NORMAL, just store the mode.
   5. When ending recovery: thaw every frozen database priority, allocate
      the request state, run ctdb_process_deferred_attach(), and if no
      recovery lock file is configured store the mode directly.
   6. Otherwise create a pipe and fork. The child tries
      ctdb_recovery_lock(), logs an error and unlocks if that unexpectedly
      succeeds, writes one byte 'cc' to the pipe and then keeps probing the
      parent with signal 0 until it gets ESRCH.
   7. The parent marks the read end close-on-exec, installs
      set_recmode_destructor, arms a 5 second timeout
      (ctdb_set_recmode_timeout) and an fd event on the read end, then
      stores the mode and takes ownership of the request 'c' so the reply
      can be sent later.
  NOTE(review): short lines are missing from this listing (returns, error
  handling after pipe/fork, the values given to 'cc', the async_reply
  assignment, remaining event_add_fd() arguments), so those parts are not
  visible.
  NOTE(review): no check of indata.dsize is visible before indata.dptr is
  dereferenced - confirm the caller validates the size.
  NOTE(review): ctdb_process_deferred_attach() is invoked here before the
  lock probe and again in set_recmode_handler() - confirm that is intended.
*/
524 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
525 struct ctdb_req_control *c,
526 TDB_DATA indata, bool *async_reply,
527 const char **errormsg)
529 uint32_t recmode = *(uint32_t *)indata.dptr;
531 struct ctdb_set_recmode_state *state;
/* captured before forking; the child probes this pid with signal 0 */
532 pid_t parent = getpid();
534 /* if we enter recovery but stay in recovery for too long
535 we will eventually drop all our ip addresses
537 if (recmode == CTDB_RECOVERY_NORMAL) {
538 talloc_free(ctdb->release_ips_ctx);
539 ctdb->release_ips_ctx = NULL;
541 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
542 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
546 if (recmode != ctdb->recovery_mode) {
547 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
548 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
551 if (recmode != CTDB_RECOVERY_NORMAL ||
552 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
553 ctdb->recovery_mode = recmode;
557 /* some special handling when ending recovery mode */
559 /* force the databases to thaw */
560 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
561 if (ctdb_db_prio_frozen(ctdb, i)) {
562 ctdb_control_thaw(ctdb, i, false);
566 state = talloc(ctdb, struct ctdb_set_recmode_state);
567 CTDB_NO_MEMORY(ctdb, state);
569 state->start_time = timeval_current();
573 /* release any deferred attach calls from clients */
574 if (recmode == CTDB_RECOVERY_NORMAL) {
575 ctdb_process_deferred_attach(ctdb);
578 if (ctdb->recovery_lock_file == NULL) {
579 /* Not using recovery lock file */
580 ctdb->recovery_mode = recmode;
584 /* For the rest of what needs to be done, we need to do this in
585 a child process since
586 1, the call to ctdb_recovery_lock() can block if the cluster
587 filesystem is in the process of recovery.
589 ret = pipe(state->fd);
592 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
596 state->child = ctdb_fork(ctdb);
597 if (state->child == (pid_t)-1) {
604 if (state->child == 0) {
608 ctdb_set_process_name("ctdb_recmode");
609 debug_extra = talloc_asprintf(NULL, "set_recmode:");
610 /* Daemon should not be able to get the recover lock,
611 * as it should be held by the recovery master */
612 if (ctdb_recovery_lock(ctdb)) {
614 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
615 ctdb->recovery_lock_file));
616 ctdb_recovery_unlock(ctdb);
620 sys_write(state->fd[1], &cc, 1);
621 /* make sure we die when our parent dies */
622 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
624 sys_write(state->fd[1], &cc, 1);
629 set_close_on_exec(state->fd[0]);
633 talloc_set_destructor(state, set_recmode_destructor);
635 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
637 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
638 ctdb_set_recmode_timeout, state);
640 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
645 if (state->fde == NULL) {
649 tevent_fd_set_auto_close(state->fde);
652 state->recmode = recmode;
653 state->c = talloc_steal(state, c);
/*
  Report whether this process currently holds the recovery lock, judged by
  ctdb->recovery_lock_fd being a valid descriptor (not -1).
*/
661 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
663 return ctdb->recovery_lock_fd != -1;
667 try and get the recovery lock in shared storage - should only work
668 on the recovery master recovery daemon. Anywhere else is a bug
/*
  Try to take the recovery lock: open (creating if necessary)
  ctdb->recovery_lock_file and request a write lock on it with fcntl
  F_SETLK. On success the descriptor stays in ctdb->recovery_lock_fd; on a
  locking failure it is closed and reset to -1.
  NOTE(review): short lines are missing from this listing: the return
  statements, the declaration of 'lock' and the l_start / l_len / l_pid
  assignments are not visible.
  NOTE(review): recovery_lock_fd is overwritten by open() without a visible
  check for a descriptor that is already held - confirm callers guarantee it
  is -1 on entry.
*/
670 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
674 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
675 O_RDWR|O_CREAT, 0600);
676 if (ctdb->recovery_lock_fd == -1) {
678 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
679 ctdb->recovery_lock_file, strerror(errno)));
683 set_close_on_exec(ctdb->recovery_lock_fd);
685 lock.l_type = F_WRLCK;
686 lock.l_whence = SEEK_SET;
/* F_SETLK does not wait: a conflicting lock is reported as an error */
691 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
/* keep errno from fcntl; close() below may overwrite it */
692 int saved_errno = errno;
693 close(ctdb->recovery_lock_fd);
694 ctdb->recovery_lock_fd = -1;
695 /* Fail silently on these errors, since they indicate
696 * lock contention, but log an error for any other
698 if (saved_errno != EACCES &&
699 saved_errno != EAGAIN) {
700 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
701 "recovery lock on '%s' - (%s)\n",
702 ctdb->recovery_lock_file,
703 strerror(saved_errno)));
/*
  Release the recovery lock if it is held: closing the descriptor drops the
  fcntl lock taken in ctdb_recovery_lock(). Does nothing when
  recovery_lock_fd is already -1.
*/
711 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
713 if (ctdb->recovery_lock_fd != -1) {
714 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
715 close(ctdb->recovery_lock_fd);
/* mark as "not held" for ctdb_recovery_have_lock() */
716 ctdb->recovery_lock_fd = -1;
721 delete a record as part of the vacuum process
722 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
723 use non-blocking locks
725 return 0 if the record was successfully deleted (i.e. it does not exist
726 when the function returns)
727 or !0 if the record still exists in the tdb after returning.
/*
  Vacuuming helper: try to delete the record described by 'rec' from the
  local tdb, using only non-blocking locks.
  The visible checks, in order: we must not be lmaster for the key; the
  supplied data must be exactly one ltdb header; the hash chain lock must be
  obtainable without blocking; a stored record too small to hold a header is
  treated as corrupt and deleted; the stored rsn must not be newer than the
  supplied one; neither header may carry read-only flags; we must not be
  the dmaster; and the list -1 write lock (the freelist, per the comment
  below) must be obtainable without blocking.
  NOTE(review): short lines are missing from this listing, so the return
  statements and any free() of the buffer returned by tdb_fetch() are not
  visible.
*/
729 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
731 TDB_DATA key, data, data2;
732 struct ctdb_ltdb_header *hdr, *hdr2;
734 /* these are really internal tdb functions - but we need them here for
735 non-blocking lock of the freelist */
736 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
737 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* within a record the key comes first, immediately followed by the data */
740 key.dsize = rec->keylen;
741 key.dptr = &rec->data[0];
742 data.dsize = rec->datalen;
743 data.dptr = &rec->data[rec->keylen];
745 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
746 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* the request must carry a bare ltdb header and no payload */
750 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
751 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
755 hdr = (struct ctdb_ltdb_header *)data.dptr;
757 /* use a non-blocking lock */
758 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
762 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* no stored record for this key */
763 if (data2.dptr == NULL) {
764 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* stored record cannot even hold a header: treat it as corrupt */
768 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
769 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
770 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
771 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
773 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
/* NOTE(review): as listed, this is logged even if tdb_delete() failed - confirm */
774 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
776 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
781 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* our stored copy is newer than what the caller saw: keep it */
783 if (hdr2->rsn > hdr->rsn) {
784 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
785 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
786 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
791 /* do not allow deleting record that have readonly flags set. */
792 if (hdr->flags & CTDB_REC_RO_FLAGS) {
793 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
794 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
798 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
799 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
800 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
805 if (hdr2->dmaster == ctdb->pnn) {
806 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
807 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* take the list -1 lock without blocking before deleting */
812 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
813 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
818 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
819 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
820 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
821 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success path: release both locks in reverse order of acquisition */
826 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
827 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  Context handed to the event-script callbacks of the START_RECOVERY and
  END_RECOVERY controls so they can answer the original request.
*/
834 struct recovery_callback_state {
/* the pending control request; the callbacks reply to it */
835 struct ctdb_req_control *c;
840 called when the 'recovered' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_end_recovery(). Re-enables monitoring, bumps the
  num_recoveries statistic, logs a failed script status, answers the pending
  control with that status, records the finish time and moves the run state
  from FIRST_RECOVERY to STARTUP when applicable.
  p is the struct recovery_callback_state of the request.
  NOTE(review): short lines are missing from this listing; the condition
  around the failure log, the body of the -ETIME branch and the release of
  'state' are not visible.
*/
842 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
844 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
/* monitoring was disabled in ctdb_control_end_recovery() before the script ran */
846 ctdb_enable_monitoring(ctdb);
847 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
850 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
851 if (status == -ETIME) {
/* the script status is passed through as the control's result */
856 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
859 gettimeofday(&ctdb->last_recovery_finished, NULL);
861 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
862 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
867 recovery has finished
/*
  END_RECOVERY control: finish pending persistent transaction commits,
  disable monitoring and start the CTDB_EVENT_RECOVERED event script with
  ctdb_end_recovery_callback() as completion handler. On success the request
  'c' is moved under the callback state so it can be answered later.
  NOTE(review): short lines are missing from this listing (remaining
  parameters, the 'ret' check, returns, the async_reply assignment); the
  re-enable of monitoring and the failure log presumably belong to the
  error branch of that check.
*/
869 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
870 struct ctdb_req_control *c,
874 struct recovery_callback_state *state;
876 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
878 ctdb_persistent_finish_trans3_commits(ctdb);
880 state = talloc(ctdb, struct recovery_callback_state);
881 CTDB_NO_MEMORY(ctdb, state);
/* re-enabled in the callback, or below when the script could not be started */
885 ctdb_disable_monitoring(ctdb);
887 ret = ctdb_event_script_callback(ctdb, state,
888 ctdb_end_recovery_callback,
890 CTDB_EVENT_RECOVERED, "%s", "");
893 ctdb_enable_monitoring(ctdb);
895 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
900 /* tell the control that we will be reply asynchronously */
901 state->c = talloc_steal(state, c);
907 called when the 'startrecovery' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_start_recovery(): logs a failed script status and answers the
  pending control with that status.
  p is the struct recovery_callback_state of the request.
  NOTE(review): the condition around the failure log and the release of
  'state' are on lines missing from this listing.
*/
909 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
911 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
914 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
917 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
922 run the startrecovery eventscript
/*
  START_RECOVERY control: record the start time, take ownership of the
  request 'c', disable monitoring and start the CTDB_EVENT_START_RECOVERY
  event script with ctdb_start_recovery_callback() as completion handler.
  NOTE(review): short lines are missing from this listing (remaining
  parameters, the 'ret' check, returns, the async_reply assignment).
  NOTE(review): unlike ctdb_control_end_recovery(), no re-enable of
  monitoring is visible on the failure path here - confirm whether that is
  intended.
*/
924 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
925 struct ctdb_req_control *c,
929 struct recovery_callback_state *state;
931 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
932 gettimeofday(&ctdb->last_recovery_started, NULL);
934 state = talloc(ctdb, struct recovery_callback_state);
935 CTDB_NO_MEMORY(ctdb, state);
/* the request now uses state as talloc context, so the callback can reply */
937 state->c = talloc_steal(state, c);
939 ctdb_disable_monitoring(ctdb);
941 ret = ctdb_event_script_callback(ctdb, state,
942 ctdb_start_recovery_callback,
944 CTDB_EVENT_START_RECOVERY,
948 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
953 /* tell the control that we will be reply asynchronously */
959 try to delete all these records as part of the vacuuming process
960 and return the records we failed to delete
/*
  TRY_DELETE_RECORDS control: attempt delete_tdb_record() for every record
  in the marshall buffer in indata and return, in outdata, a marshall buffer
  containing the records that could not be deleted.
  The reply starts as a bare zeroed header carrying the database id; each
  failed record is appended by growing the buffer by rec->length.
  NOTE(review): short lines are missing from this listing (returns, NULL
  check on ctdb_db, the reply's record counter update), so those parts are
  not visible.
  NOTE(review): the walk advances by rec->length and trusts rec->keylen and
  rec->datalen; no comparison against indata.dsize is visible.
  NOTE(review): in the visible lines 'hdr' and the adjusted data.dptr /
  data.dsize in the failure branch are not used afterwards.
*/
962 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
964 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
965 struct ctdb_db_context *ctdb_db;
967 struct ctdb_rec_data *rec;
968 struct ctdb_marshall_buffer *records;
/* the input must at least contain the marshall header */
970 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
971 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
975 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
977 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
982 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
983 reply->count, reply->db_id));
986 /* create a blob to send back the records we couldnt delete */
987 records = (struct ctdb_marshall_buffer *)
988 talloc_zero_size(outdata,
989 offsetof(struct ctdb_marshall_buffer, data));
990 if (records == NULL) {
991 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
994 records->db_id = ctdb_db->db_id;
/* records are packed back to back starting at reply->data */
997 rec = (struct ctdb_rec_data *)&reply->data[0];
998 for (i=0;i<reply->count;i++) {
1001 key.dptr = &rec->data[0];
1002 key.dsize = rec->keylen;
1003 data.dptr = &rec->data[key.dsize];
1004 data.dsize = rec->datalen;
1006 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1007 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1011 /* If we cant delete the record we must add it to the reply
1012 so the lmaster knows it may not purge this record
1014 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1016 struct ctdb_ltdb_header *hdr;
1018 hdr = (struct ctdb_ltdb_header *)data.dptr;
1019 data.dptr += sizeof(*hdr);
1020 data.dsize -= sizeof(*hdr);
1022 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1024 old_size = talloc_get_size(records);
1025 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1026 if (records == NULL) {
1027 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1031 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1034 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1038 *outdata = ctdb_marshall_finish(records);
1044 * Store a record as part of the vacuum process:
1045 * This is called from the RECEIVE_RECORD control which
1046 * the lmaster uses to send the current empty copy
1047 * to all nodes for storing, before it lets the other
1048 * nodes delete the records in the second phase with
1049 * the TRY_DELETE_RECORDS control.
1051 * Only store if we are not lmaster or dmaster, and our
1052 * rsn is <= the provided rsn. Use non-blocking locks.
1054 * return 0 if the record was successfully stored.
1055 * return !0 if the record still exists in the tdb after returning.
/*
  Vacuuming helper: try to store the record described by 'rec' in the local
  tdb, using a non-blocking chain lock.
  The visible checks, in order: we must not be lmaster for the key; the
  supplied data must be exactly one ltdb header; the chain lock must be
  obtainable without blocking. If no usable record is stored yet (missing,
  or too small to hold a header) the supplied one is stored directly.
  Otherwise the stored rsn must not be newer than the supplied one, neither
  header may carry read-only flags and we must not be the dmaster before the
  record is overwritten.
  NOTE(review): short lines are missing from this listing, so return values,
  jump targets and any free() of the buffer returned by tdb_fetch() are not
  visible; the single tdb_chainunlock() at the end is presumably a shared
  exit path.
*/
1057 static int store_tdb_record(struct ctdb_context *ctdb,
1058 struct ctdb_db_context *ctdb_db,
1059 struct ctdb_rec_data *rec)
1061 TDB_DATA key, data, data2;
1062 struct ctdb_ltdb_header *hdr, *hdr2;
/* within a record the key comes first, immediately followed by the data */
1065 key.dsize = rec->keylen;
1066 key.dptr = &rec->data[0];
1067 data.dsize = rec->datalen;
1068 data.dptr = &rec->data[rec->keylen];
1070 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1071 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1072 "where we are lmaster\n"));
/* the request must carry a bare ltdb header and no payload */
1076 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1077 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1081 hdr = (struct ctdb_ltdb_header *)data.dptr;
1083 /* use a non-blocking lock */
1084 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1085 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1089 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* nothing usable stored locally: store the supplied header as is */
1090 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1091 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1092 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1096 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1101 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* our stored copy is newer than what the caller sent: keep it */
1103 if (hdr2->rsn > hdr->rsn) {
1104 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1105 "rsn=%llu - called with rsn=%llu\n",
1106 (unsigned long long)hdr2->rsn,
1107 (unsigned long long)hdr->rsn));
1112 /* do not allow vacuuming of records that have readonly flags set. */
1113 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1114 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1119 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1120 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1126 if (hdr2->dmaster == ctdb->pnn) {
1127 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1128 "where we are the dmaster\n"));
/* all checks passed: overwrite the stored record with the supplied header */
1133 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1134 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1142 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1150 * Try to store all these records as part of the vacuuming process
1151 * and return the records we failed to store.
/*
  RECEIVE_RECORDS control: attempt store_tdb_record() for every record in
  the marshall buffer in indata and return, in outdata, a marshall buffer
  containing the records that could not be stored.
  The reply starts as a bare zeroed header carrying the database id; each
  failed record is appended by growing the buffer by rec->length.
  NOTE(review): short lines are missing from this listing (returns, NULL
  check on ctdb_db, the reply's record counter update, parts of the DEBUG
  calls), so those parts are not visible.
  NOTE(review): the walk advances by rec->length and trusts rec->keylen and
  rec->datalen; no comparison against indata.dsize is visible.
  NOTE(review): in the visible lines 'hdr' and the adjusted data.dptr /
  data.dsize in the failure branch are not used afterwards.
*/
1153 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1154 TDB_DATA indata, TDB_DATA *outdata)
1156 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1157 struct ctdb_db_context *ctdb_db;
1159 struct ctdb_rec_data *rec;
1160 struct ctdb_marshall_buffer *records;
/* the input must at least contain the marshall header */
1162 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1164 (__location__ " invalid data in receive_records\n"));
1168 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1170 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1175 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1176 "dbid 0x%x\n", reply->count, reply->db_id));
1178 /* create a blob to send back the records we could not store */
1179 records = (struct ctdb_marshall_buffer *)
1180 talloc_zero_size(outdata,
1181 offsetof(struct ctdb_marshall_buffer, data));
1182 if (records == NULL) {
1183 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1186 records->db_id = ctdb_db->db_id;
/* records are packed back to back starting at reply->data */
1188 rec = (struct ctdb_rec_data *)&reply->data[0];
1189 for (i=0; i<reply->count; i++) {
1192 key.dptr = &rec->data[0];
1193 key.dsize = rec->keylen;
1194 data.dptr = &rec->data[key.dsize];
1195 data.dsize = rec->datalen;
1197 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1198 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1204 * If we can not store the record we must add it to the reply
1205 * so the lmaster knows it may not purge this record.
1207 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1209 struct ctdb_ltdb_header *hdr;
1211 hdr = (struct ctdb_ltdb_header *)data.dptr;
1212 data.dptr += sizeof(*hdr);
1213 data.dsize -= sizeof(*hdr);
1215 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1216 "record with hash 0x%08x in vacuum "
1217 "via RECEIVE_RECORDS\n",
1220 old_size = talloc_get_size(records);
1221 records = talloc_realloc_size(outdata, records,
1222 old_size + rec->length);
1223 if (records == NULL) {
1224 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1229 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1232 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1235 *outdata = ctdb_marshall_finish(records);
/*
  GET_CAPABILITIES control: return ctdb->capabilities as a single uint32_t
  allocated with outdata as its talloc context.
  NOTE(review): the return statement is on a line missing from this listing.
*/
1244 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1246 uint32_t *capabilities = NULL;
1248 capabilities = talloc(outdata, uint32_t);
1249 CTDB_NO_MEMORY(ctdb, capabilities);
1250 *capabilities = ctdb->capabilities;
1252 outdata->dsize = sizeof(uint32_t);
1253 outdata->dptr = (uint8_t *)capabilities;
1258 /* The recovery daemon will ping us at regular intervals.
1259 If we haven't been pinged for a while we assume the recovery
1260 daemon is inoperable and we restart.
/*
  Timed-event callback armed by ctdb_control_recd_ping(): the recovery
  daemon has not pinged within tunable.recd_ping_timeout seconds.
  While the miss counter is below tunable.recd_ping_failcount the timer is
  re-armed; otherwise the recovery daemon is stopped and started again.
  p is the struct ctdb_context.
  NOTE(review): short lines are missing from this listing; the counter
  increment and the return after re-arming are presumably among them.
*/
1262 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1264 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
/* the counter object also serves as talloc context for the timer event */
1265 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1267 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1269 if (*count < ctdb->tunable.recd_ping_failcount) {
1271 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1272 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1273 ctdb_recd_ping_timeout, ctdb);
1277 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1279 ctdb_stop_recoverd(ctdb);
1280 ctdb_start_recoverd(ctdb);
/*
  RECD_PING control: the recovery daemon is alive. Reset the miss counter by
  replacing it with a fresh zeroed one, and arm the ping-timeout timer
  unless tunable.recd_ping_timeout is 0.
  NOTE(review): the return statement is on a line missing from this listing.
*/
1283 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/* freeing the old counter also discards the timer event allocated on it */
1285 talloc_free(ctdb->recd_ping_count);
1287 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1288 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* a timeout of 0 leaves the watchdog timer unarmed */
1290 if (ctdb->tunable.recd_ping_timeout != 0) {
1291 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1292 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1293 ctdb_recd_ping_timeout, ctdb);
/*
  SET_RECMASTER control: store the new recovery master pnn taken from the
  uint32_t payload, logging when this node gains or loses the role.
  The payload must be exactly sizeof(uint32_t) bytes.
  NOTE(review): the opening lines of the two DEBUG calls and the return
  statement are on lines missing from this listing.
*/
1301 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1303 uint32_t new_recmaster;
1305 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1306 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* we were the recovery master and somebody else takes over */
1308 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1310 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* we become the recovery master and were not before */
1313 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1315 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1318 ctdb->recovery_master = new_recmaster;
/*
  STOP_NODE control: disable monitoring and set NODE_FLAGS_STOPPED on this
  node's own entry in ctdb->nodes.
  NOTE(review): the return statement is on a line missing from this listing.
*/
1323 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1325 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1326 ctdb_disable_monitoring(ctdb);
1327 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
  CONTINUE_NODE control: clear NODE_FLAGS_STOPPED on this node's own entry
  in ctdb->nodes.
  NOTE(review): the listing ends here; anything after the flag update
  (including the return statement) is not visible. No re-enable of
  monitoring is visible in this function.
*/
1332 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1334 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1335 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;