0a579974e85b05d67961a0f6d12d56c9cfe40535
[obnox/samba/samba-obnox.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #ifdef CLUSTER_SUPPORT
31
32 /*
33  * It is not possible to include ctdb.h and tdb_compat.h (included via
34  * some other include above) without warnings. This fixes those
35  * warnings.
36  */
37
38 #ifdef typesafe_cb
39 #undef typesafe_cb
40 #endif
41
42 #ifdef typesafe_cb_preargs
43 #undef typesafe_cb_preargs
44 #endif
45
46 #ifdef typesafe_cb_postargs
47 #undef typesafe_cb_postargs
48 #endif
49
50 #include "ctdb.h"
51 #include "ctdb_private.h"
52 #include "ctdbd_conn.h"
53 #include "dbwrap/dbwrap.h"
54 #include "dbwrap/dbwrap_private.h"
55 #include "dbwrap/dbwrap_ctdb.h"
56 #include "g_lock.h"
57 #include "messages.h"
58
59 struct db_ctdb_transaction_handle {
60         struct db_ctdb_ctx *ctx;
61         /*
62          * we store the writes done under a transaction:
63          */
64         struct ctdb_marshall_buffer *m_write;
65         uint32_t nesting;
66         bool nested_cancel;
67         char *lock_name;
68 };
69
70 struct db_ctdb_ctx {
71         struct db_context *db;
72         struct tdb_wrap *wtdb;
73         uint32_t db_id;
74         struct db_ctdb_transaction_handle *transaction;
75         struct g_lock_ctx *lock_ctx;
76 };
77
78 struct db_ctdb_rec {
79         struct db_ctdb_ctx *ctdb_ctx;
80         struct ctdb_ltdb_header header;
81         struct timeval lock_time;
82 };
83
84 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
85 {
86         enum TDB_ERROR tret = tdb_error(tdb);
87
88         return map_nt_error_from_tdb(tret);
89 }
90
91
92 /**
93  * fetch a record from the tdb, separating out the header
94  * information and returning the body of the record.
95  */
96 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
97                                    TDB_DATA key,
98                                    struct ctdb_ltdb_header *header,
99                                    TALLOC_CTX *mem_ctx,
100                                    TDB_DATA *data)
101 {
102         TDB_DATA rec;
103         NTSTATUS status;
104
105         rec = tdb_fetch_compat(db->wtdb->tdb, key);
106         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
107                 status = NT_STATUS_NOT_FOUND;
108                 if (data) {
109                         ZERO_STRUCTP(data);
110                 }
111                 if (header) {
112                         header->dmaster = (uint32_t)-1;
113                         header->rsn = 0;
114                 }
115                 goto done;
116         }
117
118         if (header) {
119                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
120         }
121
122         if (data) {
123                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
124                 if (data->dsize == 0) {
125                         data->dptr = NULL;
126                 } else {
127                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
128                                         rec.dptr
129                                          + sizeof(struct ctdb_ltdb_header),
130                                         data->dsize);
131                         if (data->dptr == NULL) {
132                                 status = NT_STATUS_NO_MEMORY;
133                                 goto done;
134                         }
135                 }
136         }
137
138         status = NT_STATUS_OK;
139
140 done:
141         SAFE_FREE(rec.dptr);
142         return status;
143 }
144
145 /*
146  * Store a record together with the ctdb record header
147  * in the local copy of the database.
148  */
149 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
150                                    TDB_DATA key,
151                                    struct ctdb_ltdb_header *header,
152                                    TDB_DATA data)
153 {
154         TALLOC_CTX *tmp_ctx = talloc_stackframe();
155         TDB_DATA rec;
156         int ret;
157
158         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
159         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
160
161         if (rec.dptr == NULL) {
162                 talloc_free(tmp_ctx);
163                 return NT_STATUS_NO_MEMORY;
164         }
165
166         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
167         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
168
169         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
170
171         talloc_free(tmp_ctx);
172
173         return (ret == 0) ? NT_STATUS_OK
174                           : tdb_error_to_ntstatus(db->wtdb->tdb);
175
176 }
177
178 /*
179   form a ctdb_rec_data record from a key/data pair
180
181   note that header may be NULL. If not NULL then it is included in the data portion
182   of the record
183  */
184 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
185                                                   TDB_DATA key, 
186                                                   struct ctdb_ltdb_header *header,
187                                                   TDB_DATA data)
188 {
189         size_t length;
190         struct ctdb_rec_data *d;
191
192         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
193                 data.dsize + (header?sizeof(*header):0);
194         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
195         if (d == NULL) {
196                 return NULL;
197         }
198         d->length = length;
199         d->reqid = reqid;
200         d->keylen = key.dsize;
201         memcpy(&d->data[0], key.dptr, key.dsize);
202         if (header) {
203                 d->datalen = data.dsize + sizeof(*header);
204                 memcpy(&d->data[key.dsize], header, sizeof(*header));
205                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
206         } else {
207                 d->datalen = data.dsize;
208                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
209         }
210         return d;
211 }
212
213
214 /* helper function for marshalling multiple records */
215 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
216                                                struct ctdb_marshall_buffer *m,
217                                                uint64_t db_id,
218                                                uint32_t reqid,
219                                                TDB_DATA key,
220                                                struct ctdb_ltdb_header *header,
221                                                TDB_DATA data)
222 {
223         struct ctdb_rec_data *r;
224         size_t m_size, r_size;
225         struct ctdb_marshall_buffer *m2 = NULL;
226
227         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
228         if (r == NULL) {
229                 talloc_free(m);
230                 return NULL;
231         }
232
233         if (m == NULL) {
234                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
235                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
236                 if (m == NULL) {
237                         goto done;
238                 }
239                 m->db_id = db_id;
240         }
241
242         m_size = talloc_get_size(m);
243         r_size = talloc_get_size(r);
244
245         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
246                 mem_ctx, m,  m_size + r_size);
247         if (m2 == NULL) {
248                 talloc_free(m);
249                 goto done;
250         }
251
252         memcpy(m_size + (uint8_t *)m2, r, r_size);
253
254         m2->count++;
255
256 done:
257         talloc_free(r);
258         return m2;
259 }
260
261 /* we've finished marshalling, return a data blob with the marshalled records */
262 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
263 {
264         TDB_DATA data;
265         data.dptr = (uint8_t *)m;
266         data.dsize = talloc_get_size(m);
267         return data;
268 }
269
270 /* 
271    loop over a marshalling buffer 
272
273      - pass r==NULL to start
274      - loop the number of times indicated by m->count
275 */
276 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
277                                                      uint32_t *reqid,
278                                                      struct ctdb_ltdb_header *header,
279                                                      TDB_DATA *key, TDB_DATA *data)
280 {
281         if (r == NULL) {
282                 r = (struct ctdb_rec_data *)&m->data[0];
283         } else {
284                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
285         }
286
287         if (reqid != NULL) {
288                 *reqid = r->reqid;
289         }
290
291         if (key != NULL) {
292                 key->dptr   = &r->data[0];
293                 key->dsize  = r->keylen;
294         }
295         if (data != NULL) {
296                 data->dptr  = &r->data[r->keylen];
297                 data->dsize = r->datalen;
298                 if (header != NULL) {
299                         data->dptr += sizeof(*header);
300                         data->dsize -= sizeof(*header);
301                 }
302         }
303
304         if (header != NULL) {
305                 if (r->datalen < sizeof(*header)) {
306                         return NULL;
307                 }
308                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
309         }
310
311         return r;
312 }
313
314 /**
315  * CTDB transaction destructor
316  */
317 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
318 {
319         NTSTATUS status;
320
321         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
322         if (!NT_STATUS_IS_OK(status)) {
323                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
324                           nt_errstr(status)));
325                 return -1;
326         }
327         return 0;
328 }
329
330 /**
331  * CTDB dbwrap API: transaction_start function
332  * starts a transaction on a persistent database
333  */
334 static int db_ctdb_transaction_start(struct db_context *db)
335 {
336         struct db_ctdb_transaction_handle *h;
337         NTSTATUS status;
338         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
339                                                         struct db_ctdb_ctx);
340
341         if (!db->persistent) {
342                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
343                          ctx->db_id));
344                 return -1;
345         }
346
347         if (ctx->transaction) {
348                 ctx->transaction->nesting++;
349                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
350                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
351                 return 0;
352         }
353
354         h = talloc_zero(db, struct db_ctdb_transaction_handle);
355         if (h == NULL) {
356                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
357                 return -1;
358         }
359
360         h->ctx = ctx;
361
362         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
363                                        (unsigned int)ctx->db_id);
364         if (h->lock_name == NULL) {
365                 DEBUG(0, ("talloc_asprintf failed\n"));
366                 TALLOC_FREE(h);
367                 return -1;
368         }
369
370         /*
371          * Wait a day, i.e. forever...
372          */
373         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
374                              timeval_set(86400, 0));
375         if (!NT_STATUS_IS_OK(status)) {
376                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
377                 TALLOC_FREE(h);
378                 return -1;
379         }
380
381         talloc_set_destructor(h, db_ctdb_transaction_destructor);
382
383         ctx->transaction = h;
384
385         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
386
387         return 0;
388 }
389
390 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
391                                              TDB_DATA key,
392                                              struct ctdb_ltdb_header *pheader,
393                                              TALLOC_CTX *mem_ctx,
394                                              TDB_DATA *pdata)
395 {
396         struct ctdb_rec_data *rec = NULL;
397         struct ctdb_ltdb_header h;
398         bool found = false;
399         TDB_DATA data;
400         int i;
401
402         if (buf == NULL) {
403                 return false;
404         }
405
406         ZERO_STRUCT(h);
407         ZERO_STRUCT(data);
408
409         /*
410          * Walk the list of records written during this
411          * transaction. If we want to read one we have already
412          * written, return the last written sample. Thus we do not do
413          * a "break;" for the first hit, this record might have been
414          * overwritten later.
415          */
416
417         for (i=0; i<buf->count; i++) {
418                 TDB_DATA tkey, tdata;
419                 uint32_t reqid;
420                 struct ctdb_ltdb_header hdr;
421
422                 ZERO_STRUCT(hdr);
423
424                 rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey,
425                                                  &tdata);
426                 if (rec == NULL) {
427                         return false;
428                 }
429
430                 if (tdb_data_equal(key, tkey)) {
431                         found = true;
432                         data = tdata;
433                         h = hdr;
434                 }
435         }
436
437         if (!found) {
438                 return false;
439         }
440
441         if (pdata != NULL) {
442                 data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr,
443                                                      data.dsize);
444                 if ((data.dsize != 0) && (data.dptr == NULL)) {
445                         return false;
446                 }
447                 *pdata = data;
448         }
449
450         if (pheader != NULL) {
451                 *pheader = h;
452         }
453
454         return true;
455 }
456
457 /*
458   fetch a record inside a transaction
459  */
460 static NTSTATUS db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
461                                           TALLOC_CTX *mem_ctx,
462                                           TDB_DATA key, TDB_DATA *data)
463 {
464         struct db_ctdb_transaction_handle *h = db->transaction;
465         NTSTATUS status;
466         bool found;
467
468         found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
469                                                  mem_ctx, data);
470         if (found) {
471                 return NT_STATUS_OK;
472         }
473
474         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
475
476         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
477                 *data = tdb_null;
478         }
479
480         return status;
481 }
482
483 /**
484  * Fetch a record from a persistent database
485  * without record locking and without an active transaction.
486  *
487  * This just fetches from the local database copy.
488  * Since the databases are kept in syc cluster-wide,
489  * there is no point in doing a ctdb call to fetch the
490  * record from the lmaster. It does even harm since migration
491  * of records bump their RSN and hence render the persistent
492  * database inconsistent.
493  */
494 static NTSTATUS db_ctdb_fetch_persistent(struct db_ctdb_ctx *db,
495                                          TALLOC_CTX *mem_ctx,
496                                          TDB_DATA key, TDB_DATA *data)
497 {
498         NTSTATUS status;
499
500         status = db_ctdb_ltdb_fetch(db, key, NULL, mem_ctx, data);
501
502         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
503                 *data = tdb_null;
504         }
505
506         return status;
507 }
508
509 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
510 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
511
512 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
513                                                           TALLOC_CTX *mem_ctx,
514                                                           TDB_DATA key)
515 {
516         struct db_record *result;
517         TDB_DATA ctdb_data;
518
519         if (!(result = talloc(mem_ctx, struct db_record))) {
520                 DEBUG(0, ("talloc failed\n"));
521                 return NULL;
522         }
523
524         result->private_data = ctx->transaction;
525
526         result->key.dsize = key.dsize;
527         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
528                                                     key.dsize);
529         if (result->key.dptr == NULL) {
530                 DEBUG(0, ("talloc failed\n"));
531                 TALLOC_FREE(result);
532                 return NULL;
533         }
534
535         result->store = db_ctdb_store_transaction;
536         result->delete_rec = db_ctdb_delete_transaction;
537
538         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
539                                              NULL, result, &result->value)) {
540                 return result;
541         }
542
543         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
544         if (ctdb_data.dptr == NULL) {
545                 /* create the record */
546                 result->value = tdb_null;
547                 return result;
548         }
549
550         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
551         result->value.dptr = NULL;
552
553         if ((result->value.dsize != 0)
554             && !(result->value.dptr = (uint8_t *)talloc_memdup(
555                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
556                          result->value.dsize))) {
557                 DEBUG(0, ("talloc failed\n"));
558                 TALLOC_FREE(result);
559         }
560
561         SAFE_FREE(ctdb_data.dptr);
562
563         return result;
564 }
565
566 static int db_ctdb_record_destructor(struct db_record **recp)
567 {
568         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
569         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
570                 rec->private_data, struct db_ctdb_transaction_handle);
571         int ret = h->ctx->db->transaction_commit(h->ctx->db);
572         if (ret != 0) {
573                 DEBUG(0,(__location__ " transaction_commit failed\n"));
574         }
575         return 0;
576 }
577
578 /*
579   auto-create a transaction for persistent databases
580  */
581 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
582                                                          TALLOC_CTX *mem_ctx,
583                                                          TDB_DATA key)
584 {
585         int res;
586         struct db_record *rec, **recp;
587
588         res = db_ctdb_transaction_start(ctx->db);
589         if (res == -1) {
590                 return NULL;
591         }
592
593         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
594         if (rec == NULL) {
595                 ctx->db->transaction_cancel(ctx->db);           
596                 return NULL;
597         }
598
599         /* destroy this transaction when we release the lock */
600         recp = talloc(rec, struct db_record *);
601         if (recp == NULL) {
602                 ctx->db->transaction_cancel(ctx->db);
603                 talloc_free(rec);
604                 return NULL;
605         }
606         *recp = rec;
607         talloc_set_destructor(recp, db_ctdb_record_destructor);
608         return rec;
609 }
610
611
612 /*
613   stores a record inside a transaction
614  */
615 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
616                                           TDB_DATA key, TDB_DATA data)
617 {
618         TALLOC_CTX *tmp_ctx = talloc_new(h);
619         TDB_DATA rec;
620         struct ctdb_ltdb_header header;
621
622         ZERO_STRUCT(header);
623
624         /* we need the header so we can update the RSN */
625
626         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
627                                               NULL, NULL)) {
628
629                 rec = tdb_fetch_compat(h->ctx->wtdb->tdb, key);
630
631                 if (rec.dptr != NULL) {
632                         memcpy(&header, rec.dptr,
633                                sizeof(struct ctdb_ltdb_header));
634                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
635
636                         /*
637                          * a special case, we are writing the same
638                          * data that is there now
639                          */
640                         if (data.dsize == rec.dsize &&
641                             memcmp(data.dptr,
642                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
643                                    data.dsize) == 0) {
644                                 SAFE_FREE(rec.dptr);
645                                 talloc_free(tmp_ctx);
646                                 return NT_STATUS_OK;
647                         }
648                 }
649                 SAFE_FREE(rec.dptr);
650         }
651
652         header.dmaster = get_my_vnn();
653         header.rsn++;
654
655         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
656         if (h->m_write == NULL) {
657                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
658                 talloc_free(tmp_ctx);
659                 return NT_STATUS_NO_MEMORY;
660         }
661
662         talloc_free(tmp_ctx);
663         return NT_STATUS_OK;
664 }
665
666
667 /* 
668    a record store inside a transaction
669  */
670 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
671 {
672         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
673                 rec->private_data, struct db_ctdb_transaction_handle);
674         NTSTATUS status;
675
676         status = db_ctdb_transaction_store(h, rec->key, data);
677         return status;
678 }
679
680 /* 
681    a record delete inside a transaction
682  */
683 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
684 {
685         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
686                 rec->private_data, struct db_ctdb_transaction_handle);
687         NTSTATUS status;
688
689         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
690         return status;
691 }
692
693 /**
694  * Fetch the db sequence number of a persistent db directly from the db.
695  */
696 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
697                                                 uint64_t *seqnum)
698 {
699         NTSTATUS status;
700         const char *keyname = CTDB_DB_SEQNUM_KEY;
701         TDB_DATA key;
702         TDB_DATA data;
703         struct ctdb_ltdb_header header;
704         TALLOC_CTX *mem_ctx = talloc_stackframe();
705
706         if (seqnum == NULL) {
707                 return NT_STATUS_INVALID_PARAMETER;
708         }
709
710         key = string_term_tdb_data(keyname);
711
712         status = db_ctdb_ltdb_fetch(db, key, &header, mem_ctx, &data);
713         if (!NT_STATUS_IS_OK(status) &&
714             !NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND))
715         {
716                 goto done;
717         }
718
719         status = NT_STATUS_OK;
720
721         if (data.dsize != sizeof(uint64_t)) {
722                 *seqnum = 0;
723                 goto done;
724         }
725
726         *seqnum = *(uint64_t *)data.dptr;
727
728 done:
729         TALLOC_FREE(mem_ctx);
730         return status;
731 }
732
733 /**
734  * Store the database sequence number inside a transaction.
735  */
736 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
737                                         uint64_t seqnum)
738 {
739         NTSTATUS status;
740         const char *keyname = CTDB_DB_SEQNUM_KEY;
741         TDB_DATA key;
742         TDB_DATA data;
743
744         key = string_term_tdb_data(keyname);
745
746         data.dptr = (uint8_t *)&seqnum;
747         data.dsize = sizeof(uint64_t);
748
749         status = db_ctdb_transaction_store(h, key, data);
750
751         return status;
752 }
753
754 /*
755   commit a transaction
756  */
757 static int db_ctdb_transaction_commit(struct db_context *db)
758 {
759         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
760                                                         struct db_ctdb_ctx);
761         NTSTATUS rets;
762         int status;
763         struct db_ctdb_transaction_handle *h = ctx->transaction;
764         uint64_t old_seqnum, new_seqnum;
765         int ret;
766
767         if (h == NULL) {
768                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
769                 return -1;
770         }
771
772         if (h->nested_cancel) {
773                 db->transaction_cancel(db);
774                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
775                 return -1;
776         }
777
778         if (h->nesting != 0) {
779                 h->nesting--;
780                 DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
781                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
782                 return 0;
783         }
784
785         if (h->m_write == NULL) {
786                 /*
787                  * No changes were made, so don't change the seqnum,
788                  * don't push to other node, just exit with success.
789                  */
790                 ret = 0;
791                 goto done;
792         }
793
794         DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
795
796         /*
797          * As the last db action before committing, bump the database sequence
798          * number. Note that this undoes all changes to the seqnum records
799          * performed under the transaction. This record is not meant to be
800          * modified by user interaction. It is for internal use only...
801          */
802         rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
803         if (!NT_STATUS_IS_OK(rets)) {
804                 DEBUG(1, (__location__ " failed to fetch the db sequence number "
805                           "in transaction commit on db 0x%08x\n", ctx->db_id));
806                 ret = -1;
807                 goto done;
808         }
809
810         new_seqnum = old_seqnum + 1;
811
812         rets = db_ctdb_store_db_seqnum(h, new_seqnum);
813         if (!NT_STATUS_IS_OK(rets)) {
814                 DEBUG(1, (__location__ "failed to store the db sequence number "
815                           " in transaction commit on db 0x%08x\n", ctx->db_id));
816                 ret = -1;
817                 goto done;
818         }
819
820 again:
821         /* tell ctdbd to commit to the other nodes */
822         rets = ctdbd_control_local(messaging_ctdbd_connection(),
823                                    CTDB_CONTROL_TRANS3_COMMIT,
824                                    h->ctx->db_id, 0,
825                                    db_ctdb_marshall_finish(h->m_write),
826                                    NULL, NULL, &status);
827         if (!NT_STATUS_IS_OK(rets) || status != 0) {
828                 /*
829                  * The TRANS3_COMMIT control should only possibly fail when a
830                  * recovery has been running concurrently. In any case, the db
831                  * will be the same on all nodes, either the new copy or the
832                  * old copy.  This can be detected by comparing the old and new
833                  * local sequence numbers.
834                  */
835                 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
836                 if (!NT_STATUS_IS_OK(rets)) {
837                         DEBUG(1, (__location__ " failed to refetch db sequence "
838                                   "number after failed TRANS3_COMMIT\n"));
839                         ret = -1;
840                         goto done;
841                 }
842
843                 if (new_seqnum == old_seqnum) {
844                         /* Recovery prevented all our changes: retry. */
845                         goto again;
846                 } else if (new_seqnum != (old_seqnum + 1)) {
847                         DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
848                                   "old_seqnum[%lu] + (0 or 1) after failed "
849                                   "TRANS3_COMMIT - this should not happen!\n",
850                                   (unsigned long)new_seqnum,
851                                   (unsigned long)old_seqnum));
852                         ret = -1;
853                         goto done;
854                 }
855                 /*
856                  * Recovery propagated our changes to all nodes, completing
857                  * our commit for us - succeed.
858                  */
859         }
860
861         ret = 0;
862
863 done:
864         h->ctx->transaction = NULL;
865         talloc_free(h);
866         return ret;
867 }
868
869
870 /*
871   cancel a transaction
872  */
873 static int db_ctdb_transaction_cancel(struct db_context *db)
874 {
875         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
876                                                         struct db_ctdb_ctx);
877         struct db_ctdb_transaction_handle *h = ctx->transaction;
878
879         if (h == NULL) {
880                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
881                 return -1;
882         }
883
884         if (h->nesting != 0) {
885                 h->nesting--;
886                 h->nested_cancel = true;
887                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
888                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
889                 return 0;
890         }
891
892         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
893
894         ctx->transaction = NULL;
895         talloc_free(h);
896         return 0;
897 }
898
899
900 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
901 {
902         struct db_ctdb_rec *crec = talloc_get_type_abort(
903                 rec->private_data, struct db_ctdb_rec);
904
905         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
906 }
907
908
909
910 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
911 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
912 {
913         NTSTATUS status;
914         struct ctdb_control_schedule_for_deletion *dd;
915         TDB_DATA indata;
916         int cstatus;
917         struct db_ctdb_rec *crec = talloc_get_type_abort(
918                 rec->private_data, struct db_ctdb_rec);
919
920         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
921         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
922         if (indata.dptr == NULL) {
923                 DEBUG(0, (__location__ " talloc failed!\n"));
924                 return NT_STATUS_NO_MEMORY;
925         }
926
927         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
928         dd->db_id = crec->ctdb_ctx->db_id;
929         dd->hdr = crec->header;
930         dd->keylen = rec->key.dsize;
931         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
932
933         status = ctdbd_control_local(messaging_ctdbd_connection(),
934                                      CTDB_CONTROL_SCHEDULE_FOR_DELETION,
935                                      crec->ctdb_ctx->db_id,
936                                      CTDB_CTRL_FLAG_NOREPLY, /* flags */
937                                      indata,
938                                      NULL, /* outdata */
939                                      NULL, /* errmsg */
940                                      &cstatus);
941         talloc_free(indata.dptr);
942
943         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
944                 DEBUG(1, (__location__ " Error sending local control "
945                           "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
946                           nt_errstr(status), cstatus));
947                 if (NT_STATUS_IS_OK(status)) {
948                         status = NT_STATUS_UNSUCCESSFUL;
949                 }
950         }
951
952         return status;
953 }
954 #endif
955
956 static NTSTATUS db_ctdb_delete(struct db_record *rec)
957 {
958         TDB_DATA data;
959         NTSTATUS status;
960
961         /*
962          * We have to store the header with empty data. TODO: Fix the
963          * tdb-level cleanup
964          */
965
966         ZERO_STRUCT(data);
967
968         status = db_ctdb_store(rec, data, 0);
969         if (!NT_STATUS_IS_OK(status)) {
970                 return status;
971         }
972
973 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
974         status = db_ctdb_send_schedule_for_deletion(rec);
975 #endif
976
977         return status;
978 }
979
980 static int db_ctdb_record_destr(struct db_record* data)
981 {
982         struct db_ctdb_rec *crec = talloc_get_type_abort(
983                 data->private_data, struct db_ctdb_rec);
984         int threshold;
985
986         DEBUG(10, (DEBUGLEVEL > 10
987                    ? "Unlocking db %u key %s\n"
988                    : "Unlocking db %u key %.20s\n",
989                    (int)crec->ctdb_ctx->db_id,
990                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
991                               data->key.dsize)));
992
993         tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
994
995         threshold = lp_ctdb_locktime_warn_threshold();
996         if (threshold != 0) {
997                 double timediff = timeval_elapsed(&crec->lock_time);
998                 if ((timediff * 1000) > threshold) {
999                         DEBUG(0, ("Held tdb lock %f seconds\n", timediff));
1000                 }
1001         }
1002
1003         return 0;
1004 }
1005
1006 /**
1007  * Check whether we have a valid local copy of the given record,
1008  * either for reading or for writing.
1009  */
1010 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only)
1011 {
1012         struct ctdb_ltdb_header *hdr;
1013
1014         if (ctdb_data.dptr == NULL)
1015                 return false;
1016
1017         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header))
1018                 return false;
1019
1020         hdr = (struct ctdb_ltdb_header *)ctdb_data.dptr;
1021
1022 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1023         if (hdr->dmaster != get_my_vnn()) {
1024                 /* If we're not dmaster, it must be r/o copy. */
1025                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1026         }
1027
1028         /*
1029          * If we want write access, no one may have r/o copies.
1030          */
1031         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1032 #else
1033         return (hdr->dmaster == get_my_vnn());
1034 #endif
1035 }
1036
1037 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1038                                                TALLOC_CTX *mem_ctx,
1039                                                TDB_DATA key,
1040                                                bool tryonly)
1041 {
1042         struct db_record *result;
1043         struct db_ctdb_rec *crec;
1044         NTSTATUS status;
1045         TDB_DATA ctdb_data;
1046         int migrate_attempts = 0;
1047         int lockret;
1048
1049         if (!(result = talloc(mem_ctx, struct db_record))) {
1050                 DEBUG(0, ("talloc failed\n"));
1051                 return NULL;
1052         }
1053
1054         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1055                 DEBUG(0, ("talloc failed\n"));
1056                 TALLOC_FREE(result);
1057                 return NULL;
1058         }
1059
1060         result->db = ctx->db;
1061         result->private_data = (void *)crec;
1062         crec->ctdb_ctx = ctx;
1063
1064         result->key.dsize = key.dsize;
1065         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1066                                                     key.dsize);
1067         if (result->key.dptr == NULL) {
1068                 DEBUG(0, ("talloc failed\n"));
1069                 TALLOC_FREE(result);
1070                 return NULL;
1071         }
1072
1073         /*
1074          * Do a blocking lock on the record
1075          */
1076 again:
1077
1078         if (DEBUGLEVEL >= 10) {
1079                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1080                 DEBUG(10, (DEBUGLEVEL > 10
1081                            ? "Locking db %u key %s\n"
1082                            : "Locking db %u key %.20s\n",
1083                            (int)crec->ctdb_ctx->db_id, keystr));
1084                 TALLOC_FREE(keystr);
1085         }
1086
1087         lockret = tryonly
1088                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1089                 : tdb_chainlock(ctx->wtdb->tdb, key);
1090         if (lockret != 0) {
1091                 DEBUG(3, ("tdb_chainlock failed\n"));
1092                 TALLOC_FREE(result);
1093                 return NULL;
1094         }
1095
1096         result->store = db_ctdb_store;
1097         result->delete_rec = db_ctdb_delete;
1098         talloc_set_destructor(result, db_ctdb_record_destr);
1099
1100         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1101
1102         /*
1103          * See if we have a valid record and we are the dmaster. If so, we can
1104          * take the shortcut and just return it.
1105          */
1106
1107         if (!db_ctdb_can_use_local_copy(ctdb_data, false)) {
1108                 SAFE_FREE(ctdb_data.dptr);
1109                 tdb_chainunlock(ctx->wtdb->tdb, key);
1110                 talloc_set_destructor(result, NULL);
1111
1112                 if (tryonly && (migrate_attempts != 0)) {
1113                         DEBUG(5, ("record migrated away again\n"));
1114                         TALLOC_FREE(result);
1115                         return NULL;
1116                 }
1117
1118                 migrate_attempts += 1;
1119
1120                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1121                            ctdb_data.dptr, ctdb_data.dptr ?
1122                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1123                            get_my_vnn(),
1124                            ctdb_data.dptr ?
1125                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1126
1127                 status = ctdbd_migrate(messaging_ctdbd_connection(), ctx->db_id,
1128                                        key);
1129                 if (!NT_STATUS_IS_OK(status)) {
1130                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1131                                   nt_errstr(status)));
1132                         TALLOC_FREE(result);
1133                         return NULL;
1134                 }
1135                 /* now its migrated, try again */
1136                 goto again;
1137         }
1138
1139         if (migrate_attempts > 10) {
1140                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
1141                           "attempts\n", tdb_name(ctx->wtdb->tdb),
1142                           hex_encode_talloc(talloc_tos(),
1143                                             (unsigned char *)key.dptr,
1144                                             key.dsize),
1145                           migrate_attempts));
1146         }
1147
1148         GetTimeOfDay(&crec->lock_time);
1149
1150         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1151
1152         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1153         result->value.dptr = NULL;
1154
1155         if ((result->value.dsize != 0)
1156             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1157                          result, ctdb_data.dptr + sizeof(crec->header),
1158                          result->value.dsize))) {
1159                 DEBUG(0, ("talloc failed\n"));
1160                 TALLOC_FREE(result);
1161         }
1162
1163         SAFE_FREE(ctdb_data.dptr);
1164
1165         return result;
1166 }
1167
1168 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1169                                               TALLOC_CTX *mem_ctx,
1170                                               TDB_DATA key)
1171 {
1172         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1173                                                         struct db_ctdb_ctx);
1174
1175         if (ctx->transaction != NULL) {
1176                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1177         }
1178
1179         if (db->persistent) {
1180                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1181         }
1182
1183         return fetch_locked_internal(ctx, mem_ctx, key, false);
1184 }
1185
1186 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1187                                                   TALLOC_CTX *mem_ctx,
1188                                                   TDB_DATA key)
1189 {
1190         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1191                                                         struct db_ctdb_ctx);
1192
1193         if (ctx->transaction != NULL) {
1194                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1195         }
1196
1197         if (db->persistent) {
1198                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1199         }
1200
1201         return fetch_locked_internal(ctx, mem_ctx, key, true);
1202 }
1203
1204 /*
1205   fetch (unlocked, no migration) operation on ctdb
1206  */
1207 static NTSTATUS db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1208                               TDB_DATA key, TDB_DATA *data)
1209 {
1210         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1211                                                         struct db_ctdb_ctx);
1212         NTSTATUS status;
1213         TDB_DATA ctdb_data;
1214
1215         if (ctx->transaction) {
1216                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1217         }
1218
1219         if (db->persistent) {
1220                 return db_ctdb_fetch_persistent(ctx, mem_ctx, key, data);
1221         }
1222
1223         /* try a direct fetch */
1224         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1225
1226         /*
1227          * See if we have a valid record and we are the dmaster. If so, we can
1228          * take the shortcut and just return it.
1229          * we bypass the dmaster check for persistent databases
1230          */
1231         if (db_ctdb_can_use_local_copy(ctdb_data, true)) {
1232                 /*
1233                  * We have a valid local copy - avoid the ctdb protocol op
1234                  */
1235                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1236
1237                 data->dptr = (uint8_t *)talloc_memdup(
1238                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1239                         data->dsize);
1240
1241                 SAFE_FREE(ctdb_data.dptr);
1242
1243                 if (data->dptr == NULL) {
1244                         return NT_STATUS_NO_MEMORY;
1245                 }
1246                 return NT_STATUS_OK;
1247         }
1248
1249         SAFE_FREE(ctdb_data.dptr);
1250
1251         /*
1252          * We weren't able to get it locally - ask ctdb to fetch it for us.
1253          * If we already had *something*, it's probably worth making a local
1254          * read-only copy.
1255          */
1256         status = ctdbd_fetch(messaging_ctdbd_connection(), ctx->db_id, key,
1257                              mem_ctx, data,
1258                              ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header));
1259         if (!NT_STATUS_IS_OK(status)) {
1260                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1261         }
1262
1263         return status;
1264 }
1265
1266 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1267                                      void (*parser)(TDB_DATA key,
1268                                                     TDB_DATA data,
1269                                                     void *private_data),
1270                                      void *private_data)
1271 {
1272         NTSTATUS status;
1273         TDB_DATA data;
1274
1275         status = db_ctdb_fetch(db, talloc_tos(), key, &data);
1276         if (!NT_STATUS_IS_OK(status)) {
1277                 return status;
1278         }
1279         parser(key, data, private_data);
1280         TALLOC_FREE(data.dptr);
1281         return NT_STATUS_OK;
1282 }
1283
1284 struct traverse_state {
1285         struct db_context *db;
1286         int (*fn)(struct db_record *rec, void *private_data);
1287         void *private_data;
1288         int count;
1289 };
1290
1291 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1292 {
1293         struct traverse_state *state = (struct traverse_state *)private_data;
1294         struct db_record *rec;
1295         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1296         /* we have to give them a locked record to prevent races */
1297         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1298         if (rec && rec->value.dsize > 0) {
1299                 state->fn(rec, state->private_data);
1300         }
1301         talloc_free(tmp_ctx);
1302 }
1303
1304 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1305                                         void *private_data)
1306 {
1307         struct traverse_state *state = (struct traverse_state *)private_data;
1308         struct db_record *rec;
1309         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1310         int ret = 0;
1311
1312         /*
1313          * Skip the __db_sequence_number__ key:
1314          * This is used for persistent transactions internally.
1315          */
1316         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1317             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1318         {
1319                 goto done;
1320         }
1321
1322         /* we have to give them a locked record to prevent races */
1323         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1324         if (rec && rec->value.dsize > 0) {
1325                 ret = state->fn(rec, state->private_data);
1326         }
1327
1328 done:
1329         talloc_free(tmp_ctx);
1330         return ret;
1331 }
1332
1333 /* wrapper to use traverse_persistent_callback with dbwrap */
1334 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1335 {
1336         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1337 }
1338
1339
1340 static int db_ctdb_traverse(struct db_context *db,
1341                             int (*fn)(struct db_record *rec,
1342                                       void *private_data),
1343                             void *private_data)
1344 {
1345         NTSTATUS status;
1346         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1347                                                         struct db_ctdb_ctx);
1348         struct traverse_state state;
1349
1350         state.db = db;
1351         state.fn = fn;
1352         state.private_data = private_data;
1353         state.count = 0;
1354
1355         if (db->persistent) {
1356                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1357                 int ret;
1358
1359                 /* for persistent databases we don't need to do a ctdb traverse,
1360                    we can do a faster local traverse */
1361                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1362                 if (ret < 0) {
1363                         return ret;
1364                 }
1365                 if (ctx->transaction && ctx->transaction->m_write) {
1366                         /*
1367                          * we now have to handle keys not yet
1368                          * present at transaction start
1369                          */
1370                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1371                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1372                         struct ctdb_rec_data *rec=NULL;
1373                         int i;
1374                         int count = 0;
1375
1376                         if (newkeys == NULL) {
1377                                 return -1;
1378                         }
1379
1380                         for (i=0; i<mbuf->count; i++) {
1381                                 TDB_DATA key;
1382                                 rec =db_ctdb_marshall_loop_next(mbuf, rec,
1383                                                                 NULL, NULL,
1384                                                                 &key, NULL);
1385                                 SMB_ASSERT(rec != NULL);
1386
1387                                 if (!tdb_exists(ltdb, key)) {
1388                                         dbwrap_store(newkeys, key, tdb_null, 0);
1389                                 }
1390                         }
1391                         status = dbwrap_traverse(newkeys,
1392                                                  traverse_persistent_callback_dbwrap,
1393                                                  &state,
1394                                                  &count);
1395                         talloc_free(newkeys);
1396                         if (!NT_STATUS_IS_OK(status)) {
1397                                 return -1;
1398                         }
1399                         ret += count;
1400                 }
1401                 return ret;
1402         }
1403
1404         status = ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1405         if (!NT_STATUS_IS_OK(status)) {
1406                 return -1;
1407         }
1408         return state.count;
1409 }
1410
1411 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1412 {
1413         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1414 }
1415
1416 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1417 {
1418         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1419 }
1420
1421 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1422 {
1423         struct traverse_state *state = (struct traverse_state *)private_data;
1424         struct db_record rec;
1425         rec.key = key;
1426         rec.value = data;
1427         rec.store = db_ctdb_store_deny;
1428         rec.delete_rec = db_ctdb_delete_deny;
1429         rec.private_data = state->db;
1430         state->fn(&rec, state->private_data);
1431         state->count++;
1432 }
1433
1434 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1435                                         void *private_data)
1436 {
1437         struct traverse_state *state = (struct traverse_state *)private_data;
1438         struct db_record rec;
1439
1440         /*
1441          * Skip the __db_sequence_number__ key:
1442          * This is used for persistent transactions internally.
1443          */
1444         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1445             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1446         {
1447                 return 0;
1448         }
1449
1450         rec.key = kbuf;
1451         rec.value = dbuf;
1452         rec.store = db_ctdb_store_deny;
1453         rec.delete_rec = db_ctdb_delete_deny;
1454         rec.private_data = state->db;
1455
1456         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1457                 /* a deleted record */
1458                 return 0;
1459         }
1460         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1461         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1462
1463         state->count++;
1464         return state->fn(&rec, state->private_data);
1465 }
1466
1467 static int db_ctdb_traverse_read(struct db_context *db,
1468                                  int (*fn)(struct db_record *rec,
1469                                            void *private_data),
1470                                  void *private_data)
1471 {
1472         NTSTATUS status;
1473         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1474                                                         struct db_ctdb_ctx);
1475         struct traverse_state state;
1476
1477         state.db = db;
1478         state.fn = fn;
1479         state.private_data = private_data;
1480         state.count = 0;
1481
1482         if (db->persistent) {
1483                 /* for persistent databases we don't need to do a ctdb traverse,
1484                    we can do a faster local traverse */
1485                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1486         }
1487
1488         status = ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1489         if (!NT_STATUS_IS_OK(status)) {
1490                 return -1;
1491         }
1492         return state.count;
1493 }
1494
1495 static int db_ctdb_get_seqnum(struct db_context *db)
1496 {
1497         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1498                                                         struct db_ctdb_ctx);
1499         return tdb_get_seqnum(ctx->wtdb->tdb);
1500 }
1501
1502 static void db_ctdb_id(struct db_context *db, const uint8_t **id,
1503                        size_t *idlen)
1504 {
1505         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1506                 db->private_data, struct db_ctdb_ctx);
1507
1508         *id = (uint8_t *)&ctx->db_id;
1509         *idlen = sizeof(ctx->db_id);
1510 }
1511
1512 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1513                                 const char *name,
1514                                 int hash_size, int tdb_flags,
1515                                 int open_flags, mode_t mode,
1516                                 enum dbwrap_lock_order lock_order)
1517 {
1518         struct db_context *result;
1519         struct db_ctdb_ctx *db_ctdb;
1520         char *db_path;
1521         struct ctdbd_connection *conn;
1522         struct loadparm_context *lp_ctx;
1523         struct ctdb_db_priority prio;
1524         NTSTATUS status;
1525         int cstatus;
1526
1527         if (!lp_clustering()) {
1528                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1529                 return NULL;
1530         }
1531
1532         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1533                 DEBUG(0, ("talloc failed\n"));
1534                 TALLOC_FREE(result);
1535                 return NULL;
1536         }
1537
1538         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1539                 DEBUG(0, ("talloc failed\n"));
1540                 TALLOC_FREE(result);
1541                 return NULL;
1542         }
1543
1544         db_ctdb->transaction = NULL;
1545         db_ctdb->db = result;
1546
1547         conn = messaging_ctdbd_connection();
1548         if (conn == NULL) {
1549                 DEBUG(1, ("Could not connect to ctdb\n"));
1550                 TALLOC_FREE(result);
1551                 return NULL;
1552         }
1553
1554         if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) {
1555                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1556                 TALLOC_FREE(result);
1557                 return NULL;
1558         }
1559
1560         db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
1561
1562         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1563         result->lock_order = lock_order;
1564
1565         /* only pass through specific flags */
1566         tdb_flags &= TDB_SEQNUM;
1567
1568         /* honor permissions if user has specified O_CREAT */
1569         if (open_flags & O_CREAT) {
1570                 chmod(db_path, mode);
1571         }
1572
1573         prio.db_id = db_ctdb->db_id;
1574         prio.priority = lock_order;
1575
1576         status = ctdbd_control_local(
1577                 conn, CTDB_CONTROL_SET_DB_PRIORITY, 0, 0,
1578                 make_tdb_data((uint8_t *)&prio, sizeof(prio)),
1579                 NULL, NULL, &cstatus);
1580
1581         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1582                 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1583                           nt_errstr(status), cstatus));
1584                 TALLOC_FREE(result);
1585                 return NULL;
1586         }
1587
1588         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1589
1590         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
1591                                       O_RDWR, 0, lp_ctx);
1592         talloc_unlink(db_path, lp_ctx);
1593         if (db_ctdb->wtdb == NULL) {
1594                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1595                 TALLOC_FREE(result);
1596                 return NULL;
1597         }
1598         talloc_free(db_path);
1599
1600         if (result->persistent) {
1601                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb,
1602                                                     ctdb_conn_msg_ctx(conn));
1603                 if (db_ctdb->lock_ctx == NULL) {
1604                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1605                         TALLOC_FREE(result);
1606                         return NULL;
1607                 }
1608         }
1609
1610         result->private_data = (void *)db_ctdb;
1611         result->fetch_locked = db_ctdb_fetch_locked;
1612         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1613         result->parse_record = db_ctdb_parse_record;
1614         result->traverse = db_ctdb_traverse;
1615         result->traverse_read = db_ctdb_traverse_read;
1616         result->get_seqnum = db_ctdb_get_seqnum;
1617         result->transaction_start = db_ctdb_transaction_start;
1618         result->transaction_commit = db_ctdb_transaction_commit;
1619         result->transaction_cancel = db_ctdb_transaction_cancel;
1620         result->id = db_ctdb_id;
1621         result->stored_callback = NULL;
1622
1623         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1624                  name, db_ctdb->db_id));
1625
1626         return result;
1627 }
1628
1629 #else /* CLUSTER_SUPPORT */
1630
1631 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1632                                 const char *name,
1633                                 int hash_size, int tdb_flags,
1634                                 int open_flags, mode_t mode,
1635                                 enum dbwrap_lock_order lock_order)
1636 {
1637         DEBUG(3, ("db_open_ctdb: no cluster support!\n"));
1638         return NULL;
1639 }
1640
1641 #endif