2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side record store; ctdb_local_attach() assigns this function to
 * ctdb_db->ctdb_ltdb_store_fn.
 *
 * ctdb_db - database that holds the record
 * header  - ltdb header placed in front of the data; the VACUUM_MIGRATED
 *           and AUTOMATIC flags are cleared in it before anything is written
 *
 * The record is written with tdb_storev() (header chunk, then data chunk)
 * or removed with tdb_delete(). Afterwards the record may be handed to
 * ctdb_local_schedule_for_deletion() or
 * ctdb_local_remove_from_delete_queue().
 * NOTE(review): confirm the return value contract against the callers.
 */
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
56 struct ctdb_ltdb_header *header,
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
63 bool seqnum_suppressed = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
/* Torture mode only: compare the RSN of the stored copy with the new header. */
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
77 h2->rsn > header->rsn) {
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
107 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
109 * The record is not created by the client but
110 * automatically by the ctdb_ltdb_fetch logic that
111 * creates a record with an initial header in the
112 * ltdb before trying to migrate the record from
113 * the current lmaster. Keep it instead of trying
114 * to delete the non-existing record...
117 schedule_for_deletion = true;
118 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
120 } else if (ctdb_db->ctdb->pnn == lmaster) {
122 * If we are lmaster, then we usually keep the record.
123 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124 * and the record is empty and has never been migrated
125 * with data, then we should delete it instead of storing it.
126 * This is part of the vacuuming process.
128 * The reason that we usually need to store even empty records
129 * on the lmaster is that a client operating directly on the
130 * lmaster (== dmaster) expects the local copy of the record to
131 * exist after successful ctdb migrate call. If the record does
132 * not exist, the client goes into a migrate loop and eventually
133 * fails. So storing the empty record makes sure that we do not
134 * need to change the client code.
136 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
138 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
/*
 * Delete-queue bookkeeping: non-persistent database, this node is the
 * dmaster and the record carries no read-only flags. An empty record is
 * scheduled for deletion, any other one is taken off the delete queue.
 */
146 if (!ctdb_db->persistent &&
147 (ctdb_db->ctdb->pnn == header->dmaster) &&
148 !(header->flags & CTDB_REC_RO_FLAGS))
152 if (data.dsize == 0) {
153 schedule_for_deletion = true;
156 remove_from_delete_queue = !schedule_for_deletion;
161 * The VACUUM_MIGRATED flag is only set temporarily for
162 * the above logic when the record was retrieved by a
163 * VACUUM_MIGRATE call and should not be stored in the
166 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167 * and there are two cases in which the corresponding record
168 * is stored in the local database:
169 * 1. The record has been migrated with data in the past
170 * (the MIGRATED_WITH_DATA record flag is set).
171 * 2. The record has been filled with data again since it
172 * had been submitted in the VACUUM_FETCH message to the
174 * For such records it is important to not store the
175 * VACUUM_MIGRATED flag in the database.
177 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180 * Similarly, clear the AUTOMATIC flag which should not enter
181 * the local database copy since this would require client
182 * modifications to clear the flag when the client stores
185 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
/* Two chunks for tdb_storev(): the ltdb header first, then the payload. */
187 rec[0].dsize = hsize;
188 rec[0].dptr = (uint8_t *)header;
190 rec[1].dsize = data.dsize;
191 rec[1].dptr = data.dptr;
193 /* Databases with seqnum updates enabled only get their seqnum
194 changes when/if we modify the data */
195 if (ctdb_db->seqnum_update != NULL) {
197 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* Stored payload has the same size and bytes: drop TDB_SEQNUM for this write. */
199 if ((old.dsize == hsize + data.dsize) &&
200 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202 seqnum_suppressed = true;
204 if (old.dptr != NULL) {
209 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
211 keep?"storing":"deleting",
215 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
217 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
224 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
229 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232 keep?"store":"delete", ret,
233 tdb_errorstr(ctdb_db->ltdb->tdb)));
/* NOTE(review): presumably the failure path - no delete-queue changes after a failed write; confirm. */
235 schedule_for_deletion = false;
236 remove_from_delete_queue = false;
/* Re-enable TDB_SEQNUM if it was removed for this write. */
238 if (seqnum_suppressed) {
239 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242 if (schedule_for_deletion) {
244 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
246 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
250 if (remove_from_delete_queue) {
251 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State kept while a request waits for a record lock. It is filled in by
 * ctdb_ltdb_lock_requeue() and handed to lock_fetch_callback() as the
 * private pointer.
 */
257 struct lock_fetch_state {
258 struct ctdb_context *ctdb;
259 struct ctdb_db_context *ctdb_db;
/* handler that receives the packet again once the lock attempt completes */
260 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
262 struct ctdb_req_header *hdr;
/* when true the callback skips the database generation comparison */
264 bool ignore_generation;
268 called when we should retry the operation
/*
 * Callback given to ctdb_lock_record() by ctdb_ltdb_lock_requeue().
 *
 * p      - the struct lock_fetch_state set up for the deferred request
 * locked - not referenced by the statements shown here
 *
 * Unless ignore_generation is set, a packet queued under a database
 * generation that differs from the current one is logged as discarded and
 * freed. The packet is handed back to recv_pkt together with recv_context.
 * NOTE(review): presumably the discard path returns before the requeue -
 * confirm.
 */
270 static void lock_fetch_callback(void *p, bool locked)
272 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273 if (!state->ignore_generation &&
274 state->generation != state->ctdb_db->generation) {
275 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276 talloc_free(state->hdr);
279 state->recv_pkt(state->recv_context, state->hdr);
280 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
285 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 It does the following:
290 1) tries to get the chainlock. If it succeeds, then it returns 0
292 2) if it fails to get a chainlock immediately then it sets up a
293 non-blocking chainlock via ctdb_lock_record, and when it gets the
294 chainlock it re-submits this ctdb request to the main packet
297 This effectively queues all ctdb requests that cannot be
298 immediately satisfied until it can get the lock. This means that
299 the main ctdb daemon will not block waiting for a chainlock held by
302 There are 3 possible return values:
304 0: means that it got the lock immediately.
305 -1: means that it failed to get the lock, and won't retry
306 -2: means that it failed to get the lock immediately, but will retry
/*
 * Parameters:
 *  ctdb_db           - database whose local tdb chain is locked
 *  key               - record key used for the chain lock
 *  hdr               - request packet; the deferred state is allocated on
 *                      it and the packet is later moved under that state
 *  recv_pkt          - handler that gets the packet again from
 *                      lock_fetch_callback()
 *  recv_context      - first argument for recv_pkt
 *  ignore_generation - copied into the state; disables the generation
 *                      comparison in lock_fetch_callback()
 */
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
309 TDB_DATA key, struct ctdb_req_header *hdr,
310 void (*recv_pkt)(void *, struct ctdb_req_header *),
311 void *recv_context, bool ignore_generation)
314 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315 struct lock_request *lreq;
316 struct lock_fetch_state *state;
318 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are the errno values treated as contention */
321 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322 /* a hard failure - don't try again */
326 /* when torturing, ensure we test the contended path */
327 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330 tdb_chainunlock(tdb, key);
333 /* first the non-contended path */
338 state = talloc(hdr, struct lock_fetch_state);
339 state->ctdb = ctdb_db->ctdb;
340 state->ctdb_db = ctdb_db;
342 state->recv_pkt = recv_pkt;
343 state->recv_context = recv_context;
/* remember the generation so a stale packet can be recognised in the callback */
344 state->generation = ctdb_db->generation;
345 state->ignore_generation = ignore_generation;
347 /* now the contended path */
348 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
353 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354 so it won't be freed yet */
355 talloc_steal(state, hdr);
357 /* now tell the caller that we will retry asynchronously */
362 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Lock the record through ctdb_ltdb_lock_requeue() and read it with
 * ctdb_ltdb_fetch() into header and data.
 *
 * recv_pkt, recv_context and ignore_generation are passed on unchanged.
 * ctdb_ltdb_unlock() is also called in here and a failure of it is logged.
 * NOTE(review): presumably the unlock happens only when the fetch fails -
 * confirm.
 */
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
365 TDB_DATA key, struct ctdb_ltdb_header *header,
366 struct ctdb_req_header *hdr, TDB_DATA *data,
367 void (*recv_pkt)(void *, struct ctdb_req_header *),
368 void *recv_context, bool ignore_generation)
372 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
373 recv_context, ignore_generation);
375 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378 uret = ctdb_ltdb_unlock(ctdb_db, key);
380 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
389 paranoid check to see if the db is empty
/*
 * Count the records of the local tdb with a NULL-callback
 * tdb_traverse_read() and call ctdb_fatal() for a database that turns out
 * not to be empty. Used by ctdb_local_attach().
 */
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
393 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394 int count = tdb_traverse_read(tdb, NULL, NULL);
396 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
398 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL.
 * A stored value is copied with talloc_strndup() onto ctdb_db; if that
 * allocation fails the previous reason is put back into the context.
 */
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403 struct ctdb_db_context *ctdb_db)
405 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
411 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412 key.dsize = strlen(ctdb_db->db_name);
/* keep the old string aside so it can be restored on allocation failure */
414 old = ctdb_db->unhealthy_reason;
415 ctdb_db->unhealthy_reason = NULL;
417 val = tdb_fetch(tdb, key);
419 reason = talloc_strndup(ctdb_db,
420 (const char *)val.dptr,
422 if (reason == NULL) {
423 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
425 ctdb_db->unhealthy_reason = old;
436 ctdb_db->unhealthy_reason = reason;
/*
 * Record a new health state for ctdb_db in the persistent health tdb and
 * mirror it in ctdb_db->unhealthy_reason.
 *
 * given_reason      - new unhealthy reason, NULL means healthy
 * num_healthy_nodes - when this is 0 and an old reason exists, the old
 *                     reason is kept with the NO-HEALTHY-NODES prefix
 *                     (the strncmp() below looks for an existing prefix)
 *
 * The work runs inside a tdb transaction on the health tdb: the current
 * reason is reloaded first, then the new reason is stored under the
 * database name or, when there is no new reason but an old one, the entry
 * is deleted. A failing store or delete cancels the transaction.
 * After the commit the old reason string is freed and replaced.
 */
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441 struct ctdb_db_context *ctdb_db,
442 const char *given_reason,/* NULL means healthy */
443 int num_healthy_nodes)
445 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
449 char *new_reason = NULL;
450 char *old_reason = NULL;
452 ret = tdb_transaction_start(tdb);
454 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455 tdb_name(tdb), ret, tdb_errorstr(tdb)));
459 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
461 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462 ctdb_db->db_name, ret));
465 old_reason = ctdb_db->unhealthy_reason;
467 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468 key.dsize = strlen(ctdb_db->db_name);
471 new_reason = talloc_strdup(ctdb_db, given_reason);
472 if (new_reason == NULL) {
473 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
477 } else if (old_reason && num_healthy_nodes == 0) {
479 * If the reason indicates ok, but there where no healthy nodes
480 * available, that it means, we have not recovered valid content
481 * of the db. So if there's an old reason, prefix it with
482 * "NO-HEALTHY-NODES - "
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
489 prefix = _TMP_PREFIX;
493 new_reason = talloc_asprintf(ctdb_db, "%s%s",
495 if (new_reason == NULL) {
496 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497 prefix, old_reason));
504 val.dptr = discard_const_p(uint8_t, new_reason);
505 val.dsize = strlen(new_reason);
507 ret = tdb_store(tdb, key, val, TDB_REPLACE);
509 tdb_transaction_cancel(tdb);
510 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511 tdb_name(tdb), ctdb_db->db_name, new_reason,
512 ret, tdb_errorstr(tdb)));
513 talloc_free(new_reason);
516 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517 ctdb_db->db_name, new_reason));
518 } else if (old_reason) {
519 ret = tdb_delete(tdb, key);
521 tdb_transaction_cancel(tdb);
522 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523 tdb_name(tdb), ctdb_db->db_name,
524 ret, tdb_errorstr(tdb)));
525 talloc_free(new_reason);
528 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
532 ret = tdb_transaction_commit(tdb);
533 if (ret != TDB_SUCCESS) {
534 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535 tdb_name(tdb), ret, tdb_errorstr(tdb)));
536 talloc_free(new_reason);
540 talloc_free(old_reason);
541 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way.
 *
 * A new path with a .corrupted.<timestamp>.0Z suffix is built from the
 * current time, an ERROR health reason naming the backup is recorded
 * through ctdb_update_persistent_health() (with 0 healthy nodes), and the
 * file at ctdb_db->db_path is then renamed to the new path.
 * The temporary strings are allocated on ctdb_db and freed in here.
 */
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547 struct ctdb_db_context *ctdb_db)
549 time_t now = time(NULL);
557 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559 "%04u%02u%02u%02u%02u%02u.0Z",
561 tm->tm_year+1900, tm->tm_mon+1,
562 tm->tm_mday, tm->tm_hour, tm->tm_min,
564 if (new_path == NULL) {
565 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
569 new_reason = talloc_asprintf(ctdb_db,
570 "ERROR - Backup of corrupted TDB in '%s'",
572 if (new_reason == NULL) {
573 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577 talloc_free(new_reason);
579 DEBUG(DEBUG_CRIT,(__location__
580 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
585 ret = rename(ctdb_db->db_path, new_path);
587 DEBUG(DEBUG_CRIT,(__location__
588 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589 ctdb_db->db_path, new_path,
590 errno, strerror(errno)));
591 talloc_free(new_path);
595 DEBUG(DEBUG_CRIT,(__location__
596 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597 ctdb_db->db_path, new_path));
598 talloc_free(new_path);
/*
 * Walk ctdb->db_list, reload the health record of the persistent
 * databases with ctdb_load_persistent_health() and log each one as
 * healthy or unhealthy. A summary line with OK and FAIL counters is
 * logged at the end.
 * NOTE(review): confirm what the return value encodes against the callers.
 */
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
604 struct ctdb_db_context *ctdb_db;
609 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610 if (!ctdb_db->persistent) {
614 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
616 DEBUG(DEBUG_ALERT,(__location__
617 " load persistent health for '%s' failed\n",
/* a NULL reason is what marks a database as healthy */
622 if (ctdb_db->unhealthy_reason == NULL) {
624 DEBUG(DEBUG_INFO,(__location__
625 " persistent db '%s' healthy\n",
631 DEBUG(DEBUG_ALERT,(__location__
632 " persistent db '%s' unhealthy: %s\n",
634 ctdb_db->unhealthy_reason));
637 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
649 mark a database - as healthy
/*
 * Control handler: clear the unhealthy state of one database.
 *
 * indata carries the database id as a uint32_t.
 * NOTE(review): assumes indata holds at least four bytes - confirm the
 * control dispatcher validates the size.
 *
 * The health record is updated with a NULL reason and one healthy node.
 * When may_recover is set and the daemon is still in the STARTUP runstate,
 * recovery mode is switched to active.
 */
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
653 uint32_t db_id = *(uint32_t *)indata.dptr;
654 struct ctdb_db_context *ctdb_db;
656 bool may_recover = false;
658 ctdb_db = find_ctdb_db(ctdb, db_id);
660 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
664 if (ctdb_db->unhealthy_reason) {
668 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
670 DEBUG(DEBUG_ERR,(__location__
671 " ctdb_update_persistent_health(%s) failed\n",
676 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
679 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the unhealthy reason of one database.
 *
 * indata carries the database id as a uint32_t.
 * NOTE(review): assumes indata holds at least four bytes - confirm the
 * control dispatcher validates the size.
 *
 * The health record is reloaded first. When a reason is set, outdata
 * points directly at ctdb_db->unhealthy_reason (no copy is made) and its
 * size includes the terminating NUL.
 */
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
689 uint32_t db_id = *(uint32_t *)indata.dptr;
690 struct ctdb_db_context *ctdb_db;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
699 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
701 DEBUG(DEBUG_ERR,(__location__
702 " ctdb_load_persistent_health(%s) failed\n",
708 if (ctdb_db->unhealthy_reason) {
709 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on a database.
 *
 * A database that already has the property is detected up front and
 * persistent databases are rejected with an error message. Otherwise a
 * tracking tdb named <db_path>.RO is opened (created if needed) with
 * TDB_NOLOCK, TDB_CLEAR_IF_FIRST and TDB_NOSYNC, stored in
 * ctdb_db->rottdb, and ctdb_db->readonly is set.
 */
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
721 if (ctdb_db->readonly) {
725 if (ctdb_db->persistent) {
726 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
730 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
731 if (ropath == NULL) {
732 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
735 ctdb_db->rottdb = tdb_open(ropath,
736 ctdb->tunable.database_hash_size,
737 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
738 O_CREAT|O_RDWR, 0600);
739 if (ctdb_db->rottdb == NULL) {
740 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
745 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
747 ctdb_db->readonly = true;
749 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
756 attach to a database, handling both persistent and non-persistent databases
757 return 0 on success, -1 on failure
/*
 * Create and register the local context for database db_name.
 *
 * persistent       - selects the persistent db directory and TDB_DEFAULT
 *                    instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 * unhealthy_reason - when set, recorded through
 *                    ctdb_update_persistent_health() before the tdb is opened
 * jenkinshash      - NOTE(review): presumably gates TDB_INCOMPATIBLE_HASH -
 *                    confirm
 * mutexes          - together with the mutex_enabled tunable and the runtime
 *                    robust mutex check, adds TDB_MUTEX_LOCKING and
 *                    TDB_CLEAR_IF_FIRST (only when TDB_MUTEX_LOCKING is
 *                    defined at build time)
 *
 * The db_id is ctdb_hash() over the name including its terminating NUL; an
 * id that is already present in ctdb->db_list is reported as a hash
 * collision. The context is linked into ctdb->db_list, the null, fetch and
 * fetch_with_header calls are registered, and vacuuming and migration
 * tracking are initialised. The error paths shown free the context.
 */
759 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
760 bool persistent, const char *unhealthy_reason,
761 bool jenkinshash, bool mutexes)
763 struct ctdb_db_context *ctdb_db, *tmp_db;
768 int remaining_tries = 0;
770 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
771 CTDB_NO_MEMORY(ctdb, ctdb_db);
773 ctdb_db->ctdb = ctdb;
774 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
775 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the hashed key covers the name and its terminating NUL */
777 key.dsize = strlen(db_name)+1;
778 key.dptr = discard_const(db_name);
779 ctdb_db->db_id = ctdb_hash(&key);
780 ctdb_db->persistent = persistent;
/* non-persistent databases get a delete queue and the server-side store function */
782 if (!ctdb_db->persistent) {
783 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
784 if (ctdb_db->delete_queue == NULL) {
785 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
788 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
791 /* check for hash collisions */
792 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
793 if (tmp_db->db_id == ctdb_db->db_id) {
794 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
795 tmp_db->db_id, db_name, tmp_db->db_name));
796 talloc_free(ctdb_db);
802 if (unhealthy_reason) {
803 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
804 unhealthy_reason, 0);
806 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
807 ctdb_db->db_name, unhealthy_reason, ret));
808 talloc_free(ctdb_db);
813 if (ctdb->max_persistent_check_errors > 0) {
816 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
820 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
822 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
823 ctdb_db->db_name, ret));
824 talloc_free(ctdb_db);
/* an unhealthy database is an error once no tries remain, otherwise only a warning */
829 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
830 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
831 ctdb_db->db_name, ctdb_db->unhealthy_reason));
832 talloc_free(ctdb_db);
836 if (ctdb_db->unhealthy_reason) {
837 /* this is just a warning, but we want that in the log file! */
838 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
839 ctdb_db->db_name, ctdb_db->unhealthy_reason));
842 /* open the database */
843 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
844 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
847 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
848 if (ctdb->valgrinding) {
849 tdb_flags |= TDB_NOMMAP;
851 tdb_flags |= TDB_DISALLOW_NESTING;
853 tdb_flags |= TDB_INCOMPATIBLE_HASH;
855 #ifdef TDB_MUTEX_LOCKING
856 if (ctdb->tunable.mutex_enabled && mutexes &&
857 tdb_runtime_check_for_robust_mutexes()) {
858 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
863 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
864 ctdb->tunable.database_hash_size,
866 O_CREAT|O_RDWR, mode);
867 if (ctdb_db->ltdb == NULL) {
/* errno is saved right away because the calls that follow may change it */
869 int saved_errno = errno;
872 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875 strerror(saved_errno)));
876 talloc_free(ctdb_db);
880 if (remaining_tries == 0) {
881 DEBUG(DEBUG_CRIT,(__location__
882 "Failed to open persistent tdb '%s': %d - %s\n",
885 strerror(saved_errno)));
886 talloc_free(ctdb_db);
890 ret = stat(ctdb_db->db_path, &st);
892 DEBUG(DEBUG_CRIT,(__location__
893 "Failed to open persistent tdb '%s': %d - %s\n",
896 strerror(saved_errno)));
897 talloc_free(ctdb_db);
901 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
903 DEBUG(DEBUG_CRIT,(__location__
904 "Failed to open persistent tdb '%s': %d - %s\n",
907 strerror(saved_errno)));
908 talloc_free(ctdb_db);
918 ctdb_check_db_empty(ctdb_db);
920 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
925 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
926 ctdb_db->db_path, ret,
927 tdb_errorstr(ctdb_db->ltdb->tdb)));
928 if (remaining_tries == 0) {
929 talloc_free(ctdb_db);
933 fd = tdb_fd(ctdb_db->ltdb->tdb);
934 ret = fstat(fd, &st);
936 DEBUG(DEBUG_CRIT,(__location__
937 "Failed to fstat() persistent tdb '%s': %d - %s\n",
941 talloc_free(ctdb_db);
/* the tdb handle is released before the corrupted file is backed up */
946 talloc_free(ctdb_db->ltdb);
947 ctdb_db->ltdb = NULL;
949 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
951 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
953 talloc_free(ctdb_db);
963 /* set up a rb tree we can use to track which records we have a
964 fetch-lock in-flight for so we can defer any additional calls
967 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
968 if (ctdb_db->deferred_fetch == NULL) {
969 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
970 talloc_free(ctdb_db);
974 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
975 if (ctdb_db->defer_dmaster == NULL) {
976 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
978 talloc_free(ctdb_db);
982 DLIST_ADD(ctdb->db_list, ctdb_db);
984 /* setting this can help some high churn databases */
985 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
988 all databases support the "null" function. we need this in
989 order to do forced migration of records
991 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
993 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
994 talloc_free(ctdb_db);
999 all databases support the "fetch" function. we need this
1000 for efficient Samba3 ctdb fetch
1002 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1004 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1005 talloc_free(ctdb_db);
1010 all databases support the "fetch_with_header" function. we need this
1011 for efficient readonly record fetches
1013 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1015 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1016 talloc_free(ctdb_db);
1020 ret = ctdb_vacuum_init(ctdb_db);
1022 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1023 "database '%s'\n", ctdb_db->db_name));
1024 talloc_free(ctdb_db);
1028 ret = ctdb_migration_init(ctdb_db);
1031 ("Failed to setup migration tracking for db '%s'\n",
1033 talloc_free(ctdb_db);
/* NOTE(review): dereferences ctdb->vnn_map without a NULL test here - confirm it is always set at this point */
1037 ctdb_db->generation = ctdb->vnn_map->generation;
1039 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1040 ctdb_db->db_path, tdb_flags));
/*
 * One attach control parked on the ctdb->deferred_attach list by
 * ctdb_control_db_attach(): list links, the daemon context and the
 * control packet that was stolen onto this context.
 */
1047 struct ctdb_deferred_attach_context {
1048 struct ctdb_deferred_attach_context *next, *prev;
1049 struct ctdb_context *ctdb;
1050 struct ctdb_req_control_old *c;
/*
 * talloc destructor installed by ctdb_control_db_attach(): unlinks the
 * context from ctdb->deferred_attach when it is freed.
 */
1054 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1056 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed by ctdb_control_db_attach() with the
 * deferred_attach_timeout tunable: answers the parked control with
 * status -1 and frees the deferred context.
 * private_data is the struct ctdb_deferred_attach_context.
 */
1061 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1062 struct tevent_timer *te,
1063 struct timeval t, void *private_data)
1065 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1066 struct ctdb_context *ctdb = da_ctx->ctdb;
1068 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1069 talloc_free(da_ctx);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach(): feeds the
 * parked control packet back into ctdb_input_pkt() and frees the deferred
 * context.
 * private_data is the struct ctdb_deferred_attach_context.
 */
1072 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1073 struct tevent_timer *te,
1074 struct timeval t, void *private_data)
1076 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1077 struct ctdb_context *ctdb = da_ctx->ctdb;
1079 /* This talloc-steals the packet ->c */
1080 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1081 talloc_free(da_ctx);
/*
 * Drain ctdb->deferred_attach: every parked context is unlinked from the
 * list and given a timer, owned by the context and set one second ahead,
 * that runs ctdb_deferred_attach_callback().
 */
1084 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1086 struct ctdb_deferred_attach_context *da_ctx;
1088 /* call it from the main event loop as soon as the current event
1091 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1092 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1093 tevent_add_timer(ctdb->ev, da_ctx,
1094 timeval_current_ofs(1,0),
1095 ctdb_deferred_attach_callback, da_ctx);
1102 a client has asked to attach a new database
/*
 * Control handler: attach to the database named by indata.dptr.
 *
 * Refused when the allow_client_db_attach tunable is 0. For a request from
 * a known local client the node must not be inactive. While recovery is
 * active before the RUNNING runstate, such a request (unless it comes from
 * the recovery daemon pid) is parked on ctdb->deferred_attach with a
 * timeout and *async_reply is set.
 * Only a subset of the client tdb_flags is kept. If the database is already
 * attached, the persistent setting has to match; otherwise
 * ctdb_local_attach() creates it and the attach is broadcast to all nodes.
 * outdata is pointed at the db_id stored inside the database context.
 */
1104 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1105 TDB_DATA *outdata, uint64_t tdb_flags,
1106 bool persistent, uint32_t client_id,
1107 struct ctdb_req_control_old *c,
1110 const char *db_name = (const char *)indata.dptr;
1111 struct ctdb_db_context *db;
1112 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1113 struct ctdb_client *client = NULL;
1114 bool with_jenkinshash, with_mutexes;
1116 if (ctdb->tunable.allow_client_db_attach == 0) {
1117 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1118 "AllowClientDBAccess == 0\n", db_name));
1122 /* don't allow any local clients to attach while we are in recovery mode
1123 * except for the recovery daemon.
1124 * allow all attach from the network since these are always from remote
1127 if (client_id != 0) {
1128 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1130 if (client != NULL) {
1131 /* If the node is inactive it is not part of the cluster
1132 and we should not allow clients to attach to any
1135 if (node->flags & NODE_FLAGS_INACTIVE) {
1136 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1140 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1141 client->pid != ctdb->recoverd_pid &&
1142 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1143 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1145 if (da_ctx == NULL) {
1146 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1150 da_ctx->ctdb = ctdb;
1151 da_ctx->c = talloc_steal(da_ctx, c);
1152 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1153 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1155 tevent_add_timer(ctdb->ev, da_ctx,
1156 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1157 ctdb_deferred_attach_timeout, da_ctx);
1159 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1160 *async_reply = true;
1165 /* the client can optionally pass additional tdb flags, but we
1166 only allow a subset of those on the database in ctdb. Note
1167 that tdb_flags is passed in via the (otherwise unused)
1168 srvid to the attach control */
1169 #ifdef TDB_MUTEX_LOCKING
1170 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1172 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1175 /* see if we already have this name */
1176 db = ctdb_db_handle(ctdb, db_name);
1178 if (db->persistent != persistent) {
1179 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1180 "database %s\n", persistent ? "" : "non-",
1181 db-> persistent ? "" : "non-", db_name));
1184 outdata->dptr = (uint8_t *)&db->db_id;
1185 outdata->dsize = sizeof(db->db_id);
1186 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1190 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1191 #ifdef TDB_MUTEX_LOCKING
1192 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1194 with_mutexes = false;
1197 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1198 with_jenkinshash, with_mutexes) != 0) {
1202 db = ctdb_db_handle(ctdb, db_name);
1204 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1208 /* remember the flags the client has specified */
1209 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1211 outdata->dptr = (uint8_t *)&db->db_id;
1212 outdata->dsize = sizeof(db->db_id);
1214 /* Try to ensure it's locked in mem */
1215 lockdown_memory(ctdb->valgrinding);
1217 /* tell all the other nodes about this database */
1218 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1219 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1220 CTDB_CONTROL_DB_ATTACH,
1221 0, CTDB_CTRL_FLAG_NOREPLY,
1222 indata, NULL, NULL);
1229 * a client has asked to detach from a database
/*
 * Control handler: detach from the database whose id is read from indata.
 *
 * Denied when the allow_client_db_attach tunable is 1, for persistent
 * databases and while recovery is active. A request from a local client is
 * forwarded as a broadcast to all nodes; the teardown further down is meant
 * for requests that arrive from other daemons.
 * The teardown notifies the recovery daemon, frees the vacuuming data, the
 * deferred fetch tree, pending traverses and revokes and the readonly
 * tracking tdb, unlinks the context from ctdb->db_list and frees it.
 * NOTE(review): assumes indata holds at least four bytes - confirm the
 * control dispatcher validates the size.
 */
1231 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1235 struct ctdb_db_context *ctdb_db;
1236 struct ctdb_client *client = NULL;
1238 db_id = *(uint32_t *)indata.dptr;
1239 ctdb_db = find_ctdb_db(ctdb, db_id);
1240 if (ctdb_db == NULL) {
1241 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1246 if (ctdb->tunable.allow_client_db_attach == 1) {
1247 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1248 "Clients are allowed access to databases "
1249 "(AllowClientDBAccess == 1)\n",
1254 if (ctdb_db->persistent) {
1255 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1256 "denied\n", ctdb_db->db_name));
1260 /* Cannot detach from database when in recovery */
1261 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1262 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1266 /* If a control comes from a client, then broadcast it to all nodes.
1267 * Do the actual detach only if the control comes from other daemons.
1269 if (client_id != 0) {
1270 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1271 if (client != NULL) {
1272 /* forward the control to all the nodes */
1273 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1274 CTDB_CONTROL_DB_DETACH, 0,
1275 CTDB_CTRL_FLAG_NOREPLY,
1276 indata, NULL, NULL);
1279 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1280 "for database '%s'\n", ctdb_db->db_name));
1284 /* Detach database from recoverd */
1285 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1286 CTDB_SRVID_DETACH_DATABASE,
1288 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1292 /* Disable vacuuming and drop all vacuuming data */
1293 talloc_free(ctdb_db->vacuum_handle);
1294 talloc_free(ctdb_db->delete_queue);
1296 /* Terminate any deferred fetch */
1297 talloc_free(ctdb_db->deferred_fetch);
1299 /* Terminate any traverses */
1300 while (ctdb_db->traverse) {
1301 talloc_free(ctdb_db->traverse);
1304 /* Terminate any revokes */
1305 while (ctdb_db->revokechild_active) {
1306 talloc_free(ctdb_db->revokechild_active);
1309 /* Free readonly tracking database */
1310 if (ctdb_db->readonly) {
1311 talloc_free(ctdb_db->rottdb);
1314 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1316 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1318 talloc_free(ctdb_db);
1324 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach, through
 * ctdb_local_attach() with persistent set to true, to the entries that
 * belong to this node.
 *
 * An entry qualifies when its name is at least 7 characters long, contains
 * the .tdb. marker, the text checked after it consists of digits only and
 * the number parsed after the marker equals ctdb->pnn. Other entries are
 * logged as ignored.
 * unhealthy_reason is passed through to ctdb_local_attach().
 */
1326 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1327 const char *unhealthy_reason)
1332 /* open the persistent db directory and scan it for files */
1333 d = opendir(ctdb->db_directory_persistent);
1338 while ((de=readdir(d))) {
1340 size_t len = strlen(de->d_name);
1342 int invalid_name = 0;
/* work on a private copy of the directory entry name */
1344 s = talloc_strdup(ctdb, de->d_name);
1347 CTDB_NO_MEMORY(ctdb, s);
1350 /* only accept names ending in .tdb */
1351 p = strstr(s, ".tdb.");
1352 if (len < 7 || p == NULL) {
1357 /* only accept names ending with .tdb. and any number of digits */
/* NOTE(review): isdigit() receives *q unconverted - confirm q cannot yield negative char values */
1359 while (*q != 0 && invalid_name == 0) {
1360 if (!isdigit(*q++)) {
1364 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1365 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1371 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1372 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1378 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Open the persistent-health tdb located under ctdb->db_directory_state,
 * verify it with tdb_check(), then attach the persistent databases through
 * ctdb_attach_persistent().
 *
 * Both failure branches visible here (tdb_wrap_open() returning NULL, and
 * the branch following tdb_check()) build the same warning text in
 * unhealthy_reason and open the file again with TDB_CLEAR_IF_FIRST added
 * to the tdb flags. unhealthy_reason is then handed to
 * ctdb_attach_persistent().
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * conditions guarding several branches, the use of first_try, the retry
 * control flow and the return statements are not visible here.
 */
1386 int ctdb_attach_databases(struct ctdb_context *ctdb)
1389 char *persistent_health_path = NULL;
/* stays NULL unless one of the failure branches below assigns it */
1390 char *unhealthy_reason = NULL;
/* NOTE(review): only the declaration of first_try is visible here */
1391 bool first_try = true;
/*
 * Path has the form <db_directory_state>/<PERSISTENT_HEALTH_TDB>.<number>;
 * the argument supplying the number is not shown in this listing.
 */
1393 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1394 ctdb->db_directory_state,
1395 PERSISTENT_HEALTH_TDB,
1397 if (persistent_health_path == NULL) {
1398 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* open read-write, creating the file with mode 0600 if it is missing */
1404 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1405 0, TDB_DISALLOW_NESTING,
1406 O_CREAT | O_RDWR, 0600);
/* open failed */
1407 if (ctdb->db_persistent_health == NULL) {
1408 struct tdb_wrap *tdb;
/* log the failure and release both strings */
1411 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1412 persistent_health_path,
1415 talloc_free(persistent_health_path);
1416 talloc_free(unhealthy_reason);
/* record why the health db is being reset; passed on further down */
1421 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1422 persistent_health_path,
1423 "was cleared after a failure",
1424 "manual verification needed");
1425 if (unhealthy_reason == NULL) {
1426 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1427 talloc_free(persistent_health_path);
1431 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1432 persistent_health_path));
/* second open of the same path, this time with TDB_CLEAR_IF_FIRST */
1433 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1434 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1435 O_CREAT | O_RDWR, 0600);
/* NOTE(review): the condition guarding this error report is not shown */
1437 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1438 persistent_health_path,
1441 talloc_free(persistent_health_path);
1442 talloc_free(unhealthy_reason);
/* consistency check of the opened health db */
1449 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1451 struct tdb_wrap *tdb;
/* drop the handle that was just checked and clear the context pointer */
1453 talloc_free(ctdb->db_persistent_health);
1454 ctdb->db_persistent_health = NULL;
1457 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1458 persistent_health_path));
1459 talloc_free(persistent_health_path);
1460 talloc_free(unhealthy_reason);
/* same warning text as in the open-failure branch above */
1465 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1466 persistent_health_path,
1467 "was cleared after a failure",
1468 "manual verification needed");
1469 if (unhealthy_reason == NULL) {
1470 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1471 talloc_free(persistent_health_path);
1475 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1476 persistent_health_path));
/* reopen the same path with TDB_CLEAR_IF_FIRST */
1477 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1478 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1479 O_CREAT | O_RDWR, 0600);
/* NOTE(review): the condition guarding this error report is not shown */
1481 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1482 persistent_health_path,
1485 talloc_free(persistent_health_path);
1486 talloc_free(unhealthy_reason);
/* the path string is not used after this point */
1493 talloc_free(persistent_health_path);
/* attach the persistent databases, passing along the warning text if any */
1495 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1496 talloc_free(unhealthy_reason);
1505 called when a broadcast seqnum update comes in
/*
 * Apply a sequence-number update for database db_id that was sent by node
 * srcnode: increment the seqnum of the local tdb and store the resulting
 * value in ctdb_db->seqnum.
 *
 * ctdb    - daemon context
 * db_id   - id of the database the update refers to
 * srcnode - node number the update came from
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * return statements and the NULL test after find_ctdb_db() are not
 * visible here.
 */
1507 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1509 struct ctdb_db_context *ctdb_db;
/* updates that originated on this node are not applied again */
1510 if (srcnode == ctdb->pnn) {
1511 /* don't update ourselves! */
/* resolve db_id to its database context */
1515 ctdb_db = find_ctdb_db(ctdb, db_id);
1517 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* a database with unhealthy_reason set is reported as an error */
1521 if (ctdb_db->unhealthy_reason) {
1522 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1523 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb seqnum via the _nonblock variant, then cache the new value */
1527 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1528 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1533 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timer callback for one database. Compares the current tdb sequence
 * number with the value cached in ctdb_db->seqnum; when they differ it
 * sends CTDB_CONTROL_UPDATE_SEQNUM with CTDB_CTRL_FLAG_NOREPLY to
 * CTDB_BROADCAST_VNNMAP and updates the cached value. It then schedules
 * itself again.
 *
 * ev, te, t - tevent callback arguments; not referenced in the visible lines
 * p         - the struct ctdb_db_context this timer was registered for
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * declaration of data and the trailing arguments of the control call are
 * not visible here.
 */
1535 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1536 struct tevent_timer *te,
1537 struct timeval t, void *p)
1539 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1540 struct ctdb_context *ctdb = ctdb_db->ctdb;
1541 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1542 if (new_seqnum != ctdb_db->seqnum) {
1543 /* something has changed - propagate it */
/* data points at the 4-byte db_id; presumably the control payload */
1545 data.dptr = (uint8_t *)&ctdb_db->db_id;
1546 data.dsize = sizeof(uint32_t);
1547 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1548 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
/* remember the value that was just observed */
1551 ctdb_db->seqnum = new_seqnum;
1553 /* setup a new timer */
/*
 * The two arguments are seqnum_interval / 1000 and the remainder times
 * 1000, i.e. the tunable is treated as milliseconds, assuming
 * timeval_current_ofs() takes (seconds, microseconds) - TODO confirm.
 */
1554 ctdb_db->seqnum_update =
1555 tevent_add_timer(ctdb->ev, ctdb_db,
1556 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1557 (ctdb->tunable.seqnum_interval%1000)*1000),
1558 ctdb_ltdb_seqnum_check, ctdb_db);
1562 enable seqnum handling on this db
/*
 * Turn on sequence-number handling for database db_id: register the
 * ctdb_ltdb_seqnum_check timer if none is registered yet, enable seqnum
 * tracking on the tdb, and cache the current seqnum in ctdb_db->seqnum.
 *
 * ctdb  - daemon context
 * db_id - id of the database to enable seqnum handling on
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * NULL test after find_ctdb_db(), the first arguments of
 * tevent_add_timer() and the return statements are not visible here.
 */
1564 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1566 struct ctdb_db_context *ctdb_db;
1567 ctdb_db = find_ctdb_db(ctdb, db_id);
1569 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only register a timer when none is stored in seqnum_update */
1573 if (ctdb_db->seqnum_update == NULL) {
1574 ctdb_db->seqnum_update = tevent_add_timer(
/* same interval split as used when the timer callback re-registers itself */
1576 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1577 (ctdb->tunable.seqnum_interval%1000)*1000),
1578 ctdb_ltdb_seqnum_check, ctdb_db);
/* enable seqnum tracking on the tdb, then read the starting value */
1581 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1582 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Mark ctdb_db as sticky: create the sticky_records tree, set the sticky
 * flag and log the change. A database that is already sticky, or one that
 * is persistent, is handled by the two tests at the top.
 *
 * ctdb    - daemon context; not referenced in the visible lines
 * ctdb_db - database to mark as sticky
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * bodies of the first test and all return statements are not visible here.
 */
1586 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
/* already sticky */
1588 if (ctdb_db->sticky) {
/* the sticky property is refused for persistent databases with an error log */
1592 if (ctdb_db->persistent) {
1593 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* tree for sticky records; presumably allocated under ctdb_db - TODO confirm */
1597 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1599 ctdb_db->sticky = true;
1601 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/*
 * Reset the statistics of ctdb_db: free the key buffer of every hot-key
 * slot that has a non-zero key size, then zero the whole statistics
 * structure.
 *
 * ctdb_db - database whose statistics are cleared
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * declaration of the loop index is not visible here.
 */
1606 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1608 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
/* release the key buffers first; zeroing below would lose the pointers */
1611 for (i=0; i<MAX_HOT_KEYS; i++) {
1612 if (s->hot_keys[i].key.dsize > 0) {
1613 talloc_free(s->hot_keys[i].key.dptr);
/* clear every counter and hot-key slot in one go */
1617 ZERO_STRUCT(ctdb_db->statistics);
/*
 * Build a flat copy of the statistics of database db_id in outdata.
 *
 * The buffer consists of the fixed part of struct ctdb_db_statistics_old
 * up to the hot_keys_wire member, followed by the key bytes of all
 * MAX_HOT_KEYS hot-key slots stored back to back. The buffer is allocated
 * with talloc_size() under outdata, and outdata->dptr and outdata->dsize
 * are set to describe it.
 *
 * NOTE(review): the embedded line numbers in this listing have gaps; the
 * remaining parameters, the local declarations of len, i and ptr, the NULL
 * test after find_ctdb_db() and the return statements are not visible
 * here.
 */
1620 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1624 struct ctdb_db_context *ctdb_db;
1625 struct ctdb_db_statistics_old *stats;
1630 ctdb_db = find_ctdb_db(ctdb, db_id);
1632 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
/* total size = fixed part before hot_keys_wire + sum of all key sizes */
1636 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1637 for (i = 0; i < MAX_HOT_KEYS; i++) {
1638 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1641 stats = talloc_size(outdata, len);
1642 if (stats == NULL) {
1643 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
/* copy the fixed part verbatim */
1647 memcpy(stats, &ctdb_db->statistics,
1648 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
/* the reply always reports MAX_HOT_KEYS slots, whatever their key sizes */
1650 stats->num_hot_keys = MAX_HOT_KEYS;
/*
 * Append the key bytes of each slot in index order.
 * NOTE(review): if a slot can have key.dptr == NULL with dsize 0, this
 * calls memcpy() with a NULL source and length 0 - confirm that is
 * acceptable on the supported platforms.
 */
1652 ptr = &stats->hot_keys_wire[0];
1653 for (i = 0; i < MAX_HOT_KEYS; i++) {
1654 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1655 ctdb_db->statistics.hot_keys[i].key.dsize);
1656 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
/* hand the buffer and its length back to the caller */
1659 outdata->dptr = (uint8_t *)stats;
1660 outdata->dsize = len;