2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tdb/include/tdb.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 * write a record to a normal database
37 * This is the server-variant of the ctdb_ltdb_store function.
38 * It contains logic to determine whether a record should be
39 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
40 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store of one record into the local tdb of ctdb_db.
 * ctdb_local_attach() installs this function as ctdb_db->ctdb_ltdb_store_fn.
 *
 * ctdb_db: database context; ctdb_db->ltdb->tdb is the tdb written to
 * header:  ltdb header of the record; its flags are modified here (the
 *          VACUUM_MIGRATED and AUTOMATIC bits are cleared before storing)
 * key and data are used throughout the body, but their parameter
 * declarations are not visible in this copy.
 *
 * Visible behaviour, in order:
 *  - with CTDB_FLAG_TORTURE set, the stored record is fetched and an
 *    RSN regression is logged when its rsn is greater than header->rsn
 *  - a chain of tests (data size, read-only flags, persistent db,
 *    AUTOMATIC, MIGRATED_WITH_DATA, lmaster, VACUUM_MIGRATED, dmaster)
 *    selects between storing and deleting the record
 *  - an empty record in a non-persistent db, with the local node as dmaster
 *    and no read-only flags set, is scheduled for deletion;
 *    remove_from_delete_queue is the negation of schedule_for_deletion
 *  - the record buffer (header followed by data) is built in talloc memory
 *  - when seqnum_update is set and the payload after the header is unchanged
 *    from the stored record, TDB_SEQNUM is removed from the tdb flags for the
 *    duration of the write and added back afterwards
 *  - the record is stored with TDB_REPLACE or deleted
 *  - after the write, the delete queue is updated through
 *    ctdb_local_schedule_for_deletion() or
 *    ctdb_local_remove_from_delete_queue()
 *
 * NOTE(review): the assignments inside most branches, the conditions around
 * the error logging and the return statements are not visible in this copy;
 * confirm them against the complete source.
 */
42 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44 struct ctdb_ltdb_header *header,
47 struct ctdb_context *ctdb = ctdb_db->ctdb;
50 bool seqnum_suppressed = false;
52 bool schedule_for_deletion = false;
53 bool remove_from_delete_queue = false;
56 if (ctdb->flags & CTDB_FLAG_TORTURE) {
57 struct ctdb_ltdb_header *h2;
58 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/*
 * NOTE(review): h2 is a pointer, so sizeof(h2) below is the size of a
 * pointer and not of struct ctdb_ltdb_header; sizeof(*h2) looks intended.
 */
60 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64 if (rec.dptr) free(rec.dptr);
67 if (ctdb->vnn_map == NULL) {
69 * Called from a client: always store the record
70 * Also don't call ctdb_lmaster since it uses the vnn_map!
76 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
79 * If we migrate an empty record off to another node
80 * and the record has not been migrated with data,
81 * delete the record instead of storing the empty record.
83 if (data.dsize != 0) {
85 } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
87 } else if (ctdb_db->persistent) {
89 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91 * The record is not created by the client but
92 * automatically by the ctdb_ltdb_fetch logic that
93 * creates a record with an initial header in the
94 * ltdb before trying to migrate the record from
95 * the current lmaster. Keep it instead of trying
96 * to delete the non-existing record...
99 schedule_for_deletion = true;
100 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
102 } else if (ctdb_db->ctdb->pnn == lmaster) {
104 * If we are lmaster, then we usually keep the record.
105 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
106 * and the record is empty and has never been migrated
107 * with data, then we should delete it instead of storing it.
108 * This is part of the vacuuming process.
110 * The reason that we usually need to store even empty records
111 * on the lmaster is that a client operating directly on the
112 * lmaster (== dmaster) expects the local copy of the record to
113 * exist after successful ctdb migrate call. If the record does
114 * not exist, the client goes into a migrate loop and eventually
115 * fails. So storing the empty record makes sure that we do not
116 * need to change the client code.
118 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
120 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
123 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
128 if ((data.dsize == 0) &&
129 !ctdb_db->persistent &&
130 (ctdb_db->ctdb->pnn == header->dmaster) &&
131 !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
133 schedule_for_deletion = true;
135 remove_from_delete_queue = !schedule_for_deletion;
140 * The VACUUM_MIGRATED flag is only set temporarily for
141 * the above logic when the record was retrieved by a
142 * VACUUM_MIGRATE call and should not be stored in the
145 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
146 * and there are two cases in which the corresponding record
147 * is stored in the local database:
148 * 1. The record has been migrated with data in the past
149 * (the MIGRATED_WITH_DATA record flag is set).
150 * 2. The record has been filled with data again since it
151 * had been submitted in the VACUUM_FETCH message to the
153 * For such records it is important to not store the
154 * VACUUM_MIGRATED flag in the database.
156 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
159 * Similarly, clear the AUTOMATIC flag which should not enter
160 * the local database copy since this would require client
161 * modifications to clear the flag when the client stores
164 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
166 rec.dsize = sizeof(*header) + data.dsize;
167 rec.dptr = talloc_size(ctdb, rec.dsize);
168 CTDB_NO_MEMORY(ctdb, rec.dptr);
170 memcpy(rec.dptr, header, sizeof(*header));
171 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
173 /* Databases with seqnum updates enabled only get their seqnum
174 changes when/if we modify the data */
175 if (ctdb_db->seqnum_update != NULL) {
177 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
179 if ( (old.dsize == rec.dsize)
180 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
181 rec.dptr+sizeof(struct ctdb_ltdb_header),
182 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
183 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
184 seqnum_suppressed = true;
186 if (old.dptr) free(old.dptr);
189 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
191 keep?"storing":"deleting",
195 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
197 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
204 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
209 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
212 keep?"store":"delete", ret,
213 tdb_errorstr(ctdb_db->ltdb->tdb)));
215 schedule_for_deletion = false;
216 remove_from_delete_queue = false;
218 if (seqnum_suppressed) {
219 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
222 talloc_free(rec.dptr);
224 if (schedule_for_deletion) {
226 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
228 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
232 if (remove_from_delete_queue) {
233 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State handed from ctdb_ltdb_lock_requeue() to lock_fetch_callback() for a
 * request that is waiting for a record lock.
 *
 * Visible members: the ctdb context, the handler to call again (recv_pkt),
 * the deferred request packet (hdr) and the ignore_generation switch.
 * NOTE(review): recv_context and generation are accessed through this
 * struct elsewhere in the file, but their declarations are not visible in
 * this copy.
 */
239 struct lock_fetch_state {
240 struct ctdb_context *ctdb;
241 void (*recv_pkt)(void *, struct ctdb_req_header *);
243 struct ctdb_req_header *hdr;
245 bool ignore_generation;
249 called when we should retry the operation
/*
 * Callback passed to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p: the struct lock_fetch_state prepared by the requester
 * locked: not referenced in the visible lines
 *
 * Unless ignore_generation is set, a packet whose recorded generation
 * differs from the current vnn_map generation is reported as stale and its
 * header is freed. The saved recv_pkt handler is called with the saved
 * context and packet header, and the requeue is logged.
 *
 * NOTE(review): the early return after discarding a stale packet is not
 * visible in this copy; presumably the handler is only called for packets
 * that were not discarded - confirm.
 */
251 static void lock_fetch_callback(void *p, bool locked)
253 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
254 if (!state->ignore_generation &&
255 state->generation != state->ctdb->vnn_map->generation) {
256 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
257 talloc_free(state->hdr);
260 state->recv_pkt(state->recv_context, state->hdr);
261 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
266 do a non-blocking ltdb_lock, deferring this ctdb request until we
269 It does the following:
271 1) tries to get the chainlock. If it succeeds, then it returns 0
273 2) if it fails to get a chainlock immediately then it sets up a
274 non-blocking chainlock via ctdb_lock_record, and when it gets the
275 chainlock it re-submits this ctdb request to the main packet
278 This effectively queues all ctdb requests that cannot be
279 immediately satisfied until it can get the lock. This means that
280 the main ctdb daemon will not block waiting for a chainlock held by
283 There are 3 possible return values:
285 0: means that it got the lock immediately.
286 -1: means that it failed to get the lock, and won't retry
287 -2: means that it failed to get the lock immediately, but will retry
/*
 * Take the chainlock for key without blocking, or queue the request for a
 * later retry when the lock is contended.
 *
 * ctdb_db:           database whose local tdb is locked
 * key:               key of the record to lock
 * hdr:               request packet to re-submit once the lock is obtained
 * recv_pkt:          handler used to re-submit the packet
 * recv_context:      first argument passed to recv_pkt
 * ignore_generation: saved in the deferral state; when set,
 *                    lock_fetch_callback() skips its generation check
 *
 * Return values, as listed in the comment preceding this function:
 * 0 when the lock was taken immediately, -1 on a failure that is not
 * retried, -2 when the request was queued for an asynchronous retry.
 * NOTE(review): the return statements themselves are not visible in this
 * copy.
 */
289 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
290 TDB_DATA key, struct ctdb_req_header *hdr,
291 void (*recv_pkt)(void *, struct ctdb_req_header *),
292 void *recv_context, bool ignore_generation)
295 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
296 struct lock_request *lreq;
297 struct lock_fetch_state *state;
299 ret = tdb_chainlock_nonblock(tdb, key);
302 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
303 /* a hard failure - don't try again */
307 /* when torturing, ensure we test the contended path */
308 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
311 tdb_chainunlock(tdb, key);
314 /* first the non-contended path */
/*
 * NOTE(review): no NULL check of the talloc result is visible before state
 * is dereferenced on the following lines.
 */
319 state = talloc(hdr, struct lock_fetch_state);
320 state->ctdb = ctdb_db->ctdb;
322 state->recv_pkt = recv_pkt;
323 state->recv_context = recv_context;
324 state->generation = ctdb_db->ctdb->vnn_map->generation;
325 state->ignore_generation = ignore_generation;
327 /* now the contended path */
328 lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
/*
 * NOTE(review): state was allocated with hdr as its talloc parent, and hdr
 * is reparented onto state below; confirm the intended ownership.
 */
333 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
334 so it won't be freed yet */
335 talloc_steal(state, hdr);
337 /* now tell the caller than we will retry asynchronously */
342 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Lock a record through ctdb_ltdb_lock_requeue() and read it with
 * ctdb_ltdb_fetch().
 *
 * key, hdr, recv_pkt, recv_context and ignore_generation are passed on
 * unchanged to ctdb_ltdb_lock_requeue(). header and data receive the result
 * of ctdb_ltdb_fetch(), which is also given hdr as an argument.
 * A failing ctdb_ltdb_unlock() is logged at DEBUG_ERR.
 *
 * NOTE(review): the conditions guarding the fetch and the unlock, and the
 * return statement, are not visible in this copy. Presumably the fetch only
 * runs once the lock is held and the unlock only when the fetch failed -
 * confirm.
 */
344 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
345 TDB_DATA key, struct ctdb_ltdb_header *header,
346 struct ctdb_req_header *hdr, TDB_DATA *data,
347 void (*recv_pkt)(void *, struct ctdb_req_header *),
348 void *recv_context, bool ignore_generation)
352 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
353 recv_context, ignore_generation);
355 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
358 uret = ctdb_ltdb_unlock(ctdb_db, key);
360 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
369 paranoid check to see if the db is empty
/*
 * Sanity check called from ctdb_local_attach(): traverses the local tdb
 * with tdb_traverse_read() (no callback) and keeps the result in count.
 * Going by the messages below, a database that is not empty on attach is
 * logged at DEBUG_ALERT and ends in ctdb_fatal().
 *
 * NOTE(review): the test applied to count is not visible in this copy.
 */
371 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
373 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
374 int count = tdb_traverse_read(tdb, NULL, NULL);
376 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
378 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Reload ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL byte.
 * The previous reason pointer is kept in old and the field is reset to NULL
 * before the fetch. A fetched value is copied with talloc_strndup() onto
 * ctdb_db; if that allocation fails the old reason is put back. Otherwise
 * unhealthy_reason is set to the new reason.
 *
 * NOTE(review): the return statements, the handling of a missing value and
 * the release of val.dptr and old are not visible in this copy.
 */
382 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
383 struct ctdb_db_context *ctdb_db)
385 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
391 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
392 key.dsize = strlen(ctdb_db->db_name);
394 old = ctdb_db->unhealthy_reason;
395 ctdb_db->unhealthy_reason = NULL;
397 val = tdb_fetch(tdb, key);
399 reason = talloc_strndup(ctdb_db,
400 (const char *)val.dptr,
402 if (reason == NULL) {
403 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
405 ctdb_db->unhealthy_reason = old;
416 ctdb_db->unhealthy_reason = reason;
/*
 * Write the health state of ctdb_db to the persistent health tdb inside a
 * tdb transaction and update ctdb_db->unhealthy_reason to match.
 *
 * ctdb:              daemon context owning the persistent health tdb
 * ctdb_db:           database whose health is recorded
 * given_reason:      text describing the problem; NULL means healthy
 * num_healthy_nodes: when this is 0 and an old reason exists, the old
 *                    reason is kept and given the NO-HEALTHY-NODES prefix
 *                    (presumably only if it does not carry it already; the
 *                    test on the strncmp result is not visible)
 *
 * Visible steps: start the transaction, reload the current state with
 * ctdb_load_persistent_health(), build new_reason, then either store it
 * under the database name (key and value without terminating NUL) or, when
 * only an old reason exists, delete the key. A failed store or delete
 * cancels the transaction. After a successful commit the old reason is
 * freed and new_reason becomes ctdb_db->unhealthy_reason.
 *
 * NOTE(review): several conditions and all return statements are not
 * visible in this copy.
 */
420 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
421 struct ctdb_db_context *ctdb_db,
422 const char *given_reason,/* NULL means healthy */
423 int num_healthy_nodes)
425 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
429 char *new_reason = NULL;
430 char *old_reason = NULL;
432 ret = tdb_transaction_start(tdb);
434 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
435 tdb_name(tdb), ret, tdb_errorstr(tdb)));
439 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
441 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
442 ctdb_db->db_name, ret));
445 old_reason = ctdb_db->unhealthy_reason;
447 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
448 key.dsize = strlen(ctdb_db->db_name);
451 new_reason = talloc_strdup(ctdb_db, given_reason);
452 if (new_reason == NULL) {
453 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
457 } else if (old_reason && num_healthy_nodes == 0) {
459 * If the reason indicates ok, but there where no healthy nodes
460 * available, that it means, we have not recovered valid content
461 * of the db. So if there's an old reason, prefix it with
462 * "NO-HEALTHY-NODES - "
466 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
467 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
469 prefix = _TMP_PREFIX;
473 new_reason = talloc_asprintf(ctdb_db, "%s%s",
475 if (new_reason == NULL) {
476 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
477 prefix, old_reason));
484 val.dptr = discard_const_p(uint8_t, new_reason);
485 val.dsize = strlen(new_reason);
487 ret = tdb_store(tdb, key, val, TDB_REPLACE);
489 tdb_transaction_cancel(tdb);
490 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
491 tdb_name(tdb), ctdb_db->db_name, new_reason,
492 ret, tdb_errorstr(tdb)));
493 talloc_free(new_reason);
496 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
497 ctdb_db->db_name, new_reason));
498 } else if (old_reason) {
499 ret = tdb_delete(tdb, key);
501 tdb_transaction_cancel(tdb);
502 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
503 tdb_name(tdb), ctdb_db->db_name,
504 ret, tdb_errorstr(tdb)));
505 talloc_free(new_reason);
508 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
512 ret = tdb_transaction_commit(tdb);
513 if (ret != TDB_SUCCESS) {
514 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
515 tdb_name(tdb), ret, tdb_errorstr(tdb)));
516 talloc_free(new_reason);
520 talloc_free(old_reason);
521 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move the file of a corrupted database out of the way.
 *
 * Visible steps:
 *  - build new_path with a .corrupted.<timestamp>.0Z suffix from the
 *    current time (the first format argument is not visible in this copy)
 *  - mark the database unhealthy through ctdb_update_persistent_health()
 *    with a reason naming the backup and 0 healthy nodes
 *  - rename ctdb_db->db_path to new_path; both the failure and the success
 *    of the rename are logged at DEBUG_CRIT
 * new_reason and new_path are released with talloc_free().
 *
 * NOTE(review): the timestamp format uses unsigned conversions for the tm
 * fields; assuming tm points to a struct tm, those fields have type int and
 * the signed conversion would match the argument types - confirm.
 * NOTE(review): the message logged right after
 * ctdb_update_persistent_health() reads not implemented yet, which does not
 * describe a failed health update - confirm against the full source.
 * The return statements are not visible in this copy.
 */
526 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
527 struct ctdb_db_context *ctdb_db)
529 time_t now = time(NULL);
537 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
538 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
539 "%04u%02u%02u%02u%02u%02u.0Z",
541 tm->tm_year+1900, tm->tm_mon+1,
542 tm->tm_mday, tm->tm_hour, tm->tm_min,
544 if (new_path == NULL) {
545 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
549 new_reason = talloc_asprintf(ctdb_db,
550 "ERROR - Backup of corrupted TDB in '%s'",
552 if (new_reason == NULL) {
553 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
556 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
557 talloc_free(new_reason);
559 DEBUG(DEBUG_CRIT,(__location__
560 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
565 ret = rename(ctdb_db->db_path, new_path);
567 DEBUG(DEBUG_CRIT,(__location__
568 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
569 ctdb_db->db_path, new_path,
570 errno, strerror(errno)));
571 talloc_free(new_path);
575 DEBUG(DEBUG_CRIT,(__location__
576 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
577 ctdb_db->db_path, new_path));
578 talloc_free(new_path);
/*
 * Reload and report the health state of every persistent database on
 * ctdb->db_list.
 *
 * For each entry that passes the persistent test,
 * ctdb_load_persistent_health() is called. A database without an unhealthy
 * reason is logged at DEBUG_INFO as healthy, one with a reason is logged at
 * DEBUG_ALERT together with that reason. A summary line with the OK and FAIL
 * counters is logged at DEBUG_ALERT when fail is non-zero and at
 * DEBUG_NOTICE otherwise.
 *
 * NOTE(review): the counter updates, the handling of non-persistent entries
 * and the return statements are not visible in this copy.
 */
582 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
584 struct ctdb_db_context *ctdb_db;
589 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
590 if (!ctdb_db->persistent) {
594 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
596 DEBUG(DEBUG_ALERT,(__location__
597 " load persistent health for '%s' failed\n",
602 if (ctdb_db->unhealthy_reason == NULL) {
604 DEBUG(DEBUG_INFO,(__location__
605 " persistent db '%s' healthy\n",
611 DEBUG(DEBUG_ALERT,(__location__
612 " persistent db '%s' unhealthy: %s\n",
614 ctdb_db->unhealthy_reason));
616 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
617 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
629 mark a database - as healthy
/*
 * Control handler that marks a database as healthy.
 *
 * indata: the first four bytes of indata.dptr are read as the database id.
 * NOTE(review): no check of indata.dsize is visible before that read.
 *
 * The database is looked up with find_ctdb_db() and its health is updated
 * through ctdb_update_persistent_health() with a NULL reason (healthy) and
 * one healthy node. If may_recover is set and startup has not completed,
 * recovery_mode is switched to CTDB_RECOVERY_ACTIVE to force a recovery.
 *
 * NOTE(review): may_recover starts out false; the line that sets it
 * (presumably when the database had an unhealthy reason) and the return
 * statements are not visible in this copy.
 */
631 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
633 uint32_t db_id = *(uint32_t *)indata.dptr;
634 struct ctdb_db_context *ctdb_db;
636 bool may_recover = false;
638 ctdb_db = find_ctdb_db(ctdb, db_id);
640 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
644 if (ctdb_db->unhealthy_reason) {
648 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
650 DEBUG(DEBUG_ERR,(__location__
651 " ctdb_update_persistent_health(%s) failed\n",
656 if (may_recover && !ctdb->done_startup) {
657 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
659 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler that reports the health state of a database.
 *
 * indata: the first four bytes of indata.dptr are read as the database id.
 * NOTE(review): no check of indata.dsize is visible before that read.
 *
 * The state is reloaded with ctdb_load_persistent_health(). When an
 * unhealthy reason is present, outdata is pointed at that string itself
 * (not a copy) and its size includes the terminating NUL byte.
 *
 * NOTE(review): the outdata parameter declaration, the result for a healthy
 * database and the return statements are not visible in this copy.
 */
665 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
669 uint32_t db_id = *(uint32_t *)indata.dptr;
670 struct ctdb_db_context *ctdb_db;
673 ctdb_db = find_ctdb_db(ctdb, db_id);
675 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
679 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
681 DEBUG(DEBUG_ERR,(__location__
682 " ctdb_load_persistent_health(%s) failed\n",
688 if (ctdb_db->unhealthy_reason) {
689 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
690 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Turn on the readonly property of a database by opening its tracking tdb.
 *
 * The function first tests whether the database is already readonly and
 * whether it is persistent; a persistent database is reported as an error.
 * The tracking database path is the database path with .RO appended. It is
 * opened with tdb_open() using the database_hash_size tunable and the flags
 * TDB_NOLOCK, TDB_CLEAR_IF_FIRST and TDB_NOSYNC, and kept in
 * ctdb_db->rottdb. On success ctdb_db->readonly is set to true.
 *
 * NOTE(review): the remaining tdb_open() arguments, the release of ropath
 * and the return statements are not visible in this copy.
 */
697 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
701 if (ctdb_db->readonly) {
705 if (ctdb_db->persistent) {
706 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
710 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
711 if (ropath == NULL) {
712 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
715 ctdb_db->rottdb = tdb_open(ropath,
716 ctdb->tunable.database_hash_size,
717 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
719 if (ctdb_db->rottdb == NULL) {
720 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
725 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
727 ctdb_db->readonly = true;
733 attach to a database, handling both persistent and non-persistent databases
734 return 0 on success, -1 on failure
/*
 * Create the context for database db_name, open its local tdb and link the
 * context into ctdb->db_list.
 *
 * ctdb:             daemon context; also the talloc parent of the new context
 * db_name:          database name; the db id is the ctdb_hash() of the name
 *                   including its terminating NUL byte
 * persistent:       selects the persistent directory and TDB_DEFAULT flags;
 *                   otherwise TDB_CLEAR_IF_FIRST and TDB_NOSYNC are used
 * unhealthy_reason: when given, it is recorded for the database through
 *                   ctdb_update_persistent_health() with 0 healthy nodes
 * (a further parameter is declared on a line not visible in this copy)
 *
 * Visible steps:
 *  - allocate and fill the context; a delete queue is created for
 *    non-persistent databases and ctdb_ltdb_store_server is installed as the
 *    store function
 *  - reject a db id that collides with an already attached database
 *  - load the persistent health state; an unhealthy database is refused
 *    when remaining_tries is 0 and only warned about otherwise
 *  - build the path and flags (TDB_NOMMAP when valgrinding) and open the
 *    tdb with tdb_wrap_open()
 *  - if the open or the tdb_check() fails, the file may be moved aside with
 *    ctdb_backup_corrupted_tdb(), depending on remaining_tries
 *  - create the deferred_fetch tree, add the context to ctdb->db_list, set
 *    the max dead tunable, register the null, fetch and fetch_with_header
 *    calls and initialise vacuuming
 *
 * Returns 0 on success and -1 on failure according to the comment preceding
 * this function; the return statements are not visible in this copy.
 *
 * NOTE(review): the failure paths after DLIST_ADD free ctdb_db with
 * talloc_free() and no removal from ctdb->db_list is visible; confirm that
 * the context is unlinked (for example by a destructor).
 */
736 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
737 bool persistent, const char *unhealthy_reason,
740 struct ctdb_db_context *ctdb_db, *tmp_db;
745 int remaining_tries = 0;
747 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
748 CTDB_NO_MEMORY(ctdb, ctdb_db);
750 ctdb_db->priority = 1;
751 ctdb_db->ctdb = ctdb;
752 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
753 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
755 key.dsize = strlen(db_name)+1;
756 key.dptr = discard_const(db_name);
757 ctdb_db->db_id = ctdb_hash(&key);
758 ctdb_db->persistent = persistent;
760 if (!ctdb_db->persistent) {
761 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
762 if (ctdb_db->delete_queue == NULL) {
763 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
766 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
769 /* check for hash collisions */
770 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
771 if (tmp_db->db_id == ctdb_db->db_id) {
772 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
773 tmp_db->db_id, db_name, tmp_db->db_name));
774 talloc_free(ctdb_db);
780 if (unhealthy_reason) {
781 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
782 unhealthy_reason, 0);
784 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
785 ctdb_db->db_name, unhealthy_reason, ret));
786 talloc_free(ctdb_db);
791 if (ctdb->max_persistent_check_errors > 0) {
794 if (ctdb->done_startup) {
798 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
800 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
801 ctdb_db->db_name, ret));
802 talloc_free(ctdb_db);
807 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
808 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
809 ctdb_db->db_name, ctdb_db->unhealthy_reason));
810 talloc_free(ctdb_db);
814 if (ctdb_db->unhealthy_reason) {
815 /* this is just a warning, but we want that in the log file! */
816 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
817 ctdb_db->db_name, ctdb_db->unhealthy_reason));
820 /* open the database */
821 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
822 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
825 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
826 if (ctdb->valgrinding) {
827 tdb_flags |= TDB_NOMMAP;
829 tdb_flags |= TDB_DISALLOW_NESTING;
831 tdb_flags |= TDB_INCOMPATIBLE_HASH;
835 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
836 ctdb->tunable.database_hash_size,
838 O_CREAT|O_RDWR, mode);
839 if (ctdb_db->ltdb == NULL) {
841 int saved_errno = errno;
844 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
847 strerror(saved_errno)));
848 talloc_free(ctdb_db);
852 if (remaining_tries == 0) {
853 DEBUG(DEBUG_CRIT,(__location__
854 "Failed to open persistent tdb '%s': %d - %s\n",
857 strerror(saved_errno)));
858 talloc_free(ctdb_db);
862 ret = stat(ctdb_db->db_path, &st);
864 DEBUG(DEBUG_CRIT,(__location__
865 "Failed to open persistent tdb '%s': %d - %s\n",
868 strerror(saved_errno)));
869 talloc_free(ctdb_db);
873 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
875 DEBUG(DEBUG_CRIT,(__location__
876 "Failed to open persistent tdb '%s': %d - %s\n",
879 strerror(saved_errno)));
880 talloc_free(ctdb_db);
890 ctdb_check_db_empty(ctdb_db);
892 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
897 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
898 ctdb_db->db_path, ret,
899 tdb_errorstr(ctdb_db->ltdb->tdb)));
900 if (remaining_tries == 0) {
901 talloc_free(ctdb_db);
905 fd = tdb_fd(ctdb_db->ltdb->tdb);
906 ret = fstat(fd, &st);
908 DEBUG(DEBUG_CRIT,(__location__
909 "Failed to fstat() persistent tdb '%s': %d - %s\n",
913 talloc_free(ctdb_db);
918 talloc_free(ctdb_db->ltdb);
919 ctdb_db->ltdb = NULL;
921 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
923 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
925 talloc_free(ctdb_db);
935 /* set up a rb tree we can use to track which records we have a
936 fetch-lock in-flight for so we can defer any additional calls
939 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
940 if (ctdb_db->deferred_fetch == NULL) {
941 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
942 talloc_free(ctdb_db);
946 DLIST_ADD(ctdb->db_list, ctdb_db);
948 /* setting this can help some high churn databases */
949 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
952 all databases support the "null" function. we need this in
953 order to do forced migration of records
955 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
957 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
958 talloc_free(ctdb_db);
963 all databases support the "fetch" function. we need this
964 for efficient Samba3 ctdb fetch
966 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
968 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
969 talloc_free(ctdb_db);
974 all databases support the "fetch_with_header" function. we need this
975 for efficient readonly record fetches
977 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
979 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
980 talloc_free(ctdb_db);
984 ret = ctdb_vacuum_init(ctdb_db);
986 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
987 "database '%s'\n", ctdb_db->db_name));
988 talloc_free(ctdb_db);
993 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One deferred database attach request, kept on the ctdb->deferred_attach
 * list while the attach cannot be served.
 *
 * next, prev: list links used by the DLIST macros
 * ctdb:       daemon context owning the list
 * c:          the control packet of the deferred request;
 *             ctdb_control_db_attach() steals it onto this context
 */
1000 struct ctdb_deferred_attach_context {
1001 struct ctdb_deferred_attach_context *next, *prev;
1002 struct ctdb_context *ctdb;
1003 struct ctdb_req_control *c;
/*
 * talloc destructor of a deferred attach context (set in
 * ctdb_control_db_attach()): removes the context from the
 * ctdb->deferred_attach list.
 * NOTE(review): the return statement is not visible in this copy.
 */
1007 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1009 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed by ctdb_control_db_attach() for a deferred attach.
 * Answers the deferred control with status -1 and no data through
 * ctdb_request_control_reply(), then frees the deferral context.
 *
 * private_data: the struct ctdb_deferred_attach_context of the request
 * ev, te, t:    not referenced in the visible lines
 */
1014 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1016 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1017 struct ctdb_context *ctdb = da_ctx->ctdb;
1019 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1020 talloc_free(da_ctx);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach(): feeds the
 * deferred control packet back into ctdb_input_pkt() so the attach request
 * is processed again, then frees the deferral context.
 *
 * private_data: the struct ctdb_deferred_attach_context of the request
 * ev, te, t:    not referenced in the visible lines
 */
1023 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1025 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1026 struct ctdb_context *ctdb = da_ctx->ctdb;
1028 /* This talloc-steals the packet ->c */
1029 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1030 talloc_free(da_ctx);
/*
 * Drain the ctdb->deferred_attach list: every entry is removed from the
 * list and a timed event is added that runs
 * ctdb_deferred_attach_callback() for it. The event time is
 * timeval_current_ofs(1,0) - presumably one second from now; confirm
 * against the timeval helper.
 *
 * NOTE(review): the return statement is not visible in this copy.
 */
1033 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1035 struct ctdb_deferred_attach_context *da_ctx;
1037 /* call it from the main event loop as soon as the current event
1040 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1041 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1042 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1049 a client has asked to attach a new database
/*
 * Control handler for a request to attach to a database.
 *
 * indata:     indata.dptr is used as the database name string
 * outdata:    on success points at the db_id field of the database context
 *             (not a copy)
 * tdb_flags:  extra flags from the client; only TDB_NOSYNC and
 *             TDB_INCOMPATIBLE_HASH are kept
 * persistent: whether a persistent database is requested
 * client_id:  non-zero for a local client; used to look up the client
 * c:          the control packet, stolen onto a deferral context when the
 *             attach is deferred
 * (the async_reply parameter is declared on a line not visible in this copy)
 *
 * Visible behaviour:
 *  - the request is denied when the allow_client_db_attach tunable is 0
 *  - for a local client the request is refused while the node is inactive;
 *    while recovery is active, the client is not the recovery daemon and
 *    startup is not done, the request is queued on ctdb->deferred_attach
 *    with a timeout timer and *async_reply is set to true
 *  - if the database already exists, a mismatch in the persistent property
 *    is reported as an error; otherwise its id is returned and the flags
 *    are added to the tdb
 *  - otherwise ctdb_local_attach() creates it, the id is returned,
 *    ctdb_lockdown_memory() is called and the attach control is broadcast
 *    to all nodes without asking for a reply
 *
 * NOTE(review): no check that indata.dptr is NUL terminated is visible.
 * NOTE(review): the denial message names AllowClientDBAccess while the
 * tunable field tested is allow_client_db_attach - confirm the intended
 * tunable name.
 * NOTE(review): the persistent mismatch message has no trailing newline.
 * The return statements are not visible in this copy.
 */
1051 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1052 TDB_DATA *outdata, uint64_t tdb_flags,
1053 bool persistent, uint32_t client_id,
1054 struct ctdb_req_control *c,
1057 const char *db_name = (const char *)indata.dptr;
1058 struct ctdb_db_context *db;
1059 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1060 struct ctdb_client *client = NULL;
1062 if (ctdb->tunable.allow_client_db_attach == 0) {
1063 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1064 "AllowClientDBAccess == 0\n", db_name));
1068 /* dont allow any local clients to attach while we are in recovery mode
1069 * except for the recovery daemon.
1070 * allow all attach from the network since these are always from remote
1073 if (client_id != 0) {
1074 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1076 if (client != NULL) {
1077 /* If the node is inactive it is not part of the cluster
1078 and we should not allow clients to attach to any
1081 if (node->flags & NODE_FLAGS_INACTIVE) {
1082 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1086 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1087 && client->pid != ctdb->recoverd_pid
1088 && !ctdb->done_startup) {
1089 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1091 if (da_ctx == NULL) {
1092 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1096 da_ctx->ctdb = ctdb;
1097 da_ctx->c = talloc_steal(da_ctx, c);
1098 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1099 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1101 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1103 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1104 *async_reply = true;
1109 /* the client can optionally pass additional tdb flags, but we
1110 only allow a subset of those on the database in ctdb. Note
1111 that tdb_flags is passed in via the (otherwise unused)
1112 srvid to the attach control */
1113 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1115 /* see if we already have this name */
1116 db = ctdb_db_handle(ctdb, db_name);
1118 if (db->persistent != persistent) {
1119 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1120 "database %s", persistent ? "" : "non-",
1121 db-> persistent ? "" : "non-", db_name));
1124 outdata->dptr = (uint8_t *)&db->db_id;
1125 outdata->dsize = sizeof(db->db_id);
1126 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1130 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1134 db = ctdb_db_handle(ctdb, db_name);
1136 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1140 /* remember the flags the client has specified */
1141 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1143 outdata->dptr = (uint8_t *)&db->db_id;
1144 outdata->dsize = sizeof(db->db_id);
1146 /* Try to ensure it's locked in mem */
1147 ctdb_lockdown_memory(ctdb);
1149 /* tell all the other nodes about this database */
1150 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1151 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1152 CTDB_CONTROL_DB_ATTACH,
1153 0, CTDB_CTRL_FLAG_NOREPLY,
1154 indata, NULL, NULL);
1162 attach to all existing persistent databases
/*
 * Scan the persistent database directory and attach to the databases that
 * belong to this node.
 *
 * ctdb:             daemon context; db_directory_persistent is scanned
 * unhealthy_reason: passed on unchanged to ctdb_local_attach()
 *
 * For every directory entry the name is copied into a talloc string. The
 * entry is only considered when the name is at least 7 characters long and
 * contains the substring .tdb. - the characters checked through q must be
 * digits and the number parsed after that substring must equal ctdb->pnn.
 * Other entries are logged as ignored. Accepted entries are attached with
 * ctdb_local_attach() as persistent databases.
 *
 * NOTE(review): isdigit() is given the value read through q; if q is a
 * pointer to plain char (its declaration is not visible), a cast to
 * unsigned char avoids undefined behaviour for negative values.
 * NOTE(review): the initialisation of q, any shortening of the copied name
 * before the attach, the release of the copied name, closedir() and the
 * return statements are not visible in this copy.
 */
1164 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1165 const char *unhealthy_reason)
1170 /* open the persistent db directory and scan it for files */
1171 d = opendir(ctdb->db_directory_persistent);
1176 while ((de=readdir(d))) {
1178 size_t len = strlen(de->d_name);
1180 int invalid_name = 0;
1182 s = talloc_strdup(ctdb, de->d_name);
1183 CTDB_NO_MEMORY(ctdb, s);
1185 /* only accept names ending in .tdb */
1186 p = strstr(s, ".tdb.");
1187 if (len < 7 || p == NULL) {
1192 /* only accept names ending with .tdb. and any number of digits */
1194 while (*q != 0 && invalid_name == 0) {
1195 if (!isdigit(*q++)) {
1199 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1200 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1206 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1207 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1213 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Prepare the database directories, open the persistent health tdb and
 * attach to all existing persistent databases.
 *
 * Visible steps:
 *  - fill in default paths below VARDIR for the database, persistent and
 *    state directories when they are NULL
 *  - create each directory with mode 0700; an already existing directory is
 *    not treated as an error
 *  - open the persistent health tdb in the state directory with
 *    TDB_DISALLOW_NESTING
 *  - if that open fails, or if tdb_check() reports a problem, an unhealthy
 *    reason saying the file was cleared after a failure is prepared and the
 *    open is repeated with TDB_CLEAR_IF_FIRST added
 *  - call ctdb_attach_persistent() with the unhealthy reason (NULL when no
 *    problem occurred) and free the temporary strings
 *
 * NOTE(review): the use of first_try, the assignment of the reopened tdb and
 * the return statements are not visible in this copy.
 */
1221 int ctdb_attach_databases(struct ctdb_context *ctdb)
1224 char *persistent_health_path = NULL;
1225 char *unhealthy_reason = NULL;
1226 bool first_try = true;
1228 if (ctdb->db_directory == NULL) {
1229 ctdb->db_directory = VARDIR "/ctdb";
1231 if (ctdb->db_directory_persistent == NULL) {
1232 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1234 if (ctdb->db_directory_state == NULL) {
1235 ctdb->db_directory_state = VARDIR "/ctdb/state";
1238 /* make sure the db directory exists */
1239 ret = mkdir(ctdb->db_directory, 0700);
1240 if (ret == -1 && errno != EEXIST) {
1241 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1242 ctdb->db_directory));
1246 /* make sure the persistent db directory exists */
1247 ret = mkdir(ctdb->db_directory_persistent, 0700);
1248 if (ret == -1 && errno != EEXIST) {
1249 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1250 ctdb->db_directory_persistent));
1254 /* make sure the internal state db directory exists */
1255 ret = mkdir(ctdb->db_directory_state, 0700);
1256 if (ret == -1 && errno != EEXIST) {
1257 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1258 ctdb->db_directory_state));
1262 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1263 ctdb->db_directory_state,
1264 PERSISTENT_HEALTH_TDB,
1266 if (persistent_health_path == NULL) {
1267 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1273 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1274 0, TDB_DISALLOW_NESTING,
1275 O_CREAT | O_RDWR, 0600);
1276 if (ctdb->db_persistent_health == NULL) {
1277 struct tdb_wrap *tdb;
1280 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1281 persistent_health_path,
1284 talloc_free(persistent_health_path);
1285 talloc_free(unhealthy_reason);
1290 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1291 persistent_health_path,
1292 "was cleared after a failure",
1293 "manual verification needed");
1294 if (unhealthy_reason == NULL) {
1295 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1296 talloc_free(persistent_health_path);
1300 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1301 persistent_health_path));
1302 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1303 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1304 O_CREAT | O_RDWR, 0600);
1306 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1307 persistent_health_path,
1310 talloc_free(persistent_health_path);
1311 talloc_free(unhealthy_reason);
1318 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1320 struct tdb_wrap *tdb;
1322 talloc_free(ctdb->db_persistent_health);
1323 ctdb->db_persistent_health = NULL;
1326 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1327 persistent_health_path));
1328 talloc_free(persistent_health_path);
1329 talloc_free(unhealthy_reason);
1334 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1335 persistent_health_path,
1336 "was cleared after a failure",
1337 "manual verification needed");
1338 if (unhealthy_reason == NULL) {
1339 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1340 talloc_free(persistent_health_path);
1344 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1345 persistent_health_path));
1346 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1347 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1348 O_CREAT | O_RDWR, 0600);
1350 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1351 persistent_health_path,
1354 talloc_free(persistent_health_path);
1355 talloc_free(unhealthy_reason);
1362 talloc_free(persistent_health_path);
1364 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1365 talloc_free(unhealthy_reason);
1374 called when a broadcast seqnum update comes in
/*
 * Apply a broadcast sequence-number update for the database identified by
 * db_id, as sent by node srcnode.
 *
 * ctdb    - daemon context; ctdb->pnn is compared against srcnode
 * db_id   - id passed to find_ctdb_db() to locate the database
 * srcnode - node number the update came from
 *
 * When the database is found and has no unhealthy_reason set, the tdb
 * sequence number is bumped with tdb_increment_seqnum_nonblock() and the
 * resulting value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the statements that leave the function (after the
 * self-update check and after each DEBUG_ERR message) and the condition
 * guarding the "Unknown db_id" message are not visible in this excerpt -
 * confirm the exact return codes against the complete file.
 */
1376 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1378 struct ctdb_db_context *ctdb_db;
/* an update that originated on this node is not applied again locally */
1379 if (srcnode == ctdb->pnn) {
1380 /* don't update ourselves! */
1384 ctdb_db = find_ctdb_db(ctdb, db_id);
/* reported when the lookup above did not yield a database (guard not shown) */
1386 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* a database flagged with an unhealthy_reason is logged, with that reason */
1390 if (ctdb_db->unhealthy_reason) {
1391 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1392 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb sequence number, then cache the value the tdb now reports */
1396 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1397 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1402 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed-event callback that watches one database's tdb sequence number.
 *
 * ev, te, t - event-callback arguments; not referenced in the visible lines
 * p         - private pointer, converted to a struct ctdb_db_context with
 *             talloc_get_type()
 *
 * The current tdb_get_seqnum() value is compared with the cached
 * ctdb_db->seqnum.  If they differ, a CTDB_CONTROL_UPDATE_SEQNUM control
 * carrying the database's db_id (sizeof(uint32_t) bytes) is sent to
 * CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY.  The cached value is
 * then set to the value just read and the callback registers itself again
 * with event_add_timed().
 *
 * NOTE(review): the declaration of `data` and the trailing arguments of
 * ctdb_daemon_send_control() are not visible in this excerpt.
 */
1404 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1405 struct timeval t, void *p)
1407 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1408 struct ctdb_context *ctdb = ctdb_db->ctdb;
1409 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1410 if (new_seqnum != ctdb_db->seqnum) {
1411 /* something has changed - propagate it */
/* the control payload is just the db_id of this database */
1413 data.dptr = (uint8_t *)&ctdb_db->db_id;
1414 data.dsize = sizeof(uint32_t);
1415 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1416 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1419 ctdb_db->seqnum = new_seqnum;
1421 /* setup a new timer */
/*
 * seqnum_interval is split into a /1000 part and a (%1000)*1000 part;
 * presumably milliseconds converted to (seconds, microseconds) - confirm
 * against timeval_current_ofs().
 */
1422 ctdb_db->seqnum_update =
1423 event_add_timed(ctdb->ev, ctdb_db,
1424 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1425 ctdb_ltdb_seqnum_check, ctdb_db);
1429 enable seqnum handling on this db
/*
 * Turn on sequence-number handling for the database identified by db_id.
 *
 * ctdb  - daemon context; supplies the event context and the
 *         seqnum_interval tunable
 * db_id - id passed to find_ctdb_db() to locate the database
 *
 * If no seqnum_update timer is registered yet, one is created with
 * event_add_timed() using ctdb_ltdb_seqnum_check as the callback.  Sequence
 * numbers are then enabled on the tdb with tdb_enable_seqnum() and the
 * current value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the condition guarding the "Unknown db_id" message and the
 * return statements are not visible in this excerpt - confirm the return
 * codes against the complete file.
 */
1431 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1433 struct ctdb_db_context *ctdb_db;
1434 ctdb_db = find_ctdb_db(ctdb, db_id);
/* reported when the lookup above did not yield a database (guard not shown) */
1436 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only arm the periodic check when no timer is registered yet */
1440 if (ctdb_db->seqnum_update == NULL) {
1441 ctdb_db->seqnum_update =
1442 event_add_timed(ctdb->ev, ctdb_db,
1443 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1444 ctdb_ltdb_seqnum_check, ctdb_db);
/* enable seqnum tracking in the tdb and take the current value as baseline */
1447 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1448 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler that sets the priority of a database.
 *
 * ctdb   - daemon context used for the database lookup
 * indata - control payload; indata.dptr is interpreted as a
 *          struct ctdb_db_priority carrying db_id and priority
 *
 * A priority below 1 or above NUM_DB_PRIORITIES is reported as invalid at
 * DEBUG_ERR.  Otherwise the value is stored in ctdb_db->priority and the
 * change is logged at DEBUG_INFO.
 *
 * NOTE(review): no check of indata.dsize against
 * sizeof(struct ctdb_db_priority) is visible before the cast - verify that
 * the caller validates the payload length.
 * NOTE(review): the condition guarding the "Unknown db_id" message and the
 * return statements are not visible in this excerpt - confirm the return
 * codes against the complete file.
 */
1452 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1454 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1455 struct ctdb_db_context *ctdb_db;
1457 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
/* reported when the lookup above did not yield a database (guard not shown) */
1459 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* accepted priorities are 1 .. NUM_DB_PRIORITIES inclusive */
1463 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1464 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1468 ctdb_db->priority = db_prio->priority;
1469 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
/*
 * Mark a database as sticky.
 *
 * ctdb    - daemon context; not referenced in the visible lines
 * ctdb_db - database to modify
 *
 * The request is logged at DEBUG_NOTICE.  A database whose sticky flag is
 * already set is special-cased, and a persistent database is reported at
 * DEBUG_ERR.  Otherwise a tree is created with trbt_create(ctdb_db, 0) and
 * stored in ctdb_db->sticky_records, and ctdb_db->sticky is set to true.
 *
 * NOTE(review): the bodies of the early-exit branches and the return
 * statements are not visible in this excerpt - confirm the return codes
 * against the complete file.
 * NOTE(review): the result of trbt_create() is not checked in the visible
 * lines - confirm whether an allocation failure needs handling here.
 */
1475 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1478 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/* already sticky: presumably nothing more to do (branch body not shown) */
1480 if (ctdb_db->sticky) {
/* the sticky property is refused for persistent databases */
1484 if (ctdb_db->persistent) {
1485 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* the tree is created with ctdb_db as its first argument, then the flag is set */
1489 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1491 ctdb_db->sticky = true;
1496 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1500 struct ctdb_db_context *ctdb_db;
1501 struct ctdb_db_statistics_wire *stats;
1506 ctdb_db = find_ctdb_db(ctdb, db_id);
1508 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1512 len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
1513 for (i = 0; i < MAX_HOT_KEYS; i++) {
1514 len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
1517 stats = talloc_size(outdata, len);
1518 if (stats == NULL) {
1519 DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
1523 stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
1524 stats->db_ro_revokes = ctdb_db->statistics.db_ro_revokes;
1525 for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
1526 stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
1528 stats->num_hot_keys = MAX_HOT_KEYS;
1530 ptr = &stats->hot_keys[0];
1531 for (i = 0; i < MAX_HOT_KEYS; i++) {
1532 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
1535 *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
1538 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
1539 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1542 outdata->dptr = (uint8_t *)stats;
1543 outdata->dsize = len;