2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/time.h"
25 #include "system/locale.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
47 * write a record to a normal database
49 * This is the server-variant of the ctdb_ltdb_store function.
50 * It contains logic to determine whether a record should be
51 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
52 * controls to the local ctdb daemon if apporpriate.
54 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
56 struct ctdb_ltdb_header *header,
59 struct ctdb_context *ctdb = ctdb_db->ctdb;
61 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
63 bool seqnum_suppressed = false;
65 bool schedule_for_deletion = false;
66 bool remove_from_delete_queue = false;
69 if (ctdb->flags & CTDB_FLAG_TORTURE) {
71 struct ctdb_ltdb_header *h2;
73 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
74 h2 = (struct ctdb_ltdb_header *)old.dptr;
75 if (old.dptr != NULL &&
77 h2->rsn > header->rsn) {
79 ("RSN regression! %"PRIu64" %"PRIu64"\n",
80 h2->rsn, header->rsn));
87 if (ctdb->vnn_map == NULL) {
89 * Called from a client: always store the record
90 * Also don't call ctdb_lmaster since it uses the vnn_map!
96 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
99 * If we migrate an empty record off to another node
100 * and the record has not been migrated with data,
101 * delete the record instead of storing the empty record.
103 if (data.dsize != 0) {
105 } else if (header->flags & CTDB_REC_RO_FLAGS) {
107 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
109 * The record is not created by the client but
110 * automatically by the ctdb_ltdb_fetch logic that
111 * creates a record with an initial header in the
112 * ltdb before trying to migrate the record from
113 * the current lmaster. Keep it instead of trying
114 * to delete the non-existing record...
117 schedule_for_deletion = true;
118 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
120 } else if (ctdb_db->ctdb->pnn == lmaster) {
122 * If we are lmaster, then we usually keep the record.
123 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
124 * and the record is empty and has never been migrated
125 * with data, then we should delete it instead of storing it.
126 * This is part of the vacuuming process.
128 * The reason that we usually need to store even empty records
129 * on the lmaster is that a client operating directly on the
130 * lmaster (== dmaster) expects the local copy of the record to
131 * exist after successful ctdb migrate call. If the record does
132 * not exist, the client goes into a migrate loop and eventually
133 * fails. So storing the empty record makes sure that we do not
134 * need to change the client code.
136 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
138 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
141 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
146 if (ctdb_db_volatile(ctdb_db) &&
147 (ctdb_db->ctdb->pnn == header->dmaster) &&
148 !(header->flags & CTDB_REC_RO_FLAGS))
152 if (data.dsize == 0) {
153 schedule_for_deletion = true;
156 remove_from_delete_queue = !schedule_for_deletion;
161 * The VACUUM_MIGRATED flag is only set temporarily for
162 * the above logic when the record was retrieved by a
163 * VACUUM_MIGRATE call and should not be stored in the
166 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
167 * and there are two cases in which the corresponding record
168 * is stored in the local database:
169 * 1. The record has been migrated with data in the past
170 * (the MIGRATED_WITH_DATA record flag is set).
171 * 2. The record has been filled with data again since it
172 * had been submitted in the VACUUM_FETCH message to the
174 * For such records it is important to not store the
175 * VACUUM_MIGRATED flag in the database.
177 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
180 * Similarly, clear the AUTOMATIC flag which should not enter
181 * the local database copy since this would require client
182 * modifications to clear the flag when the client stores
185 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
187 rec[0].dsize = hsize;
188 rec[0].dptr = (uint8_t *)header;
190 rec[1].dsize = data.dsize;
191 rec[1].dptr = data.dptr;
193 /* Databases with seqnum updates enabled only get their seqnum
194 changes when/if we modify the data */
195 if (ctdb_db->seqnum_update != NULL) {
197 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
199 if ((old.dsize == hsize + data.dsize) &&
200 memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
201 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
202 seqnum_suppressed = true;
204 if (old.dptr != NULL) {
209 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
211 keep?"storing":"deleting",
215 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
217 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
224 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
229 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
232 keep?"store":"delete", ret,
233 tdb_errorstr(ctdb_db->ltdb->tdb)));
235 schedule_for_deletion = false;
236 remove_from_delete_queue = false;
238 if (seqnum_suppressed) {
239 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
242 if (schedule_for_deletion) {
244 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
246 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
250 if (remove_from_delete_queue) {
251 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
257 struct lock_fetch_state {
258 struct ctdb_context *ctdb;
259 struct ctdb_db_context *ctdb_db;
260 void (*recv_pkt)(void *, struct ctdb_req_header *);
262 struct ctdb_req_header *hdr;
264 bool ignore_generation;
268 called when we should retry the operation
270 static void lock_fetch_callback(void *p, bool locked)
272 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
273 if (!state->ignore_generation &&
274 state->generation != state->ctdb_db->generation) {
275 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
276 talloc_free(state->hdr);
279 state->recv_pkt(state->recv_context, state->hdr);
280 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
285 do a non-blocking ltdb_lock, deferring this ctdb request until we
288 It does the following:
290 1) tries to get the chainlock. If it succeeds, then it returns 0
292 2) if it fails to get a chainlock immediately then it sets up a
293 non-blocking chainlock via ctdb_lock_record, and when it gets the
294 chainlock it re-submits this ctdb request to the main packet
297 This effectively queues all ctdb requests that cannot be
298 immediately satisfied until it can get the lock. This means that
299 the main ctdb daemon will not block waiting for a chainlock held by
302 There are 3 possible return values:
304 0: means that it got the lock immediately.
305 -1: means that it failed to get the lock, and won't retry
306 -2: means that it failed to get the lock immediately, but will retry
308 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
309 TDB_DATA key, struct ctdb_req_header *hdr,
310 void (*recv_pkt)(void *, struct ctdb_req_header *),
311 void *recv_context, bool ignore_generation)
314 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
315 struct lock_request *lreq;
316 struct lock_fetch_state *state;
318 ret = tdb_chainlock_nonblock(tdb, key);
321 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
322 /* a hard failure - don't try again */
326 /* when torturing, ensure we test the contended path */
327 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
330 tdb_chainunlock(tdb, key);
333 /* first the non-contended path */
338 state = talloc(hdr, struct lock_fetch_state);
339 state->ctdb = ctdb_db->ctdb;
340 state->ctdb_db = ctdb_db;
342 state->recv_pkt = recv_pkt;
343 state->recv_context = recv_context;
344 state->generation = ctdb_db->generation;
345 state->ignore_generation = ignore_generation;
347 /* now the contended path */
348 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
353 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
354 so it won't be freed yet */
355 talloc_steal(state, hdr);
357 /* now tell the caller than we will retry asynchronously */
362 a varient of ctdb_ltdb_lock_requeue that also fetches the record
364 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
365 TDB_DATA key, struct ctdb_ltdb_header *header,
366 struct ctdb_req_header *hdr, TDB_DATA *data,
367 void (*recv_pkt)(void *, struct ctdb_req_header *),
368 void *recv_context, bool ignore_generation)
372 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
373 recv_context, ignore_generation);
375 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
378 uret = ctdb_ltdb_unlock(ctdb_db, key);
380 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
389 paraoid check to see if the db is empty
391 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
393 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
394 int count = tdb_traverse_read(tdb, NULL, NULL);
396 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
398 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
402 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
403 struct ctdb_db_context *ctdb_db)
405 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
411 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
412 key.dsize = strlen(ctdb_db->db_name);
414 old = ctdb_db->unhealthy_reason;
415 ctdb_db->unhealthy_reason = NULL;
417 val = tdb_fetch(tdb, key);
419 reason = talloc_strndup(ctdb_db,
420 (const char *)val.dptr,
422 if (reason == NULL) {
423 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
425 ctdb_db->unhealthy_reason = old;
436 ctdb_db->unhealthy_reason = reason;
440 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
441 struct ctdb_db_context *ctdb_db,
442 const char *given_reason,/* NULL means healthy */
443 int num_healthy_nodes)
445 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
449 char *new_reason = NULL;
450 char *old_reason = NULL;
452 ret = tdb_transaction_start(tdb);
454 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
455 tdb_name(tdb), ret, tdb_errorstr(tdb)));
459 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
461 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
462 ctdb_db->db_name, ret));
465 old_reason = ctdb_db->unhealthy_reason;
467 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
468 key.dsize = strlen(ctdb_db->db_name);
471 new_reason = talloc_strdup(ctdb_db, given_reason);
472 if (new_reason == NULL) {
473 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
477 } else if (old_reason && num_healthy_nodes == 0) {
479 * If the reason indicates ok, but there where no healthy nodes
480 * available, that it means, we have not recovered valid content
481 * of the db. So if there's an old reason, prefix it with
482 * "NO-HEALTHY-NODES - "
486 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
487 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
489 prefix = _TMP_PREFIX;
493 new_reason = talloc_asprintf(ctdb_db, "%s%s",
495 if (new_reason == NULL) {
496 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
497 prefix, old_reason));
504 val.dptr = discard_const_p(uint8_t, new_reason);
505 val.dsize = strlen(new_reason);
507 ret = tdb_store(tdb, key, val, TDB_REPLACE);
509 tdb_transaction_cancel(tdb);
510 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
511 tdb_name(tdb), ctdb_db->db_name, new_reason,
512 ret, tdb_errorstr(tdb)));
513 talloc_free(new_reason);
516 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
517 ctdb_db->db_name, new_reason));
518 } else if (old_reason) {
519 ret = tdb_delete(tdb, key);
521 tdb_transaction_cancel(tdb);
522 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
523 tdb_name(tdb), ctdb_db->db_name,
524 ret, tdb_errorstr(tdb)));
525 talloc_free(new_reason);
528 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
532 ret = tdb_transaction_commit(tdb);
533 if (ret != TDB_SUCCESS) {
534 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
535 tdb_name(tdb), ret, tdb_errorstr(tdb)));
536 talloc_free(new_reason);
540 talloc_free(old_reason);
541 ctdb_db->unhealthy_reason = new_reason;
546 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
547 struct ctdb_db_context *ctdb_db)
549 time_t now = time(NULL);
557 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
558 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
559 "%04u%02u%02u%02u%02u%02u.0Z",
561 tm->tm_year+1900, tm->tm_mon+1,
562 tm->tm_mday, tm->tm_hour, tm->tm_min,
564 if (new_path == NULL) {
565 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
569 new_reason = talloc_asprintf(ctdb_db,
570 "ERROR - Backup of corrupted TDB in '%s'",
572 if (new_reason == NULL) {
573 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
577 talloc_free(new_reason);
579 DEBUG(DEBUG_CRIT,(__location__
580 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
585 ret = rename(ctdb_db->db_path, new_path);
587 DEBUG(DEBUG_CRIT,(__location__
588 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
589 ctdb_db->db_path, new_path,
590 errno, strerror(errno)));
591 talloc_free(new_path);
595 DEBUG(DEBUG_CRIT,(__location__
596 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
597 ctdb_db->db_path, new_path));
598 talloc_free(new_path);
602 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
604 struct ctdb_db_context *ctdb_db;
609 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
610 if (!ctdb_db_persistent(ctdb_db)) {
614 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
616 DEBUG(DEBUG_ALERT,(__location__
617 " load persistent health for '%s' failed\n",
622 if (ctdb_db->unhealthy_reason == NULL) {
624 DEBUG(DEBUG_INFO,(__location__
625 " persistent db '%s' healthy\n",
631 DEBUG(DEBUG_ALERT,(__location__
632 " persistent db '%s' unhealthy: %s\n",
634 ctdb_db->unhealthy_reason));
637 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
649 mark a database - as healthy
651 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
653 uint32_t db_id = *(uint32_t *)indata.dptr;
654 struct ctdb_db_context *ctdb_db;
656 bool may_recover = false;
658 ctdb_db = find_ctdb_db(ctdb, db_id);
660 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
664 if (ctdb_db->unhealthy_reason) {
668 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
670 DEBUG(DEBUG_ERR,(__location__
671 " ctdb_update_persistent_health(%s) failed\n",
676 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
677 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
679 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
685 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
689 uint32_t db_id = *(uint32_t *)indata.dptr;
690 struct ctdb_db_context *ctdb_db;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
699 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
701 DEBUG(DEBUG_ERR,(__location__
702 " ctdb_load_persistent_health(%s) failed\n",
708 if (ctdb_db->unhealthy_reason) {
709 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
710 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
717 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
721 if (ctdb_db_readonly(ctdb_db)) {
725 if (! ctdb_db_volatile(ctdb_db)) {
727 ("Non-volatile databases do not support readonly flag\n"));
731 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
732 if (ropath == NULL) {
733 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
736 ctdb_db->rottdb = tdb_open(ropath,
737 ctdb->tunable.database_hash_size,
738 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
739 O_CREAT|O_RDWR, 0600);
740 if (ctdb_db->rottdb == NULL) {
741 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
746 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
748 ctdb_db_set_readonly(ctdb_db);
750 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
757 attach to a database, handling both persistent and non-persistent databases
758 return 0 on success, -1 on failure
760 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
761 bool persistent, const char *unhealthy_reason)
763 struct ctdb_db_context *ctdb_db, *tmp_db;
768 int remaining_tries = 0;
769 uint8_t db_flags = 0;
771 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
772 CTDB_NO_MEMORY(ctdb, ctdb_db);
774 ctdb_db->ctdb = ctdb;
775 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
776 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
778 key.dsize = strlen(db_name)+1;
779 key.dptr = discard_const(db_name);
780 ctdb_db->db_id = ctdb_hash(&key);
782 ctdb_db->db_flags = CTDB_DB_FLAGS_PERSISTENT;
785 if (ctdb_db_volatile(ctdb_db)) {
786 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
787 if (ctdb_db->delete_queue == NULL) {
788 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
791 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
794 /* check for hash collisions */
795 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
796 if (tmp_db->db_id == ctdb_db->db_id) {
797 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
798 tmp_db->db_id, db_name, tmp_db->db_name));
799 talloc_free(ctdb_db);
805 if (unhealthy_reason) {
806 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
807 unhealthy_reason, 0);
809 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
810 ctdb_db->db_name, unhealthy_reason, ret));
811 talloc_free(ctdb_db);
816 if (ctdb->max_persistent_check_errors > 0) {
819 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
823 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
825 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
826 ctdb_db->db_name, ret));
827 talloc_free(ctdb_db);
832 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
833 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
834 ctdb_db->db_name, ctdb_db->unhealthy_reason));
835 talloc_free(ctdb_db);
839 if (ctdb_db->unhealthy_reason) {
840 /* this is just a warning, but we want that in the log file! */
841 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
842 ctdb_db->db_name, ctdb_db->unhealthy_reason));
845 /* open the database */
846 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
847 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
851 db_flags = CTDB_DB_FLAGS_PERSISTENT;
854 tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
855 ctdb->tunable.mutex_enabled);
858 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
859 ctdb->tunable.database_hash_size,
861 O_CREAT|O_RDWR, mode);
862 if (ctdb_db->ltdb == NULL) {
864 int saved_errno = errno;
867 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
870 strerror(saved_errno)));
871 talloc_free(ctdb_db);
875 if (remaining_tries == 0) {
876 DEBUG(DEBUG_CRIT,(__location__
877 "Failed to open persistent tdb '%s': %d - %s\n",
880 strerror(saved_errno)));
881 talloc_free(ctdb_db);
885 ret = stat(ctdb_db->db_path, &st);
887 DEBUG(DEBUG_CRIT,(__location__
888 "Failed to open persistent tdb '%s': %d - %s\n",
891 strerror(saved_errno)));
892 talloc_free(ctdb_db);
896 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
898 DEBUG(DEBUG_CRIT,(__location__
899 "Failed to open persistent tdb '%s': %d - %s\n",
902 strerror(saved_errno)));
903 talloc_free(ctdb_db);
913 ctdb_check_db_empty(ctdb_db);
915 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
920 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
921 ctdb_db->db_path, ret,
922 tdb_errorstr(ctdb_db->ltdb->tdb)));
923 if (remaining_tries == 0) {
924 talloc_free(ctdb_db);
928 fd = tdb_fd(ctdb_db->ltdb->tdb);
929 ret = fstat(fd, &st);
931 DEBUG(DEBUG_CRIT,(__location__
932 "Failed to fstat() persistent tdb '%s': %d - %s\n",
936 talloc_free(ctdb_db);
941 talloc_free(ctdb_db->ltdb);
942 ctdb_db->ltdb = NULL;
944 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
946 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
948 talloc_free(ctdb_db);
958 /* remember the flags the client has specified */
959 tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
962 /* set up a rb tree we can use to track which records we have a
963 fetch-lock in-flight for so we can defer any additional calls
966 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
967 if (ctdb_db->deferred_fetch == NULL) {
968 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
969 talloc_free(ctdb_db);
973 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
974 if (ctdb_db->defer_dmaster == NULL) {
975 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
977 talloc_free(ctdb_db);
981 DLIST_ADD(ctdb->db_list, ctdb_db);
983 /* setting this can help some high churn databases */
984 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
987 all databases support the "null" function. we need this in
988 order to do forced migration of records
990 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
992 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
993 talloc_free(ctdb_db);
998 all databases support the "fetch" function. we need this
999 for efficient Samba3 ctdb fetch
1001 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
1003 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1004 talloc_free(ctdb_db);
1009 all databases support the "fetch_with_header" function. we need this
1010 for efficient readonly record fetches
1012 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1014 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1015 talloc_free(ctdb_db);
1019 ret = ctdb_vacuum_init(ctdb_db);
1021 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1022 "database '%s'\n", ctdb_db->db_name));
1023 talloc_free(ctdb_db);
1027 ret = ctdb_migration_init(ctdb_db);
1030 ("Failed to setup migration tracking for db '%s'\n",
1032 talloc_free(ctdb_db);
1036 ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
1037 &ctdb_db->lock_log);
1040 ("Failed to setup lock logging for db '%s'\n",
1042 talloc_free(ctdb_db);
1046 ctdb_db->generation = ctdb->vnn_map->generation;
1048 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1049 ctdb_db->db_path, tdb_flags));
1056 struct ctdb_deferred_attach_context {
1057 struct ctdb_deferred_attach_context *next, *prev;
1058 struct ctdb_context *ctdb;
1059 struct ctdb_req_control_old *c;
1063 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1065 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1070 static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
1071 struct tevent_timer *te,
1072 struct timeval t, void *private_data)
1074 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1075 struct ctdb_context *ctdb = da_ctx->ctdb;
1077 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1078 talloc_free(da_ctx);
1081 static void ctdb_deferred_attach_callback(struct tevent_context *ev,
1082 struct tevent_timer *te,
1083 struct timeval t, void *private_data)
1085 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1086 struct ctdb_context *ctdb = da_ctx->ctdb;
1088 /* This talloc-steals the packet ->c */
1089 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1090 talloc_free(da_ctx);
1093 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1095 struct ctdb_deferred_attach_context *da_ctx;
1097 /* call it from the main event loop as soon as the current event
1100 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1101 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1102 tevent_add_timer(ctdb->ev, da_ctx,
1103 timeval_current_ofs(1,0),
1104 ctdb_deferred_attach_callback, da_ctx);
1111 a client has asked to attach a new database
1113 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1115 bool persistent, uint32_t client_id,
1116 struct ctdb_req_control_old *c,
1119 const char *db_name = (const char *)indata.dptr;
1120 struct ctdb_db_context *db;
1121 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1122 struct ctdb_client *client = NULL;
1124 if (ctdb->tunable.allow_client_db_attach == 0) {
1125 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1126 "AllowClientDBAccess == 0\n", db_name));
1130 /* don't allow any local clients to attach while we are in recovery mode
1131 * except for the recovery daemon.
1132 * allow all attach from the network since these are always from remote
1135 if (client_id != 0) {
1136 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1138 if (client != NULL) {
1139 /* If the node is inactive it is not part of the cluster
1140 and we should not allow clients to attach to any
1143 if (node->flags & NODE_FLAGS_INACTIVE) {
1144 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1148 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1149 client->pid != ctdb->recoverd_pid &&
1150 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1151 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1153 if (da_ctx == NULL) {
1154 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1158 da_ctx->ctdb = ctdb;
1159 da_ctx->c = talloc_steal(da_ctx, c);
1160 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1161 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1163 tevent_add_timer(ctdb->ev, da_ctx,
1164 timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
1165 ctdb_deferred_attach_timeout, da_ctx);
1167 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1168 *async_reply = true;
1173 /* see if we already have this name */
1174 db = ctdb_db_handle(ctdb, db_name);
1176 if (ctdb_db_persistent(db) != persistent) {
1177 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1178 "database %s\n", persistent ? "" : "non-",
1179 ctdb_db_persistent(db) ? "" : "non-", db_name));
1182 outdata->dptr = (uint8_t *)&db->db_id;
1183 outdata->dsize = sizeof(db->db_id);
1187 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
1191 db = ctdb_db_handle(ctdb, db_name);
1193 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1197 outdata->dptr = (uint8_t *)&db->db_id;
1198 outdata->dsize = sizeof(db->db_id);
1200 /* Try to ensure it's locked in mem */
1201 lockdown_memory(ctdb->valgrinding);
1203 /* tell all the other nodes about this database */
1204 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1205 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1206 CTDB_CONTROL_DB_ATTACH,
1207 0, CTDB_CTRL_FLAG_NOREPLY,
1208 indata, NULL, NULL);
1215 * a client has asked to detach from a database
1217 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1221 struct ctdb_db_context *ctdb_db;
1222 struct ctdb_client *client = NULL;
1224 db_id = *(uint32_t *)indata.dptr;
1225 ctdb_db = find_ctdb_db(ctdb, db_id);
1226 if (ctdb_db == NULL) {
1227 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1232 if (ctdb->tunable.allow_client_db_attach == 1) {
1233 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1234 "Clients are allowed access to databases "
1235 "(AllowClientDBAccess == 1)\n",
1240 if (! ctdb_db_volatile(ctdb_db)) {
1242 ("Detaching non-volatile database %s denied\n",
1247 /* Cannot detach from database when in recovery */
1248 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1249 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1253 /* If a control comes from a client, then broadcast it to all nodes.
1254 * Do the actual detach only if the control comes from other daemons.
1256 if (client_id != 0) {
1257 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1258 if (client != NULL) {
1259 /* forward the control to all the nodes */
1260 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1261 CTDB_CONTROL_DB_DETACH, 0,
1262 CTDB_CTRL_FLAG_NOREPLY,
1263 indata, NULL, NULL);
1266 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1267 "for database '%s'\n", ctdb_db->db_name));
1271 /* Detach database from recoverd */
1272 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1273 CTDB_SRVID_DETACH_DATABASE,
1275 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1279 /* Disable vacuuming and drop all vacuuming data */
1280 talloc_free(ctdb_db->vacuum_handle);
1281 talloc_free(ctdb_db->delete_queue);
1283 /* Terminate any deferred fetch */
1284 talloc_free(ctdb_db->deferred_fetch);
1286 /* Terminate any traverses */
1287 while (ctdb_db->traverse) {
1288 talloc_free(ctdb_db->traverse);
1291 /* Terminate any revokes */
1292 while (ctdb_db->revokechild_active) {
1293 talloc_free(ctdb_db->revokechild_active);
1296 /* Free readonly tracking database */
1297 if (ctdb_db_readonly(ctdb_db)) {
1298 talloc_free(ctdb_db->rottdb);
1301 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1303 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1305 talloc_free(ctdb_db);
1311 attach to all existing persistent databases
1313 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1314 const char *unhealthy_reason)
1319 /* open the persistent db directory and scan it for files */
1320 d = opendir(ctdb->db_directory_persistent);
1325 while ((de=readdir(d))) {
1327 size_t len = strlen(de->d_name);
1329 int invalid_name = 0;
1331 s = talloc_strdup(ctdb, de->d_name);
1334 CTDB_NO_MEMORY(ctdb, s);
1337 /* only accept names ending in .tdb */
1338 p = strstr(s, ".tdb.");
1339 if (len < 7 || p == NULL) {
1344 /* only accept names ending with .tdb. and any number of digits */
1346 while (*q != 0 && invalid_name == 0) {
1347 if (!isdigit(*q++)) {
1351 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1352 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1358 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
1359 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1365 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1373 int ctdb_attach_databases(struct ctdb_context *ctdb)
1376 char *persistent_health_path = NULL;
1377 char *unhealthy_reason = NULL;
1378 bool first_try = true;
1380 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1381 ctdb->db_directory_state,
1382 PERSISTENT_HEALTH_TDB,
1384 if (persistent_health_path == NULL) {
1385 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1391 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1392 0, TDB_DISALLOW_NESTING,
1393 O_CREAT | O_RDWR, 0600);
1394 if (ctdb->db_persistent_health == NULL) {
1395 struct tdb_wrap *tdb;
1398 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1399 persistent_health_path,
1402 talloc_free(persistent_health_path);
1403 talloc_free(unhealthy_reason);
1408 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1409 persistent_health_path,
1410 "was cleared after a failure",
1411 "manual verification needed");
1412 if (unhealthy_reason == NULL) {
1413 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1414 talloc_free(persistent_health_path);
1418 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1419 persistent_health_path));
1420 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1421 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1422 O_CREAT | O_RDWR, 0600);
1424 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1425 persistent_health_path,
1428 talloc_free(persistent_health_path);
1429 talloc_free(unhealthy_reason);
1436 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1438 struct tdb_wrap *tdb;
1440 talloc_free(ctdb->db_persistent_health);
1441 ctdb->db_persistent_health = NULL;
1444 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1445 persistent_health_path));
1446 talloc_free(persistent_health_path);
1447 talloc_free(unhealthy_reason);
1452 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1453 persistent_health_path,
1454 "was cleared after a failure",
1455 "manual verification needed");
1456 if (unhealthy_reason == NULL) {
1457 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1458 talloc_free(persistent_health_path);
1462 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1463 persistent_health_path));
1464 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1465 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1466 O_CREAT | O_RDWR, 0600);
1468 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1469 persistent_health_path,
1472 talloc_free(persistent_health_path);
1473 talloc_free(unhealthy_reason);
1480 talloc_free(persistent_health_path);
1482 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1483 talloc_free(unhealthy_reason);
1492 called when a broadcast seqnum update comes in
1494 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1496 struct ctdb_db_context *ctdb_db;
1497 if (srcnode == ctdb->pnn) {
1498 /* don't update ourselves! */
1502 ctdb_db = find_ctdb_db(ctdb, db_id);
1504 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1508 if (ctdb_db->unhealthy_reason) {
1509 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1510 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1514 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1515 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1520 timer to check for seqnum changes in a ltdb and propogate them
1522 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
1523 struct tevent_timer *te,
1524 struct timeval t, void *p)
1526 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1527 struct ctdb_context *ctdb = ctdb_db->ctdb;
1528 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1529 if (new_seqnum != ctdb_db->seqnum) {
1530 /* something has changed - propogate it */
1532 data.dptr = (uint8_t *)&ctdb_db->db_id;
1533 data.dsize = sizeof(uint32_t);
1534 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1535 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1538 ctdb_db->seqnum = new_seqnum;
1540 /* setup a new timer */
1541 ctdb_db->seqnum_update =
1542 tevent_add_timer(ctdb->ev, ctdb_db,
1543 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1544 (ctdb->tunable.seqnum_interval%1000)*1000),
1545 ctdb_ltdb_seqnum_check, ctdb_db);
1549 enable seqnum handling on this db
1551 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1553 struct ctdb_db_context *ctdb_db;
1554 ctdb_db = find_ctdb_db(ctdb, db_id);
1556 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1560 if (ctdb_db->seqnum_update == NULL) {
1561 ctdb_db->seqnum_update = tevent_add_timer(
1563 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
1564 (ctdb->tunable.seqnum_interval%1000)*1000),
1565 ctdb_ltdb_seqnum_check, ctdb_db);
1568 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1569 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1573 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
1575 if (ctdb_db_sticky(ctdb_db)) {
1579 if (! ctdb_db_volatile(ctdb_db)) {
1581 ("Non-volatile databases do not support sticky flag\n"));
1585 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1587 ctdb_db_set_sticky(ctdb_db);
1589 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
1594 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1596 struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
1599 for (i=0; i<MAX_HOT_KEYS; i++) {
1600 if (s->hot_keys[i].key.dsize > 0) {
1601 talloc_free(s->hot_keys[i].key.dptr);
1605 ZERO_STRUCT(ctdb_db->statistics);
1608 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1612 struct ctdb_db_context *ctdb_db;
1613 struct ctdb_db_statistics_old *stats;
1618 ctdb_db = find_ctdb_db(ctdb, db_id);
1620 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1624 len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
1625 for (i = 0; i < MAX_HOT_KEYS; i++) {
1626 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1629 stats = talloc_size(outdata, len);
1630 if (stats == NULL) {
1631 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
1635 memcpy(stats, &ctdb_db->statistics,
1636 offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1638 stats->num_hot_keys = MAX_HOT_KEYS;
1640 ptr = &stats->hot_keys_wire[0];
1641 for (i = 0; i < MAX_HOT_KEYS; i++) {
1642 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1643 ctdb_db->statistics.hot_keys[i].key.dsize);
1644 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
1647 outdata->dptr = (uint8_t *)stats;
1648 outdata->dsize = len;