2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tdb/include/tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 * write a record to a normal database
37 * This is the server-variant of the ctdb_ltdb_store function.
38 * It contains logic to determine whether a record should be
39 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store routine for a record of a local tdb;
 * ctdb_local_attach() assigns it to ctdb_db->ctdb_ltdb_store_fn.
 *
 * Behaviour that can be read from the lines present here:
 *  - With CTDB_FLAG_TORTURE set, the current record is fetched and an RSN
 *    regression message is logged at DEBUG_CRIT when the stored rsn is
 *    larger than header->rsn.
 *    NOTE(review): the size guard in that check compares rec.dsize with
 *    sizeof(h2), which is the size of a pointer and not of
 *    struct ctdb_ltdb_header; confirm whether sizeof(*h2) was intended.
 *  - lmaster is obtained from ctdb_lmaster(); the if/else chain after it
 *    inspects data.dsize, the read-only record flags, ctdb_db->persistent,
 *    CTDB_REC_FLAG_AUTOMATIC, CTDB_REC_FLAG_MIGRATED_WITH_DATA,
 *    CTDB_REC_FLAG_VACUUM_MIGRATED and the lmaster/dmaster relation.
 *    Most assignments made by that chain are not visible in this copy.
 *  - schedule_for_deletion is set for an empty record of a non-persistent
 *    db when this node is dmaster and none of the read-only flags is set;
 *    remove_from_delete_queue is then set to its negation.
 *  - CTDB_REC_FLAG_VACUUM_MIGRATED and CTDB_REC_FLAG_AUTOMATIC are cleared
 *    from header->flags before the record image is built.
 *  - The record image (header followed by data) lives in talloc memory and
 *    is released with talloc_free() near the end.
 *  - When ctdb_db->seqnum_update is set and the existing record has the
 *    same size and the same payload after the header, TDB_SEQNUM is removed
 *    around the write and added back afterwards (seqnum_suppressed).
 *  - The record is written with tdb_store(TDB_REPLACE) or removed with
 *    tdb_delete(). Presumably the keep variable used in the log messages
 *    selects between the two; its declaration and assignments are not
 *    visible here -- TODO confirm.
 *  - Both queue flags are reset next to the failure log message
 *    (presumably only on the error path -- TODO confirm).
 *  - Finally ctdb_local_schedule_for_deletion() or
 *    ctdb_local_remove_from_delete_queue() is called according to the flags.
 */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44 struct ctdb_ltdb_header *header,
47 struct ctdb_context *ctdb = ctdb_db->ctdb;
50 bool seqnum_suppressed = false;
52 bool schedule_for_deletion = false;
53 bool remove_from_delete_queue = false;
56 if (ctdb->flags & CTDB_FLAG_TORTURE) {
57 struct ctdb_ltdb_header *h2;
58 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64 if (rec.dptr) free(rec.dptr);
67 if (ctdb->vnn_map == NULL) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
76 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data.dsize != 0) {
85 } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
87 } else if (ctdb_db->persistent) {
89 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91 * The record is not created by the client but
92 * automatically by the ctdb_ltdb_fetch logic that
93 * creates a record with an initial header in the
94 * ltdb before trying to migrate the record from
95 * the current lmaster. Keep it instead of trying
96 * to delete the non-existing record...
99 schedule_for_deletion = true;
100 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
102 } else if (ctdb_db->ctdb->pnn == lmaster) {
104 * If we are lmaster, then we usually keep the record.
105 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106 * and the record is empty and has never been migrated
107 * with data, then we should delete it instead of storing it.
108 * This is part of the vacuuming process.
110 * The reason that we usually need to store even empty records
111 * on the lmaster is that a client operating directly on the
112 * lmaster (== dmaster) expects the local copy of the record to
113 * exist after successful ctdb migrate call. If the record does
114 * not exist, the client goes into a migrate loop and eventually
115 * fails. So storing the empty record makes sure that we do not
116 * need to change the client code.
118 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
120 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
123 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
128 if ((data.dsize == 0) &&
129 !ctdb_db->persistent &&
130 (ctdb_db->ctdb->pnn == header->dmaster) &&
131 !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
133 schedule_for_deletion = true;
135 remove_from_delete_queue = !schedule_for_deletion;
140 * The VACUUM_MIGRATED flag is only set temporarily for
141 * the above logic when the record was retrieved by a
142 * VACUUM_MIGRATE call and should not be stored in the
145 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
146 * and there are two cases in which the corresponding record
147 * is stored in the local database:
148 * 1. The record has been migrated with data in the past
149 * (the MIGRATED_WITH_DATA record flag is set).
150 * 2. The record has been filled with data again since it
151 * had been submitted in the VACUUM_FETCH message to the
153 * For such records it is important to not store the
154 * VACUUM_MIGRATED flag in the database.
156 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
159 * Similarly, clear the AUTOMATIC flag which should not enter
160 * the local database copy since this would require client
161 * modifications to clear the flag when the client stores
164 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
166 rec.dsize = sizeof(*header) + data.dsize;
167 rec.dptr = talloc_size(ctdb, rec.dsize);
168 CTDB_NO_MEMORY(ctdb, rec.dptr);
170 memcpy(rec.dptr, header, sizeof(*header));
171 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
173 /* Databases with seqnum updates enabled only get their seqnum
174 changes when/if we modify the data */
175 if (ctdb_db->seqnum_update != NULL) {
177 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
179 if ( (old.dsize == rec.dsize)
180 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
181 rec.dptr+sizeof(struct ctdb_ltdb_header),
182 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
183 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
184 seqnum_suppressed = true;
186 if (old.dptr) free(old.dptr);
189 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
191 keep?"storing":"deleting",
195 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
197 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
204 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
209 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
212 keep?"store":"delete", ret,
213 tdb_errorstr(ctdb_db->ltdb->tdb)));
215 schedule_for_deletion = false;
216 remove_from_delete_queue = false;
218 if (seqnum_suppressed) {
219 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
222 talloc_free(rec.dptr);
224 if (schedule_for_deletion) {
226 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
228 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232 if (remove_from_delete_queue) {
233 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State kept while a request waits for a chainlock.
 * Allocated and filled in by ctdb_ltdb_lock_requeue() and read back by
 * lock_fetch_callback().
 *   ctdb              - daemon context, used to read the current
 *                       vnn_map generation
 *   recv_pkt          - function the request header is handed to once the
 *                       lock wait is over
 *   hdr               - the deferred request header
 *   ignore_generation - when true, the generation comparison in the
 *                       callback is skipped
 * The recv_context and generation members used by those two functions
 * are not visible in this copy of the declaration.
 */
239 struct lock_fetch_state {
240 struct ctdb_context *ctdb;
241 void (*recv_pkt)(void *, struct ctdb_req_header *);
243 struct ctdb_req_header *hdr;
245 bool ignore_generation;
249 called when we should retry the operation
/*
 * Callback registered with ctdb_lockwait() by ctdb_ltdb_lock_requeue().
 * p is the struct lock_fetch_state for the waiting request.
 *
 * Unless ignore_generation is set, the generation recorded in the state is
 * compared with the current vnn_map generation; on mismatch the packet is
 * logged as discarded and state->hdr is freed. The request is otherwise
 * handed to state->recv_pkt(state->recv_context, state->hdr).
 * NOTE(review): the statement that leaves the function after the free is
 * not visible in this copy -- confirm recv_pkt is not reached with a freed
 * header.
 */
251 static void lock_fetch_callback(void *p)
253 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
254 if (!state->ignore_generation &&
255 state->generation != state->ctdb->vnn_map->generation) {
256 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
257 talloc_free(state->hdr);
260 state->recv_pkt(state->recv_context, state->hdr);
261 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
266 do a non-blocking ltdb_lock, deferring this ctdb request until we
269 It does the following:
271 1) tries to get the chainlock. If it succeeds, then it returns 0
273 2) if it fails to get a chainlock immediately then it sets up a
274 non-blocking chainlock via ctdb_lockwait, and when it gets the
275 chainlock it re-submits this ctdb request to the main packet
278 This effectively queues all ctdb requests that cannot be
279 immediately satisfied until it can get the lock. This means that
280 the main ctdb daemon will not block waiting for a chainlock held by
283 There are 3 possible return values:
285 0: means that it got the lock immediately.
286 -1: means that it failed to get the lock, and won't retry
287 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking; if it is contended,
 * defer the request hdr until the lock can be obtained.
 *
 *   ctdb_db           - database whose ltdb is locked
 *   key               - record key identifying the chain
 *   hdr               - request being processed; also the talloc parent of
 *                       the state allocated here
 *   recv_pkt          - function that the deferred request is handed to
 *                       from lock_fetch_callback()
 *   recv_context      - first argument passed to recv_pkt
 *   ignore_generation - stored in the state; disables the generation check
 *                       in lock_fetch_callback()
 *
 * Visible flow: tdb_chainlock_nonblock() is attempted first. An errno
 * other than EACCES, EAGAIN or EDEADLK is treated as a hard failure. Under
 * CTDB_FLAG_TORTURE the lock is released again so the contended path is
 * exercised. On the contended path a lock_fetch_state is filled in,
 * ctdb_lockwait() is started with lock_fetch_callback, and both hdr and
 * the lockwait handle are moved under the state with talloc_steal().
 *
 * NOTE(review): state is dereferenced directly after talloc() with no NULL
 * check in between.
 * NOTE(review): state is allocated as a child of hdr and hdr is later
 * stolen onto state; confirm the resulting talloc hierarchy is intended.
 */
289 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
290 TDB_DATA key, struct ctdb_req_header *hdr,
291 void (*recv_pkt)(void *, struct ctdb_req_header *),
292 void *recv_context, bool ignore_generation)
295 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
296 struct lockwait_handle *h;
297 struct lock_fetch_state *state;
299 ret = tdb_chainlock_nonblock(tdb, key);
302 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
303 /* a hard failure - don't try again */
307 /* when torturing, ensure we test the contended path */
308 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
311 tdb_chainunlock(tdb, key);
314 /* first the non-contended path */
319 state = talloc(hdr, struct lock_fetch_state);
320 state->ctdb = ctdb_db->ctdb;
322 state->recv_pkt = recv_pkt;
323 state->recv_context = recv_context;
324 state->generation = ctdb_db->ctdb->vnn_map->generation;
325 state->ignore_generation = ignore_generation;
327 /* now the contended path */
328 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
333 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
334 so it won't be freed yet */
335 talloc_steal(state, hdr);
336 talloc_steal(state, h);
338 /* now tell the caller than we will retry asynchronously */
343 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Lock the chain for key through ctdb_ltdb_lock_requeue() and then read
 * the record with ctdb_ltdb_fetch() into header and data (data is
 * allocated with hdr as the context argument of the fetch).
 *
 * recv_pkt, recv_context and ignore_generation are passed through
 * unchanged to ctdb_ltdb_lock_requeue().
 *
 * ctdb_ltdb_unlock() is also called here and a failure of it is logged at
 * DEBUG_ERR; presumably this is the cleanup for a failed fetch, but the
 * conditions guarding these calls are not visible in this copy -- TODO
 * confirm.
 */
345 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
346 TDB_DATA key, struct ctdb_ltdb_header *header,
347 struct ctdb_req_header *hdr, TDB_DATA *data,
348 void (*recv_pkt)(void *, struct ctdb_req_header *),
349 void *recv_context, bool ignore_generation)
353 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
354 recv_context, ignore_generation);
356 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
359 uret = ctdb_ltdb_unlock(ctdb_db, key);
361 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
370 paranoid check to see if the db is empty
/*
 * Sanity check used by ctdb_local_attach().
 * Takes the return value of tdb_traverse_read() with a NULL callback as
 * the count; the not-empty case is logged at DEBUG_ALERT and escalated
 * through ctdb_fatal(). The comparison on count that guards this is not
 * visible in this copy of the file.
 */
372 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
374 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
375 int count = tdb_traverse_read(tdb, NULL, NULL);
377 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
379 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its trailing NUL
 * (key.dsize is strlen of the name). The fetched value is copied with
 * talloc_strndup() onto ctdb_db. If that allocation fails, the previous
 * reason pointer is put back; otherwise unhealthy_reason is replaced by
 * the new copy. Handling of the fetched buffer and of the previous reason
 * string, and the return values, are not visible in this copy.
 */
383 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
384 struct ctdb_db_context *ctdb_db)
386 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
392 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
393 key.dsize = strlen(ctdb_db->db_name);
395 old = ctdb_db->unhealthy_reason;
396 ctdb_db->unhealthy_reason = NULL;
398 val = tdb_fetch(tdb, key);
400 reason = talloc_strndup(ctdb_db,
401 (const char *)val.dptr,
403 if (reason == NULL) {
404 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
406 ctdb_db->unhealthy_reason = old;
417 ctdb_db->unhealthy_reason = reason;
/*
 * Record the health state of ctdb_db in the persistent health tdb, inside
 * a tdb transaction on that tdb.
 *
 *   given_reason      - text describing why the db is unhealthy; NULL
 *                       means healthy
 *   num_healthy_nodes - when this is 0 and an old reason exists, the old
 *                       reason is kept and prefixed with the
 *                       NO-HEALTHY-NODES marker (the strncmp() against the
 *                       old reason decides whether the prefix is added)
 *
 * Visible flow: start the transaction, reload the current reason with
 * ctdb_load_persistent_health(), compute new_reason, then either store it
 * under the db name (key without trailing NUL) with TDB_REPLACE, or delete
 * the key when only an old reason exists. Store and delete failures cancel
 * the transaction. After a successful commit the old reason is freed and
 * ctdb_db->unhealthy_reason is set to new_reason.
 *
 * _TMP_PREFIX is defined inside this function body; no matching undef is
 * visible in this copy.
 */
421 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
422 struct ctdb_db_context *ctdb_db,
423 const char *given_reason,/* NULL means healthy */
424 int num_healthy_nodes)
426 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
430 char *new_reason = NULL;
431 char *old_reason = NULL;
433 ret = tdb_transaction_start(tdb);
435 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
436 tdb_name(tdb), ret, tdb_errorstr(tdb)));
440 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
442 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
443 ctdb_db->db_name, ret));
446 old_reason = ctdb_db->unhealthy_reason;
448 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
449 key.dsize = strlen(ctdb_db->db_name);
452 new_reason = talloc_strdup(ctdb_db, given_reason);
453 if (new_reason == NULL) {
454 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
458 } else if (old_reason && num_healthy_nodes == 0) {
460 * If the reason indicates ok, but there where no healthy nodes
461 * available, that it means, we have not recovered valid content
462 * of the db. So if there's an old reason, prefix it with
463 * "NO-HEALTHY-NODES - "
467 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
468 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
470 prefix = _TMP_PREFIX;
474 new_reason = talloc_asprintf(ctdb_db, "%s%s",
476 if (new_reason == NULL) {
477 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
478 prefix, old_reason));
485 val.dptr = discard_const_p(uint8_t, new_reason);
486 val.dsize = strlen(new_reason);
488 ret = tdb_store(tdb, key, val, TDB_REPLACE);
490 tdb_transaction_cancel(tdb);
491 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
492 tdb_name(tdb), ctdb_db->db_name, new_reason,
493 ret, tdb_errorstr(tdb)));
494 talloc_free(new_reason);
497 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
498 ctdb_db->db_name, new_reason));
499 } else if (old_reason) {
500 ret = tdb_delete(tdb, key);
502 tdb_transaction_cancel(tdb);
503 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
504 tdb_name(tdb), ctdb_db->db_name,
505 ret, tdb_errorstr(tdb)));
506 talloc_free(new_reason);
509 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
513 ret = tdb_transaction_commit(tdb);
514 if (ret != TDB_SUCCESS) {
515 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
516 tdb_name(tdb), ret, tdb_errorstr(tdb)));
517 talloc_free(new_reason);
521 talloc_free(old_reason);
522 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb file out of the way and mark the db unhealthy.
 *
 * A new path ending in .corrupted.YYYYMMDDhhmmss.0Z is built from the
 * broken-down time in tm (presumably UTC given the Z suffix; the call
 * that fills tm from now is not visible in this copy). An
 * ERROR - Backup of corrupted TDB reason string is then recorded through
 * ctdb_update_persistent_health() with zero healthy nodes, and the file
 * at ctdb_db->db_path is renamed to the new path with rename().
 * The outcome of the rename is logged at DEBUG_CRIT in both the failure
 * and the success case. new_path is freed on the rename paths shown here.
 */
527 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
528 struct ctdb_db_context *ctdb_db)
530 time_t now = time(NULL);
538 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
539 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
540 "%04u%02u%02u%02u%02u%02u.0Z",
542 tm->tm_year+1900, tm->tm_mon+1,
543 tm->tm_mday, tm->tm_hour, tm->tm_min,
545 if (new_path == NULL) {
546 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
550 new_reason = talloc_asprintf(ctdb_db,
551 "ERROR - Backup of corrupted TDB in '%s'",
553 if (new_reason == NULL) {
554 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
557 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
558 talloc_free(new_reason);
560 DEBUG(DEBUG_CRIT,(__location__
561 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
566 ret = rename(ctdb_db->db_path, new_path);
568 DEBUG(DEBUG_CRIT,(__location__
569 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
570 ctdb_db->db_path, new_path,
571 errno, strerror(errno)));
572 talloc_free(new_path);
576 DEBUG(DEBUG_CRIT,(__location__
577 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
578 ctdb_db->db_path, new_path));
579 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the health state of the databases with
 * ctdb_load_persistent_health(); the non-persistent test at the top of
 * the loop presumably skips those entries (its body is not visible).
 *
 * A db whose unhealthy_reason is NULL after the reload is logged as
 * healthy at DEBUG_INFO; otherwise the reason is logged at DEBUG_ALERT.
 * A summary line with OK and FAIL counters is logged at the end, at
 * DEBUG_ALERT when fail is non-zero and at DEBUG_NOTICE otherwise.
 * The counter updates and the return value are not visible in this copy.
 */
583 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
585 struct ctdb_db_context *ctdb_db;
590 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
591 if (!ctdb_db->persistent) {
595 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
597 DEBUG(DEBUG_ALERT,(__location__
598 " load persistent health for '%s' failed\n",
603 if (ctdb_db->unhealthy_reason == NULL) {
605 DEBUG(DEBUG_INFO,(__location__
606 " persistent db '%s' healthy\n",
612 DEBUG(DEBUG_ALERT,(__location__
613 " persistent db '%s' unhealthy: %s\n",
615 ctdb_db->unhealthy_reason));
617 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
618 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
630 mark a database - as healthy
/*
 * Control handler that marks a database as healthy.
 *
 * indata carries the 32-bit db id, read directly from indata.dptr.
 * NOTE(review): no check of indata.dsize is visible before that read.
 *
 * The db is looked up with find_ctdb_db(); an unknown id is logged at
 * DEBUG_ERR. The health record is then cleared through
 * ctdb_update_persistent_health() with a NULL reason and one healthy
 * node. When may_recover is set and startup has not completed, the
 * recovery mode is switched to CTDB_RECOVERY_ACTIVE. The assignment that
 * sets may_recover (inside the unhealthy_reason test) is not visible in
 * this copy.
 */
632 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
634 uint32_t db_id = *(uint32_t *)indata.dptr;
635 struct ctdb_db_context *ctdb_db;
637 bool may_recover = false;
639 ctdb_db = find_ctdb_db(ctdb, db_id);
641 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
645 if (ctdb_db->unhealthy_reason) {
649 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
651 DEBUG(DEBUG_ERR,(__location__
652 " ctdb_update_persistent_health(%s) failed\n",
657 if (may_recover && !ctdb->done_startup) {
658 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
660 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler that reports the health reason of a database.
 *
 * indata carries the 32-bit db id, read directly from indata.dptr.
 * NOTE(review): no check of indata.dsize is visible before that read.
 *
 * After find_ctdb_db() and a reload through
 * ctdb_load_persistent_health(), an existing reason is returned in
 * outdata: dptr points at ctdb_db->unhealthy_reason itself (no copy is
 * made) and dsize includes the trailing NUL. What outdata holds for a
 * healthy db is not visible in this copy.
 */
666 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
670 uint32_t db_id = *(uint32_t *)indata.dptr;
671 struct ctdb_db_context *ctdb_db;
674 ctdb_db = find_ctdb_db(ctdb, db_id);
676 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
680 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
682 DEBUG(DEBUG_ERR,(__location__
683 " ctdb_load_persistent_health(%s) failed\n",
689 if (ctdb_db->unhealthy_reason) {
690 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
691 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on ctdb_db.
 *
 * The function tests for an already set readonly flag first and logs an
 * error for persistent databases. It then opens (or creates) a tracking
 * tdb whose path is db_path with a .RO suffix, using the
 * database_hash_size tunable and the flags
 * TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC, stores the handle in
 * ctdb_db->rottdb and sets ctdb_db->readonly to true.
 * Return statements and the release of ropath are not visible in this
 * copy of the file.
 */
698 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
702 if (ctdb_db->readonly) {
706 if (ctdb_db->persistent) {
707 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
711 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
712 if (ropath == NULL) {
713 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
716 ctdb_db->rottdb = tdb_open(ropath,
717 ctdb->tunable.database_hash_size,
718 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
720 if (ctdb_db->rottdb == NULL) {
721 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
726 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
728 ctdb_db->readonly = true;
734 attach to a database, handling both persistent and non-persistent databases
735 return 0 on success, -1 on failure
/*
 * Create the ctdb_db_context for db_name, open its tdb and link it into
 * ctdb->db_list.
 *
 *   db_name          - database name; the db id is ctdb_hash() over the
 *                      name including its trailing NUL
 *   persistent       - selects the persistent directory and TDB_DEFAULT
 *                      instead of TDB_CLEAR_IF_FIRST|TDB_NOSYNC
 *   unhealthy_reason - when non-NULL it is recorded first through
 *                      ctdb_update_persistent_health() with zero healthy
 *                      nodes
 *
 * Steps visible in this copy, in order:
 *  - allocate a zeroed context on ctdb, set priority 1, copy the name;
 *  - create the delete_queue rb tree for non-persistent databases and
 *    assign ctdb_ltdb_store_server to ctdb_ltdb_store_fn (whether that
 *    assignment sits inside the non-persistent branch cannot be told from
 *    the lines present here);
 *  - reject a db id that collides with an entry already in db_list;
 *  - load the persistent health state; a db marked unhealthy is fatal
 *    when remaining_tries is 0 and only a logged warning otherwise;
 *  - build db_path from the chosen directory and open it with
 *    tdb_wrap_open(), adding TDB_NOMMAP under valgrind and
 *    TDB_DISALLOW_NESTING (the condition for TDB_INCOMPATIBLE_HASH is not
 *    visible);
 *  - on open or tdb_check() failure, and while tries remain, back the
 *    file up with ctdb_backup_corrupted_tdb();
 *  - create the deferred_fetch rb tree, DLIST_ADD the context, apply the
 *    database_max_dead tunable, register the null, fetch and
 *    fetch_with_header call functions and initialise vacuuming.
 *
 * NOTE(review): failure paths after DLIST_ADD call talloc_free(ctdb_db);
 * no DLIST_REMOVE is visible in this function -- confirm the context is
 * unlinked from db_list (for example by a destructor).
 */
737 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
738 bool persistent, const char *unhealthy_reason,
741 struct ctdb_db_context *ctdb_db, *tmp_db;
746 int remaining_tries = 0;
748 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
749 CTDB_NO_MEMORY(ctdb, ctdb_db);
751 ctdb_db->priority = 1;
752 ctdb_db->ctdb = ctdb;
753 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
754 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
756 key.dsize = strlen(db_name)+1;
757 key.dptr = discard_const(db_name);
758 ctdb_db->db_id = ctdb_hash(&key);
759 ctdb_db->persistent = persistent;
761 if (!ctdb_db->persistent) {
762 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
763 if (ctdb_db->delete_queue == NULL) {
764 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
767 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
770 /* check for hash collisions */
771 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
772 if (tmp_db->db_id == ctdb_db->db_id) {
773 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
774 tmp_db->db_id, db_name, tmp_db->db_name));
775 talloc_free(ctdb_db);
781 if (unhealthy_reason) {
782 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
783 unhealthy_reason, 0);
785 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
786 ctdb_db->db_name, unhealthy_reason, ret));
787 talloc_free(ctdb_db);
792 if (ctdb->max_persistent_check_errors > 0) {
795 if (ctdb->done_startup) {
799 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
801 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
802 ctdb_db->db_name, ret));
803 talloc_free(ctdb_db);
808 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
809 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
810 ctdb_db->db_name, ctdb_db->unhealthy_reason));
811 talloc_free(ctdb_db);
815 if (ctdb_db->unhealthy_reason) {
816 /* this is just a warning, but we want that in the log file! */
817 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
818 ctdb_db->db_name, ctdb_db->unhealthy_reason));
821 /* open the database */
822 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
823 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
826 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
827 if (ctdb->valgrinding) {
828 tdb_flags |= TDB_NOMMAP;
830 tdb_flags |= TDB_DISALLOW_NESTING;
832 tdb_flags |= TDB_INCOMPATIBLE_HASH;
836 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
837 ctdb->tunable.database_hash_size,
839 O_CREAT|O_RDWR, mode);
840 if (ctdb_db->ltdb == NULL) {
842 int saved_errno = errno;
845 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
848 strerror(saved_errno)));
849 talloc_free(ctdb_db);
853 if (remaining_tries == 0) {
854 DEBUG(DEBUG_CRIT,(__location__
855 "Failed to open persistent tdb '%s': %d - %s\n",
858 strerror(saved_errno)));
859 talloc_free(ctdb_db);
863 ret = stat(ctdb_db->db_path, &st);
865 DEBUG(DEBUG_CRIT,(__location__
866 "Failed to open persistent tdb '%s': %d - %s\n",
869 strerror(saved_errno)));
870 talloc_free(ctdb_db);
874 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
876 DEBUG(DEBUG_CRIT,(__location__
877 "Failed to open persistent tdb '%s': %d - %s\n",
880 strerror(saved_errno)));
881 talloc_free(ctdb_db);
891 ctdb_check_db_empty(ctdb_db);
893 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
898 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
899 ctdb_db->db_path, ret,
900 tdb_errorstr(ctdb_db->ltdb->tdb)));
901 if (remaining_tries == 0) {
902 talloc_free(ctdb_db);
906 fd = tdb_fd(ctdb_db->ltdb->tdb);
907 ret = fstat(fd, &st);
909 DEBUG(DEBUG_CRIT,(__location__
910 "Failed to fstat() persistent tdb '%s': %d - %s\n",
914 talloc_free(ctdb_db);
919 talloc_free(ctdb_db->ltdb);
920 ctdb_db->ltdb = NULL;
922 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
924 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
926 talloc_free(ctdb_db);
936 /* set up a rb tree we can use to track which records we have a
937 fetch-lock in-flight for so we can defer any additional calls
940 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
941 if (ctdb_db->deferred_fetch == NULL) {
942 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
943 talloc_free(ctdb_db);
947 DLIST_ADD(ctdb->db_list, ctdb_db);
949 /* setting this can help some high churn databases */
950 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
953 all databases support the "null" function. we need this in
954 order to do forced migration of records
956 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
958 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
959 talloc_free(ctdb_db);
964 all databases support the "fetch" function. we need this
965 for efficient Samba3 ctdb fetch
967 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
969 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
970 talloc_free(ctdb_db);
975 all databases support the "fetch_with_header" function. we need this
976 for efficient readonly record fetches
978 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
980 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
981 talloc_free(ctdb_db);
985 ret = ctdb_vacuum_init(ctdb_db);
987 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
988 "database '%s'\n", ctdb_db->db_name));
989 talloc_free(ctdb_db);
994 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One attach control that has been postponed by ctdb_control_db_attach().
 *   next, prev - links for the ctdb->deferred_attach list
 *   ctdb       - owning daemon context
 *   c          - the postponed control request (stolen onto this context)
 */
1001 struct ctdb_deferred_attach_context {
1002 struct ctdb_deferred_attach_context *next, *prev;
1003 struct ctdb_context *ctdb;
1004 struct ctdb_req_control *c;
/*
 * talloc destructor installed by ctdb_control_db_attach(): unlinks the
 * context from the ctdb->deferred_attach list when it is freed.
 */
1008 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1010 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timed-event handler armed by ctdb_control_db_attach() for a postponed
 * attach. private_data is the struct ctdb_deferred_attach_context.
 * Replies to the postponed control with status -1 and frees the context.
 */
1015 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1017 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1018 struct ctdb_context *ctdb = da_ctx->ctdb;
1020 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1021 talloc_free(da_ctx);
/*
 * Timed-event handler scheduled by ctdb_process_deferred_attach().
 * private_data is the struct ctdb_deferred_attach_context.
 * Feeds the postponed control back into ctdb_input_pkt() so it is
 * processed again, then frees the context.
 */
1024 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1026 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1027 struct ctdb_context *ctdb = da_ctx->ctdb;
1029 /* This talloc-steals the packet ->c */
1030 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1031 talloc_free(da_ctx);
/*
 * Drain the ctdb->deferred_attach list.
 * Each entry is unlinked from the list and a timed event is added for it
 * with an offset of one second (timeval_current_ofs(1,0)); the event runs
 * ctdb_deferred_attach_callback() and is allocated on the entry itself.
 * The return value is not visible in this copy of the file.
 */
1034 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1036 struct ctdb_deferred_attach_context *da_ctx;
1038 /* call it from the main event loop as soon as the current event
1041 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1042 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1043 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1050 a client has asked to attach a new database
/*
 * Control handler for a request to attach to a database.
 *
 *   indata     - database name, used directly as a C string
 *                (NOTE(review): no length or termination check is visible)
 *   outdata    - on success points at the db_id member of the db context
 *                (no copy is made)
 *   tdb_flags  - client supplied flags; masked down to
 *                TDB_NOSYNC|TDB_INCOMPATIBLE_HASH before use
 *   persistent - selects the persistent attach path and broadcast opcode
 *   client_id  - non-zero for local clients; resolved with ctdb_reqid_find()
 *   c          - the control request, kept for a deferred reply
 *
 * Visible flow:
 *  - refuse when the allow_client_db_attach tunable is 0;
 *  - for a local client: refuse while the node carries
 *    NODE_FLAGS_INACTIVE; while recovery is active, startup is not done
 *    and the client is not the recovery daemon, park the request in a
 *    ctdb_deferred_attach_context, arm a timeout of
 *    deferred_attach_timeout seconds and set *async_reply;
 *  - if the database already exists, return its id and add the flags;
 *  - otherwise attach through ctdb_local_attach(), look the handle up
 *    again, add the flags, call ctdb_lockdown_memory() and broadcast the
 *    attach control to all nodes with CTDB_CTRL_FLAG_NOREPLY.
 */
1052 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1053 TDB_DATA *outdata, uint64_t tdb_flags,
1054 bool persistent, uint32_t client_id,
1055 struct ctdb_req_control *c,
1058 const char *db_name = (const char *)indata.dptr;
1059 struct ctdb_db_context *db;
1060 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1061 struct ctdb_client *client = NULL;
1063 if (ctdb->tunable.allow_client_db_attach == 0) {
1064 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1065 "AllowClientDBAccess == 0\n", db_name));
1069 /* dont allow any local clients to attach while we are in recovery mode
1070 * except for the recovery daemon.
1071 * allow all attach from the network since these are always from remote
1074 if (client_id != 0) {
1075 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1077 if (client != NULL) {
1078 /* If the node is inactive it is not part of the cluster
1079 and we should not allow clients to attach to any
1082 if (node->flags & NODE_FLAGS_INACTIVE) {
1083 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1087 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1088 && client->pid != ctdb->recoverd_pid
1089 && !ctdb->done_startup) {
1090 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1092 if (da_ctx == NULL) {
1093 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1097 da_ctx->ctdb = ctdb;
1098 da_ctx->c = talloc_steal(da_ctx, c);
1099 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1100 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1102 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1104 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1105 *async_reply = true;
1110 /* the client can optionally pass additional tdb flags, but we
1111 only allow a subset of those on the database in ctdb. Note
1112 that tdb_flags is passed in via the (otherwise unused)
1113 srvid to the attach control */
1114 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1116 /* see if we already have this name */
1117 db = ctdb_db_handle(ctdb, db_name);
1119 outdata->dptr = (uint8_t *)&db->db_id;
1120 outdata->dsize = sizeof(db->db_id);
1121 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1125 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1129 db = ctdb_db_handle(ctdb, db_name);
1131 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1135 /* remember the flags the client has specified */
1136 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1138 outdata->dptr = (uint8_t *)&db->db_id;
1139 outdata->dsize = sizeof(db->db_id);
1141 /* Try to ensure it's locked in mem */
1142 ctdb_lockdown_memory(ctdb);
1144 /* tell all the other nodes about this database */
1145 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1146 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1147 CTDB_CONTROL_DB_ATTACH,
1148 0, CTDB_CTRL_FLAG_NOREPLY,
1149 indata, NULL, NULL);
1157 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach to every persistent
 * database file that belongs to this node.
 *
 * For each directory entry the name is copied onto ctdb with
 * talloc_strdup(). The code requires a total length of at least 7 and a
 * .tdb. component, followed by digits only, and the number parsed with
 * sscanf() after that component must equal ctdb->pnn; other entries are
 * logged as ignored. Accepted names are attached with ctdb_local_attach()
 * as persistent, passing unhealthy_reason through.
 *
 * NOTE(review): strstr() finds the first .tdb. anywhere in the name,
 * which is looser than the ending-in wording of the inline comments.
 * NOTE(review): isdigit() is given the dereferenced q without a cast;
 * if q is a plain char pointer, a cast to unsigned char is the usual
 * guard -- the declaration of q is not visible in this copy.
 * How s is trimmed and released, and the closedir() call, are also not
 * visible here.
 */
1159 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1160 const char *unhealthy_reason)
1165 /* open the persistent db directory and scan it for files */
1166 d = opendir(ctdb->db_directory_persistent);
1171 while ((de=readdir(d))) {
1173 size_t len = strlen(de->d_name);
1175 int invalid_name = 0;
1177 s = talloc_strdup(ctdb, de->d_name);
1178 CTDB_NO_MEMORY(ctdb, s);
1180 /* only accept names ending in .tdb */
1181 p = strstr(s, ".tdb.");
1182 if (len < 7 || p == NULL) {
1187 /* only accept names ending with .tdb. and any number of digits */
1189 while (*q != 0 && invalid_name == 0) {
1190 if (!isdigit(*q++)) {
1194 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1195 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1201 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1202 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1208 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point that prepares the database directories, opens the
 * persistent health tdb and attaches all persistent databases.
 *
 * Visible flow:
 *  - default the three directory settings to VARDIR/ctdb,
 *    VARDIR/ctdb/persistent and VARDIR/ctdb/state when unset;
 *  - create each directory with mode 0700, tolerating EEXIST;
 *  - build the health tdb path from the state directory and
 *    PERSISTENT_HEALTH_TDB and open it with TDB_DISALLOW_NESTING and
 *    mode 0600;
 *  - if the open fails, build a WARNING reason string and retry the open
 *    with TDB_CLEAR_IF_FIRST added;
 *  - run tdb_check() on the health tdb; on failure free the handle and
 *    go through the same reason-string and CLEAR_IF_FIRST reopen steps;
 *  - free the path, call ctdb_attach_persistent() with the reason (NULL
 *    when nothing went wrong) and free the reason afterwards.
 *
 * first_try is declared here; presumably it limits the recovery steps to
 * one attempt, but its uses are not visible in this copy -- TODO confirm.
 */
1216 int ctdb_attach_databases(struct ctdb_context *ctdb)
1219 char *persistent_health_path = NULL;
1220 char *unhealthy_reason = NULL;
1221 bool first_try = true;
1223 if (ctdb->db_directory == NULL) {
1224 ctdb->db_directory = VARDIR "/ctdb";
1226 if (ctdb->db_directory_persistent == NULL) {
1227 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1229 if (ctdb->db_directory_state == NULL) {
1230 ctdb->db_directory_state = VARDIR "/ctdb/state";
1233 /* make sure the db directory exists */
1234 ret = mkdir(ctdb->db_directory, 0700);
1235 if (ret == -1 && errno != EEXIST) {
1236 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1237 ctdb->db_directory));
1241 /* make sure the persistent db directory exists */
1242 ret = mkdir(ctdb->db_directory_persistent, 0700);
1243 if (ret == -1 && errno != EEXIST) {
1244 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1245 ctdb->db_directory_persistent));
1249 /* make sure the internal state db directory exists */
1250 ret = mkdir(ctdb->db_directory_state, 0700);
1251 if (ret == -1 && errno != EEXIST) {
1252 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1253 ctdb->db_directory_state));
1257 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1258 ctdb->db_directory_state,
1259 PERSISTENT_HEALTH_TDB,
1261 if (persistent_health_path == NULL) {
1262 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1268 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1269 0, TDB_DISALLOW_NESTING,
1270 O_CREAT | O_RDWR, 0600);
1271 if (ctdb->db_persistent_health == NULL) {
1272 struct tdb_wrap *tdb;
1275 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1276 persistent_health_path,
1279 talloc_free(persistent_health_path);
1280 talloc_free(unhealthy_reason);
1285 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1286 persistent_health_path,
1287 "was cleared after a failure",
1288 "manual verification needed");
1289 if (unhealthy_reason == NULL) {
1290 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1291 talloc_free(persistent_health_path);
1295 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1296 persistent_health_path));
1297 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1298 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1299 O_CREAT | O_RDWR, 0600);
1301 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1302 persistent_health_path,
1305 talloc_free(persistent_health_path);
1306 talloc_free(unhealthy_reason);
1313 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1315 struct tdb_wrap *tdb;
1317 talloc_free(ctdb->db_persistent_health);
1318 ctdb->db_persistent_health = NULL;
1321 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1322 persistent_health_path));
1323 talloc_free(persistent_health_path);
1324 talloc_free(unhealthy_reason);
1329 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1330 persistent_health_path,
1331 "was cleared after a failure",
1332 "manual verification needed");
1333 if (unhealthy_reason == NULL) {
1334 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1335 talloc_free(persistent_health_path);
1339 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1340 persistent_health_path));
1341 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1342 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1343 O_CREAT | O_RDWR, 0600);
1345 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1346 persistent_health_path,
1349 talloc_free(persistent_health_path);
1350 talloc_free(unhealthy_reason);
1357 talloc_free(persistent_health_path);
1359 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1360 talloc_free(unhealthy_reason);
1369 called when a broadcast seqnum update comes in
/*
  Handle a seqnum update for database db_id that was sent by node srcnode.

  An update whose srcnode equals our own pnn is singled out first (see the
  in-line comment: we must not update ourselves). An unknown db_id and a
  database that has unhealthy_reason set are each reported at DEBUG_ERR.
  Otherwise the sequence number of the local tdb is incremented and the
  resulting value is cached in ctdb_db->seqnum.

  NOTE(review): the return statements, the condition guarding the
  "Unknown db_id" message and the brace-only lines are not visible in
  this listing - confirm them against the upstream file.
*/
1371 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1373 struct ctdb_db_context *ctdb_db;
1374 if (srcnode == ctdb->pnn) {
1375 /* don't update ourselves! */
/* look up the database this update refers to */
1379 ctdb_db = find_ctdb_db(ctdb, db_id);
1381 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* a non-NULL unhealthy_reason is reported together with the db name */
1385 if (ctdb_db->unhealthy_reason) {
1386 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb's sequence number, then remember the value now stored in the tdb */
1391 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1392 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1397 timer to check for seqnum changes in a ltdb and propagate them
/*
  Timed-event callback for a single database; p is the
  struct ctdb_db_context that was passed when the timer was added.

  Reads the current sequence number of the tdb and compares it with the
  cached ctdb_db->seqnum. When the two differ, a
  CTDB_CONTROL_UPDATE_SEQNUM control whose payload is the 32-bit db_id is
  sent to CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY. The cached
  value is set to the new sequence number and a fresh timer for this same
  function is added, so the check keeps repeating.

  ev, te and t are not referenced in the visible lines.

  NOTE(review): the declaration of 'data', the trailing arguments of
  ctdb_daemon_send_control() and the brace-only lines are not visible in
  this listing - confirm them against the upstream file.
*/
1399 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1400 struct timeval t, void *p)
1402 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1403 struct ctdb_context *ctdb = ctdb_db->ctdb;
1404 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1405 if (new_seqnum != ctdb_db->seqnum) {
1406 /* something has changed - propagate it */
/* the control payload points straight at ctdb_db->db_id (no copy) */
1408 data.dptr = (uint8_t *)&ctdb_db->db_id;
1409 data.dsize = sizeof(uint32_t);
1410 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1411 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
/* remember the value we have just compared against */
1414 ctdb_db->seqnum = new_seqnum;
1416 /* setup a new timer */
/*
  seqnum_interval is split into interval/1000 and (interval%1000)*1000,
  i.e. it is treated as milliseconds - presumably converted to
  (seconds, microseconds) for timeval_current_ofs(); TODO confirm
*/
1417 ctdb_db->seqnum_update =
1418 event_add_timed(ctdb->ev, ctdb_db,
1419 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1420 ctdb_ltdb_seqnum_check, ctdb_db);
1424 enable seqnum handling on this db
/*
  Enable sequence-number handling for database db_id.

  If the db has no seqnum_update timer yet, one is added that runs
  ctdb_ltdb_seqnum_check() with the db context as its private pointer.
  Sequence numbers are then enabled on the underlying tdb and the current
  value is cached in ctdb_db->seqnum. An unknown db_id is reported at
  DEBUG_ERR.

  NOTE(review): the return statements, the condition guarding the
  "Unknown db_id" message and the brace-only lines are not visible in
  this listing - confirm them against the upstream file.
*/
1426 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1428 struct ctdb_db_context *ctdb_db;
1429 ctdb_db = find_ctdb_db(ctdb, db_id);
1431 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only add the periodic check once per database */
1435 if (ctdb_db->seqnum_update == NULL) {
1436 ctdb_db->seqnum_update =
1437 event_add_timed(ctdb->ev, ctdb_db,
1438 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1439 ctdb_ltdb_seqnum_check, ctdb_db);
/* switch on seqnum tracking in the tdb and cache its current value */
1442 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1443 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
  Control handler that sets the priority of a database.

  indata.dptr is interpreted as a struct ctdb_db_priority, which supplies
  the db_id and the requested priority. An unknown db_id and a priority
  outside 1..NUM_DB_PRIORITIES are each reported at DEBUG_ERR. The
  priority is stored in ctdb_db->priority and the change is logged at
  DEBUG_INFO.

  NOTE(review): indata.dsize is not checked in the visible lines before
  indata.dptr is cast and dereferenced - confirm that the caller
  validates the size.
  NOTE(review): the return statements, the condition guarding the
  "Unknown db_id" message and the brace-only lines are not visible in
  this listing - confirm them against the upstream file.
*/
1447 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1449 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1450 struct ctdb_db_context *ctdb_db;
1452 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1454 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* valid priorities are 1..NUM_DB_PRIORITIES inclusive */
1458 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1459 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1463 ctdb_db->priority = db_prio->priority;
1464 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
/*
  Mark a database as sticky.

  The request is logged at DEBUG_NOTICE first. The visible checks single
  out a db that is already sticky and a persistent db; the latter is
  reported at DEBUG_ERR. The sticky_records tree is created with
  trbt_create() and ctdb_db->sticky is set to true.

  The ctdb parameter is not referenced in the visible lines.

  NOTE(review): the return statements and the brace-only lines are not
  visible in this listing - confirm them against the upstream file.
*/
1470 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1473 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/* already sticky */
1475 if (ctdb_db->sticky) {
/* the sticky property is not accepted for persistent databases */
1479 if (ctdb_db->persistent) {
1480 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* ctdb_db is passed as first argument - presumably the talloc parent of the tree; TODO confirm */
1484 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1486 ctdb_db->sticky = true;
/*
  Control handler that hands out the statistics of database db_id.

  outdata->dptr is pointed directly at ctdb_db->statistics (no copy is
  made) and outdata->dsize is set to the size of that member. An unknown
  db_id is reported at DEBUG_ERR.

  NOTE(review): the rest of the parameter list, the condition guarding
  the "Unknown db_id" message, the return statements and the end of the
  function are not visible in this listing - confirm them against the
  upstream file.
*/
1491 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1495 struct ctdb_db_context *ctdb_db;
1497 ctdb_db = find_ctdb_db(ctdb, db_id);
1499 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
/* expose the live statistics structure owned by ctdb_db */
1503 outdata->dptr = (uint8_t *)&(ctdb_db->statistics);
1504 outdata->dsize = sizeof(ctdb_db->statistics);