2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
31 #include "common/reqid.h"
32 #include "common/system.h"
34 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
37 * write a record to a normal database
39 * This is the server-variant of the ctdb_ltdb_store function.
40 * It contains logic to determine whether a record should be
41 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
42 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store of one record into the local tdb of ctdb_db.
 *
 * NOTE(review): a number of short lines of this function (braces,
 * declarations, simple assignments, returns) are not present in this copy
 * of the file, so only the visible statements are summarised here.
 *
 * Visible behaviour:
 *  - with CTDB_FLAG_TORTURE set, the currently stored copy is fetched and
 *    "RSN regression!" is logged when its rsn is larger than header->rsn;
 *  - a chain of tests on data.dsize, header->flags, ctdb_db->persistent,
 *    the lmaster and header->dmaster selects between storing and deleting
 *    the record (see the "storing"/"deleting" log text) and sets
 *    schedule_for_deletion / remove_from_delete_queue;
 *  - CTDB_REC_FLAG_VACUUM_MIGRATED and CTDB_REC_FLAG_AUTOMATIC are cleared
 *    in *header before the record buffer is built;
 *  - when ctdb_db->seqnum_update is set and the payload is byte-identical to
 *    the stored one, TDB_SEQNUM is removed around the write and added back
 *    afterwards;
 *  - next to the failure log both queue flags are reset to false; afterwards
 *    the record is scheduled for deletion or removed from the delete queue
 *    according to those flags.
 */
44 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
46 struct ctdb_ltdb_header *header,
49 struct ctdb_context *ctdb = ctdb_db->ctdb;
52 bool seqnum_suppressed = false;
54 bool schedule_for_deletion = false;
55 bool remove_from_delete_queue = false;
58 if (ctdb->flags & CTDB_FLAG_TORTURE) {
59 struct ctdb_ltdb_header *h2;
60 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
61 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/* the record must hold a complete header before h2->rsn is read;
   sizeof(h2) would only be the size of the pointer */
62 if (rec.dptr && rec.dsize >= sizeof(*h2) && h2->rsn > header->rsn) {
63 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
64 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
66 if (rec.dptr) free(rec.dptr);
69 if (ctdb->vnn_map == NULL) {
71 * Called from a client: always store the record
72 * Also don't call ctdb_lmaster since it uses the vnn_map!
78 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
81 * If we migrate an empty record off to another node
82 * and the record has not been migrated with data,
83 * delete the record instead of storing the empty record.
85 if (data.dsize != 0) {
87 } else if (header->flags & CTDB_REC_RO_FLAGS) {
89 } else if (ctdb_db->persistent) {
91 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
93 * The record is not created by the client but
94 * automatically by the ctdb_ltdb_fetch logic that
95 * creates a record with an initial header in the
96 * ltdb before trying to migrate the record from
97 * the current lmaster. Keep it instead of trying
98 * to delete the non-existing record...
101 schedule_for_deletion = true;
102 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
104 } else if (ctdb_db->ctdb->pnn == lmaster) {
106 * If we are lmaster, then we usually keep the record.
107 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
108 * and the record is empty and has never been migrated
109 * with data, then we should delete it instead of storing it.
110 * This is part of the vacuuming process.
112 * The reason that we usually need to store even empty records
113 * on the lmaster is that a client operating directly on the
114 * lmaster (== dmaster) expects the local copy of the record to
115 * exist after successful ctdb migrate call. If the record does
116 * not exist, the client goes into a migrate loop and eventually
117 * fails. So storing the empty record makes sure that we do not
118 * need to change the client code.
120 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
122 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
125 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
130 if (!ctdb_db->persistent &&
131 (ctdb_db->ctdb->pnn == header->dmaster) &&
132 !(header->flags & CTDB_REC_RO_FLAGS))
136 if (data.dsize == 0) {
137 schedule_for_deletion = true;
140 remove_from_delete_queue = !schedule_for_deletion;
145 * The VACUUM_MIGRATED flag is only set temporarily for
146 * the above logic when the record was retrieved by a
147 * VACUUM_MIGRATE call and should not be stored in the
150 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
151 * and there are two cases in which the corresponding record
152 * is stored in the local database:
153 * 1. The record has been migrated with data in the past
154 * (the MIGRATED_WITH_DATA record flag is set).
155 * 2. The record has been filled with data again since it
156 * had been submitted in the VACUUM_FETCH message to the
158 * For such records it is important to not store the
159 * VACUUM_MIGRATED flag in the database.
161 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
164 * Similarly, clear the AUTOMATIC flag which should not enter
165 * the local database copy since this would require client
166 * modifications to clear the flag when the client stores
169 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
171 rec.dsize = sizeof(*header) + data.dsize;
172 rec.dptr = talloc_size(ctdb, rec.dsize);
173 CTDB_NO_MEMORY(ctdb, rec.dptr);
175 memcpy(rec.dptr, header, sizeof(*header));
176 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
178 /* Databases with seqnum updates enabled only get their seqnum
179 changes when/if we modify the data */
180 if (ctdb_db->seqnum_update != NULL) {
182 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
184 if ( (old.dsize == rec.dsize)
185 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
186 rec.dptr+sizeof(struct ctdb_ltdb_header),
187 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
188 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
189 seqnum_suppressed = true;
191 if (old.dptr) free(old.dptr);
194 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
196 keep?"storing":"deleting",
200 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
202 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
209 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
214 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
217 keep?"store":"delete", ret,
218 tdb_errorstr(ctdb_db->ltdb->tdb)));
220 schedule_for_deletion = false;
221 remove_from_delete_queue = false;
223 if (seqnum_suppressed) {
224 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
227 talloc_free(rec.dptr);
229 if (schedule_for_deletion) {
231 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
233 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
237 if (remove_from_delete_queue) {
238 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State kept while a request waits for a record lock. It is filled in by
 * ctdb_ltdb_lock_requeue() and read by lock_fetch_callback().
 * NOTE(review): the recv_context and generation members used by those
 * functions are not visible in this copy of the struct.
 */
244 struct lock_fetch_state {
245 struct ctdb_context *ctdb; /* set from ctdb_db->ctdb */
246 struct ctdb_db_context *ctdb_db; /* database the lock request belongs to */
247 void (*recv_pkt)(void *, struct ctdb_req_header *); /* called with (recv_context, hdr) to re-submit the packet */
249 struct ctdb_req_header *hdr; /* the deferred request packet */
251 bool ignore_generation; /* when true, the generation check in the callback is skipped */
255 called when we should retry the operation
/*
 * Callback handed to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p is the struct lock_fetch_state of the deferred request. Unless
 * state->ignore_generation is set, a state whose recorded generation differs
 * from the database's current generation is discarded: a notice is logged
 * and the packet header is freed (presumably followed by a return - those
 * lines are not visible here). Otherwise the packet is handed back to
 * state->recv_pkt together with state->recv_context.
 *
 * NOTE(review): the 'locked' argument is not referenced in the visible lines.
 */
257 static void lock_fetch_callback(void *p, bool locked)
259 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
260 if (!state->ignore_generation &&
261 state->generation != state->ctdb_db->generation) {
262 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
263 talloc_free(state->hdr);
266 state->recv_pkt(state->recv_context, state->hdr);
267 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
272 do a non-blocking ltdb_lock, deferring this ctdb request until we
275 It does the following:
277 1) tries to get the chainlock. If it succeeds, then it returns 0
279 2) if it fails to get a chainlock immediately then it sets up a
280 non-blocking chainlock via ctdb_lock_record, and when it gets the
281 chainlock it re-submits this ctdb request to the main packet
284 This effectively queues all ctdb requests that cannot be
285 immediately satisfied until it can get the lock. This means that
286 the main ctdb daemon will not block waiting for a chainlock held by
289 There are 3 possible return values:
291 0: means that it got the lock immediately.
292 -1: means that it failed to get the lock, and won't retry
293 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for 'key' without blocking; if that is not
 * possible right away, queue a lock request and arrange for the packet to be
 * re-submitted through recv_pkt(recv_context, hdr) once the lock is held.
 *
 * ctdb_db            database whose local tdb is locked
 * key                record key to lock
 * hdr                request packet; it becomes the talloc parent of the
 *                    deferral state and is later stolen onto that state
 * recv_pkt           function used to re-submit the packet
 * recv_context       first argument passed to recv_pkt
 * ignore_generation  stored in the state; when true the callback skips its
 *                    generation check
 *
 * A failure to allocate the deferral state is reported as -1 (failed,
 * will not retry).
 *
 * NOTE(review): several short lines of this function (returns, braces,
 * part of the torture condition) are not present in this copy of the file.
 */
295 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
296 TDB_DATA key, struct ctdb_req_header *hdr,
297 void (*recv_pkt)(void *, struct ctdb_req_header *),
298 void *recv_context, bool ignore_generation)
301 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
302 struct lock_request *lreq;
303 struct lock_fetch_state *state;
305 ret = tdb_chainlock_nonblock(tdb, key);
308 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
309 /* a hard failure - don't try again */
313 /* when torturing, ensure we test the contended path */
314 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
317 tdb_chainunlock(tdb, key);
320 /* first the non-contended path */
325 state = talloc(hdr, struct lock_fetch_state);
/* talloc() returns NULL when out of memory; do not dereference it */
if (state == NULL) {
	DEBUG(DEBUG_ERR,(__location__ " out of memory allocating lock_fetch_state\n"));
	return -1;
}
326 state->ctdb = ctdb_db->ctdb;
327 state->ctdb_db = ctdb_db;
329 state->recv_pkt = recv_pkt;
330 state->recv_context = recv_context;
331 state->generation = ctdb_db->generation;
332 state->ignore_generation = ignore_generation;
334 /* now the contended path */
335 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
340 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
341 so it won't be freed yet */
342 talloc_steal(state, hdr);
344 /* now tell the caller that we will retry asynchronously */
349 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Takes the record lock through ctdb_ltdb_lock_requeue() (same hdr,
 * recv_pkt, recv_context and ignore_generation arguments) and reads the
 * record into *header / *data with ctdb_ltdb_fetch(), using hdr as the
 * memory context argument of the fetch. A failing ctdb_ltdb_unlock() is
 * logged together with its error code.
 *
 * NOTE(review): the conditions guarding the fetch and the unlock, and the
 * return statements, are not visible in this copy - confirm them before
 * relying on this summary.
 */
351 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
352 TDB_DATA key, struct ctdb_ltdb_header *header,
353 struct ctdb_req_header *hdr, TDB_DATA *data,
354 void (*recv_pkt)(void *, struct ctdb_req_header *),
355 void *recv_context, bool ignore_generation)
359 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
360 recv_context, ignore_generation);
362 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
365 uret = ctdb_ltdb_unlock(ctdb_db, key);
367 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
376 paranoid check to see if the db is empty
/*
 * Counts the records of ctdb_db's local tdb with tdb_traverse_read() (no
 * callback) and, on the path shown below, logs an alert and calls
 * ctdb_fatal() with "database not empty on attach".
 * NOTE(review): the test on 'count' that guards the alert is not visible
 * in this copy.
 */
378 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
380 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
381 int count = tdb_traverse_read(tdb, NULL, NULL);
383 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
385 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Reload ctdb_db->unhealthy_reason from the persistent-health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL
 * (key.dsize = strlen(db_name)). The previous reason pointer is remembered
 * in 'old' and the member is reset to NULL before the fetch. A fetched value
 * is duplicated with talloc_strndup() as a talloc child of ctdb_db; if that
 * duplication fails, an alert is logged and the old pointer is put back.
 * Otherwise the new string becomes ctdb_db->unhealthy_reason.
 *
 * NOTE(review): the handling of "no entry found", the release of val.dptr
 * and of 'old', and the return values are not visible in this copy.
 */
389 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
390 struct ctdb_db_context *ctdb_db)
392 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
398 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
399 key.dsize = strlen(ctdb_db->db_name);
401 old = ctdb_db->unhealthy_reason;
402 ctdb_db->unhealthy_reason = NULL;
404 val = tdb_fetch(tdb, key);
406 reason = talloc_strndup(ctdb_db,
407 (const char *)val.dptr,
409 if (reason == NULL) {
410 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
412 ctdb_db->unhealthy_reason = old;
423 ctdb_db->unhealthy_reason = reason;
/*
 * Record the health state of ctdb_db in the persistent-health tdb, inside a
 * tdb transaction.
 *
 * given_reason       text describing why the db is unhealthy; NULL means
 *                    healthy
 * num_healthy_nodes  when an old reason exists and this is 0, the old
 *                    reason is kept and prefixed with "NO-HEALTHY-NODES - "
 *                    (strncmp() is used to see whether the old reason
 *                    already starts with that prefix)
 *
 * The current reason is first reloaded with ctdb_load_persistent_health().
 * A new reason string is written with tdb_store(TDB_REPLACE) under the
 * database name; in the "else if (old_reason)" branch the entry is deleted
 * instead. Store/delete failures cancel the transaction. After a successful
 * commit the old reason is freed and ctdb_db->unhealthy_reason is set to the
 * new reason (which may be NULL).
 *
 * NOTE(review): after tdb_transaction_start() the early error paths
 * (ctdb_load_persistent_health failure, talloc_strdup/talloc_asprintf
 * failure) show no tdb_transaction_cancel() in the visible lines, and the
 * gaps in the elided lines look too small to contain one - confirm that the
 * transaction is not left open there.
 */
427 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
428 struct ctdb_db_context *ctdb_db,
429 const char *given_reason,/* NULL means healthy */
430 int num_healthy_nodes)
432 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
436 char *new_reason = NULL;
437 char *old_reason = NULL;
439 ret = tdb_transaction_start(tdb);
441 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
442 tdb_name(tdb), ret, tdb_errorstr(tdb)));
446 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
448 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
449 ctdb_db->db_name, ret));
452 old_reason = ctdb_db->unhealthy_reason;
454 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
455 key.dsize = strlen(ctdb_db->db_name);
458 new_reason = talloc_strdup(ctdb_db, given_reason);
459 if (new_reason == NULL) {
460 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
464 } else if (old_reason && num_healthy_nodes == 0) {
466 * If the reason indicates ok, but there where no healthy nodes
467 * available, that it means, we have not recovered valid content
468 * of the db. So if there's an old reason, prefix it with
469 * "NO-HEALTHY-NODES - "
473 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
474 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
476 prefix = _TMP_PREFIX;
480 new_reason = talloc_asprintf(ctdb_db, "%s%s",
482 if (new_reason == NULL) {
483 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
484 prefix, old_reason));
491 val.dptr = discard_const_p(uint8_t, new_reason);
492 val.dsize = strlen(new_reason);
494 ret = tdb_store(tdb, key, val, TDB_REPLACE);
496 tdb_transaction_cancel(tdb);
497 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
498 tdb_name(tdb), ctdb_db->db_name, new_reason,
499 ret, tdb_errorstr(tdb)));
500 talloc_free(new_reason);
503 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
504 ctdb_db->db_name, new_reason));
505 } else if (old_reason) {
506 ret = tdb_delete(tdb, key);
508 tdb_transaction_cancel(tdb);
509 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
510 tdb_name(tdb), ctdb_db->db_name,
511 ret, tdb_errorstr(tdb)));
512 talloc_free(new_reason);
515 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
519 ret = tdb_transaction_commit(tdb);
520 if (ret != TDB_SUCCESS) {
521 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
522 tdb_name(tdb), ret, tdb_errorstr(tdb)));
523 talloc_free(new_reason);
527 talloc_free(old_reason);
528 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb file out of the way.
 *
 * A new path ending in ".corrupted.<YYYYMMDDhhmmss>.0Z" is built from the
 * current time. The database is then marked unhealthy through
 * ctdb_update_persistent_health() with the reason
 * "ERROR - Backup of corrupted TDB in '...'" and num_healthy_nodes 0.
 * Finally ctdb_db->db_path is rename()d to the new path; success and
 * failure are both logged at DEBUG_CRIT and new_path is freed afterwards.
 *
 * NOTE(review): the conversion of 'now' into 'tm', the first argument of
 * the path format and the return statements are not visible in this copy.
 * The message printed when ctdb_update_persistent_health() fails still says
 * "not implemented yet", which does not describe that failure - confirm
 * whether the text is stale.
 */
533 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
534 struct ctdb_db_context *ctdb_db)
536 time_t now = time(NULL);
544 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
545 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
546 "%04u%02u%02u%02u%02u%02u.0Z",
548 tm->tm_year+1900, tm->tm_mon+1,
549 tm->tm_mday, tm->tm_hour, tm->tm_min,
551 if (new_path == NULL) {
552 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
556 new_reason = talloc_asprintf(ctdb_db,
557 "ERROR - Backup of corrupted TDB in '%s'",
559 if (new_reason == NULL) {
560 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
563 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
564 talloc_free(new_reason);
566 DEBUG(DEBUG_CRIT,(__location__
567 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
572 ret = rename(ctdb_db->db_path, new_path);
574 DEBUG(DEBUG_CRIT,(__location__
575 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
576 ctdb_db->db_path, new_path,
577 errno, strerror(errno)));
578 talloc_free(new_path);
582 DEBUG(DEBUG_CRIT,(__location__
583 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
584 ctdb_db->db_path, new_path));
585 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the stored health of the databases that
 * pass the "persistent" test below via ctdb_load_persistent_health().
 * A load failure is logged as an alert. A database without an
 * unhealthy_reason is logged as healthy, one with a reason as unhealthy
 * (including the reason). A summary line with OK and FAIL counters is
 * logged at the end, at DEBUG_ALERT when 'fail' is non-zero and at
 * DEBUG_NOTICE otherwise.
 *
 * NOTE(review): the counter updates, the action taken for non-persistent
 * databases and the return value are not visible in this copy.
 */
589 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
591 struct ctdb_db_context *ctdb_db;
596 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
597 if (!ctdb_db->persistent) {
601 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
603 DEBUG(DEBUG_ALERT,(__location__
604 " load persistent health for '%s' failed\n",
609 if (ctdb_db->unhealthy_reason == NULL) {
611 DEBUG(DEBUG_INFO,(__location__
612 " persistent db '%s' healthy\n",
618 DEBUG(DEBUG_ALERT,(__location__
619 " persistent db '%s' unhealthy: %s\n",
621 ctdb_db->unhealthy_reason));
623 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
624 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
636 mark a database - as healthy
/*
 * Control handler: mark the database named by indata as healthy.
 *
 * indata.dptr is read as a uint32_t database id (NOTE(review): no length
 * check on indata is visible here - presumably validated by the caller).
 * The database is looked up with find_ctdb_db(); an unknown id is logged.
 * Health is cleared by calling ctdb_update_persistent_health() with a NULL
 * reason and num_healthy_nodes = 1. If 'may_recover' is set and the daemon
 * is still in CTDB_RUNSTATE_STARTUP, recovery_mode is switched to
 * CTDB_RECOVERY_ACTIVE to force a recovery.
 *
 * NOTE(review): the statement executed when ctdb_db->unhealthy_reason is
 * set (presumably may_recover = true) and the return values are not visible
 * in this copy.
 */
638 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
640 uint32_t db_id = *(uint32_t *)indata.dptr;
641 struct ctdb_db_context *ctdb_db;
643 bool may_recover = false;
645 ctdb_db = find_ctdb_db(ctdb, db_id);
647 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
651 if (ctdb_db->unhealthy_reason) {
655 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
657 DEBUG(DEBUG_ERR,(__location__
658 " ctdb_update_persistent_health(%s) failed\n",
663 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
664 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
666 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health of the database named by indata.
 *
 * indata.dptr is read as a uint32_t database id (NOTE(review): no length
 * check is visible here - presumably done by the caller). After a fresh
 * ctdb_load_persistent_health(), an existing unhealthy reason is returned
 * through outdata: dptr points directly at ctdb_db->unhealthy_reason (the
 * string is not copied) and dsize includes the terminating NUL.
 *
 * NOTE(review): the outdata contents for a healthy database and the return
 * values are not visible in this copy.
 */
672 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
676 uint32_t db_id = *(uint32_t *)indata.dptr;
677 struct ctdb_db_context *ctdb_db;
680 ctdb_db = find_ctdb_db(ctdb, db_id);
682 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
686 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
688 DEBUG(DEBUG_ERR,(__location__
689 " ctdb_load_persistent_health(%s) failed\n",
695 if (ctdb_db->unhealthy_reason) {
696 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
697 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on ctdb_db.
 *
 * The function tests for an already-readonly database first and refuses
 * persistent databases with an error log. Otherwise it opens (creating if
 * needed) a tracking tdb at "<db_path>.RO" with
 * TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC and mode 0600, stores the handle
 * in ctdb_db->rottdb and sets ctdb_db->readonly to true.
 *
 * NOTE(review): the return values of the individual paths and the release
 * of 'ropath' are not visible in this copy.
 */
704 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
708 if (ctdb_db->readonly) {
712 if (ctdb_db->persistent) {
713 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
717 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
718 if (ropath == NULL) {
719 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
722 ctdb_db->rottdb = tdb_open(ropath,
723 ctdb->tunable.database_hash_size,
724 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
725 O_CREAT|O_RDWR, 0600);
726 if (ctdb_db->rottdb == NULL) {
727 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
732 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
734 ctdb_db->readonly = true;
736 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
743 attach to a database, handling both persistent and non-persistent databases
744 return 0 on success, -1 on failure
/*
 * Create and register the local context for database 'db_name'.
 *
 * db_name           database name; its hash (including the trailing NUL,
 *                   see key.dsize) becomes the db_id
 * persistent        selects the persistent directory and TDB_DEFAULT flags
 *                   instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 * unhealthy_reason  if non-NULL, stored via ctdb_update_persistent_health()
 *                   with num_healthy_nodes 0 before the tdb is opened
 * jenkinshash       NOTE(review): presumably guards TDB_INCOMPATIBLE_HASH;
 *                   the test itself is not visible in this copy
 * mutexes           with TDB_MUTEX_LOCKING support compiled in, the
 *                   mutex_enabled tunable set and robust mutexes available
 *                   at runtime, adds TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST
 *
 * Visible steps: allocate the context as a talloc child of ctdb; create the
 * delete queue for non-persistent databases; reject db_id hash collisions
 * against ctdb->db_list; load/verify persistent health; open the tdb with
 * tdb_wrap_open(); on open or tdb_check() failure of a persistent database,
 * back the file up with ctdb_backup_corrupted_tdb() while tries remain;
 * create the deferred_fetch and defer_dmaster trees; add the context to
 * ctdb->db_list; set max dead records; register the null, fetch and
 * fetch_with_header calls; initialise vacuuming; copy the current
 * vnn_map generation.
 *
 * NOTE(review): many short lines (returns, gotos/labels, braces) are missing
 * from this copy. In particular, the failure paths that run after
 * DLIST_ADD() call talloc_free(ctdb_db) and no DLIST_REMOVE() is visible -
 * confirm the list is not left with a dangling entry. The delete_queue
 * failure path uses CTDB_NO_MEMORY() without a visible talloc_free(ctdb_db).
 */
746 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
747 bool persistent, const char *unhealthy_reason,
748 bool jenkinshash, bool mutexes)
750 struct ctdb_db_context *ctdb_db, *tmp_db;
755 int remaining_tries = 0;
757 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
758 CTDB_NO_MEMORY(ctdb, ctdb_db);
760 ctdb_db->priority = 1;
761 ctdb_db->ctdb = ctdb;
762 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
763 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
765 key.dsize = strlen(db_name)+1;
766 key.dptr = discard_const(db_name);
767 ctdb_db->db_id = ctdb_hash(&key);
768 ctdb_db->persistent = persistent;
770 if (!ctdb_db->persistent) {
771 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
772 if (ctdb_db->delete_queue == NULL) {
773 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
776 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
779 /* check for hash collisions */
780 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
781 if (tmp_db->db_id == ctdb_db->db_id) {
782 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
783 tmp_db->db_id, db_name, tmp_db->db_name));
784 talloc_free(ctdb_db);
790 if (unhealthy_reason) {
791 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
792 unhealthy_reason, 0);
794 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
795 ctdb_db->db_name, unhealthy_reason, ret));
796 talloc_free(ctdb_db);
801 if (ctdb->max_persistent_check_errors > 0) {
804 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
808 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
810 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
811 ctdb_db->db_name, ret));
812 talloc_free(ctdb_db);
817 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
818 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
819 ctdb_db->db_name, ctdb_db->unhealthy_reason));
820 talloc_free(ctdb_db);
824 if (ctdb_db->unhealthy_reason) {
825 /* this is just a warning, but we want that in the log file! */
826 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
827 ctdb_db->db_name, ctdb_db->unhealthy_reason));
830 /* open the database */
831 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
832 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
835 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
836 if (ctdb->valgrinding) {
837 tdb_flags |= TDB_NOMMAP;
839 tdb_flags |= TDB_DISALLOW_NESTING;
841 tdb_flags |= TDB_INCOMPATIBLE_HASH;
843 #ifdef TDB_MUTEX_LOCKING
844 if (ctdb->tunable.mutex_enabled && mutexes &&
845 tdb_runtime_check_for_robust_mutexes()) {
846 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
851 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
852 ctdb->tunable.database_hash_size,
854 O_CREAT|O_RDWR, mode);
855 if (ctdb_db->ltdb == NULL) {
857 int saved_errno = errno;
860 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
863 strerror(saved_errno)));
864 talloc_free(ctdb_db);
868 if (remaining_tries == 0) {
869 DEBUG(DEBUG_CRIT,(__location__
870 "Failed to open persistent tdb '%s': %d - %s\n",
873 strerror(saved_errno)));
874 talloc_free(ctdb_db);
878 ret = stat(ctdb_db->db_path, &st);
880 DEBUG(DEBUG_CRIT,(__location__
881 "Failed to open persistent tdb '%s': %d - %s\n",
884 strerror(saved_errno)));
885 talloc_free(ctdb_db);
889 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
891 DEBUG(DEBUG_CRIT,(__location__
892 "Failed to open persistent tdb '%s': %d - %s\n",
895 strerror(saved_errno)));
896 talloc_free(ctdb_db);
906 ctdb_check_db_empty(ctdb_db);
908 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
913 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
914 ctdb_db->db_path, ret,
915 tdb_errorstr(ctdb_db->ltdb->tdb)));
916 if (remaining_tries == 0) {
917 talloc_free(ctdb_db);
921 fd = tdb_fd(ctdb_db->ltdb->tdb);
922 ret = fstat(fd, &st);
924 DEBUG(DEBUG_CRIT,(__location__
925 "Failed to fstat() persistent tdb '%s': %d - %s\n",
929 talloc_free(ctdb_db);
934 talloc_free(ctdb_db->ltdb);
935 ctdb_db->ltdb = NULL;
937 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
939 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
941 talloc_free(ctdb_db);
951 /* set up a rb tree we can use to track which records we have a
952 fetch-lock in-flight for so we can defer any additional calls
955 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
956 if (ctdb_db->deferred_fetch == NULL) {
957 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
958 talloc_free(ctdb_db);
962 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
963 if (ctdb_db->defer_dmaster == NULL) {
964 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
966 talloc_free(ctdb_db);
970 DLIST_ADD(ctdb->db_list, ctdb_db);
972 /* setting this can help some high churn databases */
973 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
976 all databases support the "null" function. we need this in
977 order to do forced migration of records
979 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
981 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
982 talloc_free(ctdb_db);
987 all databases support the "fetch" function. we need this
988 for efficient Samba3 ctdb fetch
990 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
992 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
993 talloc_free(ctdb_db);
998 all databases support the "fetch_with_header" function. we need this
999 for efficient readonly record fetches
1001 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1003 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1004 talloc_free(ctdb_db);
1008 ret = ctdb_vacuum_init(ctdb_db);
1010 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1011 "database '%s'\n", ctdb_db->db_name));
1012 talloc_free(ctdb_db);
1016 ctdb_db->generation = ctdb->vnn_map->generation;
1018 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1019 ctdb_db->db_path, tdb_flags));
/*
 * One deferred DB attach request. Instances are linked into
 * ctdb->deferred_attach by ctdb_control_db_attach() and unlinked again by
 * the talloc destructor ctdb_deferred_attach_destructor().
 */
1026 struct ctdb_deferred_attach_context {
1027 struct ctdb_deferred_attach_context *next, *prev; /* links used by the DLIST_* macros */
1028 struct ctdb_context *ctdb; /* owning daemon context */
1029 struct ctdb_req_control *c; /* the attach control packet, talloc-stolen onto this context */
/*
 * talloc destructor for a deferred attach context: unlinks the context from
 * the ctdb->deferred_attach list.
 */
1033 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1035 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed by ctdb_control_db_attach() with the
 * deferred_attach_timeout tunable. private_data is the deferred attach
 * context; the stored control is answered with status -1 and the context
 * is freed.
 */
1040 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1042 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1043 struct ctdb_context *ctdb = da_ctx->ctdb;
1045 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1046 talloc_free(da_ctx);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach(). private_data
 * is the deferred attach context; the stored control packet is fed back
 * into ctdb_input_pkt() and the context is then freed.
 */
1049 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1051 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1052 struct ctdb_context *ctdb = da_ctx->ctdb;
1054 /* This talloc-steals the packet ->c */
1055 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1056 talloc_free(da_ctx);
/*
 * Drain ctdb->deferred_attach: every queued context is removed from the
 * list and a timed event running ctdb_deferred_attach_callback() is
 * scheduled for it with timeval_current_ofs(1,0), using the context itself
 * as the talloc parent of the event.
 * NOTE(review): the return value is not visible in this copy.
 */
1059 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1061 struct ctdb_deferred_attach_context *da_ctx;
1063 /* call it from the main event loop as soon as the current event
1066 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1067 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1068 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1075 a client has asked to attach a new database
/*
 * Control handler: attach to the database whose name is carried in indata.
 *
 * indata      database name (used as a C string; NOTE(review): no
 *             termination check is visible here)
 * outdata     on the visible success paths, points at db->db_id
 *             (not a copy) with dsize = sizeof(db->db_id)
 * tdb_flags   client-supplied flags, masked down to TDB_NOSYNC and
 *             TDB_INCOMPATIBLE_HASH (plus TDB_MUTEX_LOCKING and
 *             TDB_CLEAR_IF_FIRST when TDB_MUTEX_LOCKING is defined)
 * persistent  requested database type; must match an existing database
 * client_id   non-zero ids are looked up with reqid_find(); checks below
 *             apply only when a local client is found
 * c           the control packet; stolen onto a deferral context when the
 *             attach is deferred
 *
 * Visible behaviour: the request is refused when the allow_client_db_attach
 * tunable is 0. For a local client, an inactive node refuses the attach,
 * and during active recovery before CTDB_RUNSTATE_RUNNING a client other
 * than the recovery daemon gets its request deferred (queued on
 * ctdb->deferred_attach with a timeout event, *async_reply set to true).
 * If the database already exists its type is checked and the flags are
 * added to it; otherwise ctdb_local_attach() creates it, memory is locked
 * down and the attach is broadcast to all nodes with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): return statements and some conditions are missing from this
 * copy; ctdb->nodes[ctdb->pnn] is indexed without a visible bounds check.
 */
1077 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1078 TDB_DATA *outdata, uint64_t tdb_flags,
1079 bool persistent, uint32_t client_id,
1080 struct ctdb_req_control *c,
1083 const char *db_name = (const char *)indata.dptr;
1084 struct ctdb_db_context *db;
1085 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1086 struct ctdb_client *client = NULL;
1087 bool with_jenkinshash, with_mutexes;
1089 if (ctdb->tunable.allow_client_db_attach == 0) {
1090 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1091 "AllowClientDBAccess == 0\n", db_name));
1095 /* dont allow any local clients to attach while we are in recovery mode
1096 * except for the recovery daemon.
1097 * allow all attach from the network since these are always from remote
1100 if (client_id != 0) {
1101 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1103 if (client != NULL) {
1104 /* If the node is inactive it is not part of the cluster
1105 and we should not allow clients to attach to any
1108 if (node->flags & NODE_FLAGS_INACTIVE) {
1109 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1113 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1114 client->pid != ctdb->recoverd_pid &&
1115 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1116 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1118 if (da_ctx == NULL) {
1119 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1123 da_ctx->ctdb = ctdb;
1124 da_ctx->c = talloc_steal(da_ctx, c);
1125 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1126 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1128 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1130 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1131 *async_reply = true;
1136 /* the client can optionally pass additional tdb flags, but we
1137 only allow a subset of those on the database in ctdb. Note
1138 that tdb_flags is passed in via the (otherwise unused)
1139 srvid to the attach control */
1140 #ifdef TDB_MUTEX_LOCKING
1141 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1143 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1146 /* see if we already have this name */
1147 db = ctdb_db_handle(ctdb, db_name);
1149 if (db->persistent != persistent) {
1150 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1151 "database %s\n", persistent ? "" : "non-",
1152 db-> persistent ? "" : "non-", db_name));
1155 outdata->dptr = (uint8_t *)&db->db_id;
1156 outdata->dsize = sizeof(db->db_id);
1157 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1161 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1162 #ifdef TDB_MUTEX_LOCKING
1163 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1165 with_mutexes = false;
1168 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1169 with_jenkinshash, with_mutexes) != 0) {
1173 db = ctdb_db_handle(ctdb, db_name);
1175 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1179 /* remember the flags the client has specified */
1180 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1182 outdata->dptr = (uint8_t *)&db->db_id;
1183 outdata->dsize = sizeof(db->db_id);
1185 /* Try to ensure it's locked in mem */
1186 lockdown_memory(ctdb->valgrinding);
1188 /* tell all the other nodes about this database */
1189 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1190 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1191 CTDB_CONTROL_DB_ATTACH,
1192 0, CTDB_CTRL_FLAG_NOREPLY,
1193 indata, NULL, NULL);
1200 * a client has asked to detach from a database
/*
 * Control handler: detach from the database whose id is carried in indata.
 *
 * indata.dptr is read as a uint32_t database id (NOTE(review): no length
 * check is visible here - presumably done by the caller).
 *
 * The detach is refused with an error log when the id is unknown, when the
 * allow_client_db_attach tunable is 1, when the database is persistent, or
 * while recovery is active. A request with a non-zero client_id is looked
 * up with reqid_find(); for a live client the control is only re-broadcast
 * to all nodes (CTDB_CTRL_FLAG_NOREPLY), while a vanished client is logged
 * as a failure. On the remaining path the recovery daemon is told through
 * CTDB_SRVID_DETACH_DATABASE, then the vacuum handle, delete queue,
 * deferred-fetch tree, traverses, active revoke children and (for readonly
 * databases) the tracking tdb are freed, the context is removed from
 * ctdb->db_list and finally freed itself.
 *
 * NOTE(review): the return statements are missing from this copy. The two
 * "while (ptr) talloc_free(ptr)" loops only terminate if freeing the object
 * updates the pointer (e.g. through a talloc destructor) - not visible here.
 */
1202 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1206 struct ctdb_db_context *ctdb_db;
1207 struct ctdb_client *client = NULL;
1209 db_id = *(uint32_t *)indata.dptr;
1210 ctdb_db = find_ctdb_db(ctdb, db_id);
1211 if (ctdb_db == NULL) {
1212 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1217 if (ctdb->tunable.allow_client_db_attach == 1) {
1218 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1219 "Clients are allowed access to databases "
1220 "(AllowClientDBAccess == 1)\n",
1225 if (ctdb_db->persistent) {
1226 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1227 "denied\n", ctdb_db->db_name));
1231 /* Cannot detach from database when in recovery */
1232 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1233 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1237 /* If a control comes from a client, then broadcast it to all nodes.
1238 * Do the actual detach only if the control comes from other daemons.
1240 if (client_id != 0) {
1241 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1242 if (client != NULL) {
1243 /* forward the control to all the nodes */
1244 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1245 CTDB_CONTROL_DB_DETACH, 0,
1246 CTDB_CTRL_FLAG_NOREPLY,
1247 indata, NULL, NULL);
1250 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1251 "for database '%s'\n", ctdb_db->db_name));
1255 /* Detach database from recoverd */
1256 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1257 CTDB_SRVID_DETACH_DATABASE,
1259 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1263 /* Disable vacuuming and drop all vacuuming data */
1264 talloc_free(ctdb_db->vacuum_handle);
1265 talloc_free(ctdb_db->delete_queue);
1267 /* Terminate any deferred fetch */
1268 talloc_free(ctdb_db->deferred_fetch);
1270 /* Terminate any traverses */
1271 while (ctdb_db->traverse) {
1272 talloc_free(ctdb_db->traverse);
1275 /* Terminate any revokes */
1276 while (ctdb_db->revokechild_active) {
1277 talloc_free(ctdb_db->revokechild_active);
1280 /* Free readonly tracking database */
1281 if (ctdb_db->readonly) {
1282 talloc_free(ctdb_db->rottdb);
1285 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1287 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1289 talloc_free(ctdb_db);
1295 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach to the persistent databases
 * found there.
 *
 * unhealthy_reason  passed through unchanged to ctdb_local_attach()
 *
 * Each directory entry name is duplicated onto ctdb. A name is considered
 * only if it is at least 7 characters long and contains ".tdb."; the text
 * after ".tdb." must consist of digits only and, parsed as an unsigned
 * number, must equal this node's pnn. Other entries are logged as ignored.
 * Accepted entries are attached with persistent = true and without
 * jenkins hash or mutexes.
 *
 * NOTE(review): several short lines (declarations, the initialisation of
 * 'q', cleanup of 's', closedir() and the returns) are missing from this
 * copy of the file.
 */
1297 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1298 const char *unhealthy_reason)
1303 /* open the persistent db directory and scan it for files */
1304 d = opendir(ctdb->db_directory_persistent);
1309 while ((de=readdir(d))) {
1311 size_t len = strlen(de->d_name);
1313 int invalid_name = 0;
1315 s = talloc_strdup(ctdb, de->d_name);
1318 CTDB_NO_MEMORY(ctdb, s);
1321 /* only accept names ending in .tdb */
1322 p = strstr(s, ".tdb.");
1323 if (len < 7 || p == NULL) {
1328 /* only accept names ending with .tdb. and any number of digits */
1330 while (*q != 0 && invalid_name == 0) {
/* file names may contain bytes >= 0x80; isdigit() must not be given a
   negative value, so convert through unsigned char first */
1331 if (!isdigit((unsigned char)*q++)) {
1335 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1336 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1342 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1343 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1349 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Open and verify the persistent health tdb, then attach the persistent
 * databases via ctdb_attach_persistent().
 *
 * The health tdb path is built as "<db_directory_state>/<name>.<number>"
 * where <name> is PERSISTENT_HEALTH_TDB.  The tdb is opened with
 * TDB_DISALLOW_NESTING and then validated with tdb_check().  Both the
 * open-failure path and the tdb_check-failure path build a warning
 * string (unhealthy_reason) and re-open the file with TDB_CLEAR_IF_FIRST
 * added to the flags.  unhealthy_reason (NULL unless one of those paths
 * set it) is handed to ctdb_attach_persistent().
 */
1357 int ctdb_attach_databases(struct ctdb_context *ctdb)
1360 char *persistent_health_path = NULL;
1361 char *unhealthy_reason = NULL;
1362 bool first_try = true;
/* build the path of the health tdb inside the state directory */
1364 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1365 ctdb->db_directory_state,
1366 PERSISTENT_HEALTH_TDB,
1368 if (persistent_health_path == NULL) {
1369 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* open (creating if needed, mode 0600) the health tdb */
1375 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1376 0, TDB_DISALLOW_NESTING,
1377 O_CREAT | O_RDWR, 0600);
/* open failed: report it and prepare a retry that clears the tdb */
1378 if (ctdb->db_persistent_health == NULL) {
1379 struct tdb_wrap *tdb;
1382 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1383 persistent_health_path,
1386 talloc_free(persistent_health_path);
1387 talloc_free(unhealthy_reason);
/* remember why the persistent databases must be treated as unverified */
1392 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1393 persistent_health_path,
1394 "was cleared after a failure",
1395 "manual verification needed");
1396 if (unhealthy_reason == NULL) {
1397 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1398 talloc_free(persistent_health_path);
/* retry the open, this time with TDB_CLEAR_IF_FIRST */
1402 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1403 persistent_health_path));
1404 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1405 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1406 O_CREAT | O_RDWR, 0600);
1408 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1409 persistent_health_path,
1412 talloc_free(persistent_health_path);
1413 talloc_free(unhealthy_reason);
/* the tdb opened: verify its internal consistency */
1420 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1422 struct tdb_wrap *tdb;
/* drop the handle to the tdb that failed the check */
1424 talloc_free(ctdb->db_persistent_health);
1425 ctdb->db_persistent_health = NULL;
1428 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1429 persistent_health_path));
1430 talloc_free(persistent_health_path);
1431 talloc_free(unhealthy_reason);
/* same warning text as in the open-failure path above */
1436 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1437 persistent_health_path,
1438 "was cleared after a failure",
1439 "manual verification needed");
1440 if (unhealthy_reason == NULL) {
1441 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1442 talloc_free(persistent_health_path);
/* re-open with TDB_CLEAR_IF_FIRST after the failed check */
1446 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1447 persistent_health_path));
1448 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1449 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1450 O_CREAT | O_RDWR, 0600);
1452 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1453 persistent_health_path,
1456 talloc_free(persistent_health_path);
1457 talloc_free(unhealthy_reason);
/* the path string is no longer needed once the health tdb is usable */
1464 talloc_free(persistent_health_path);
/* attach the persistent databases, handing over any recorded warning */
1466 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1467 talloc_free(unhealthy_reason);
1476 called when a broadcast seqnum update comes in
/*
 * Apply a sequence-number update for database db_id that was sent by
 * node srcnode.
 *
 * Updates that originate from this node (srcnode == ctdb->pnn) are not
 * applied.  An unknown db_id and a database with an unhealthy_reason set
 * are each reported at DEBUG_ERR.  Otherwise the local tdb's sequence
 * number is incremented with the non-blocking variant and the resulting
 * value is cached in ctdb_db->seqnum.
 */
1478 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1480 struct ctdb_db_context *ctdb_db;
1481 if (srcnode == ctdb->pnn) {
1482 /* don't update ourselves! */
/* look up the database the update refers to */
1486 ctdb_db = find_ctdb_db(ctdb, db_id);
1488 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* a non-NULL unhealthy_reason marks the database as unhealthy */
1492 if (ctdb_db->unhealthy_reason) {
1493 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1494 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb sequence number and remember the new value */
1498 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1499 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1504 timer to check for seqnum changes in an ltdb and propagate them
/*
 * Timed-event callback that watches one database for sequence-number
 * changes.
 *
 * p is the struct ctdb_db_context being watched.  The tdb's current
 * sequence number is compared with the cached ctdb_db->seqnum; when they
 * differ, a CTDB_CONTROL_UPDATE_SEQNUM control is sent to
 * CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY and the cache is
 * refreshed.  The callback re-arms itself through event_add_timed().
 *
 * ev, te and t are not referenced by the visible code.
 */
1506 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1507 struct timeval t, void *p)
1509 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1510 struct ctdb_context *ctdb = ctdb_db->ctdb;
1511 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1512 if (new_seqnum != ctdb_db->seqnum) {
1513 /* something has changed - propagate it */
/* the payload describes the 32-bit db_id of this database */
1515 data.dptr = (uint8_t *)&ctdb_db->db_id;
1516 data.dsize = sizeof(uint32_t);
1517 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1518 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1521 ctdb_db->seqnum = new_seqnum;
1523 /* setup a new timer */
/*
 * seqnum_interval is split into a /1000 part and a (%1000)*1000 part;
 * presumably milliseconds converted to seconds + microseconds - confirm
 * against timeval_current_ofs()
 */
1524 ctdb_db->seqnum_update =
1525 event_add_timed(ctdb->ev, ctdb_db,
1526 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1527 ctdb_ltdb_seqnum_check, ctdb_db);
1531 enable seqnum handling on this db
/*
 * Turn on sequence-number tracking for database db_id.
 *
 * An unknown db_id is reported at DEBUG_ERR.  If no seqnum timer is
 * armed yet (ctdb_db->seqnum_update == NULL), one is created with
 * ctdb_ltdb_seqnum_check as its handler.  Sequence numbers are then
 * enabled on the tdb and the current value is cached in ctdb_db->seqnum.
 */
1533 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1535 struct ctdb_db_context *ctdb_db;
1536 ctdb_db = find_ctdb_db(ctdb, db_id);
1538 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* arm the periodic check only once per database */
1542 if (ctdb_db->seqnum_update == NULL) {
1543 ctdb_db->seqnum_update =
1544 event_add_timed(ctdb->ev, ctdb_db,
1545 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1546 ctdb_ltdb_seqnum_check, ctdb_db);
/* enable seqnum maintenance in the tdb and record the starting value */
1549 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1550 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler that sets the priority of a database.
 *
 * indata.dptr is interpreted as a struct ctdb_db_priority carrying the
 * db_id and the requested priority.  Priorities outside
 * 1..NUM_DB_PRIORITIES are rejected with an error message.  When
 * client_id is non-zero the same control is re-broadcast to
 * CTDB_BROADCAST_ALL with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): the visible code does not check indata.dsize before
 * reading through db_prio - presumably the caller validates the size;
 * confirm.
 */
1554 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1557 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1558 struct ctdb_db_context *ctdb_db;
1560 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
/* the "unknown db" message is only logged when this node is not inactive */
1562 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1563 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
/* valid priorities are 1..NUM_DB_PRIORITIES inclusive */
1569 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1570 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1574 ctdb_db->priority = db_prio->priority;
1575 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1577 if (client_id != 0) {
1578 /* Broadcast the update to the rest of the cluster */
1579 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1580 CTDB_CONTROL_SET_DB_PRIORITY, 0,
1581 CTDB_CTRL_FLAG_NOREPLY, indata,
/*
 * Mark a database as "sticky".
 *
 * A database that is already sticky is detected up front.  Persistent
 * databases are refused with an error message.  Otherwise a
 * sticky_records tree is created (talloc-parented to ctdb_db), the
 * sticky flag is set and the change is logged at DEBUG_NOTICE.
 */
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
/* already sticky */
1590 if (ctdb_db->sticky) {
/* the sticky property is not supported on persistent databases */
1594 if (ctdb_db->persistent) {
1595 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* tree used to track sticky records for this database */
1599 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1601 ctdb_db->sticky = true;
1603 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/*
 * Reset the per-database statistics of ctdb_db.
 *
 * The key buffer of every hot-key slot with a non-zero key size is
 * released with talloc_free(), then the whole statistics structure is
 * zeroed.
 */
1608 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1610 struct ctdb_db_statistics *s = &ctdb_db->statistics;
/* free the key data before the pointers are wiped by ZERO_STRUCT below */
1613 for (i=0; i<MAX_HOT_KEYS; i++) {
1614 if (s->hot_keys[i].key.dsize > 0) {
1615 talloc_free(s->hot_keys[i].key.dptr);
1619 ZERO_STRUCT(ctdb_db->statistics);
/*
 * Control handler that returns the statistics of database db_id in
 * outdata.
 *
 * The reply buffer is allocated as a talloc child of outdata.  It holds
 * a copy of the statistics structure up to (not including) the
 * hot_keys_wire member, followed by the key bytes of all MAX_HOT_KEYS
 * hot-key slots packed back to back.  num_hot_keys in the reply is set
 * to MAX_HOT_KEYS.  An unknown db_id and an allocation failure are each
 * reported at DEBUG_ERR.
 */
1622 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1626 struct ctdb_db_context *ctdb_db;
1627 struct ctdb_db_statistics *stats;
1632 ctdb_db = find_ctdb_db(ctdb, db_id);
1634 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
/* reply size = fixed part of the structure + total length of all hot keys */
1638 len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1639 for (i = 0; i < MAX_HOT_KEYS; i++) {
1640 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1643 stats = talloc_size(outdata, len);
1644 if (stats == NULL) {
1645 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
/* copy the fixed part of the statistics verbatim */
1649 memcpy(stats, &ctdb_db->statistics,
1650 offsetof(struct ctdb_db_statistics, hot_keys_wire));
1652 stats->num_hot_keys = MAX_HOT_KEYS;
/* append each hot key's bytes directly after the previous one */
1654 ptr = &stats->hot_keys_wire[0];
1655 for (i = 0; i < MAX_HOT_KEYS; i++) {
1656 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1657 ctdb_db->statistics.hot_keys[i].key.dsize);
1658 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
/* hand the assembled buffer back to the caller */
1661 outdata->dptr = (uint8_t *)stats;
1662 outdata->dsize = len;