4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
/*
 * Control handler: return this node's current VNN map.
 *
 * Allocates a struct ctdb_vnn_map_wire as a talloc child of outdata,
 * copies generation, size and the map entries from ctdb->vnn_map into
 * it and publishes the buffer through outdata->dptr.
 */
44 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
46 struct ctdb_vnn_map_wire *map;
/* presumably rejects a non-empty request payload - the macro is defined elsewhere */
49 CHECK_CONTROL_DATA_SIZE(0);
/* fixed part of the wire struct up to map[], plus one uint32_t per map entry */
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
60 outdata->dptr = (uint8_t *)map;
/*
 * Control handler: replace ctdb->vnn_map with the map carried in indata.
 *
 * The payload is interpreted in place as a struct ctdb_vnn_map_wire.
 * An error is logged when the node is not in CTDB_RECOVERY_ACTIVE mode.
 * The old map is freed and a new struct ctdb_vnn_map (plus its map
 * array) is allocated under ctdb and filled from the wire data.
 *
 * NOTE(review): map->size comes straight from indata and is not checked
 * against indata.dsize in the code visible here - confirm the payload
 * length is validated before the memcpy below.
 */
66 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
/* discard the previous map before building the replacement */
75 talloc_free(ctdb->vnn_map);
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
/* the entry array is a talloc child of the new map, so it is freed with it */
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * Control handler: return the list of databases in ctdb->db_list.
 *
 * The reply is a struct ctdb_dbid_map_old allocated under outdata with
 * one dbs[] element per database, carrying the db id and flag bits for
 * the persistent, readonly and sticky properties.
 */
91 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
97 CHECK_CONTROL_DATA_SIZE(0);
/* first pass over the db list; presumably counts the entries into len - TODO confirm */
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* reply size = fixed header up to dbs[] plus one dbs[] element per database */
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
/* zero-filled allocation, so every flags field starts at 0 before the |= below */
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: record id and property flags for each database */
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
 * Control handler: return the node list as a node map.
 *
 * The map is produced by ctdb_node_list_to_map() from ctdb->nodes and
 * handed back through outdata->dptr; the reply length is taken from the
 * talloc allocation size of that buffer.
 */
131 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
133 CHECK_CONTROL_DATA_SIZE(0);
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
138 if (outdata->dptr == NULL) {
/* the returned buffer is a talloc allocation, so its size is the reply size */
142 outdata->dsize = talloc_get_size(outdata->dptr);
148 reload the nodes file
/*
 * Control handler: re-read the nodes file and rebuild ctdb->nodes.
 *
 * The previous node array is parked under a temporary talloc context
 * while the new file is loaded.  An entry whose address is unchanged at
 * the same index is carried over from the old array; other entries are
 * passed to ctdb->methods->add_node() and connect_node(), and a failure
 * of either ends in ctdb_fatal().  Finally the recovery daemon is told
 * to reload as well via a CTDB_SRVID_RELOAD_NODES message to this node.
 */
151 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
155 struct ctdb_node **nodes;
157 tmp_ctx = talloc_new(ctdb);
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
/* remember the old node count so that higher indexes are treated as new */
163 num_nodes = ctdb->num_nodes;
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* drop the freshly loaded entry and re-parent the old one under the new array */
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* NOTE(review): the handling of deleted entries is not visible here - presumably they are skipped */
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
181 /* any new or different nodes must be added */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
192 /* tell the recovery daemon to reload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* releases whatever is still parented to the temporary context */
195 talloc_free(tmp_ctx);
201 a traverse function for pulling all relevant records from pulldb
/*
 * Members of the traversal state used by traverse_pulldb() and
 * ctdb_control_pull_db() (referred to there as struct pulldb_data).
 * pulldata is the marshall buffer being built; allocated_len is the
 * size it has currently been grown to.
 */
204 struct ctdb_context *ctdb;
205 struct ctdb_db_context *ctdb_db;
206 struct ctdb_marshall_buffer *pulldata;
208 uint32_t allocated_len;
/*
 * tdb traverse callback used by ctdb_control_pull_db().
 *
 * Marshals one key/data pair into a ctdb_rec_data_old and appends it to
 * the reply buffer in params->pulldata, growing that buffer when the
 * record would not fit.  p is the struct pulldb_data traversal state.
 */
212 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
222 params->failed = true;
/* grow by this record plus the pulldb_preallocation_size tunable, so that
 * not every record triggers a reallocation */
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
/* append the marshalled record at the current end of the buffer */
233 params->pulldata->count++;
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
/* optional warning about oversized records; a tunable value of 0 disables it */
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
247 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * Control handler: marshal the records of one local database into a
 * single reply buffer.
 *
 * indata is read as a struct ctdb_pulldb naming the database.  The
 * database must be frozen.  The records are collected by
 * traverse_pulldb() while the database lock is marked, and the
 * resulting ctdb_marshall_buffer is returned through outdata.
 *
 * NOTE(review): indata.dsize is not checked in the code visible here -
 * presumably the caller validates it before dispatching.
 */
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
256 pull = (struct ctdb_pulldb *)indata.dptr;
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
264 if (!ctdb_db_frozen(ctdb_db)) {
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
273 reply->db_id = pull->db_id;
/* the buffer starts out holding only the header; records are appended after it */
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
300 ctdb_lockdb_unmark(ctdb_db);
/* params.pulldata may differ from reply if the traverse reallocated the buffer */
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
/* optional size warnings; a tunable value of 0 disables the respective check */
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
 * Traversal state for ctdb_control_db_pull() / traverse_db_pull().
 * recs is the batch of marshalled records currently being built;
 * num_records accumulates the count of records in batches already sent.
 */
316 struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
319 struct ctdb_marshall_buffer *recs;
322 uint32_t num_records;
/*
 * tdb traverse callback used by ctdb_control_db_pull().
 *
 * Adds the record to the batch in state->recs.  Once the batch buffer
 * reaches the rec_buffer_size_limit tunable it is finished, sent as a
 * message to state->pnn on state->srvid, counted into
 * state->num_records and released.
 */
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
334 TALLOC_FREE(state->recs);
/* flush the batch once its allocation has reached the configured limit */
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
348 TALLOC_FREE(state->recs);
/* account for the records just sent, then drop the batch */
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
/*
 * Control handler: stream the records of one local database to the
 * requesting node as a series of messages.
 *
 * indata is read as a struct ctdb_pulldb_ext giving the database id and
 * the srvid to send to; the destination node is the source node of the
 * control (c->hdr.srcnode).  The database must be frozen.  Batches are
 * sent from traverse_db_pull(); any remaining partial batch is sent
 * here.  The reply payload is a uint32_t holding the number of records
 * sent.
 *
 * NOTE(review): the not-frozen log text below names
 * ctdb_control_pull_db rather than this function.
 */
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
377 if (!ctdb_db_frozen(ctdb_db)) {
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
391 state.ctdb_db = ctdb_db;
/* records are sent back to the node that issued the control */
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 if (ctdb_lockdb_mark(ctdb_db) != 0) {
399 (__location__ " Failed to get lock on entire db - failing\n"));
403 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
406 (__location__ " Failed to get traverse db '%s'\n",
408 ctdb_lockdb_unmark(ctdb_db);
412 /* Last few records */
413 if (state.recs != NULL) {
416 buffer = ctdb_marshall_finish(state.recs);
417 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
418 state.srvid, buffer);
420 TALLOC_FREE(state.recs);
421 ctdb_lockdb_unmark(ctdb_db);
425 state.num_records += state.recs->count;
426 TALLOC_FREE(state.recs);
429 ctdb_lockdb_unmark(ctdb_db);
/* reply carries only the total record count */
431 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
432 if (outdata->dptr == NULL) {
433 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
437 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
438 outdata->dsize = sizeof(uint32_t);
444 push a bunch of records into a ltdb, filtering by rsn
/*
 * Control handler: store a marshalled batch of records into a local
 * database.
 *
 * indata is read as a struct ctdb_marshall_buffer.  The database named
 * by its db_id must exist and be frozen.  With the database lock
 * marked, each record is unpacked into key, ltdb header and data, the
 * read-only flags are cleared from the header, and the record is
 * written with ctdb_ltdb_store().  Afterwards, for a database with
 * read-only support, the tracking database is wiped (on failure
 * read-only support is dropped) and active revoke children are freed.
 *
 * NOTE(review): only the buffer header size is checked against
 * indata.dsize in the code visible here; the per-record lengths
 * (rec->length, keylen, datalen) come from the payload - confirm they
 * are bounded before use.
 * NOTE(review): the wipe-failure path assigns ctdb_db->readonly = false
 * twice.
 */
446 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
448 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
451 struct ctdb_rec_data_old *rec;
/* the payload must at least contain the fixed marshall buffer header */
453 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
454 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
458 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
460 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
464 if (!ctdb_db_frozen(ctdb_db)) {
466 ("rejecting ctdb_control_push_db when not frozen\n"));
470 if (ctdb_lockdb_mark(ctdb_db) != 0) {
471 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* records are packed back to back starting at data[0] */
475 rec = (struct ctdb_rec_data_old *)&reply->data[0];
477 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
478 reply->count, reply->db_id));
480 for (i=0;i<reply->count;i++) {
482 struct ctdb_ltdb_header *hdr;
/* each record holds the key bytes followed by the data bytes */
484 key.dptr = &rec->data[0];
485 key.dsize = rec->keylen;
486 data.dptr = &rec->data[key.dsize];
487 data.dsize = rec->datalen;
/* the data part must start with a complete ltdb header */
489 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
490 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
493 hdr = (struct ctdb_ltdb_header *)data.dptr;
494 /* strip off any read only record flags. All readonly records
495 are revoked implicitely by a recovery
497 hdr->flags &= ~CTDB_REC_RO_FLAGS;
499 data.dptr += sizeof(*hdr);
500 data.dsize -= sizeof(*hdr);
502 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
504 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
508 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
511 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
512 reply->count, reply->db_id));
514 if (ctdb_db->readonly) {
515 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
517 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
518 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
519 ctdb_db->readonly = false;
520 tdb_close(ctdb_db->rottdb);
521 ctdb_db->rottdb = NULL;
522 ctdb_db->readonly = false;
524 while (ctdb_db->revokechild_active != NULL) {
525 talloc_free(ctdb_db->revokechild_active);
529 ctdb_lockdb_unmark(ctdb_db);
533 ctdb_lockdb_unmark(ctdb_db);
/*
 * Per-database state of an in-progress streamed push, created by
 * ctdb_control_db_push_start() and consumed by db_push_msg_handler()
 * and ctdb_control_db_push_confirm().  num_records counts the records
 * received so far.
 */
537 struct db_push_state {
538 struct ctdb_context *ctdb;
539 struct ctdb_db_context *ctdb_db;
541 uint32_t num_records;
/*
 * Message handler registered by ctdb_control_db_push_start() for the
 * push srvid.
 *
 * indata is read as a struct ctdb_marshall_buffer.  Each record in it
 * is unpacked into key, ltdb header and data, has its read-only flags
 * cleared, and is written to state->ctdb_db with ctdb_ltdb_store().
 * The number of records in the message is added to state->num_records;
 * state->failed is set on the failure path.
 *
 * NOTE(review): the record lengths are taken from the message payload -
 * confirm they are bounded against indata.dsize.
 */
545 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
548 struct db_push_state *state = talloc_get_type(
549 private_data, struct db_push_state);
550 struct ctdb_marshall_buffer *recs;
551 struct ctdb_rec_data_old *rec;
558 recs = (struct ctdb_marshall_buffer *)indata.dptr;
559 rec = (struct ctdb_rec_data_old *)&recs->data[0];
561 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
562 recs->count, recs->db_id));
564 for (i=0; i<recs->count; i++) {
566 struct ctdb_ltdb_header *hdr;
568 key.dptr = &rec->data[0];
569 key.dsize = rec->keylen;
570 data.dptr = &rec->data[key.dsize];
571 data.dsize = rec->datalen;
573 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
578 hdr = (struct ctdb_ltdb_header *)data.dptr;
579 /* Strip off any read only record flags.
580 * All readonly records are revoked implicitely by a recovery.
582 hdr->flags &= ~CTDB_REC_RO_FLAGS;
584 data.dptr += sizeof(*hdr);
585 data.dsize -= sizeof(*hdr);
587 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
590 (__location__ " Unable to store record\n"));
594 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
597 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
598 recs->count, recs->db_id));
600 state->num_records += recs->count;
604 state->failed = true;
/*
 * Control handler: begin a streamed push into a local database.
 *
 * indata is read as a struct ctdb_pulldb_ext giving the database id and
 * the srvid on which record batches will arrive.  The database must be
 * frozen.  A db_push_state is allocated under the database context,
 * db_push_msg_handler() is registered for the srvid, the database lock
 * is marked, and the database is flagged as having a push in progress.
 * If a push was already started, its handler is de-registered first.
 */
607 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
609 struct ctdb_pulldb_ext *pulldb_ext;
610 struct ctdb_db_context *ctdb_db;
611 struct db_push_state *state;
614 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
616 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
617 if (ctdb_db == NULL) {
619 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
623 if (!ctdb_db_frozen(ctdb_db)) {
625 ("rejecting ctdb_control_db_push_start when not frozen\n"));
629 if (ctdb_db->push_started) {
631 (__location__ " DB push already started for %s\n",
634 /* De-register old state */
635 state = (struct db_push_state *)ctdb_db->push_state;
637 srvid_deregister(ctdb->srv, state->srvid, state);
639 ctdb_db->push_state = NULL;
/* fresh, zeroed state owned by the database context */
643 state = talloc_zero(ctdb_db, struct db_push_state);
645 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
650 state->ctdb_db = ctdb_db;
651 state->srvid = pulldb_ext->srvid;
652 state->failed = false;
654 ret = srvid_register(ctdb->srv, state, state->srvid,
655 db_push_msg_handler, state);
658 (__location__ " Failed to register srvid for db push\n"));
/* undo the registration if the database lock cannot be marked */
663 if (ctdb_lockdb_mark(ctdb_db) != 0) {
665 (__location__ " Failed to get lock on entire db - failing\n"));
666 srvid_deregister(ctdb->srv, state->srvid, state);
671 ctdb_db->push_started = true;
672 ctdb_db->push_state = state;
/*
 * Control handler: finish a streamed push started by
 * ctdb_control_db_push_start().
 *
 * indata holds the database id as a uint32_t.  The database must be
 * frozen and have a push in progress.  For a database with read-only
 * support the tracking database is wiped (on failure read-only support
 * is dropped) and active revoke children are freed.  The database lock
 * is unmarked, the push message handler is de-registered, and the
 * number of records received is returned as a uint32_t in outdata.
 *
 * NOTE(review): db_id is read from indata.dptr without a size check in
 * the code visible here - presumably validated by the caller.
 * NOTE(review): the wipe-failure path assigns ctdb_db->readonly = false
 * twice.
 */
677 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
678 TDB_DATA indata, TDB_DATA *outdata)
681 struct ctdb_db_context *ctdb_db;
682 struct db_push_state *state;
684 db_id = *(uint32_t *)indata.dptr;
686 ctdb_db = find_ctdb_db(ctdb, db_id);
687 if (ctdb_db == NULL) {
688 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
692 if (!ctdb_db_frozen(ctdb_db)) {
694 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
698 if (!ctdb_db->push_started) {
699 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
703 if (ctdb_db->readonly) {
705 ("Clearing the tracking database for dbid 0x%x\n",
707 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
709 ("Failed to wipe tracking database for 0x%x."
710 " Dropping read-only delegation support\n",
712 ctdb_db->readonly = false;
713 tdb_close(ctdb_db->rottdb);
714 ctdb_db->rottdb = NULL;
715 ctdb_db->readonly = false;
718 while (ctdb_db->revokechild_active != NULL) {
719 talloc_free(ctdb_db->revokechild_active);
/* releases the lock mark taken in ctdb_control_db_push_start() */
723 ctdb_lockdb_unmark(ctdb_db);
725 state = (struct db_push_state *)ctdb_db->push_state;
727 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
/* no further record batches are accepted for this push */
731 srvid_deregister(ctdb->srv, state->srvid, state);
733 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
734 if (outdata->dptr == NULL) {
735 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
737 ctdb_db->push_state = NULL;
741 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
742 outdata->dsize = sizeof(uint32_t);
745 ctdb_db->push_state = NULL;
/*
 * Forward declaration of the mutex handle, and the callback type used
 * to report the outcome of a cluster mutex attempt.
 *
 * Within this file the callback is invoked as
 * handler(ctdb, status, latency, h, private_data), where status is a
 * single character: '2' from the timeout path, otherwise the byte read
 * from the helper's pipe, or '3' when no byte could be read.
 */
750 struct ctdb_cluster_mutex_handle;
751 typedef void (*cluster_mutex_handler_t) (
752 struct ctdb_context *ctdb,
755 struct ctdb_cluster_mutex_handle *h,
/*
 * State of one cluster mutex attempt made through a helper child
 * process (see ctdb_cluster_mutex()).
 *
 * handler is the completion callback, te the timeout timer, fde the
 * event watching the read end of the pipe from the child, and
 * start_time the time the attempt began (used to compute latency).
 */
758 struct ctdb_cluster_mutex_handle {
759 struct ctdb_context *ctdb;
760 cluster_mutex_handler_t handler;
763 struct tevent_timer *te;
764 struct tevent_fd *fde;
766 struct timeval start_time;
/*
 * Completion callback for the recovery lock probe started by
 * ctdb_control_set_recmode().
 *
 * private_data is the pending control request; the outcome is sent back
 * with ctdb_request_control_reply() using status s and the optional
 * error string err.  Branches visible here:
 *  - the daemon was able to take the recovery lock during recovery:
 *    logged as an error and reported through err;
 *  - lock check OK: recovery mode becomes CTDB_RECOVERY_NORMAL and
 *    deferred attach requests are processed;
 *  - timeout: handled like the OK case;
 *  - anything else: reported as an unexpected error.
 *
 * NOTE(review): which status character selects which branch is not
 * visible here - confirm against the helper's protocol.
 */
769 static void set_recmode_handler(struct ctdb_context *ctdb,
772 struct ctdb_cluster_mutex_handle *h,
775 /* It would be good to use talloc_get_type() here. However,
776 * the name of the packet is manually set - not sure why.
777 * Could use talloc_check_name() but this seems like a lot of
778 * manual overkill. */
779 struct ctdb_req_control_old *c =
780 (struct ctdb_req_control_old *) private_data;
782 const char *err = NULL;
788 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
789 ctdb->recovery_lock_file));
791 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
796 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
797 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
798 ctdb_process_deferred_attach(ctdb);
802 CTDB_UPDATE_RECLOCK_LATENCY(ctdb, "daemon reclock",
803 reclock.ctdbd, latency);
807 /* Timeout. Consider this a success, not a failure,
808 * as we failed to set the recovery lock which is what
809 * we wanted. This can be caused by the cluster
810 * filesystem being very slow to arbitrate locks
811 * immediately after a node failure. */
814 "Time out getting recovery lock, allowing recmode set anyway\n"));
815 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
816 ctdb_process_deferred_attach(ctdb);
823 ("Unexpected error when testing recovery lock\n"));
825 err = "Unexpected error when testing recovery lock";
/* answer the deferred set-recmode control */
828 ctdb_request_control_reply(ctdb, c, NULL, s, err);
833 called if our set_recmode child times out. this would happen if
834 ctdb_recovery_lock() would block.
/*
 * Timer callback armed by ctdb_cluster_mutex().
 *
 * private_data is the mutex handle.  If a handler is set it is invoked
 * with status '2' and the time elapsed since the attempt started.
 */
836 static void cluster_mutex_timeout(struct tevent_context *ev,
837 struct tevent_timer *te,
838 struct timeval t, void *private_data)
840 struct ctdb_cluster_mutex_handle *h =
841 talloc_get_type(private_data, struct ctdb_cluster_mutex_handle);
842 double latency = timeval_elapsed(&h->start_time);
844 if (h->handler != NULL) {
845 h->handler(h->ctdb, '2', latency, h, h->private_data);
850 /* When the handle is freed it causes any child holding the mutex to
851 * be killed, thus freeing the mutex */
/* talloc destructor for the handle (installed in ctdb_cluster_mutex());
 * sends SIGKILL to the helper child recorded in h->child */
852 static int cluster_mutex_destructor(struct ctdb_cluster_mutex_handle *h)
854 if (h->fd[0] != -1) {
857 ctdb_kill(h->ctdb, h->child, SIGKILL);
861 /* this is called when the client process has completed ctdb_recovery_lock()
862 and has written data back to us through the pipe.
/*
 * fd event callback for the read end of the pipe from the mutex helper
 * child.
 *
 * Reads a single status byte and, if a handler is set, passes it on
 * together with the elapsed time; when no byte could be read the
 * handler receives '3' instead.
 */
864 static void cluster_mutex_handler(struct tevent_context *ev,
865 struct tevent_fd *fde,
866 uint16_t flags, void *private_data)
868 struct ctdb_cluster_mutex_handle *h=
869 talloc_get_type(private_data, struct ctdb_cluster_mutex_handle);
870 double latency = timeval_elapsed(&h->start_time);
874 /* Got response from child process so abort timeout */
877 ret = sys_read(h->fd[0], &c, 1);
879 /* If the child wrote status then just pass it to the handler.
880 * If no status was written then this is an unexpected error
881 * so pass generic error code to handler. */
882 if (h->handler != NULL) {
883 h->handler(h->ctdb, ret == 1 ? c : '3', latency,
/*
 * Timer callback scheduled by ctdb_deferred_drop_all_ips().
 *
 * Fires when the node has stayed in recovery mode for too long: frees
 * the timer's talloc context and releases all public IP addresses.
 */
889 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
890 struct timeval t, void *private_data)
892 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
894 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* clear the pointer so later frees of release_ips_ctx are harmless */
895 talloc_free(ctdb->release_ips_ctx);
896 ctdb->release_ips_ctx = NULL;
898 ctdb_release_all_ips(ctdb);
/* Path of the cluster mutex helper binary; filled in by ctdb_set_helper()
 * in cluster_mutex_helper_args() */
901 static char cluster_mutex_helper[PATH_MAX+1] = "";
/*
 * Build the argv array used to exec the cluster mutex helper.
 *
 * The array is allocated under mem_ctx and holds the helper path
 * followed by a copy of lockfile.  The helper path is resolved with
 * ctdb_set_helper(), using the CTDB_CLUSTER_MUTEX_HELPER name and the
 * default "ctdb_mutex_fcntl_helper" in CTDB_HELPER_BINDIR.
 */
903 static bool cluster_mutex_helper_args(TALLOC_CTX *mem_ctx,
904 const char *lockfile, char ***argv)
909 /* Anticipate the size of the array. Given that lock file is
910 * really now some arbitrary arguments to a configuration
911 * helper, it really needs to be parsed... but not yet. */
912 args = talloc_array(mem_ctx, char *, 3);
914 DEBUG(DEBUG_ERR,(__location__ " out of memory\n"));
920 if (!ctdb_set_helper("cluster mutex helper",
921 cluster_mutex_helper,
922 sizeof(cluster_mutex_helper),
923 "CTDB_CLUSTER_MUTEX_HELPER",
924 CTDB_HELPER_BINDIR, "ctdb_mutex_fcntl_helper")) {
925 DEBUG(DEBUG_ERR,("ctdb exiting with error: %s\n",
927 " Unable to set cluster mutex helper\n"));
/* argv[0] points at the static buffer; later entries are talloc copies */
930 args[nargs++] = cluster_mutex_helper;
932 args[nargs] = talloc_strdup(args, lockfile);
933 if (args[nargs] == NULL) {
935 DEBUG(DEBUG_ERR,(__location__ " out of memory\n"));
940 /* Make sure last argument is NULL */
/*
 * Start an attempt to take the cluster mutex (recovery lock) through a
 * helper child process.
 *
 * A handle is allocated under ctdb, a pipe is set up, and a forked
 * child redirects its stdout to the pipe and execs the helper with the
 * arguments from cluster_mutex_helper_args().  In the parent a
 * destructor, a timeout timer (timeout is passed as the seconds
 * argument of timeval_current_ofs()) and an fd event on the read end
 * of the pipe are attached to the handle.  The caller is expected to
 * set handler/private_data on the returned handle.
 *
 * NOTE(review): set_close_on_exec(h->fd[0]) is called twice below.
 * NOTE(review): the execv failure log string has no space after
 * __location__, unlike the other messages in this file.
 */
947 static struct ctdb_cluster_mutex_handle *
948 ctdb_cluster_mutex(struct ctdb_context *ctdb, int timeout)
950 struct ctdb_cluster_mutex_handle *h;
954 h = talloc(ctdb, struct ctdb_cluster_mutex_handle);
956 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
/* reference point for the latency reported to the handler */
960 h->start_time = timeval_current();
967 DEBUG(DEBUG_ERR, (__location__ " Failed to open pipe\n"));
970 set_close_on_exec(h->fd[0]);
972 /* Create arguments for lock helper */
973 if (!cluster_mutex_helper_args(h, ctdb->recovery_lock_file, &args)) {
980 h->child = ctdb_fork(ctdb);
981 if (h->child == (pid_t)-1) {
989 /* Make stdout point to the pipe */
990 close(STDOUT_FILENO);
991 dup2(h->fd[1], STDOUT_FILENO);
994 execv(args[0], args);
996 /* Only happens on error */
997 DEBUG(DEBUG_ERR, (__location__ "execv() failed\n"));
1003 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d\n", h->fd[0]));
1004 set_close_on_exec(h->fd[0]);
/* freeing the handle kills the helper child, see cluster_mutex_destructor() */
1009 talloc_set_destructor(h, cluster_mutex_destructor);
/* both events are parented to h, so they go away with the handle */
1012 h->te = tevent_add_timer(ctdb->ev, h,
1013 timeval_current_ofs(timeout, 0),
1014 cluster_mutex_timeout, h);
1019 h->fde = tevent_add_fd(ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
1020 cluster_mutex_handler, (void *)h);
1022 if (h->fde == NULL) {
1026 tevent_fd_set_auto_close(h->fde);
1030 h->private_data = NULL;
1037 * Set up an event to drop all public ips if we remain in recovery for too long
/*
 * (Re)arm the timer that releases all public IPs after the
 * recovery_drop_all_ips tunable number of seconds.
 *
 * The timer lives on ctdb->release_ips_ctx; any previous context, and
 * with it a previously armed timer, is freed first.
 */
1040 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
1042 if (ctdb->release_ips_ctx != NULL) {
1043 talloc_free(ctdb->release_ips_ctx);
1045 ctdb->release_ips_ctx = talloc_new(ctdb);
1046 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
/* freeing release_ips_ctx cancels this timer */
1048 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
1049 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
1050 ctdb_drop_all_ips_event, ctdb);
1055 set the recovery mode
/*
 * Control handler: set the recovery mode of this node.
 *
 * indata holds the requested mode as a uint32_t.  When the requested
 * mode is CTDB_RECOVERY_NORMAL the pending "drop all IPs" timer context
 * is freed; ctdb_deferred_drop_all_ips() is called otherwise
 * (presumably in the else branch - TODO confirm).  Any request other
 * than a switch from ACTIVE to NORMAL just stores the new mode.
 *
 * For the ACTIVE to NORMAL switch the database generations are checked
 * against the VNN map generation, frozen database priorities are
 * thawed and deferred attach requests are released.  Without a
 * recovery lock file the mode is then set directly; with one, a
 * cluster mutex probe is started and the reply is sent asynchronously
 * from set_recmode_handler().
 *
 * NOTE(review): recmode is read from indata.dptr without a size check
 * in the code visible here - presumably validated by the caller.
 */
1057 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
1058 struct ctdb_req_control_old *c,
1059 TDB_DATA indata, bool *async_reply,
1060 const char **errormsg)
1062 uint32_t recmode = *(uint32_t *)indata.dptr;
1064 struct ctdb_db_context *ctdb_db;
1065 struct ctdb_cluster_mutex_handle *h;
1067 /* if we enter recovery but stay in recovery for too long
1068 we will eventually drop all our ip addresses
1070 if (recmode == CTDB_RECOVERY_NORMAL) {
1071 talloc_free(ctdb->release_ips_ctx);
1072 ctdb->release_ips_ctx = NULL;
1074 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
1075 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
1079 if (recmode != ctdb->recovery_mode) {
1080 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
1081 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
1084 if (recmode != CTDB_RECOVERY_NORMAL ||
1085 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
1086 ctdb->recovery_mode = recmode;
1090 /* From this point: recmode == CTDB_RECOVERY_NORMAL
1092 * Therefore, what follows is special handling when setting
1093 * recovery mode back to normal */
/* every database must carry the generation of the current VNN map */
1095 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
1096 if (ctdb_db->generation != ctdb->vnn_map->generation) {
1098 ("Inconsistent DB generation %u for %s\n",
1099 ctdb_db->generation, ctdb_db->db_name));
1100 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
1105 /* force the databases to thaw */
1106 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1107 if (ctdb_db_prio_frozen(ctdb, i)) {
1108 ctdb_control_thaw(ctdb, i, false);
1112 /* release any deferred attach calls from clients */
1113 if (recmode == CTDB_RECOVERY_NORMAL) {
1114 ctdb_process_deferred_attach(ctdb);
1117 if (ctdb->recovery_lock_file == NULL) {
1118 /* Not using recovery lock file */
1119 ctdb->recovery_mode = recmode;
/* probe the recovery lock via the helper; 5 is the timeout argument */
1123 h = ctdb_cluster_mutex(ctdb, 5);
1128 /* set_recmode_handler() frees h */
1129 h->handler = set_recmode_handler;
/* the request is re-parented onto h so it stays alive until the handler replies */
1130 h->private_data = talloc_steal(h, c);
1132 *async_reply = true;
/*
 * Report whether this process currently holds the recovery lock,
 * i.e. whether ctdb->recovery_lock_fd refers to an open descriptor
 * (it is reset to -1 by ctdb_recovery_unlock() and on lock failure).
 */
1138 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
1140 return ctdb->recovery_lock_fd != -1;
1144 try and get the recovery lock in shared storage - should only work
1145 on the recovery master recovery daemon. Anywhere else is a bug
/*
 * Try to take the recovery lock: open (creating if necessary) the
 * recovery lock file and place a non-blocking fcntl write lock on it.
 *
 * On success the descriptor stays open in ctdb->recovery_lock_fd.  If
 * the lock cannot be taken the descriptor is closed and reset to -1;
 * EACCES and EAGAIN are not logged, other errors are.
 */
1147 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
1151 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
1152 O_RDWR|O_CREAT, 0600);
1153 if (ctdb->recovery_lock_fd == -1) {
1155 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
1156 ctdb->recovery_lock_file, strerror(errno)));
1160 set_close_on_exec(ctdb->recovery_lock_fd);
1162 lock.l_type = F_WRLCK;
1163 lock.l_whence = SEEK_SET;
/* F_SETLK does not block: contention is reported as an error instead */
1168 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
/* keep errno from fcntl() before close() can overwrite it */
1169 int saved_errno = errno;
1170 close(ctdb->recovery_lock_fd);
1171 ctdb->recovery_lock_fd = -1;
1172 /* Fail silently on these errors, since they indicate
1173 * lock contention, but log an error for any other
1175 if (saved_errno != EACCES &&
1176 saved_errno != EAGAIN) {
1177 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
1178 "recovery lock on '%s' - (%s)\n",
1179 ctdb->recovery_lock_file,
1180 strerror(saved_errno)));
/*
 * Release the recovery lock if it is held: closing the descriptor
 * drops the fcntl lock taken in ctdb_recovery_lock().  Does nothing
 * when no descriptor is open.
 */
1188 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
1190 if (ctdb->recovery_lock_fd != -1) {
1191 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
1192 close(ctdb->recovery_lock_fd);
1193 ctdb->recovery_lock_fd = -1;
1198 delete a record as part of the vacuum process
1199 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
1200 use non-blocking locks
1202 return 0 if the record was successfully deleted (i.e. it does not exist
1203 when the function returns)
1204 or !0 if the record still exists in the tdb after returning.
/*
 * Try to delete one record from the local tdb on behalf of vacuuming.
 *
 * rec carries the key and an ltdb header (and nothing else) describing
 * the record as seen by the caller.  The local copy is only deleted
 * when this node is neither lmaster nor dmaster for it, the stored rsn
 * is not newer than the supplied one, and neither header has read-only
 * flags set.  All tdb locks are taken non-blocking.
 */
1206 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
1208 TDB_DATA key, data, data2;
1209 struct ctdb_ltdb_header *hdr, *hdr2;
1211 /* these are really internal tdb functions - but we need them here for
1212 non-blocking lock of the freelist */
1213 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
1214 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* the record holds the key bytes followed by the data bytes */
1217 key.dsize = rec->keylen;
1218 key.dptr = &rec->data[0];
1219 data.dsize = rec->datalen;
1220 data.dptr = &rec->data[rec->keylen];
1222 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1223 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* the caller must send exactly a header, no payload */
1227 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1228 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
1232 hdr = (struct ctdb_ltdb_header *)data.dptr;
1234 /* use a non-blocking lock */
1235 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* fetch the locally stored copy for comparison */
1239 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1240 if (data2.dptr == NULL) {
1241 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* a stored record too short to hold a header is treated as corrupt and deleted */
1245 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1246 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
1247 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1248 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
1250 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1251 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
1253 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1258 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than what the caller saw: keep it */
1260 if (hdr2->rsn > hdr->rsn) {
1261 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1262 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
1263 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
1268 /* do not allow deleting record that have readonly flags set. */
/* checked on the header supplied by the caller ... */
1269 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1270 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1271 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* ... and on the header stored locally */
1275 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1276 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1277 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1282 if (hdr2->dmaster == ctdb->pnn) {
1283 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1284 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* list -1 is the freelist (see the declaration comment above) */
1289 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1290 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1295 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1296 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1297 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1298 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success: release the locks in reverse order of acquisition */
1303 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1304 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
 * State carried to the event script callbacks of the start/end
 * recovery controls: c is the control request that is answered once
 * the event script has finished.
 */
1311 struct recovery_callback_state {
1312 struct ctdb_req_control_old *c;
1317 called when the 'recovered' event script has finished
/*
 * Completion callback for the "recovered" event script started by
 * ctdb_control_end_recovery().
 *
 * Re-enables monitoring, bumps the num_recoveries statistic and replies
 * to the pending control with the script status.  A failed script is
 * logged, and this node bans itself when the status is -ETIME.  The
 * finish time of the recovery is recorded, and a node still in the
 * FIRST_RECOVERY run state moves on to STARTUP.
 */
1319 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1321 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1323 ctdb_enable_monitoring(ctdb);
1324 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1327 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1328 if (status == -ETIME) {
1329 ctdb_ban_self(ctdb);
/* answer the deferred end-recovery control with the script status */
1333 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1336 gettimeofday(&ctdb->last_recovery_finished, NULL);
1338 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1339 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1344 recovery has finished
/*
 * Control handler: recovery has finished.
 *
 * Finishes outstanding persistent transaction commits, disables
 * monitoring and runs the CTDB_EVENT_RECOVERED event script.  The
 * control is answered asynchronously from
 * ctdb_end_recovery_callback(); if the script cannot be started,
 * monitoring is re-enabled and an error is logged.
 */
1346 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1347 struct ctdb_req_control_old *c,
1351 struct recovery_callback_state *state;
1353 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1355 ctdb_persistent_finish_trans3_commits(ctdb);
1357 state = talloc(ctdb, struct recovery_callback_state);
1358 CTDB_NO_MEMORY(ctdb, state);
1362 ctdb_disable_monitoring(ctdb);
1364 ret = ctdb_event_script_callback(ctdb, state,
1365 ctdb_end_recovery_callback,
1367 CTDB_EVENT_RECOVERED, "%s", "");
1370 ctdb_enable_monitoring(ctdb);
1372 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1377 /* tell the control that we will reply asynchronously */
1378 state->c = talloc_steal(state, c);
1379 *async_reply = true;
1384 /* called when the 'startrecovery' event script has finished */
/*
 * Completion callback for the 'startrecovery' event script that is started
 * by ctdb_control_start_recovery().
 *
 * p is the struct recovery_callback_state set up by the control handler;
 * the held-back control (state->c) is answered with the script's 'status'.
 */
1386 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1388 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1391 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
/* send the reply that ctdb_control_start_recovery() deferred */
1394 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1399 /* run the startrecovery eventscript */
/*
 * Control handler that runs the 'startrecovery' event script.
 *
 * Stamps ctdb->last_recovery_started, reparents the control request 'c'
 * onto a freshly allocated callback state, disables monitoring and starts
 * the CTDB_EVENT_START_RECOVERY event script with
 * ctdb_start_recovery_callback() as its completion callback.  *async_reply
 * is set to true because the reply is sent from that callback.
 */
1401 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1402 struct ctdb_req_control_old *c,
1406 struct recovery_callback_state *state;
1408 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1409 gettimeofday(&ctdb->last_recovery_started, NULL);
1411 state = talloc(ctdb, struct recovery_callback_state);
1412 CTDB_NO_MEMORY(ctdb, state);
/* make the request a talloc child of state so the callback can reply to it */
1414 state->c = talloc_steal(state, c);
1416 ctdb_disable_monitoring(ctdb);
1418 ret = ctdb_event_script_callback(ctdb, state,
1419 ctdb_start_recovery_callback,
1421 CTDB_EVENT_START_RECOVERY,
1425 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1430 /* tell the control that we will reply asynchronously */
1431 *async_reply = true;
1436 /* try to delete all these records as part of the vacuuming process
1437 and return the records we failed to delete */
/*
 * Vacuuming control: try to delete a batch of records from a local database.
 *
 * indata:  a struct ctdb_marshall_buffer (named 'reply' below even though it
 *          is the incoming request) holding reply->count packed
 *          struct ctdb_rec_data_old entries for database reply->db_id
 * outdata: receives, via ctdb_marshall_finish(), a marshall buffer holding a
 *          copy of every record for which delete_tdb_record() returned
 *          non-zero
 *
 * Each entry is rec->length bytes long; the loop steps from one entry to the
 * next by that length.  The result buffer is grown by rec->length with
 * talloc_realloc_size() for every record that could not be deleted.
 *
 * NOTE(review): in the lines visible here the per-record keylen, datalen and
 * length fields are not checked against indata.dsize - confirm that the
 * sender is trusted or that validation happens elsewhere.
 */
1439 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1441 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1442 struct ctdb_db_context *ctdb_db;
1444 struct ctdb_rec_data_old *rec;
1445 struct ctdb_marshall_buffer *records;
/* the payload must at least contain the fixed marshall buffer header */
1447 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1448 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1452 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1454 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1459 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1460 reply->count, reply->db_id));
1463 /* create a blob to send back the records we could not delete */
1464 records = (struct ctdb_marshall_buffer *)
1465 talloc_zero_size(outdata,
1466 offsetof(struct ctdb_marshall_buffer, data));
1467 if (records == NULL) {
1468 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1471 records->db_id = ctdb_db->db_id;
/* start at the first packed record in the request */
1474 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1475 for (i=0;i<reply->count;i++) {
/* within a record the key bytes come first, followed by the data bytes */
1478 key.dptr = &rec->data[0];
1479 key.dsize = rec->keylen;
1480 data.dptr = &rec->data[key.dsize];
1481 data.dsize = rec->datalen;
/* the data part has to be large enough to hold an ltdb header */
1483 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1484 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1488 /* If we cant delete the record we must add it to the reply
1489 so the lmaster knows it may not purge this record
1491 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1493 struct ctdb_ltdb_header *hdr;
1495 hdr = (struct ctdb_ltdb_header *)data.dptr;
1496 data.dptr += sizeof(*hdr);
1497 data.dsize -= sizeof(*hdr);
1499 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1501 old_size = talloc_get_size(records);
1502 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1503 if (records == NULL) {
1504 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1508 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1511 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1515 *outdata = ctdb_marshall_finish(records);
/*
1521 * Store a record as part of the vacuum process:
1522 * This is called from the RECEIVE_RECORDS control which
1523 * the lmaster uses to send the current empty copy
1524 * to all nodes for storing, before it lets the other
1525 * nodes delete the records in the second phase with
1526 * the TRY_DELETE_RECORDS control.
1528 * Only store if we are not lmaster or dmaster, and our
1529 * rsn is <= the provided rsn. Use non-blocking locks.
1531 * return 0 if the record was successfully stored.
1532 * return !0 if the record was not stored.
 */
/*
 * Store the header-only record 'rec' in ctdb_db's local tdb while vacuuming.
 *
 * ctdb:    daemon context (supplies this node's pnn)
 * ctdb_db: database whose local tdb is written
 * rec:     packed record; its data part must be exactly one
 *          struct ctdb_ltdb_header
 *
 * The record is refused when this node is the lmaster for the key or when
 * the chain lock cannot be taken without blocking.  If the key is not yet
 * present (or the stored value is too short to hold a header) the record is
 * stored directly.  Otherwise it is skipped when the stored copy has a
 * higher rsn, when either copy carries read-only flags, or when this node
 * is the dmaster of the stored copy.
 *
 * Callers treat a return of 0 as "stored" and non-zero as "not stored".
 */
1534 static int store_tdb_record(struct ctdb_context *ctdb,
1535 struct ctdb_db_context *ctdb_db,
1536 struct ctdb_rec_data_old *rec)
1538 TDB_DATA key, data, data2;
1539 struct ctdb_ltdb_header *hdr, *hdr2;
/* within a record the key bytes come first, followed by the data bytes */
1542 key.dsize = rec->keylen;
1543 key.dptr = &rec->data[0];
1544 data.dsize = rec->datalen;
1545 data.dptr = &rec->data[rec->keylen];
1547 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1548 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1549 "where we are lmaster\n"));
/* only bare headers (records without payload) are accepted here */
1553 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1554 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1558 hdr = (struct ctdb_ltdb_header *)data.dptr;
1560 /* use a non-blocking lock */
1561 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1562 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
/* no usable existing copy: store the incoming header as it is */
1566 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1567 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1568 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
/* leading space keeps the text apart from the __location__ prefix, as in the other messages */
1569 DEBUG(DEBUG_ERR, (__location__ " Failed to store record\n"));
1573 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1578 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* never replace a stored copy that has a higher rsn than the incoming one */
1580 if (hdr2->rsn > hdr->rsn) {
1581 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1582 "rsn=%llu - called with rsn=%llu\n",
1583 (unsigned long long)hdr2->rsn,
1584 (unsigned long long)hdr->rsn));
1589 /* do not allow vacuuming of records that have readonly flags set. */
1590 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1591 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1596 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1597 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1603 if (hdr2->dmaster == ctdb->pnn) {
1604 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1605 "where we are the dmaster\n"));
1610 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1611 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
/* release the chain lock taken above */
1619 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
1627 * Try to store all these records as part of the vacuuming process
1628 * and return the records we failed to store.
 */
/*
 * Vacuuming control: try to store a batch of records in a local database.
 *
 * indata:  a struct ctdb_marshall_buffer (named 'reply' below even though it
 *          is the incoming request) holding reply->count packed
 *          struct ctdb_rec_data_old entries for database reply->db_id
 * outdata: receives, via ctdb_marshall_finish(), a marshall buffer holding a
 *          copy of every record for which store_tdb_record() returned
 *          non-zero
 *
 * NOTE(review): in the lines visible here the per-record keylen, datalen and
 * length fields are not checked against indata.dsize - confirm that the
 * sender is trusted or that validation happens elsewhere.
 */
1630 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1631 TDB_DATA indata, TDB_DATA *outdata)
1633 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1634 struct ctdb_db_context *ctdb_db;
1636 struct ctdb_rec_data_old *rec;
1637 struct ctdb_marshall_buffer *records;
/* the payload must at least contain the fixed marshall buffer header */
1639 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1641 (__location__ " invalid data in receive_records\n"));
1645 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1647 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1652 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1653 "dbid 0x%x\n", reply->count, reply->db_id));
1655 /* create a blob to send back the records we could not store */
1656 records = (struct ctdb_marshall_buffer *)
1657 talloc_zero_size(outdata,
1658 offsetof(struct ctdb_marshall_buffer, data));
1659 if (records == NULL) {
1660 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1663 records->db_id = ctdb_db->db_id;
/* start at the first packed record in the request */
1665 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1666 for (i=0; i<reply->count; i++) {
/* within a record the key bytes come first, followed by the data bytes */
1669 key.dptr = &rec->data[0];
1670 key.dsize = rec->keylen;
1671 data.dptr = &rec->data[key.dsize];
1672 data.dsize = rec->datalen;
/* the data part has to be large enough to hold an ltdb header */
1674 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1675 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1681 * If we can not store the record we must add it to the reply
1682 * so the lmaster knows it may not purge this record.
1684 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1686 struct ctdb_ltdb_header *hdr;
1688 hdr = (struct ctdb_ltdb_header *)data.dptr;
1689 data.dptr += sizeof(*hdr);
1690 data.dsize -= sizeof(*hdr);
1692 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1693 "record with hash 0x%08x in vacuum "
1694 "via RECEIVE_RECORDS\n",
/* grow the result buffer by this record's length and append a copy of the record */
1697 old_size = talloc_get_size(records);
1698 records = talloc_realloc_size(outdata, records,
1699 old_size + rec->length);
1700 if (records == NULL) {
1701 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1706 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* step to the next packed record, rec->length bytes further on */
1709 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1712 *outdata = ctdb_marshall_finish(records);
/*
 * Control handler that reports this node's capabilities.
 *
 * Copies ctdb->capabilities into a single uint32_t allocated as a talloc
 * child of outdata and points outdata->dptr / outdata->dsize at it.
 */
1721 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1723 uint32_t *capabilities = NULL;
1725 capabilities = talloc(outdata, uint32_t);
1726 CTDB_NO_MEMORY(ctdb, capabilities);
1727 *capabilities = ctdb->capabilities;
1729 outdata->dsize = sizeof(uint32_t);
1730 outdata->dptr = (uint8_t *)capabilities;
1735 /* The recovery daemon will ping us at regular intervals.
1736 If we haven't been pinged for a while we assume the recovery
1737 daemon is inoperable and we restart it. */
/*
 * Timer handler that fires when no recovery daemon ping arrived in time.
 *
 * p is the struct ctdb_context.  The number of timeouts seen so far is kept
 * in ctdb->recd_ping_count.  While that count is below the
 * recd_ping_failcount tunable the timer is armed again for another
 * recd_ping_timeout seconds; the final-timeout path stops the recovery
 * daemon and starts it again.
 */
1739 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1740 struct tevent_timer *te,
1741 struct timeval t, void *p)
1743 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1744 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1746 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1748 if (*count < ctdb->tunable.recd_ping_failcount) {
/* the new timer is a talloc child of the counter, so freeing the counter frees the timer too */
1750 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1751 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1752 ctdb_recd_ping_timeout, ctdb);
1756 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* restart the recovery daemon: stop it, then start it again */
1758 ctdb_stop_recoverd(ctdb);
1759 ctdb_start_recoverd(ctdb);
/*
 * Control handler for a ping from the recovery daemon.
 *
 * Frees the previous ping counter and allocates a fresh zeroed one.  When
 * the recd_ping_timeout tunable is non-zero, a timer for that many seconds
 * is armed with ctdb_recd_ping_timeout() as its handler.
 */
1762 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/* any timer that was allocated on the old counter is freed along with it */
1764 talloc_free(ctdb->recd_ping_count);
1766 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1767 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* a timeout tunable of 0 means no ping timer is armed */
1769 if (ctdb->tunable.recd_ping_timeout != 0) {
1770 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1771 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1772 ctdb_recd_ping_timeout, ctdb);
/*
 * Control handler that records which node is the recovery master.
 *
 * indata carries one uint32_t, the pnn of the new recovery master
 * (CHECK_CONTROL_DATA_SIZE is given sizeof(uint32_t); the macro itself is
 * defined elsewhere).  A message is logged when this node loses or gains
 * the role, then ctdb->recovery_master is updated.
 */
1780 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1782 uint32_t new_recmaster;
1784 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1785 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* this node was the recovery master and another node is taking over */
1787 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1789 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
/* this node is becoming the recovery master and was not before */
1792 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1794 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1797 ctdb->recovery_master = new_recmaster;
/*
 * Control handler that stops this node: logs the request, disables
 * monitoring and sets NODE_FLAGS_STOPPED in this node's own entry of
 * ctdb->nodes.
 */
1802 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1804 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1805 ctdb_disable_monitoring(ctdb);
1806 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
 * Control handler that lets a stopped node continue: logs the request and
 * clears NODE_FLAGS_STOPPED in this node's own entry of ctdb->nodes.
 */
1811 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1813 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1814 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;