2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 this is the dummy null procedure that all databases support
/*
 * Call handler for the null function. ctdb_local_attach() registers it
 * for every database under CTDB_NULL_FUNC.
 */
38 static int ctdb_null_func(struct ctdb_call_info *call)
44 this is a plain fetch procedure that all databases support
/*
 * Call handler for the plain fetch function. ctdb_local_attach()
 * registers it for every database under CTDB_FETCH_FUNC.
 *
 * The reply aliases the record data already held in call; nothing is
 * copied here.
 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
48 call->reply_data = &call->record_data;
54 * write a record to a normal database
56 * This is the server-variant of the ctdb_ltdb_store function.
57 * It contains logic to determine whether a record should be
58 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
59 controls to the local ctdb daemon if appropriate.
/*
 * Server-side store hook; ctdb_local_attach() installs it as
 * ctdb_db->ctdb_ltdb_store_fn.
 *
 * Chooses between tdb_store() and tdb_delete() for the record (the keep
 * flag that also drives the log messages below), strips the
 * VACUUM_MIGRATED and AUTOMATIC flags from the header before it is
 * written, and may afterwards hand the record to
 * ctdb_local_schedule_for_deletion().
 */
61 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
63 struct ctdb_ltdb_header *header,
66 struct ctdb_context *ctdb = ctdb_db->ctdb;
69 bool seqnum_suppressed = false;
71 bool schedule_for_deletion = false;
/* Torture mode only: report when the stored RSN is newer than the one being written. */
74 if (ctdb->flags & CTDB_FLAG_TORTURE) {
75 struct ctdb_ltdb_header *h2;
76 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
77 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/*
 * NOTE(review): h2 is a pointer, so sizeof(h2) is the pointer size and
 * not the header size; sizeof(*h2) looks like what was intended.
 */
78 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
79 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
80 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
82 if (rec.dptr) free(rec.dptr);
85 if (ctdb->vnn_map == NULL) {
87 * Called from a client: always store the record
88 * Also don't call ctdb_lmaster since it uses the vnn_map!
94 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
97 * If we migrate an empty record off to another node
98 * and the record has not been migrated with data,
99 * delete the record instead of storing the empty record.
101 if (data.dsize != 0) {
103 } else if (ctdb_db->persistent) {
105 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
107 * The record is not created by the client but
108 * automatically by the ctdb_ltdb_fetch logic that
109 * creates a record with an initial header in the
110 * ltdb before trying to migrate the record from
111 * the current lmaster. Keep it instead of trying
112 * to delete the non-existing record...
115 schedule_for_deletion = true;
116 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
118 } else if (ctdb_db->ctdb->pnn == lmaster) {
120 * If we are lmaster, then we usually keep the record.
121 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
122 * and the record is empty and has never been migrated
123 * with data, then we should delete it instead of storing it.
124 * This is part of the vacuuming process.
126 * The reason that we usually need to store even empty records
127 * on the lmaster is that a client operating directly on the
128 * lmaster (== dmaster) expects the local copy of the record to
129 * exist after successful ctdb migrate call. If the record does
130 * not exist, the client goes into a migrate loop and eventually
131 * fails. So storing the empty record makes sure that we do not
132 * need to change the client code.
134 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
136 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
139 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
145 !ctdb_db->persistent &&
146 (ctdb_db->ctdb->pnn == header->dmaster))
148 schedule_for_deletion = true;
153 * The VACUUM_MIGRATED flag is only set temporarily for
154 * the above logic when the record was retrieved by a
155 * VACUUM_MIGRATE call and should not be stored in the
158 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
159 * and there are two cases in which the corresponding record
160 * is stored in the local database:
161 * 1. The record has been migrated with data in the past
162 * (the MIGRATED_WITH_DATA record flag is set).
163 * 2. The record has been filled with data again since it
164 * had been submitted in the VACUUM_FETCH message to the
166 * For such records it is important to not store the
167 * VACUUM_MIGRATED flag in the database.
169 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
172 * Similarly, clear the AUTOMATIC flag which should not enter
173 * the local database copy since this would require client
174 * modifications to clear the flag when the client stores
177 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
/* Assemble the stored record: the header followed directly by the data. */
179 rec.dsize = sizeof(*header) + data.dsize;
180 rec.dptr = talloc_size(ctdb, rec.dsize);
181 CTDB_NO_MEMORY(ctdb, rec.dptr);
183 memcpy(rec.dptr, header, sizeof(*header));
184 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
186 /* Databases with seqnum updates enabled only get their seqnum
187 changes when/if we modify the data */
188 if (ctdb_db->seqnum_update != NULL) {
190 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
/*
 * Same total size and same payload (the headers are skipped in the
 * comparison): drop TDB_SEQNUM for this write.
 */
192 if ( (old.dsize == rec.dsize)
193 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
194 rec.dptr+sizeof(struct ctdb_ltdb_header),
195 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
196 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
197 seqnum_suppressed = true;
199 if (old.dptr) free(old.dptr);
202 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
204 keep?"storing":"deleting",
208 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
210 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
217 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
222 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
225 keep?"store":"delete", ret,
226 tdb_errorstr(ctdb_db->ltdb->tdb)));
228 schedule_for_deletion = false;
/* Undo the temporary TDB_SEQNUM suppression from above. */
230 if (seqnum_suppressed) {
231 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
/* rec.dptr came from talloc_size() on ctdb above. */
234 talloc_free(rec.dptr);
236 if (schedule_for_deletion) {
238 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
240 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
/*
 * State kept while a request waits for a contended chainlock. It is
 * filled in by ctdb_ltdb_lock_requeue() and read by lock_fetch_callback().
 */
247 struct lock_fetch_state {
248 struct ctdb_context *ctdb;
/* handler the deferred packet is handed back to */
249 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
251 struct ctdb_req_header *hdr;
/* when true, lock_fetch_callback() skips its vnn_map generation check */
253 bool ignore_generation;
257 called when we should retry the operation
/*
 * Callback passed to ctdb_lockwait() by ctdb_ltdb_lock_requeue().
 *
 * p is the struct lock_fetch_state set up there. Unless
 * ignore_generation is set, a packet whose recorded generation no longer
 * matches the current vnn_map generation is freed instead of being
 * processed; otherwise the packet is handed back to recv_pkt.
 */
259 static void lock_fetch_callback(void *p)
261 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
262 if (!state->ignore_generation &&
263 state->generation != state->ctdb->vnn_map->generation) {
264 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
265 talloc_free(state->hdr);
/* Re-submit the packet to the handler recorded at requeue time. */
268 state->recv_pkt(state->recv_context, state->hdr);
269 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
274 do a non-blocking ltdb_lock, deferring this ctdb request until we
277 It does the following:
279 1) tries to get the chainlock. If it succeeds, then it returns 0
281 2) if it fails to get a chainlock immediately then it sets up a
282 non-blocking chainlock via ctdb_lockwait, and when it gets the
283 chainlock it re-submits this ctdb request to the main packet
286 This effectively queues all ctdb requests that cannot be
287 immediately satisfied until it can get the lock. This means that
288 the main ctdb daemon will not block waiting for a chainlock held by
291 There are 3 possible return values:
293 0: means that it got the lock immediately.
294 -1: means that it failed to get the lock, and won't retry
295 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking; on contention the
 * request is parked with ctdb_lockwait() and re-submitted later.
 *
 *   ctdb_db            database whose local tdb chain is locked
 *   key                record key identifying the chain
 *   hdr                request packet to defer when the lock is contended
 *   recv_pkt           handler the packet is re-submitted to
 *   recv_context       first argument passed to recv_pkt
 *   ignore_generation  skip the generation check on re-submission
 */
297 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
298 TDB_DATA key, struct ctdb_req_header *hdr,
299 void (*recv_pkt)(void *, struct ctdb_req_header *),
300 void *recv_context, bool ignore_generation)
303 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
304 struct lockwait_handle *h;
305 struct lock_fetch_state *state;
307 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are the errno values treated as contention. */
310 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
311 /* a hard failure - don't try again */
315 /* when torturing, ensure we test the contended path */
316 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
319 tdb_chainunlock(tdb, key);
322 /* first the non-contended path */
/*
 * NOTE(review): the talloc() result is dereferenced on the very next
 * line without a NULL check.
 */
327 state = talloc(hdr, struct lock_fetch_state);
328 state->ctdb = ctdb_db->ctdb;
330 state->recv_pkt = recv_pkt;
331 state->recv_context = recv_context;
/* Snapshot of the generation; lock_fetch_callback() compares against it. */
332 state->generation = ctdb_db->ctdb->vnn_map->generation;
333 state->ignore_generation = ignore_generation;
335 /* now the contended path */
336 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
341 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
342 so it won't be freed yet */
343 talloc_steal(state, hdr);
344 talloc_steal(state, h);
346 /* now tell the caller that we will retry asynchronously */
351 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Take the chainlock through ctdb_ltdb_lock_requeue() and then read the
 * record header and data with ctdb_ltdb_fetch().
 *
 * header and data receive the fetched record; the remaining parameters
 * are passed straight through to ctdb_ltdb_lock_requeue().
 *
 * NOTE(review): the unlock below presumably runs only when the fetch
 * fails - confirm against the surrounding conditions.
 */
353 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
354 TDB_DATA key, struct ctdb_ltdb_header *header,
355 struct ctdb_req_header *hdr, TDB_DATA *data,
356 void (*recv_pkt)(void *, struct ctdb_req_header *),
357 void *recv_context, bool ignore_generation)
361 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
362 recv_context, ignore_generation);
364 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
/* The unlock result is kept in uret so that ret is not overwritten. */
367 uret = ctdb_ltdb_unlock(ctdb_db, key);
369 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
378 paranoid check to see if the db is empty
/*
 * Sanity check used by ctdb_local_attach(): a database that already
 * holds records at attach time is reported and treated as fatal.
 */
380 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
382 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* Traversal with a NULL callback; the return value is used as the record count. */
383 int count = tdb_traverse_read(tdb, NULL, NULL);
385 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
387 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The record is looked up by database name. If copying the stored
 * reason fails, the previous unhealthy_reason is put back.
 */
391 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
392 struct ctdb_db_context *ctdb_db)
394 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
/* The key is the db name, strlen() bytes, so without the terminating NUL. */
400 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
401 key.dsize = strlen(ctdb_db->db_name);
/* Remember the current reason so it can be restored on failure. */
403 old = ctdb_db->unhealthy_reason;
404 ctdb_db->unhealthy_reason = NULL;
406 val = tdb_fetch(tdb, key);
/* The copy is a talloc child of ctdb_db. */
408 reason = talloc_strndup(ctdb_db,
409 (const char *)val.dptr,
411 if (reason == NULL) {
412 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
414 ctdb_db->unhealthy_reason = old;
425 ctdb_db->unhealthy_reason = reason;
/*
 * Update the health record of ctdb_db inside a transaction on the
 * persistent health tdb and mirror the result in
 * ctdb_db->unhealthy_reason.
 *
 *   given_reason       new reason text, NULL means healthy
 *   num_healthy_nodes  when 0 and an old reason exists, the old reason
 *                      is kept and prefixed (see _TMP_PREFIX below)
 *
 * A non-NULL resulting reason is written with tdb_store(); when there is
 * no new reason but an old one exists the record is removed with
 * tdb_delete(). The store and delete failure paths cancel the
 * transaction.
 */
429 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
430 struct ctdb_db_context *ctdb_db,
431 const char *given_reason,/* NULL means healthy */
432 int num_healthy_nodes)
434 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
438 char *new_reason = NULL;
439 char *old_reason = NULL;
441 ret = tdb_transaction_start(tdb);
443 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
444 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* Reload the stored reason first so old_reason reflects what is on disk. */
448 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
450 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
451 ctdb_db->db_name, ret));
454 old_reason = ctdb_db->unhealthy_reason;
/* Same key layout as in ctdb_load_persistent_health(): name without NUL. */
456 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
457 key.dsize = strlen(ctdb_db->db_name);
460 new_reason = talloc_strdup(ctdb_db, given_reason);
461 if (new_reason == NULL) {
462 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
466 } else if (old_reason && num_healthy_nodes == 0) {
468 * If the reason indicates ok, but there where no healthy nodes
469 * available, that it means, we have not recovered valid content
470 * of the db. So if there's an old reason, prefix it with
471 * "NO-HEALTHY-NODES - "
475 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
476 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
478 prefix = _TMP_PREFIX;
482 new_reason = talloc_asprintf(ctdb_db, "%s%s",
484 if (new_reason == NULL) {
485 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
486 prefix, old_reason));
493 val.dptr = discard_const_p(uint8_t, new_reason);
494 val.dsize = strlen(new_reason);
496 ret = tdb_store(tdb, key, val, TDB_REPLACE);
498 tdb_transaction_cancel(tdb);
499 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
500 tdb_name(tdb), ctdb_db->db_name, new_reason,
501 ret, tdb_errorstr(tdb)));
502 talloc_free(new_reason);
505 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
506 ctdb_db->db_name, new_reason));
507 } else if (old_reason) {
508 ret = tdb_delete(tdb, key);
510 tdb_transaction_cancel(tdb);
511 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
512 tdb_name(tdb), ctdb_db->db_name,
513 ret, tdb_errorstr(tdb)));
514 talloc_free(new_reason);
517 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
521 ret = tdb_transaction_commit(tdb);
522 if (ret != TDB_SUCCESS) {
523 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
524 tdb_name(tdb), ret, tdb_errorstr(tdb)));
525 talloc_free(new_reason);
529 talloc_free(old_reason);
530 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way.
 *
 * Builds a new path from a .corrupted. marker plus a timestamp, records
 * an ERROR reason through ctdb_update_persistent_health() and then
 * renames ctdb_db->db_path to the new path.
 */
535 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
536 struct ctdb_db_context *ctdb_db)
538 time_t now = time(NULL);
546 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
/*
 * NOTE(review): the struct tm fields are int while the format uses %u;
 * confirm this is acceptable for the build warnings in use.
 */
547 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
548 "%04u%02u%02u%02u%02u%02u.0Z",
550 tm->tm_year+1900, tm->tm_mon+1,
551 tm->tm_mday, tm->tm_hour, tm->tm_min,
553 if (new_path == NULL) {
554 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
558 new_reason = talloc_asprintf(ctdb_db,
559 "ERROR - Backup of corrupted TDB in '%s'",
561 if (new_reason == NULL) {
562 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* num_healthy_nodes is passed as 0 here. */
565 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
566 talloc_free(new_reason);
/*
 * NOTE(review): this message follows the health update call, yet its
 * wording says not implemented yet - it looks stale.
 */
568 DEBUG(DEBUG_CRIT,(__location__
569 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
574 ret = rename(ctdb_db->db_path, new_path);
576 DEBUG(DEBUG_CRIT,(__location__
577 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
578 ctdb_db->db_path, new_path,
579 errno, strerror(errno)));
580 talloc_free(new_path);
584 DEBUG(DEBUG_CRIT,(__location__
585 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
586 ctdb_db->db_path, new_path));
587 talloc_free(new_path);
/*
 * Walk ctdb->db_list, reload the health state of every persistent
 * database and log a per-database result plus an OK/FAIL summary.
 * Non-persistent databases are filtered out by the persistent test.
 */
591 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
593 struct ctdb_db_context *ctdb_db;
598 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
599 if (!ctdb_db->persistent) {
603 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
605 DEBUG(DEBUG_ALERT,(__location__
606 " load persistent health for '%s' failed\n",
/* A NULL unhealthy_reason is what counts as healthy. */
611 if (ctdb_db->unhealthy_reason == NULL) {
613 DEBUG(DEBUG_INFO,(__location__
614 " persistent db '%s' healthy\n",
620 DEBUG(DEBUG_ALERT,(__location__
621 " persistent db '%s' unhealthy: %s\n",
623 ctdb_db->unhealthy_reason));
/* The summary is logged at ALERT level as soon as one database failed. */
625 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
626 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
638 mark a database - as healthy
/*
 * Control handler: clear the unhealthy state of the database whose id is
 * carried in indata.
 *
 * The health record is updated with a NULL reason and one healthy node.
 * When may_recover is set and startup has not completed, recovery mode
 * is switched to CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): indata.dptr is read as a uint32_t without a visible size
 * check; presumably the control dispatcher validates it - confirm.
 */
640 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
642 uint32_t db_id = *(uint32_t *)indata.dptr;
643 struct ctdb_db_context *ctdb_db;
645 bool may_recover = false;
647 ctdb_db = find_ctdb_db(ctdb, db_id);
649 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
653 if (ctdb_db->unhealthy_reason) {
657 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
659 DEBUG(DEBUG_ERR,(__location__
660 " ctdb_update_persistent_health(%s) failed\n",
665 if (may_recover && !ctdb->done_startup) {
666 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
668 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the unhealthy reason of the database whose id
 * is carried in indata.
 *
 * The reason is reloaded from the persistent health tdb first. When a
 * reason exists, outdata points directly at ctdb_db->unhealthy_reason
 * (no copy is made) and its size includes the terminating NUL.
 *
 * NOTE(review): indata.dptr is read as a uint32_t without a visible size
 * check; presumably the control dispatcher validates it - confirm.
 */
674 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
678 uint32_t db_id = *(uint32_t *)indata.dptr;
679 struct ctdb_db_context *ctdb_db;
682 ctdb_db = find_ctdb_db(ctdb, db_id);
684 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
688 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
690 DEBUG(DEBUG_ERR,(__location__
691 " ctdb_load_persistent_health(%s) failed\n",
697 if (ctdb_db->unhealthy_reason) {
698 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
699 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
706 attach to a database, handling both persistent and non-persistent databases
707 return 0 on success, -1 on failure
/*
 * Create the ctdb_db_context for db_name, open (and for persistent
 * databases verify) the local tdb, link the context into ctdb->db_list
 * and register the null and fetch call handlers plus vacuuming.
 *
 *   db_name           database name; the db_id is a hash over the name
 *                     including its terminating NUL
 *   persistent        selects the persistent directory and TDB_DEFAULT
 *                     instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 *   unhealthy_reason  when non-NULL it is written to the health tdb
 *                     before the database is opened
 *
 * The visible failure paths release the context with
 * talloc_free(ctdb_db).
 */
709 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
710 bool persistent, const char *unhealthy_reason,
713 struct ctdb_db_context *ctdb_db, *tmp_db;
718 int remaining_tries = 0;
720 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
721 CTDB_NO_MEMORY(ctdb, ctdb_db);
723 ctdb_db->priority = 1;
724 ctdb_db->ctdb = ctdb;
725 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
726 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* The hash key covers the name including its terminating NUL. */
728 key.dsize = strlen(db_name)+1;
729 key.dptr = discard_const(db_name);
730 ctdb_db->db_id = ctdb_hash(&key);
731 ctdb_db->persistent = persistent;
/* Only non-persistent databases get a delete queue. */
733 if (!ctdb_db->persistent) {
734 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
735 if (ctdb_db->delete_queue == NULL) {
736 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
739 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
742 /* check for hash collisions */
743 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
744 if (tmp_db->db_id == ctdb_db->db_id) {
745 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
746 tmp_db->db_id, db_name, tmp_db->db_name));
747 talloc_free(ctdb_db);
753 if (unhealthy_reason) {
754 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
755 unhealthy_reason, 0);
757 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
758 ctdb_db->db_name, unhealthy_reason, ret));
759 talloc_free(ctdb_db);
764 if (ctdb->max_persistent_check_errors > 0) {
767 if (ctdb->done_startup) {
771 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
773 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
774 ctdb_db->db_name, ret));
775 talloc_free(ctdb_db);
/* An unhealthy database is refused once no tries remain. */
780 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
781 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
782 ctdb_db->db_name, ctdb_db->unhealthy_reason));
783 talloc_free(ctdb_db);
787 if (ctdb_db->unhealthy_reason) {
788 /* this is just a warning, but we want that in the log file! */
789 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
790 ctdb_db->db_name, ctdb_db->unhealthy_reason));
793 /* open the database */
794 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
795 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
798 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
799 if (ctdb->valgrinding) {
800 tdb_flags |= TDB_NOMMAP;
802 tdb_flags |= TDB_DISALLOW_NESTING;
804 tdb_flags |= TDB_INCOMPATIBLE_HASH;
808 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
809 ctdb->tunable.database_hash_size,
811 O_CREAT|O_RDWR, mode);
812 if (ctdb_db->ltdb == NULL) {
/* errno is saved right away so later calls cannot clobber it. */
814 int saved_errno = errno;
817 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
820 strerror(saved_errno)));
821 talloc_free(ctdb_db);
825 if (remaining_tries == 0) {
826 DEBUG(DEBUG_CRIT,(__location__
827 "Failed to open persistent tdb '%s': %d - %s\n",
830 strerror(saved_errno)));
831 talloc_free(ctdb_db);
835 ret = stat(ctdb_db->db_path, &st);
837 DEBUG(DEBUG_CRIT,(__location__
838 "Failed to open persistent tdb '%s': %d - %s\n",
841 strerror(saved_errno)));
842 talloc_free(ctdb_db);
846 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
848 DEBUG(DEBUG_CRIT,(__location__
849 "Failed to open persistent tdb '%s': %d - %s\n",
852 strerror(saved_errno)));
853 talloc_free(ctdb_db);
863 ctdb_check_db_empty(ctdb_db);
865 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
870 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
871 ctdb_db->db_path, ret,
872 tdb_errorstr(ctdb_db->ltdb->tdb)));
873 if (remaining_tries == 0) {
874 talloc_free(ctdb_db);
878 fd = tdb_fd(ctdb_db->ltdb->tdb);
879 ret = fstat(fd, &st);
881 DEBUG(DEBUG_CRIT,(__location__
882 "Failed to fstat() persistent tdb '%s': %d - %s\n",
886 talloc_free(ctdb_db);
/* The tdb handle is released before the corrupted file is renamed away. */
891 talloc_free(ctdb_db->ltdb);
892 ctdb_db->ltdb = NULL;
894 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
896 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
898 talloc_free(ctdb_db);
/*
 * NOTE(review): from here on ctdb_db is linked into ctdb->db_list, yet
 * the failure paths below go straight from DEBUG() to
 * talloc_free(ctdb_db) with no DLIST_REMOVE in between. Unless a talloc
 * destructor unlinks it, the list keeps a dangling entry - confirm.
 */
908 DLIST_ADD(ctdb->db_list, ctdb_db);
910 /* setting this can help some high churn databases */
911 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
914 all databases support the "null" function. we need this in
915 order to do forced migration of records
917 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
919 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
920 talloc_free(ctdb_db);
925 all databases support the "fetch" function. we need this
926 for efficient Samba3 ctdb fetch
928 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
930 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
931 talloc_free(ctdb_db);
935 ret = ctdb_vacuum_init(ctdb_db);
937 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
938 "database '%s'\n", ctdb_db->db_name));
939 talloc_free(ctdb_db);
944 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One deferred DB attach request. Instances are created in
 * ctdb_control_db_attach() and kept on the ctdb->deferred_attach list.
 */
951 struct ctdb_deferred_attach_context {
/* list linkage for ctdb->deferred_attach */
952 struct ctdb_deferred_attach_context *next, *prev;
953 struct ctdb_context *ctdb;
/* the deferred control request; talloc-stolen onto this context */
954 struct ctdb_req_control *c;
/*
 * talloc destructor set in ctdb_control_db_attach(): unlinks the context
 * from ctdb->deferred_attach.
 */
958 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
960 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timed event armed in ctdb_control_db_attach() with the
 * deferred_attach_timeout tunable: answers the deferred control with
 * status -1.
 *
 * private_data is the struct ctdb_deferred_attach_context.
 */
965 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
967 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
968 struct ctdb_context *ctdb = da_ctx->ctdb;
970 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
/*
 * Timed event scheduled by ctdb_process_deferred_attach(): feeds the
 * deferred attach request back into ctdb_input_pkt() so that it is
 * processed again.
 *
 * private_data is the struct ctdb_deferred_attach_context.
 */
974 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
976 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
977 struct ctdb_context *ctdb = da_ctx->ctdb;
979 /* This talloc-steals the packet ->c */
980 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
/*
 * Drain ctdb->deferred_attach: every context is unlinked from the list
 * and ctdb_deferred_attach_callback() is scheduled for it as a timed
 * event at timeval_current_ofs(1,0), with the context itself as the
 * talloc parent of the event.
 */
984 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
986 struct ctdb_deferred_attach_context *da_ctx;
988 /* call it from the main event loop as soon as the current event
991 while ((da_ctx = ctdb->deferred_attach) != NULL) {
992 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
993 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1000 a client has asked to attach a new database
/*
 * Control handler for a DB attach request.
 *
 *   indata      carries the database name
 *   outdata     receives a pointer to the db_id field of the attached
 *               database (not a copy) and its size
 *   tdb_flags   client supplied flags; only TDB_NOSYNC and
 *               TDB_INCOMPATIBLE_HASH are honoured
 *   persistent  selects the persistent attach variant
 *   client_id   non-zero for local clients; used to look up the client
 *   c           the control request, stolen when the attach is deferred
 *
 * Local clients are refused while the node is inactive, and are deferred
 * (with *async_reply set) while recovery is active before startup has
 * completed, unless the client is the recovery daemon. A newly attached
 * database is announced to all nodes with a no-reply broadcast.
 *
 * NOTE(review): db_name is taken from indata.dptr as a C string; no
 * check for NUL termination is visible here - confirm it is validated
 * by the caller.
 */
1002 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1003 TDB_DATA *outdata, uint64_t tdb_flags,
1004 bool persistent, uint32_t client_id,
1005 struct ctdb_req_control *c,
1008 const char *db_name = (const char *)indata.dptr;
1009 struct ctdb_db_context *db;
1010 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1011 struct ctdb_client *client = NULL;
/*
 * NOTE(review): the tunable field is allow_client_db_attach while the
 * message names AllowClientDBAccess - check which spelling is right.
 */
1013 if (ctdb->tunable.allow_client_db_attach == 0) {
1014 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1015 "AllowClientDBAccess == 0\n", db_name));
1019 /* dont allow any local clients to attach while we are in recovery mode
1020 * except for the recovery daemon.
1021 * allow all attach from the network since these are always from remote
1024 if (client_id != 0) {
1025 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1027 if (client != NULL) {
1028 /* If the node is inactive it is not part of the cluster
1029 and we should not allow clients to attach to any
1032 if (node->flags & NODE_FLAGS_INACTIVE) {
1033 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1037 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1038 && client->pid != ctdb->recoverd_pid
1039 && !ctdb->done_startup) {
1040 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1042 if (da_ctx == NULL) {
1043 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1047 da_ctx->ctdb = ctdb;
1048 da_ctx->c = talloc_steal(da_ctx, c);
1049 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1050 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1052 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1054 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1055 *async_reply = true;
1060 /* the client can optionally pass additional tdb flags, but we
1061 only allow a subset of those on the database in ctdb. Note
1062 that tdb_flags is passed in via the (otherwise unused)
1063 srvid to the attach control */
1064 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1066 /* see if we already have this name */
1067 db = ctdb_db_handle(ctdb, db_name);
/* Already attached: reply with the existing db_id and merge in the flags. */
1069 outdata->dptr = (uint8_t *)&db->db_id;
1070 outdata->dsize = sizeof(db->db_id);
1071 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1075 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
/* Look the handle up again now that the local attach has been done. */
1079 db = ctdb_db_handle(ctdb, db_name);
1081 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1085 /* remember the flags the client has specified */
1086 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1088 outdata->dptr = (uint8_t *)&db->db_id;
1089 outdata->dsize = sizeof(db->db_id);
1091 /* Try to ensure it's locked in mem */
1092 ctdb_lockdown_memory(ctdb);
1094 /* tell all the other nodes about this database */
1095 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1096 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1097 CTDB_CONTROL_DB_ATTACH,
1098 0, CTDB_CTRL_FLAG_NOREPLY,
1099 indata, NULL, NULL);
1107 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every file whose name
 * contains .tdb. followed only by digits that parse to this node number
 * (ctdb->pnn). Other entries are logged and ignored.
 *
 * unhealthy_reason is passed through to ctdb_local_attach().
 */
1109 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1110 const char *unhealthy_reason)
1115 /* open the persistent db directory and scan it for files */
1116 d = opendir(ctdb->db_directory_persistent);
1121 while ((de=readdir(d))) {
1123 size_t len = strlen(de->d_name);
1125 int invalid_name = 0;
/* Work on a talloc copy of the directory entry name. */
1127 s = talloc_strdup(ctdb, de->d_name);
1128 CTDB_NO_MEMORY(ctdb, s);
1130 /* only accept names ending in .tdb */
/* NOTE(review): strstr() finds the first .tdb. occurrence, not the last. */
1131 p = strstr(s, ".tdb.");
1132 if (len < 7 || p == NULL) {
1137 /* only accept names ending with .tdb. and any number of digits */
/*
 * NOTE(review): presumably q walks a plain char string; isdigit() is
 * only defined for unsigned char values and EOF - consider a cast.
 */
1139 while (*q != 0 && invalid_name == 0) {
1140 if (!isdigit(*q++)) {
/* p+5 skips the five characters of the .tdb. marker. */
1144 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1145 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1151 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1152 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1158 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point for the local databases.
 *
 * Fills in default directories under VARDIR when unset, creates the
 * three directories (mode 0700, an existing directory is accepted),
 * opens and checks the persistent health tdb, and finally attaches all
 * persistent databases through ctdb_attach_persistent().
 *
 * If opening or checking the health tdb fails, the file is reopened with
 * TDB_CLEAR_IF_FIRST and a WARNING reason is prepared; that reason is
 * handed to ctdb_attach_persistent().
 */
1166 int ctdb_attach_databases(struct ctdb_context *ctdb)
1169 char *persistent_health_path = NULL;
1170 char *unhealthy_reason = NULL;
1171 bool first_try = true;
1173 if (ctdb->db_directory == NULL) {
1174 ctdb->db_directory = VARDIR "/ctdb";
1176 if (ctdb->db_directory_persistent == NULL) {
1177 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1179 if (ctdb->db_directory_state == NULL) {
1180 ctdb->db_directory_state = VARDIR "/ctdb/state";
1183 /* make sure the db directory exists */
1184 ret = mkdir(ctdb->db_directory, 0700);
1185 if (ret == -1 && errno != EEXIST) {
1186 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1187 ctdb->db_directory));
1191 /* make sure the persistent db directory exists */
1192 ret = mkdir(ctdb->db_directory_persistent, 0700);
1193 if (ret == -1 && errno != EEXIST) {
1194 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1195 ctdb->db_directory_persistent));
1199 /* make sure the internal state db directory exists */
1200 ret = mkdir(ctdb->db_directory_state, 0700);
1201 if (ret == -1 && errno != EEXIST) {
1202 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1203 ctdb->db_directory_state));
/* The health tdb lives in the state directory. */
1207 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1208 ctdb->db_directory_state,
1209 PERSISTENT_HEALTH_TDB,
1211 if (persistent_health_path == NULL) {
1212 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* First attempt: open without TDB_CLEAR_IF_FIRST so existing content is kept. */
1218 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1219 0, TDB_DISALLOW_NESTING,
1220 O_CREAT | O_RDWR, 0600);
1221 if (ctdb->db_persistent_health == NULL) {
1222 struct tdb_wrap *tdb;
1225 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1226 persistent_health_path,
1229 talloc_free(persistent_health_path);
1230 talloc_free(unhealthy_reason);
1235 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1236 persistent_health_path,
1237 "was cleared after a failure",
1238 "manual verification needed");
1239 if (unhealthy_reason == NULL) {
1240 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1241 talloc_free(persistent_health_path);
1245 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1246 persistent_health_path));
1247 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1248 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1249 O_CREAT | O_RDWR, 0600);
1251 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1252 persistent_health_path,
1255 talloc_free(persistent_health_path);
1256 talloc_free(unhealthy_reason);
/* The file opened; now verify its internal consistency. */
1263 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1265 struct tdb_wrap *tdb;
1267 talloc_free(ctdb->db_persistent_health);
1268 ctdb->db_persistent_health = NULL;
1271 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1272 persistent_health_path));
1273 talloc_free(persistent_health_path);
1274 talloc_free(unhealthy_reason);
/* Same recovery steps as for the failed open above. */
1279 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1280 persistent_health_path,
1281 "was cleared after a failure",
1282 "manual verification needed");
1283 if (unhealthy_reason == NULL) {
1284 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1285 talloc_free(persistent_health_path);
1289 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1290 persistent_health_path));
1291 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1292 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1293 O_CREAT | O_RDWR, 0600);
1295 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1296 persistent_health_path,
1299 talloc_free(persistent_health_path);
1300 talloc_free(unhealthy_reason);
1307 talloc_free(persistent_health_path);
/* unhealthy_reason is still NULL here when no recovery step was needed. */
1309 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1310 talloc_free(unhealthy_reason);
1319 called when a broadcast seqnum update comes in
/*
 * Handle a broadcast seqnum update for database db_id sent by srcnode.
 *
 * Updates that originate from this node are filtered out, and an
 * unhealthy database is reported. Otherwise the local tdb seqnum is
 * bumped and the new value is cached in ctdb_db->seqnum, which is the
 * value ctdb_ltdb_seqnum_check() compares against.
 */
1321 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1323 struct ctdb_db_context *ctdb_db;
1324 if (srcnode == ctdb->pnn) {
1325 /* don't update ourselves! */
1329 ctdb_db = find_ctdb_db(ctdb, db_id);
1331 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1335 if (ctdb_db->unhealthy_reason) {
1336 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1337 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1341 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1342 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1347 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Periodic timer for one database (p is its ctdb_db_context).
 *
 * When the tdb seqnum differs from the cached ctdb_db->seqnum, a
 * no-reply CTDB_CONTROL_UPDATE_SEQNUM carrying the db_id is sent to
 * CTDB_BROADCAST_VNNMAP. The cached value is then refreshed and the
 * timer re-arms itself.
 */
1349 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1350 struct timeval t, void *p)
1352 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1353 struct ctdb_context *ctdb = ctdb_db->ctdb;
1354 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1355 if (new_seqnum != ctdb_db->seqnum) {
1356 /* something has changed - propagate it */
1358 data.dptr = (uint8_t *)&ctdb_db->db_id;
1359 data.dsize = sizeof(uint32_t);
1360 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1361 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1364 ctdb_db->seqnum = new_seqnum;
1366 /* setup a new timer */
/*
 * seqnum_interval is split into a /1000 part and a (%1000)*1000 part;
 * presumably milliseconds turned into seconds plus microseconds - confirm.
 */
1367 ctdb_db->seqnum_update =
1368 event_add_timed(ctdb->ev, ctdb_db,
1369 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1370 ctdb_ltdb_seqnum_check, ctdb_db);
1374 enable seqnum handling on this db
/*
 * Enable seqnum handling for database db_id: arm the periodic
 * ctdb_ltdb_seqnum_check() timer unless one is already set, switch on
 * seqnum tracking in the tdb and cache the current seqnum.
 */
1376 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1378 struct ctdb_db_context *ctdb_db;
1379 ctdb_db = find_ctdb_db(ctdb, db_id);
1381 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* A non-NULL seqnum_update means the timer is already armed. */
1385 if (ctdb_db->seqnum_update == NULL) {
1386 ctdb_db->seqnum_update =
1387 event_add_timed(ctdb->ev, ctdb_db,
1388 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1389 ctdb_ltdb_seqnum_check, ctdb_db);
1392 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1393 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of a database.
 *
 * indata carries a struct ctdb_db_priority (db_id and priority). The
 * priority has to lie in the range 1..NUM_DB_PRIORITIES.
 *
 * NOTE(review): indata.dptr is cast without a visible size check;
 * presumably the control dispatcher validates it - confirm.
 */
1397 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1399 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1400 struct ctdb_db_context *ctdb_db;
1402 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1404 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1408 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1409 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1413 ctdb_db->priority = db_prio->priority;
1414 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));