ctdbd_conn: move CTDB_CONTROL_ENABLE_SEQNUM control to db_open_ctdb
[samba.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /*
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #include "ctdb/include/ctdb_protocol.h"
31 #include "ctdbd_conn.h"
32 #include "dbwrap/dbwrap.h"
33 #include "dbwrap/dbwrap_private.h"
34 #include "dbwrap/dbwrap_ctdb.h"
35 #include "g_lock.h"
36 #include "messages.h"
37 #include "lib/cluster_support.h"
38 #include "lib/util/tevent_ntstatus.h"
39
/*
 * State of one open transaction on a persistent database.
 */
struct db_ctdb_transaction_handle {
        /* database context this transaction belongs to */
        struct db_ctdb_ctx *ctx;
        /*
         * we store the writes done under a transaction:
         */
        struct ctdb_marshall_buffer *m_write;
        /*
         * incremented by every transaction start issued while the
         * transaction is already open, decremented again by the
         * matching commit/cancel; 0 means outermost level
         */
        uint32_t nesting;
        /* set when a nested level was cancelled; makes the commit fail */
        bool nested_cancel;
        /* name of the g_lock lock taken when the transaction is started */
        char *lock_name;
};
50
/*
 * Per-database private state of the ctdb dbwrap backend.
 */
struct db_ctdb_ctx {
        /* the dbwrap database this context belongs to */
        struct db_context *db;
        /* ctdbd connection used for controls on this database */
        struct ctdbd_connection *conn;
        /* wrapper around the local tdb holding the record copies */
        struct tdb_wrap *wtdb;
        /* database id handed to ctdbd controls and used in the lock name */
        uint32_t db_id;
        /* the currently open transaction, NULL if there is none */
        struct db_ctdb_transaction_handle *transaction;
        /* g_lock context used to serialize transactions */
        struct g_lock_ctx *lock_ctx;

        /* thresholds for warning messages */
        int warn_unlock_msecs;
        int warn_migrate_msecs;
        int warn_migrate_attempts;
        int warn_locktime_msecs;
};
65
/*
 * Private data attached to a db_record outside of transactions.
 */
struct db_ctdb_rec {
        /* database context the record was fetched from */
        struct db_ctdb_ctx *ctdb_ctx;
        /* ctdb record header that is written in front of the record data */
        struct ctdb_ltdb_header header;
        /* NOTE(review): presumably the time the record lock was taken - confirm */
        struct timeval lock_time;
};
71
/*
 * Holder for the ctdbd connection set up by
 * ctdb_async_ctx_init_internal().
 */
struct ctdb_async_ctx {
        /* true once the connection and its fd event are set up */
        bool initialized;
        /* the connection itself, NULL while not connected */
        struct ctdbd_connection *async_conn;
};

/* the single, file-local instance of the async connection state */
static struct ctdb_async_ctx ctdb_async_ctx;
78
79 static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
80                                         struct tevent_context *ev,
81                                         bool reinit)
82 {
83         int ret;
84
85         if (reinit) {
86                 TALLOC_FREE(ctdb_async_ctx.async_conn);
87                 ctdb_async_ctx.initialized = false;
88         }
89
90         if (ctdb_async_ctx.initialized) {
91                 return 0;
92         }
93
94         become_root();
95         ret = ctdbd_init_connection(mem_ctx,
96                                     lp_ctdbd_socket(),
97                                     lp_ctdb_timeout(),
98                                     &ctdb_async_ctx.async_conn);
99         unbecome_root();
100
101         if (ctdb_async_ctx.async_conn == NULL) {
102                 DBG_ERR("ctdbd_init_connection failed\n");
103                 return EIO;
104         }
105
106         ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
107         if (ret != 0) {
108                 DBG_ERR("ctdbd_setup_fde failed\n");
109                 TALLOC_FREE(ctdb_async_ctx.async_conn);
110                 return ret;
111         }
112
113         ctdb_async_ctx.initialized = true;
114         return 0;
115 }
116
117 static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
118 {
119         return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
120 }
121
122 int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
123 {
124         return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
125 }
126
127 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
128 {
129         enum TDB_ERROR tret = tdb_error(tdb);
130
131         return map_nt_error_from_tdb(tret);
132 }
133
/*
 * Closure handed through tdb_parse_record() to db_ctdb_ltdb_parser().
 */
struct db_ctdb_ltdb_parse_state {
        /*
         * callback receiving the key, the ctdb header found at the
         * start of the record and the data following that header
         */
        void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
                       TDB_DATA data, void *private_data);
        /* opaque pointer passed through to the callback */
        void *private_data;
};
139
140 static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
141                                void *private_data)
142 {
143         struct db_ctdb_ltdb_parse_state *state =
144                 (struct db_ctdb_ltdb_parse_state *)private_data;
145
146         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
147                 return -1;
148         }
149
150         state->parser(
151                 key, (struct ctdb_ltdb_header *)data.dptr,
152                 make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
153                               data.dsize - sizeof(struct ctdb_ltdb_header)),
154                 state->private_data);
155         return 0;
156 }
157
158 static NTSTATUS db_ctdb_ltdb_parse(
159         struct db_ctdb_ctx *db, TDB_DATA key,
160         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
161                        TDB_DATA data, void *private_data),
162         void *private_data)
163 {
164         struct db_ctdb_ltdb_parse_state state;
165         int ret;
166
167         state.parser = parser;
168         state.private_data = private_data;
169
170         ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
171                                &state);
172         if (ret == -1) {
173                 return NT_STATUS_NOT_FOUND;
174         }
175         return NT_STATUS_OK;
176 }
177
178 /*
179  * Store a record together with the ctdb record header
180  * in the local copy of the database.
181  */
182 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
183                                    TDB_DATA key,
184                                    struct ctdb_ltdb_header *header,
185                                    TDB_DATA data)
186 {
187         TDB_DATA recs[2];
188         int ret;
189
190         recs[0] = (TDB_DATA) { .dptr = (uint8_t *)header,
191                                .dsize = sizeof(struct ctdb_ltdb_header) };
192         recs[1] = data;
193
194         ret = tdb_storev(db->wtdb->tdb, key, recs, 2, TDB_REPLACE);
195
196         return (ret == 0) ? NT_STATUS_OK
197                           : tdb_error_to_ntstatus(db->wtdb->tdb);
198
199 }
200
201 /*
202   form a ctdb_rec_data record from a key/data pair
203  */
204 static struct ctdb_rec_data_old *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
205                                                   TDB_DATA key,
206                                                   struct ctdb_ltdb_header *header,
207                                                   TDB_DATA data)
208 {
209         size_t length;
210         struct ctdb_rec_data_old *d;
211
212         length = offsetof(struct ctdb_rec_data_old, data) + key.dsize +
213                 data.dsize + sizeof(*header);
214         d = (struct ctdb_rec_data_old *)talloc_size(mem_ctx, length);
215         if (d == NULL) {
216                 return NULL;
217         }
218         d->length = length;
219         d->reqid = reqid;
220         d->keylen = key.dsize;
221         memcpy(&d->data[0], key.dptr, key.dsize);
222
223         d->datalen = data.dsize + sizeof(*header);
224         memcpy(&d->data[key.dsize], header, sizeof(*header));
225         memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
226         return d;
227 }
228
229
230 /* helper function for marshalling multiple records */
231 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
232                                                struct ctdb_marshall_buffer *m,
233                                                uint32_t db_id,
234                                                uint32_t reqid,
235                                                TDB_DATA key,
236                                                struct ctdb_ltdb_header *header,
237                                                TDB_DATA data)
238 {
239         struct ctdb_rec_data_old *r;
240         size_t m_size, r_size;
241         struct ctdb_marshall_buffer *m2 = NULL;
242
243         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
244         if (r == NULL) {
245                 talloc_free(m);
246                 return NULL;
247         }
248
249         if (m == NULL) {
250                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
251                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
252                 if (m == NULL) {
253                         goto done;
254                 }
255                 m->db_id = db_id;
256         }
257
258         m_size = talloc_get_size(m);
259         r_size = talloc_get_size(r);
260
261         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
262                 mem_ctx, m,  m_size + r_size);
263         if (m2 == NULL) {
264                 talloc_free(m);
265                 goto done;
266         }
267
268         memcpy(m_size + (uint8_t *)m2, r, r_size);
269
270         m2->count++;
271
272 done:
273         talloc_free(r);
274         return m2;
275 }
276
277 /* we've finished marshalling, return a data blob with the marshalled records */
278 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
279 {
280         TDB_DATA data;
281         data.dptr = (uint8_t *)m;
282         data.dsize = talloc_get_size(m);
283         return data;
284 }
285
286 /*
287    loop over a marshalling buffer
288
289      - pass r==NULL to start
290      - loop the number of times indicated by m->count
291 */
292 static struct ctdb_rec_data_old *db_ctdb_marshall_loop_next_key(
293         struct ctdb_marshall_buffer *m, struct ctdb_rec_data_old *r, TDB_DATA *key)
294 {
295         if (r == NULL) {
296                 r = (struct ctdb_rec_data_old *)&m->data[0];
297         } else {
298                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
299         }
300
301         key->dptr   = &r->data[0];
302         key->dsize  = r->keylen;
303         return r;
304 }
305
306 static bool db_ctdb_marshall_buf_parse(
307         struct ctdb_rec_data_old *r, uint32_t *reqid,
308         struct ctdb_ltdb_header **header, TDB_DATA *data)
309 {
310         if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
311                 return false;
312         }
313
314         *reqid = r->reqid;
315
316         data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
317         data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
318
319         *header = (struct ctdb_ltdb_header *)&r->data[r->keylen];
320
321         return true;
322 }
323
324 /**
325  * CTDB transaction destructor
326  */
327 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 {
329         NTSTATUS status;
330
331         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
332         if (!NT_STATUS_IS_OK(status)) {
333                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
334                           nt_errstr(status)));
335                 return -1;
336         }
337         return 0;
338 }
339
340 /**
341  * CTDB dbwrap API: transaction_start function
342  * starts a transaction on a persistent database
343  */
344 static int db_ctdb_transaction_start(struct db_context *db)
345 {
346         struct db_ctdb_transaction_handle *h;
347         NTSTATUS status;
348         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
349                                                         struct db_ctdb_ctx);
350
351         if (!db->persistent) {
352                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
353                          ctx->db_id));
354                 return -1;
355         }
356
357         if (ctx->transaction) {
358                 ctx->transaction->nesting++;
359                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
360                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
361                 return 0;
362         }
363
364         h = talloc_zero(db, struct db_ctdb_transaction_handle);
365         if (h == NULL) {
366                 DEBUG(0,(__location__ " oom for transaction handle\n"));
367                 return -1;
368         }
369
370         h->ctx = ctx;
371
372         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
373                                        (unsigned int)ctx->db_id);
374         if (h->lock_name == NULL) {
375                 DEBUG(0, ("talloc_asprintf failed\n"));
376                 TALLOC_FREE(h);
377                 return -1;
378         }
379
380         /*
381          * Wait a day, i.e. forever...
382          */
383         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
384                              timeval_set(86400, 0));
385         if (!NT_STATUS_IS_OK(status)) {
386                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
387                 TALLOC_FREE(h);
388                 return -1;
389         }
390
391         talloc_set_destructor(h, db_ctdb_transaction_destructor);
392
393         ctx->transaction = h;
394
395         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
396
397         return 0;
398 }
399
400 static bool parse_newest_in_marshall_buffer(
401         struct ctdb_marshall_buffer *buf, TDB_DATA key,
402         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
403                        TDB_DATA data, void *private_data),
404         void *private_data)
405 {
406         struct ctdb_rec_data_old *rec = NULL;
407         struct ctdb_ltdb_header *h = NULL;
408         TDB_DATA data;
409         uint32_t i;
410
411         if (buf == NULL) {
412                 return false;
413         }
414
415         /*
416          * Walk the list of records written during this
417          * transaction. If we want to read one we have already
418          * written, return the last written sample. Thus we do not do
419          * a "break;" for the first hit, this record might have been
420          * overwritten later.
421          */
422
423         for (i=0; i<buf->count; i++) {
424                 TDB_DATA tkey;
425                 uint32_t reqid;
426
427                 rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
428                 if (rec == NULL) {
429                         return false;
430                 }
431
432                 if (!tdb_data_equal(key, tkey)) {
433                         continue;
434                 }
435
436                 if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
437                         return false;
438                 }
439         }
440
441         if (h == NULL) {
442                 return false;
443         }
444
445         parser(key, h, data, private_data);
446
447         return true;
448 }
449
/*
 * Output locations for pull_newest_from_marshall_buffer_parser().
 */
struct pull_newest_from_marshall_buffer_state {
        /* where to copy the record header to, NULL to skip it */
        struct ctdb_ltdb_header *pheader;
        /* talloc parent for the copy of the record data */
        TALLOC_CTX *mem_ctx;
        /* where to return the copied record data, NULL to skip it */
        TDB_DATA *pdata;
};
455
456 static void pull_newest_from_marshall_buffer_parser(
457         TDB_DATA key, struct ctdb_ltdb_header *header,
458         TDB_DATA data, void *private_data)
459 {
460         struct pull_newest_from_marshall_buffer_state *state =
461                 (struct pull_newest_from_marshall_buffer_state *)private_data;
462
463         if (state->pheader != NULL) {
464                 memcpy(state->pheader, header, sizeof(*state->pheader));
465         }
466         if (state->pdata != NULL) {
467                 state->pdata->dsize = data.dsize;
468                 state->pdata->dptr = (uint8_t *)talloc_memdup(
469                         state->mem_ctx, data.dptr, data.dsize);
470         }
471 }
472
473 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
474                                              TDB_DATA key,
475                                              struct ctdb_ltdb_header *pheader,
476                                              TALLOC_CTX *mem_ctx,
477                                              TDB_DATA *pdata)
478 {
479         struct pull_newest_from_marshall_buffer_state state;
480
481         state.pheader = pheader;
482         state.mem_ctx = mem_ctx;
483         state.pdata = pdata;
484
485         if (!parse_newest_in_marshall_buffer(
486                     buf, key, pull_newest_from_marshall_buffer_parser,
487                     &state)) {
488                 return false;
489         }
490         if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
491                 /* ENOMEM */
492                 return false;
493         }
494         return true;
495 }
496
497 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
498 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
499
500 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
501                                                           TALLOC_CTX *mem_ctx,
502                                                           TDB_DATA key)
503 {
504         struct db_record *result;
505         TDB_DATA ctdb_data;
506
507         if (!(result = talloc(mem_ctx, struct db_record))) {
508                 DEBUG(0, ("talloc failed\n"));
509                 return NULL;
510         }
511
512         result->db = ctx->db;
513         result->private_data = ctx->transaction;
514
515         result->key.dsize = key.dsize;
516         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
517                                                     key.dsize);
518         if (result->key.dptr == NULL) {
519                 DEBUG(0, ("talloc failed\n"));
520                 TALLOC_FREE(result);
521                 return NULL;
522         }
523
524         result->store = db_ctdb_store_transaction;
525         result->delete_rec = db_ctdb_delete_transaction;
526
527         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
528                                              NULL, result, &result->value)) {
529                 return result;
530         }
531
532         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
533         if (ctdb_data.dptr == NULL) {
534                 /* create the record */
535                 result->value = tdb_null;
536                 return result;
537         }
538
539         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
540         result->value.dptr = NULL;
541
542         if ((result->value.dsize != 0)
543             && !(result->value.dptr = (uint8_t *)talloc_memdup(
544                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
545                          result->value.dsize))) {
546                 DEBUG(0, ("talloc failed\n"));
547                 TALLOC_FREE(result);
548         }
549
550         SAFE_FREE(ctdb_data.dptr);
551
552         return result;
553 }
554
555 static int db_ctdb_record_destructor(struct db_record **recp)
556 {
557         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
558         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
559                 rec->private_data, struct db_ctdb_transaction_handle);
560         int ret = h->ctx->db->transaction_commit(h->ctx->db);
561         if (ret != 0) {
562                 DEBUG(0,(__location__ " transaction_commit failed\n"));
563         }
564         return 0;
565 }
566
567 /*
568   auto-create a transaction for persistent databases
569  */
570 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
571                                                          TALLOC_CTX *mem_ctx,
572                                                          TDB_DATA key)
573 {
574         int res;
575         struct db_record *rec, **recp;
576
577         res = db_ctdb_transaction_start(ctx->db);
578         if (res == -1) {
579                 return NULL;
580         }
581
582         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
583         if (rec == NULL) {
584                 ctx->db->transaction_cancel(ctx->db);
585                 return NULL;
586         }
587
588         /* destroy this transaction when we release the lock */
589         recp = talloc(rec, struct db_record *);
590         if (recp == NULL) {
591                 ctx->db->transaction_cancel(ctx->db);
592                 talloc_free(rec);
593                 return NULL;
594         }
595         *recp = rec;
596         talloc_set_destructor(recp, db_ctdb_record_destructor);
597         return rec;
598 }
599
600
601 /*
602   stores a record inside a transaction
603  */
604 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
605                                           TDB_DATA key, TDB_DATA data)
606 {
607         TALLOC_CTX *tmp_ctx = talloc_new(h);
608         TDB_DATA rec;
609         struct ctdb_ltdb_header header;
610
611         ZERO_STRUCT(header);
612
613         /* we need the header so we can update the RSN */
614
615         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
616                                               NULL, NULL)) {
617
618                 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
619
620                 if (rec.dptr != NULL) {
621                         memcpy(&header, rec.dptr,
622                                sizeof(struct ctdb_ltdb_header));
623                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
624
625                         /*
626                          * a special case, we are writing the same
627                          * data that is there now
628                          */
629                         if (data.dsize == rec.dsize &&
630                             memcmp(data.dptr,
631                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
632                                    data.dsize) == 0) {
633                                 SAFE_FREE(rec.dptr);
634                                 talloc_free(tmp_ctx);
635                                 return NT_STATUS_OK;
636                         }
637                 }
638                 SAFE_FREE(rec.dptr);
639         }
640
641         header.dmaster = ctdbd_vnn(h->ctx->conn);
642         header.rsn++;
643
644         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
645         if (h->m_write == NULL) {
646                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
647                 talloc_free(tmp_ctx);
648                 return NT_STATUS_NO_MEMORY;
649         }
650
651         talloc_free(tmp_ctx);
652         return NT_STATUS_OK;
653 }
654
655
656 /* 
657    a record store inside a transaction
658  */
659 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
660 {
661         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
662                 rec->private_data, struct db_ctdb_transaction_handle);
663         NTSTATUS status;
664
665         status = db_ctdb_transaction_store(h, rec->key, data);
666         return status;
667 }
668
669 /*
670    a record delete inside a transaction
671  */
672 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
673 {
674         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
675                 rec->private_data, struct db_ctdb_transaction_handle);
676         NTSTATUS status;
677
678         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
679         return status;
680 }
681
682 static void db_ctdb_fetch_db_seqnum_parser(
683         TDB_DATA key, struct ctdb_ltdb_header *header,
684         TDB_DATA data, void *private_data)
685 {
686         uint64_t *seqnum = (uint64_t *)private_data;
687
688         if (data.dsize != sizeof(uint64_t)) {
689                 *seqnum = 0;
690                 return;
691         }
692         memcpy(seqnum, data.dptr, sizeof(*seqnum));
693 }
694
695 /**
696  * Fetch the db sequence number of a persistent db directly from the db.
697  */
698 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
699                                                 uint64_t *seqnum)
700 {
701         NTSTATUS status;
702         TDB_DATA key;
703
704         if (seqnum == NULL) {
705                 return NT_STATUS_INVALID_PARAMETER;
706         }
707
708         key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
709
710         status = db_ctdb_ltdb_parse(
711                 db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
712
713         if (NT_STATUS_IS_OK(status)) {
714                 return NT_STATUS_OK;
715         }
716         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
717                 *seqnum = 0;
718                 return NT_STATUS_OK;
719         }
720         return status;
721 }
722
723 /**
724  * Store the database sequence number inside a transaction.
725  */
726 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
727                                         uint64_t seqnum)
728 {
729         NTSTATUS status;
730         const char *keyname = CTDB_DB_SEQNUM_KEY;
731         TDB_DATA key;
732         TDB_DATA data;
733
734         key = string_term_tdb_data(keyname);
735
736         data.dptr = (uint8_t *)&seqnum;
737         data.dsize = sizeof(uint64_t);
738
739         status = db_ctdb_transaction_store(h, key, data);
740
741         return status;
742 }
743
/*
  commit a transaction

  - fails with -1 if no transaction is open or if a nested level of
    the transaction was cancelled before
  - a commit of a nested level only decreases the nesting counter
  - the outermost commit bumps the database sequence number, hands
    the collected writes to ctdbd with a TRANS3_COMMIT control and
    finally frees the transaction handle

  Returns 0 on success, -1 on failure.
 */
static int db_ctdb_transaction_commit(struct db_context *db)
{
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
        NTSTATUS rets;
        int32_t status;
        struct db_ctdb_transaction_handle *h = ctx->transaction;
        uint64_t old_seqnum, new_seqnum;
        int ret;

        if (h == NULL) {
                DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
                return -1;
        }

        if (h->nested_cancel) {
                /* a nested level gave up: this level is cancelled, too */
                db->transaction_cancel(db);
                DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
                return -1;
        }

        if (h->nesting != 0) {
                /* not the outermost level: nothing is sent yet */
                h->nesting--;
                DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
                          ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
                return 0;
        }

        if (h->m_write == NULL) {
                /*
                 * No changes were made, so don't change the seqnum,
                 * don't push to other node, just exit with success.
                 */
                ret = 0;
                goto done;
        }

        DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));

        /*
         * As the last db action before committing, bump the database sequence
         * number. Note that this undoes all changes to the seqnum records
         * performed under the transaction. This record is not meant to be
         * modified by user interaction. It is for internal use only...
         */
        rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
        if (!NT_STATUS_IS_OK(rets)) {
                DEBUG(1, (__location__ " failed to fetch the db sequence number "
                          "in transaction commit on db 0x%08x\n", ctx->db_id));
                ret = -1;
                goto done;
        }

        new_seqnum = old_seqnum + 1;

        rets = db_ctdb_store_db_seqnum(h, new_seqnum);
        if (!NT_STATUS_IS_OK(rets)) {
                DEBUG(1, (__location__ "failed to store the db sequence number "
                          " in transaction commit on db 0x%08x\n", ctx->db_id));
                ret = -1;
                goto done;
        }

again:
        /* tell ctdbd to commit to the other nodes */
        ret = ctdbd_control_local(ctx->conn, CTDB_CONTROL_TRANS3_COMMIT,
                                  h->ctx->db_id, 0,
                                  db_ctdb_marshall_finish(h->m_write),
                                  NULL, NULL, &status);
        if ((ret != 0) || status != 0) {
                /*
                 * The TRANS3_COMMIT control should only possibly fail when a
                 * recovery has been running concurrently. In any case, the db
                 * will be the same on all nodes, either the new copy or the
                 * old copy.  This can be detected by comparing the old and new
                 * local sequence numbers.
                 */
                rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
                if (!NT_STATUS_IS_OK(rets)) {
                        DEBUG(1, (__location__ " failed to refetch db sequence "
                                  "number after failed TRANS3_COMMIT\n"));
                        ret = -1;
                        goto done;
                }

                if (new_seqnum == old_seqnum) {
                        /* Recovery prevented all our changes: retry. */
                        /* NOTE(review): the number of retries is not limited */
                        goto again;
                }
                if (new_seqnum != (old_seqnum + 1)) {
                        DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
                                  "old_seqnum[%lu] + (0 or 1) after failed "
                                  "TRANS3_COMMIT - this should not happen!\n",
                                  (unsigned long)new_seqnum,
                                  (unsigned long)old_seqnum));
                        ret = -1;
                        goto done;
                }
                /*
                 * Recovery propagated our changes to all nodes, completing
                 * our commit for us - succeed.
                 */
        }

        ret = 0;

done:
        /*
         * The transaction is over, whatever the outcome. Freeing the
         * handle also frees the collected writes hanging off it.
         */
        h->ctx->transaction = NULL;
        talloc_free(h);
        return ret;
}
858
859
860 /*
861   cancel a transaction
862  */
863 static int db_ctdb_transaction_cancel(struct db_context *db)
864 {
865         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
866                                                         struct db_ctdb_ctx);
867         struct db_ctdb_transaction_handle *h = ctx->transaction;
868
869         if (h == NULL) {
870                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
871                 return -1;
872         }
873
874         if (h->nesting != 0) {
875                 h->nesting--;
876                 h->nested_cancel = true;
877                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
878                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
879                 return 0;
880         }
881
882         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
883
884         ctx->transaction = NULL;
885         talloc_free(h);
886         return 0;
887 }
888
889
890 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
891 {
892         struct db_ctdb_rec *crec = talloc_get_type_abort(
893                 rec->private_data, struct db_ctdb_rec);
894
895         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
896 }
897
898
899
900 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
901 {
902         NTSTATUS status = NT_STATUS_OK;
903         int ret;
904         struct ctdb_control_schedule_for_deletion *dd;
905         TDB_DATA indata;
906         int32_t cstatus;
907         struct db_ctdb_rec *crec = talloc_get_type_abort(
908                 rec->private_data, struct db_ctdb_rec);
909         struct db_ctdb_ctx *ctx = crec->ctdb_ctx;
910
911         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
912         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
913         if (indata.dptr == NULL) {
914                 DEBUG(0, (__location__ " talloc failed!\n"));
915                 return NT_STATUS_NO_MEMORY;
916         }
917
918         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
919         dd->db_id = ctx->db_id;
920         dd->hdr = crec->header;
921         dd->keylen = rec->key.dsize;
922         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
923
924         ret = ctdbd_control_local(ctx->conn,
925                                   CTDB_CONTROL_SCHEDULE_FOR_DELETION,
926                                   crec->ctdb_ctx->db_id,
927                                   CTDB_CTRL_FLAG_NOREPLY, /* flags */
928                                   indata,
929                                   NULL, /* outdata */
930                                   NULL, /* errmsg */
931                                   &cstatus);
932         talloc_free(indata.dptr);
933
934         if ((ret != 0) || cstatus != 0) {
935                 DEBUG(1, (__location__ " Error sending local control "
936                           "SCHEDULE_FOR_DELETION: %s, cstatus = %"PRIi32"\n",
937                           strerror(ret), cstatus));
938                 if (ret != 0) {
939                         status = map_nt_error_from_unix(ret);
940                 } else {
941                         status = NT_STATUS_UNSUCCESSFUL;
942                 }
943         }
944
945         return status;
946 }
947
948 static NTSTATUS db_ctdb_delete(struct db_record *rec)
949 {
950         NTSTATUS status;
951
952         /*
953          * We have to store the header with empty data. TODO: Fix the
954          * tdb-level cleanup
955          */
956
957         status = db_ctdb_store(rec, tdb_null, 0);
958         if (!NT_STATUS_IS_OK(status)) {
959                 return status;
960         }
961
962         status = db_ctdb_send_schedule_for_deletion(rec);
963         return status;
964 }
965
966 static int db_ctdb_record_destr(struct db_record* data)
967 {
968         struct db_ctdb_rec *crec = talloc_get_type_abort(
969                 data->private_data, struct db_ctdb_rec);
970         int threshold;
971         int ret;
972         struct timeval before;
973         double timediff;
974
975         DEBUG(10, (DEBUGLEVEL > 10
976                    ? "Unlocking db %u key %s\n"
977                    : "Unlocking db %u key %.20s\n",
978                    (int)crec->ctdb_ctx->db_id,
979                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
980                               data->key.dsize)));
981
982         before = timeval_current();
983
984         ret = tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
985
986         timediff = timeval_elapsed(&before);
987         timediff *= 1000;       /* get us milliseconds */
988
989         if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
990                 char *key;
991                 key = hex_encode_talloc(talloc_tos(),
992                                         (unsigned char *)data->key.dptr,
993                                         data->key.dsize);
994                 DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
995                           tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
996                           timediff));
997                 TALLOC_FREE(key);
998         }
999
1000         if (ret != 0) {
1001                 DEBUG(0, ("tdb_chainunlock failed\n"));
1002                 return -1;
1003         }
1004
1005         threshold = crec->ctdb_ctx->warn_locktime_msecs;
1006         if (threshold != 0) {
1007                 timediff = timeval_elapsed(&crec->lock_time) * 1000;
1008                 if (timediff > threshold) {
1009                         const char *key;
1010
1011                         key = hex_encode_talloc(data,
1012                                                 (unsigned char *)data->key.dptr,
1013                                                 data->key.dsize);
1014                         DEBUG(0, ("Held tdb lock on db %s, key %s "
1015                                   "%f milliseconds\n",
1016                                   tdb_name(crec->ctdb_ctx->wtdb->tdb),
1017                                   key, timediff));
1018                 }
1019         }
1020
1021         return 0;
1022 }
1023
1024 /**
1025  * Check whether we have a valid local copy of the given record,
1026  * either for reading or for writing.
1027  */
1028 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
1029                                       uint32_t my_vnn, bool read_only)
1030 {
1031         if (hdr->dmaster != my_vnn) {
1032                 /* If we're not dmaster, it must be r/o copy. */
1033                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1034         }
1035
1036         /*
1037          * If we want write access, no one may have r/o copies.
1038          */
1039         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1040 }
1041
1042 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, uint32_t my_vnn,
1043                                        bool read_only)
1044 {
1045         if (ctdb_data.dptr == NULL) {
1046                 return false;
1047         }
1048
1049         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
1050                 return false;
1051         }
1052
1053         return db_ctdb_can_use_local_hdr(
1054                 (struct ctdb_ltdb_header *)ctdb_data.dptr, my_vnn, read_only);
1055 }
1056
1057 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1058                                                TALLOC_CTX *mem_ctx,
1059                                                TDB_DATA key,
1060                                                bool tryonly)
1061 {
1062         struct db_record *result;
1063         struct db_ctdb_rec *crec;
1064         TDB_DATA ctdb_data;
1065         int migrate_attempts;
1066         struct timeval migrate_start;
1067         struct timeval chainlock_start;
1068         struct timeval ctdb_start_time;
1069         double chainlock_time = 0;
1070         double ctdb_time = 0;
1071         int duration_msecs;
1072         int lockret;
1073         int ret;
1074
1075         if (!(result = talloc(mem_ctx, struct db_record))) {
1076                 DEBUG(0, ("talloc failed\n"));
1077                 return NULL;
1078         }
1079
1080         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1081                 DEBUG(0, ("talloc failed\n"));
1082                 TALLOC_FREE(result);
1083                 return NULL;
1084         }
1085
1086         result->db = ctx->db;
1087         result->private_data = (void *)crec;
1088         crec->ctdb_ctx = ctx;
1089
1090         result->key.dsize = key.dsize;
1091         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1092                                                     key.dsize);
1093         if (result->key.dptr == NULL) {
1094                 DEBUG(0, ("talloc failed\n"));
1095                 TALLOC_FREE(result);
1096                 return NULL;
1097         }
1098
1099         migrate_attempts = 0;
1100         GetTimeOfDay(&migrate_start);
1101
1102         /*
1103          * Do a blocking lock on the record
1104          */
1105 again:
1106
1107         if (DEBUGLEVEL >= 10) {
1108                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1109                 DEBUG(10, (DEBUGLEVEL > 10
1110                            ? "Locking db %u key %s\n"
1111                            : "Locking db %u key %.20s\n",
1112                            (int)crec->ctdb_ctx->db_id, keystr));
1113                 TALLOC_FREE(keystr);
1114         }
1115
1116         GetTimeOfDay(&chainlock_start);
1117         lockret = tryonly
1118                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1119                 : tdb_chainlock(ctx->wtdb->tdb, key);
1120         chainlock_time += timeval_elapsed(&chainlock_start);
1121
1122         if (lockret != 0) {
1123                 DEBUG(3, ("tdb_chainlock failed\n"));
1124                 TALLOC_FREE(result);
1125                 return NULL;
1126         }
1127
1128         result->store = db_ctdb_store;
1129         result->delete_rec = db_ctdb_delete;
1130         talloc_set_destructor(result, db_ctdb_record_destr);
1131
1132         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1133
1134         /*
1135          * See if we have a valid record and we are the dmaster. If so, we can
1136          * take the shortcut and just return it.
1137          */
1138
1139         if (!db_ctdb_can_use_local_copy(ctdb_data, ctdbd_vnn(ctx->conn),
1140                                         false)) {
1141                 SAFE_FREE(ctdb_data.dptr);
1142                 tdb_chainunlock(ctx->wtdb->tdb, key);
1143                 talloc_set_destructor(result, NULL);
1144
1145                 if (tryonly && (migrate_attempts != 0)) {
1146                         DEBUG(5, ("record migrated away again\n"));
1147                         TALLOC_FREE(result);
1148                         return NULL;
1149                 }
1150
1151                 migrate_attempts += 1;
1152
1153                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %"PRIu32" "
1154                            "(%"PRIu32") %"PRIu32"\n",
1155                            ctdb_data.dptr, ctdb_data.dptr ?
1156                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster :
1157                            UINT32_MAX,
1158                            ctdbd_vnn(ctx->conn),
1159                            ctdb_data.dptr ?
1160                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1161
1162                 GetTimeOfDay(&ctdb_start_time);
1163                 ret = ctdbd_migrate(ctx->conn, ctx->db_id, key);
1164                 ctdb_time += timeval_elapsed(&ctdb_start_time);
1165
1166                 if (ret != 0) {
1167                         DEBUG(5, ("ctdbd_migrate failed: %s\n",
1168                                   strerror(ret)));
1169                         TALLOC_FREE(result);
1170                         return NULL;
1171                 }
1172                 /* now its migrated, try again */
1173                 goto again;
1174         }
1175
1176         {
1177                 double duration;
1178                 duration = timeval_elapsed(&migrate_start);
1179
1180                 /*
1181                  * Convert the duration to milliseconds to avoid a
1182                  * floating-point division of
1183                  * lp_parm_int("migrate_duration") by 1000.
1184                  */
1185                 duration_msecs = duration * 1000;
1186         }
1187
1188         if ((migrate_attempts > ctx->warn_migrate_attempts) ||
1189             (duration_msecs > ctx->warn_migrate_msecs)) {
1190                 int chain = 0;
1191
1192                 if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
1193                         chain = tdb_jenkins_hash(&key) %
1194                                 tdb_hash_size(ctx->wtdb->tdb);
1195                 }
1196
1197                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
1198                           "needed %d attempts, %d milliseconds, "
1199                           "chainlock: %f ms, CTDB %f ms\n",
1200                           tdb_name(ctx->wtdb->tdb),
1201                           hex_encode_talloc(talloc_tos(),
1202                                             (unsigned char *)key.dptr,
1203                                             key.dsize),
1204                           chain,
1205                           migrate_attempts, duration_msecs,
1206                           chainlock_time * 1000.0,
1207                           ctdb_time * 1000.0));
1208         }
1209
1210         GetTimeOfDay(&crec->lock_time);
1211
1212         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1213
1214         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1215         result->value.dptr = NULL;
1216
1217         if ((result->value.dsize != 0)
1218             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1219                          result, ctdb_data.dptr + sizeof(crec->header),
1220                          result->value.dsize))) {
1221                 DEBUG(0, ("talloc failed\n"));
1222                 TALLOC_FREE(result);
1223         }
1224
1225         SAFE_FREE(ctdb_data.dptr);
1226
1227         return result;
1228 }
1229
1230 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1231                                               TALLOC_CTX *mem_ctx,
1232                                               TDB_DATA key)
1233 {
1234         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1235                                                         struct db_ctdb_ctx);
1236
1237         if (ctx->transaction != NULL) {
1238                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1239         }
1240
1241         if (db->persistent) {
1242                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1243         }
1244
1245         return fetch_locked_internal(ctx, mem_ctx, key, false);
1246 }
1247
1248 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1249                                                   TALLOC_CTX *mem_ctx,
1250                                                   TDB_DATA key)
1251 {
1252         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1253                                                         struct db_ctdb_ctx);
1254
1255         if (ctx->transaction != NULL) {
1256                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1257         }
1258
1259         if (db->persistent) {
1260                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1261         }
1262
1263         return fetch_locked_internal(ctx, mem_ctx, key, true);
1264 }
1265
1266 struct db_ctdb_parse_record_state {
1267         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
1268         void *private_data;
1269         uint32_t my_vnn;
1270         bool ask_for_readonly_copy;
1271         bool done;
1272         bool empty_record;
1273 };
1274
1275 static void db_ctdb_parse_record_parser(
1276         TDB_DATA key, struct ctdb_ltdb_header *header,
1277         TDB_DATA data, void *private_data)
1278 {
1279         struct db_ctdb_parse_record_state *state =
1280                 (struct db_ctdb_parse_record_state *)private_data;
1281         state->parser(key, data, state->private_data);
1282 }
1283
1284 static void db_ctdb_parse_record_parser_nonpersistent(
1285         TDB_DATA key, struct ctdb_ltdb_header *header,
1286         TDB_DATA data, void *private_data)
1287 {
1288         struct db_ctdb_parse_record_state *state =
1289                 (struct db_ctdb_parse_record_state *)private_data;
1290
1291         if (db_ctdb_can_use_local_hdr(header, state->my_vnn, true)) {
1292                 /*
1293                  * A record consisting only of the ctdb header can be
1294                  * a validly created empty record or a tombstone
1295                  * record of a deleted record (not vacuumed yet). Mark
1296                  * it accordingly.
1297                  */
1298                 state->empty_record = (data.dsize == 0);
1299                 if (!state->empty_record) {
1300                         state->parser(key, data, state->private_data);
1301                 }
1302                 state->done = true;
1303         } else {
1304                 /*
1305                  * We found something in the db, so it seems that this record,
1306                  * while not usable locally right now, is popular. Ask for a
1307                  * R/O copy.
1308                  */
1309                 state->ask_for_readonly_copy = true;
1310         }
1311 }
1312
1313 static NTSTATUS db_ctdb_try_parse_local_record(struct db_ctdb_ctx *ctx,
1314                                                TDB_DATA key,
1315                                                struct db_ctdb_parse_record_state *state)
1316 {
1317         NTSTATUS status;
1318
1319         if (ctx->transaction != NULL) {
1320                 struct db_ctdb_transaction_handle *h = ctx->transaction;
1321                 bool found;
1322
1323                 /*
1324                  * Transactions only happen for persistent db's.
1325                  */
1326
1327                 found = parse_newest_in_marshall_buffer(
1328                         h->m_write, key, db_ctdb_parse_record_parser, state);
1329
1330                 if (found) {
1331                         return NT_STATUS_OK;
1332                 }
1333         }
1334
1335         if (ctx->db->persistent) {
1336                 /*
1337                  * Persistent db, but not found in the transaction buffer
1338                  */
1339                 return db_ctdb_ltdb_parse(
1340                         ctx, key, db_ctdb_parse_record_parser, state);
1341         }
1342
1343         state->done = false;
1344         state->ask_for_readonly_copy = false;
1345
1346         status = db_ctdb_ltdb_parse(
1347                 ctx, key, db_ctdb_parse_record_parser_nonpersistent, state);
1348         if (NT_STATUS_IS_OK(status) && state->done) {
1349                 if (state->empty_record) {
1350                         /*
1351                          * We know authoritatively, that this is an empty
1352                          * record. Since ctdb does not distinguish between empty
1353                          * and deleted records, this can be a record stored as
1354                          * empty or a not-yet-vacuumed tombstone record of a
1355                          * deleted record. Now Samba right now can live without
1356                          * empty records, so we can safely report this record
1357                          * as non-existing.
1358                          *
1359                          * See bugs 10008 and 12005.
1360                          */
1361                         return NT_STATUS_NOT_FOUND;
1362                 }
1363                 return NT_STATUS_OK;
1364         }
1365
1366         return NT_STATUS_MORE_PROCESSING_REQUIRED;
1367 }
1368
1369 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1370                                      void (*parser)(TDB_DATA key,
1371                                                     TDB_DATA data,
1372                                                     void *private_data),
1373                                      void *private_data)
1374 {
1375         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1376                 db->private_data, struct db_ctdb_ctx);
1377         struct db_ctdb_parse_record_state state;
1378         NTSTATUS status;
1379         int ret;
1380
1381         state.parser = parser;
1382         state.private_data = private_data;
1383         state.my_vnn = ctdbd_vnn(ctx->conn);
1384         state.empty_record = false;
1385
1386         status = db_ctdb_try_parse_local_record(ctx, key, &state);
1387         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1388                 return status;
1389         }
1390
1391         ret = ctdbd_parse(ctx->conn, ctx->db_id, key,
1392                           state.ask_for_readonly_copy, parser, private_data);
1393         if (ret != 0) {
1394                 if (ret == ENOENT) {
1395                         /*
1396                          * This maps to
1397                          * NT_STATUS_OBJECT_NAME_NOT_FOUND. Our upper
1398                          * layers expect NT_STATUS_NOT_FOUND for "no
1399                          * record around". We need to convert dbwrap
1400                          * to 0/errno away from NTSTATUS ... :-)
1401                          */
1402                         return NT_STATUS_NOT_FOUND;
1403                 }
1404                 return map_nt_error_from_unix(ret);
1405         }
1406         return NT_STATUS_OK;
1407 }
1408
1409 static void db_ctdb_parse_record_done(struct tevent_req *subreq);
1410
1411 static struct tevent_req *db_ctdb_parse_record_send(
1412         TALLOC_CTX *mem_ctx,
1413         struct tevent_context *ev,
1414         struct db_context *db,
1415         TDB_DATA key,
1416         void (*parser)(TDB_DATA key,
1417                        TDB_DATA data,
1418                        void *private_data),
1419         void *private_data,
1420         enum dbwrap_req_state *req_state)
1421 {
1422         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1423                 db->private_data, struct db_ctdb_ctx);
1424         struct tevent_req *req = NULL;
1425         struct tevent_req *subreq = NULL;
1426         struct db_ctdb_parse_record_state *state = NULL;
1427         NTSTATUS status;
1428
1429         req = tevent_req_create(mem_ctx, &state,
1430                                 struct db_ctdb_parse_record_state);
1431         if (req == NULL) {
1432                 *req_state = DBWRAP_REQ_ERROR;
1433                 return NULL;
1434
1435         }
1436
1437         *state = (struct db_ctdb_parse_record_state) {
1438                 .parser = parser,
1439                 .private_data = private_data,
1440                 .my_vnn = ctdbd_vnn(ctx->conn),
1441                 .empty_record = false,
1442         };
1443
1444         status = db_ctdb_try_parse_local_record(ctx, key, state);
1445         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1446                 if (tevent_req_nterror(req, status)) {
1447                         *req_state = DBWRAP_REQ_ERROR;
1448                         return tevent_req_post(req, ev);
1449                 }
1450                 *req_state = DBWRAP_REQ_DONE;
1451                 tevent_req_done(req);
1452                 return tevent_req_post(req, ev);
1453         }
1454
1455         subreq = ctdbd_parse_send(state,
1456                                   ev,
1457                                   ctdb_async_ctx.async_conn,
1458                                   ctx->db_id,
1459                                   key,
1460                                   state->ask_for_readonly_copy,
1461                                   parser,
1462                                   private_data,
1463                                   req_state);
1464         if (tevent_req_nomem(subreq, req)) {
1465                 *req_state = DBWRAP_REQ_ERROR;
1466                 return tevent_req_post(req, ev);
1467         }
1468         tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
1469
1470         return req;
1471 }
1472
1473 static void db_ctdb_parse_record_done(struct tevent_req *subreq)
1474 {
1475         struct tevent_req *req = tevent_req_callback_data(
1476                 subreq, struct tevent_req);
1477         int ret;
1478
1479         ret = ctdbd_parse_recv(subreq);
1480         TALLOC_FREE(subreq);
1481         if (ret != 0) {
1482                 if (ret == ENOENT) {
1483                         /*
1484                          * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
1485                          * upper layers expect NT_STATUS_NOT_FOUND for "no
1486                          * record around". We need to convert dbwrap to 0/errno
1487                          * away from NTSTATUS ... :-)
1488                          */
1489                         tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
1490                         return;
1491                 }
1492                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
1493                 return;
1494         }
1495
1496         tevent_req_done(req);
1497         return;
1498 }
1499
1500 static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
1501 {
1502         return tevent_req_simple_recv_ntstatus(req);
1503 }
1504
/*
 * State handed to the traverse callbacks.
 */
struct traverse_state {
	/* Database being traversed */
	struct db_context *db;

	/* Caller supplied per-record function and its argument */
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;

	/* Record counter maintained by the callbacks */
	int count;
};
1511
1512 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1513 {
1514         struct traverse_state *state = (struct traverse_state *)private_data;
1515         struct db_record *rec;
1516         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1517         /* we have to give them a locked record to prevent races */
1518         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1519         if (rec && rec->value.dsize > 0) {
1520                 state->fn(rec, state->private_data);
1521         }
1522         talloc_free(tmp_ctx);
1523 }
1524
1525 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1526                                         void *private_data)
1527 {
1528         struct traverse_state *state = (struct traverse_state *)private_data;
1529         struct db_record *rec;
1530         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1531         int ret = 0;
1532
1533         /*
1534          * Skip the __db_sequence_number__ key:
1535          * This is used for persistent transactions internally.
1536          */
1537         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1538             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1539         {
1540                 goto done;
1541         }
1542
1543         /* we have to give them a locked record to prevent races */
1544         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1545         if (rec && rec->value.dsize > 0) {
1546                 ret = state->fn(rec, state->private_data);
1547         }
1548
1549 done:
1550         talloc_free(tmp_ctx);
1551         return ret;
1552 }
1553
1554 /* wrapper to use traverse_persistent_callback with dbwrap */
1555 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1556 {
1557         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1558 }
1559
1560 static int db_ctdbd_traverse(uint32_t db_id,
1561                              void (*fn)(TDB_DATA key, TDB_DATA data,
1562                                         void *private_data),
1563                              void *private_data)
1564 {
1565         struct ctdbd_connection *conn;
1566         int ret;
1567
1568         become_root();
1569         ret = ctdbd_init_connection(talloc_tos(), lp_ctdbd_socket(),
1570                                     lp_ctdb_timeout(), &conn);
1571         unbecome_root();
1572         if (ret != 0) {
1573                 DBG_WARNING("ctdbd_init_connection failed: %s\n",
1574                             strerror(ret));
1575                 return ret;
1576         }
1577
1578         ret = ctdbd_traverse(conn, db_id, fn, private_data);
1579         TALLOC_FREE(conn);
1580
1581         if (ret != 0) {
1582                 DBG_WARNING("ctdbd_traverse failed: %s\n",
1583                             strerror(ret));
1584                 return ret;
1585         }
1586
1587         return 0;
1588 }
1589
1590
1591 static int db_ctdb_traverse(struct db_context *db,
1592                             int (*fn)(struct db_record *rec,
1593                                       void *private_data),
1594                             void *private_data)
1595 {
1596         int ret;
1597         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1598                                                         struct db_ctdb_ctx);
1599         struct traverse_state state;
1600
1601         state.db = db;
1602         state.fn = fn;
1603         state.private_data = private_data;
1604         state.count = 0;
1605
1606         if (db->persistent) {
1607                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1608
1609                 /* for persistent databases we don't need to do a ctdb traverse,
1610                    we can do a faster local traverse */
1611                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1612                 if (ret < 0) {
1613                         return ret;
1614                 }
1615                 if (ctx->transaction && ctx->transaction->m_write) {
1616                         /*
1617                          * we now have to handle keys not yet
1618                          * present at transaction start
1619                          */
1620                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1621                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1622                         struct ctdb_rec_data_old *rec=NULL;
1623                         uint32_t i;
1624                         int count = 0;
1625                         NTSTATUS status;
1626
1627                         if (newkeys == NULL) {
1628                                 return -1;
1629                         }
1630
1631                         for (i=0; i<mbuf->count; i++) {
1632                                 TDB_DATA key;
1633                                 rec = db_ctdb_marshall_loop_next_key(
1634                                         mbuf, rec, &key);
1635                                 SMB_ASSERT(rec != NULL);
1636
1637                                 if (!tdb_exists(ltdb, key)) {
1638                                         dbwrap_store(newkeys, key, tdb_null, 0);
1639                                 }
1640                         }
1641                         status = dbwrap_traverse(newkeys,
1642                                                  traverse_persistent_callback_dbwrap,
1643                                                  &state,
1644                                                  &count);
1645                         talloc_free(newkeys);
1646                         if (!NT_STATUS_IS_OK(status)) {
1647                                 return -1;
1648                         }
1649                         ret += count;
1650                 }
1651                 return ret;
1652         }
1653
1654         ret = db_ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1655         if (ret != 0) {
1656                 return -1;
1657         }
1658         return state.count;
1659 }
1660
1661 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1662 {
1663         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1664 }
1665
1666 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1667 {
1668         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1669 }
1670
1671 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1672 {
1673         struct traverse_state *state = (struct traverse_state *)private_data;
1674         struct db_record rec;
1675
1676         ZERO_STRUCT(rec);
1677         rec.db = state->db;
1678         rec.key = key;
1679         rec.value = data;
1680         rec.store = db_ctdb_store_deny;
1681         rec.delete_rec = db_ctdb_delete_deny;
1682         rec.private_data = NULL;
1683         state->fn(&rec, state->private_data);
1684         state->count++;
1685 }
1686
1687 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1688                                         void *private_data)
1689 {
1690         struct traverse_state *state = (struct traverse_state *)private_data;
1691         struct db_record rec;
1692
1693         /*
1694          * Skip the __db_sequence_number__ key:
1695          * This is used for persistent transactions internally.
1696          */
1697         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1698             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1699         {
1700                 return 0;
1701         }
1702
1703         ZERO_STRUCT(rec);
1704         rec.db = state->db;
1705         rec.key = kbuf;
1706         rec.value = dbuf;
1707         rec.store = db_ctdb_store_deny;
1708         rec.delete_rec = db_ctdb_delete_deny;
1709         rec.private_data = NULL;
1710
1711         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1712                 /* a deleted record */
1713                 return 0;
1714         }
1715         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1716         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1717
1718         state->count++;
1719         return state->fn(&rec, state->private_data);
1720 }
1721
1722 static int db_ctdb_traverse_read(struct db_context *db,
1723                                  int (*fn)(struct db_record *rec,
1724                                            void *private_data),
1725                                  void *private_data)
1726 {
1727         int ret;
1728         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1729                                                         struct db_ctdb_ctx);
1730         struct traverse_state state;
1731
1732         state.db = db;
1733         state.fn = fn;
1734         state.private_data = private_data;
1735         state.count = 0;
1736
1737         if (db->persistent) {
1738                 /* for persistent databases we don't need to do a ctdb traverse,
1739                    we can do a faster local traverse */
1740                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1741         }
1742
1743         ret = db_ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1744         if (ret != 0) {
1745                 return -1;
1746         }
1747         return state.count;
1748 }
1749
1750 static int db_ctdb_get_seqnum(struct db_context *db)
1751 {
1752         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1753                                                         struct db_ctdb_ctx);
1754         return tdb_get_seqnum(ctx->wtdb->tdb);
1755 }
1756
1757 static size_t db_ctdb_id(struct db_context *db, uint8_t *id, size_t idlen)
1758 {
1759         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1760                 db->private_data, struct db_ctdb_ctx);
1761
1762         if (idlen >= sizeof(ctx->db_id)) {
1763                 memcpy(id, &ctx->db_id, sizeof(ctx->db_id));
1764         }
1765
1766         return sizeof(ctx->db_id);
1767 }
1768
1769 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1770                                 struct messaging_context *msg_ctx,
1771                                 struct ctdbd_connection *conn,
1772                                 const char *name,
1773                                 int hash_size, int tdb_flags,
1774                                 int open_flags, mode_t mode,
1775                                 enum dbwrap_lock_order lock_order,
1776                                 uint64_t dbwrap_flags)
1777 {
1778         struct db_context *result;
1779         struct db_ctdb_ctx *db_ctdb;
1780         char *db_path;
1781         struct loadparm_context *lp_ctx;
1782         TDB_DATA data;
1783         int32_t cstatus;
1784         int ret;
1785
1786         if (!lp_clustering()) {
1787                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1788                 return NULL;
1789         }
1790
1791         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1792                 DEBUG(0, ("talloc failed\n"));
1793                 TALLOC_FREE(result);
1794                 return NULL;
1795         }
1796
1797         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1798                 DEBUG(0, ("talloc failed\n"));
1799                 TALLOC_FREE(result);
1800                 return NULL;
1801         }
1802
1803         result->name = talloc_strdup(result, name);
1804         if (result->name == NULL) {
1805                 DEBUG(0, ("talloc failed\n"));
1806                 TALLOC_FREE(result);
1807                 return NULL;
1808         }
1809
1810         db_ctdb->transaction = NULL;
1811         db_ctdb->db = result;
1812         db_ctdb->conn = conn;
1813
1814         ret = ctdbd_db_attach(db_ctdb->conn, name, &db_ctdb->db_id, tdb_flags);
1815         if (ret != 0) {
1816                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
1817                           strerror(ret)));
1818                 TALLOC_FREE(result);
1819                 return NULL;
1820         }
1821
1822         if (tdb_flags & TDB_SEQNUM) {
1823                 data.dptr = (uint8_t *)&db_ctdb->db_id;
1824                 data.dsize = sizeof(db_ctdb->db_id);
1825
1826                 ret = ctdbd_control_local(conn, CTDB_CONTROL_ENABLE_SEQNUM,
1827                                           0, 0, data,
1828                                           NULL, NULL, &cstatus);
1829                 if ((ret != 0) || cstatus != 0) {
1830                         DBG_ERR("ctdb_control for enable seqnum "
1831                                 "failed: %s\n", strerror(ret));
1832                         TALLOC_FREE(result);
1833                         return NULL;
1834                 }
1835         }
1836
1837         db_path = ctdbd_dbpath(db_ctdb->conn, db_ctdb, db_ctdb->db_id);
1838
1839         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1840         result->lock_order = lock_order;
1841
1842         /* only pass through specific flags */
1843         tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
1844                 TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
1845
1846         if (!result->persistent) {
1847                 ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
1848                 if (ret != 0) {
1849                         DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
1850                         TALLOC_FREE(result);
1851                         return NULL;
1852                 }
1853         }
1854
1855         if (!result->persistent &&
1856             (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
1857         {
1858                 TDB_DATA indata;
1859
1860                 indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
1861                                        sizeof(db_ctdb->db_id));
1862
1863                 ret = ctdbd_control_local(
1864                         db_ctdb->conn, CTDB_CONTROL_SET_DB_READONLY, 0, 0,
1865                         indata, NULL, NULL, &cstatus);
1866                 if ((ret != 0) || (cstatus != 0)) {
1867                         DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
1868                                   "%s, %"PRIi32"\n", strerror(ret), cstatus));
1869                         TALLOC_FREE(result);
1870                         return NULL;
1871                 }
1872         }
1873
1874         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1875
1876         if (hash_size == 0) {
1877                 hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
1878         }
1879
1880         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
1881                                       lpcfg_tdb_flags(lp_ctx, tdb_flags),
1882                                       O_RDWR, 0);
1883         talloc_unlink(db_path, lp_ctx);
1884         if (db_ctdb->wtdb == NULL) {
1885                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1886                 TALLOC_FREE(result);
1887                 return NULL;
1888         }
1889         talloc_free(db_path);
1890
1891         /* honor permissions if user has specified O_CREAT */
1892         if (open_flags & O_CREAT) {
1893                 int fd;
1894                 fd = tdb_fd(db_ctdb->wtdb->tdb);
1895                 ret = fchmod(fd, mode);
1896                 if (ret == -1) {
1897                         DBG_WARNING("fchmod failed: %s\n",
1898                                     strerror(errno));
1899                         TALLOC_FREE(result);
1900                         return NULL;
1901                 }
1902         }
1903
1904         if (result->persistent) {
1905                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb, msg_ctx);
1906                 if (db_ctdb->lock_ctx == NULL) {
1907                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1908                         TALLOC_FREE(result);
1909                         return NULL;
1910                 }
1911         }
1912
1913         db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
1914                                                  "unlock_warn_threshold", 5);
1915         db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
1916                                                      "migrate_attempts", 10);
1917         db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
1918                                                   "migrate_duration", 5000);
1919         db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
1920
1921         result->private_data = (void *)db_ctdb;
1922         result->fetch_locked = db_ctdb_fetch_locked;
1923         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1924         result->parse_record = db_ctdb_parse_record;
1925         result->parse_record_send = db_ctdb_parse_record_send;
1926         result->parse_record_recv = db_ctdb_parse_record_recv;
1927         result->traverse = db_ctdb_traverse;
1928         result->traverse_read = db_ctdb_traverse_read;
1929         result->get_seqnum = db_ctdb_get_seqnum;
1930         result->transaction_start = db_ctdb_transaction_start;
1931         result->transaction_commit = db_ctdb_transaction_commit;
1932         result->transaction_cancel = db_ctdb_transaction_cancel;
1933         result->id = db_ctdb_id;
1934
1935         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1936                  name, db_ctdb->db_id));
1937
1938         return result;
1939 }