4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
35 #include "lib/util/strv.h"
36 #include "lib/util/strv_util.h"
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
51 CHECK_CONTROL_DATA_SIZE(0);
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
62 outdata->dptr = (uint8_t *)map;
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 if (ctdb_db->persistent != 0) {
119 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
121 if (ctdb_db->readonly != 0) {
122 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
124 if (ctdb_db->sticky != 0) {
125 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
133 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
135 CHECK_CONTROL_DATA_SIZE(0);
137 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
140 if (outdata->dptr == NULL) {
144 outdata->dsize = talloc_get_size(outdata->dptr);
150 reload the nodes file
153 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
157 struct ctdb_node **nodes;
159 tmp_ctx = talloc_new(ctdb);
161 /* steal the old nodes file for a while */
162 talloc_steal(tmp_ctx, ctdb->nodes);
165 num_nodes = ctdb->num_nodes;
168 /* load the new nodes file */
169 ctdb_load_nodes_file(ctdb);
171 for (i=0; i<ctdb->num_nodes; i++) {
172 /* keep any identical pre-existing nodes and connections */
173 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
174 talloc_free(ctdb->nodes[i]);
175 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
179 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
183 /* any new or different nodes must be added */
184 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
185 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
186 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
188 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
189 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
190 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
194 /* tell the recovery daemon to reload the nodes file too */
195 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
197 talloc_free(tmp_ctx);
203 a traverse function for pulling all relevant records from pulldb
206 struct ctdb_context *ctdb;
207 struct ctdb_db_context *ctdb_db;
208 struct ctdb_marshall_buffer *pulldata;
210 uint32_t allocated_len;
214 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
216 struct pulldb_data *params = (struct pulldb_data *)p;
217 struct ctdb_rec_data_old *rec;
218 struct ctdb_context *ctdb = params->ctdb;
219 struct ctdb_db_context *ctdb_db = params->ctdb_db;
221 /* add the record to the blob */
222 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
224 params->failed = true;
227 if (params->len + rec->length >= params->allocated_len) {
228 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
229 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
231 if (params->pulldata == NULL) {
232 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
233 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
235 params->pulldata->count++;
236 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
237 params->len += rec->length;
239 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
240 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
249 pull a bunch of records from a ltdb, filtering by lmaster
251 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
253 struct ctdb_pulldb *pull;
254 struct ctdb_db_context *ctdb_db;
255 struct pulldb_data params;
256 struct ctdb_marshall_buffer *reply;
258 pull = (struct ctdb_pulldb *)indata.dptr;
260 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
262 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
266 if (!ctdb_db_frozen(ctdb_db)) {
268 ("rejecting ctdb_control_pull_db when not frozen\n"));
272 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
273 CTDB_NO_MEMORY(ctdb, reply);
275 reply->db_id = pull->db_id;
278 params.ctdb_db = ctdb_db;
279 params.pulldata = reply;
280 params.len = offsetof(struct ctdb_marshall_buffer, data);
281 params.allocated_len = params.len;
282 params.failed = false;
284 if (ctdb_db->unhealthy_reason) {
285 /* this is just a warning, as the tdb should be empty anyway */
286 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
287 ctdb_db->db_name, ctdb_db->unhealthy_reason));
290 if (ctdb_lockdb_mark(ctdb_db) != 0) {
291 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
295 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
296 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
297 ctdb_lockdb_unmark(ctdb_db);
298 talloc_free(params.pulldata);
302 ctdb_lockdb_unmark(ctdb_db);
304 outdata->dptr = (uint8_t *)params.pulldata;
305 outdata->dsize = params.len;
307 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
308 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
310 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
311 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
318 struct db_pull_state {
319 struct ctdb_context *ctdb;
320 struct ctdb_db_context *ctdb_db;
321 struct ctdb_marshall_buffer *recs;
324 uint32_t num_records;
327 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
328 TDB_DATA data, void *private_data)
330 struct db_pull_state *state = (struct db_pull_state *)private_data;
331 struct ctdb_marshall_buffer *recs;
333 recs = ctdb_marshall_add(state->ctdb, state->recs,
334 state->ctdb_db->db_id, 0, key, NULL, data);
336 TALLOC_FREE(state->recs);
341 if (talloc_get_size(state->recs) >=
342 state->ctdb->tunable.rec_buffer_size_limit) {
346 buffer = ctdb_marshall_finish(state->recs);
347 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
348 state->srvid, buffer);
350 TALLOC_FREE(state->recs);
354 state->num_records += state->recs->count;
355 TALLOC_FREE(state->recs);
361 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
362 struct ctdb_req_control_old *c,
363 TDB_DATA indata, TDB_DATA *outdata)
365 struct ctdb_pulldb_ext *pulldb_ext;
366 struct ctdb_db_context *ctdb_db;
367 struct db_pull_state state;
370 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
372 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
373 if (ctdb_db == NULL) {
374 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
379 if (!ctdb_db_frozen(ctdb_db)) {
381 ("rejecting ctdb_control_pull_db when not frozen\n"));
385 if (ctdb_db->unhealthy_reason) {
386 /* this is just a warning, as the tdb should be empty anyway */
388 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
389 ctdb_db->db_name, ctdb_db->unhealthy_reason));
393 state.ctdb_db = ctdb_db;
395 state.pnn = c->hdr.srcnode;
396 state.srvid = pulldb_ext->srvid;
397 state.num_records = 0;
399 if (ctdb_lockdb_mark(ctdb_db) != 0) {
401 (__location__ " Failed to get lock on entire db - failing\n"));
405 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
408 (__location__ " Failed to get traverse db '%s'\n",
410 ctdb_lockdb_unmark(ctdb_db);
414 /* Last few records */
415 if (state.recs != NULL) {
418 buffer = ctdb_marshall_finish(state.recs);
419 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
420 state.srvid, buffer);
422 TALLOC_FREE(state.recs);
423 ctdb_lockdb_unmark(ctdb_db);
427 state.num_records += state.recs->count;
428 TALLOC_FREE(state.recs);
431 ctdb_lockdb_unmark(ctdb_db);
433 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
434 if (outdata->dptr == NULL) {
435 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
439 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
440 outdata->dsize = sizeof(uint32_t);
446 push a bunch of records into a ltdb, filtering by rsn
448 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
450 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
451 struct ctdb_db_context *ctdb_db;
453 struct ctdb_rec_data_old *rec;
455 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
456 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
460 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
462 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
466 if (!ctdb_db_frozen(ctdb_db)) {
468 ("rejecting ctdb_control_push_db when not frozen\n"));
472 if (ctdb_lockdb_mark(ctdb_db) != 0) {
473 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
477 rec = (struct ctdb_rec_data_old *)&reply->data[0];
479 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
480 reply->count, reply->db_id));
482 for (i=0;i<reply->count;i++) {
484 struct ctdb_ltdb_header *hdr;
486 key.dptr = &rec->data[0];
487 key.dsize = rec->keylen;
488 data.dptr = &rec->data[key.dsize];
489 data.dsize = rec->datalen;
491 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
492 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
495 hdr = (struct ctdb_ltdb_header *)data.dptr;
496 /* strip off any read only record flags. All readonly records
497 are revoked implicitly by a recovery
499 hdr->flags &= ~CTDB_REC_RO_FLAGS;
501 data.dptr += sizeof(*hdr);
502 data.dsize -= sizeof(*hdr);
504 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
506 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
510 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
513 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
514 reply->count, reply->db_id));
516 if (ctdb_db->readonly) {
517 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
519 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
520 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
521 ctdb_db->readonly = false;
522 tdb_close(ctdb_db->rottdb);
523 ctdb_db->rottdb = NULL;
524 ctdb_db->readonly = false;
526 while (ctdb_db->revokechild_active != NULL) {
527 talloc_free(ctdb_db->revokechild_active);
531 ctdb_lockdb_unmark(ctdb_db);
535 ctdb_lockdb_unmark(ctdb_db);
539 struct db_push_state {
540 struct ctdb_context *ctdb;
541 struct ctdb_db_context *ctdb_db;
543 uint32_t num_records;
547 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
550 struct db_push_state *state = talloc_get_type(
551 private_data, struct db_push_state);
552 struct ctdb_marshall_buffer *recs;
553 struct ctdb_rec_data_old *rec;
560 recs = (struct ctdb_marshall_buffer *)indata.dptr;
561 rec = (struct ctdb_rec_data_old *)&recs->data[0];
563 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
564 recs->count, recs->db_id));
566 for (i=0; i<recs->count; i++) {
568 struct ctdb_ltdb_header *hdr;
570 key.dptr = &rec->data[0];
571 key.dsize = rec->keylen;
572 data.dptr = &rec->data[key.dsize];
573 data.dsize = rec->datalen;
575 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
576 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
580 hdr = (struct ctdb_ltdb_header *)data.dptr;
581 /* Strip off any read only record flags.
582 * All readonly records are revoked implicitly by a recovery.
584 hdr->flags &= ~CTDB_REC_RO_FLAGS;
586 data.dptr += sizeof(*hdr);
587 data.dsize -= sizeof(*hdr);
589 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
592 (__location__ " Unable to store record\n"));
596 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
599 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
600 recs->count, recs->db_id));
602 state->num_records += recs->count;
606 state->failed = true;
609 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
611 struct ctdb_pulldb_ext *pulldb_ext;
612 struct ctdb_db_context *ctdb_db;
613 struct db_push_state *state;
616 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
618 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
619 if (ctdb_db == NULL) {
621 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
625 if (!ctdb_db_frozen(ctdb_db)) {
627 ("rejecting ctdb_control_db_push_start when not frozen\n"));
631 if (ctdb_db->push_started) {
633 (__location__ " DB push already started for %s\n",
636 /* De-register old state */
637 state = (struct db_push_state *)ctdb_db->push_state;
639 srvid_deregister(ctdb->srv, state->srvid, state);
641 ctdb_db->push_state = NULL;
645 state = talloc_zero(ctdb_db, struct db_push_state);
647 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
652 state->ctdb_db = ctdb_db;
653 state->srvid = pulldb_ext->srvid;
654 state->failed = false;
656 ret = srvid_register(ctdb->srv, state, state->srvid,
657 db_push_msg_handler, state);
660 (__location__ " Failed to register srvid for db push\n"));
665 if (ctdb_lockdb_mark(ctdb_db) != 0) {
667 (__location__ " Failed to get lock on entire db - failing\n"));
668 srvid_deregister(ctdb->srv, state->srvid, state);
673 ctdb_db->push_started = true;
674 ctdb_db->push_state = state;
679 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
680 TDB_DATA indata, TDB_DATA *outdata)
683 struct ctdb_db_context *ctdb_db;
684 struct db_push_state *state;
686 db_id = *(uint32_t *)indata.dptr;
688 ctdb_db = find_ctdb_db(ctdb, db_id);
689 if (ctdb_db == NULL) {
690 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
694 if (!ctdb_db_frozen(ctdb_db)) {
696 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
700 if (!ctdb_db->push_started) {
701 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
705 if (ctdb_db->readonly) {
707 ("Clearing the tracking database for dbid 0x%x\n",
709 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
711 ("Failed to wipe tracking database for 0x%x."
712 " Dropping read-only delegation support\n",
714 ctdb_db->readonly = false;
715 tdb_close(ctdb_db->rottdb);
716 ctdb_db->rottdb = NULL;
717 ctdb_db->readonly = false;
720 while (ctdb_db->revokechild_active != NULL) {
721 talloc_free(ctdb_db->revokechild_active);
725 ctdb_lockdb_unmark(ctdb_db);
727 state = (struct db_push_state *)ctdb_db->push_state;
729 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
733 srvid_deregister(ctdb->srv, state->srvid, state);
735 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
736 if (outdata->dptr == NULL) {
737 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
739 ctdb_db->push_state = NULL;
743 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
744 outdata->dsize = sizeof(uint32_t);
747 ctdb_db->push_state = NULL;
752 struct ctdb_cluster_mutex_handle;
753 typedef void (*cluster_mutex_handler_t) (
754 struct ctdb_context *ctdb,
757 struct ctdb_cluster_mutex_handle *h,
760 struct ctdb_cluster_mutex_handle {
761 struct ctdb_context *ctdb;
762 cluster_mutex_handler_t handler;
765 struct tevent_timer *te;
766 struct tevent_fd *fde;
768 struct timeval start_time;
771 static void set_recmode_handler(struct ctdb_context *ctdb,
774 struct ctdb_cluster_mutex_handle *h,
777 /* It would be good to use talloc_get_type() here. However,
778 * the name of the packet is manually set - not sure why.
779 * Could use talloc_check_name() but this seems like a lot of
780 * manual overkill. */
781 struct ctdb_req_control_old *c =
782 (struct ctdb_req_control_old *) private_data;
784 const char *err = NULL;
790 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
791 ctdb->recovery_lock_file));
793 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
798 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
799 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
800 ctdb_process_deferred_attach(ctdb);
804 CTDB_UPDATE_RECLOCK_LATENCY(ctdb, "daemon reclock",
805 reclock.ctdbd, latency);
809 /* Timeout. Consider this a success, not a failure,
810 * as we failed to set the recovery lock which is what
811 * we wanted. This can be caused by the cluster
812 * filesystem being very slow to arbitrate locks
813 * immediately after a node failure. */
816 "Time out getting recovery lock, allowing recmode set anyway\n"));
817 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
818 ctdb_process_deferred_attach(ctdb);
825 ("Unexpected error when testing recovery lock\n"));
827 err = "Unexpected error when testing recovery lock";
830 ctdb_request_control_reply(ctdb, c, NULL, s, err);
835 called if our set_recmode child times out. this would happen if
836 ctdb_recovery_lock() would block.
838 static void cluster_mutex_timeout(struct tevent_context *ev,
839 struct tevent_timer *te,
840 struct timeval t, void *private_data)
842 struct ctdb_cluster_mutex_handle *h =
843 talloc_get_type(private_data, struct ctdb_cluster_mutex_handle);
844 double latency = timeval_elapsed(&h->start_time);
846 if (h->handler != NULL) {
847 h->handler(h->ctdb, '2', latency, h, h->private_data);
852 /* When the handle is freed it causes any child holding the mutex to
853 * be killed, thus freeing the mutex */
854 static int cluster_mutex_destructor(struct ctdb_cluster_mutex_handle *h)
856 if (h->fd[0] != -1) {
859 ctdb_kill(h->ctdb, h->child, SIGTERM);
863 /* this is called when the client process has completed ctdb_recovery_lock()
864 and has written data back to us through the pipe.
866 static void cluster_mutex_handler(struct tevent_context *ev,
867 struct tevent_fd *fde,
868 uint16_t flags, void *private_data)
870 struct ctdb_cluster_mutex_handle *h=
871 talloc_get_type(private_data, struct ctdb_cluster_mutex_handle);
872 double latency = timeval_elapsed(&h->start_time);
876 /* Got response from child process so abort timeout */
879 ret = sys_read(h->fd[0], &c, 1);
881 /* If the child wrote status then just pass it to the handler.
882 * If no status was written then this is an unexpected error
883 * so pass generic error code to handler. */
884 if (h->handler != NULL) {
885 h->handler(h->ctdb, ret == 1 ? c : '3', latency,
891 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
892 struct timeval t, void *private_data)
894 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
896 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
897 talloc_free(ctdb->release_ips_ctx);
898 ctdb->release_ips_ctx = NULL;
900 ctdb_release_all_ips(ctdb);
903 static char cluster_mutex_helper[PATH_MAX+1] = "";
905 static bool cluster_mutex_helper_args(TALLOC_CTX *mem_ctx,
906 const char *argstring, char ***argv)
908 int nargs, i, ret, n;
913 ret = strv_split(mem_ctx, &strv, argstring, " \t");
916 ("Unable to parse mutex helper string \"%s\" (%s)\n",
917 argstring, strerror(ret)));
920 n = strv_count(strv);
922 args = talloc_array(mem_ctx, char *, n + 2);
924 DEBUG(DEBUG_ERR,(__location__ " out of memory\n"));
930 if (!ctdb_set_helper("cluster mutex helper",
931 cluster_mutex_helper,
932 sizeof(cluster_mutex_helper),
933 "CTDB_CLUSTER_MUTEX_HELPER",
934 CTDB_HELPER_BINDIR, "ctdb_mutex_fcntl_helper")) {
935 DEBUG(DEBUG_ERR,("ctdb exiting with error: %s\n",
937 " Unable to set cluster mutex helper\n"));
940 args[nargs++] = cluster_mutex_helper;
943 for (i = 0; i < n; i++) {
944 /* Don't copy, just keep cmd_args around */
945 t = strv_next(strv, t);
949 /* Make sure last argument is NULL */
956 static struct ctdb_cluster_mutex_handle *
957 ctdb_cluster_mutex(struct ctdb_context *ctdb, int timeout)
959 struct ctdb_cluster_mutex_handle *h;
963 h = talloc(ctdb, struct ctdb_cluster_mutex_handle);
965 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
969 h->start_time = timeval_current();
976 DEBUG(DEBUG_ERR, (__location__ " Failed to open pipe\n"));
979 set_close_on_exec(h->fd[0]);
981 /* Create arguments for lock helper */
982 if (!cluster_mutex_helper_args(h, ctdb->recovery_lock_file, &args)) {
989 h->child = ctdb_fork(ctdb);
990 if (h->child == (pid_t)-1) {
998 /* Make stdout point to the pipe */
999 close(STDOUT_FILENO);
1000 dup2(h->fd[1], STDOUT_FILENO);
1003 execv(args[0], args);
1005 /* Only happens on error */
1006 DEBUG(DEBUG_ERR, (__location__ "execv() failed\n"));
1012 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d\n", h->fd[0]));
1013 set_close_on_exec(h->fd[0]);
1018 talloc_set_destructor(h, cluster_mutex_destructor);
1021 h->te = tevent_add_timer(ctdb->ev, h,
1022 timeval_current_ofs(timeout, 0),
1023 cluster_mutex_timeout, h);
1028 h->fde = tevent_add_fd(ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
1029 cluster_mutex_handler, (void *)h);
1031 if (h->fde == NULL) {
1035 tevent_fd_set_auto_close(h->fde);
1039 h->private_data = NULL;
1046 * Set up an event to drop all public ips if we remain in recovery for too
1049 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
1051 if (ctdb->release_ips_ctx != NULL) {
1052 talloc_free(ctdb->release_ips_ctx);
1054 ctdb->release_ips_ctx = talloc_new(ctdb);
1055 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
1057 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
1058 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
1059 ctdb_drop_all_ips_event, ctdb);
1064 set the recovery mode
1066 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
1067 struct ctdb_req_control_old *c,
1068 TDB_DATA indata, bool *async_reply,
1069 const char **errormsg)
1071 uint32_t recmode = *(uint32_t *)indata.dptr;
1073 struct ctdb_db_context *ctdb_db;
1074 struct ctdb_cluster_mutex_handle *h;
1076 /* if we enter recovery but stay in recovery for too long
1077 we will eventually drop all our ip addresses
1079 if (recmode == CTDB_RECOVERY_NORMAL) {
1080 talloc_free(ctdb->release_ips_ctx);
1081 ctdb->release_ips_ctx = NULL;
1083 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
1084 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
1088 if (recmode != ctdb->recovery_mode) {
1089 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
1090 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
1093 if (recmode != CTDB_RECOVERY_NORMAL ||
1094 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
1095 ctdb->recovery_mode = recmode;
1099 /* From this point: recmode == CTDB_RECOVERY_NORMAL
1101 * Therefore, what follows is special handling when setting
1102 * recovery mode back to normal */
1104 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
1105 if (ctdb_db->generation != ctdb->vnn_map->generation) {
1107 ("Inconsistent DB generation %u for %s\n",
1108 ctdb_db->generation, ctdb_db->db_name));
1109 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
1114 /* force the databases to thaw */
1115 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1116 if (ctdb_db_prio_frozen(ctdb, i)) {
1117 ctdb_control_thaw(ctdb, i, false);
1121 /* release any deferred attach calls from clients */
1122 if (recmode == CTDB_RECOVERY_NORMAL) {
1123 ctdb_process_deferred_attach(ctdb);
1126 if (ctdb->recovery_lock_file == NULL) {
1127 /* Not using recovery lock file */
1128 ctdb->recovery_mode = recmode;
1132 h = ctdb_cluster_mutex(ctdb, 5);
1137 /* set_recmode_handler() frees h */
1138 h->handler = set_recmode_handler;
1139 h->private_data = talloc_steal(h, c);
1141 *async_reply = true;
1147 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
1149 return (ctdb->recovery_lock_handle != NULL);
1152 struct hold_reclock_state {
1157 static void hold_reclock_handler(struct ctdb_context *ctdb,
1160 struct ctdb_cluster_mutex_handle *h,
1163 struct hold_reclock_state *s =
1164 (struct hold_reclock_state *) private_data;
1168 ctdb->recovery_lock_handle = h;
1173 ("Unable to take recovery lock - contention\n"));
1178 DEBUG(DEBUG_ERR, ("ERROR: when taking recovery lock\n"));
1186 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
1188 struct ctdb_cluster_mutex_handle *h;
1189 struct hold_reclock_state s = {
1194 h = ctdb_cluster_mutex(ctdb, 0);
1199 h->handler = hold_reclock_handler;
1200 h->private_data = &s;
1203 tevent_loop_once(ctdb->ev);
1206 h->private_data = NULL;
1208 return (s.status == '0');
1211 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
1213 if (ctdb->recovery_lock_handle != NULL) {
1214 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
1215 TALLOC_FREE(ctdb->recovery_lock_handle);
1220 delete a record as part of the vacuum process
1221 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
1222 use non-blocking locks
1224 return 0 if the record was successfully deleted (i.e. it does not exist
1225 when the function returns)
1226 or !0 if the record still exists in the tdb after returning.
1228 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
1230 TDB_DATA key, data, data2;
1231 struct ctdb_ltdb_header *hdr, *hdr2;
1233 /* these are really internal tdb functions - but we need them here for
1234 non-blocking lock of the freelist */
1235 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
1236 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
1239 key.dsize = rec->keylen;
1240 key.dptr = &rec->data[0];
1241 data.dsize = rec->datalen;
1242 data.dptr = &rec->data[rec->keylen];
1244 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1245 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
1249 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1250 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
1254 hdr = (struct ctdb_ltdb_header *)data.dptr;
1256 /* use a non-blocking lock */
1257 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1261 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1262 if (data2.dptr == NULL) {
1263 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1267 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1268 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
1269 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1270 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
1272 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1273 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
1275 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1280 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1282 if (hdr2->rsn > hdr->rsn) {
1283 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1284 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
1285 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
1290 /* do not allow deleting record that have readonly flags set. */
1291 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1292 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1293 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1297 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1298 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1299 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1304 if (hdr2->dmaster == ctdb->pnn) {
1305 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1306 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1311 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1312 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1317 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1318 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1319 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1320 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1325 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1326 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1333 struct recovery_callback_state {
1334 struct ctdb_req_control_old *c;
1339 called when the 'recovered' event script has finished
1341 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1343 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1345 ctdb_enable_monitoring(ctdb);
1346 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1349 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1350 if (status == -ETIME) {
1351 ctdb_ban_self(ctdb);
1355 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1358 gettimeofday(&ctdb->last_recovery_finished, NULL);
1360 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1361 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1366 recovery has finished
/*
 * Control handler invoked when a recovery has finished.
 *
 * Finishes outstanding persistent trans3 commits, disables monitoring
 * and starts the CTDB_EVENT_RECOVERED event script with
 * ctdb_end_recovery_callback() as its completion callback.  When the
 * script was started, the request is reparented under the callback
 * state and *async_reply is set so that the reply is sent from the
 * callback instead of from here.
 *
 * NOTE(review): part of the parameter list, the declaration of ret, the
 * test of ret and the return statements are missing from this listing.
 */
1368 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1369 struct ctdb_req_control_old *c,
1373 struct recovery_callback_state *state;
1375 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1377 ctdb_persistent_finish_trans3_commits(ctdb);
/* state is allocated on ctdb; CTDB_NO_MEMORY handles a failed allocation */
1379 state = talloc(ctdb, struct recovery_callback_state);
1380 CTDB_NO_MEMORY(ctdb, state);
/* monitoring stays off while the script runs; the callback turns it back on */
1384 ctdb_disable_monitoring(ctdb);
1386 ret = ctdb_event_script_callback(ctdb, state,
1387 ctdb_end_recovery_callback,
1389 CTDB_EVENT_RECOVERED, "%s", "");
/* error path (condition not visible here): undo the disable above and log */
1392 ctdb_enable_monitoring(ctdb);
1394 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1399 /* tell the control that we will reply asynchronously */
1400 state->c = talloc_steal(state, c);
1401 *async_reply = true;
1406 called when the 'startrecovery' event script has finished
/*
 * Completion callback for the CTDB_EVENT_START_RECOVERY event script
 * that ctdb_control_start_recovery() starts.
 *
 * ctdb   - daemon context
 * status - result of the event script; forwarded unchanged as the
 *          status of the control reply
 * p      - the struct recovery_callback_state registered with the script
 *
 * NOTE(review): the condition guarding the error message is not visible
 * in this listing.
 */
1408 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1410 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1413 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
/* answer the control that was parked while the script was running */
1416 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1421 run the startrecovery eventscript
/*
 * Control handler that runs the startrecovery event script.
 *
 * Records the recovery start time, parks the request in a freshly
 * allocated callback state, disables monitoring and starts the
 * CTDB_EVENT_START_RECOVERY event script with
 * ctdb_start_recovery_callback() as its completion callback.
 * *async_reply is set so that the reply is sent from the callback.
 *
 * NOTE(review): part of the parameter list, the declaration of ret, the
 * test of ret and the return statements are missing from this listing.
 * No call that re-enables monitoring is visible in this function or in
 * its callback - confirm where that happens.
 */
1423 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1424 struct ctdb_req_control_old *c,
1428 struct recovery_callback_state *state;
1430 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1431 gettimeofday(&ctdb->last_recovery_started, NULL);
/* state is allocated on ctdb; CTDB_NO_MEMORY handles a failed allocation */
1433 state = talloc(ctdb, struct recovery_callback_state);
1434 CTDB_NO_MEMORY(ctdb, state);
/* reparent the request under state before the script is started */
1436 state->c = talloc_steal(state, c);
1438 ctdb_disable_monitoring(ctdb);
1440 ret = ctdb_event_script_callback(ctdb, state,
1441 ctdb_start_recovery_callback,
1443 CTDB_EVENT_START_RECOVERY,
/* error path; the guarding condition is not visible in this listing */
1447 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1452 /* tell the control that we will reply asynchronously */
1453 *async_reply = true;
1458 try to delete all these records as part of the vacuuming process
1459 and return the records we failed to delete
/*
 * Control handler for the delete phase of vacuuming.
 *
 * indata is read as a struct ctdb_marshall_buffer holding reply->count
 * packed struct ctdb_rec_data_old entries for database reply->db_id.
 * Each entry is handed to delete_tdb_record(); every entry for which
 * that call returns non-zero is appended to a new marshall buffer
 * allocated on outdata, and *outdata is set from that buffer with
 * ctdb_marshall_finish().
 *
 * Per entry, the key is the first rec->keylen bytes of rec->data and
 * the data part follows it directly; the data part must be at least
 * sizeof(struct ctdb_ltdb_header) bytes.  The walk advances by
 * rec->length bytes per entry.
 *
 * NOTE(review): this listing has lost its short lines (braces, return
 * statements, several declarations and the closing line of the block
 * comment that starts with the words If we cant), so return values and
 * exact nesting cannot be confirmed from here.
 * NOTE(review): no visible line checks that rec->length, rec->keylen or
 * rec->datalen stay inside indata.dsize while walking the entries -
 * confirm whether callers guarantee this.
 */
1461 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1463 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1464 struct ctdb_db_context *ctdb_db;
1466 struct ctdb_rec_data_old *rec;
1467 struct ctdb_marshall_buffer *records;
/* the request must hold at least the fixed marshall buffer header */
1469 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1470 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
/* look up the database named in the request */
1474 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1476 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1481 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1482 reply->count, reply->db_id));
1485 /* create a blob to send back the records we could not delete */
1486 records = (struct ctdb_marshall_buffer *)
1487 talloc_zero_size(outdata,
1488 offsetof(struct ctdb_marshall_buffer, data));
1489 if (records == NULL) {
1490 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1493 records->db_id = ctdb_db->db_id;
/* walk the packed entries that follow the marshall buffer header */
1496 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1497 for (i=0;i<reply->count;i++) {
1500 key.dptr = &rec->data[0];
1501 key.dsize = rec->keylen;
1502 data.dptr = &rec->data[key.dsize];
1503 data.dsize = rec->datalen;
/* every entry must carry at least a full ltdb header */
1505 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1506 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1510 /* If we cant delete the record we must add it to the reply
1511 so the lmaster knows it may not purge this record
1513 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1515 struct ctdb_ltdb_header *hdr;
1517 hdr = (struct ctdb_ltdb_header *)data.dptr;
1518 data.dptr += sizeof(*hdr);
1519 data.dsize -= sizeof(*hdr);
1521 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1523 old_size = talloc_get_size(records);
1524 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1525 if (records == NULL) {
1526 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1530 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1533 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1537 *outdata = ctdb_marshall_finish(records);
1543 * Store a record as part of the vacuum process:
1544 * This is called from the RECEIVE_RECORD control which
1545 * the lmaster uses to send the current empty copy
1546 * to all nodes for storing, before it lets the other
1547 * nodes delete the records in the second phase with
1548 * the TRY_DELETE_RECORDS control.
1550 * Only store if we are not lmaster or dmaster, and our
1551 * rsn is <= the provided rsn. Use non-blocking locks.
1553 * return 0 if the record was successfully stored.
1554 * return !0 if the record still exists in the tdb after returning.
/*
 * Store one header-only record received during vacuuming.
 *
 * ctdb    - daemon context
 * ctdb_db - database the record belongs to
 * rec     - packed record: rec->keylen key bytes in rec->data followed
 *           by rec->datalen data bytes
 *
 * The record is refused when this node is the lmaster for the key or
 * when the data part is not exactly one struct ctdb_ltdb_header.  The
 * hash chain is locked in non-blocking mode.  If the key has no usable
 * local copy (missing, or shorter than an ltdb header) the received
 * header is stored directly.  Otherwise the store is skipped when the
 * local rsn is higher than the received one, when either header has
 * read-only flags set, or when this node is the dmaster of the local
 * copy.  The chain is unlocked at the end.
 *
 * Returns 0 when the record was stored and non-zero otherwise, per the
 * contract comment that precedes this function.
 *
 * NOTE(review): this listing has lost its short lines (braces, return
 * statements, the declaration and assignments of the result variable
 * and any labels), so the exact exit paths cannot be confirmed here.
 */
1556 static int store_tdb_record(struct ctdb_context *ctdb,
1557 struct ctdb_db_context *ctdb_db,
1558 struct ctdb_rec_data_old *rec)
1560 TDB_DATA key, data, data2;
1561 struct ctdb_ltdb_header *hdr, *hdr2;
/* split the packed record into key and data views; nothing is copied */
1564 key.dsize = rec->keylen;
1565 key.dptr = &rec->data[0];
1566 data.dsize = rec->datalen;
1567 data.dptr = &rec->data[rec->keylen];
1569 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1570 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1571 "where we are lmaster\n"));
/* the data part must be a bare ltdb header */
1575 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1576 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1580 hdr = (struct ctdb_ltdb_header *)data.dptr;
1582 /* use a non-blocking lock */
1583 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1584 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
/* no usable local copy: store the received header as it is */
1588 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1589 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1590 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
/* leading space keeps the text apart from the __location__ prefix */
1591 DEBUG(DEBUG_ERR, (__location__ " Failed to store record\n"));
1595 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1600 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* never replace a local copy that has a higher rsn */
1602 if (hdr2->rsn > hdr->rsn) {
1603 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1604 "rsn=%llu - called with rsn=%llu\n",
1605 (unsigned long long)hdr2->rsn,
1606 (unsigned long long)hdr->rsn));
1611 /* do not allow vacuuming of records that have readonly flags set. */
1612 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1613 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1618 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1619 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* a node that is dmaster of the local copy keeps it */
1625 if (hdr2->dmaster == ctdb->pnn) {
1626 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1627 "where we are the dmaster\n"));
1632 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1633 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
/* release the chain lock taken above */
1641 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1649 * Try to store all these records as part of the vacuuming process
1650 * and return the records we failed to store.
/*
 * Control handler for the store phase of vacuuming.
 *
 * indata is read as a struct ctdb_marshall_buffer holding reply->count
 * packed struct ctdb_rec_data_old entries for database reply->db_id.
 * Each entry is handed to store_tdb_record(); every entry for which
 * that call returns non-zero is appended to a new marshall buffer
 * allocated on outdata, and *outdata is set from that buffer with
 * ctdb_marshall_finish().
 *
 * Per entry, the key is the first rec->keylen bytes of rec->data and
 * the data part follows it directly; the data part must be at least
 * sizeof(struct ctdb_ltdb_header) bytes.  The walk advances by
 * rec->length bytes per entry.
 *
 * NOTE(review): this listing has lost its short lines (braces, return
 * statements, several declarations, comment delimiters and parts of
 * some DEBUG calls), so return values and exact nesting cannot be
 * confirmed from here.
 * NOTE(review): no visible line checks that rec->length, rec->keylen or
 * rec->datalen stay inside indata.dsize while walking the entries -
 * confirm whether callers guarantee this.
 */
1652 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1653 TDB_DATA indata, TDB_DATA *outdata)
1655 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1656 struct ctdb_db_context *ctdb_db;
1658 struct ctdb_rec_data_old *rec;
1659 struct ctdb_marshall_buffer *records;
/* the request must hold at least the fixed marshall buffer header */
1661 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1663 (__location__ " invalid data in receive_records\n"));
/* look up the database named in the request */
1667 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1669 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1674 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1675 "dbid 0x%x\n", reply->count, reply->db_id));
1677 /* create a blob to send back the records we could not store */
1678 records = (struct ctdb_marshall_buffer *)
1679 talloc_zero_size(outdata,
1680 offsetof(struct ctdb_marshall_buffer, data));
1681 if (records == NULL) {
1682 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1685 records->db_id = ctdb_db->db_id;
/* walk the packed entries that follow the marshall buffer header */
1687 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1688 for (i=0; i<reply->count; i++) {
1691 key.dptr = &rec->data[0];
1692 key.dsize = rec->keylen;
1693 data.dptr = &rec->data[key.dsize];
1694 data.dsize = rec->datalen;
/* every entry must carry at least a full ltdb header */
1696 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1697 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1703 * If we can not store the record we must add it to the reply
1704 * so the lmaster knows it may not purge this record.
1706 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1708 struct ctdb_ltdb_header *hdr;
/* step the data view past the ltdb header */
1710 hdr = (struct ctdb_ltdb_header *)data.dptr;
1711 data.dptr += sizeof(*hdr);
1712 data.dsize -= sizeof(*hdr);
1714 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1715 "record with hash 0x%08x in vacuum "
1716 "via RECEIVE_RECORDS\n",
/* grow the reply by one packed entry and copy the whole entry in */
1719 old_size = talloc_get_size(records);
1720 records = talloc_realloc_size(outdata, records,
1721 old_size + rec->length);
1722 if (records == NULL) {
1723 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1728 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* entries are packed back to back; rec->length is the stride */
1731 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1734 *outdata = ctdb_marshall_finish(records);
/*
 * Control handler that returns this node's capability bits.
 *
 * A uint32_t is allocated on outdata, filled with ctdb->capabilities
 * and handed back through outdata->dptr / outdata->dsize.
 *
 * NOTE(review): the return statement is missing from this listing.
 */
1743 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1745 uint32_t *capabilities = NULL;
1747 capabilities = talloc(outdata, uint32_t);
1748 CTDB_NO_MEMORY(ctdb, capabilities);
1749 *capabilities = ctdb->capabilities;
1751 outdata->dsize = sizeof(uint32_t);
1752 outdata->dptr = (uint8_t *)capabilities;
1757 /* The recovery daemon will ping us at regular intervals.
1758 If we have not been pinged for a while we assume the recovery
1759 daemon is inoperable and we restart.
/*
 * Timer handler that fires when the recovery daemon has not pinged
 * within tunable.recd_ping_timeout seconds.
 *
 * ev, te, t - tevent timer arguments; not used by the visible lines
 * p         - the struct ctdb_context registered with the timer
 *
 * While the counter stored in ctdb->recd_ping_count is below
 * tunable.recd_ping_failcount the timer is armed again; otherwise the
 * recovery daemon is stopped and started again.
 *
 * NOTE(review): the lines between the counter test and the re-arm, and
 * the end of that branch, are missing from this listing - presumably
 * the counter is incremented and the handler returns early there;
 * confirm against the full source.
 */
1761 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1762 struct tevent_timer *te,
1763 struct timeval t, void *p)
1765 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1766 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1768 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1770 if (*count < ctdb->tunable.recd_ping_failcount) {
/* re-arm with the same timeout, using the counter as the memory context */
1772 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1773 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1774 ctdb_recd_ping_timeout, ctdb);
1778 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* restart the recovery daemon */
1780 ctdb_stop_recoverd(ctdb);
1781 ctdb_start_recoverd(ctdb);
/*
 * Control handler for a ping from the recovery daemon.
 *
 * Replaces ctdb->recd_ping_count with a fresh zeroed counter and, when
 * tunable.recd_ping_timeout is non-zero, arms ctdb_recd_ping_timeout()
 * to fire after that many seconds.
 *
 * NOTE(review): the return statement is missing from this listing.
 */
1784 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/*
 * The timers armed below use the counter as their talloc memory
 * context, so per the tevent and talloc documentation freeing the old
 * counter also cancels a timer armed by an earlier ping.
 */
1786 talloc_free(ctdb->recd_ping_count);
1788 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1789 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* a timeout of 0 leaves the watchdog timer unarmed */
1791 if (ctdb->tunable.recd_ping_timeout != 0) {
1792 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1793 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1794 ctdb_recd_ping_timeout, ctdb);
/*
 * Control handler that records which node is the recovery master.
 *
 * indata must be exactly one uint32_t (enforced by
 * CHECK_CONTROL_DATA_SIZE); that value is stored in
 * ctdb->recovery_master.  A notice is logged when this node loses or
 * gains the role.
 *
 * NOTE(review): the opening lines of the two log calls and the return
 * statement are missing from this listing.
 */
1802 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1804 uint32_t new_recmaster;
1806 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1807 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* this node held the role and another node is taking it over */
1809 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1811 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* this node is taking the role and did not hold it before */
1814 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1816 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1819 ctdb->recovery_master = new_recmaster;
/*
 * Control handler that stops this node: monitoring is disabled and
 * NODE_FLAGS_STOPPED is set in the flags of the local node entry.
 *
 * NOTE(review): the return statement is missing from this listing.
 */
1824 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1826 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1827 ctdb_disable_monitoring(ctdb);
/* ctdb->pnn indexes the entry of this node in ctdb->nodes */
1828 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1833 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1835 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1836 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;