2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 this is the dummy null procedure that all databases support
/*
 * No-op call handler. ctdb_local_attach() registers it for every attached
 * database under CTDB_NULL_FUNC. The body is not present in this excerpt.
 */
38 static int ctdb_null_func(struct ctdb_call_info *call)
44 this is a plain fetch procedure that all databases support
/*
 * Fetch call handler, registered under CTDB_FETCH_FUNC by
 * ctdb_local_attach(). The reply is the record data as-is: reply_data is
 * pointed at the record_data member of the call itself, nothing is copied.
 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
48 call->reply_data = &call->record_data;
53 this is a plain fetch procedure that all databases support
54 this returns the full record including the ltdb header
/*
 * Fetch call handler registered under CTDB_FETCH_WITH_HEADER_FUNC by
 * ctdb_local_attach().
 *
 * Builds a fresh reply buffer laid out as
 *     [struct ctdb_ltdb_header][record data]
 * The TDB_DATA descriptor is allocated as a talloc child of call, and its
 * payload as a child of that descriptor. Both allocations are NULL-checked;
 * the failure handling itself is not visible in this excerpt.
 */
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
58 call->reply_data = talloc(call, TDB_DATA);
59 if (call->reply_data == NULL) {
/* total reply size: ltdb header followed by the raw record payload */
62 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
64 if (call->reply_data->dptr == NULL) {
/* header first, record payload directly behind it */
67 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
75 * write a record to a normal database
77 * This is the server-variant of the ctdb_ltdb_store function.
78 * It contains logic to determine whether a record should be
79 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side record store. ctdb_local_attach() installs it as
 * ctdb_db->ctdb_ltdb_store_fn (apparently only for non-persistent
 * databases - confirm against the full file).
 *
 * ctdb_db: database to write to
 * header:  ltdb header of the record; the VACUUM_MIGRATED and AUTOMATIC
 *          flags are cleared in place before the record is written
 * (key and data are used below, but their parameter declarations are not
 *  visible in this excerpt)
 *
 * Visible flow:
 *  - with CTDB_FLAG_TORTURE set, fetch the stored copy and log at CRIT level
 *    when its RSN is higher than the one being written
 *  - choose between storing and deleting, based on data.dsize, persistence,
 *    the header flags and the lmaster/dmaster roles
 *  - assemble [header][data] in a temporary buffer allocated on ctdb, which
 *    is released with talloc_free() after the tdb operation
 *  - for databases with seqnum_update set, drop TDB_SEQNUM for the duration
 *    of the write when the payload is byte-identical to the stored one, and
 *    add the flag back afterwards
 *  - tdb_store() with TDB_REPLACE, or tdb_delete(); a failure is logged and
 *    cancels the scheduled deletion
 *  - finally ctdb_local_schedule_for_deletion() if still requested
 */
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
84 struct ctdb_ltdb_header *header,
87 struct ctdb_context *ctdb = ctdb_db->ctdb;
90 bool seqnum_suppressed = false;
92 bool schedule_for_deletion = false;
95 if (ctdb->flags & CTDB_FLAG_TORTURE) {
96 struct ctdb_ltdb_header *h2;
97 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/*
 * NOTE(review): sizeof(h2) is the size of the pointer, not of
 * struct ctdb_ltdb_header, so a record shorter than the header can
 * still pass this guard. sizeof(*h2) looks like the intended length.
 */
99 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
103 if (rec.dptr) free(rec.dptr);
106 if (ctdb->vnn_map == NULL) {
108 * Called from a client: always store the record
109 * Also don't call ctdb_lmaster since it uses the vnn_map!
115 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
118 * If we migrate an empty record off to another node
119 * and the record has not been migrated with data,
120 * delete the record instead of storing the empty record.
122 if (data.dsize != 0) {
124 } else if (ctdb_db->persistent) {
126 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
128 * The record is not created by the client but
129 * automatically by the ctdb_ltdb_fetch logic that
130 * creates a record with an initial header in the
131 * ltdb before trying to migrate the record from
132 * the current lmaster. Keep it instead of trying
133 * to delete the non-existing record...
136 schedule_for_deletion = true;
137 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
139 } else if (ctdb_db->ctdb->pnn == lmaster) {
141 * If we are lmaster, then we usually keep the record.
142 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143 * and the record is empty and has never been migrated
144 * with data, then we should delete it instead of storing it.
145 * This is part of the vacuuming process.
147 * The reason that we usually need to store even empty records
148 * on the lmaster is that a client operating directly on the
149 * lmaster (== dmaster) expects the local copy of the record to
150 * exist after successful ctdb migrate call. If the record does
151 * not exist, the client goes into a migrate loop and eventually
152 * fails. So storing the empty record makes sure that we do not
153 * need to change the client code.
155 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
157 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
160 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
166 !ctdb_db->persistent &&
167 (ctdb_db->ctdb->pnn == header->dmaster))
169 schedule_for_deletion = true;
174 * The VACUUM_MIGRATED flag is only set temporarily for
175 * the above logic when the record was retrieved by a
176 * VACUUM_MIGRATE call and should not be stored in the
179 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180 * and there are two cases in which the corresponding record
181 * is stored in the local database:
182 * 1. The record has been migrated with data in the past
183 * (the MIGRATED_WITH_DATA record flag is set).
184 * 2. The record has been filled with data again since it
185 * had been submitted in the VACUUM_FETCH message to the
187 * For such records it is important to not store the
188 * VACUUM_MIGRATED flag in the database.
190 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
193 * Similarly, clear the AUTOMATIC flag which should not enter
194 * the local database copy since this would require client
195 * modifications to clear the flag when the client stores
198 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
200 rec.dsize = sizeof(*header) + data.dsize;
201 rec.dptr = talloc_size(ctdb, rec.dsize);
202 CTDB_NO_MEMORY(ctdb, rec.dptr);
204 memcpy(rec.dptr, header, sizeof(*header));
205 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
207 /* Databases with seqnum updates enabled only get their seqnum
208 changes when/if we modify the data */
209 if (ctdb_db->seqnum_update != NULL) {
211 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
213 if ( (old.dsize == rec.dsize)
214 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215 rec.dptr+sizeof(struct ctdb_ltdb_header),
216 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218 seqnum_suppressed = true;
220 if (old.dptr) free(old.dptr);
223 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
225 keep?"storing":"deleting",
229 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
231 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
238 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
243 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
246 keep?"store":"delete", ret,
247 tdb_errorstr(ctdb_db->ltdb->tdb)));
249 schedule_for_deletion = false;
251 if (seqnum_suppressed) {
252 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
255 talloc_free(rec.dptr);
257 if (schedule_for_deletion) {
259 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
261 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
/*
 * State carried across a deferred chainlock attempt. It is filled in by
 * ctdb_ltdb_lock_requeue() and consumed by lock_fetch_callback().
 * Those functions also use recv_context and generation members, whose
 * declarations are not visible in this excerpt.
 */
268 struct lock_fetch_state {
/* daemon context; its current vnn_map generation is checked in the callback */
269 struct ctdb_context *ctdb;
/* handler the packet is handed back to once the lock has been obtained */
270 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
272 struct ctdb_req_header *hdr;
/* when true, the generation check in the callback is skipped */
274 bool ignore_generation;
278 called when we should retry the operation
/*
 * Lockwait completion callback. p is the struct lock_fetch_state that
 * ctdb_ltdb_lock_requeue() handed to ctdb_lockwait().
 *
 * Unless ignore_generation is set, a packet that was queued under a
 * different vnn_map generation is logged and freed (the early exit after
 * the free is not visible in this excerpt). Otherwise the packet is passed
 * back to recv_pkt together with recv_context.
 */
280 static void lock_fetch_callback(void *p)
282 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283 if (!state->ignore_generation &&
284 state->generation != state->ctdb->vnn_map->generation) {
285 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286 talloc_free(state->hdr);
289 state->recv_pkt(state->recv_context, state->hdr);
290 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
295 do a non-blocking ltdb_lock, deferring this ctdb request until we
298 It does the following:
300 1) tries to get the chainlock. If it succeeds, then it returns 0
302 2) if it fails to get a chainlock immediately then it sets up a
303 non-blocking chainlock via ctdb_lockwait, and when it gets the
304 chainlock it re-submits this ctdb request to the main packet
307 This effectively queues all ctdb requests that cannot be
308 immediately satisfied until it can get the lock. This means that
309 the main ctdb daemon will not block waiting for a chainlock held by
312 There are 3 possible return values:
314 0: means that it got the lock immediately.
315 -1: means that it failed to get the lock, and won't retry
316 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking. If the lock is
 * contended, park the request and replay it through recv_pkt once the lock
 * has been obtained. The return values are described in the comment that
 * precedes this function.
 *
 * ctdb_db:           database whose ltdb chain is locked
 * key:               record key
 * hdr:               the request packet; used as talloc parent for the
 *                    deferred state and afterwards moved underneath it
 * recv_pkt:          handler called with recv_context and hdr on replay
 * recv_context:      opaque first argument for recv_pkt
 * ignore_generation: replay even if the vnn_map generation has changed
 *
 * NOTE(review): state is allocated as a child of hdr and hdr is later
 * talloc_steal()ed onto state; confirm the resulting ownership is intended.
 */
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
319 TDB_DATA key, struct ctdb_req_header *hdr,
320 void (*recv_pkt)(void *, struct ctdb_req_header *),
321 void *recv_context, bool ignore_generation)
324 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325 struct lockwait_handle *h;
326 struct lock_fetch_state *state;
328 ret = tdb_chainlock_nonblock(tdb, key);
331 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332 /* a hard failure - don't try again */
336 /* when torturing, ensure we test the contended path */
337 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
340 tdb_chainunlock(tdb, key);
343 /* first the non-contended path */
/*
 * NOTE(review): the allocation result is dereferenced on the very next
 * line; no NULL check on state is visible between the two.
 */
348 state = talloc(hdr, struct lock_fetch_state);
349 state->ctdb = ctdb_db->ctdb;
351 state->recv_pkt = recv_pkt;
352 state->recv_context = recv_context;
353 state->generation = ctdb_db->ctdb->vnn_map->generation;
354 state->ignore_generation = ignore_generation;
356 /* now the contended path */
357 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
362 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363 so it won't be freed yet */
364 talloc_steal(state, hdr);
365 talloc_steal(state, h);
367 /* now tell the caller that we will retry asynchronously */
372 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Same contract as ctdb_ltdb_lock_requeue(), with the record additionally
 * read through ctdb_ltdb_fetch() into header and data.
 *
 * header: receives the ltdb header of the fetched record
 * data:   receives the record payload; hdr is passed to ctdb_ltdb_fetch()
 *         as its fourth argument (presumably the allocation context - TODO
 *         confirm against ctdb_ltdb_fetch)
 *
 * The chain is unlocked again via ctdb_ltdb_unlock() on one path, and a
 * failing unlock is logged. The conditions guarding the fetch and the
 * unlock are not visible in this excerpt.
 */
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
375 TDB_DATA key, struct ctdb_ltdb_header *header,
376 struct ctdb_req_header *hdr, TDB_DATA *data,
377 void (*recv_pkt)(void *, struct ctdb_req_header *),
378 void *recv_context, bool ignore_generation)
382 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
383 recv_context, ignore_generation);
385 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
388 uret = ctdb_ltdb_unlock(ctdb_db, key);
390 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
399 paranoid check to see if the db is empty
/*
 * Sanity check used while attaching a database: the result of a read-only
 * traverse with no callback is taken as a record count. On the failing
 * branch (its condition is not visible in this excerpt) an ALERT is logged
 * and the daemon is taken down through ctdb_fatal().
 */
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
403 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404 int count = tdb_traverse_read(tdb, NULL, NULL);
406 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
408 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent-health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL. The
 * previous reason is set aside while the record is fetched. It is put back
 * if duplicating the fetched value fails; otherwise the freshly loaded
 * reason replaces it.
 *
 * NOTE(review): the handling of a missing record, and the release of the
 * old string and of the fetched buffer, are not visible in this excerpt.
 */
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413 struct ctdb_db_context *ctdb_db)
415 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
421 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422 key.dsize = strlen(ctdb_db->db_name);
424 old = ctdb_db->unhealthy_reason;
425 ctdb_db->unhealthy_reason = NULL;
427 val = tdb_fetch(tdb, key);
429 reason = talloc_strndup(ctdb_db,
430 (const char *)val.dptr,
432 if (reason == NULL) {
433 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
435 ctdb_db->unhealthy_reason = old;
446 ctdb_db->unhealthy_reason = reason;
/*
 * Persist a new health state for ctdb_db inside a transaction on the
 * persistent-health tdb, and mirror it in ctdb_db->unhealthy_reason.
 *
 * given_reason:      text to store; NULL means healthy
 * num_healthy_nodes: when an old reason exists and this is zero, the old
 *                    reason is kept and carries the NO-HEALTHY-NODES prefix
 *                    (the strncmp against the prefix presumably prevents
 *                    adding it twice - the branch is not fully visible)
 *
 * The current state is first reloaded with ctdb_load_persistent_health().
 * A resulting reason is written with tdb_store(TDB_REPLACE). On the branch
 * taken when only an old reason exists, the record is deleted instead.
 * A failing store or delete cancels the transaction. After a successful
 * commit the old reason string is freed and replaced by the new one.
 * The key is the database name without its terminating NUL, matching
 * ctdb_load_persistent_health().
 */
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451 struct ctdb_db_context *ctdb_db,
452 const char *given_reason,/* NULL means healthy */
453 int num_healthy_nodes)
455 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
459 char *new_reason = NULL;
460 char *old_reason = NULL;
462 ret = tdb_transaction_start(tdb);
464 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465 tdb_name(tdb), ret, tdb_errorstr(tdb)));
469 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
471 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472 ctdb_db->db_name, ret));
475 old_reason = ctdb_db->unhealthy_reason;
477 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478 key.dsize = strlen(ctdb_db->db_name);
481 new_reason = talloc_strdup(ctdb_db, given_reason);
482 if (new_reason == NULL) {
483 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
487 } else if (old_reason && num_healthy_nodes == 0) {
489 * If the reason indicates ok, but there where no healthy nodes
490 * available, that it means, we have not recovered valid content
491 * of the db. So if there's an old reason, prefix it with
492 * "NO-HEALTHY-NODES - "
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
499 prefix = _TMP_PREFIX;
503 new_reason = talloc_asprintf(ctdb_db, "%s%s",
505 if (new_reason == NULL) {
506 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507 prefix, old_reason));
514 val.dptr = discard_const_p(uint8_t, new_reason);
515 val.dsize = strlen(new_reason);
517 ret = tdb_store(tdb, key, val, TDB_REPLACE);
519 tdb_transaction_cancel(tdb);
520 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521 tdb_name(tdb), ctdb_db->db_name, new_reason,
522 ret, tdb_errorstr(tdb)));
523 talloc_free(new_reason);
526 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527 ctdb_db->db_name, new_reason));
528 } else if (old_reason) {
529 ret = tdb_delete(tdb, key);
531 tdb_transaction_cancel(tdb);
532 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533 tdb_name(tdb), ctdb_db->db_name,
534 ret, tdb_errorstr(tdb)));
535 talloc_free(new_reason);
538 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
542 ret = tdb_transaction_commit(tdb);
543 if (ret != TDB_SUCCESS) {
544 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545 tdb_name(tdb), ret, tdb_errorstr(tdb)));
546 talloc_free(new_reason);
550 talloc_free(old_reason);
551 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb file out of the way and record that fact in the
 * persistent-health database.
 *
 * The backup name is a path with .corrupted.<timestamp>.0Z appended (the
 * path argument of the format is not visible in this excerpt). The
 * unhealthy reason, which names the backup path, is stored through
 * ctdb_update_persistent_health() with num_healthy_nodes 0, and the
 * temporary reason string is freed right after. ctdb_db->db_path is then
 * renamed to the backup name. Both outcomes of the rename are logged at
 * CRIT level, and the path string is freed either way.
 *
 * NOTE(review): the timestamp fields are printed with %u conversions while
 * the members of struct tm are int; harmless for valid dates, but formally
 * a signedness mismatch.
 */
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557 struct ctdb_db_context *ctdb_db)
559 time_t now = time(NULL);
567 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569 "%04u%02u%02u%02u%02u%02u.0Z",
571 tm->tm_year+1900, tm->tm_mon+1,
572 tm->tm_mday, tm->tm_hour, tm->tm_min,
574 if (new_path == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
579 new_reason = talloc_asprintf(ctdb_db,
580 "ERROR - Backup of corrupted TDB in '%s'",
582 if (new_reason == NULL) {
583 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
586 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587 talloc_free(new_reason);
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
595 ret = rename(ctdb_db->db_path, new_path);
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599 ctdb_db->db_path, new_path,
600 errno, strerror(errno)));
601 talloc_free(new_path);
605 DEBUG(DEBUG_CRIT,(__location__
606 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607 ctdb_db->db_path, new_path));
608 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the health state of every persistent
 * database through ctdb_load_persistent_health(); non-persistent databases
 * are skipped.
 *
 * Each database is logged as healthy (INFO) or unhealthy with its reason
 * (ALERT). A final summary with the OK and FAIL counts is logged at ALERT
 * level when anything failed, and at NOTICE level otherwise. The counter
 * updates and the return value are not visible in this excerpt.
 */
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
614 struct ctdb_db_context *ctdb_db;
619 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620 if (!ctdb_db->persistent) {
624 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
626 DEBUG(DEBUG_ALERT,(__location__
627 " load persistent health for '%s' failed\n",
632 if (ctdb_db->unhealthy_reason == NULL) {
634 DEBUG(DEBUG_INFO,(__location__
635 " persistent db '%s' healthy\n",
641 DEBUG(DEBUG_ALERT,(__location__
642 " persistent db '%s' unhealthy: %s\n",
644 ctdb_db->unhealthy_reason));
646 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
659 mark a database - as healthy
/*
 * Control handler that marks a database as healthy.
 *
 * indata: must start with the uint32_t db_id. The value is read straight
 *         from indata.dptr; no length check is visible here, so presumably
 *         the caller validates the size - TODO confirm.
 *
 * The health record is cleared by calling ctdb_update_persistent_health()
 * with a NULL reason and one healthy node. If may_recover ends up set (its
 * assignment is not visible here, presumably inside the unhealthy_reason
 * branch) and startup has not completed, recovery_mode is switched to
 * CTDB_RECOVERY_ACTIVE.
 */
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
663 uint32_t db_id = *(uint32_t *)indata.dptr;
664 struct ctdb_db_context *ctdb_db;
666 bool may_recover = false;
668 ctdb_db = find_ctdb_db(ctdb, db_id);
670 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
674 if (ctdb_db->unhealthy_reason) {
678 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
680 DEBUG(DEBUG_ERR,(__location__
681 " ctdb_update_persistent_health(%s) failed\n",
686 if (may_recover && !ctdb->done_startup) {
687 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
689 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler that reports the health of a database.
 *
 * indata: must start with the uint32_t db_id (read straight from
 *         indata.dptr; no length check is visible here).
 *
 * The state is reloaded with ctdb_load_persistent_health() first. For an
 * unhealthy database outdata is pointed at ctdb_db->unhealthy_reason itself
 * (no copy is made) with a size that includes the terminating NUL. The
 * healthy case is not visible in this excerpt.
 */
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
699 uint32_t db_id = *(uint32_t *)indata.dptr;
700 struct ctdb_db_context *ctdb_db;
703 ctdb_db = find_ctdb_db(ctdb, db_id);
705 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
709 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
711 DEBUG(DEBUG_ERR,(__location__
712 " ctdb_load_persistent_health(%s) failed\n",
718 if (ctdb_db->unhealthy_reason) {
719 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
727 attach to a database, handling both persistent and non-persistent databases
728 return 0 on success, -1 on failure
/*
 * Create a ctdb_db_context for db_name, open and sanity-check the backing
 * tdb, and link the database into ctdb->db_list.
 *
 * ctdb:             daemon context; also the talloc parent of the new db
 * db_name:          database name; db_id is ctdb_hash() over the name
 *                   including its terminating NUL
 * persistent:       selects the persistent directory and TDB_DEFAULT
 *                   instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 * unhealthy_reason: if non-NULL, recorded through
 *                   ctdb_update_persistent_health() before the health
 *                   state is loaded
 * (one more parameter follows in the signature but is not visible in this
 *  excerpt)
 *
 * Visible steps, in order:
 *  - allocate and fill the context; a delete queue and the server store
 *    hook are set up in the non-persistent branch
 *  - reject a db_id that collides with an already attached database
 *  - persistent health handling: a database marked unhealthy is refused
 *    when remaining_tries is 0 and only warned about otherwise
 *  - open the tdb at <directory>/<...>.<...>; for persistent databases a
 *    failed open or failed tdb_check() can lead to
 *    ctdb_backup_corrupted_tdb() while tries remain
 *  - open the <db_path>.RO tracking tdb when readonly is set (assumed for
 *    all non-persistent databases)
 *  - add to db_list, set max dead records, register the null, fetch and
 *    fetch_with_header call handlers, initialise vacuuming
 *
 * The explicit error branches visible here call talloc_free(ctdb_db) before
 * bailing out. What CTDB_NO_MEMORY does on failure is not visible in this
 * excerpt.
 *
 * NOTE(review): in both tracking-database failure branches readonly is
 * cleared immediately before ctdb_db is freed, so that store has no effect.
 * NOTE(review): no NULL check on db_path after talloc_asprintf() is visible.
 */
730 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
731 bool persistent, const char *unhealthy_reason,
734 struct ctdb_db_context *ctdb_db, *tmp_db;
739 int remaining_tries = 0;
741 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
742 CTDB_NO_MEMORY(ctdb, ctdb_db);
744 ctdb_db->priority = 1;
745 ctdb_db->ctdb = ctdb;
746 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
747 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
749 key.dsize = strlen(db_name)+1;
750 key.dptr = discard_const(db_name);
751 ctdb_db->db_id = ctdb_hash(&key);
752 ctdb_db->persistent = persistent;
754 if (!ctdb_db->persistent) {
755 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
756 if (ctdb_db->delete_queue == NULL) {
757 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
760 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
763 /* check for hash collisions */
764 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
765 if (tmp_db->db_id == ctdb_db->db_id) {
766 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
767 tmp_db->db_id, db_name, tmp_db->db_name));
768 talloc_free(ctdb_db);
774 if (unhealthy_reason) {
775 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
776 unhealthy_reason, 0);
778 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
779 ctdb_db->db_name, unhealthy_reason, ret));
780 talloc_free(ctdb_db);
785 if (ctdb->max_persistent_check_errors > 0) {
788 if (ctdb->done_startup) {
792 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
794 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
795 ctdb_db->db_name, ret));
796 talloc_free(ctdb_db);
801 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
802 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
803 ctdb_db->db_name, ctdb_db->unhealthy_reason));
804 talloc_free(ctdb_db);
808 if (ctdb_db->unhealthy_reason) {
809 /* this is just a warning, but we want that in the log file! */
810 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
811 ctdb_db->db_name, ctdb_db->unhealthy_reason));
814 /* open the database */
815 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
816 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
819 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
820 if (ctdb->valgrinding) {
821 tdb_flags |= TDB_NOMMAP;
823 tdb_flags |= TDB_DISALLOW_NESTING;
825 tdb_flags |= TDB_INCOMPATIBLE_HASH;
829 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
830 ctdb->tunable.database_hash_size,
832 O_CREAT|O_RDWR, mode);
833 if (ctdb_db->ltdb == NULL) {
835 int saved_errno = errno;
838 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
841 strerror(saved_errno)));
842 talloc_free(ctdb_db);
846 if (remaining_tries == 0) {
847 DEBUG(DEBUG_CRIT,(__location__
848 "Failed to open persistent tdb '%s': %d - %s\n",
851 strerror(saved_errno)));
852 talloc_free(ctdb_db);
856 ret = stat(ctdb_db->db_path, &st);
858 DEBUG(DEBUG_CRIT,(__location__
859 "Failed to open persistent tdb '%s': %d - %s\n",
862 strerror(saved_errno)));
863 talloc_free(ctdb_db);
867 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
869 DEBUG(DEBUG_CRIT,(__location__
870 "Failed to open persistent tdb '%s': %d - %s\n",
873 strerror(saved_errno)));
874 talloc_free(ctdb_db);
884 ctdb_check_db_empty(ctdb_db);
886 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
891 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
892 ctdb_db->db_path, ret,
893 tdb_errorstr(ctdb_db->ltdb->tdb)));
894 if (remaining_tries == 0) {
895 talloc_free(ctdb_db);
899 fd = tdb_fd(ctdb_db->ltdb->tdb);
900 ret = fstat(fd, &st);
902 DEBUG(DEBUG_CRIT,(__location__
903 "Failed to fstat() persistent tdb '%s': %d - %s\n",
907 talloc_free(ctdb_db);
912 talloc_free(ctdb_db->ltdb);
913 ctdb_db->ltdb = NULL;
915 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
917 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
919 talloc_free(ctdb_db);
929 /* Assume all non-persistent databases support read only delegations */
930 if (!ctdb_db->persistent) {
931 ctdb_db->readonly = true;
934 if (ctdb_db->readonly) {
937 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
938 if (ropath == NULL) {
939 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
940 ctdb_db->readonly = false;
941 talloc_free(ctdb_db);
944 ctdb_db->rottdb = tdb_open(ropath,
945 ctdb->tunable.database_hash_size,
946 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
948 if (ctdb_db->rottdb == NULL) {
949 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
950 ctdb_db->readonly = false;
951 talloc_free(ctdb_db);
954 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
958 DLIST_ADD(ctdb->db_list, ctdb_db);
960 /* setting this can help some high churn databases */
961 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
964 all databases support the "null" function. we need this in
965 order to do forced migration of records
967 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
969 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
970 talloc_free(ctdb_db);
975 all databases support the "fetch" function. we need this
976 for efficient Samba3 ctdb fetch
978 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
980 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
981 talloc_free(ctdb_db);
986 all databases support the "fetch_with_header" function. we need this
987 for efficient readonly record fetches
989 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
991 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
992 talloc_free(ctdb_db);
996 ret = ctdb_vacuum_init(ctdb_db);
998 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
999 "database '%s'\n", ctdb_db->db_name));
1000 talloc_free(ctdb_db);
1005 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One parked DB attach request. Entries are created in
 * ctdb_control_db_attach() and linked into ctdb->deferred_attach.
 */
1012 struct ctdb_deferred_attach_context {
/* list linkage for ctdb->deferred_attach */
1013 struct ctdb_deferred_attach_context *next, *prev;
/* owning daemon context */
1014 struct ctdb_context *ctdb;
/* the deferred control packet, re-parented onto this context */
1015 struct ctdb_req_control *c;
/*
 * talloc destructor (installed in ctdb_control_db_attach()): unlinks the
 * entry from ctdb->deferred_attach when it is freed. The return statement
 * is not visible in this excerpt.
 */
1019 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1021 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timed-event handler armed when an attach request is parked: answers the
 * parked control with status -1 and no data, then frees the entry.
 * private_data is the struct ctdb_deferred_attach_context.
 */
1026 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1028 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1029 struct ctdb_context *ctdb = da_ctx->ctdb;
1031 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1032 talloc_free(da_ctx);
/*
 * Timed-event handler armed by ctdb_process_deferred_attach(): feeds the
 * parked control packet back into ctdb_input_pkt() so that it is processed
 * again, then frees the entry. private_data is the
 * struct ctdb_deferred_attach_context.
 */
1035 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1037 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038 struct ctdb_context *ctdb = da_ctx->ctdb;
1040 /* This talloc-steals the packet ->c */
1041 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1042 talloc_free(da_ctx);
/*
 * Re-queue every parked attach request: each entry is unlinked from
 * ctdb->deferred_attach and a timed event is armed, with the entry as its
 * talloc parent, that runs ctdb_deferred_attach_callback() on it.
 *
 * NOTE(review): the event is armed with timeval_current_ofs(1,0), while the
 * comment in the body speaks of running as soon as the current event is
 * done; confirm the offset is intended.
 */
1045 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1047 struct ctdb_deferred_attach_context *da_ctx;
1049 /* call it from the main event loop as soon as the current event
1052 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1053 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1054 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1061 a client has asked to attach a new database
/*
 * Control handler: attach to the database whose name is carried in
 * indata.dptr.
 *
 * For a request whose client_id resolves to a local client:
 *  - the attach is refused while this node has NODE_FLAGS_INACTIVE set
 *  - while recovery is active, startup has not completed and the caller is
 *    not the recovery daemon, the request is parked on
 *    ctdb->deferred_attach with a timeout taken from
 *    tunable.deferred_attach_timeout, and *async_reply is set
 *
 * The tdb_flags supplied by the caller are masked down to
 * TDB_NOSYNC | TDB_INCOMPATIBLE_HASH. If the database is already attached
 * its db_id is returned and the flags are added to the open tdb. Otherwise
 * the database is attached through ctdb_local_attach(), the flags are
 * added, ctdb_lockdown_memory() is called and the attach control is
 * broadcast to all nodes with CTDB_CTRL_FLAG_NOREPLY.
 *
 * outdata is pointed directly at db->db_id inside the db context; no copy
 * is made.
 */
1063 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1064 TDB_DATA *outdata, uint64_t tdb_flags,
1065 bool persistent, uint32_t client_id,
1066 struct ctdb_req_control *c,
1069 const char *db_name = (const char *)indata.dptr;
1070 struct ctdb_db_context *db;
1071 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1072 struct ctdb_client *client = NULL;
1074 /* dont allow any local clients to attach while we are in recovery mode
1075 * except for the recovery daemon.
1076 * allow all attach from the network since these are always from remote
1079 if (client_id != 0) {
1080 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1082 if (client != NULL) {
1083 /* If the node is inactive it is not part of the cluster
1084 and we should not allow clients to attach to any
1087 if (node->flags & NODE_FLAGS_INACTIVE) {
1088 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1092 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1093 && client->pid != ctdb->recoverd_pid
1094 && !ctdb->done_startup) {
1095 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1097 if (da_ctx == NULL) {
1098 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1102 da_ctx->ctdb = ctdb;
1103 da_ctx->c = talloc_steal(da_ctx, c);
1104 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1105 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1107 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1109 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1110 *async_reply = true;
1115 /* the client can optionally pass additional tdb flags, but we
1116 only allow a subset of those on the database in ctdb. Note
1117 that tdb_flags is passed in via the (otherwise unused)
1118 srvid to the attach control */
1119 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1121 /* see if we already have this name */
1122 db = ctdb_db_handle(ctdb, db_name);
1124 outdata->dptr = (uint8_t *)&db->db_id;
1125 outdata->dsize = sizeof(db->db_id);
1126 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1130 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1134 db = ctdb_db_handle(ctdb, db_name);
1136 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1140 /* remember the flags the client has specified */
1141 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1143 outdata->dptr = (uint8_t *)&db->db_id;
1144 outdata->dsize = sizeof(db->db_id);
1146 /* Try to ensure it's locked in mem */
1147 ctdb_lockdown_memory(ctdb);
1149 /* tell all the other nodes about this database */
1150 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1151 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1152 CTDB_CONTROL_DB_ATTACH,
1153 0, CTDB_CTRL_FLAG_NOREPLY,
1154 indata, NULL, NULL);
1162 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach the database files that
 * belong to this node.
 *
 * unhealthy_reason: passed through unchanged to ctdb_local_attach()
 *
 * Each directory entry name is copied and must contain .tdb. followed only
 * by digits, with that number equal to ctdb->pnn. Entries failing the
 * digit or pnn test are logged as ignored. Accepted entries are handed to
 * ctdb_local_attach() with persistent set to true.
 *
 * NOTE(review): the copy s is allocated on the long-lived ctdb context for
 * every entry; whether it is released on each path, and whether the
 * directory handle is closed when CTDB_NO_MEMORY triggers, cannot be
 * confirmed from this excerpt.
 */
1164 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1165 const char *unhealthy_reason)
1170 /* open the persistent db directory and scan it for files */
1171 d = opendir(ctdb->db_directory_persistent);
1176 while ((de=readdir(d))) {
1178 size_t len = strlen(de->d_name);
1180 int invalid_name = 0;
1182 s = talloc_strdup(ctdb, de->d_name);
1183 CTDB_NO_MEMORY(ctdb, s);
1185 /* only consider names that contain .tdb. (strstr finds the first occurrence) */
1186 p = strstr(s, ".tdb.");
1187 if (len < 7 || p == NULL) {
1192 /* only accept names ending with .tdb. and any number of digits */
1194 while (*q != 0 && invalid_name == 0) {
/* NOTE(review): isdigit() is given a plain char; a cast to unsigned char
   would avoid undefined behaviour for bytes with the high bit set */
1195 if (!isdigit(*q++)) {
1199 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1200 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1206 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1207 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1213 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point: make sure the database directories exist, open the
 * persistent-health tdb and attach the persistent databases found on disk.
 *
 * Visible steps:
 *  - unset db_directory, db_directory_persistent and db_directory_state
 *    default to ctdb, ctdb/persistent and ctdb/state below VARDIR
 *  - each directory is created with mode 0700; an already existing
 *    directory (EEXIST) is accepted
 *  - the health tdb lives at <db_directory_state>/persistent_health.tdb.<n>
 *    (the numeric argument is not visible in this excerpt) and is opened
 *    with TDB_DISALLOW_NESTING
 *  - if the open fails, or tdb_check() on the opened tdb fails, a WARNING
 *    reason string is built and the file is reopened with
 *    TDB_CLEAR_IF_FIRST. The conditions that select between giving up and
 *    retrying (first_try) are not visible in this excerpt
 *  - ctdb_attach_persistent() is then called with that reason (NULL when
 *    nothing went wrong), and the reason string is freed afterwards
 *
 * NOTE(review): both retry paths reopen into a block-local tdb variable;
 * the visible lines do not show it being assigned to
 * ctdb->db_persistent_health - confirm against the full file.
 */
1221 int ctdb_attach_databases(struct ctdb_context *ctdb)
1224 char *persistent_health_path = NULL;
1225 char *unhealthy_reason = NULL;
1226 bool first_try = true;
1228 if (ctdb->db_directory == NULL) {
1229 ctdb->db_directory = VARDIR "/ctdb";
1231 if (ctdb->db_directory_persistent == NULL) {
1232 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1234 if (ctdb->db_directory_state == NULL) {
1235 ctdb->db_directory_state = VARDIR "/ctdb/state";
1238 /* make sure the db directory exists */
1239 ret = mkdir(ctdb->db_directory, 0700);
1240 if (ret == -1 && errno != EEXIST) {
1241 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1242 ctdb->db_directory));
1246 /* make sure the persistent db directory exists */
1247 ret = mkdir(ctdb->db_directory_persistent, 0700);
1248 if (ret == -1 && errno != EEXIST) {
1249 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1250 ctdb->db_directory_persistent));
1254 /* make sure the internal state db directory exists */
1255 ret = mkdir(ctdb->db_directory_state, 0700);
1256 if (ret == -1 && errno != EEXIST) {
1257 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1258 ctdb->db_directory_state));
1262 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1263 ctdb->db_directory_state,
1264 PERSISTENT_HEALTH_TDB,
1266 if (persistent_health_path == NULL) {
1267 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1273 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1274 0, TDB_DISALLOW_NESTING,
1275 O_CREAT | O_RDWR, 0600);
1276 if (ctdb->db_persistent_health == NULL) {
1277 struct tdb_wrap *tdb;
1280 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1281 persistent_health_path,
1284 talloc_free(persistent_health_path);
1285 talloc_free(unhealthy_reason);
1290 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1291 persistent_health_path,
1292 "was cleared after a failure",
1293 "manual verification needed");
1294 if (unhealthy_reason == NULL) {
1295 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1296 talloc_free(persistent_health_path);
1300 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1301 persistent_health_path));
1302 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1303 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1304 O_CREAT | O_RDWR, 0600);
1306 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1307 persistent_health_path,
1310 talloc_free(persistent_health_path);
1311 talloc_free(unhealthy_reason);
1318 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1320 struct tdb_wrap *tdb;
1322 talloc_free(ctdb->db_persistent_health);
1323 ctdb->db_persistent_health = NULL;
1326 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1327 persistent_health_path));
1328 talloc_free(persistent_health_path);
1329 talloc_free(unhealthy_reason);
1334 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1335 persistent_health_path,
1336 "was cleared after a failure",
1337 "manual verification needed");
1338 if (unhealthy_reason == NULL) {
1339 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1340 talloc_free(persistent_health_path);
1344 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1345 persistent_health_path));
1346 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1347 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1348 O_CREAT | O_RDWR, 0600);
1350 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1351 persistent_health_path,
1354 talloc_free(persistent_health_path);
1355 talloc_free(unhealthy_reason);
1362 talloc_free(persistent_health_path);
1364 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1365 talloc_free(unhealthy_reason);
1374 called when a broadcast seqnum update comes in
/*
 * Apply a sequence-number update for database db_id that was sent by
 * node srcnode.
 *
 * ctdb    - daemon context; ctdb->pnn is compared against srcnode
 * db_id   - id of the database, resolved with find_ctdb_db()
 * srcnode - node number the update came from
 *
 * An error is logged when db_id is unknown or when the database has
 * unhealthy_reason set. Otherwise tdb_increment_seqnum_nonblock() is
 * called on the local tdb and the value read back with tdb_get_seqnum()
 * is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements and the lookup-failure test are
 * not visible in this listing - confirm the exact return codes against
 * the full source.
 */
1376 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1378 struct ctdb_db_context *ctdb_db;
/* an update whose source is this node (ctdb->pnn) is singled out first */
1379 if (srcnode == ctdb->pnn) {
1380 /* don't update ourselves! */
/* resolve db_id to the database context */
1384 ctdb_db = find_ctdb_db(ctdb, db_id);
1386 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* a database with unhealthy_reason set is reported, with the reason text */
1390 if (ctdb_db->unhealthy_reason) {
1391 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1392 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the seqnum of the local tdb, then cache the value it now reports */
1396 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1397 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1402 timer to check for seqnum changes in an ltdb and propagate them
/*
 * Timed-event callback that compares the tdb's current seqnum with the
 * value cached in ctdb_db->seqnum and re-arms itself.
 *
 * ev, te, t - event context, timed event and timestamp; none of them is
 *             referenced in the visible lines
 * p         - private pointer, converted with talloc_get_type() to a
 *             struct ctdb_db_context
 *
 * When the seqnum has changed, a CTDB_CONTROL_UPDATE_SEQNUM control whose
 * payload is set up from ctdb_db->db_id is sent to CTDB_BROADCAST_VNNMAP
 * with CTDB_CTRL_FLAG_NOREPLY. The cached seqnum is then refreshed and a
 * new timed event is registered with this same function as its handler.
 *
 * NOTE(review): the declaration of `data` and the final arguments of
 * ctdb_daemon_send_control() are not visible in this listing.
 */
1404 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1405 struct timeval t, void *p)
1407 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1408 struct ctdb_context *ctdb = ctdb_db->ctdb;
1409 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1410 if (new_seqnum != ctdb_db->seqnum) {
1411 /* something has changed - propagate it */
/* payload: the 32-bit db_id, referenced in place (no copy is made here) */
1413 data.dptr = (uint8_t *)&ctdb_db->db_id;
1414 data.dsize = sizeof(uint32_t);
1415 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1416 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
/* remember the value just observed so the next check compares against it */
1419 ctdb_db->seqnum = new_seqnum;
1421 /* setup a new timer */
/*
 * seqnum_interval is split into interval/1000 and (interval%1000)*1000;
 * presumably milliseconds turned into seconds + microseconds - TODO confirm
 * against timeval_current_ofs()
 */
1422 ctdb_db->seqnum_update =
1423 event_add_timed(ctdb->ev, ctdb_db,
1424 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1425 ctdb_ltdb_seqnum_check, ctdb_db);
1429 enable seqnum handling on this db
/*
 * Turn on sequence-number tracking for the database identified by db_id.
 *
 * ctdb  - daemon context; supplies the event context and the
 *         seqnum_interval tunable
 * db_id - id of the database, resolved with find_ctdb_db()
 *
 * An error is logged when db_id is unknown. If no seqnum_update timed
 * event is registered yet, one is added with ctdb_ltdb_seqnum_check as
 * its handler. tdb_enable_seqnum() is then called on the local tdb and
 * the value returned by tdb_get_seqnum() is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements and the lookup-failure test are
 * not visible in this listing - confirm the return codes against the
 * full source.
 */
1431 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1433 struct ctdb_db_context *ctdb_db;
1434 ctdb_db = find_ctdb_db(ctdb, db_id);
1436 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only register the periodic check once per database */
1440 if (ctdb_db->seqnum_update == NULL) {
1441 ctdb_db->seqnum_update =
1442 event_add_timed(ctdb->ev, ctdb_db,
1443 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1444 ctdb_ltdb_seqnum_check, ctdb_db);
/* enable seqnum support on the tdb and record the starting value */
1447 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1448 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler that sets the priority of one database.
 *
 * ctdb   - daemon context used for the database lookup
 * indata - control payload; indata.dptr is interpreted as a
 *          struct ctdb_db_priority carrying db_id and priority
 *
 * An error is logged when db_id is unknown or when the priority lies
 * outside 1..NUM_DB_PRIORITIES (both bounds accepted). Otherwise the
 * priority is stored in ctdb_db->priority.
 *
 * NOTE(review): no check of indata.dsize is visible here - confirm the
 * caller validates the payload size before dispatching. The return
 * statements and the lookup-failure test are not visible in this listing.
 */
1452 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1454 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1455 struct ctdb_db_context *ctdb_db;
1457 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1459 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* valid priorities are 1 through NUM_DB_PRIORITIES inclusive */
1463 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1464 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1468 ctdb_db->priority = db_prio->priority;
1469 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));