2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
/* Base file name of the persistent health tdb; ctdb_attach_databases() builds the full path from it */
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side record store routine; ctdb_local_attach() installs it as
 * ctdb_db->ctdb_ltdb_store_fn.
 *
 * ctdb_db: database the record belongs to
 * header:  ltdb header written in front of the data; the VACUUM_MIGRATED
 *          and AUTOMATIC flags are cleared in header->flags before writing
 *
 * The record is either written with tdb_storev() or removed with
 * tdb_delete(), depending on the keep decision made below. Afterwards the
 * record is scheduled for deletion or taken off the delete queue when the
 * corresponding flag is still set.
 */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
56 struct ctdb_ltdb_header *header,
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
63 bool seqnum_suppressed = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
/* torture mode: complain if the stored copy carries a higher RSN than the header being written */
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
77 h2->rsn > header->rsn) {
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
107 } else if (ctdb_db->persistent) {
109 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
111 * The record is not created by the client but
112 * automatically by the ctdb_ltdb_fetch logic that
113 * creates a record with an initial header in the
114 * ltdb before trying to migrate the record from
115 * the current lmaster. Keep it instead of trying
116 * to delete the non-existing record...
119 schedule_for_deletion = true;
120 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
122 } else if (ctdb_db->ctdb->pnn == lmaster) {
124 * If we are lmaster, then we usually keep the record.
125 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
126 * and the record is empty and has never been migrated
127 * with data, then we should delete it instead of storing it.
128 * This is part of the vacuuming process.
130 * The reason that we usually need to store even empty records
131 * on the lmaster is that a client operating directly on the
132 * lmaster (== dmaster) expects the local copy of the record to
133 * exist after successful ctdb migrate call. If the record does
134 * not exist, the client goes into a migrate loop and eventually
135 * fails. So storing the empty record makes sure that we do not
136 * need to change the client code.
138 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
140 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
143 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
148 if (!ctdb_db->persistent &&
149 (ctdb_db->ctdb->pnn == header->dmaster) &&
150 !(header->flags & CTDB_REC_RO_FLAGS))
154 if (data.dsize == 0) {
155 schedule_for_deletion = true;
158 remove_from_delete_queue = !schedule_for_deletion;
163 * The VACUUM_MIGRATED flag is only set temporarily for
164 * the above logic when the record was retrieved by a
165 * VACUUM_MIGRATE call and should not be stored in the
168 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
169 * and there are two cases in which the corresponding record
170 * is stored in the local database:
171 * 1. The record has been migrated with data in the past
172 * (the MIGRATED_WITH_DATA record flag is set).
173 * 2. The record has been filled with data again since it
174 * had been submitted in the VACUUM_FETCH message to the
176 * For such records it is important to not store the
177 * VACUUM_MIGRATED flag in the database.
179 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
182 * Similarly, clear the AUTOMATIC flag which should not enter
183 * the local database copy since this would require client
184 * modifications to clear the flag when the client stores
187 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
189 rec[0].dsize = hsize;
190 rec[0].dptr = (uint8_t *)header;
192 rec[1].dsize = data.dsize;
193 rec[1].dptr = data.dptr;
195 /* Databases with seqnum updates enabled only get their seqnum
196 changes when/if we modify the data */
197 if (ctdb_db->seqnum_update != NULL) {
199 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
201 if ((old.dsize == hsize + data.dsize) &&
202 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
203 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
204 seqnum_suppressed = true;
206 if (old.dptr != NULL) {
211 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
213 keep?"storing":"deleting",
217 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
219 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
226 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
231 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
234 keep?"store":"delete", ret,
235 tdb_errorstr(ctdb_db->ltdb->tdb)));
237 schedule_for_deletion = false;
238 remove_from_delete_queue = false;
/* turn seqnum updates back on if they were switched off for this write */
240 if (seqnum_suppressed) {
241 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
244 if (schedule_for_deletion) {
246 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
248 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
252 if (remove_from_delete_queue) {
253 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * Context of a request deferred by ctdb_ltdb_lock_requeue(); it is handed
 * to lock_fetch_callback() as private data.
 */
259 struct lock_fetch_state {
260 struct ctdb_context *ctdb;
261 struct ctdb_db_context *ctdb_db;
/* handler the deferred packet is passed back to */
262 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
264 struct ctdb_req_header *hdr;
/* when true, lock_fetch_callback() skips its generation check */
266 bool ignore_generation;
270 called when we should retry the operation
/*
 * Callback handed to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p:      the struct lock_fetch_state of the deferred request
 * locked: lock result supplied by the lock code
 *         (NOTE(review): confirm how it is meant to be used)
 *
 * Unless ignore_generation is set, a packet queued under a different
 * database generation is discarded (its hdr is freed). Otherwise the
 * packet is handed to state->recv_pkt together with state->recv_context.
 */
272 static void lock_fetch_callback(void *p, bool locked)
274 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
275 if (!state->ignore_generation &&
276 state->generation != state->ctdb_db->generation) {
277 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
278 talloc_free(state->hdr);
281 state->recv_pkt(state->recv_context, state->hdr);
282 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
287 do a non-blocking ltdb_lock, deferring this ctdb request until we
290 It does the following:
292 1) tries to get the chainlock. If it succeeds, then it returns 0
294 2) if it fails to get a chainlock immediately then it sets up a
295 non-blocking chainlock via ctdb_lock_record, and when it gets the
296 chainlock it re-submits this ctdb request to the main packet
299 This effectively queues all ctdb requests that cannot be
300 immediately satisfied until it can get the lock. This means that
301 the main ctdb daemon will not block waiting for a chainlock held by
304 There are 3 possible return values:
306 0: means that it got the lock immediately.
307 -1: means that it failed to get the lock, and won't retry
308 -2: means that it failed to get the lock immediately, but will retry
/*
 * ctdb_db:           database whose chain is to be locked
 * key:               record key selecting the chain
 * hdr:               the request packet; it is moved under the deferral
 *                    state with talloc_steal() when the lock is contended
 * recv_pkt:          handler the packet is passed back to by
 *                    lock_fetch_callback()
 * recv_context:      first argument for recv_pkt
 * ignore_generation: if true, the packet is requeued even when the
 *                    database generation has changed in the meantime
 */
310 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
311 TDB_DATA key, struct ctdb_req_header *hdr,
312 void (*recv_pkt)(void *, struct ctdb_req_header *),
313 void *recv_context, bool ignore_generation)
316 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
317 struct lock_request *lreq;
318 struct lock_fetch_state *state;
320 ret = tdb_chainlock_nonblock(tdb, key);
323 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
324 /* a hard failure - don't try again */
328 /* when torturing, ensure we test the contended path */
329 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
332 tdb_chainunlock(tdb, key);
335 /* first the non-contended path */
340 state = talloc(hdr, struct lock_fetch_state);
341 state->ctdb = ctdb_db->ctdb;
342 state->ctdb_db = ctdb_db;
344 state->recv_pkt = recv_pkt;
345 state->recv_context = recv_context;
346 state->generation = ctdb_db->generation;
347 state->ignore_generation = ignore_generation;
349 /* now the contended path */
350 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
355 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
356 so it won't be freed yet */
357 talloc_steal(state, hdr);
359 /* now tell the caller that we will retry asynchronously */
364 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * header, data: passed on to ctdb_ltdb_fetch() to receive the record
 *
 * All other arguments are forwarded unchanged to ctdb_ltdb_lock_requeue().
 * A failing ctdb_ltdb_unlock() is logged.
 */
366 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
367 TDB_DATA key, struct ctdb_ltdb_header *header,
368 struct ctdb_req_header *hdr, TDB_DATA *data,
369 void (*recv_pkt)(void *, struct ctdb_req_header *),
370 void *recv_context, bool ignore_generation)
374 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
375 recv_context, ignore_generation);
377 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
380 uret = ctdb_ltdb_unlock(ctdb_db, key);
382 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
391 paranoid check to see if the db is empty
/*
 * Count the records of the local tdb with tdb_traverse_read() (no
 * callback) and treat a non-empty database as fatal via ctdb_fatal().
 */
393 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
395 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
396 int count = tdb_traverse_read(tdb, NULL, NULL);
398 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
400 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The record is looked up under the database name; the key length is
 * strlen(db_name), i.e. without the trailing NUL. A stored value is
 * copied with talloc_strndup() onto ctdb_db. If that copy fails, the
 * previous reason is put back.
 */
404 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
405 struct ctdb_db_context *ctdb_db)
407 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
413 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
414 key.dsize = strlen(ctdb_db->db_name);
/* remember the current reason so it can be restored on failure */
416 old = ctdb_db->unhealthy_reason;
417 ctdb_db->unhealthy_reason = NULL;
419 val = tdb_fetch(tdb, key);
421 reason = talloc_strndup(ctdb_db,
422 (const char *)val.dptr,
424 if (reason == NULL) {
425 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
427 ctdb_db->unhealthy_reason = old;
438 ctdb_db->unhealthy_reason = reason;
/*
 * Record the health state of ctdb_db in the persistent health tdb, inside
 * a tdb transaction.
 *
 * given_reason:      text describing the problem, NULL means healthy
 * num_healthy_nodes: when an old reason exists and this is 0, the old
 *                    reason is kept and combined with the
 *                    "NO-HEALTHY-NODES - " prefix (the strncmp() below
 *                    checks whether the prefix is already there)
 *
 * A resulting reason is stored under the database name; otherwise an
 * existing entry is deleted. After the commit the old reason is freed and
 * ctdb_db->unhealthy_reason is replaced by the new one.
 */
442 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
443 struct ctdb_db_context *ctdb_db,
444 const char *given_reason,/* NULL means healthy */
445 int num_healthy_nodes)
447 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
451 char *new_reason = NULL;
452 char *old_reason = NULL;
454 ret = tdb_transaction_start(tdb);
456 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
457 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* pick up the reason currently stored for this database */
461 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
463 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
464 ctdb_db->db_name, ret));
467 old_reason = ctdb_db->unhealthy_reason;
469 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
470 key.dsize = strlen(ctdb_db->db_name);
473 new_reason = talloc_strdup(ctdb_db, given_reason);
474 if (new_reason == NULL) {
475 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
479 } else if (old_reason && num_healthy_nodes == 0) {
481 * If the reason indicates ok, but there where no healthy nodes
482 * available, that it means, we have not recovered valid content
483 * of the db. So if there's an old reason, prefix it with
484 * "NO-HEALTHY-NODES - "
488 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
489 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
491 prefix = _TMP_PREFIX;
495 new_reason = talloc_asprintf(ctdb_db, "%s%s",
497 if (new_reason == NULL) {
498 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
499 prefix, old_reason));
/* the value is stored without the trailing NUL */
506 val.dptr = discard_const_p(uint8_t, new_reason);
507 val.dsize = strlen(new_reason);
509 ret = tdb_store(tdb, key, val, TDB_REPLACE);
511 tdb_transaction_cancel(tdb);
512 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
513 tdb_name(tdb), ctdb_db->db_name, new_reason,
514 ret, tdb_errorstr(tdb)));
515 talloc_free(new_reason);
518 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
519 ctdb_db->db_name, new_reason));
520 } else if (old_reason) {
521 ret = tdb_delete(tdb, key);
523 tdb_transaction_cancel(tdb);
524 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
525 tdb_name(tdb), ctdb_db->db_name,
526 ret, tdb_errorstr(tdb)));
527 talloc_free(new_reason);
530 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
534 ret = tdb_transaction_commit(tdb);
535 if (ret != TDB_SUCCESS) {
536 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
537 tdb_name(tdb), ret, tdb_errorstr(tdb)));
538 talloc_free(new_reason);
542 talloc_free(old_reason);
543 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb out of the way by renaming ctdb_db->db_path to a
 * name carrying a ".corrupted.<timestamp>.0Z" suffix.
 *
 * Before the rename the database is marked unhealthy through
 * ctdb_update_persistent_health(), with an "ERROR - Backup of corrupted
 * TDB" reason and zero healthy nodes.
 */
548 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
549 struct ctdb_db_context *ctdb_db)
551 time_t now = time(NULL);
559 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
560 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
561 "%04u%02u%02u%02u%02u%02u.0Z",
563 tm->tm_year+1900, tm->tm_mon+1,
564 tm->tm_mday, tm->tm_hour, tm->tm_min,
566 if (new_path == NULL) {
567 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
571 new_reason = talloc_asprintf(ctdb_db,
572 "ERROR - Backup of corrupted TDB in '%s'",
574 if (new_reason == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* the health update takes its own copy of the reason, so the local one can be freed right away */
578 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
579 talloc_free(new_reason);
581 DEBUG(DEBUG_CRIT,(__location__
582 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
587 ret = rename(ctdb_db->db_path, new_path);
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
591 ctdb_db->db_path, new_path,
592 errno, strerror(errno)));
593 talloc_free(new_path);
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
599 ctdb_db->db_path, new_path));
600 talloc_free(new_path);
/*
 * Reload the health state of every persistent database in ctdb->db_list
 * and log, per database, whether it is healthy or unhealthy. A summary
 * line with the OK and FAIL counts is logged as well.
 */
604 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
606 struct ctdb_db_context *ctdb_db;
611 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
/* only persistent databases are looked at */
612 if (!ctdb_db->persistent) {
616 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
618 DEBUG(DEBUG_ALERT,(__location__
619 " load persistent health for '%s' failed\n",
624 if (ctdb_db->unhealthy_reason == NULL) {
626 DEBUG(DEBUG_INFO,(__location__
627 " persistent db '%s' healthy\n",
633 DEBUG(DEBUG_ALERT,(__location__
634 " persistent db '%s' unhealthy: %s\n",
636 ctdb_db->unhealthy_reason));
639 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
651 mark a database - as healthy
/*
 * Control handler: mark the database whose id is passed in indata as
 * healthy.
 *
 * indata: holds the uint32_t db_id
 *
 * The health record is updated with a NULL reason and one healthy node.
 * When may_recover is set and the daemon is still in
 * CTDB_RUNSTATE_STARTUP, recovery mode is switched to
 * CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): indata.dptr is dereferenced without a size check here;
 * presumably the caller validates indata.dsize - confirm.
 */
653 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
655 uint32_t db_id = *(uint32_t *)indata.dptr;
656 struct ctdb_db_context *ctdb_db;
658 bool may_recover = false;
660 ctdb_db = find_ctdb_db(ctdb, db_id);
662 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
666 if (ctdb_db->unhealthy_reason) {
670 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
672 DEBUG(DEBUG_ERR,(__location__
673 " ctdb_update_persistent_health(%s) failed\n",
678 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
679 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
681 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health of the database whose id is passed
 * in indata.
 *
 * After reloading the health state, an existing unhealthy reason is
 * returned in outdata as a NUL terminated string. outdata->dptr points
 * directly at ctdb_db->unhealthy_reason; no copy is made.
 */
687 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
691 uint32_t db_id = *(uint32_t *)indata.dptr;
692 struct ctdb_db_context *ctdb_db;
695 ctdb_db = find_ctdb_db(ctdb, db_id);
697 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
701 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
703 DEBUG(DEBUG_ERR,(__location__
704 " ctdb_load_persistent_health(%s) failed\n",
710 if (ctdb_db->unhealthy_reason) {
711 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
712 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on a database.
 *
 * An already set property is detected up front, and persistent databases
 * are refused. Otherwise the tracking tdb "<db_path>.RO" is opened
 * (created if needed) as ctdb_db->rottdb and ctdb_db->readonly is set.
 */
719 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
723 if (ctdb_db->readonly) {
727 if (ctdb_db->persistent) {
728 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
732 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
733 if (ropath == NULL) {
734 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
737 ctdb_db->rottdb = tdb_open(ropath,
738 ctdb->tunable.database_hash_size,
739 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
740 O_CREAT|O_RDWR, 0600);
741 if (ctdb_db->rottdb == NULL) {
742 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
747 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
749 ctdb_db->readonly = true;
751 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
758 attach to a database, handling both persistent and non-persistent databases
759 return 0 on success, -1 on failure
/*
 * db_name:          name of the database; it is also hashed (including
 *                   the trailing NUL) to obtain the db_id
 * persistent:       selects the persistent db directory and TDB_DEFAULT
 *                   instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 * unhealthy_reason: if not NULL, recorded through
 *                   ctdb_update_persistent_health() before the tdb is
 *                   opened
 * jenkinshash:      presumably requests TDB_INCOMPATIBLE_HASH (confirm)
 * mutexes:          together with the mutex_enabled tunable and runtime
 *                   support for robust mutexes, enables TDB_MUTEX_LOCKING
 *
 * On success the new ctdb_db_context is linked into ctdb->db_list. The
 * failure paths shown here free the context again.
 */
761 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
762 bool persistent, const char *unhealthy_reason,
763 bool jenkinshash, bool mutexes)
765 struct ctdb_db_context *ctdb_db, *tmp_db;
770 int remaining_tries = 0;
772 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
773 CTDB_NO_MEMORY(ctdb, ctdb_db);
775 ctdb_db->ctdb = ctdb;
776 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
777 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the db_id is the hash of the name including its trailing NUL */
779 key.dsize = strlen(db_name)+1;
780 key.dptr = discard_const(db_name);
781 ctdb_db->db_id = ctdb_hash(&key);
782 ctdb_db->persistent = persistent;
784 if (!ctdb_db->persistent) {
785 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
786 if (ctdb_db->delete_queue == NULL) {
787 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
790 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
793 /* check for hash collisions */
794 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
795 if (tmp_db->db_id == ctdb_db->db_id) {
796 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
797 tmp_db->db_id, db_name, tmp_db->db_name));
798 talloc_free(ctdb_db);
804 if (unhealthy_reason) {
805 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
806 unhealthy_reason, 0);
808 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
809 ctdb_db->db_name, unhealthy_reason, ret));
810 talloc_free(ctdb_db);
815 if (ctdb->max_persistent_check_errors > 0) {
818 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
822 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
824 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
825 ctdb_db->db_name, ret));
826 talloc_free(ctdb_db);
/* an unhealthy database is only an error once no tries are left */
831 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
832 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
833 ctdb_db->db_name, ctdb_db->unhealthy_reason));
834 talloc_free(ctdb_db);
838 if (ctdb_db->unhealthy_reason) {
839 /* this is just a warning, but we want that in the log file! */
840 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
841 ctdb_db->db_name, ctdb_db->unhealthy_reason));
844 /* open the database */
845 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
846 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
849 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
850 if (ctdb->valgrinding) {
851 tdb_flags |= TDB_NOMMAP;
853 tdb_flags |= TDB_DISALLOW_NESTING;
855 tdb_flags |= TDB_INCOMPATIBLE_HASH;
857 #ifdef TDB_MUTEX_LOCKING
858 if (ctdb->tunable.mutex_enabled && mutexes &&
859 tdb_runtime_check_for_robust_mutexes()) {
860 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
865 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
866 ctdb->tunable.database_hash_size,
868 O_CREAT|O_RDWR, mode);
869 if (ctdb_db->ltdb == NULL) {
/* keep errno of the failed open for the log messages below */
871 int saved_errno = errno;
874 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
877 strerror(saved_errno)));
878 talloc_free(ctdb_db);
882 if (remaining_tries == 0) {
883 DEBUG(DEBUG_CRIT,(__location__
884 "Failed to open persistent tdb '%s': %d - %s\n",
887 strerror(saved_errno)));
888 talloc_free(ctdb_db);
892 ret = stat(ctdb_db->db_path, &st);
894 DEBUG(DEBUG_CRIT,(__location__
895 "Failed to open persistent tdb '%s': %d - %s\n",
898 strerror(saved_errno)));
899 talloc_free(ctdb_db);
903 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
905 DEBUG(DEBUG_CRIT,(__location__
906 "Failed to open persistent tdb '%s': %d - %s\n",
909 strerror(saved_errno)));
910 talloc_free(ctdb_db);
920 ctdb_check_db_empty(ctdb_db);
922 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
927 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928 ctdb_db->db_path, ret,
929 tdb_errorstr(ctdb_db->ltdb->tdb)));
930 if (remaining_tries == 0) {
931 talloc_free(ctdb_db);
935 fd = tdb_fd(ctdb_db->ltdb->tdb);
936 ret = fstat(fd, &st);
938 DEBUG(DEBUG_CRIT,(__location__
939 "Failed to fstat() persistent tdb '%s': %d - %s\n",
943 talloc_free(ctdb_db);
/* close the damaged tdb before the backup step renames its file */
948 talloc_free(ctdb_db->ltdb);
949 ctdb_db->ltdb = NULL;
951 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
953 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
955 talloc_free(ctdb_db);
965 /* set up a rb tree we can use to track which records we have a
966 fetch-lock in-flight for so we can defer any additional calls
969 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970 if (ctdb_db->deferred_fetch == NULL) {
971 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972 talloc_free(ctdb_db);
976 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
977 if (ctdb_db->defer_dmaster == NULL) {
978 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
980 talloc_free(ctdb_db);
984 DLIST_ADD(ctdb->db_list, ctdb_db);
986 /* setting this can help some high churn databases */
987 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
990 all databases support the "null" function. we need this in
991 order to do forced migration of records
993 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
995 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
996 talloc_free(ctdb_db);
1001 all databases support the "fetch" function. we need this
1002 for efficient Samba3 ctdb fetch
1004 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1006 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1007 talloc_free(ctdb_db);
1012 all databases support the "fetch_with_header" function. we need this
1013 for efficient readonly record fetches
1015 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1017 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1018 talloc_free(ctdb_db);
1022 ret = ctdb_vacuum_init(ctdb_db);
1024 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1025 "database '%s'\n", ctdb_db->db_name));
1026 talloc_free(ctdb_db);
1030 ret = ctdb_migration_init(ctdb_db);
1033 ("Failed to setup migration tracking for db '%s'\n",
1035 talloc_free(ctdb_db);
/* start out with the generation of the current vnn map */
1039 ctdb_db->generation = ctdb->vnn_map->generation;
1041 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1042 ctdb_db->db_path, tdb_flags));
/*
 * A db attach control that has been deferred; kept on the
 * ctdb->deferred_attach list until it is replayed or times out.
 */
1049 struct ctdb_deferred_attach_context {
1050 struct ctdb_deferred_attach_context *next, *prev;
1051 struct ctdb_context *ctdb;
/* the deferred control packet */
1052 struct ctdb_req_control_old *c;
/* talloc destructor: take the context off the ctdb->deferred_attach list */
1056 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1058 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed by ctdb_control_db_attach() using the
 * deferred_attach_timeout tunable: answer the deferred control with
 * status -1 and free the context.
 */
1063 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1064 struct tevent_timer *te,
1065 struct timeval t, void *private_data)
1067 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1068 struct ctdb_context *ctdb = da_ctx->ctdb;
1070 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1071 talloc_free(da_ctx);
/*
 * Timer handler armed by ctdb_process_deferred_attach(): feed the
 * deferred control packet back into ctdb_input_pkt() and free the context.
 */
1074 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1075 struct tevent_timer *te,
1076 struct timeval t, void *private_data)
1078 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1079 struct ctdb_context *ctdb = da_ctx->ctdb;
1081 /* This talloc-steals the packet ->c */
1082 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1083 talloc_free(da_ctx);
/*
 * Replay all deferred attach requests: each context is unlinked from
 * ctdb->deferred_attach and a timer for ctdb_deferred_attach_callback()
 * is added for it with timeval_current_ofs(1,0).
 */
1086 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1088 struct ctdb_deferred_attach_context *da_ctx;
1090 /* call it from the main event loop as soon as the current event
1093 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1094 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1095 tevent_add_timer(ctdb->ev, da_ctx,
1096 timeval_current_ofs(1,0),
1097 ctdb_deferred_attach_callback, da_ctx);
1104 a client has asked to attach a new database
1106 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1107 TDB_DATA *outdata, uint64_t tdb_flags,
1108 bool persistent, uint32_t client_id,
1109 struct ctdb_req_control_old *c,
1112 const char *db_name = (const char *)indata.dptr;
1113 struct ctdb_db_context *db;
1114 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1115 struct ctdb_client *client = NULL;
1116 bool with_jenkinshash, with_mutexes;
1118 if (ctdb->tunable.allow_client_db_attach == 0) {
1119 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1120 "AllowClientDBAccess == 0\n", db_name));
1124 /* don't allow any local clients to attach while we are in recovery mode
1125 * except for the recovery daemon.
1126 * allow all attach from the network since these are always from remote
1129 if (client_id != 0) {
1130 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1132 if (client != NULL) {
1133 /* If the node is inactive it is not part of the cluster
1134 and we should not allow clients to attach to any
1137 if (node->flags & NODE_FLAGS_INACTIVE) {
1138 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1142 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1143 client->pid != ctdb->recoverd_pid &&
1144 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1145 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1147 if (da_ctx == NULL) {
1148 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1152 da_ctx->ctdb = ctdb;
1153 da_ctx->c = talloc_steal(da_ctx, c);
1154 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1155 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1157 tevent_add_timer(ctdb->ev, da_ctx,
1158 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1159 ctdb_deferred_attach_timeout, da_ctx);
1161 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1162 *async_reply = true;
1167 /* the client can optionally pass additional tdb flags, but we
1168 only allow a subset of those on the database in ctdb. Note
1169 that tdb_flags is passed in via the (otherwise unused)
1170 srvid to the attach control */
1171 #ifdef TDB_MUTEX_LOCKING
1172 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1174 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1177 /* see if we already have this name */
1178 db = ctdb_db_handle(ctdb, db_name);
1180 if (db->persistent != persistent) {
1181 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1182 "database %s\n", persistent ? "" : "non-",
1183 db-> persistent ? "" : "non-", db_name));
/* known database: hand back its db_id and add the requested flags to its tdb */
1186 outdata->dptr = (uint8_t *)&db->db_id;
1187 outdata->dsize = sizeof(db->db_id);
1188 tdb_add_flags(db->ltdb->tdb, tdb_flags);
/* translate the filtered client flags into the options of ctdb_local_attach() */
1192 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1193 #ifdef TDB_MUTEX_LOCKING
1194 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1196 with_mutexes = false;
1199 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1200 with_jenkinshash, with_mutexes) != 0) {
/* look the database up again to get the handle that was just created */
1204 db = ctdb_db_handle(ctdb, db_name);
1206 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1210 /* remember the flags the client has specified */
1211 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1213 outdata->dptr = (uint8_t *)&db->db_id;
1214 outdata->dsize = sizeof(db->db_id);
1216 /* Try to ensure it's locked in mem */
1217 lockdown_memory(ctdb->valgrinding);
1219 /* tell all the other nodes about this database */
1220 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1221 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1222 CTDB_CONTROL_DB_ATTACH,
1223 0, CTDB_CTRL_FLAG_NOREPLY,
1224 indata, NULL, NULL);
1231 * a client has asked to detach from a database
/*
 * Control handler for detaching a database; the db_id is read from
 * indata.
 *
 * The detach is refused for an unknown db_id, while the
 * allow_client_db_attach tunable is 1, for persistent databases and
 * during recovery. A request coming from a local client is forwarded to
 * all nodes as a broadcast. Otherwise recoverd is told to detach the
 * database, the per-database helper state is freed and the database is
 * removed from ctdb->db_list.
 */
1233 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1237 struct ctdb_db_context *ctdb_db;
1238 struct ctdb_client *client = NULL;
1240 db_id = *(uint32_t *)indata.dptr;
1241 ctdb_db = find_ctdb_db(ctdb, db_id);
1242 if (ctdb_db == NULL) {
1243 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1248 if (ctdb->tunable.allow_client_db_attach == 1) {
1249 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1250 "Clients are allowed access to databases "
1251 "(AllowClientDBAccess == 1)\n",
1256 if (ctdb_db->persistent) {
1257 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1258 "denied\n", ctdb_db->db_name));
1262 /* Cannot detach from database when in recovery */
1263 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1264 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1268 /* If a control comes from a client, then broadcast it to all nodes.
1269 * Do the actual detach only if the control comes from other daemons.
1271 if (client_id != 0) {
1272 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1273 if (client != NULL) {
1274 /* forward the control to all the nodes */
1275 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1276 CTDB_CONTROL_DB_DETACH, 0,
1277 CTDB_CTRL_FLAG_NOREPLY,
1278 indata, NULL, NULL);
1281 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1282 "for database '%s'\n", ctdb_db->db_name));
1286 /* Detach database from recoverd */
1287 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1288 CTDB_SRVID_DETACH_DATABASE,
1290 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1294 /* Disable vacuuming and drop all vacuuming data */
1295 talloc_free(ctdb_db->vacuum_handle);
1296 talloc_free(ctdb_db->delete_queue);
1298 /* Terminate any deferred fetch */
1299 talloc_free(ctdb_db->deferred_fetch);
1301 /* Terminate any traverses */
1302 while (ctdb_db->traverse) {
1303 talloc_free(ctdb_db->traverse);
1306 /* Terminate any revokes */
1307 while (ctdb_db->revokechild_active) {
1308 talloc_free(ctdb_db->revokechild_active);
1311 /* Free readonly tracking database */
1312 if (ctdb_db->readonly) {
1313 talloc_free(ctdb_db->rottdb);
1316 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1318 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1320 talloc_free(ctdb_db);
1326 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach the databases found
 * there that belong to this node.
 *
 * unhealthy_reason: handed to ctdb_local_attach() for each database
 *
 * Only entries containing ".tdb." followed by digits that parse to
 * ctdb->pnn are attached; other entries are logged and ignored.
 */
1328 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1329 const char *unhealthy_reason)
1334 /* open the persistent db directory and scan it for files */
1335 d = opendir(ctdb->db_directory_persistent);
1340 while ((de=readdir(d))) {
1342 size_t len = strlen(de->d_name);
1344 int invalid_name = 0;
1346 s = talloc_strdup(ctdb, de->d_name);
1349 CTDB_NO_MEMORY(ctdb, s);
1352 /* only accept names ending in .tdb */
1353 p = strstr(s, ".tdb.");
1354 if (len < 7 || p == NULL) {
1359 /* only accept names ending with .tdb. and any number of digits */
1361 while (*q != 0 && invalid_name == 0) {
/* NOTE(review): isdigit() expects a value in the unsigned char range; if q is a plain char pointer a cast to unsigned char would be safer */
1362 if (!isdigit(*q++)) {
1366 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1367 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1373 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1374 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1380 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1388 int ctdb_attach_databases(struct ctdb_context *ctdb)
1391 char *persistent_health_path = NULL;
1392 char *unhealthy_reason = NULL;
1393 bool first_try = true;
1395 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1396 ctdb->db_directory_state,
1397 PERSISTENT_HEALTH_TDB,
1399 if (persistent_health_path == NULL) {
1400 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1406 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1407 0, TDB_DISALLOW_NESTING,
1408 O_CREAT | O_RDWR, 0600);
1409 if (ctdb->db_persistent_health == NULL) {
1410 struct tdb_wrap *tdb;
1413 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1414 persistent_health_path,
1417 talloc_free(persistent_health_path);
1418 talloc_free(unhealthy_reason);
1423 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1424 persistent_health_path,
1425 "was cleared after a failure",
1426 "manual verification needed");
1427 if (unhealthy_reason == NULL) {
1428 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1429 talloc_free(persistent_health_path);
1433 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1434 persistent_health_path));
1435 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1436 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1437 O_CREAT | O_RDWR, 0600);
1439 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1440 persistent_health_path,
1443 talloc_free(persistent_health_path);
1444 talloc_free(unhealthy_reason);
1451 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1453 struct tdb_wrap *tdb;
1455 talloc_free(ctdb->db_persistent_health);
1456 ctdb->db_persistent_health = NULL;
1459 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1460 persistent_health_path));
1461 talloc_free(persistent_health_path);
1462 talloc_free(unhealthy_reason);
1467 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1468 persistent_health_path,
1469 "was cleared after a failure",
1470 "manual verification needed");
1471 if (unhealthy_reason == NULL) {
1472 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1473 talloc_free(persistent_health_path);
1477 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1478 persistent_health_path));
1479 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1480 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1481 O_CREAT | O_RDWR, 0600);
1483 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1484 persistent_health_path,
1487 talloc_free(persistent_health_path);
1488 talloc_free(unhealthy_reason);
1495 talloc_free(persistent_health_path);
1497 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1498 talloc_free(unhealthy_reason);
1507 called when a broadcast seqnum update comes in
1509 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1511 struct ctdb_db_context *ctdb_db;
1512 if (srcnode == ctdb->pnn) {
1513 /* don't update ourselves! */
1517 ctdb_db = find_ctdb_db(ctdb, db_id);
1519 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1523 if (ctdb_db->unhealthy_reason) {
1524 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1525 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1529 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1530 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1535 timer to check for seqnum changes in a ltdb and propogate them
1537 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1538 struct tevent_timer *te,
1539 struct timeval t, void *p)
1541 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1542 struct ctdb_context *ctdb = ctdb_db->ctdb;
1543 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1544 if (new_seqnum != ctdb_db->seqnum) {
1545 /* something has changed - propogate it */
1547 data.dptr = (uint8_t *)&ctdb_db->db_id;
1548 data.dsize = sizeof(uint32_t);
1549 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1550 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1553 ctdb_db->seqnum = new_seqnum;
1555 /* setup a new timer */
1556 ctdb_db->seqnum_update =
1557 tevent_add_timer(ctdb->ev, ctdb_db,
1558 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1559 (ctdb->tunable.seqnum_interval%1000)*1000),
1560 ctdb_ltdb_seqnum_check, ctdb_db);
1564 enable seqnum handling on this db
1566 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1568 struct ctdb_db_context *ctdb_db;
1569 ctdb_db = find_ctdb_db(ctdb, db_id);
1571 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1575 if (ctdb_db->seqnum_update == NULL) {
1576 ctdb_db->seqnum_update = tevent_add_timer(
1578 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1579 (ctdb->tunable.seqnum_interval%1000)*1000),
1580 ctdb_ltdb_seqnum_check, ctdb_db);
1583 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1584 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1588 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1590 if (ctdb_db->sticky) {
1594 if (ctdb_db->persistent) {
1595 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
1599 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1601 ctdb_db->sticky = true;
1603 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1608 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1610 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1613 for (i=0; i<MAX_HOT_KEYS; i++) {
1614 if (s->hot_keys[i].key.dsize > 0) {
1615 talloc_free(s->hot_keys[i].key.dptr);
1619 ZERO_STRUCT(ctdb_db->statistics);
1622 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1626 struct ctdb_db_context *ctdb_db;
1627 struct ctdb_db_statistics_old *stats;
1632 ctdb_db = find_ctdb_db(ctdb, db_id);
1634 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1638 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1639 for (i = 0; i < MAX_HOT_KEYS; i++) {
1640 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1643 stats = talloc_size(outdata, len);
1644 if (stats == NULL) {
1645 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1649 memcpy(stats, &ctdb_db->statistics,
1650 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1652 stats->num_hot_keys = MAX_HOT_KEYS;
1654 ptr = &stats->hot_keys_wire[0];
1655 for (i = 0; i < MAX_HOT_KEYS; i++) {
1656 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1657 ctdb_db->statistics.hot_keys[i].key.dsize);
1658 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1661 outdata->dptr = (uint8_t *)stats;
1662 outdata->dsize = len;