s3:dbwrap_ctdb: fix some function header comments
[metze/samba/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34         uint32_t nesting;
35         bool nested_cancel;
36 };
37
38 struct db_ctdb_ctx {
39         struct db_context *db;
40         struct tdb_wrap *wtdb;
41         uint32 db_id;
42         struct db_ctdb_transaction_handle *transaction;
43 };
44
45 struct db_ctdb_rec {
46         struct db_ctdb_ctx *ctdb_ctx;
47         struct ctdb_ltdb_header header;
48 };
49
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51                                                TALLOC_CTX *mem_ctx,
52                                                TDB_DATA key,
53                                                bool persistent);
54
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 {
57         NTSTATUS status;
58         enum TDB_ERROR tret = tdb_error(tdb);
59
60         switch (tret) {
61         case TDB_ERR_EXISTS:
62                 status = NT_STATUS_OBJECT_NAME_COLLISION;
63                 break;
64         case TDB_ERR_NOEXIST:
65                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66                 break;
67         default:
68                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69                 break;
70         }
71
72         return status;
73 }
74
75
76
77 /*
78   form a ctdb_rec_data record from a key/data pair
79
80   note that header may be NULL. If not NULL then it is included in the data portion
81   of the record
82  */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
84                                                   TDB_DATA key, 
85                                                   struct ctdb_ltdb_header *header,
86                                                   TDB_DATA data)
87 {
88         size_t length;
89         struct ctdb_rec_data *d;
90
91         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
92                 data.dsize + (header?sizeof(*header):0);
93         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94         if (d == NULL) {
95                 return NULL;
96         }
97         d->length = length;
98         d->reqid = reqid;
99         d->keylen = key.dsize;
100         memcpy(&d->data[0], key.dptr, key.dsize);
101         if (header) {
102                 d->datalen = data.dsize + sizeof(*header);
103                 memcpy(&d->data[key.dsize], header, sizeof(*header));
104                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105         } else {
106                 d->datalen = data.dsize;
107                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108         }
109         return d;
110 }
111
112
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
115                                                struct ctdb_marshall_buffer *m,
116                                                uint64_t db_id,
117                                                uint32_t reqid,
118                                                TDB_DATA key,
119                                                struct ctdb_ltdb_header *header,
120                                                TDB_DATA data)
121 {
122         struct ctdb_rec_data *r;
123         size_t m_size, r_size;
124         struct ctdb_marshall_buffer *m2 = NULL;
125
126         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
127         if (r == NULL) {
128                 talloc_free(m);
129                 return NULL;
130         }
131
132         if (m == NULL) {
133                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135                 if (m == NULL) {
136                         goto done;
137                 }
138                 m->db_id = db_id;
139         }
140
141         m_size = talloc_get_size(m);
142         r_size = talloc_get_size(r);
143
144         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145                 mem_ctx, m,  m_size + r_size);
146         if (m2 == NULL) {
147                 talloc_free(m);
148                 goto done;
149         }
150
151         memcpy(m_size + (uint8_t *)m2, r, r_size);
152
153         m2->count++;
154
155 done:
156         talloc_free(r);
157         return m2;
158 }
159
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
162 {
163         TDB_DATA data;
164         data.dptr = (uint8_t *)m;
165         data.dsize = talloc_get_size(m);
166         return data;
167 }
168
169 /* 
170    loop over a marshalling buffer 
171
172      - pass r==NULL to start
173      - loop the number of times indicated by m->count
174 */
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176                                                      uint32_t *reqid,
177                                                      struct ctdb_ltdb_header *header,
178                                                      TDB_DATA *key, TDB_DATA *data)
179 {
180         if (r == NULL) {
181                 r = (struct ctdb_rec_data *)&m->data[0];
182         } else {
183                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184         }
185
186         if (reqid != NULL) {
187                 *reqid = r->reqid;
188         }
189
190         if (key != NULL) {
191                 key->dptr   = &r->data[0];
192                 key->dsize  = r->keylen;
193         }
194         if (data != NULL) {
195                 data->dptr  = &r->data[r->keylen];
196                 data->dsize = r->datalen;
197                 if (header != NULL) {
198                         data->dptr += sizeof(*header);
199                         data->dsize -= sizeof(*header);
200                 }
201         }
202
203         if (header != NULL) {
204                 if (r->datalen < sizeof(*header)) {
205                         return NULL;
206                 }
207                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208         }
209
210         return r;
211 }
212
213
214
215 /**
216  * CTDB transaction destructor
217  */
218 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
219 {
220         tdb_transaction_cancel(h->ctx->wtdb->tdb);
221         return 0;
222 }
223
224 /**
225  * start a transaction on a ctdb database:
226  * - lock the transaction lock key
227  * - start the tdb transaction
228  */
229 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
230 {
231         struct db_record *rh;
232         TDB_DATA key;
233         TALLOC_CTX *tmp_ctx;
234         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
235         int ret;
236         struct db_ctdb_ctx *ctx = h->ctx;
237         TDB_DATA data;
238
239         key.dptr = (uint8_t *)discard_const(keyname);
240         key.dsize = strlen(keyname);
241
242 again:
243         tmp_ctx = talloc_new(h);
244
245         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
246         if (rh == NULL) {
247                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
248                 talloc_free(tmp_ctx);
249                 return -1;
250         }
251         talloc_free(rh);
252
253         ret = tdb_transaction_start(ctx->wtdb->tdb);
254         if (ret != 0) {
255                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
256                 talloc_free(tmp_ctx);
257                 return -1;
258         }
259
260         data = tdb_fetch(ctx->wtdb->tdb, key);
261         if ((data.dptr == NULL) ||
262             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
263             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
264                 SAFE_FREE(data.dptr);
265                 tdb_transaction_cancel(ctx->wtdb->tdb);
266                 talloc_free(tmp_ctx);
267                 goto again;
268         }
269
270         SAFE_FREE(data.dptr);
271         talloc_free(tmp_ctx);
272
273         return 0;
274 }
275
276
277 /**
278  * CTDB dbwrap API: transaction_start function
279  * starts a transaction on a persistent database
280  */
281 static int db_ctdb_transaction_start(struct db_context *db)
282 {
283         struct db_ctdb_transaction_handle *h;
284         int ret;
285         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
286                                                         struct db_ctdb_ctx);
287
288         if (!db->persistent) {
289                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
290                          ctx->db_id));
291                 return -1;
292         }
293
294         if (ctx->transaction) {
295                 ctx->transaction->nesting++;
296                 return 0;
297         }
298
299         h = talloc_zero(db, struct db_ctdb_transaction_handle);
300         if (h == NULL) {
301                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
302                 return -1;
303         }
304
305         h->ctx = ctx;
306
307         ret = db_ctdb_transaction_fetch_start(h);
308         if (ret != 0) {
309                 talloc_free(h);
310                 return -1;
311         }
312
313         talloc_set_destructor(h, db_ctdb_transaction_destructor);
314
315         ctx->transaction = h;
316
317         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
318
319         return 0;
320 }
321
322
323
324 /*
325   fetch a record inside a transaction
326  */
327 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
328                                      TALLOC_CTX *mem_ctx, 
329                                      TDB_DATA key, TDB_DATA *data)
330 {
331         struct db_ctdb_transaction_handle *h = db->transaction;
332
333         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
334
335         if (data->dptr != NULL) {
336                 uint8_t *oldptr = (uint8_t *)data->dptr;
337                 data->dsize -= sizeof(struct ctdb_ltdb_header);
338                 if (data->dsize == 0) {
339                         data->dptr = NULL;
340                 } else {
341                         data->dptr = (uint8 *)
342                                 talloc_memdup(
343                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
344                                         data->dsize);
345                 }
346                 SAFE_FREE(oldptr);
347                 if (data->dptr == NULL && data->dsize != 0) {
348                         return -1;
349                 }
350         }
351
352         if (!h->in_replay) {
353                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
354                 if (h->m_all == NULL) {
355                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
356                         data->dsize = 0;
357                         talloc_free(data->dptr);
358                         return -1;
359                 }
360         }
361
362         return 0;
363 }
364
365
366 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
367 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
368
369 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
370                                                           TALLOC_CTX *mem_ctx,
371                                                           TDB_DATA key)
372 {
373         struct db_record *result;
374         TDB_DATA ctdb_data;
375
376         if (!(result = talloc(mem_ctx, struct db_record))) {
377                 DEBUG(0, ("talloc failed\n"));
378                 return NULL;
379         }
380
381         result->private_data = ctx->transaction;
382
383         result->key.dsize = key.dsize;
384         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
385         if (result->key.dptr == NULL) {
386                 DEBUG(0, ("talloc failed\n"));
387                 TALLOC_FREE(result);
388                 return NULL;
389         }
390
391         result->store = db_ctdb_store_transaction;
392         result->delete_rec = db_ctdb_delete_transaction;
393
394         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
395         if (ctdb_data.dptr == NULL) {
396                 /* create the record */
397                 result->value = tdb_null;
398                 return result;
399         }
400
401         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
402         result->value.dptr = NULL;
403
404         if ((result->value.dsize != 0)
405             && !(result->value.dptr = (uint8 *)talloc_memdup(
406                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
407                          result->value.dsize))) {
408                 DEBUG(0, ("talloc failed\n"));
409                 TALLOC_FREE(result);
410         }
411
412         SAFE_FREE(ctdb_data.dptr);
413
414         return result;
415 }
416
417 static int db_ctdb_record_destructor(struct db_record **recp)
418 {
419         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
420         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
421                 rec->private_data, struct db_ctdb_transaction_handle);
422         int ret = h->ctx->db->transaction_commit(h->ctx->db);
423         if (ret != 0) {
424                 DEBUG(0,(__location__ " transaction_commit failed\n"));
425         }
426         return 0;
427 }
428
429 /*
430   auto-create a transaction for persistent databases
431  */
432 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
433                                                          TALLOC_CTX *mem_ctx,
434                                                          TDB_DATA key)
435 {
436         int res;
437         struct db_record *rec, **recp;
438
439         res = db_ctdb_transaction_start(ctx->db);
440         if (res == -1) {
441                 return NULL;
442         }
443
444         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
445         if (rec == NULL) {
446                 ctx->db->transaction_cancel(ctx->db);           
447                 return NULL;
448         }
449
450         /* destroy this transaction when we release the lock */
451         recp = talloc(rec, struct db_record *);
452         if (recp == NULL) {
453                 ctx->db->transaction_cancel(ctx->db);
454                 talloc_free(rec);
455                 return NULL;
456         }
457         *recp = rec;
458         talloc_set_destructor(recp, db_ctdb_record_destructor);
459         return rec;
460 }
461
462
463 /*
464   stores a record inside a transaction
465  */
466 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
467                                      TDB_DATA key, TDB_DATA data)
468 {
469         TALLOC_CTX *tmp_ctx = talloc_new(h);
470         int ret;
471         TDB_DATA rec;
472         struct ctdb_ltdb_header header;
473
474         /* we need the header so we can update the RSN */
475         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
476         if (rec.dptr == NULL) {
477                 /* the record doesn't exist - create one with us as dmaster.
478                    This is only safe because we are in a transaction and this
479                    is a persistent database */
480                 ZERO_STRUCT(header);
481         } else {
482                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
483                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
484                 /* a special case, we are writing the same data that is there now */
485                 if (data.dsize == rec.dsize &&
486                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
487                         SAFE_FREE(rec.dptr);
488                         talloc_free(tmp_ctx);
489                         return 0;
490                 }
491                 SAFE_FREE(rec.dptr);
492         }
493
494         header.dmaster = get_my_vnn();
495         header.rsn++;
496
497         if (!h->in_replay) {
498                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
499                 if (h->m_all == NULL) {
500                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
501                         talloc_free(tmp_ctx);
502                         return -1;
503                 }
504         }
505
506         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
507         if (h->m_write == NULL) {
508                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
509                 talloc_free(tmp_ctx);
510                 return -1;
511         }
512
513         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
514         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
515         if (rec.dptr == NULL) {
516                 DEBUG(0,(__location__ " Failed to alloc record\n"));
517                 talloc_free(tmp_ctx);
518                 return -1;
519         }
520         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
521         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
522
523         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
524
525         talloc_free(tmp_ctx);
526
527         return ret;
528 }
529
530
531 /* 
532    a record store inside a transaction
533  */
534 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
535 {
536         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
537                 rec->private_data, struct db_ctdb_transaction_handle);
538         int ret;
539
540         ret = db_ctdb_transaction_store(h, rec->key, data);
541         if (ret != 0) {
542                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
543         }
544         return NT_STATUS_OK;
545 }
546
547 /* 
548    a record delete inside a transaction
549  */
550 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
551 {
552         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
553                 rec->private_data, struct db_ctdb_transaction_handle);
554         int ret;
555
556         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
557         if (ret != 0) {
558                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
559         }
560         return NT_STATUS_OK;
561 }
562
563
564 /*
565   replay a transaction
566  */
567 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
568 {
569         int ret, i;
570         struct ctdb_rec_data *rec = NULL;
571
572         h->in_replay = true;
573         talloc_free(h->m_write);
574         h->m_write = NULL;
575
576         ret = db_ctdb_transaction_fetch_start(h);
577         if (ret != 0) {
578                 return ret;
579         }
580
581         for (i=0;i<h->m_all->count;i++) {
582                 TDB_DATA key, data;
583
584                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
585                 if (rec == NULL) {
586                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
587                         goto failed;
588                 }
589
590                 if (rec->reqid == 0) {
591                         /* its a store */
592                         if (db_ctdb_transaction_store(h, key, data) != 0) {
593                                 goto failed;
594                         }
595                 } else {
596                         TDB_DATA data2;
597                         TALLOC_CTX *tmp_ctx = talloc_new(h);
598
599                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
600                                 talloc_free(tmp_ctx);
601                                 goto failed;
602                         }
603                         if (data2.dsize != data.dsize ||
604                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
605                                 /* the record has changed on us - we have to give up */
606                                 talloc_free(tmp_ctx);
607                                 goto failed;
608                         }
609                         talloc_free(tmp_ctx);
610                 }
611         }
612
613         return 0;
614
615 failed:
616         tdb_transaction_cancel(h->ctx->wtdb->tdb);
617         return -1;
618 }
619
620
621 /*
622   commit a transaction
623  */
624 static int db_ctdb_transaction_commit(struct db_context *db)
625 {
626         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
627                                                         struct db_ctdb_ctx);
628         NTSTATUS rets;
629         int ret;
630         int status;
631         int retries = 0;
632         struct db_ctdb_transaction_handle *h = ctx->transaction;
633         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
634
635         if (h == NULL) {
636                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
637                 return -1;
638         }
639
640         if (h->nested_cancel) {
641                 db->transaction_cancel(db);
642                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
643                 return -1;
644         }
645
646         if (h->nesting != 0) {
647                 h->nesting--;
648                 return 0;
649         }
650
651         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
652
653         talloc_set_destructor(h, NULL);
654
655         /* our commit strategy is quite complex.
656
657            - we first try to commit the changes to all other nodes
658
659            - if that works, then we commit locally and we are done
660
661            - if a commit on another node fails, then we need to cancel
662              the transaction, then restart the transaction (thus
663              opening a window of time for a pending recovery to
664              complete), then replay the transaction, checking all the
665              reads and writes (checking that reads give the same data,
666              and writes succeed). Then we retry the transaction to the
667              other nodes
668         */
669
670 again:
671         if (h->m_write == NULL) {
672                 /* no changes were made, potentially after a retry */
673                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
674                 talloc_free(h);
675                 ctx->transaction = NULL;
676                 return 0;
677         }
678
679         /* tell ctdbd to commit to the other nodes */
680         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
681                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
682                                    h->ctx->db_id, 0,
683                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
684         if (!NT_STATUS_IS_OK(rets) || status != 0) {
685                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
686                 sleep(1);
687
688                 if (!NT_STATUS_IS_OK(rets)) {
689                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
690                 } else {
691                         /* work out what error code we will give if we 
692                            have to fail the operation */
693                         switch ((enum ctdb_trans2_commit_error)status) {
694                         case CTDB_TRANS2_COMMIT_SUCCESS:
695                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
696                         case CTDB_TRANS2_COMMIT_TIMEOUT:
697                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
698                                 break;
699                         case CTDB_TRANS2_COMMIT_ALLFAIL:
700                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
701                                 break;
702                         }
703                 }
704
705                 if (++retries == 5) {
706                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
707                                  h->ctx->db_id, retries, (unsigned)failure_control));
708                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
709                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
710                                             tdb_null, NULL, NULL, NULL);
711                         h->ctx->transaction = NULL;
712                         talloc_free(h);
713                         ctx->transaction = NULL;
714                         return -1;                      
715                 }
716
717                 if (ctdb_replay_transaction(h) != 0) {
718                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
719                                  (unsigned)failure_control));
720                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
721                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
722                                             tdb_null, NULL, NULL, NULL);
723                         h->ctx->transaction = NULL;
724                         talloc_free(h);
725                         ctx->transaction = NULL;
726                         return -1;
727                 }
728                 goto again;
729         } else {
730                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
731         }
732
733         /* do the real commit locally */
734         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
735         if (ret != 0) {
736                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
737                          (unsigned)failure_control));
738                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
739                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
740                 h->ctx->transaction = NULL;
741                 talloc_free(h);
742                 return ret;
743         }
744
745         /* tell ctdbd that we are finished with our local commit */
746         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
747                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
748                             tdb_null, NULL, NULL, NULL);
749         h->ctx->transaction = NULL;
750         talloc_free(h);
751         return 0;
752 }
753
754
755 /*
756   cancel a transaction
757  */
758 static int db_ctdb_transaction_cancel(struct db_context *db)
759 {
760         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
761                                                         struct db_ctdb_ctx);
762         struct db_ctdb_transaction_handle *h = ctx->transaction;
763
764         if (h == NULL) {
765                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
766                 return -1;
767         }
768
769         if (h->nesting != 0) {
770                 h->nesting--;
771                 h->nested_cancel = true;
772                 return 0;
773         }
774
775         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
776
777         ctx->transaction = NULL;
778         talloc_free(h);
779         return 0;
780 }
781
782
783 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
784 {
785         struct db_ctdb_rec *crec = talloc_get_type_abort(
786                 rec->private_data, struct db_ctdb_rec);
787         TDB_DATA cdata;
788         int ret;
789
790         cdata.dsize = sizeof(crec->header) + data.dsize;
791
792         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
793                 return NT_STATUS_NO_MEMORY;
794         }
795
796         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
797         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
798
799         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
800
801         SAFE_FREE(cdata.dptr);
802
803         return (ret == 0) ? NT_STATUS_OK
804                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
805 }
806
807
808
809 static NTSTATUS db_ctdb_delete(struct db_record *rec)
810 {
811         TDB_DATA data;
812
813         /*
814          * We have to store the header with empty data. TODO: Fix the
815          * tdb-level cleanup
816          */
817
818         ZERO_STRUCT(data);
819
820         return db_ctdb_store(rec, data, 0);
821
822 }
823
824 static int db_ctdb_record_destr(struct db_record* data)
825 {
826         struct db_ctdb_rec *crec = talloc_get_type_abort(
827                 data->private_data, struct db_ctdb_rec);
828
829         DEBUG(10, (DEBUGLEVEL > 10
830                    ? "Unlocking db %u key %s\n"
831                    : "Unlocking db %u key %.20s\n",
832                    (int)crec->ctdb_ctx->db_id,
833                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
834                               data->key.dsize)));
835
836         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
837                 DEBUG(0, ("tdb_chainunlock failed\n"));
838                 return -1;
839         }
840
841         return 0;
842 }
843
844 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
845                                                TALLOC_CTX *mem_ctx,
846                                                TDB_DATA key,
847                                                bool persistent)
848 {
849         struct db_record *result;
850         struct db_ctdb_rec *crec;
851         NTSTATUS status;
852         TDB_DATA ctdb_data;
853         int migrate_attempts = 0;
854
855         if (!(result = talloc(mem_ctx, struct db_record))) {
856                 DEBUG(0, ("talloc failed\n"));
857                 return NULL;
858         }
859
860         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
861                 DEBUG(0, ("talloc failed\n"));
862                 TALLOC_FREE(result);
863                 return NULL;
864         }
865
866         result->private_data = (void *)crec;
867         crec->ctdb_ctx = ctx;
868
869         result->key.dsize = key.dsize;
870         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
871         if (result->key.dptr == NULL) {
872                 DEBUG(0, ("talloc failed\n"));
873                 TALLOC_FREE(result);
874                 return NULL;
875         }
876
877         /*
878          * Do a blocking lock on the record
879          */
880 again:
881
882         if (DEBUGLEVEL >= 10) {
883                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
884                 DEBUG(10, (DEBUGLEVEL > 10
885                            ? "Locking db %u key %s\n"
886                            : "Locking db %u key %.20s\n",
887                            (int)crec->ctdb_ctx->db_id, keystr));
888                 TALLOC_FREE(keystr);
889         }
890
891         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
892                 DEBUG(3, ("tdb_chainlock failed\n"));
893                 TALLOC_FREE(result);
894                 return NULL;
895         }
896
897         result->store = db_ctdb_store;
898         result->delete_rec = db_ctdb_delete;
899         talloc_set_destructor(result, db_ctdb_record_destr);
900
901         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
902
903         /*
904          * See if we have a valid record and we are the dmaster. If so, we can
905          * take the shortcut and just return it.
906          */
907
908         if ((ctdb_data.dptr == NULL) ||
909             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
910             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
911 #if 0
912             || (random() % 2 != 0)
913 #endif
914 ) {
915                 SAFE_FREE(ctdb_data.dptr);
916                 tdb_chainunlock(ctx->wtdb->tdb, key);
917                 talloc_set_destructor(result, NULL);
918
919                 migrate_attempts += 1;
920
921                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
922                            ctdb_data.dptr, ctdb_data.dptr ?
923                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
924                            get_my_vnn()));
925
926                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
927                 if (!NT_STATUS_IS_OK(status)) {
928                         DEBUG(5, ("ctdb_migrate failed: %s\n",
929                                   nt_errstr(status)));
930                         TALLOC_FREE(result);
931                         return NULL;
932                 }
933                 /* now its migrated, try again */
934                 goto again;
935         }
936
937         if (migrate_attempts > 10) {
938                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
939                           migrate_attempts));
940         }
941
942         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
943
944         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
945         result->value.dptr = NULL;
946
947         if ((result->value.dsize != 0)
948             && !(result->value.dptr = (uint8 *)talloc_memdup(
949                          result, ctdb_data.dptr + sizeof(crec->header),
950                          result->value.dsize))) {
951                 DEBUG(0, ("talloc failed\n"));
952                 TALLOC_FREE(result);
953         }
954
955         SAFE_FREE(ctdb_data.dptr);
956
957         return result;
958 }
959
960 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
961                                               TALLOC_CTX *mem_ctx,
962                                               TDB_DATA key)
963 {
964         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
965                                                         struct db_ctdb_ctx);
966
967         if (ctx->transaction != NULL) {
968                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
969         }
970
971         if (db->persistent) {
972                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
973         }
974
975         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
976 }
977
978 /*
979   fetch (unlocked, no migration) operation on ctdb
980  */
981 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
982                          TDB_DATA key, TDB_DATA *data)
983 {
984         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
985                                                         struct db_ctdb_ctx);
986         NTSTATUS status;
987         TDB_DATA ctdb_data;
988
989         if (ctx->transaction) {
990                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
991         }
992
993         /* try a direct fetch */
994         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
995
996         /*
997          * See if we have a valid record and we are the dmaster. If so, we can
998          * take the shortcut and just return it.
999          * we bypass the dmaster check for persistent databases
1000          */
1001         if ((ctdb_data.dptr != NULL) &&
1002             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1003             (db->persistent ||
1004              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1005                 /* we are the dmaster - avoid the ctdb protocol op */
1006
1007                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1008                 if (data->dsize == 0) {
1009                         SAFE_FREE(ctdb_data.dptr);
1010                         data->dptr = NULL;
1011                         return 0;
1012                 }
1013
1014                 data->dptr = (uint8 *)talloc_memdup(
1015                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1016                         data->dsize);
1017
1018                 SAFE_FREE(ctdb_data.dptr);
1019
1020                 if (data->dptr == NULL) {
1021                         return -1;
1022                 }
1023                 return 0;
1024         }
1025
1026         SAFE_FREE(ctdb_data.dptr);
1027
1028         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1029         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1030         if (!NT_STATUS_IS_OK(status)) {
1031                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1032                 return -1;
1033         }
1034
1035         return 0;
1036 }
1037
1038 struct traverse_state {
1039         struct db_context *db;
1040         int (*fn)(struct db_record *rec, void *private_data);
1041         void *private_data;
1042 };
1043
1044 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1045 {
1046         struct traverse_state *state = (struct traverse_state *)private_data;
1047         struct db_record *rec;
1048         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1049         /* we have to give them a locked record to prevent races */
1050         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1051         if (rec && rec->value.dsize > 0) {
1052                 state->fn(rec, state->private_data);
1053         }
1054         talloc_free(tmp_ctx);
1055 }
1056
1057 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1058                                         void *private_data)
1059 {
1060         struct traverse_state *state = (struct traverse_state *)private_data;
1061         struct db_record *rec;
1062         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1063         int ret = 0;
1064         /* we have to give them a locked record to prevent races */
1065         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1066         if (rec && rec->value.dsize > 0) {
1067                 ret = state->fn(rec, state->private_data);
1068         }
1069         talloc_free(tmp_ctx);
1070         return ret;
1071 }
1072
1073 static int db_ctdb_traverse(struct db_context *db,
1074                             int (*fn)(struct db_record *rec,
1075                                       void *private_data),
1076                             void *private_data)
1077 {
1078         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1079                                                         struct db_ctdb_ctx);
1080         struct traverse_state state;
1081
1082         state.db = db;
1083         state.fn = fn;
1084         state.private_data = private_data;
1085
1086         if (db->persistent) {
1087                 /* for persistent databases we don't need to do a ctdb traverse,
1088                    we can do a faster local traverse */
1089                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1090         }
1091
1092
1093         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1094         return 0;
1095 }
1096
1097 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1098 {
1099         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1100 }
1101
1102 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1103 {
1104         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1105 }
1106
1107 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1108 {
1109         struct traverse_state *state = (struct traverse_state *)private_data;
1110         struct db_record rec;
1111         rec.key = key;
1112         rec.value = data;
1113         rec.store = db_ctdb_store_deny;
1114         rec.delete_rec = db_ctdb_delete_deny;
1115         rec.private_data = state->db;
1116         state->fn(&rec, state->private_data);
1117 }
1118
1119 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1120                                         void *private_data)
1121 {
1122         struct traverse_state *state = (struct traverse_state *)private_data;
1123         struct db_record rec;
1124         rec.key = kbuf;
1125         rec.value = dbuf;
1126         rec.store = db_ctdb_store_deny;
1127         rec.delete_rec = db_ctdb_delete_deny;
1128         rec.private_data = state->db;
1129
1130         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1131                 /* a deleted record */
1132                 return 0;
1133         }
1134         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1135         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1136
1137         return state->fn(&rec, state->private_data);
1138 }
1139
1140 static int db_ctdb_traverse_read(struct db_context *db,
1141                                  int (*fn)(struct db_record *rec,
1142                                            void *private_data),
1143                                  void *private_data)
1144 {
1145         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1146                                                         struct db_ctdb_ctx);
1147         struct traverse_state state;
1148
1149         state.db = db;
1150         state.fn = fn;
1151         state.private_data = private_data;
1152
1153         if (db->persistent) {
1154                 /* for persistent databases we don't need to do a ctdb traverse,
1155                    we can do a faster local traverse */
1156                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1157         }
1158
1159         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1160         return 0;
1161 }
1162
1163 static int db_ctdb_get_seqnum(struct db_context *db)
1164 {
1165         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1166                                                         struct db_ctdb_ctx);
1167         return tdb_get_seqnum(ctx->wtdb->tdb);
1168 }
1169
1170 static int db_ctdb_get_flags(struct db_context *db)
1171 {
1172         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1173                                                         struct db_ctdb_ctx);
1174         return tdb_get_flags(ctx->wtdb->tdb);
1175 }
1176
1177 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1178                                 const char *name,
1179                                 int hash_size, int tdb_flags,
1180                                 int open_flags, mode_t mode)
1181 {
1182         struct db_context *result;
1183         struct db_ctdb_ctx *db_ctdb;
1184         char *db_path;
1185
1186         if (!lp_clustering()) {
1187                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1188                 return NULL;
1189         }
1190
1191         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1192                 DEBUG(0, ("talloc failed\n"));
1193                 TALLOC_FREE(result);
1194                 return NULL;
1195         }
1196
1197         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1198                 DEBUG(0, ("talloc failed\n"));
1199                 TALLOC_FREE(result);
1200                 return NULL;
1201         }
1202
1203         db_ctdb->transaction = NULL;
1204         db_ctdb->db = result;
1205
1206         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1207                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1208                 TALLOC_FREE(result);
1209                 return NULL;
1210         }
1211
1212         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1213
1214         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1215
1216         /* only pass through specific flags */
1217         tdb_flags &= TDB_SEQNUM;
1218
1219         /* honor permissions if user has specified O_CREAT */
1220         if (open_flags & O_CREAT) {
1221                 chmod(db_path, mode);
1222         }
1223
1224         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1225         if (db_ctdb->wtdb == NULL) {
1226                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1227                 TALLOC_FREE(result);
1228                 return NULL;
1229         }
1230         talloc_free(db_path);
1231
1232         result->private_data = (void *)db_ctdb;
1233         result->fetch_locked = db_ctdb_fetch_locked;
1234         result->fetch = db_ctdb_fetch;
1235         result->traverse = db_ctdb_traverse;
1236         result->traverse_read = db_ctdb_traverse_read;
1237         result->get_seqnum = db_ctdb_get_seqnum;
1238         result->get_flags = db_ctdb_get_flags;
1239         result->transaction_start = db_ctdb_transaction_start;
1240         result->transaction_commit = db_ctdb_transaction_commit;
1241         result->transaction_cancel = db_ctdb_transaction_cancel;
1242
1243         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1244                  name, db_ctdb->db_id));
1245
1246         return result;
1247 }
1248 #endif