4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
/*
 * GETVNNMAP control: hand the daemon's current VNN map back to the caller.
 *
 * A struct ctdb_vnn_map_wire is allocated as a talloc child of outdata,
 * filled with the generation, size and map array of ctdb->vnn_map, and
 * exposed through outdata->dptr.
 *
 * NOTE(review): CHECK_CONTROL_DATA_SIZE(0) presumably rejects a non-empty
 * indata and CTDB_NO_MEMORY presumably bails out when the allocation is
 * NULL; both macros are defined elsewhere - confirm.
 */
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
51 CHECK_CONTROL_DATA_SIZE(0);
/* bytes needed: everything before the 'map' member plus one uint32_t per entry */
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
/* copy the in-memory map field by field into the wire layout */
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
62 outdata->dptr = (uint8_t *)map;
/*
 * SETVNNMAP control: replace the daemon's VNN map with the one in indata.
 *
 * indata.dptr is interpreted as a struct ctdb_vnn_map_wire.  When
 * ctdb->recovery_mode is not CTDB_RECOVERY_ACTIVE an error is logged (the
 * rejecting return is presumably on a line not shown here).  Otherwise the
 * old ctdb->vnn_map is freed and a new one is allocated on ctdb, with the
 * generation, size and map entries copied from the wire structure.
 *
 * NOTE(review): no check of indata.dsize against map->size is visible in
 * this excerpt - confirm the payload length is validated before the memcpy.
 */
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
/* drop the previous map before building its replacement */
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
/* the entry array is a talloc child of the new map, so it is freed with it */
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * GETDBMAP control: return the ids and flags of all attached databases.
 *
 * The reply is a struct ctdb_dbid_map_old allocated on outdata with one
 * dbs[] entry per element of ctdb->db_list.  Each entry carries the db_id
 * and a flags word built from the persistent / readonly / sticky settings
 * of the database.
 *
 * NOTE(review): the first loop presumably counts the databases into 'len'
 * (its body is not shown in this excerpt) - confirm.
 */
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* reply size: everything before dbs[] plus 'len' entries */
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
/* zeroed allocation, so the flags words below start from 0 before being OR-ed */
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second walk over the list: fill entry i for each database */
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 if (ctdb_db->persistent != 0) {
119 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
121 if (ctdb_db->readonly != 0) {
122 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
124 if (ctdb_db->sticky != 0) {
125 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
 * GETNODEMAP control: return the node list in map form.
 *
 * The reply buffer is whatever ctdb_node_list_to_map() produces for
 * ctdb->nodes (its remaining arguments are on lines not shown here); a NULL
 * result is treated as a failure.  The reply length is taken from the
 * talloc size of that buffer rather than being computed separately.
 */
133 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
135 CHECK_CONTROL_DATA_SIZE(0);
137 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
140 if (outdata->dptr == NULL) {
144 outdata->dsize = talloc_get_size(outdata->dptr);
150 reload the nodes file
/*
 * RELOAD_NODES_FILE control: re-read the nodes file and reconcile the new
 * node array with the previous one.
 *
 * The old node array is parked on a temporary talloc context, the nodes
 * file is loaded again, and then for every slot of the new array:
 *   - if the slot existed before and has the same address, the freshly
 *     loaded node is discarded and the old node object is moved back;
 *   - otherwise the node is handed to the transport via add_node() and
 *     connect_node(); a failure of either is fatal.
 * Finally this node is sent a CTDB_SRVID_RELOAD_NODES message and the
 * temporary context (with any old nodes that were not reused) is freed.
 *
 * NOTE(review): 'nodes' presumably holds the previous ctdb->nodes pointer;
 * the assignment is on a line not shown in this excerpt - confirm.
 */
153 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
157 struct ctdb_node **nodes;
159 tmp_ctx = talloc_new(ctdb);
161 /* steal the old nodes file for a while */
162 talloc_steal(tmp_ctx, ctdb->nodes);
/* remember the old node count before it is overwritten by the reload */
165 num_nodes = ctdb->num_nodes;
168 /* load the new nodes file */
169 ctdb_load_nodes_file(ctdb);
171 for (i=0; i<ctdb->num_nodes; i++) {
172 /* keep any identical pre-existing nodes and connections */
173 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
174 talloc_free(ctdb->nodes[i]);
175 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* nodes flagged as deleted get special handling (body not shown here) */
179 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
183 /* any new or different nodes must be added */
184 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
185 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
186 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
188 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
189 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
190 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
194 /* tell the recovery daemon to reload the nodes file too */
195 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
197 talloc_free(tmp_ctx);
203 a traverse function for pulling all relevant records from pulldb
/*
 * Members of the traversal state that ctdb_control_pull_db() passes to
 * traverse_pulldb() (used there as struct pulldb_data; the struct tag line
 * and some members are not shown in this excerpt).
 */
/* daemon context; read by the traverse callback for tunables */
206 struct ctdb_context *ctdb;
/* database being traversed; its db_name is used in warnings */
207 struct ctdb_db_context *ctdb_db;
/* marshall buffer that accumulates the pulled records */
208 struct ctdb_marshall_buffer *pulldata;
/* number of bytes currently allocated for pulldata */
210 uint32_t allocated_len;
/*
 * tdb traverse callback used by ctdb_control_pull_db(): append one record
 * to the marshall buffer held in the struct pulldb_data passed as 'p'.
 *
 * The key/data pair is marshalled into a ctdb_rec_data_old, the buffer is
 * grown when the record would not fit, and the record bytes are copied to
 * the current end of the buffer.  An allocation failure while growing the
 * buffer is fatal for the daemon.  A record larger than the
 * db_record_size_warn tunable (when that tunable is non-zero) is logged.
 */
214 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
216 struct pulldb_data *params = (struct pulldb_data *)p;
217 struct ctdb_rec_data_old *rec;
218 struct ctdb_context *ctdb = params->ctdb;
219 struct ctdb_db_context *ctdb_db = params->ctdb_db;
221 /* add the record to the blob */
222 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
/* presumably reached when marshalling failed (the guarding test is not shown) */
224 params->failed = true;
/* grow with pulldb_preallocation_size bytes of slack so not every record reallocates */
227 if (params->len + rec->length >= params->allocated_len) {
228 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
229 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
231 if (params->pulldata == NULL) {
232 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
233 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
/* append the marshalled record at offset 'len' and advance the write position */
235 params->pulldata->count++;
236 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
237 params->len += rec->length;
239 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
240 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
249 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * PULL_DB control: return every record of a frozen database in one
 * marshall buffer.
 *
 * indata is interpreted as a struct ctdb_pulldb naming the database.  The
 * database must exist and be frozen.  With the whole-db lock marked, the
 * local tdb is traversed read-only with traverse_pulldb(), which appends
 * each record to the reply.  On success outdata points at the accumulated
 * buffer and dsize is the number of bytes used.  Warnings are logged when
 * the record count or byte size exceeds the db_record_count_warn /
 * db_size_warn tunables (when those are non-zero).
 *
 * NOTE(review): no check of indata.dsize is visible in this excerpt before
 * 'pull' is dereferenced - confirm against the full source.
 */
251 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
253 struct ctdb_pulldb *pull;
254 struct ctdb_db_context *ctdb_db;
255 struct pulldb_data params;
256 struct ctdb_marshall_buffer *reply;
258 pull = (struct ctdb_pulldb *)indata.dptr;
260 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
262 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
266 if (!ctdb_db_frozen(ctdb_db)) {
268 ("rejecting ctdb_control_pull_db when not frozen\n"));
272 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
273 CTDB_NO_MEMORY(ctdb, reply);
275 reply->db_id = pull->db_id;
/* traversal state: the buffer starts out holding only the marshall header */
278 params.ctdb_db = ctdb_db;
279 params.pulldata = reply;
280 params.len = offsetof(struct ctdb_marshall_buffer, data);
281 params.allocated_len = params.len;
282 params.failed = false;
284 if (ctdb_db->unhealthy_reason) {
285 /* this is just a warning, as the tdb should be empty anyway */
286 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
287 ctdb_db->db_name, ctdb_db->unhealthy_reason));
290 if (ctdb_lockdb_mark(ctdb_db) != 0) {
291 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* NOTE(review): the '¶ms' below looks like a mis-encoded '&params' - confirm */
295 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
296 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
297 ctdb_lockdb_unmark(ctdb_db);
298 talloc_free(params.pulldata);
302 ctdb_lockdb_unmark(ctdb_db);
/* params.pulldata may differ from 'reply' because the traverse reallocates it */
304 outdata->dptr = (uint8_t *)params.pulldata;
305 outdata->dsize = params.len;
307 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
308 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
310 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
311 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
 * Traversal state shared between ctdb_control_db_pull() and
 * traverse_db_pull().  The 'pnn' and 'srvid' members used by those
 * functions are declared on lines not shown in this excerpt.
 */
318 struct db_pull_state {
/* daemon context; used for tunables and for sending messages */
319 struct ctdb_context *ctdb;
/* database being pulled */
320 struct ctdb_db_context *ctdb_db;
/* records marshalled so far and not yet sent; NULL when nothing is pending */
321 struct ctdb_marshall_buffer *recs;
/* running total of records that have been sent */
324 uint32_t num_records;
/*
 * tdb traverse callback used by ctdb_control_db_pull(): add one record to
 * the pending marshall buffer and flush the buffer as a message once it
 * reaches the rec_buffer_size_limit tunable.
 *
 * A flush finishes the buffer, sends it to state->pnn on state->srvid,
 * adds the buffer's record count to state->num_records and frees the
 * buffer, so the next record starts a new batch.
 *
 * NOTE(review): the two error paths (marshalling failure, send failure)
 * both free state->recs; their tests and return values are on lines not
 * shown in this excerpt.
 */
327 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
328 TDB_DATA data, void *private_data)
330 struct db_pull_state *state = (struct db_pull_state *)private_data;
331 struct ctdb_marshall_buffer *recs;
333 recs = ctdb_marshall_add(state->ctdb, state->recs,
334 state->ctdb_db->db_id, 0, key, NULL, data);
336 TALLOC_FREE(state->recs);
/* the batch size is measured as the talloc size of the pending buffer */
341 if (talloc_get_size(state->recs) >=
342 state->ctdb->tunable.rec_buffer_size_limit) {
346 buffer = ctdb_marshall_finish(state->recs);
347 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
348 state->srvid, buffer);
350 TALLOC_FREE(state->recs);
/* batch sent: account for it, then drop it so a new batch can begin */
354 state->num_records += state->recs->count;
355 TALLOC_FREE(state->recs);
/*
 * DB_PULL control: stream every record of a frozen database to the
 * requesting node as a series of messages.
 *
 * indata is interpreted as a struct ctdb_pulldb_ext giving the database id
 * and the srvid to send to; the destination node is the control's source
 * node (c->hdr.srcnode).  With the whole-db lock marked, the local tdb is
 * traversed read-only with traverse_db_pull(), which sends full batches as
 * it goes; whatever is still pending afterwards is sent here.  The reply in
 * outdata is a single uint32_t: the total number of records sent.
 *
 * NOTE(review): the "not frozen" log message below names
 * ctdb_control_pull_db although this function is ctdb_control_db_pull.
 */
361 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
362 struct ctdb_req_control_old *c,
363 TDB_DATA indata, TDB_DATA *outdata)
365 struct ctdb_pulldb_ext *pulldb_ext;
366 struct ctdb_db_context *ctdb_db;
367 struct db_pull_state state;
370 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
372 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
373 if (ctdb_db == NULL) {
374 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
379 if (!ctdb_db_frozen(ctdb_db)) {
381 ("rejecting ctdb_control_pull_db when not frozen\n"));
385 if (ctdb_db->unhealthy_reason) {
386 /* this is just a warning, as the tdb should be empty anyway */
388 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
389 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* set up the traversal state; records go back to the node that sent the control */
393 state.ctdb_db = ctdb_db;
395 state.pnn = c->hdr.srcnode;
396 state.srvid = pulldb_ext->srvid;
397 state.num_records = 0;
399 if (ctdb_lockdb_mark(ctdb_db) != 0) {
401 (__location__ " Failed to get lock on entire db - failing\n"));
405 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
408 (__location__ " Failed to get traverse db '%s'\n",
410 ctdb_lockdb_unmark(ctdb_db);
414 /* Last few records */
415 if (state.recs != NULL) {
418 buffer = ctdb_marshall_finish(state.recs);
419 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
420 state.srvid, buffer);
/* send failed: release the pending batch and the lock mark */
422 TALLOC_FREE(state.recs);
423 ctdb_lockdb_unmark(ctdb_db);
427 state.num_records += state.recs->count;
428 TALLOC_FREE(state.recs);
431 ctdb_lockdb_unmark(ctdb_db);
/* reply payload is just the record count, copied as raw bytes */
433 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
434 if (outdata->dptr == NULL) {
435 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
439 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
440 outdata->dsize = sizeof(uint32_t);
446 push a bunch of records into a ltdb, filtering by rsn
/*
 * PUSH_DB control: store a marshalled batch of records into a frozen
 * database.
 *
 * indata is a struct ctdb_marshall_buffer; it must be at least as large as
 * the buffer header.  The target database must exist and be frozen, and the
 * whole-db lock is marked for the duration.  Each record is split into key
 * and data, the data must begin with a struct ctdb_ltdb_header, the
 * read-only flags are cleared from that header, and the record is written
 * with ctdb_ltdb_store().  Records are walked by advancing rec->length
 * bytes at a time.
 *
 * After the records are stored, a database with read-only support has its
 * tracking database (rottdb) wiped; if the wipe fails, read-only support is
 * switched off and rottdb is closed.  Any active revoke children are freed.
 *
 * NOTE(review): rec->length / keylen / datalen are taken from the payload;
 * no bounds check against indata.dsize is visible in this excerpt.
 * NOTE(review): 'ctdb_db->readonly = false' appears twice in the wipe
 * failure path; the second assignment looks redundant.
 */
448 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
450 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
451 struct ctdb_db_context *ctdb_db;
453 struct ctdb_rec_data_old *rec;
/* the payload must at least contain the fixed marshall header */
455 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
456 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
460 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
462 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
466 if (!ctdb_db_frozen(ctdb_db)) {
468 ("rejecting ctdb_control_push_db when not frozen\n"));
472 if (ctdb_lockdb_mark(ctdb_db) != 0) {
473 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* first record starts right after the marshall header */
477 rec = (struct ctdb_rec_data_old *)&reply->data[0];
479 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
480 reply->count, reply->db_id));
482 for (i=0;i<reply->count;i++) {
484 struct ctdb_ltdb_header *hdr;
/* rec->data holds the key bytes immediately followed by the data bytes */
486 key.dptr = &rec->data[0];
487 key.dsize = rec->keylen;
488 data.dptr = &rec->data[key.dsize];
489 data.dsize = rec->datalen;
491 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
492 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
495 hdr = (struct ctdb_ltdb_header *)data.dptr;
496 /* strip off any read only record flags. All readonly records
497 are revoked implicitely by a recovery
499 hdr->flags &= ~CTDB_REC_RO_FLAGS;
501 data.dptr += sizeof(*hdr);
502 data.dsize -= sizeof(*hdr);
504 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
506 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
510 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
513 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
514 reply->count, reply->db_id));
516 if (ctdb_db->readonly) {
517 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
519 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
520 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
521 ctdb_db->readonly = false;
522 tdb_close(ctdb_db->rottdb);
523 ctdb_db->rottdb = NULL;
524 ctdb_db->readonly = false;
526 while (ctdb_db->revokechild_active != NULL) {
527 talloc_free(ctdb_db->revokechild_active);
531 ctdb_lockdb_unmark(ctdb_db);
535 ctdb_lockdb_unmark(ctdb_db);
/*
 * Per-database state of an in-progress streamed push, created by
 * ctdb_control_db_push_start() and consumed by db_push_msg_handler() and
 * ctdb_control_db_push_confirm().  The 'srvid' and 'failed' members used by
 * those functions are declared on lines not shown in this excerpt.
 */
539 struct db_push_state {
/* daemon context */
540 struct ctdb_context *ctdb;
/* database that received records are stored into */
541 struct ctdb_db_context *ctdb_db;
/* running total of records received through the message handler */
543 uint32_t num_records;
/*
 * Message handler for a streamed database push; registered for the push
 * srvid by ctdb_control_db_push_start() with a struct db_push_state as
 * private data.
 *
 * The message payload is a struct ctdb_marshall_buffer.  Each record in it
 * is split into key and data, the data must begin with a struct
 * ctdb_ltdb_header, the read-only flags are cleared from the header and the
 * record is written to state->ctdb_db with ctdb_ltdb_store().  On success
 * the batch's record count is added to state->num_records; the failure path
 * sets state->failed.
 *
 * NOTE(review): rec->length / keylen / datalen come from the payload; no
 * bounds check against indata.dsize is visible in this excerpt.
 */
547 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
550 struct db_push_state *state = talloc_get_type(
551 private_data, struct db_push_state);
552 struct ctdb_marshall_buffer *recs;
553 struct ctdb_rec_data_old *rec;
/* first record starts right after the marshall header */
560 recs = (struct ctdb_marshall_buffer *)indata.dptr;
561 rec = (struct ctdb_rec_data_old *)&recs->data[0];
563 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
564 recs->count, recs->db_id));
566 for (i=0; i<recs->count; i++) {
568 struct ctdb_ltdb_header *hdr;
/* rec->data holds the key bytes immediately followed by the data bytes */
570 key.dptr = &rec->data[0];
571 key.dsize = rec->keylen;
572 data.dptr = &rec->data[key.dsize];
573 data.dsize = rec->datalen;
575 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
576 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
580 hdr = (struct ctdb_ltdb_header *)data.dptr;
581 /* Strip off any read only record flags.
582 * All readonly records are revoked implicitely by a recovery.
584 hdr->flags &= ~CTDB_REC_RO_FLAGS;
586 data.dptr += sizeof(*hdr);
587 data.dsize -= sizeof(*hdr);
589 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
592 (__location__ " Unable to store record\n"));
596 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
599 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
600 recs->count, recs->db_id));
602 state->num_records += recs->count;
606 state->failed = true;
/*
 * DB_PUSH_START control: prepare a frozen database to receive records as a
 * stream of messages.
 *
 * indata is interpreted as a struct ctdb_pulldb_ext giving the database id
 * and the srvid the records will arrive on.  The database must exist and be
 * frozen.  If a push was already started for it, the previous state's srvid
 * handler is deregistered and the state pointer cleared before starting
 * again.  A new struct db_push_state is allocated on the database,
 * db_push_msg_handler() is registered for the srvid, and the whole-db lock
 * is marked.  On success the database is flagged push_started and the new
 * state is recorded in ctdb_db->push_state.
 *
 * NOTE(review): the lock mark taken here is released by
 * ctdb_control_db_push_confirm(); nothing in this function unmarks it.
 */
609 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
611 struct ctdb_pulldb_ext *pulldb_ext;
612 struct ctdb_db_context *ctdb_db;
613 struct db_push_state *state;
616 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
618 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
619 if (ctdb_db == NULL) {
621 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
625 if (!ctdb_db_frozen(ctdb_db)) {
627 ("rejecting ctdb_control_db_push_start when not frozen\n"));
631 if (ctdb_db->push_started) {
633 (__location__ " DB push already started for %s\n",
636 /* De-register old state */
637 state = (struct db_push_state *)ctdb_db->push_state;
639 srvid_deregister(ctdb->srv, state->srvid, state);
641 ctdb_db->push_state = NULL;
/* the state is a talloc child of the database context */
645 state = talloc_zero(ctdb_db, struct db_push_state);
647 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
652 state->ctdb_db = ctdb_db;
653 state->srvid = pulldb_ext->srvid;
654 state->failed = false;
656 ret = srvid_register(ctdb->srv, state, state->srvid,
657 db_push_msg_handler, state);
660 (__location__ " Failed to register srvid for db push\n"));
/* lock marking failed: undo the handler registration made just above */
665 if (ctdb_lockdb_mark(ctdb_db) != 0) {
667 (__location__ " Failed to get lock on entire db - failing\n"));
668 srvid_deregister(ctdb->srv, state->srvid, state);
673 ctdb_db->push_started = true;
674 ctdb_db->push_state = state;
/*
 * DB_PUSH_CONFIRM control: finish a streamed push and report how many
 * records were received.
 *
 * indata carries the database id as a uint32_t.  The database must exist,
 * be frozen, and have a push in progress.  A database with read-only
 * support has its tracking database (rottdb) wiped; if the wipe fails,
 * read-only support is switched off and rottdb is closed.  Active revoke
 * children are freed and the whole-db lock mark is released.  The push
 * state's srvid handler is deregistered and the reply in outdata is a
 * single uint32_t: state->num_records.  ctdb_db->push_state is cleared on
 * both the allocation-failure path and the normal path.
 *
 * NOTE(review): 'ctdb_db->readonly = false' appears twice in the wipe
 * failure path; the second assignment looks redundant.
 * NOTE(review): no check of indata.dsize is visible in this excerpt before
 * indata.dptr is read as a uint32_t.
 */
679 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
680 TDB_DATA indata, TDB_DATA *outdata)
683 struct ctdb_db_context *ctdb_db;
684 struct db_push_state *state;
686 db_id = *(uint32_t *)indata.dptr;
688 ctdb_db = find_ctdb_db(ctdb, db_id);
689 if (ctdb_db == NULL) {
690 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
694 if (!ctdb_db_frozen(ctdb_db)) {
696 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
700 if (!ctdb_db->push_started) {
701 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
705 if (ctdb_db->readonly) {
707 ("Clearing the tracking database for dbid 0x%x\n",
709 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
711 ("Failed to wipe tracking database for 0x%x."
712 " Dropping read-only delegation support\n",
714 ctdb_db->readonly = false;
715 tdb_close(ctdb_db->rottdb);
716 ctdb_db->rottdb = NULL;
717 ctdb_db->readonly = false;
/* loop until no revoke child remains; presumably freeing one updates revokechild_active - confirm */
720 while (ctdb_db->revokechild_active != NULL) {
721 talloc_free(ctdb_db->revokechild_active);
/* releases the lock mark taken when the push was started */
725 ctdb_lockdb_unmark(ctdb_db);
727 state = (struct db_push_state *)ctdb_db->push_state;
729 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
733 srvid_deregister(ctdb->srv, state->srvid, state);
/* reply payload is just the record count, copied as raw bytes */
735 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
736 if (outdata->dptr == NULL) {
737 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
739 ctdb_db->push_state = NULL;
743 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
744 outdata->dsize = sizeof(uint32_t);
747 ctdb_db->push_state = NULL;
/*
 * Cluster-mutex callback installed by ctdb_control_set_recmode() to finish
 * a switch back to normal recovery mode.
 *
 * The daemon attempts to take the recovery lock and the expected outcome is
 * that it cannot.  Going by the visible branches:
 *   - lock obtained: logged as an error and reported to the caller with an
 *     error string pointing at a lock coherence problem;
 *   - lock contended: the check passed; recovery mode becomes NORMAL,
 *     deferred attaches are processed and the reclock latency is recorded;
 *   - timeout: treated like success, recovery mode becomes NORMAL;
 *   - anything else: reported as an unexpected error.
 * In every case the pending control 'c' (passed as private_data) is
 * answered with status 's' and the optional error string.
 *
 * NOTE(review): the switch/case labels and the assignments to 's' are on
 * lines not shown in this excerpt; the branch descriptions above are read
 * from the log messages and comments - confirm against the full source.
 */
752 static void set_recmode_handler(struct ctdb_context *ctdb,
755 struct ctdb_cluster_mutex_handle *h,
758 /* It would be good to use talloc_get_type() here. However,
759 * the name of the packet is manually set - not sure why.
760 * Could use talloc_check_name() but this seems like a lot of
761 * manual overkill. */
762 struct ctdb_req_control_old *c =
763 (struct ctdb_req_control_old *) private_data;
765 const char *err = NULL;
771 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
772 ctdb->recovery_lock_file));
774 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
779 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
780 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
781 ctdb_process_deferred_attach(ctdb);
785 CTDB_UPDATE_RECLOCK_LATENCY(ctdb, "daemon reclock",
786 reclock.ctdbd, latency);
790 /* Timeout. Consider this a success, not a failure,
791 * as we failed to set the recovery lock which is what
792 * we wanted. This can be caused by the cluster
793 * filesystem being very slow to arbitrate locks
794 * immediately after a node failure. */
797 "Time out getting recovery lock, allowing recmode set anyway\n"));
798 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
799 ctdb_process_deferred_attach(ctdb);
806 ("Unexpected error when testing recovery lock\n"));
808 err = "Unexpected error when testing recovery lock";
/* answer the control that was left pending while the lock check ran */
811 ctdb_request_control_reply(ctdb, c, NULL, s, err);
/*
 * Timer callback armed by ctdb_deferred_drop_all_ips(): the node has stayed
 * in recovery mode for too long, so release all public IP addresses.
 *
 * The timer's talloc context (ctdb->release_ips_ctx) is freed and cleared
 * before ctdb_release_all_ips() is called.  private_data is the
 * struct ctdb_context.
 */
816 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
817 struct timeval t, void *private_data)
819 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
821 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
822 talloc_free(ctdb->release_ips_ctx);
823 ctdb->release_ips_ctx = NULL;
825 ctdb_release_all_ips(ctdb);
829 * Set up an event to drop all public ips if we remain in recovery for too
/*
 * Arm (or re-arm) the timer that drops all public IPs after the node has
 * been in recovery for tunable.recovery_drop_all_ips seconds.
 *
 * Any previously armed timer is discarded by freeing its talloc context
 * (ctdb->release_ips_ctx); a fresh context is then created and the timer is
 * added as its child, so freeing the context later cancels the timer.
 * ctdb_drop_all_ips_event() runs when the timer fires.
 *
 * NOTE(review): the return value of tevent_add_timer() is not checked on
 * the visible lines - confirm against the full source.
 */
832 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
834 if (ctdb->release_ips_ctx != NULL) {
835 talloc_free(ctdb->release_ips_ctx);
837 ctdb->release_ips_ctx = talloc_new(ctdb);
838 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
840 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
841 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
842 ctdb_drop_all_ips_event, ctdb);
847 set the recovery mode
/*
 * SET_RECMODE control: switch the daemon between normal and active
 * recovery mode.
 *
 * indata carries the requested mode as a uint32_t.
 *   - Requesting NORMAL cancels the pending "drop all IPs" timer; any other
 *     mode (re-)arms it via ctdb_deferred_drop_all_ips().
 *   - A change of mode is logged.
 *   - Unless the request is ACTIVE -> NORMAL, the mode is simply stored.
 *   - For ACTIVE -> NORMAL, every database generation is compared with the
 *     vnn map generation (a mismatch is logged as an error), frozen
 *     database priorities are thawed, and then either the mode is set to
 *     NORMAL directly (no recovery lock file configured) or a cluster
 *     mutex check on the recovery lock file is started, with
 *     set_recmode_handler() completing the control later.
 *
 * NOTE(review): no check of indata.dsize is visible in this excerpt before
 * indata.dptr is read as a uint32_t.
 */
849 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
850 struct ctdb_req_control_old *c,
851 TDB_DATA indata, bool *async_reply,
852 const char **errormsg)
854 uint32_t recmode = *(uint32_t *)indata.dptr;
856 struct ctdb_db_context *ctdb_db;
857 struct ctdb_cluster_mutex_handle *h;
859 /* if we enter recovery but stay in recovery for too long
860 we will eventually drop all our ip addresses
862 if (recmode == CTDB_RECOVERY_NORMAL) {
863 talloc_free(ctdb->release_ips_ctx);
864 ctdb->release_ips_ctx = NULL;
866 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
867 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
871 if (recmode != ctdb->recovery_mode) {
872 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
873 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
876 if (recmode != CTDB_RECOVERY_NORMAL ||
877 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
878 ctdb->recovery_mode = recmode;
882 /* From this point: recmode == CTDB_RECOVERY_NORMAL
884 * Therefore, what follows is special handling when setting
885 * recovery mode back to normal */
/* a database whose generation differs from the vnn map is reported; the
 * log text suggests recovery mode then stays ACTIVE (lines not shown) */
887 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
888 if (ctdb_db->generation != ctdb->vnn_map->generation) {
890 ("Inconsistent DB generation %u for %s\n",
891 ctdb_db->generation, ctdb_db->db_name));
892 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
897 /* force the databases to thaw */
898 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
899 if (ctdb_db_prio_frozen(ctdb, i)) {
900 ctdb_control_thaw(ctdb, i, false);
904 if (ctdb->recovery_lock_file == NULL) {
905 /* Not using recovery lock file */
906 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
907 ctdb_process_deferred_attach(ctdb);
/* NOTE(review): the third argument (5) is presumably a timeout in seconds - confirm */
911 h = ctdb_cluster_mutex(ctdb, ctdb->recovery_lock_file, 5);
916 /* set_recmode_handler() frees h */
917 ctdb_cluster_mutex_set_handler(h,
927 delete a record as part of the vacuum process
928 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
929 use non-blocking locks
931 return 0 if the record was successfully deleted (i.e. it does not exist
932 when the function returns)
933 or !0 if the record still exists in the tdb after returning.
/*
 * Try to delete one record from the local tdb on behalf of the vacuuming
 * process, using only non-blocking locks.
 *
 * 'rec' carries the key followed by a bare struct ctdb_ltdb_header (the
 * data length must equal the header size exactly).  The delete is refused
 * or skipped when:
 *   - this node is the lmaster for the key;
 *   - the chain lock or the freelist lock cannot be taken without blocking;
 *   - the stored record has a higher rsn than the one supplied;
 *   - either the supplied or the stored header has read-only flags set;
 *   - this node is the dmaster of the stored record.
 * A stored record that is too short to hold a header is treated as corrupt
 * and deleted if the freelist lock can be taken.
 *
 * NOTE(review): the return statements are on lines not shown in this
 * excerpt, so the exact value returned on each path cannot be confirmed
 * here.
 * NOTE(review): no free() of data2.dptr (the tdb_fetch() result) is visible
 * in this excerpt - confirm it is released on every path.
 * NOTE(review): in the corrupt-record branch "Deleted corrupt record"
 * appears to be logged even when tdb_delete() just failed - confirm.
 */
935 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
937 TDB_DATA key, data, data2;
938 struct ctdb_ltdb_header *hdr, *hdr2;
940 /* these are really internal tdb functions - but we need them here for
941 non-blocking lock of the freelist */
942 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
943 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* rec->data holds the key bytes immediately followed by the data bytes */
946 key.dsize = rec->keylen;
947 key.dptr = &rec->data[0];
948 data.dsize = rec->datalen;
949 data.dptr = &rec->data[rec->keylen];
951 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
952 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* the request must carry a header and nothing else */
956 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
957 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
961 hdr = (struct ctdb_ltdb_header *)data.dptr;
963 /* use a non-blocking lock */
964 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* record already absent locally: nothing to delete */
968 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
969 if (data2.dptr == NULL) {
970 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* stored record too short to hold a header: treat as corrupt and remove it;
 * list -1 is the freelist, per the comment on the prototypes above */
974 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
975 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
976 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
977 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
979 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
980 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
982 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
987 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than what the caller knows about: keep it */
989 if (hdr2->rsn > hdr->rsn) {
990 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
991 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
992 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
997 /* do not allow deleting record that have readonly flags set. */
998 if (hdr->flags & CTDB_REC_RO_FLAGS) {
999 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1000 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1004 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1005 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1006 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1011 if (hdr2->dmaster == ctdb->pnn) {
1012 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1013 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* take the freelist lock without blocking before the actual delete */
1018 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1019 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1024 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1025 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1026 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1027 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success path: release the locks in reverse order of acquisition */
1032 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1033 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
 * State handed to the start/end recovery event-script callbacks so they can
 * answer the control once the script has finished.
 */
1040 struct recovery_callback_state {
/* the pending control request that the callback replies to */
1041 struct ctdb_req_control_old *c;
1046 called when the 'recovered' event script has finished
/*
 * Completion callback for the 'recovered' event script started by
 * ctdb_control_end_recovery().
 *
 * Monitoring is re-enabled and the recovery counter incremented.  A failed
 * script is logged, and a status of -ETIME additionally bans this node.
 * The pending control is answered with the script status, the time the
 * recovery finished is recorded, and a daemon that was in the
 * FIRST_RECOVERY run state moves on to STARTUP.
 *
 * 'p' is the struct recovery_callback_state holding the pending control.
 */
1048 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1050 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1052 ctdb_enable_monitoring(ctdb);
1053 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1056 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
/* the script timed out: ban this node */
1057 if (status == -ETIME) {
1058 ctdb_ban_self(ctdb);
1062 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1065 gettimeofday(&ctdb->last_recovery_finished, NULL);
1067 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1068 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1073 recovery has finished
/*
 * END_RECOVERY control: finish pending persistent-transaction commits and
 * run the 'recovered' event script.
 *
 * Monitoring is disabled while the script runs.  If the script cannot be
 * started, monitoring is re-enabled and the failure is logged.  Otherwise
 * the control request is moved onto the callback state and *async_reply is
 * set, so the reply is sent later by ctdb_end_recovery_callback().
 */
1075 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1076 struct ctdb_req_control_old *c,
1080 struct recovery_callback_state *state;
1082 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1084 ctdb_persistent_finish_trans3_commits(ctdb);
1086 state = talloc(ctdb, struct recovery_callback_state);
1087 CTDB_NO_MEMORY(ctdb, state);
1091 ctdb_disable_monitoring(ctdb);
1093 ret = ctdb_event_script_callback(ctdb, state,
1094 ctdb_end_recovery_callback,
1096 CTDB_EVENT_RECOVERED, "%s", "");
/* could not start the script: undo the monitoring change made above */
1099 ctdb_enable_monitoring(ctdb);
1101 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1106 /* tell the control that we will reply asynchronously */
1107 state->c = talloc_steal(state, c);
1108 *async_reply = true;
1113 called when the 'startrecovery' event script has finished
/*
 * Completion callback for the 'startrecovery' event script started by
 * ctdb_control_start_recovery().
 *
 * A failed script is logged; in all cases the pending control held in the
 * struct recovery_callback_state 'p' is answered with the script status.
 */
1115 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1117 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1120 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1123 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1128 run the startrecovery eventscript
/*
 * START_RECOVERY control: record when the recovery started and run the
 * 'startrecovery' event script.
 *
 * The control request is moved onto a freshly allocated callback state
 * before the script is launched, and monitoring is disabled.  When the
 * script has been started, *async_reply is set and the reply is sent later
 * by ctdb_start_recovery_callback().
 *
 * NOTE(review): unlike ctdb_control_end_recovery(), no
 * ctdb_enable_monitoring() call is visible on the failure path in this
 * excerpt - confirm against the full source.
 */
1130 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1131 struct ctdb_req_control_old *c,
1135 struct recovery_callback_state *state;
1137 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1138 gettimeofday(&ctdb->last_recovery_started, NULL);
1140 state = talloc(ctdb, struct recovery_callback_state);
1141 CTDB_NO_MEMORY(ctdb, state);
/* the request now belongs to the callback state */
1143 state->c = talloc_steal(state, c);
1145 ctdb_disable_monitoring(ctdb);
1147 ret = ctdb_event_script_callback(ctdb, state,
1148 ctdb_start_recovery_callback,
1150 CTDB_EVENT_START_RECOVERY,
1154 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1159 /* tell the control that we will reply asynchronously */
1160 *async_reply = true;
1165 try to delete all these records as part of the vacuuming process
1166 and return the records we failed to delete
/*
 * TRY_DELETE_RECORDS control: attempt to delete each record in a
 * marshalled batch and reply with the records that could not be deleted.
 *
 * indata is a struct ctdb_marshall_buffer; it must be at least as large as
 * the buffer header and name a known database.  A reply buffer holding only
 * the marshall header is allocated on outdata.  Every record whose data is
 * at least a struct ctdb_ltdb_header long is passed to delete_tdb_record();
 * when that returns non-zero the whole record is appended to the reply by
 * growing it by rec->length bytes.  The finished reply is returned through
 * *outdata via ctdb_marshall_finish().
 *
 * NOTE(review): rec->length / keylen / datalen come from the payload; no
 * bounds check against indata.dsize is visible in this excerpt.
 * NOTE(review): the increment of the reply's record count is presumably on
 * a line not shown here - confirm.
 */
1168 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1170 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1171 struct ctdb_db_context *ctdb_db;
1173 struct ctdb_rec_data_old *rec;
1174 struct ctdb_marshall_buffer *records;
/* the payload must at least contain the fixed marshall header */
1176 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1177 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1181 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1183 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1188 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1189 reply->count, reply->db_id));
1192 /* create a blob to send back the records we couldn't delete */
1193 records = (struct ctdb_marshall_buffer *)
1194 talloc_zero_size(outdata,
1195 offsetof(struct ctdb_marshall_buffer, data));
1196 if (records == NULL) {
1197 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1200 records->db_id = ctdb_db->db_id;
/* walk the batch; each record is rec->length bytes long */
1203 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1204 for (i=0;i<reply->count;i++) {
1207 key.dptr = &rec->data[0];
1208 key.dsize = rec->keylen;
1209 data.dptr = &rec->data[key.dsize];
1210 data.dsize = rec->datalen;
1212 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1213 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1217 /* If we cant delete the record we must add it to the reply
1218 so the lmaster knows it may not purge this record
1220 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1222 struct ctdb_ltdb_header *hdr;
1224 hdr = (struct ctdb_ltdb_header *)data.dptr;
1225 data.dptr += sizeof(*hdr);
1226 data.dsize -= sizeof(*hdr);
1228 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1230 old_size = talloc_get_size(records);
1231 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1232 if (records == NULL) {
1233 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1237 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1240 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1244 *outdata = ctdb_marshall_finish(records);
1250 * Store a record as part of the vacuum process:
1251 * This is called from the RECEIVE_RECORD control which
1252 * the lmaster uses to send the current empty copy
1253 * to all nodes for storing, before it lets the other
1254 * nodes delete the records in the second phase with
1255 * the TRY_DELETE_RECORDS control.
1257 * Only store if we are not lmaster or dmaster, and our
1258 * rsn is <= the provided rsn. Use non-blocking locks.
1260 * return 0 if the record was successfully stored.
1261 * return !0 if the record still exists in the tdb after returning.
/*
 * Try to store one header-only record in the local tdb on behalf of the
 * vacuuming process, using a non-blocking chain lock.
 *
 * 'rec' carries the key followed by a bare struct ctdb_ltdb_header (the
 * data length must equal the header size exactly).  The store is refused
 * when this node is the lmaster for the key or the chain lock cannot be
 * taken without blocking.  If no usable record is stored locally (missing,
 * or too short to hold a header) the supplied one is written directly.
 * Otherwise it is skipped when the stored record has a higher rsn, when
 * either header has read-only flags set, or when this node is the dmaster
 * of the stored record; if none of those apply the record is overwritten.
 * The chain lock is released at the end.
 *
 * NOTE(review): the return statements, any goto/label structure and any
 * free() of data2.dptr are on lines not shown in this excerpt, so the value
 * returned on each path and the release of the fetched buffer cannot be
 * confirmed here.
 */
1263 static int store_tdb_record(struct ctdb_context *ctdb,
1264 struct ctdb_db_context *ctdb_db,
1265 struct ctdb_rec_data_old *rec)
1267 TDB_DATA key, data, data2;
1268 struct ctdb_ltdb_header *hdr, *hdr2;
/* rec->data holds the key bytes immediately followed by the data bytes */
1271 key.dsize = rec->keylen;
1272 key.dptr = &rec->data[0];
1273 data.dsize = rec->datalen;
1274 data.dptr = &rec->data[rec->keylen];
1276 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1277 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1278 "where we are lmaster\n"));
/* the request must carry a header and nothing else */
1282 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1283 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1287 hdr = (struct ctdb_ltdb_header *)data.dptr;
1289 /* use a non-blocking lock */
1290 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1291 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
/* nothing usable stored locally: write the supplied header as-is */
1295 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1296 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1297 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1298 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1302 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1307 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than what the caller sent: keep it */
1309 if (hdr2->rsn > hdr->rsn) {
1310 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1311 "rsn=%llu - called with rsn=%llu\n",
1312 (unsigned long long)hdr2->rsn,
1313 (unsigned long long)hdr->rsn));
1318 /* do not allow vacuuming of records that have readonly flags set. */
1319 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1320 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1325 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1326 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1332 if (hdr2->dmaster == ctdb->pnn) {
1333 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1334 "where we are the dmaster\n"));
/* all checks passed: overwrite the stored record with the supplied header */
1339 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1340 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1348 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1356 * Try to store all these records as part of the vacuuming process
1357 * and return the records we failed to store.
/*
 * ctdb_control_receive_records: walk the marshalled records in indata, try to
 * store each one with store_tdb_record(), and collect the records that could
 * not be stored into a marshall buffer returned through *outdata.
 *
 *   ctdb    - daemon context, used to look up the database by id.
 *   indata  - a struct ctdb_marshall_buffer: db_id, count, then the records.
 *   outdata - talloc parent for the reply buffer; set from
 *             ctdb_marshall_finish(records) at the end.
 *
 * NOTE(review): the embedded line numbers skip values inside this function,
 * so some original lines (braces, returns, short declarations) are not
 * visible in this listing. The comments below describe only the visible lines.
 */
1359 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1360 TDB_DATA indata, TDB_DATA *outdata)
1362 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1363 struct ctdb_db_context *ctdb_db;
1365 struct ctdb_rec_data_old *rec;
1366 struct ctdb_marshall_buffer *records;
/* The input must be at least as large as the fixed marshall-buffer header. */
1368 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1370 (__location__ " invalid data in receive_records\n"));
/* Look up the target database by the id carried in the request. */
1374 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1376 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1381 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1382 "dbid 0x%x\n", reply->count, reply->db_id));
1384 /* create a blob to send back the records we could not store */
1385 records = (struct ctdb_marshall_buffer *)
1386 talloc_zero_size(outdata,
1387 offsetof(struct ctdb_marshall_buffer, data));
1388 if (records == NULL) {
1389 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1392 records->db_id = ctdb_db->db_id;
/*
 * Walk reply->count records, starting at the first byte after the header.
 * NOTE(review): the cursor is advanced by rec->length taken from the input,
 * and no visible line checks rec, rec->keylen or rec->datalen against
 * indata.dsize - confirm whether the sender is fully trusted here.
 */
1394 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1395 for (i=0; i<reply->count; i++) {
1398 key.dptr = &rec->data[0];
1399 key.dsize = rec->keylen;
1400 data.dptr = &rec->data[key.dsize];
1401 data.dsize = rec->datalen;
/* Every record must carry at least a full ltdb header. */
1403 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1404 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1410 * If we can not store the record we must add it to the reply
1411 * so the lmaster knows it may not purge this record.
1413 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1415 struct ctdb_ltdb_header *hdr;
/*
 * Step data past the ltdb header.
 * NOTE(review): no later use of data or hdr is visible in this listing, so
 * these adjustments may be dead code - confirm.
 */
1417 hdr = (struct ctdb_ltdb_header *)data.dptr;
1418 data.dptr += sizeof(*hdr);
1419 data.dsize -= sizeof(*hdr);
1421 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1422 "record with hash 0x%08x in vacuum "
1423 "via RECEIVE_RECORDS\n",
/* Grow the reply blob by rec->length and append the raw record at its end. */
1426 old_size = talloc_get_size(records);
1427 records = talloc_realloc_size(outdata, records,
1428 old_size + rec->length);
1429 if (records == NULL) {
1430 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1435 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* Advance to the next marshalled record. */
1438 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
/* Hand the collected blob of unstored records back to the caller. */
1441 *outdata = ctdb_marshall_finish(records);
/*
 * ctdb_control_get_capabilities: return this node's capability bits.
 *
 * Allocates one uint32_t with outdata as its talloc parent, copies
 * ctdb->capabilities into it and points outdata->dptr / outdata->dsize at it.
 *
 * NOTE(review): braces and the return statement are not visible in this
 * listing (the embedded line numbers skip values).
 */
1450 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1452 uint32_t *capabilities = NULL;
1454 capabilities = talloc(outdata, uint32_t);
/* Allocation check via the CTDB_NO_MEMORY macro. */
1455 CTDB_NO_MEMORY(ctdb, capabilities);
1456 *capabilities = ctdb->capabilities;
1458 outdata->dsize = sizeof(uint32_t);
1459 outdata->dptr = (uint8_t *)capabilities;
1464 /* The recovery daemon will ping us at regular intervals.
1465 If we haven't been pinged for a while we assume the recovery
1466 daemon is inoperable and we restart.
/*
 * ctdb_recd_ping_timeout: tevent timer callback armed by
 * ctdb_control_recd_ping() and by this function itself.
 *
 *   ev, te, t - standard tevent timer callback arguments (not used in the
 *               visible lines).
 *   p         - the struct ctdb_context passed as private data.
 *
 * While the counter in ctdb->recd_ping_count is below
 * tunable.recd_ping_failcount the timer is re-armed. The visible tail of the
 * function logs a final timeout and stops, then starts, the recovery daemon.
 *
 * NOTE(review): the embedded line numbers skip values inside this function,
 * so the counter update and the early return of the re-arm branch are
 * presumably on omitted lines - confirm against the full file.
 */
1468 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1469 struct tevent_timer *te,
1470 struct timeval t, void *p)
1472 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1473 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1475 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* Below the configured failure count: schedule another timeout check. */
1477 if (*count < ctdb->tunable.recd_ping_failcount) {
/* The timer is created with the counter object as its memory context. */
1479 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1480 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1481 ctdb_recd_ping_timeout, ctdb);
1485 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* Restart the recovery daemon: stop it, then start it again. */
1487 ctdb_stop_recoverd(ctdb);
1488 ctdb_start_recoverd(ctdb);
/*
 * ctdb_control_recd_ping: handle a ping from the recovery daemon.
 *
 * Frees the previous ping counter, allocates a fresh zeroed one on ctdb and,
 * when tunable.recd_ping_timeout is non-zero, arms ctdb_recd_ping_timeout()
 * to fire after that many seconds.
 *
 * NOTE(review): braces and the return statement are not visible in this
 * listing (the embedded line numbers skip values).
 */
1491 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/*
 * Timers are created with recd_ping_count as their memory context, so
 * freeing it is presumably what cancels a pending timeout - confirm against
 * the tevent_add_timer documentation.
 */
1493 talloc_free(ctdb->recd_ping_count);
1495 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1496 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* A timeout of 0 means no timer is armed. */
1498 if (ctdb->tunable.recd_ping_timeout != 0) {
1499 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1500 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1501 ctdb_recd_ping_timeout, ctdb);
/*
 * ctdb_control_set_recmaster: record which node is the recovery master.
 *
 *   ctdb   - daemon context; ctdb->recovery_master is updated.
 *   opcode - control opcode; not used directly in the visible lines.
 *   indata - must be exactly one uint32_t: the pnn of the new recovery master.
 *
 * Logs when this node loses or gains the recovery master role, then stores
 * the new value.
 *
 * NOTE(review): the embedded line numbers skip values inside this function;
 * the opening lines of the two log statements and the return are not visible.
 */
1509 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1511 uint32_t new_recmaster;
/* Check that the payload is exactly sizeof(uint32_t) before reading it. */
1513 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1514 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* This node was recovery master and is being replaced by another node. */
1516 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1518 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* This node is becoming recovery master and was not already. */
1521 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1523 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1526 ctdb->recovery_master = new_recmaster;
/*
 * ctdb_control_stop_node: mark this node as stopped.
 *
 * Logs the event, disables monitoring and sets NODE_FLAGS_STOPPED in this
 * node's own entry of ctdb->nodes.
 *
 * NOTE(review): braces and the return statement are not visible in this
 * listing (the embedded line numbers skip values).
 */
1531 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1533 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1534 ctdb_disable_monitoring(ctdb);
/* ctdb->pnn indexes this node's own slot in the node array. */
1535 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
 * ctdb_control_continue_node: clear the stopped state of this node.
 *
 * Logs the event and clears NODE_FLAGS_STOPPED in this node's own entry of
 * ctdb->nodes.
 *
 * NOTE(review): only the lines visible in this listing are described; braces
 * and the return statement are not shown.
 */
1540 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1542 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1543 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;