2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb-backed database.
 * ctx points at the owning db_ctdb_ctx; m_all and m_write are the two
 * marshall buffers described by the comment inside the struct.
 * NOTE(review): this listing omits some lines of the struct (other code in
 * this file reads h->nesting and h->nested_cancel) as well as the end of
 * the inner comment - confirm against the full file.
 */
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
/*
 * Members of the per-database ctdb context (used elsewhere in this file
 * through struct db_ctdb_ctx pointers):
 *   db          - the generic db_context this backend state belongs to
 *   wtdb        - wrapper around the local tdb handle
 *   transaction - handle of the currently open transaction; set to NULL
 *                 by this file when no transaction is open
 * NOTE(review): the struct header line and any further members (other code
 * reads ctx->db_id) are not visible in this listing.
 */
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
42 struct db_ctdb_transaction_handle *transaction;
/*
 * Members of the per-record private data used by the non-transaction
 * fetch_locked path (accessed elsewhere via struct db_ctdb_rec pointers):
 *   ctdb_ctx - the database context the record was fetched from
 *   header   - copy of the ctdb_ltdb_header read from the stored record
 * NOTE(review): the struct header line is not visible in this listing.
 */
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
/*
 * Forward declaration: fetch_locked_internal() is called by
 * db_ctdb_transaction_fetch_start() before its definition further down.
 * NOTE(review): the remaining parameter lines are not visible here.
 */
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
 * Translate the last error recorded on a tdb context (tdb_error(tdb)) into
 * an NTSTATUS code.
 * The visible assignments produce NT_STATUS_OBJECT_NAME_COLLISION,
 * NT_STATUS_OBJECT_NAME_NOT_FOUND and NT_STATUS_INTERNAL_DB_CORRUPTION.
 * NOTE(review): the selection logic that maps each TDB_ERROR value to one
 * of these codes, and the return statement, are not visible here.
 */
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 enum TDB_ERROR tret = tdb_error(tdb);
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
/*
 * Build one ctdb_rec_data blob on mem_ctx from a key/data pair.
 *
 * The allocation covers the fixed part of struct ctdb_rec_data up to its
 * data member, plus the key bytes, the data bytes and, when header is not
 * NULL, one struct ctdb_ltdb_header.
 *
 * Payload layout written into d->data:
 *   with header:    key | header | data   (datalen = data size + header size)
 *   without header: key | data            (datalen = data size)
 *
 * NOTE(review): the allocation-failure check, the remaining field
 * assignments, the branch that chooses between the two layouts and the
 * return statement are not visible in this listing.
 */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
85 struct ctdb_ltdb_header *header,
89 struct ctdb_rec_data *d;
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
99 d->keylen = key.dsize;
/* the key always comes first in the payload */
100 memcpy(&d->data[0], key.dptr, key.dsize);
/* layout with a header: the header sits between key and data */
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
/* layout without a header: data follows the key directly */
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
/*
 * Append one key/data record to a marshall buffer.
 *
 * The record is first marshalled on the talloc_tos() context with
 * db_ctdb_marshall_record(). A fresh, zeroed buffer holding only the fixed
 * part of struct ctdb_marshall_buffer is allocated on mem_ctx
 * (NOTE(review): presumably only when m is NULL - the condition is not
 * visible). The buffer is then grown by the size of the marshalled record
 * with talloc_realloc_size() and the record bytes are copied to the old end
 * of the buffer.
 *
 * NOTE(review): error handling, the record count update, freeing of r and
 * the return statement are not visible in this listing.
 */
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
119 struct ctdb_ltdb_header *header,
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2 = NULL;
126 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
133 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
/* talloc tracks the allocation sizes, so they double as the used lengths */
141 m_size = talloc_get_size(m);
142 r_size = talloc_get_size(r);
144 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145 mem_ctx, m, m_size + r_size);
/* append the new record right after the previous contents */
151 memcpy(m_size + (uint8_t *)m2, r, r_size);
/*
 * Expose a marshall buffer as a TDB_DATA blob without copying: dptr aliases
 * the buffer itself and dsize is the talloc allocation size of the buffer.
 * The returned blob therefore stays valid only as long as m does.
 */
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
164 data.dptr = (uint8_t *)m;
165 data.dsize = talloc_get_size(m);
170 loop over a marshalling buffer
172 - pass r==NULL to start
173 - loop the number of times indicated by m->count
/*
 * Step to the next record of a marshall buffer and decode it.
 *
 * The first record starts at m->data[0]; each later record is found by
 * advancing the current record pointer by its own length field.
 * On request the function fills in:
 *   key    - points at the first keylen bytes of the record payload
 *   data   - points at the bytes after the key; when header is not NULL the
 *            pointer and size are adjusted to skip one ctdb_ltdb_header
 *   header - copy of the ctdb_ltdb_header stored right after the key
 * All returned pointers alias memory inside m.
 *
 * NOTE(review): the NULL checks on r, key and data, the outcome of the
 * short-record check on datalen and the return statements are not visible
 * in this listing.
 */
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
177 struct ctdb_ltdb_header *header,
178 TDB_DATA *key, TDB_DATA *data)
181 r = (struct ctdb_rec_data *)&m->data[0];
183 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
191 key->dptr = &r->data[0];
192 key->dsize = r->keylen;
195 data->dptr = &r->data[r->keylen];
196 data->dsize = r->datalen;
197 if (header != NULL) {
198 data->dptr += sizeof(*header);
199 data->dsize -= sizeof(*header);
203 if (header != NULL) {
/* a record shorter than a header cannot carry one */
204 if (r->datalen < sizeof(*header)) {
207 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
216 * CTDB transaction destructor
/*
 * talloc destructor for a transaction handle: cancels the transaction on
 * the local tdb of the owning context.
 * NOTE(review): the return statement is not visible in this listing.
 */
218 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
220 tdb_transaction_cancel(h->ctx->wtdb->tdb);
225 * start a transaction on a ctdb database:
226 * - lock the transaction lock key
227 * - start the tdb transaction
/*
 * Take the transaction lock record and open the local tdb transaction.
 *
 * Visible steps:
 *  1. build a key from CTDB_TRANSACTION_LOCK_KEY (strlen bytes, so the
 *     terminating NUL is not part of the key);
 *  2. fetch-lock that key through fetch_locked_internal() on a temporary
 *     talloc context that is a child of h;
 *  3. start a transaction on the local tdb;
 *  4. re-read the lock record inside the transaction and verify that it
 *     exists, is at least one ctdb_ltdb_header long and that this node
 *     (get_my_vnn()) is its dmaster; otherwise the tdb transaction is
 *     cancelled again.
 *
 * NOTE(review): the failure tests after steps 2 and 3, the return values
 * and what happens after the cancel in step 4 (e.g. a retry) are not
 * visible in this listing.
 */
229 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
231 struct db_record *rh;
234 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
236 struct db_ctdb_ctx *ctx = h->ctx;
239 key.dptr = (uint8_t *)discard_const(keyname);
240 key.dsize = strlen(keyname);
243 tmp_ctx = talloc_new(h);
245 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
247 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
248 talloc_free(tmp_ctx);
253 ret = tdb_transaction_start(ctx->wtdb->tdb);
255 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
256 talloc_free(tmp_ctx);
/* tdb_fetch returns malloc memory, hence SAFE_FREE on every path below */
260 data = tdb_fetch(ctx->wtdb->tdb, key);
261 if ((data.dptr == NULL) ||
262 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
263 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
264 SAFE_FREE(data.dptr);
265 tdb_transaction_cancel(ctx->wtdb->tdb);
266 talloc_free(tmp_ctx);
270 SAFE_FREE(data.dptr);
271 talloc_free(tmp_ctx);
278 * CTDB dbwrap API: transaction_start function
279 * starts a transaction on a persistent database
/*
 * dbwrap transaction_start implementation for ctdb databases.
 *
 * Visible behaviour:
 *  - a non-persistent database triggers a level 0 log message saying that
 *    transactions are not supported;
 *  - if a transaction is already open on this context, only its nesting
 *    counter is incremented;
 *  - otherwise a zeroed handle is allocated as a talloc child of db, the
 *    lock record and tdb transaction are set up through
 *    db_ctdb_transaction_fetch_start(), the cancelling destructor is
 *    attached and the handle is stored in ctx->transaction.
 *
 * NOTE(review): return values, the error paths after the allocation and
 * after db_ctdb_transaction_fetch_start(), and the initialisation of the
 * handle fields are not visible in this listing.
 */
281 static int db_ctdb_transaction_start(struct db_context *db)
283 struct db_ctdb_transaction_handle *h;
285 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
288 if (!db->persistent) {
289 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
294 if (ctx->transaction) {
295 ctx->transaction->nesting++;
299 h = talloc_zero(db, struct db_ctdb_transaction_handle);
301 DEBUG(0,(__location__ " oom for transaction handle\n"));
307 ret = db_ctdb_transaction_fetch_start(h);
/* from here on freeing h cancels the tdb transaction */
313 talloc_set_destructor(h, db_ctdb_transaction_destructor);
315 ctx->transaction = h;
317 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
325 fetch a record inside a transaction
/*
 * Read a record while a transaction is open on db.
 *
 * The record is read from the local tdb. When it exists, the size of one
 * ctdb_ltdb_header is subtracted from data->dsize and data->dptr is
 * replaced by a copy (made on mem_ctx) of the bytes that follow the header.
 * The read is then appended to the m_all marshall buffer of the
 * transaction, with the value 1 in the argument position where
 * db_ctdb_transaction_store() passes 0.
 * NOTE(review): presumably that argument is the reqid that
 * ctdb_replay_transaction() tests to tell reads from writes - confirm.
 *
 * NOTE(review): the name of the copying call, the freeing of the malloc
 * buffer held in oldptr, the handling of an empty record and all return
 * statements are not visible in this listing. The subtraction of the
 * header size is not guarded by a visible length check.
 */
327 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
329 TDB_DATA key, TDB_DATA *data)
331 struct db_ctdb_transaction_handle *h = db->transaction;
333 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
335 if (data->dptr != NULL) {
336 uint8_t *oldptr = (uint8_t *)data->dptr;
337 data->dsize -= sizeof(struct ctdb_ltdb_header);
338 if (data->dsize == 0) {
341 data->dptr = (uint8 *)
343 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
347 if (data->dptr == NULL && data->dsize != 0) {
353 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
354 if (h->m_all == NULL) {
355 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
357 talloc_free(data->dptr);
/*
 * Forward declarations: db_ctdb_fetch_locked_transaction() installs these
 * two functions as the store and delete_rec callbacks of the records it
 * returns, before their definitions appear in the file.
 */
366 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
367 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
 * fetch_locked variant used while a transaction is open.
 *
 * Builds a db_record on mem_ctx whose private_data is the open transaction
 * handle and whose store/delete_rec callbacks are the transaction-aware
 * versions. The key is duplicated onto the record. The current value is
 * read from the local tdb: a missing record yields tdb_null, otherwise the
 * bytes following the ctdb_ltdb_header are duplicated onto the record.
 * The malloc buffer returned by tdb_fetch is released with SAFE_FREE.
 *
 * NOTE(review): the remaining parameters, the cleanup on allocation
 * failure and the return statements are not visible in this listing. No
 * visible check guards the subtraction of the header size.
 */
369 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
373 struct db_record *result;
376 if (!(result = talloc(mem_ctx, struct db_record))) {
377 DEBUG(0, ("talloc failed\n"));
381 result->private_data = ctx->transaction;
383 result->key.dsize = key.dsize;
384 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
385 if (result->key.dptr == NULL) {
386 DEBUG(0, ("talloc failed\n"));
391 result->store = db_ctdb_store_transaction;
392 result->delete_rec = db_ctdb_delete_transaction;
394 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
395 if (ctdb_data.dptr == NULL) {
396 /* create the record */
397 result->value = tdb_null;
/* existing record: expose only the payload after the ctdb header */
401 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
402 result->value.dptr = NULL;
404 if ((result->value.dsize != 0)
405 && !(result->value.dptr = (uint8 *)talloc_memdup(
406 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
407 result->value.dsize))) {
408 DEBUG(0, ("talloc failed\n"));
412 SAFE_FREE(ctdb_data.dptr);
/*
 * talloc destructor attached to the small struct db_record pointer object
 * that db_ctdb_fetch_locked_persistent() hangs off a record. It resolves
 * the record and its transaction handle (both type-checked with
 * talloc_get_type_abort) and commits the transaction through the
 * transaction_commit method of the database, logging a failure.
 * NOTE(review): the failure test and return statements are not visible.
 */
417 static int db_ctdb_record_destructor(struct db_record **recp)
419 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
420 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
421 rec->private_data, struct db_ctdb_transaction_handle);
422 int ret = h->ctx->db->transaction_commit(h->ctx->db);
424 DEBUG(0,(__location__ " transaction_commit failed\n"));
430 auto-create a transaction for persistent databases
/*
 * fetch_locked for persistent databases outside an explicit transaction:
 * a transaction is started, the record is fetched through the
 * transaction-aware path, and a helper object with
 * db_ctdb_record_destructor is allocated as a talloc child of the record so
 * that the transaction is committed when the record is freed.
 * If fetching the record or allocating the helper fails, the transaction is
 * cancelled through the transaction_cancel method of the database.
 * NOTE(review): the remaining parameters, the failure tests, the
 * assignment that stores rec in the helper and the return statements are
 * not visible in this listing.
 */
432 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
437 struct db_record *rec, **recp;
439 res = db_ctdb_transaction_start(ctx->db);
444 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
446 ctx->db->transaction_cancel(ctx->db);
450 /* destroy this transaction when we release the lock */
451 recp = talloc(rec, struct db_record *);
453 ctx->db->transaction_cancel(ctx->db);
458 talloc_set_destructor(recp, db_ctdb_record_destructor);
464 stores a record inside a transaction
/*
 * Store key/data inside the open transaction h.
 *
 * Visible steps:
 *  - read the current record from the local tdb to obtain its
 *    ctdb_ltdb_header; if the stored payload is byte-identical to data the
 *    temporary context is freed (the write appears to be skipped);
 *  - set header.dmaster to this node (get_my_vnn());
 *  - append the write to m_all without a header and to m_write with the
 *    header, both with 0 in the argument position where reads pass 1;
 *  - build header + data in a temporary talloc buffer and write it to the
 *    local tdb with TDB_REPLACE.
 *
 * NOTE(review): initialisation of the header for a new record, the RSN
 * update mentioned in the comment, freeing of the tdb_fetch buffer and all
 * return statements are not visible in this listing.
 */
466 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
467 TDB_DATA key, TDB_DATA data)
469 TALLOC_CTX *tmp_ctx = talloc_new(h);
472 struct ctdb_ltdb_header header;
474 /* we need the header so we can update the RSN */
475 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
476 if (rec.dptr == NULL) {
477 /* the record doesn't exist - create one with us as dmaster.
478 This is only safe because we are in a transaction and this
479 is a persistent database */
482 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
483 rec.dsize -= sizeof(struct ctdb_ltdb_header);
484 /* a special case, we are writing the same data that is there now */
485 if (data.dsize == rec.dsize &&
486 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
488 talloc_free(tmp_ctx);
494 header.dmaster = get_my_vnn();
/* m_all records the write without a header, for a later replay */
498 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
499 if (h->m_all == NULL) {
500 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
501 talloc_free(tmp_ctx);
/* m_write carries the header; this buffer is what commit sends to ctdbd */
506 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
507 if (h->m_write == NULL) {
508 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
509 talloc_free(tmp_ctx);
513 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
514 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
515 if (rec.dptr == NULL) {
516 DEBUG(0,(__location__ " Failed to alloc record\n"));
517 talloc_free(tmp_ctx);
520 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
521 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
523 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
525 talloc_free(tmp_ctx);
532 a record store inside a transaction
/*
 * store callback of records handed out inside a transaction: forwards the
 * record key and the new data to db_ctdb_transaction_store() on the
 * transaction handle kept in rec->private_data. The flag argument is not
 * used in the visible lines.
 * NOTE(review): the visible return maps the tdb error state to an
 * NTSTATUS; the condition guarding it and the success return are not
 * visible in this listing.
 */
534 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
536 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
537 rec->private_data, struct db_ctdb_transaction_handle);
540 ret = db_ctdb_transaction_store(h, rec->key, data);
542 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
548 a record delete inside a transaction
/*
 * delete_rec callback of records handed out inside a transaction. The
 * delete is expressed as a store of tdb_null through
 * db_ctdb_transaction_store(), so the key is kept with an empty payload.
 * NOTE(review): the visible return maps the tdb error state to an
 * NTSTATUS; the condition guarding it and the success return are not
 * visible in this listing.
 */
550 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
552 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
553 rec->private_data, struct db_ctdb_transaction_handle);
556 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
558 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
 * Re-run a recorded transaction after a failed commit attempt.
 *
 * The old m_write buffer is freed, the lock record and tdb transaction are
 * re-acquired with db_ctdb_transaction_fetch_start(), and every entry of
 * m_all is replayed in order:
 *  - entries with reqid == 0 are writes and are stored again through
 *    db_ctdb_transaction_store();
 *  - the other entries are reads: the record is fetched again and compared
 *    with the recorded data, and a difference in size or content means the
 *    record changed underneath the transaction.
 * The visible tail cancels the tdb transaction.
 *
 * NOTE(review): resetting of h->m_write after the free, the jumps to the
 * failure path and the return values are not visible in this listing.
 * Also confirm how m_all, which is iterated here while the replayed stores
 * and fetches append to h->m_all, is protected in the full file.
 */
567 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
570 struct ctdb_rec_data *rec = NULL;
573 talloc_free(h->m_write);
576 ret = db_ctdb_transaction_fetch_start(h);
581 for (i=0;i<h->m_all->count;i++) {
584 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
586 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
590 if (rec->reqid == 0) {
592 if (db_ctdb_transaction_store(h, key, data) != 0) {
597 TALLOC_CTX *tmp_ctx = talloc_new(h);
599 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
600 talloc_free(tmp_ctx);
603 if (data2.dsize != data.dsize ||
604 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
605 /* the record has changed on us - we have to give up */
606 talloc_free(tmp_ctx);
609 talloc_free(tmp_ctx);
616 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
 * dbwrap transaction_commit implementation for ctdb databases.
 *
 * Visible flow:
 *  - a commit without an open transaction is logged at level 0;
 *  - if a nested cancel was recorded (h->nested_cancel) the whole
 *    transaction is cancelled through db->transaction_cancel();
 *  - h->nesting is tested for nested starts (the handling of the non-zero
 *    case is not visible);
 *  - the handle destructor is removed, so freeing the handle no longer
 *    cancels the tdb transaction implicitly;
 *  - if nothing was written (h->m_write == NULL) the local tdb transaction
 *    is cancelled and ctx->transaction is cleared;
 *  - otherwise the marshalled writes are sent to ctdbd, using
 *    CTDB_CONTROL_TRANS2_COMMIT on the first attempt and
 *    CTDB_CONTROL_TRANS2_COMMIT_RETRY afterwards. On failure the local tdb
 *    transaction is cancelled and failure_control is chosen from the
 *    returned status. After the fifth failed attempt, or when
 *    ctdb_replay_transaction() fails, failure_control is sent to ctdbd and
 *    the transaction is dropped;
 *  - on success the local tdb transaction is committed and ctdbd is told
 *    with CTDB_CONTROL_TRANS2_FINISHED. A failing local commit sends
 *    failure_control instead.
 *
 * NOTE(review): return statements, the retry label or loop, freeing of the
 * handle and several conditions are not visible in this listing.
 */
624 static int db_ctdb_transaction_commit(struct db_context *db)
626 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
632 struct db_ctdb_transaction_handle *h = ctx->transaction;
633 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
636 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
640 if (h->nested_cancel) {
641 db->transaction_cancel(db);
642 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
646 if (h->nesting != 0) {
651 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
653 talloc_set_destructor(h, NULL);
655 /* our commit strategy is quite complex.
657 - we first try to commit the changes to all other nodes
659 - if that works, then we commit locally and we are done
661 - if a commit on another node fails, then we need to cancel
662 the transaction, then restart the transaction (thus
663 opening a window of time for a pending recovery to
664 complete), then replay the transaction, checking all the
665 reads and writes (checking that reads give the same data,
666 and writes succeed). Then we retry the transaction to the
671 if (h->m_write == NULL) {
672 /* no changes were made, potentially after a retry */
673 tdb_transaction_cancel(h->ctx->wtdb->tdb);
675 ctx->transaction = NULL;
679 /* tell ctdbd to commit to the other nodes */
680 rets = ctdbd_control_local(messaging_ctdbd_connection(),
681 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
683 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
684 if (!NT_STATUS_IS_OK(rets) || status != 0) {
685 tdb_transaction_cancel(h->ctx->wtdb->tdb);
688 if (!NT_STATUS_IS_OK(rets)) {
689 failure_control = CTDB_CONTROL_TRANS2_ERROR;
691 /* work out what error code we will give if we
692 have to fail the operation */
693 switch ((enum ctdb_trans2_commit_error)status) {
694 case CTDB_TRANS2_COMMIT_SUCCESS:
695 case CTDB_TRANS2_COMMIT_SOMEFAIL:
696 case CTDB_TRANS2_COMMIT_TIMEOUT:
697 failure_control = CTDB_CONTROL_TRANS2_ERROR;
699 case CTDB_TRANS2_COMMIT_ALLFAIL:
700 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
/* the attempt counter is bumped on every failed remote commit */
705 if (++retries == 5) {
706 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
707 h->ctx->db_id, retries, (unsigned)failure_control));
708 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
709 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
710 tdb_null, NULL, NULL, NULL);
711 h->ctx->transaction = NULL;
713 ctx->transaction = NULL;
717 if (ctdb_replay_transaction(h) != 0) {
718 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
719 (unsigned)failure_control));
720 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
721 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
722 tdb_null, NULL, NULL, NULL);
723 h->ctx->transaction = NULL;
725 ctx->transaction = NULL;
730 failure_control = CTDB_CONTROL_TRANS2_ERROR;
733 /* do the real commit locally */
734 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
736 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
737 (unsigned)failure_control));
738 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
739 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
740 h->ctx->transaction = NULL;
745 /* tell ctdbd that we are finished with our local commit */
746 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
747 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
748 tdb_null, NULL, NULL, NULL);
749 h->ctx->transaction = NULL;
/*
 * dbwrap transaction_cancel implementation for ctdb databases.
 *
 * A cancel without an open transaction is logged at level 0. When the
 * transaction is nested (h->nesting != 0) the cancel is only recorded in
 * h->nested_cancel, which makes a later commit fail. Otherwise the cancel
 * is logged and ctx->transaction is cleared.
 * NOTE(review): the nesting counter update, the release of the handle
 * (whose destructor cancels the tdb transaction) and the return statements
 * are not visible in this listing.
 */
758 static int db_ctdb_transaction_cancel(struct db_context *db)
760 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
762 struct db_ctdb_transaction_handle *h = ctx->transaction;
765 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
769 if (h->nesting != 0) {
771 h->nested_cancel = true;
775 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
777 ctx->transaction = NULL;
/*
 * store callback for records returned by fetch_locked_internal().
 *
 * Concatenates the ctdb_ltdb_header cached in the record private data with
 * the new payload in a malloc buffer, writes it to the local tdb under the
 * record key with TDB_REPLACE and frees the buffer again.
 *
 * Returns NT_STATUS_NO_MEMORY if the buffer cannot be allocated,
 * NT_STATUS_OK when tdb_store returns 0, otherwise the NTSTATUS mapped from
 * the tdb error state. The flag argument is not used in the visible lines;
 * the store always uses TDB_REPLACE.
 */
783 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
785 struct db_ctdb_rec *crec = talloc_get_type_abort(
786 rec->private_data, struct db_ctdb_rec);
790 cdata.dsize = sizeof(crec->header) + data.dsize;
792 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
793 return NT_STATUS_NO_MEMORY;
/* stored layout: header first, payload directly behind it */
796 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
797 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
799 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
801 SAFE_FREE(cdata.dptr);
803 return (ret == 0) ? NT_STATUS_OK
804 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
/*
 * delete_rec callback for records returned by fetch_locked_internal().
 * The delete is implemented as db_ctdb_store() with flag 0, so the record
 * key stays in the tdb with its header.
 * NOTE(review): the declaration and initialisation of data (presumably an
 * empty blob, per the remark below) are not visible in this listing.
 */
809 static NTSTATUS db_ctdb_delete(struct db_record *rec)
814 * We have to store the header with empty data. TODO: Fix the
820 return db_ctdb_store(rec, data, 0);
/*
 * talloc destructor for records returned by fetch_locked_internal():
 * logs the unlock at debug level 10 (the hex-encoded key is cut to 20
 * characters unless DEBUGLEVEL is above 10) and releases the tdb chain lock
 * held on the record key, logging a failure at level 0.
 * NOTE(review): the hex string is allocated on the record being destroyed.
 * The return statements are not visible in this listing.
 */
824 static int db_ctdb_record_destr(struct db_record* data)
826 struct db_ctdb_rec *crec = talloc_get_type_abort(
827 data->private_data, struct db_ctdb_rec);
829 DEBUG(10, (DEBUGLEVEL > 10
830 ? "Unlocking db %u key %s\n"
831 : "Unlocking db %u key %.20s\n",
832 (int)crec->ctdb_ctx->db_id,
833 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
836 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
837 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Core fetch_locked implementation outside transactions.
 *
 * Visible steps:
 *  - allocate the db_record on mem_ctx and its zeroed db_ctdb_rec private
 *    data as a child of the record, and duplicate the key onto the record;
 *  - take the tdb chain lock for the key, install db_ctdb_store /
 *    db_ctdb_delete and the unlocking destructor, then read the record
 *    from the local tdb;
 *  - if the record is missing, shorter than a ctdb_ltdb_header, or this
 *    node (get_my_vnn()) is not its dmaster, drop the lock and the
 *    destructor, count a migration attempt and ask ctdbd to migrate the
 *    record here via ctdbd_migrate();
 *  - once the record is usable, copy its header into the private data and
 *    duplicate the payload that follows the header onto the record;
 *  - more than 10 migration attempts are reported at level 0.
 *
 * NOTE(review): the remaining parameters, the loop or jump that retries
 * after a migration, the conditional compilation around the random() test
 * and all return statements are not visible in this listing.
 */
844 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
849 struct db_record *result;
850 struct db_ctdb_rec *crec;
853 int migrate_attempts = 0;
855 if (!(result = talloc(mem_ctx, struct db_record))) {
856 DEBUG(0, ("talloc failed\n"));
860 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
861 DEBUG(0, ("talloc failed\n"));
866 result->private_data = (void *)crec;
867 crec->ctdb_ctx = ctx;
869 result->key.dsize = key.dsize;
870 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
871 if (result->key.dptr == NULL) {
872 DEBUG(0, ("talloc failed\n"));
878 * Do a blocking lock on the record
882 if (DEBUGLEVEL >= 10) {
883 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
884 DEBUG(10, (DEBUGLEVEL > 10
885 ? "Locking db %u key %s\n"
886 : "Locking db %u key %.20s\n",
887 (int)crec->ctdb_ctx->db_id, keystr));
891 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
892 DEBUG(3, ("tdb_chainlock failed\n"));
897 result->store = db_ctdb_store;
898 result->delete_rec = db_ctdb_delete;
/* from here on freeing the record releases the chain lock */
899 talloc_set_destructor(result, db_ctdb_record_destr);
901 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
904 * See if we have a valid record and we are the dmaster. If so, we can
905 * take the shortcut and just return it.
908 if ((ctdb_data.dptr == NULL) ||
909 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
910 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
912 || (random() % 2 != 0)
915 SAFE_FREE(ctdb_data.dptr);
916 tdb_chainunlock(ctx->wtdb->tdb, key);
/* the lock was just dropped by hand, so the destructor must not unlock again */
917 talloc_set_destructor(result, NULL);
919 migrate_attempts += 1;
921 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
922 ctdb_data.dptr, ctdb_data.dptr ?
923 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
926 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
927 if (!NT_STATUS_IS_OK(status)) {
928 DEBUG(5, ("ctdb_migrate failed: %s\n",
933 /* now it is migrated, try again */
937 if (migrate_attempts > 10) {
938 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
942 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
944 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
945 result->value.dptr = NULL;
947 if ((result->value.dsize != 0)
948 && !(result->value.dptr = (uint8 *)talloc_memdup(
949 result, ctdb_data.dptr + sizeof(crec->header),
950 result->value.dsize))) {
951 DEBUG(0, ("talloc failed\n"));
955 SAFE_FREE(ctdb_data.dptr);
/*
 * dbwrap fetch_locked entry point. Dispatches in this order:
 *  1. a transaction is open on the context: transaction-aware fetch;
 *  2. the database is persistent: fetch wrapped in an automatically
 *     created transaction;
 *  3. otherwise: the chain-locking fetch_locked_internal() path.
 * NOTE(review): the remaining parameter lines are not visible here.
 */
960 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
964 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
967 if (ctx->transaction != NULL) {
968 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
971 if (db->persistent) {
972 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
975 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
979 fetch (unlocked, no migration) operation on ctdb
/*
 * dbwrap fetch entry point (no lock taken, no migration requested).
 *
 * With an open transaction the call is forwarded to
 * db_ctdb_transaction_fetch(). Otherwise the record is read from the local
 * tdb. If it is at least one ctdb_ltdb_header long and passes the dmaster
 * test, the payload after the header is duplicated onto mem_ctx and
 * returned without contacting ctdbd. In all other cases the record is
 * requested from ctdbd with ctdbd_fetch().
 *
 * NOTE(review): the part of the condition that bypasses the dmaster test
 * for persistent databases, the handling of an empty payload and all
 * return statements are not visible in this listing.
 */
981 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
982 TDB_DATA key, TDB_DATA *data)
984 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
989 if (ctx->transaction) {
990 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
993 /* try a direct fetch */
994 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
997 * See if we have a valid record and we are the dmaster. If so, we can
998 * take the shortcut and just return it.
999 * we bypass the dmaster check for persistent databases
1001 if ((ctdb_data.dptr != NULL) &&
1002 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1004 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1005 /* we are the dmaster - avoid the ctdb protocol op */
1007 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1008 if (data->dsize == 0) {
1009 SAFE_FREE(ctdb_data.dptr);
1014 data->dptr = (uint8 *)talloc_memdup(
1015 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1018 SAFE_FREE(ctdb_data.dptr);
1020 if (data->dptr == NULL) {
1026 SAFE_FREE(ctdb_data.dptr);
1028 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1029 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1030 if (!NT_STATUS_IS_OK(status)) {
1031 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * State handed to the traverse callbacks: the database being traversed and
 * the user callback to invoke per record.
 * NOTE(review): the traverse functions also set state.private_data; that
 * member and the closing brace are not visible in this listing.
 */
1038 struct traverse_state {
1039 struct db_context *db;
1040 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Per-record callback for read-write traverses driven by ctdbd_traverse().
 * The record is fetched again under lock through db_ctdb_fetch_locked() on
 * a temporary talloc context, and the user callback runs only when the
 * locked record exists and has a non-empty value. Freeing the temporary
 * context releases the record. The return value of the user callback is
 * ignored and the data argument is not used in the visible lines.
 */
1044 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1046 struct traverse_state *state = (struct traverse_state *)private_data;
1047 struct db_record *rec;
1048 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1049 /* we have to give them a locked record to prevent races */
1050 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1051 if (rec && rec->value.dsize > 0) {
1052 state->fn(rec, state->private_data);
1054 talloc_free(tmp_ctx);
/*
 * Per-record callback for read-write traverses of persistent databases,
 * driven by tdb_traverse() on the local tdb. The record is fetched under
 * lock through db_ctdb_fetch_locked() on a temporary talloc context. When
 * it exists and has a non-empty value, the user callback runs and its
 * result is kept in ret.
 * NOTE(review): the declaration of ret and the return statement are not
 * visible in this listing.
 */
1057 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1060 struct traverse_state *state = (struct traverse_state *)private_data;
1061 struct db_record *rec;
1062 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1064 /* we have to give them a locked record to prevent races */
1065 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1066 if (rec && rec->value.dsize > 0) {
1067 ret = state->fn(rec, state->private_data);
1069 talloc_free(tmp_ctx);
/*
 * dbwrap traverse entry point (records may be modified by fn).
 * Persistent databases are walked locally with tdb_traverse() and the
 * result of that call is returned. Other databases are traversed through
 * ctdbd_traverse().
 * NOTE(review): the initialisation of state.db and state.fn and the return
 * value of the non-persistent path are not visible in this listing.
 */
1073 static int db_ctdb_traverse(struct db_context *db,
1074 int (*fn)(struct db_record *rec,
1075 void *private_data),
1078 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1079 struct db_ctdb_ctx);
1080 struct traverse_state state;
1084 state.private_data = private_data;
1086 if (db->persistent) {
1087 /* for persistent databases we don't need to do a ctdb traverse,
1088 we can do a faster local traverse */
1089 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1093 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/*
 * store callback installed on records handed out by read-only traverses:
 * always refuses the write with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1097 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1099 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * delete_rec callback installed on records handed out by read-only
 * traverses: always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1102 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1104 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Per-record callback for read-only traverses driven by ctdbd_traverse().
 * A stack db_record is prepared with the write-denying callbacks and the
 * database as private_data, then passed to the user callback. The return
 * value of the user callback is ignored.
 * NOTE(review): the assignments of rec.key and rec.value are not visible
 * in this listing.
 */
1107 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1109 struct traverse_state *state = (struct traverse_state *)private_data;
1110 struct db_record rec;
1113 rec.store = db_ctdb_store_deny;
1114 rec.delete_rec = db_ctdb_delete_deny;
1115 rec.private_data = state->db;
1116 state->fn(&rec, state->private_data);
/*
 * Per-record callback for read-only traverses of persistent databases,
 * driven by tdb_traverse_read() on the local tdb. A stack db_record is
 * prepared with the write-denying callbacks. A value no longer than one
 * ctdb_ltdb_header is treated as a deleted record. Otherwise the header is
 * stripped from the value in place and the result of the user callback is
 * returned.
 * NOTE(review): the assignments of rec.key and rec.value and the return
 * taken for deleted records are not visible in this listing.
 */
1119 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1122 struct traverse_state *state = (struct traverse_state *)private_data;
1123 struct db_record rec;
1126 rec.store = db_ctdb_store_deny;
1127 rec.delete_rec = db_ctdb_delete_deny;
1128 rec.private_data = state->db;
1130 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1131 /* a deleted record */
1134 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1135 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1137 return state->fn(&rec, state->private_data);
/*
 * dbwrap traverse_read entry point (records are handed out read-only).
 * Persistent databases are walked locally with tdb_traverse_read() and the
 * result of that call is returned. Other databases are traversed through
 * ctdbd_traverse() with the read-only callback.
 * NOTE(review): the initialisation of state.db and state.fn and the return
 * value of the non-persistent path are not visible in this listing.
 */
1140 static int db_ctdb_traverse_read(struct db_context *db,
1141 int (*fn)(struct db_record *rec,
1142 void *private_data),
1145 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1146 struct db_ctdb_ctx);
1147 struct traverse_state state;
1151 state.private_data = private_data;
1153 if (db->persistent) {
1154 /* for persistent databases we don't need to do a ctdb traverse,
1155 we can do a faster local traverse */
1156 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1159 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/*
 * dbwrap get_seqnum entry point: returns tdb_get_seqnum() of the local tdb
 * behind this ctdb database context.
 */
1163 static int db_ctdb_get_seqnum(struct db_context *db)
1165 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1166 struct db_ctdb_ctx);
1167 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
 * dbwrap get_flags entry point: returns tdb_get_flags() of the local tdb
 * behind this ctdb database context.
 */
1170 static int db_ctdb_get_flags(struct db_context *db)
1172 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1173 struct db_ctdb_ctx);
1174 return tdb_get_flags(ctx->wtdb->tdb);
/*
 * Open a ctdb-backed database and return it as a generic db_context
 * allocated on mem_ctx.
 *
 * Visible steps:
 *  - when clustering is disabled (lp_clustering() is false) this is only
 *    logged at level 10;
 *  - allocate the zeroed db_context and, as its talloc child, the
 *    db_ctdb_ctx backend state;
 *  - attach to the database in ctdbd by name (ctdbd_db_attach), which
 *    yields db_id, and ask ctdbd for the path of the local tdb file;
 *  - mark the database persistent when TDB_CLEAR_IF_FIRST is not set in
 *    tdb_flags;
 *  - reduce tdb_flags to TDB_SEQNUM before opening;
 *  - when O_CREAT was requested, apply mode to the file with chmod() (its
 *    result is not checked);
 *  - open the local tdb with O_RDWR through tdb_wrap_open();
 *  - install the dbwrap method table and log the open at level 3.
 * Each visible failure path releases result with TALLOC_FREE.
 *
 * NOTE(review): the name parameter line and the return statements are not
 * visible in this listing. On the first allocation failure TALLOC_FREE is
 * applied to a pointer that is already NULL.
 */
1177 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1179 int hash_size, int tdb_flags,
1180 int open_flags, mode_t mode)
1182 struct db_context *result;
1183 struct db_ctdb_ctx *db_ctdb;
1186 if (!lp_clustering()) {
1187 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1191 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1192 DEBUG(0, ("talloc failed\n"));
1193 TALLOC_FREE(result);
1197 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1198 DEBUG(0, ("talloc failed\n"));
1199 TALLOC_FREE(result);
1203 db_ctdb->transaction = NULL;
1204 db_ctdb->db = result;
1206 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1207 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1208 TALLOC_FREE(result);
1212 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
/* persistence is derived from the caller flags before they are masked */
1214 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1216 /* only pass through specific flags */
1217 tdb_flags &= TDB_SEQNUM;
1219 /* honor permissions if user has specified O_CREAT */
1220 if (open_flags & O_CREAT) {
1221 chmod(db_path, mode);
1224 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1225 if (db_ctdb->wtdb == NULL) {
1226 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1227 TALLOC_FREE(result);
1230 talloc_free(db_path);
1232 result->private_data = (void *)db_ctdb;
1233 result->fetch_locked = db_ctdb_fetch_locked;
1234 result->fetch = db_ctdb_fetch;
1235 result->traverse = db_ctdb_traverse;
1236 result->traverse_read = db_ctdb_traverse_read;
1237 result->get_seqnum = db_ctdb_get_seqnum;
1238 result->get_flags = db_ctdb_get_flags;
1239 result->transaction_start = db_ctdb_transaction_start;
1240 result->transaction_commit = db_ctdb_transaction_commit;
1241 result->transaction_cancel = db_ctdb_transaction_cancel;
1243 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1244 name, db_ctdb->db_id));