2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 * write a record to a normal database
38 * This is the server-variant of the ctdb_ltdb_store function.
39 * It contains logic to determine whether a record should be
40 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store hook for a local database.
 *
 * Decides whether the record is written or removed (tracked in "keep"),
 * strips the transient VACUUM_MIGRATED and AUTOMATIC flags from the header,
 * then either tdb_store()s header+data or tdb_delete()s the key. When
 * schedule_for_deletion is still set afterwards the record is passed to
 * ctdb_local_schedule_for_deletion().
 */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
45 struct ctdb_ltdb_header *header,
48 struct ctdb_context *ctdb = ctdb_db->ctdb;
51 bool seqnum_suppressed = false;
53 bool schedule_for_deletion = false;
/* torture mode only: report when the stored copy carries a higher RSN than the header being written */
56 if (ctdb->flags & CTDB_FLAG_TORTURE) {
57 struct ctdb_ltdb_header *h2;
58 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/* the record must hold a whole header (not merely pointer-size bytes) before h2 is read */
60 if (rec.dptr && rec.dsize >= sizeof(*h2) && h2->rsn > header->rsn) {
61 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64 if (rec.dptr) free(rec.dptr);
67 if (ctdb->vnn_map == NULL) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
76 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data.dsize != 0) {
85 } else if (ctdb_db->persistent) {
87 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
89 * The record is not created by the client but
90 * automatically by the ctdb_ltdb_fetch logic that
91 * creates a record with an initial header in the
92 * ltdb before trying to migrate the record from
93 * the current lmaster. Keep it instead of trying
94 * to delete the non-existing record...
97 schedule_for_deletion = true;
98 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
100 } else if (ctdb_db->ctdb->pnn == lmaster) {
102 * If we are lmaster, then we usually keep the record.
103 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
104 * and the record is empty and has never been migrated
105 * with data, then we should delete it instead of storing it.
106 * This is part of the vacuuming process.
108 * The reason that we usually need to store even empty records
109 * on the lmaster is that a client operating directly on the
110 * lmaster (== dmaster) expects the local copy of the record to
111 * exist after successful ctdb migrate call. If the record does
112 * not exist, the client goes into a migrate loop and eventually
113 * fails. So storing the empty record makes sure that we do not
114 * need to change the client code.
116 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
118 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
121 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
127 !ctdb_db->persistent &&
128 (ctdb_db->ctdb->pnn == header->dmaster))
130 schedule_for_deletion = true;
135 * The VACUUM_MIGRATED flag is only set temporarily for
136 * the above logic when the record was retrieved by a
137 * VACUUM_MIGRATE call and should not be stored in the
140 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
141 * and there are two cases in which the corresponding record
142 * is stored in the local database:
143 * 1. The record has been migrated with data in the past
144 * (the MIGRATED_WITH_DATA record flag is set).
145 * 2. The record has been filled with data again since it
146 * had been submitted in the VACUUM_FETCH message to the
148 * For such records it is important to not store the
149 * VACUUM_MIGRATED flag in the database.
151 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
154 * Similarly, clear the AUTOMATIC flag which should not enter
155 * the local database copy since this would require client
156 * modifications to clear the flag when the client stores
159 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
/* build the on-disk record: header immediately followed by the data */
161 rec.dsize = sizeof(*header) + data.dsize;
162 rec.dptr = talloc_size(ctdb, rec.dsize);
163 CTDB_NO_MEMORY(ctdb, rec.dptr);
165 memcpy(rec.dptr, header, sizeof(*header));
166 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
168 /* Databases with seqnum updates enabled only get their seqnum
169 changes when/if we modify the data */
170 if (ctdb_db->seqnum_update != NULL) {
172 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
174 if ( (old.dsize == rec.dsize)
175 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
176 rec.dptr+sizeof(struct ctdb_ltdb_header),
177 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
178 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
179 seqnum_suppressed = true;
181 if (old.dptr) free(old.dptr);
184 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
186 keep?"storing":"deleting",
190 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
192 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
199 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
204 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
207 keep?"store":"delete", ret,
208 tdb_errorstr(ctdb_db->ltdb->tdb)));
210 schedule_for_deletion = false;
/* restore the TDB_SEQNUM flag that was removed above for an unchanged record */
212 if (seqnum_suppressed) {
213 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
216 talloc_free(rec.dptr);
218 if (schedule_for_deletion) {
220 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
222 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
/* State kept while a request waits for a chainlock, so the packet can be handed back to its handler later. */
229 struct lock_fetch_state {
230 struct ctdb_context *ctdb;
/* handler the deferred packet is passed back to */
231 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
233 struct ctdb_req_header *hdr;
/* when true the callback skips the vnn_map generation comparison */
235 bool ignore_generation;
239 called when we should retry the operation
241 static void lock_fetch_callback(void *p)
243 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
244 if (!state->ignore_generation &&
245 state->generation != state->ctdb->vnn_map->generation) {
246 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
247 talloc_free(state->hdr);
250 state->recv_pkt(state->recv_context, state->hdr);
251 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
256 do a non-blocking ltdb_lock, deferring this ctdb request until we
259 It does the following:
261 1) tries to get the chainlock. If it succeeds, then it returns 0
263 2) if it fails to get a chainlock immediately then it sets up a
264 non-blocking chainlock via ctdb_lockwait, and when it gets the
265 chainlock it re-submits this ctdb request to the main packet
268 This effectively queues all ctdb requests that cannot be
269 immediately satisfied until it can get the lock. This means that
270 the main ctdb daemon will not block waiting for a chainlock held by
273 There are 3 possible return values:
275 0: means that it got the lock immediately.
276 -1: means that it failed to get the lock, and won't retry
277 -2: means that it failed to get the lock immediately, but will retry
279 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
280 TDB_DATA key, struct ctdb_req_header *hdr,
281 void (*recv_pkt)(void *, struct ctdb_req_header *),
282 void *recv_context, bool ignore_generation)
285 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
286 struct lockwait_handle *h;
287 struct lock_fetch_state *state;
289 ret = tdb_chainlock_nonblock(tdb, key);
292 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
293 /* a hard failure - don't try again */
297 /* when torturing, ensure we test the contended path */
298 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
301 tdb_chainunlock(tdb, key);
304 /* first the non-contended path */
309 state = talloc(hdr, struct lock_fetch_state);
310 state->ctdb = ctdb_db->ctdb;
312 state->recv_pkt = recv_pkt;
313 state->recv_context = recv_context;
314 state->generation = ctdb_db->ctdb->vnn_map->generation;
315 state->ignore_generation = ignore_generation;
317 /* now the contended path */
318 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
323 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
324 so it won't be freed yet */
325 talloc_steal(state, hdr);
326 talloc_steal(state, h);
328 /* now tell the caller than we will retry asynchronously */
333 a variant of ctdb_ltdb_lock_requeue that also fetches the record
335 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
336 TDB_DATA key, struct ctdb_ltdb_header *header,
337 struct ctdb_req_header *hdr, TDB_DATA *data,
338 void (*recv_pkt)(void *, struct ctdb_req_header *),
339 void *recv_context, bool ignore_generation)
343 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
344 recv_context, ignore_generation);
346 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
349 uret = ctdb_ltdb_unlock(ctdb_db, key);
351 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
360 paranoid check to see if the db is empty
/* Walk the local tdb read-only and treat a database that is not empty on attach as fatal. */
362 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
364 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* no callback is passed: only the traverse result is of interest */
365 int count = tdb_traverse_read(tdb, NULL, NULL);
367 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
369 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb.
 * The entry is looked up under the database name; the value is copied
 * into a talloc string owned by ctdb_db.
 */
373 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
374 struct ctdb_db_context *ctdb_db)
376 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
/* the key is the db name without its terminating NUL */
382 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
383 key.dsize = strlen(ctdb_db->db_name);
/* keep the previous reason so it can be put back if the copy below fails */
385 old = ctdb_db->unhealthy_reason;
386 ctdb_db->unhealthy_reason = NULL;
388 val = tdb_fetch(tdb, key);
390 reason = talloc_strndup(ctdb_db,
391 (const char *)val.dptr,
393 if (reason == NULL) {
394 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
396 ctdb_db->unhealthy_reason = old;
407 ctdb_db->unhealthy_reason = reason;
/*
 * Update the health entry of ctdb_db in the persistent health tdb.
 *
 * given_reason:      new unhealthy reason, NULL means healthy
 * num_healthy_nodes: when 0 and an old reason exists, the old reason is
 *                    kept and prefixed with "NO-HEALTHY-NODES - "
 *
 * The store or delete runs inside a tdb transaction; on success the cached
 * ctdb_db->unhealthy_reason is replaced by the new reason.
 */
411 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
412 struct ctdb_db_context *ctdb_db,
413 const char *given_reason,/* NULL means healthy */
414 int num_healthy_nodes)
416 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
420 char *new_reason = NULL;
421 char *old_reason = NULL;
423 ret = tdb_transaction_start(tdb);
425 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
426 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* reload the currently stored reason so the update is based on the tdb content */
430 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
432 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
433 ctdb_db->db_name, ret));
436 old_reason = ctdb_db->unhealthy_reason;
/* same key layout as in ctdb_load_persistent_health(): the name without the NUL */
438 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
439 key.dsize = strlen(ctdb_db->db_name);
442 new_reason = talloc_strdup(ctdb_db, given_reason);
443 if (new_reason == NULL) {
444 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
448 } else if (old_reason && num_healthy_nodes == 0) {
450 * If the reason indicates ok, but there where no healthy nodes
451 * available, that it means, we have not recovered valid content
452 * of the db. So if there's an old reason, prefix it with
453 * "NO-HEALTHY-NODES - "
457 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
458 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
460 prefix = _TMP_PREFIX;
464 new_reason = talloc_asprintf(ctdb_db, "%s%s",
466 if (new_reason == NULL) {
467 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
468 prefix, old_reason));
/* the stored value is the reason string without its terminating NUL */
475 val.dptr = discard_const_p(uint8_t, new_reason);
476 val.dsize = strlen(new_reason);
478 ret = tdb_store(tdb, key, val, TDB_REPLACE);
480 tdb_transaction_cancel(tdb);
481 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
482 tdb_name(tdb), ctdb_db->db_name, new_reason,
483 ret, tdb_errorstr(tdb)));
484 talloc_free(new_reason);
487 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
488 ctdb_db->db_name, new_reason));
489 } else if (old_reason) {
490 ret = tdb_delete(tdb, key);
492 tdb_transaction_cancel(tdb);
493 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
494 tdb_name(tdb), ctdb_db->db_name,
495 ret, tdb_errorstr(tdb)));
496 talloc_free(new_reason);
499 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
503 ret = tdb_transaction_commit(tdb);
504 if (ret != TDB_SUCCESS) {
505 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
506 tdb_name(tdb), ret, tdb_errorstr(tdb)));
507 talloc_free(new_reason);
/* swap the cached reason for the new one (NULL when the db is healthy) */
511 talloc_free(old_reason);
512 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb file out of the way.
 *
 * Builds a "<path>.corrupted.<timestamp>.0Z" name, records an
 * "ERROR - Backup of corrupted TDB in ..." reason through
 * ctdb_update_persistent_health() and then rename()s ctdb_db->db_path
 * to the new name.
 */
517 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
518 struct ctdb_db_context *ctdb_db)
520 time_t now = time(NULL);
528 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
529 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
530 "%04u%02u%02u%02u%02u%02u.0Z",
532 tm->tm_year+1900, tm->tm_mon+1,
533 tm->tm_mday, tm->tm_hour, tm->tm_min,
535 if (new_path == NULL) {
536 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
540 new_reason = talloc_asprintf(ctdb_db,
541 "ERROR - Backup of corrupted TDB in '%s'",
543 if (new_reason == NULL) {
544 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* the health entry is written before the file is renamed */
547 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
548 talloc_free(new_reason);
/* NOTE(review): this message follows the ctdb_update_persistent_health() call; the "not implemented yet" wording looks stale - confirm */
550 DEBUG(DEBUG_CRIT,(__location__
551 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
556 ret = rename(ctdb_db->db_path, new_path);
558 DEBUG(DEBUG_CRIT,(__location__
559 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
560 ctdb_db->db_path, new_path,
561 errno, strerror(errno)));
562 talloc_free(new_path);
566 DEBUG(DEBUG_CRIT,(__location__
567 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
568 ctdb_db->db_path, new_path));
569 talloc_free(new_path);
/*
 * Reload the health state of every persistent database on ctdb->db_list
 * and log, per database, whether it is healthy or why it is unhealthy.
 * A summary line with the OK and FAIL counts is logged at the end.
 */
573 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
575 struct ctdb_db_context *ctdb_db;
580 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
/* non-persistent databases are not considered here */
581 if (!ctdb_db->persistent) {
585 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
587 DEBUG(DEBUG_ALERT,(__location__
588 " load persistent health for '%s' failed\n",
593 if (ctdb_db->unhealthy_reason == NULL) {
595 DEBUG(DEBUG_INFO,(__location__
596 " persistent db '%s' healthy\n",
602 DEBUG(DEBUG_ALERT,(__location__
603 " persistent db '%s' unhealthy: %s\n",
605 ctdb_db->unhealthy_reason));
/* the summary is raised to ALERT level as soon as one database failed */
607 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
608 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
620 mark a database - as healthy
/*
 * Control handler: clear the unhealthy state of the database whose id is
 * carried in indata. When the node has not finished startup and the
 * may_recover flag is set, recovery mode is switched to active.
 */
622 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
/* NOTE(review): indata.dptr is read as a uint32_t without a size check in this function - presumably validated by the caller, confirm */
624 uint32_t db_id = *(uint32_t *)indata.dptr;
625 struct ctdb_db_context *ctdb_db;
627 bool may_recover = false;
629 ctdb_db = find_ctdb_db(ctdb, db_id);
631 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
635 if (ctdb_db->unhealthy_reason) {
/* a NULL reason marks the database as healthy */
639 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
641 DEBUG(DEBUG_ERR,(__location__
642 " ctdb_update_persistent_health(%s) failed\n",
647 if (may_recover && !ctdb->done_startup) {
648 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
650 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: reload the health state of the database whose id is
 * carried in indata and, when it is unhealthy, return the reason string
 * (including its terminating NUL) in outdata.
 */
656 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
660 uint32_t db_id = *(uint32_t *)indata.dptr;
661 struct ctdb_db_context *ctdb_db;
664 ctdb_db = find_ctdb_db(ctdb, db_id);
666 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
670 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
672 DEBUG(DEBUG_ERR,(__location__
673 " ctdb_load_persistent_health(%s) failed\n",
679 if (ctdb_db->unhealthy_reason) {
/* outdata points at the cached string itself; no copy is made here */
680 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
681 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on a non-persistent database by opening a
 * "<db_path>.RO" tracking tdb and setting ctdb_db->readonly.
 */
688 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
692 if (ctdb_db->readonly) {
696 if (ctdb_db->persistent) {
697 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
701 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
702 if (ropath == NULL) {
703 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
/* the tracking db is opened with TDB_NOLOCK, TDB_CLEAR_IF_FIRST and TDB_NOSYNC */
706 ctdb_db->rottdb = tdb_open(ropath,
707 ctdb->tunable.database_hash_size,
708 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
710 if (ctdb_db->rottdb == NULL) {
711 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
716 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
718 ctdb_db->readonly = true;
724 attach to a database, handling both persistent and non-persistent databases
725 return 0 on success, -1 on failure
/*
 * Create and register the ctdb_db_context for db_name.
 *
 * Visible steps: allocate the context, derive db_id from the name, reject
 * db_id collisions, load/update the persistent health state, open and
 * tdb_check() the local tdb (backing up a corrupted persistent file via
 * ctdb_backup_corrupted_tdb()), link the context into ctdb->db_list,
 * register the null/fetch/fetch_with_header calls and initialise vacuuming.
 */
727 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
728 bool persistent, const char *unhealthy_reason,
731 struct ctdb_db_context *ctdb_db, *tmp_db;
736 int remaining_tries = 0;
738 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
739 CTDB_NO_MEMORY(ctdb, ctdb_db);
741 ctdb_db->priority = 1;
742 ctdb_db->ctdb = ctdb;
743 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
744 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the db id is the hash of the name including its terminating NUL */
746 key.dsize = strlen(db_name)+1;
747 key.dptr = discard_const(db_name);
748 ctdb_db->db_id = ctdb_hash(&key);
749 ctdb_db->persistent = persistent;
/* only non-persistent databases get a delete queue */
751 if (!ctdb_db->persistent) {
752 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
753 if (ctdb_db->delete_queue == NULL) {
754 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
757 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
760 /* check for hash collisions */
761 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
762 if (tmp_db->db_id == ctdb_db->db_id) {
763 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
764 tmp_db->db_id, db_name, tmp_db->db_name));
765 talloc_free(ctdb_db);
771 if (unhealthy_reason) {
772 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
773 unhealthy_reason, 0);
775 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
776 ctdb_db->db_name, unhealthy_reason, ret));
777 talloc_free(ctdb_db);
782 if (ctdb->max_persistent_check_errors > 0) {
785 if (ctdb->done_startup) {
789 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
791 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
792 ctdb_db->db_name, ret));
793 talloc_free(ctdb_db);
/* an unhealthy database is refused once no tries remain; otherwise it is only reported */
798 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
799 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
800 ctdb_db->db_name, ctdb_db->unhealthy_reason));
801 talloc_free(ctdb_db);
805 if (ctdb_db->unhealthy_reason) {
806 /* this is just a warning, but we want that in the log file! */
807 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
808 ctdb_db->db_name, ctdb_db->unhealthy_reason));
811 /* open the database */
812 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
813 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
/* persistent databases use default flags; volatile ones are cleared on first open and skip sync */
816 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
817 if (ctdb->valgrinding) {
818 tdb_flags |= TDB_NOMMAP;
820 tdb_flags |= TDB_DISALLOW_NESTING;
822 tdb_flags |= TDB_INCOMPATIBLE_HASH;
826 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
827 ctdb->tunable.database_hash_size,
829 O_CREAT|O_RDWR, mode);
830 if (ctdb_db->ltdb == NULL) {
/* errno is saved at once because the calls that follow may overwrite it */
832 int saved_errno = errno;
835 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
838 strerror(saved_errno)));
839 talloc_free(ctdb_db);
843 if (remaining_tries == 0) {
844 DEBUG(DEBUG_CRIT,(__location__
845 "Failed to open persistent tdb '%s': %d - %s\n",
848 strerror(saved_errno)));
849 talloc_free(ctdb_db);
853 ret = stat(ctdb_db->db_path, &st);
855 DEBUG(DEBUG_CRIT,(__location__
856 "Failed to open persistent tdb '%s': %d - %s\n",
859 strerror(saved_errno)));
860 talloc_free(ctdb_db);
864 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
866 DEBUG(DEBUG_CRIT,(__location__
867 "Failed to open persistent tdb '%s': %d - %s\n",
870 strerror(saved_errno)));
871 talloc_free(ctdb_db);
881 ctdb_check_db_empty(ctdb_db);
883 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
888 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
889 ctdb_db->db_path, ret,
890 tdb_errorstr(ctdb_db->ltdb->tdb)));
891 if (remaining_tries == 0) {
892 talloc_free(ctdb_db);
896 fd = tdb_fd(ctdb_db->ltdb->tdb);
897 ret = fstat(fd, &st);
899 DEBUG(DEBUG_CRIT,(__location__
900 "Failed to fstat() persistent tdb '%s': %d - %s\n",
904 talloc_free(ctdb_db);
/* the tdb handle is released before the file is backed up */
909 talloc_free(ctdb_db->ltdb);
910 ctdb_db->ltdb = NULL;
912 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
914 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
916 talloc_free(ctdb_db);
926 /* set up a rb tree we can use to track which records we have a
927 fetch-lock in-flight for so we can defer any additional calls
930 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
931 if (ctdb_db->deferred_fetch == NULL) {
932 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
933 talloc_free(ctdb_db);
/* NOTE(review): from here on ctdb_db is linked into ctdb->db_list, yet the later error paths call talloc_free(ctdb_db) with no visible DLIST_REMOVE - confirm something unlinks it */
937 DLIST_ADD(ctdb->db_list, ctdb_db);
939 /* setting this can help some high churn databases */
940 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
943 all databases support the "null" function. we need this in
944 order to do forced migration of records
946 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
948 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
949 talloc_free(ctdb_db);
954 all databases support the "fetch" function. we need this
955 for efficient Samba3 ctdb fetch
957 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
959 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
960 talloc_free(ctdb_db);
965 all databases support the "fetch_with_header" function. we need this
966 for efficient readonly record fetches
/* NOTE(review): the failure message below says "fetch function" although this registers the fetch_with_header call - likely copy/paste */
968 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
970 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
971 talloc_free(ctdb_db);
975 ret = ctdb_vacuum_init(ctdb_db);
977 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
978 "database '%s'\n", ctdb_db->db_name));
979 talloc_free(ctdb_db);
984 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/* One deferred DB attach request, kept on the ctdb->deferred_attach list. */
991 struct ctdb_deferred_attach_context {
/* list linkage used by DLIST_ADD/DLIST_REMOVE */
992 struct ctdb_deferred_attach_context *next, *prev;
993 struct ctdb_context *ctdb;
/* the attach control packet that was put on hold */
994 struct ctdb_req_control *c;
/* talloc destructor: unlink the context from ctdb->deferred_attach when it is freed. */
998 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1000 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1005 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1007 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1008 struct ctdb_context *ctdb = da_ctx->ctdb;
1010 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1011 talloc_free(da_ctx);
1014 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1016 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1017 struct ctdb_context *ctdb = da_ctx->ctdb;
1019 /* This talloc-steals the packet ->c */
1020 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1021 talloc_free(da_ctx);
/*
 * Drain ctdb->deferred_attach: each entry is unlinked and a timed event
 * running ctdb_deferred_attach_callback() is scheduled for it.
 */
1024 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1026 struct ctdb_deferred_attach_context *da_ctx;
1028 /* call it from the main event loop as soon as the current event
1031 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1032 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
/* the event is parented to da_ctx and fires at timeval_current_ofs(1,0) */
1033 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1040 a client has asked to attach a new database
/*
 * Control handler for a DB attach request.
 *
 * Local clients are refused while the node is inactive, and deferred while
 * recovery is active before startup has completed (the recovery daemon is
 * exempt). If the database is already attached its id is returned;
 * otherwise it is attached locally, the id is returned in outdata and the
 * attach is broadcast to all other nodes.
 */
1042 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1043 TDB_DATA *outdata, uint64_t tdb_flags,
1044 bool persistent, uint32_t client_id,
1045 struct ctdb_req_control *c,
1048 const char *db_name = (const char *)indata.dptr;
/* NOTE(review): db_name is used as a C string straight from indata.dptr; NUL termination is not checked in this function - confirm the caller validates it */
1049 struct ctdb_db_context *db;
1050 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1051 struct ctdb_client *client = NULL;
1053 if (ctdb->tunable.allow_client_db_attach == 0) {
1054 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1055 "AllowClientDBAccess == 0\n", db_name));
1059 /* dont allow any local clients to attach while we are in recovery mode
1060 * except for the recovery daemon.
1061 * allow all attach from the network since these are always from remote
1064 if (client_id != 0) {
1065 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1067 if (client != NULL) {
1068 /* If the node is inactive it is not part of the cluster
1069 and we should not allow clients to attach to any
1072 if (node->flags & NODE_FLAGS_INACTIVE) {
1073 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1077 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1078 && client->pid != ctdb->recoverd_pid
1079 && !ctdb->done_startup) {
1080 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1082 if (da_ctx == NULL) {
1083 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
/* the control packet is re-parented onto the deferred context */
1087 da_ctx->ctdb = ctdb;
1088 da_ctx->c = talloc_steal(da_ctx, c);
1089 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1090 DLIST_ADD(ctdb->deferred_attach, da_ctx);
/* arm the timeout handler in case the deferred attach is not processed in time */
1092 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1094 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1095 *async_reply = true;
1100 /* the client can optionally pass additional tdb flags, but we
1101 only allow a subset of those on the database in ctdb. Note
1102 that tdb_flags is passed in via the (otherwise unused)
1103 srvid to the attach control */
1104 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1106 /* see if we already have this name */
1107 db = ctdb_db_handle(ctdb, db_name);
1109 outdata->dptr = (uint8_t *)&db->db_id;
1110 outdata->dsize = sizeof(db->db_id);
1111 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1115 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1119 db = ctdb_db_handle(ctdb, db_name);
1121 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1125 /* remember the flags the client has specified */
1126 tdb_add_flags(db->ltdb->tdb, tdb_flags);
/* outdata points at the db_id stored inside the db context; no copy is made here */
1128 outdata->dptr = (uint8_t *)&db->db_id;
1129 outdata->dsize = sizeof(db->db_id);
1131 /* Try to ensure it's locked in mem */
1132 ctdb_lockdown_memory(ctdb);
1134 /* tell all the other nodes about this database */
1135 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1136 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1137 CTDB_CONTROL_DB_ATTACH,
1138 0, CTDB_CTRL_FLAG_NOREPLY,
1139 indata, NULL, NULL);
1147 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every file whose name has
 * the form "<name>.tdb.<digits>" with <digits> equal to this node's pnn.
 * Other entries are logged and ignored.
 */
1149 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1150 const char *unhealthy_reason)
1155 /* open the persistent db directory and scan it for files */
1156 d = opendir(ctdb->db_directory_persistent);
1161 while ((de=readdir(d))) {
1163 size_t len = strlen(de->d_name);
1165 int invalid_name = 0;
/* work on a private copy of the directory entry name */
1167 s = talloc_strdup(ctdb, de->d_name);
1168 CTDB_NO_MEMORY(ctdb, s);
1170 /* only accept names ending in .tdb */
1171 p = strstr(s, ".tdb.");
1172 if (len < 7 || p == NULL) {
1177 /* only accept names ending with .tdb. and any number of digits */
1179 while (*q != 0 && invalid_name == 0) {
1180 if (!isdigit(*q++)) {
/* the numeric suffix must parse and must match this node's pnn */
1184 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1185 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1191 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1192 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1198 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup helper: apply default database directories, create them if
 * needed, open and tdb_check() the persistent health tdb (retrying with
 * TDB_CLEAR_IF_FIRST after a failure) and then attach all persistent
 * databases via ctdb_attach_persistent().
 */
1206 int ctdb_attach_databases(struct ctdb_context *ctdb)
1209 char *persistent_health_path = NULL;
1210 char *unhealthy_reason = NULL;
1211 bool first_try = true;
/* fall back to the VARDIR based defaults for directories that were not configured */
1213 if (ctdb->db_directory == NULL) {
1214 ctdb->db_directory = VARDIR "/ctdb";
1216 if (ctdb->db_directory_persistent == NULL) {
1217 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1219 if (ctdb->db_directory_state == NULL) {
1220 ctdb->db_directory_state = VARDIR "/ctdb/state";
1223 /* make sure the db directory exists */
1224 ret = mkdir(ctdb->db_directory, 0700);
1225 if (ret == -1 && errno != EEXIST) {
1226 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1227 ctdb->db_directory));
1231 /* make sure the persistent db directory exists */
1232 ret = mkdir(ctdb->db_directory_persistent, 0700);
1233 if (ret == -1 && errno != EEXIST) {
1234 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1235 ctdb->db_directory_persistent));
1239 /* make sure the internal state db directory exists */
1240 ret = mkdir(ctdb->db_directory_state, 0700);
1241 if (ret == -1 && errno != EEXIST) {
1242 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1243 ctdb->db_directory_state));
1247 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1248 ctdb->db_directory_state,
1249 PERSISTENT_HEALTH_TDB,
1251 if (persistent_health_path == NULL) {
1252 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* open (or create) the persistent health tdb; a failed open is retried below with TDB_CLEAR_IF_FIRST */
1258 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1259 0, TDB_DISALLOW_NESTING,
1260 O_CREAT | O_RDWR, 0600);
1261 if (ctdb->db_persistent_health == NULL) {
1262 struct tdb_wrap *tdb;
1265 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1266 persistent_health_path,
1269 talloc_free(persistent_health_path);
1270 talloc_free(unhealthy_reason);
1275 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1276 persistent_health_path,
1277 "was cleared after a failure",
1278 "manual verification needed");
1279 if (unhealthy_reason == NULL) {
1280 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1281 talloc_free(persistent_health_path);
1285 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1286 persistent_health_path));
1287 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1288 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1289 O_CREAT | O_RDWR, 0600);
1291 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1292 persistent_health_path,
1295 talloc_free(persistent_health_path);
1296 talloc_free(unhealthy_reason);
/* verify the health tdb; a failed check gets the same clear-and-reopen treatment as a failed open */
1303 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1305 struct tdb_wrap *tdb;
1307 talloc_free(ctdb->db_persistent_health);
1308 ctdb->db_persistent_health = NULL;
1311 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1312 persistent_health_path));
1313 talloc_free(persistent_health_path);
1314 talloc_free(unhealthy_reason);
1319 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1320 persistent_health_path,
1321 "was cleared after a failure",
1322 "manual verification needed");
1323 if (unhealthy_reason == NULL) {
1324 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1325 talloc_free(persistent_health_path);
1329 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1330 persistent_health_path));
1331 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1332 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1333 O_CREAT | O_RDWR, 0600);
1335 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1336 persistent_health_path,
1339 talloc_free(persistent_health_path);
1340 talloc_free(unhealthy_reason);
1347 talloc_free(persistent_health_path);
/* unhealthy_reason is non-NULL only when the health tdb had to be cleared above */
1349 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1350 talloc_free(unhealthy_reason);
1359 called when a broadcast seqnum update comes in
/*
 * Handle a seqnum update broadcast for db_id that originated on srcnode:
 * bump the local tdb sequence number and cache the new value.
 * Updates from this node itself, for unknown databases or for unhealthy
 * databases are not applied.
 */
1361 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1363 struct ctdb_db_context *ctdb_db;
1364 if (srcnode == ctdb->pnn) {
1365 /* don't update ourselves! */
1369 ctdb_db = find_ctdb_db(ctdb, db_id);
1371 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1375 if (ctdb_db->unhealthy_reason) {
1376 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1377 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* increment without blocking, then remember the value so the periodic check sees no change */
1381 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1382 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1387 timer to check for seqnum changes in a ltdb and propagate them
1389 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1390 struct timeval t, void *p)
1392 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1393 struct ctdb_context *ctdb = ctdb_db->ctdb;
1394 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1395 if (new_seqnum != ctdb_db->seqnum) {
1396 /* something has changed - propogate it */
1398 data.dptr = (uint8_t *)&ctdb_db->db_id;
1399 data.dsize = sizeof(uint32_t);
1400 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1401 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1404 ctdb_db->seqnum = new_seqnum;
1406 /* setup a new timer */
1407 ctdb_db->seqnum_update =
1408 event_add_timed(ctdb->ev, ctdb_db,
1409 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1410 ctdb_ltdb_seqnum_check, ctdb_db);
1414 enable seqnum handling on this db
/*
 * Enable sequence number handling for db_id: arm the periodic seqnum check
 * timer if it is not already set, enable seqnum tracking on the tdb and
 * cache the current sequence number.
 */
1416 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1418 struct ctdb_db_context *ctdb_db;
1419 ctdb_db = find_ctdb_db(ctdb, db_id);
1421 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* seqnum_interval is split into whole seconds and a microsecond remainder */
1425 if (ctdb_db->seqnum_update == NULL) {
1426 ctdb_db->seqnum_update =
1427 event_add_timed(ctdb->ev, ctdb_db,
1428 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1429 ctdb_ltdb_seqnum_check, ctdb_db);
1432 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1433 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of the database named in the
 * ctdb_db_priority payload. Priorities outside 1..NUM_DB_PRIORITIES are
 * reported as invalid.
 */
1437 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
/* NOTE(review): indata.dptr is reinterpreted as struct ctdb_db_priority without a size check in this function - presumably validated by the caller, confirm */
1439 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1440 struct ctdb_db_context *ctdb_db;
1442 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1444 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1448 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1449 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1453 ctdb_db->priority = db_prio->priority;
1454 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));