4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
// Control handler: serialise the current VNN map into outdata.
// Builds a struct ctdb_vnn_map_wire big enough for ctdb->vnn_map->size
// entries, allocated with outdata as talloc parent, then copies the
// generation, the size and the map array into it.
// NOTE(review): lines are missing from this listing (braces, the
// declaration of len, the return statement); only visible statements
// are described.
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 struct ctdb_vnn_map_wire *map;
// Macro defined elsewhere; presumably rejects a request whose indata is
// not empty - confirm.
37 CHECK_CONTROL_DATA_SIZE(0);
// Wire length = header up to the map member + one uint32_t per entry.
39 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
40 map = talloc_size(outdata, len);
// Macro defined elsewhere; presumably bails out when map is NULL - confirm.
41 CTDB_NO_MEMORY(ctdb, map);
43 map->generation = ctdb->vnn_map->generation;
44 map->size = ctdb->vnn_map->size;
45 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
// NOTE(review): no assignment to outdata->dsize is visible in this listing.
48 outdata->dptr = (uint8_t *)map;
// Control handler: replace ctdb->vnn_map with the map carried in indata.
// indata.dptr is interpreted as a struct ctdb_vnn_map_wire.
// NOTE(review): lines are missing from this listing (braces, the
// declaration of i, return statements); only visible statements are
// described.
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
// Every database priority 1..NUM_DB_PRIORITIES must be frozen; otherwise
// an error is logged (what follows the log line is not visible).
59 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
60 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
61 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
// The old map is released before the replacement is allocated.
66 talloc_free(ctdb->vnn_map);
68 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
69 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
// NOTE(review): map->size comes straight from the wire and drives both
// the allocation and the memcpy below; no visible line checks it against
// indata.dsize - confirm the caller validates the payload length.
71 ctdb->vnn_map->generation = map->generation;
72 ctdb->vnn_map->size = map->size;
73 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
74 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
76 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
// Control handler: return the list of attached databases and their flags.
// Output is a zero-filled struct ctdb_dbid_map with one dbs[] entry per
// element of ctdb->db_list, allocated with outdata as talloc parent.
// NOTE(review): lines are missing from this listing (braces, the
// declarations and updates of i and len, the allocation check, the
// return); only visible statements are described.
82 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
85 struct ctdb_db_context *ctdb_db;
86 struct ctdb_dbid_map *dbid_map;
88 CHECK_CONTROL_DATA_SIZE(0);
// First pass over the list; the loop body is not visible, presumably it
// counts entries into len - confirm.
91 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
96 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
97 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
99 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
103 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
// Second pass: fill in id and flags. The buffer was zero-filled, so the
// flag bits are OR-ed onto 0.
105 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
106 dbid_map->dbs[i].dbid = ctdb_db->db_id;
107 if (ctdb_db->persistent != 0) {
108 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
110 if (ctdb_db->readonly != 0) {
111 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
113 if (ctdb_db->sticky != 0) {
114 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
// Control handler: return one struct ctdb_node_and_flags per cluster node.
// Output is a zero-filled struct ctdb_node_map with ctdb->num_nodes
// entries, allocated with outdata as talloc parent.
// NOTE(review): lines are missing from this listing (braces, one
// parse_ip argument, whatever follows each error log, the return); only
// visible statements are described.
122 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
124 uint32_t i, num_nodes;
125 struct ctdb_node_map *node_map;
127 CHECK_CONTROL_DATA_SIZE(0);
129 num_nodes = ctdb->num_nodes;
131 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
132 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
133 if (!outdata->dptr) {
134 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
138 node_map = (struct ctdb_node_map *)outdata->dptr;
139 node_map->num = num_nodes;
// Per node: convert the textual address into a sockaddr, then copy pnn
// and flags. A parse_ip result of 0 is treated as failure and logged.
140 for (i=0; i<num_nodes; i++) {
141 if (parse_ip(ctdb->nodes[i]->address.address,
142 NULL, /* TODO: pass in the correct interface here*/
144 &node_map->nodes[i].addr) == 0)
146 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
149 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
150 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
157 get an old style ipv4-only nodemap
// Control handler: IPv4-only variant of the node map.
// Output is a zero-filled struct ctdb_node_mapv4 with ctdb->num_nodes
// entries of struct ctdb_node_and_flagsv4, allocated with outdata as
// talloc parent.
// NOTE(review): lines are missing from this listing (braces, whatever
// follows each error log, the return); only visible statements are
// described.
160 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
162 uint32_t i, num_nodes;
163 struct ctdb_node_mapv4 *node_map;
165 CHECK_CONTROL_DATA_SIZE(0);
167 num_nodes = ctdb->num_nodes;
169 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
170 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
171 if (!outdata->dptr) {
172 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
176 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
177 node_map->num = num_nodes;
// Per node: parse the address into the sin member (a parse_ipv4 result of
// 0 is treated as failure and logged), then copy pnn and flags.
178 for (i=0; i<num_nodes; i++) {
179 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
180 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
184 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
185 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
// Timed-event callback that reloads the nodes file.
// private_data is the struct ctdb_context. The previous node array is
// parked under a temporary talloc context, the nodes file is loaded
// again, and each slot of the new array either takes over the old node
// (same address) or is added and connected as a new node.
// NOTE(review): lines are missing from this listing (braces, several
// declarations, the assignment of nodes, continue statements); only
// visible statements are described.
192 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
193 struct timeval t, void *private_data)
196 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
198 struct ctdb_node **nodes;
// NOTE(review): the result of talloc_new is not checked in visible lines.
200 tmp_ctx = talloc_new(ctdb);
202 /* steal the old nodes file for a while */
203 talloc_steal(tmp_ctx, ctdb->nodes);
// Old node count, used below to bound comparisons against the old array.
206 num_nodes = ctdb->num_nodes;
209 /* load the new nodes file */
210 ctdb_load_nodes_file(ctdb);
212 for (i=0; i<ctdb->num_nodes; i++) {
213 /* keep any identical pre-existing nodes and connections */
// The freshly loaded node is dropped and the old one is moved back under
// the new array, so its existing state is retained.
214 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
215 talloc_free(ctdb->nodes[i]);
216 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
// Body of this test is not visible; presumably deleted nodes are skipped.
220 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
224 /* any new or different nodes must be added */
225 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
226 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
227 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
229 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
230 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
231 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
235 /* tell the recovery daemon to reload the nodes file too */
236 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
// Releases the temporary context and whatever old nodes still hang off it.
238 talloc_free(tmp_ctx);
243 reload the nodes file after a short delay (so that we can send the response
// Control handler: schedule ctdb_reload_nodes_event on ctdb->ev with ctdb
// as both talloc parent and private data. The delay is
// timeval_current_ofs(1,0), presumably one second from now - confirm.
// NOTE(review): the event_add_timed result is not checked in the visible
// line; braces and the return are missing from this listing.
247 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
249 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
255 a traverse function for pulling all relevant records from pulldb
// Fields of the traverse state passed to traverse_pulldb (used there as
// struct pulldb_data).
// NOTE(review): the struct header line, closing brace and at least two
// members used by the code (len, failed) are missing from this listing.
// Daemon context; read for tunables and passed to ctdb_fatal.
258 struct ctdb_context *ctdb;
// Database being traversed; its db_name appears in log messages.
259 struct ctdb_db_context *ctdb_db;
// Growing marshall buffer that collects the records.
260 struct ctdb_marshall_buffer *pulldata;
// Bytes currently allocated for pulldata.
262 uint32_t allocated_len;
// tdb traverse callback: append one key/data record to the marshall
// buffer held in the struct pulldb_data passed as p.
// NOTE(review): lines are missing from this listing (braces, the NULL
// test on rec, return statements, any release of rec); only visible
// statements are described.
266 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
268 struct pulldb_data *params = (struct pulldb_data *)p;
269 struct ctdb_rec_data *rec;
270 struct ctdb_context *ctdb = params->ctdb;
271 struct ctdb_db_context *ctdb_db = params->ctdb_db;
273 /* add the record to the blob */
274 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
276 params->failed = true;
// Grow the buffer when the new record would reach or pass the allocated
// size; the new size adds the pulldb_preallocation_size tunable as slack.
279 if (params->len + rec->length >= params->allocated_len) {
280 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
// NOTE(review): the realloc result overwrites params->pulldata directly,
// and the parent argument is NULL.
281 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
283 if (params->pulldata == NULL) {
284 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
285 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
// Append the marshalled record at offset len and account for it.
287 params->pulldata->count++;
288 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
289 params->len += rec->length;
// Optional warning for oversized records; a tunable value of 0 disables it.
291 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
292 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
301 pull a bunch of records from a ltdb, filtering by lmaster
// Control handler: marshall every record of one local tdb into outdata.
// indata is read as a struct ctdb_control_pulldb naming the database.
// The database priority must be frozen, and the traverse runs between
// ctdb_lockall_mark_prio and ctdb_lockall_unmark_prio.
// NOTE(review): lines are missing from this listing (braces, return
// statements, the NULL test on ctdb_db, the params.ctdb assignment);
// only visible statements are described.
303 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
305 struct ctdb_control_pulldb *pull;
306 struct ctdb_db_context *ctdb_db;
307 struct pulldb_data params;
308 struct ctdb_marshall_buffer *reply;
// NOTE(review): no check of indata.dsize is visible before pull is
// dereferenced - confirm the dispatcher validates the size.
310 pull = (struct ctdb_control_pulldb *)indata.dptr;
312 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
314 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
318 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
319 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
// Start with an empty, zeroed marshall buffer owned by outdata.
323 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
324 CTDB_NO_MEMORY(ctdb, reply);
326 reply->db_id = pull->db_id;
// Traverse state: len and allocated_len both start at the offset of the
// data member, i.e. the buffer holds only its header.
329 params.ctdb_db = ctdb_db;
330 params.pulldata = reply;
331 params.len = offsetof(struct ctdb_marshall_buffer, data);
332 params.allocated_len = params.len;
333 params.failed = false;
335 if (ctdb_db->unhealthy_reason) {
336 /* this is just a warning, as the tdb should be empty anyway */
337 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
338 ctdb_db->db_name, ctdb_db->unhealthy_reason));
341 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
342 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
// NOTE(review): the last argument below looks like a mis-encoded
// address-of params (an HTML entity replaced the ampersand) - restore
// from the upstream file.
// On traverse failure the lock mark is dropped and the buffer freed.
346 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
347 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
348 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
349 talloc_free(params.pulldata);
353 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
// Hand the (possibly reallocated) buffer and its used length to the caller.
355 outdata->dptr = (uint8_t *)params.pulldata;
356 outdata->dsize = params.len;
// Optional size warnings; a tunable value of 0 disables each of them.
358 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
359 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
361 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
362 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
370 push a bunch of records into a ltdb, filtering by rsn
// Control handler: store every record of a marshall buffer into the
// local tdb named by reply->db_id.
// The database priority must be frozen, and the stores run between
// ctdb_lockall_mark_prio and ctdb_lockall_unmark_prio.
// NOTE(review): lines are missing from this listing (braces, return and
// goto statements, labels, several declarations); only visible
// statements are described.
372 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
374 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
375 struct ctdb_db_context *ctdb_db;
377 struct ctdb_rec_data *rec;
// Only the fixed header is length-checked here.
379 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
380 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
384 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
386 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
390 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
391 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
395 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
396 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
400 rec = (struct ctdb_rec_data *)&reply->data[0];
402 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
403 reply->count, reply->db_id));
// NOTE(review): the walk trusts reply->count and each record's keylen,
// datalen and length; no visible line bounds them by indata.dsize.
405 for (i=0;i<reply->count;i++) {
407 struct ctdb_ltdb_header *hdr;
// Record payload layout: key bytes followed by data bytes.
409 key.dptr = &rec->data[0];
410 key.dsize = rec->keylen;
411 data.dptr = &rec->data[key.dsize];
412 data.dsize = rec->datalen;
414 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
415 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
// The data part begins with a struct ctdb_ltdb_header; it is modified in
// place inside the request buffer.
418 hdr = (struct ctdb_ltdb_header *)data.dptr;
419 /* strip off any read only record flags. All readonly records
420 are revoked implicitely by a recovery
422 hdr->flags &= ~CTDB_REC_RO_FLAGS;
// Step past the header so data covers only the record body.
424 data.dptr += sizeof(*hdr);
425 data.dsize -= sizeof(*hdr);
427 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
429 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
// Advance to the next packed record.
433 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
436 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
437 reply->count, reply->db_id));
// For read-only enabled databases the tracking tdb is wiped; if the wipe
// fails, read-only support is switched off and the tracking tdb closed.
439 if (ctdb_db->readonly) {
440 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
442 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
443 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
// NOTE(review): readonly is set to false twice (here and three lines
// below); one of the assignments is redundant.
444 ctdb_db->readonly = false;
445 tdb_close(ctdb_db->rottdb);
446 ctdb_db->rottdb = NULL;
447 ctdb_db->readonly = false;
// Loop ends once revokechild_active is NULL; presumably freeing the head
// entry unlinks it through a destructor - confirm.
449 while (ctdb_db->revokechild_active != NULL) {
450 talloc_free(ctdb_db->revokechild_active);
// Two unmark calls are visible; presumably one on the success path and
// one on a failure path whose label is missing - confirm.
454 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
458 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
// State kept while a set-recmode request waits for its child process.
// NOTE(review): members used by the code (fd, child, recmode) and the
// closing brace are missing from this listing.
462 struct ctdb_set_recmode_state {
// Daemon context.
463 struct ctdb_context *ctdb;
// The pending control request; replied to by the timeout and pipe handlers.
464 struct ctdb_req_control *c;
// Timeout event created in ctdb_control_set_recmode.
467 struct timed_event *te;
// fd event watching the read end of the pipe.
468 struct fd_event *fde;
// Creation time of the state; the destructor reports the elapsed time.
470 struct timeval start_time;
474 called if our set_recmode child times out. this would happen if
475 ctdb_recovery_lock() would block.
// Timed-event callback for a set-recmode child that did not answer in
// time. private_data is the struct ctdb_set_recmode_state. The requested
// mode is applied and the control is answered with status 0.
// NOTE(review): lines are missing from this listing (braces, the end of
// the block comment, anything after the reply); only visible statements
// are described.
477 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
478 struct timeval t, void *private_data)
480 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
481 struct ctdb_set_recmode_state);
483 /* we consider this a success, not a failure, as we failed to
484 set the recovery lock which is what we wanted. This can be
485 caused by the cluster filesystem being very slow to
486 arbitrate locks immediately after a node failure.
488 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
489 state->ctdb->recovery_mode = state->recmode;
490 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
495 /* when we free the recmode state we must kill any child process.
// talloc destructor for struct ctdb_set_recmode_state.
// Records the time elapsed since start_time as reclock latency, tests
// both pipe descriptors against -1 and sends SIGKILL to the child.
// NOTE(review): lines are missing from this listing (braces, the bodies
// of both descriptor tests, the return); only visible statements are
// described.
497 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
499 double l = timeval_elapsed(&state->start_time);
501 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
// Bodies not visible; presumably each open descriptor is closed - confirm.
503 if (state->fd[0] != -1) {
506 if (state->fd[1] != -1) {
509 ctdb_kill(state->ctdb, state->child, SIGKILL);
513 /* this is called when the client process has completed ctdb_recovery_lock()
514 and has written data back to us through the pipe.
// fd-event callback: the set-recmode child wrote its result to the pipe.
// private_data is the struct ctdb_set_recmode_state. One status byte is
// read; anything other than a single 0 byte fails the control with -1,
// otherwise the requested mode is applied and the control answered 0.
// NOTE(review): lines are missing from this listing (braces, the
// declarations of c and ret, comment terminators, returns and any free
// of state); only visible statements are described.
516 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
517 uint16_t flags, void *private_data)
519 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
520 struct ctdb_set_recmode_state);
524 /* we got a response from our child process so we can abort the
// Cancels the pending timeout event.
527 talloc_free(state->te);
531 /* read the childs status when trying to lock the reclock file.
532 child wrote 0 if everything is fine and 1 if it did manage
533 to lock the file, which would be a problem since that means
534 we got a request to exit from recovery but we could still lock
535 the file which at this time SHOULD be locked by the recovery
536 daemon on the recmaster
538 ret = sys_read(state->fd[0], &c, 1);
// A short or failed read is handled the same way as a non-zero status.
539 if (ret != 1 || c != 0) {
540 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
545 state->ctdb->recovery_mode = state->recmode;
547 /* release any deferred attach calls from clients */
548 if (state->recmode == CTDB_RECOVERY_NORMAL) {
549 ctdb_process_deferred_attach(state->ctdb);
552 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
// Timed-event callback fired after too long in recovery mode.
// private_data is the struct ctdb_context. Frees and clears
// ctdb->release_ips_ctx, then releases all public IPs.
// NOTE(review): the return-type line and braces are missing from this
// listing.
558 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
559 struct timeval t, void *private_data)
561 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
563 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
// This event is itself parented to release_ips_ctx (see the scheduling
// call in ctdb_deferred_drop_all_ips).
564 talloc_free(ctdb->release_ips_ctx);
565 ctdb->release_ips_ctx = NULL;
567 ctdb_release_all_ips(ctdb);
571 * Set up an event to drop all public ips if we remain in recovery for too
// (Re)arm the timer that drops all public IPs.
// Any existing release_ips_ctx is freed, which also discards an event
// parented to it; a fresh context is created and ctdb_drop_all_ips_event
// is scheduled under it. The delay comes from the
// recovery_drop_all_ips tunable, presumably in seconds - confirm.
// NOTE(review): braces and the return are missing from this listing, and
// the event_add_timed result is not checked in the visible line.
574 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
576 if (ctdb->release_ips_ctx != NULL) {
577 talloc_free(ctdb->release_ips_ctx);
579 ctdb->release_ips_ctx = talloc_new(ctdb);
580 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
582 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
587 set the recovery mode
// Control handler: change the recovery mode of this node.
// indata carries the requested mode as a uint32_t. When a recovery lock
// file is configured, leaving recovery is verified in a forked child
// that tries to take the lock; the result arrives through a pipe and is
// handled by set_recmode_handler, with ctdb_set_recmode_timeout as
// fallback.
// NOTE(review): many lines are missing from this listing (braces, else
// branches, return statements, declarations of i, ret and cc, pipe
// descriptor handling, event_add_fd arguments); only visible statements
// are described.
589 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
590 struct ctdb_req_control *c,
591 TDB_DATA indata, bool *async_reply,
592 const char **errormsg)
// NOTE(review): no check of indata.dsize is visible before this read -
// confirm the dispatcher validates the size.
594 uint32_t recmode = *(uint32_t *)indata.dptr;
596 struct ctdb_set_recmode_state *state;
// Captured before the fork so the child can probe the daemon pid.
597 pid_t parent = getpid();
599 /* if we enter recovery but stay in recovery for too long
600 we will eventually drop all our ip addresses
// NORMAL cancels the deferred drop; the call to
// ctdb_deferred_drop_all_ips below presumably sits in an else branch
// whose line is missing - confirm.
602 if (recmode == CTDB_RECOVERY_NORMAL) {
603 talloc_free(ctdb->release_ips_ctx);
604 ctdb->release_ips_ctx = NULL;
606 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
607 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
611 if (recmode != ctdb->recovery_mode) {
612 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
613 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
// Every request except the ACTIVE to NORMAL transition just records the
// new mode; what follows the assignment is not visible.
616 if (recmode != CTDB_RECOVERY_NORMAL ||
617 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
618 ctdb->recovery_mode = recmode;
622 /* some special handling when ending recovery mode */
624 /* force the databases to thaw */
625 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
626 if (ctdb->freeze_handles[i] != NULL) {
627 ctdb_control_thaw(ctdb, i, false);
631 state = talloc(ctdb, struct ctdb_set_recmode_state);
632 CTDB_NO_MEMORY(ctdb, state);
634 state->start_time = timeval_current();
638 /* release any deferred attach calls from clients */
639 if (recmode == CTDB_RECOVERY_NORMAL) {
640 ctdb_process_deferred_attach(ctdb);
// NOTE(review): state was allocated above; no release of it is visible
// on this no-lock-file path - check for a leak against the full file.
643 if (ctdb->recovery_lock_file == NULL) {
644 /* Not using recovery lock file */
645 ctdb->recovery_mode = recmode;
649 /* For the rest of what needs to be done, we need to do this in
650 a child process since
651 1, the call to ctdb_recovery_lock() can block if the cluster
652 filesystem is in the process of recovery.
654 ret = pipe(state->fd);
657 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
661 state->child = ctdb_fork(ctdb);
662 if (state->child == (pid_t)-1) {
// Child side: try to take the recovery lock, report one byte through the
// pipe, then stay alive while the parent pid still exists.
669 if (state->child == 0) {
673 ctdb_set_process_name("ctdb_recmode");
674 debug_extra = talloc_asprintf(NULL, "set_recmode:");
675 /* we should not be able to get the lock on the reclock file,
676 as it should be held by the recovery master
678 if (ctdb_recovery_lock(ctdb)) {
679 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
680 ctdb_recovery_unlock(ctdb);
684 sys_write(state->fd[1], &cc, 1);
685 /* make sure we die when our parent dies */
// Signal 0 only probes for existence; the loop continues unless the
// probe fails with ESRCH.
686 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
688 sys_write(state->fd[1], &cc, 1);
// Parent side from here on.
693 set_close_on_exec(state->fd[0]);
697 talloc_set_destructor(state, set_recmode_destructor);
699 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
// Timeout via timeval_current_ofs(5, 0), presumably five seconds - confirm.
701 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
702 ctdb_set_recmode_timeout, state);
704 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
709 if (state->fde == NULL) {
713 tevent_fd_set_auto_close(state->fde);
// Take ownership of the request so it can be answered later from the
// pipe handler or the timeout handler.
716 state->recmode = recmode;
717 state->c = talloc_steal(state, c);
// Report whether this process holds the recovery lock, i.e. whether
// ctdb->recovery_lock_fd is something other than -1.
// NOTE(review): braces are missing from this listing.
725 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
727 return ctdb->recovery_lock_fd != -1;
731 try and get the recovery lock in shared storage - should only work
732 on the recovery master recovery daemon. Anywhere else is a bug
// Try to take the recovery lock without blocking.
// Opens ctdb->recovery_lock_file (created with mode 0600 if absent) and
// requests a write lock with fcntl F_SETLK. On lock failure the
// descriptor is closed and recovery_lock_fd reset to -1.
// NOTE(review): lines are missing from this listing (braces, the
// declaration of lock and some of its fields, DEBUG openers, return
// statements); only visible statements are described.
734 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
738 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
739 O_RDWR|O_CREAT, 0600);
740 if (ctdb->recovery_lock_fd == -1) {
742 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
743 ctdb->recovery_lock_file, strerror(errno)));
747 set_close_on_exec(ctdb->recovery_lock_fd);
749 lock.l_type = F_WRLCK;
750 lock.l_whence = SEEK_SET;
// F_SETLK fails at once instead of waiting when the lock is held elsewhere.
755 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
// errno is saved first because close may overwrite it.
756 int saved_errno = errno;
757 close(ctdb->recovery_lock_fd);
758 ctdb->recovery_lock_fd = -1;
759 /* Fail silently on these errors, since they indicate
760 * lock contention, but log an error for any other
762 if (saved_errno != EACCES &&
763 saved_errno != EAGAIN) {
764 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
765 "recovery lock on '%s' - (%s)\n",
766 ctdb->recovery_lock_file,
767 strerror(saved_errno)));
// Release the recovery lock if held: close recovery_lock_fd and reset it
// to -1. Does nothing when the descriptor is already -1.
// NOTE(review): braces are missing from this listing.
775 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
777 if (ctdb->recovery_lock_fd != -1) {
778 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
// Closing the descriptor is what drops the fcntl lock taken on it.
779 close(ctdb->recovery_lock_fd);
780 ctdb->recovery_lock_fd = -1;
785 delete a record as part of the vacuum process
786 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
787 use non-blocking locks
789 return 0 if the record was successfully deleted (i.e. it does not exist
790 when the function returns)
791 or !0 if the record still exists in the tdb after returning.
// Vacuum helper: try to delete one record from the local tdb using only
// non-blocking locks.
// rec packs the key followed by a data part that must be exactly one
// struct ctdb_ltdb_header. The visible checks refuse deletion when this
// node is lmaster, the stored rsn is newer, either header carries
// read-only flags, or this node is dmaster.
// NOTE(review): lines are missing from this listing (braces, all return
// statements, any free of fetched data); only visible statements are
// described.
793 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
795 TDB_DATA key, data, data2;
796 struct ctdb_ltdb_header *hdr, *hdr2;
798 /* these are really internal tdb functions - but we need them here for
799 non-blocking lock of the freelist */
800 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
801 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
804 key.dsize = rec->keylen;
805 key.dptr = &rec->data[0];
806 data.dsize = rec->datalen;
807 data.dptr = &rec->data[rec->keylen];
809 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
810 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
814 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
815 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
819 hdr = (struct ctdb_ltdb_header *)data.dptr;
821 /* use a non-blocking lock */
822 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
// NOTE(review): tdb_fetch hands back allocated memory per the TDB API;
// no matching free of data2.dptr is visible in this listing - confirm
// against the full file.
826 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
// No stored record: just release the chain lock.
827 if (data2.dptr == NULL) {
828 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
// Stored record too short to hold a header: treated as corrupt and
// deleted if the lock on list -1 (the freelist, per the comment above)
// can be taken.
832 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
833 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
834 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
835 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
837 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
838 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
840 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
845 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
// Stored copy is newer than the one the caller asked to delete.
847 if (hdr2->rsn > hdr->rsn) {
848 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
849 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
850 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
855 /* do not allow deleting record that have readonly flags set. */
856 if (hdr->flags & CTDB_REC_RO_FLAGS) {
857 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
858 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
862 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
863 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
864 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
869 if (hdr2->dmaster == ctdb->pnn) {
870 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
871 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
// Lock on list -1 is needed for the delete; give up if it is contended.
876 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
877 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
882 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
883 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
884 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
885 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
// Delete succeeded: release both locks, innermost first.
890 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
891 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
// State handed to the start-recovery and end-recovery event script
// callbacks.
// NOTE(review): the closing brace is missing from this listing.
898 struct recovery_callback_state {
// The pending control request, answered once the event script finishes.
899 struct ctdb_req_control *c;
904 called when the 'recovered' event script has finished
// Completion callback for the recovered event script.
// p is the struct recovery_callback_state. Monitoring is re-enabled, the
// recovery counter incremented, the pending control answered with the
// script status, and the finish time recorded.
// NOTE(review): lines are missing from this listing (braces, the test on
// status, the body of the ETIME branch, any free of state); only visible
// statements are described.
906 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
908 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
910 ctdb_enable_monitoring(ctdb);
911 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
914 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
// Script timed out; the handling is not visible in this listing.
915 if (status == -ETIME) {
920 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
923 gettimeofday(&ctdb->last_recovery_finished, NULL);
// The first completed recovery moves the daemon on to the STARTUP runstate.
925 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
926 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
931 recovery has finished
// Control handler: recovery has finished; run the recovered event script
// and answer the control from ctdb_end_recovery_callback.
// Monitoring is disabled while the script runs.
// NOTE(review): lines are missing from this listing (remaining
// parameters, braces, the declaration and test of ret, one call
// argument, return statements, the async_reply assignment); only visible
// statements are described.
933 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
934 struct ctdb_req_control *c,
938 struct recovery_callback_state *state;
940 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
942 ctdb_persistent_finish_trans3_commits(ctdb);
944 state = talloc(ctdb, struct recovery_callback_state);
945 CTDB_NO_MEMORY(ctdb, state);
949 ctdb_disable_monitoring(ctdb);
951 ret = ctdb_event_script_callback(ctdb, state,
952 ctdb_end_recovery_callback,
954 CTDB_EVENT_RECOVERED, "%s", "");
// Failure path: monitoring is switched back on and an error logged.
957 ctdb_enable_monitoring(ctdb);
959 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
964 /* tell the control that we will be reply asynchronously */
// The request is re-parented to state so the callback can answer it.
965 state->c = talloc_steal(state, c);
971 called when the 'startrecovery' event script has finished
// Completion callback for the startrecovery event script.
// p is the struct recovery_callback_state. The pending control is
// answered with the script status.
// NOTE(review): lines are missing from this listing (braces, the test on
// status, any free of state); only visible statements are described.
973 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
975 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
978 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
981 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
986 run the startrecovery eventscript
// Control handler: record the recovery start time and run the
// startrecovery event script; the control is answered from
// ctdb_start_recovery_callback.
// NOTE(review): lines are missing from this listing (remaining
// parameters, braces, the declaration and test of ret, call arguments,
// return statements); only visible statements are described.
988 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
989 struct ctdb_req_control *c,
993 struct recovery_callback_state *state;
995 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
996 gettimeofday(&ctdb->last_recovery_started, NULL);
998 state = talloc(ctdb, struct recovery_callback_state);
999 CTDB_NO_MEMORY(ctdb, state);
// The request is re-parented to state before the script is started.
1001 state->c = talloc_steal(state, c);
1003 ctdb_disable_monitoring(ctdb);
1005 ret = ctdb_event_script_callback(ctdb, state,
1006 ctdb_start_recovery_callback,
1008 CTDB_EVENT_START_RECOVERY,
1012 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1017 /* tell the control that we will be reply asynchronously */
1018 *async_reply = true;
1023 try to delete all these records as part of the vacuuming process
1024 and return the records we failed to delete
// Control handler: try to delete each record of the incoming marshall
// buffer with delete_tdb_record and return, in outdata, the records that
// could not be deleted.
// NOTE(review): lines are missing from this listing (braces, return
// statements, several declarations, the NULL test on ctdb_db, comment
// terminators); only visible statements are described.
1026 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1028 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1029 struct ctdb_db_context *ctdb_db;
1031 struct ctdb_rec_data *rec;
1032 struct ctdb_marshall_buffer *records;
// Only the fixed header is length-checked here.
1034 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1035 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1039 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1041 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1046 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1047 reply->count, reply->db_id));
1050 /* create a blob to send back the records we couldnt delete */
// Starts as a zeroed header-only buffer owned by outdata.
1051 records = (struct ctdb_marshall_buffer *)
1052 talloc_zero_size(outdata,
1053 offsetof(struct ctdb_marshall_buffer, data));
1054 if (records == NULL) {
1055 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1058 records->db_id = ctdb_db->db_id;
// NOTE(review): the walk trusts reply->count and each record's keylen,
// datalen and length; no visible line bounds them by indata.dsize.
1061 rec = (struct ctdb_rec_data *)&reply->data[0];
1062 for (i=0;i<reply->count;i++) {
1065 key.dptr = &rec->data[0];
1066 key.dsize = rec->keylen;
1067 data.dptr = &rec->data[key.dsize];
1068 data.dsize = rec->datalen;
1070 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1071 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1075 /* If we cant delete the record we must add it to the reply
1076 so the lmaster knows it may not purge this record
1078 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1080 struct ctdb_ltdb_header *hdr;
// NOTE(review): hdr and the adjusted data are not used again in the
// visible lines.
1082 hdr = (struct ctdb_ltdb_header *)data.dptr;
1083 data.dptr += sizeof(*hdr);
1084 data.dsize -= sizeof(*hdr);
1086 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
// Grow the reply by exactly one record and append the record verbatim.
1088 old_size = talloc_get_size(records);
1089 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1090 if (records == NULL) {
1091 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1095 memcpy(old_size+(uint8_t *)records, rec, rec->length);
// Advance to the next packed record.
1098 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1102 *outdata = ctdb_marshall_finish(records);
1108 * Store a record as part of the vacuum process:
1109 * This is called from the RECEIVE_RECORD control which
1110 * the lmaster uses to send the current empty copy
1111 * to all nodes for storing, before it lets the other
1112 * nodes delete the records in the second phase with
1113 * the TRY_DELETE_RECORDS control.
1115 * Only store if we are not lmaster or dmaster, and our
1116 * rsn is <= the provided rsn. Use non-blocking locks.
1118 * return 0 if the record was successfully stored.
1119 * return !0 if the record still exists in the tdb after returning.
// Vacuum helper: try to store one header-only record into the local tdb
// using a non-blocking chain lock.
// rec packs the key followed by a data part that must be exactly one
// struct ctdb_ltdb_header. The visible checks refuse the store when this
// node is lmaster, the stored rsn is newer, either header carries
// read-only flags, or this node is dmaster.
// NOTE(review): lines are missing from this listing (braces, return and
// goto statements, labels, ends of some DEBUG calls, any free of fetched
// data); only visible statements are described.
1121 static int store_tdb_record(struct ctdb_context *ctdb,
1122 struct ctdb_db_context *ctdb_db,
1123 struct ctdb_rec_data *rec)
1125 TDB_DATA key, data, data2;
1126 struct ctdb_ltdb_header *hdr, *hdr2;
1129 key.dsize = rec->keylen;
1130 key.dptr = &rec->data[0];
1131 data.dsize = rec->datalen;
1132 data.dptr = &rec->data[rec->keylen];
1134 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1135 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1136 "where we are lmaster\n"));
1140 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1141 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1145 hdr = (struct ctdb_ltdb_header *)data.dptr;
1147 /* use a non-blocking lock */
1148 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1149 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
// NOTE(review): tdb_fetch hands back allocated memory per the TDB API;
// no matching free of data2.dptr is visible in this listing - confirm
// against the full file.
1153 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
// No usable stored record (absent, or too short to hold a header): store
// the incoming record right away.
1154 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1155 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1156 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1160 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1165 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
// Stored copy is newer than the incoming one.
1167 if (hdr2->rsn > hdr->rsn) {
1168 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1169 "rsn=%llu - called with rsn=%llu\n",
1170 (unsigned long long)hdr2->rsn,
1171 (unsigned long long)hdr->rsn));
1176 /* do not allow vacuuming of records that have readonly flags set. */
1177 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1178 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1183 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1184 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1190 if (hdr2->dmaster == ctdb->pnn) {
1191 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1192 "where we are the dmaster\n"));
1197 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1198 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
// Only one unlock is visible; presumably every path funnels through it
// via a label that is missing from this listing - confirm.
1206 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1214 * Try to store all these records as part of the vacuuming process
1215 * and return the records we failed to store.
// Control handler: try to store each record of the incoming marshall
// buffer with store_tdb_record and return, in outdata, the records that
// could not be stored.
// NOTE(review): lines are missing from this listing (braces, return
// statements, several declarations, the NULL test on ctdb_db, parts of
// some DEBUG calls); only visible statements are described.
1217 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1218 TDB_DATA indata, TDB_DATA *outdata)
1220 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1221 struct ctdb_db_context *ctdb_db;
1223 struct ctdb_rec_data *rec;
1224 struct ctdb_marshall_buffer *records;
// Only the fixed header is length-checked here.
1226 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1228 (__location__ " invalid data in receive_records\n"));
1232 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1234 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1239 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1240 "dbid 0x%x\n", reply->count, reply->db_id));
1242 /* create a blob to send back the records we could not store */
// Starts as a zeroed header-only buffer owned by outdata.
1243 records = (struct ctdb_marshall_buffer *)
1244 talloc_zero_size(outdata,
1245 offsetof(struct ctdb_marshall_buffer, data));
1246 if (records == NULL) {
1247 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1250 records->db_id = ctdb_db->db_id;
// NOTE(review): the walk trusts reply->count and each record's keylen,
// datalen and length; no visible line bounds them by indata.dsize.
1252 rec = (struct ctdb_rec_data *)&reply->data[0];
1253 for (i=0; i<reply->count; i++) {
1256 key.dptr = &rec->data[0];
1257 key.dsize = rec->keylen;
1258 data.dptr = &rec->data[key.dsize];
1259 data.dsize = rec->datalen;
1261 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1262 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1268 * If we can not store the record we must add it to the reply
1269 * so the lmaster knows it may not purge this record.
1271 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1273 struct ctdb_ltdb_header *hdr;
// NOTE(review): hdr and the adjusted data are not used again in the
// visible lines.
1275 hdr = (struct ctdb_ltdb_header *)data.dptr;
1276 data.dptr += sizeof(*hdr);
1277 data.dsize -= sizeof(*hdr);
1279 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1280 "record with hash 0x%08x in vacuum "
1281 "via RECEIVE_RECORDS\n",
// Grow the reply by exactly one record and append the record verbatim.
1284 old_size = talloc_get_size(records);
1285 records = talloc_realloc_size(outdata, records,
1286 old_size + rec->length);
1287 if (records == NULL) {
1288 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1293 memcpy(old_size+(uint8_t *)records, rec, rec->length);
// Advance to the next packed record.
1296 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1299 *outdata = ctdb_marshall_finish(records);
/*
 * Control handler that reports this node's capability bits.
 *
 * Copies ctdb->capabilities into a freshly allocated uint32_t (allocated
 * with outdata as its talloc context) and points *outdata at it.
 *
 * ctdb:    daemon context holding the capabilities value
 * outdata: receives a sizeof(uint32_t) buffer containing the value
 */
1308 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1310 uint32_t *capabilities = NULL;
1312 capabilities = talloc(outdata, uint32_t);
/* CTDB_NO_MEMORY presumably bails out when the allocation failed - see its definition */
1313 CTDB_NO_MEMORY(ctdb, capabilities);
1314 *capabilities = ctdb->capabilities;
/* the reply payload is exactly the single 32-bit capabilities word */
1316 outdata->dsize = sizeof(uint32_t);
1317 outdata->dptr = (uint8_t *)capabilities;
1322 /* The recovery daemon will ping us at regular intervals.
1323 If we haven't been pinged for a while we assume the recovery
1324 daemon is inoperable and we restart.
/*
 * Timer callback registered by ctdb_control_recd_ping() and re-registered
 * by this function itself.
 *
 * ev, te, t: event-loop callback arguments, not used in the visible lines
 * p:         the ctdb_context that was passed when the timer was added
 *
 * If the counter stored in ctdb->recd_ping_count is still below
 * tunable.recd_ping_failcount, the timer is armed again. The final-timeout
 * path logs an error and restarts the recovery daemon by calling
 * ctdb_stop_recoverd() followed by ctdb_start_recoverd().
 */
1326 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1328 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1329 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1331 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* still under the configured failure limit: schedule another timeout */
1333 if (*count < ctdb->tunable.recd_ping_failcount) {
/* the counter object is passed as the timer's memory context; the delay is
   tunable.recd_ping_timeout (presumably seconds - confirm timeval_current_ofs) */
1335 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1336 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1337 ctdb_recd_ping_timeout, ctdb);
1341 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* restart the recovery daemon: stop it, then start it again */
1343 ctdb_stop_recoverd(ctdb);
1344 ctdb_start_recoverd(ctdb);
/*
 * Control handler for a ping from the recovery daemon.
 *
 * Replaces ctdb->recd_ping_count with a fresh zero-initialised counter and,
 * when tunable.recd_ping_timeout is non-zero, arms ctdb_recd_ping_timeout()
 * to fire after that timeout.
 *
 * The timer is added with the counter as its memory context, so freeing the
 * old counter presumably also cancels the previously armed timer - confirm
 * against the talloc/event ownership rules.
 */
1347 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/* drop the previous counter before allocating a new one */
1349 talloc_free(ctdb->recd_ping_count);
1351 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1352 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* a timeout of 0 means no watchdog timer is armed at all */
1354 if (ctdb->tunable.recd_ping_timeout != 0) {
1355 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1356 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1357 ctdb_recd_ping_timeout, ctdb);
/*
 * Control handler that records which node is the recovery master.
 *
 * ctdb:   daemon context; ctdb->recovery_master is overwritten
 * opcode: control opcode (not referenced directly in the visible lines;
 *         possibly used by CHECK_CONTROL_DATA_SIZE - confirm)
 * indata: must be exactly sizeof(uint32_t) bytes holding the new
 *         recovery master's node number
 *
 * A message is logged when this node loses or gains the recovery master
 * role as a result of the change.
 */
1365 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1367 uint32_t new_recmaster;
1369 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
/* the payload is a single uint32_t read in place from the input buffer */
1370 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* this node was the recovery master and another node is taking over */
1372 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1374 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* this node is becoming the recovery master and was not already recorded as such */
1377 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1379 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1382 ctdb->recovery_master = new_recmaster;
/*
 * Control handler that puts the local node into the stopped state:
 * logs the request, calls ctdb_disable_monitoring() and sets
 * NODE_FLAGS_STOPPED on this node's own entry in ctdb->nodes.
 */
1387 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1389 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1390 ctdb_disable_monitoring(ctdb);
/* ctdb->pnn indexes this node's own entry in the node table */
1391 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
 * Control handler that takes the local node out of the stopped state:
 * logs the request and clears NODE_FLAGS_STOPPED on this node's own
 * entry in ctdb->nodes.
 */
1396 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1398 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1399 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;