allow nested ctdb transactions in the same manner that they are
[metze/samba/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34         uint32_t nesting;
35         bool nested_cancel;
36 };
37
38 struct db_ctdb_ctx {
39         struct db_context *db;
40         struct tdb_wrap *wtdb;
41         uint32 db_id;
42         struct db_ctdb_transaction_handle *transaction;
43 };
44
45 struct db_ctdb_rec {
46         struct db_ctdb_ctx *ctdb_ctx;
47         struct ctdb_ltdb_header header;
48 };
49
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51                                                TALLOC_CTX *mem_ctx,
52                                                TDB_DATA key,
53                                                bool persistent);
54
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 {
57         NTSTATUS status;
58         enum TDB_ERROR tret = tdb_error(tdb);
59
60         switch (tret) {
61         case TDB_ERR_EXISTS:
62                 status = NT_STATUS_OBJECT_NAME_COLLISION;
63                 break;
64         case TDB_ERR_NOEXIST:
65                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66                 break;
67         default:
68                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69                 break;
70         }
71
72         return status;
73 }
74
75
76
77 /*
78   form a ctdb_rec_data record from a key/data pair
79   
80   note that header may be NULL. If not NULL then it is included in the data portion
81   of the record
82  */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
84                                                   TDB_DATA key, 
85                                                   struct ctdb_ltdb_header *header,
86                                                   TDB_DATA data)
87 {
88         size_t length;
89         struct ctdb_rec_data *d;
90
91         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
92                 data.dsize + (header?sizeof(*header):0);
93         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94         if (d == NULL) {
95                 return NULL;
96         }
97         d->length = length;
98         d->reqid = reqid;
99         d->keylen = key.dsize;
100         memcpy(&d->data[0], key.dptr, key.dsize);
101         if (header) {
102                 d->datalen = data.dsize + sizeof(*header);
103                 memcpy(&d->data[key.dsize], header, sizeof(*header));
104                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105         } else {
106                 d->datalen = data.dsize;
107                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108         }
109         return d;
110 }
111
112
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
115                                                struct ctdb_marshall_buffer *m,
116                                                uint64_t db_id,
117                                                uint32_t reqid,
118                                                TDB_DATA key,
119                                                struct ctdb_ltdb_header *header,
120                                                TDB_DATA data)
121 {
122         struct ctdb_rec_data *r;
123         size_t m_size, r_size;
124         struct ctdb_marshall_buffer *m2;
125
126         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127         if (r == NULL) {
128                 talloc_free(m);
129                 return NULL;
130         }
131
132         if (m == NULL) {
133                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
134                 if (m == NULL) {
135                         return NULL;
136                 }
137                 m->db_id = db_id;
138         }
139
140         m_size = talloc_get_size(m);
141         r_size = talloc_get_size(r);
142
143         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
144         if (m2 == NULL) {
145                 talloc_free(m);
146                 return NULL;
147         }
148
149         memcpy(m_size + (uint8_t *)m2, r, r_size);
150
151         talloc_free(r);
152
153         m2->count++;
154
155         return m2;
156 }
157
158 /* we've finished marshalling, return a data blob with the marshalled records */
159 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
160 {
161         TDB_DATA data;
162         data.dptr = (uint8_t *)m;
163         data.dsize = talloc_get_size(m);
164         return data;
165 }
166
167 /* 
168    loop over a marshalling buffer 
169    
170      - pass r==NULL to start
171      - loop the number of times indicated by m->count
172 */
173 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
174                                                      uint32_t *reqid,
175                                                      struct ctdb_ltdb_header *header,
176                                                      TDB_DATA *key, TDB_DATA *data)
177 {
178         if (r == NULL) {
179                 r = (struct ctdb_rec_data *)&m->data[0];
180         } else {
181                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
182         }
183
184         if (reqid != NULL) {
185                 *reqid = r->reqid;
186         }
187         
188         if (key != NULL) {
189                 key->dptr   = &r->data[0];
190                 key->dsize  = r->keylen;
191         }
192         if (data != NULL) {
193                 data->dptr  = &r->data[r->keylen];
194                 data->dsize = r->datalen;
195                 if (header != NULL) {
196                         data->dptr += sizeof(*header);
197                         data->dsize -= sizeof(*header);
198                 }
199         }
200
201         if (header != NULL) {
202                 if (r->datalen < sizeof(*header)) {
203                         return NULL;
204                 }
205                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
206         }
207
208         return r;
209 }
210
211
212
213 /* start a transaction on a database */
214 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
215 {
216         tdb_transaction_cancel(h->ctx->wtdb->tdb);
217         return 0;
218 }
219
220 /* start a transaction on a database */
221 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
222 {
223         struct db_record *rh;
224         TDB_DATA key;
225         TALLOC_CTX *tmp_ctx;
226         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
227         int ret;
228         struct db_ctdb_ctx *ctx = h->ctx;
229         TDB_DATA data;
230
231         key.dptr = discard_const(keyname);
232         key.dsize = strlen(keyname);
233
234 again:
235         tmp_ctx = talloc_new(h);
236
237         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
238         if (rh == NULL) {
239                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
240                 talloc_free(tmp_ctx);
241                 return -1;
242         }
243         talloc_free(rh);
244
245         ret = tdb_transaction_start(ctx->wtdb->tdb);
246         if (ret != 0) {
247                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
248                 talloc_free(tmp_ctx);
249                 return -1;
250         }
251
252         data = tdb_fetch(ctx->wtdb->tdb, key);
253         if ((data.dptr == NULL) ||
254             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
255             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
256                 SAFE_FREE(data.dptr);
257                 tdb_transaction_cancel(ctx->wtdb->tdb);
258                 talloc_free(tmp_ctx);
259                 goto again;
260         }
261
262         SAFE_FREE(data.dptr);
263         talloc_free(tmp_ctx);
264
265         return 0;
266 }
267
268
269 /* start a transaction on a database */
270 static int db_ctdb_transaction_start(struct db_context *db)
271 {
272         struct db_ctdb_transaction_handle *h;
273         int ret;
274         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275                                                         struct db_ctdb_ctx);
276
277         if (!db->persistent) {
278                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
279                          ctx->db_id));
280                 return -1;
281         }
282
283         if (ctx->transaction) {
284                 ctx->transaction->nesting++;
285                 return 0;
286         }
287
288         h = talloc_zero(db, struct db_ctdb_transaction_handle);
289         if (h == NULL) {
290                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
291                 return -1;
292         }
293
294         h->ctx = ctx;
295
296         ret = db_ctdb_transaction_fetch_start(h);
297         if (ret != 0) {
298                 talloc_free(h);
299                 return -1;
300         }
301
302         talloc_set_destructor(h, db_ctdb_transaction_destructor);
303
304         ctx->transaction = h;
305
306         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
307
308         return 0;
309 }
310
311
312
313 /*
314   fetch a record inside a transaction
315  */
316 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
317                                      TALLOC_CTX *mem_ctx, 
318                                      TDB_DATA key, TDB_DATA *data)
319 {
320         struct db_ctdb_transaction_handle *h = db->transaction;
321
322         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
323
324         if (data->dptr != NULL) {
325                 uint8_t *oldptr = (uint8_t *)data->dptr;
326                 data->dsize -= sizeof(struct ctdb_ltdb_header);
327                 if (data->dsize == 0) {
328                         data->dptr = NULL;
329                 } else {
330                         data->dptr = (uint8 *)
331                                 talloc_memdup(
332                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
333                                         data->dsize);
334                 }
335                 SAFE_FREE(oldptr);
336                 if (data->dptr == NULL && data->dsize != 0) {
337                         return -1;
338                 }
339         }
340
341         if (!h->in_replay) {
342                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
343                 if (h->m_all == NULL) {
344                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
345                         data->dsize = 0;
346                         talloc_free(data->dptr);
347                         return -1;
348                 }
349         }
350
351         return 0;
352 }
353
354
355 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
356 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
357
358 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
359                                                           TALLOC_CTX *mem_ctx,
360                                                           TDB_DATA key)
361 {
362         struct db_record *result;
363         TDB_DATA ctdb_data;
364
365         if (!(result = talloc(mem_ctx, struct db_record))) {
366                 DEBUG(0, ("talloc failed\n"));
367                 return NULL;
368         }
369
370         result->private_data = ctx->transaction;
371
372         result->key.dsize = key.dsize;
373         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
374         if (result->key.dptr == NULL) {
375                 DEBUG(0, ("talloc failed\n"));
376                 TALLOC_FREE(result);
377                 return NULL;
378         }
379
380         result->store = db_ctdb_store_transaction;
381         result->delete_rec = db_ctdb_delete_transaction;
382
383         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
384         if (ctdb_data.dptr == NULL) {
385                 /* create the record */
386                 result->value = tdb_null;
387                 return result;
388         }
389
390         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
391         result->value.dptr = NULL;
392
393         if ((result->value.dsize != 0)
394             && !(result->value.dptr = (uint8 *)talloc_memdup(
395                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
396                          result->value.dsize))) {
397                 DEBUG(0, ("talloc failed\n"));
398                 TALLOC_FREE(result);
399         }
400
401         SAFE_FREE(ctdb_data.dptr);
402
403         return result;
404 }
405
406 static int db_ctdb_record_destructor(struct db_record *rec)
407 {
408         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
409                 rec->private_data, struct db_ctdb_transaction_handle);
410         int ret = h->ctx->db->transaction_commit(h->ctx->db);
411         if (ret != 0) {
412                 DEBUG(0,(__location__ " transaction_commit failed\n"));
413         }
414         return 0;
415 }
416
417 /*
418   auto-create a transaction for persistent databases
419  */
420 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
421                                                          TALLOC_CTX *mem_ctx,
422                                                          TDB_DATA key)
423 {
424         int res;
425         struct db_record *rec;
426
427         res = db_ctdb_transaction_start(ctx->db);
428         if (res == -1) {
429                 return NULL;
430         }
431
432         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
433         if (rec == NULL) {
434                 ctx->db->transaction_cancel(ctx->db);           
435                 return NULL;
436         }
437
438         /* destroy this transaction when we release the lock */
439         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
440         return rec;
441 }
442
443
444 /*
445   stores a record inside a transaction
446  */
447 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
448                                      TDB_DATA key, TDB_DATA data)
449 {
450         TALLOC_CTX *tmp_ctx = talloc_new(h);
451         int ret;
452         TDB_DATA rec;
453         struct ctdb_ltdb_header header;
454
455         /* we need the header so we can update the RSN */
456         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
457         if (rec.dptr == NULL) {
458                 /* the record doesn't exist - create one with us as dmaster.
459                    This is only safe because we are in a transaction and this
460                    is a persistent database */
461                 ZERO_STRUCT(header);
462                 header.dmaster = get_my_vnn();
463         } else {
464                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
465                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
466                 /* a special case, we are writing the same data that is there now */
467                 if (data.dsize == rec.dsize &&
468                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
469                         SAFE_FREE(rec.dptr);
470                         talloc_free(tmp_ctx);
471                         return 0;
472                 }
473                 SAFE_FREE(rec.dptr);
474         }
475
476         header.rsn++;
477
478         if (!h->in_replay) {
479                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
480                 if (h->m_all == NULL) {
481                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
482                         talloc_free(tmp_ctx);
483                         return -1;
484                 }
485         }
486                 
487         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
488         if (h->m_write == NULL) {
489                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
490                 talloc_free(tmp_ctx);
491                 return -1;
492         }
493         
494         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
495         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
496         if (rec.dptr == NULL) {
497                 DEBUG(0,(__location__ " Failed to alloc record\n"));
498                 talloc_free(tmp_ctx);
499                 return -1;
500         }
501         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
502         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
503
504         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
505
506         talloc_free(tmp_ctx);
507         
508         return ret;
509 }
510
511
512 /* 
513    a record store inside a transaction
514  */
515 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
516 {
517         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
518                 rec->private_data, struct db_ctdb_transaction_handle);
519         int ret;
520
521         ret = db_ctdb_transaction_store(h, rec->key, data);
522         if (ret != 0) {
523                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
524         }
525         return NT_STATUS_OK;
526 }
527
528 /* 
529    a record delete inside a transaction
530  */
531 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
532 {
533         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
534                 rec->private_data, struct db_ctdb_transaction_handle);
535         int ret;
536
537         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
538         if (ret != 0) {
539                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
540         }
541         return NT_STATUS_OK;
542 }
543
544
545 /*
546   replay a transaction
547  */
548 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
549 {
550         int ret, i;
551         struct ctdb_rec_data *rec = NULL;
552
553         h->in_replay = true;
554         talloc_free(h->m_write);
555         h->m_write = NULL;
556
557         ret = db_ctdb_transaction_fetch_start(h);
558         if (ret != 0) {
559                 return ret;
560         }
561
562         for (i=0;i<h->m_all->count;i++) {
563                 TDB_DATA key, data;
564
565                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
566                 if (rec == NULL) {
567                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
568                         goto failed;
569                 }
570
571                 if (rec->reqid == 0) {
572                         /* its a store */
573                         if (db_ctdb_transaction_store(h, key, data) != 0) {
574                                 goto failed;
575                         }
576                 } else {
577                         TDB_DATA data2;
578                         TALLOC_CTX *tmp_ctx = talloc_new(h);
579
580                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
581                                 talloc_free(tmp_ctx);
582                                 goto failed;
583                         }
584                         if (data2.dsize != data.dsize ||
585                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
586                                 /* the record has changed on us - we have to give up */
587                                 talloc_free(tmp_ctx);
588                                 goto failed;
589                         }
590                         talloc_free(tmp_ctx);
591                 }
592         }
593         
594         return 0;
595
596 failed:
597         tdb_transaction_cancel(h->ctx->wtdb->tdb);
598         return -1;
599 }
600
601
602 /*
603   commit a transaction
604  */
605 static int db_ctdb_transaction_commit(struct db_context *db)
606 {
607         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
608                                                         struct db_ctdb_ctx);
609         NTSTATUS rets;
610         int ret;
611         int status;
612         int retries = 0;
613         struct db_ctdb_transaction_handle *h = ctx->transaction;
614         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
615
616         if (h == NULL) {
617                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
618                 return -1;
619         }
620
621         if (h->nested_cancel) {
622                 db->transaction_cancel(db);
623                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
624                 return -1;
625         }
626
627         if (h->nesting != 0) {
628                 h->nesting--;
629                 return 0;
630         }
631
632         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
633
634         talloc_set_destructor(h, NULL);
635
636         /* our commit strategy is quite complex.
637
638            - we first try to commit the changes to all other nodes
639
640            - if that works, then we commit locally and we are done
641
642            - if a commit on another node fails, then we need to cancel
643              the transaction, then restart the transaction (thus
644              opening a window of time for a pending recovery to
645              complete), then replay the transaction, checking all the
646              reads and writes (checking that reads give the same data,
647              and writes succeed). Then we retry the transaction to the
648              other nodes
649         */
650
651 again:
652         if (h->m_write == NULL) {
653                 /* no changes were made, potentially after a retry */
654                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
655                 talloc_free(h);
656                 ctx->transaction = NULL;
657                 return 0;
658         }
659
660         /* tell ctdbd to commit to the other nodes */
661         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
662                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
663                                    h->ctx->db_id, 0,
664                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
665         if (!NT_STATUS_IS_OK(rets) || status != 0) {
666                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
667                 sleep(1);
668
669                 if (!NT_STATUS_IS_OK(rets)) {
670                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
671                 } else {
672                         /* work out what error code we will give if we 
673                            have to fail the operation */
674                         switch ((enum ctdb_trans2_commit_error)status) {
675                         case CTDB_TRANS2_COMMIT_SUCCESS:
676                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
677                         case CTDB_TRANS2_COMMIT_TIMEOUT:
678                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
679                                 break;
680                         case CTDB_TRANS2_COMMIT_ALLFAIL:
681                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
682                                 break;
683                         }
684                 }
685
686                 if (++retries == 5) {
687                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
688                                  h->ctx->db_id, retries, (unsigned)failure_control));
689                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
690                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
691                                             tdb_null, NULL, NULL, NULL);
692                         h->ctx->transaction = NULL;
693                         talloc_free(h);
694                         ctx->transaction = NULL;
695                         return -1;                      
696                 }
697
698                 if (ctdb_replay_transaction(h) != 0) {
699                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
700                                  (unsigned)failure_control));
701                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
702                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
703                                             tdb_null, NULL, NULL, NULL);
704                         h->ctx->transaction = NULL;
705                         talloc_free(h);
706                         ctx->transaction = NULL;
707                         return -1;
708                 }
709                 goto again;
710         } else {
711                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
712         }
713
714         /* do the real commit locally */
715         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
716         if (ret != 0) {
717                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
718                          (unsigned)failure_control));
719                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
720                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
721                 h->ctx->transaction = NULL;
722                 talloc_free(h);
723                 return ret;
724         }
725
726         /* tell ctdbd that we are finished with our local commit */
727         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
728                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
729                             tdb_null, NULL, NULL, NULL);
730         h->ctx->transaction = NULL;
731         talloc_free(h);
732         return 0;
733 }
734
735
736 /*
737   cancel a transaction
738  */
739 static int db_ctdb_transaction_cancel(struct db_context *db)
740 {
741         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
742                                                         struct db_ctdb_ctx);
743         struct db_ctdb_transaction_handle *h = ctx->transaction;
744
745         if (h == NULL) {
746                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
747                 return -1;
748         }
749
750         if (h->nesting != 0) {
751                 h->nesting--;
752                 h->nested_cancel = true;
753                 return 0;
754         }
755
756         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
757
758         ctx->transaction = NULL;
759         talloc_free(h);
760         return 0;
761 }
762
763
764 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
765 {
766         struct db_ctdb_rec *crec = talloc_get_type_abort(
767                 rec->private_data, struct db_ctdb_rec);
768         TDB_DATA cdata;
769         int ret;
770
771         cdata.dsize = sizeof(crec->header) + data.dsize;
772
773         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
774                 return NT_STATUS_NO_MEMORY;
775         }
776
777         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
778         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
779
780         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
781
782         SAFE_FREE(cdata.dptr);
783
784         return (ret == 0) ? NT_STATUS_OK
785                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
786 }
787
788
789
790 static NTSTATUS db_ctdb_delete(struct db_record *rec)
791 {
792         TDB_DATA data;
793
794         /*
795          * We have to store the header with empty data. TODO: Fix the
796          * tdb-level cleanup
797          */
798
799         ZERO_STRUCT(data);
800
801         return db_ctdb_store(rec, data, 0);
802
803 }
804
805 static int db_ctdb_record_destr(struct db_record* data)
806 {
807         struct db_ctdb_rec *crec = talloc_get_type_abort(
808                 data->private_data, struct db_ctdb_rec);
809
810         DEBUG(10, (DEBUGLEVEL > 10
811                    ? "Unlocking db %u key %s\n"
812                    : "Unlocking db %u key %.20s\n",
813                    (int)crec->ctdb_ctx->db_id,
814                    hex_encode(data, (unsigned char *)data->key.dptr,
815                               data->key.dsize)));
816
817         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
818                 DEBUG(0, ("tdb_chainunlock failed\n"));
819                 return -1;
820         }
821
822         return 0;
823 }
824
825 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
826                                                TALLOC_CTX *mem_ctx,
827                                                TDB_DATA key,
828                                                bool persistent)
829 {
830         struct db_record *result;
831         struct db_ctdb_rec *crec;
832         NTSTATUS status;
833         TDB_DATA ctdb_data;
834         int migrate_attempts = 0;
835
836         if (!(result = talloc(mem_ctx, struct db_record))) {
837                 DEBUG(0, ("talloc failed\n"));
838                 return NULL;
839         }
840
841         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
842                 DEBUG(0, ("talloc failed\n"));
843                 TALLOC_FREE(result);
844                 return NULL;
845         }
846
847         result->private_data = (void *)crec;
848         crec->ctdb_ctx = ctx;
849
850         result->key.dsize = key.dsize;
851         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
852         if (result->key.dptr == NULL) {
853                 DEBUG(0, ("talloc failed\n"));
854                 TALLOC_FREE(result);
855                 return NULL;
856         }
857
858         /*
859          * Do a blocking lock on the record
860          */
861 again:
862
863         if (DEBUGLEVEL >= 10) {
864                 char *keystr = hex_encode(result, key.dptr, key.dsize);
865                 DEBUG(10, (DEBUGLEVEL > 10
866                            ? "Locking db %u key %s\n"
867                            : "Locking db %u key %.20s\n",
868                            (int)crec->ctdb_ctx->db_id, keystr));
869                 TALLOC_FREE(keystr);
870         }
871         
872         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
873                 DEBUG(3, ("tdb_chainlock failed\n"));
874                 TALLOC_FREE(result);
875                 return NULL;
876         }
877
878         result->store = db_ctdb_store;
879         result->delete_rec = db_ctdb_delete;
880         talloc_set_destructor(result, db_ctdb_record_destr);
881
882         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
883
884         /*
885          * See if we have a valid record and we are the dmaster. If so, we can
886          * take the shortcut and just return it.
887          */
888
889         if ((ctdb_data.dptr == NULL) ||
890             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
891             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
892 #if 0
893             || (random() % 2 != 0)
894 #endif
895 ) {
896                 SAFE_FREE(ctdb_data.dptr);
897                 tdb_chainunlock(ctx->wtdb->tdb, key);
898                 talloc_set_destructor(result, NULL);
899
900                 migrate_attempts += 1;
901
902                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
903                            ctdb_data.dptr, ctdb_data.dptr ?
904                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
905                            get_my_vnn()));
906
907                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
908                 if (!NT_STATUS_IS_OK(status)) {
909                         DEBUG(5, ("ctdb_migrate failed: %s\n",
910                                   nt_errstr(status)));
911                         TALLOC_FREE(result);
912                         return NULL;
913                 }
914                 /* now its migrated, try again */
915                 goto again;
916         }
917
918         if (migrate_attempts > 10) {
919                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
920                           migrate_attempts));
921         }
922
923         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
924
925         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
926         result->value.dptr = NULL;
927
928         if ((result->value.dsize != 0)
929             && !(result->value.dptr = (uint8 *)talloc_memdup(
930                          result, ctdb_data.dptr + sizeof(crec->header),
931                          result->value.dsize))) {
932                 DEBUG(0, ("talloc failed\n"));
933                 TALLOC_FREE(result);
934         }
935
936         SAFE_FREE(ctdb_data.dptr);
937
938         return result;
939 }
940
941 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
942                                               TALLOC_CTX *mem_ctx,
943                                               TDB_DATA key)
944 {
945         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
946                                                         struct db_ctdb_ctx);
947
948         if (ctx->transaction != NULL) {
949                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
950         }
951
952         if (db->persistent) {
953                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
954         }
955
956         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
957 }
958
959 /*
960   fetch (unlocked, no migration) operation on ctdb
961  */
962 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
963                          TDB_DATA key, TDB_DATA *data)
964 {
965         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
966                                                         struct db_ctdb_ctx);
967         NTSTATUS status;
968         TDB_DATA ctdb_data;
969
970         if (ctx->transaction) {
971                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
972         }
973
974         /* try a direct fetch */
975         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
976
977         /*
978          * See if we have a valid record and we are the dmaster. If so, we can
979          * take the shortcut and just return it.
980          * we bypass the dmaster check for persistent databases
981          */
982         if ((ctdb_data.dptr != NULL) &&
983             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
984             (db->persistent ||
985              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
986                 /* we are the dmaster - avoid the ctdb protocol op */
987
988                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
989                 if (data->dsize == 0) {
990                         SAFE_FREE(ctdb_data.dptr);
991                         data->dptr = NULL;
992                         return 0;
993                 }
994
995                 data->dptr = (uint8 *)talloc_memdup(
996                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
997                         data->dsize);
998
999                 SAFE_FREE(ctdb_data.dptr);
1000
1001                 if (data->dptr == NULL) {
1002                         return -1;
1003                 }
1004                 return 0;
1005         }
1006
1007         SAFE_FREE(ctdb_data.dptr);
1008
1009         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1010         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1011         if (!NT_STATUS_IS_OK(status)) {
1012                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1013                 return -1;
1014         }
1015
1016         return 0;
1017 }
1018
1019 struct traverse_state {
1020         struct db_context *db;
1021         int (*fn)(struct db_record *rec, void *private_data);
1022         void *private_data;
1023 };
1024
1025 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1026 {
1027         struct traverse_state *state = (struct traverse_state *)private_data;
1028         struct db_record *rec;
1029         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1030         /* we have to give them a locked record to prevent races */
1031         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1032         if (rec && rec->value.dsize > 0) {
1033                 state->fn(rec, state->private_data);
1034         }
1035         talloc_free(tmp_ctx);
1036 }
1037
1038 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1039                                         void *private_data)
1040 {
1041         struct traverse_state *state = (struct traverse_state *)private_data;
1042         struct db_record *rec;
1043         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1044         int ret = 0;
1045         /* we have to give them a locked record to prevent races */
1046         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1047         if (rec && rec->value.dsize > 0) {
1048                 ret = state->fn(rec, state->private_data);
1049         }
1050         talloc_free(tmp_ctx);
1051         return ret;
1052 }
1053
1054 static int db_ctdb_traverse(struct db_context *db,
1055                             int (*fn)(struct db_record *rec,
1056                                       void *private_data),
1057                             void *private_data)
1058 {
1059         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1060                                                         struct db_ctdb_ctx);
1061         struct traverse_state state;
1062
1063         state.db = db;
1064         state.fn = fn;
1065         state.private_data = private_data;
1066
1067         if (db->persistent) {
1068                 /* for persistent databases we don't need to do a ctdb traverse,
1069                    we can do a faster local traverse */
1070                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1071         }
1072
1073
1074         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1075         return 0;
1076 }
1077
1078 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1079 {
1080         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1081 }
1082
1083 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1084 {
1085         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1086 }
1087
1088 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1089 {
1090         struct traverse_state *state = (struct traverse_state *)private_data;
1091         struct db_record rec;
1092         rec.key = key;
1093         rec.value = data;
1094         rec.store = db_ctdb_store_deny;
1095         rec.delete_rec = db_ctdb_delete_deny;
1096         rec.private_data = state->db;
1097         state->fn(&rec, state->private_data);
1098 }
1099
1100 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1101                                         void *private_data)
1102 {
1103         struct traverse_state *state = (struct traverse_state *)private_data;
1104         struct db_record rec;
1105         rec.key = kbuf;
1106         rec.value = dbuf;
1107         rec.store = db_ctdb_store_deny;
1108         rec.delete_rec = db_ctdb_delete_deny;
1109         rec.private_data = state->db;
1110
1111         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1112                 /* a deleted record */
1113                 return 0;
1114         }
1115         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1116         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1117
1118         return state->fn(&rec, state->private_data);
1119 }
1120
1121 static int db_ctdb_traverse_read(struct db_context *db,
1122                                  int (*fn)(struct db_record *rec,
1123                                            void *private_data),
1124                                  void *private_data)
1125 {
1126         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1127                                                         struct db_ctdb_ctx);
1128         struct traverse_state state;
1129
1130         state.db = db;
1131         state.fn = fn;
1132         state.private_data = private_data;
1133
1134         if (db->persistent) {
1135                 /* for persistent databases we don't need to do a ctdb traverse,
1136                    we can do a faster local traverse */
1137                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1138         }
1139
1140         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1141         return 0;
1142 }
1143
1144 static int db_ctdb_get_seqnum(struct db_context *db)
1145 {
1146         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1147                                                         struct db_ctdb_ctx);
1148         return tdb_get_seqnum(ctx->wtdb->tdb);
1149 }
1150
1151 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1152                                 const char *name,
1153                                 int hash_size, int tdb_flags,
1154                                 int open_flags, mode_t mode)
1155 {
1156         struct db_context *result;
1157         struct db_ctdb_ctx *db_ctdb;
1158         char *db_path;
1159
1160         if (!lp_clustering()) {
1161                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1162                 return NULL;
1163         }
1164
1165         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1166                 DEBUG(0, ("talloc failed\n"));
1167                 TALLOC_FREE(result);
1168                 return NULL;
1169         }
1170
1171         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1172                 DEBUG(0, ("talloc failed\n"));
1173                 TALLOC_FREE(result);
1174                 return NULL;
1175         }
1176
1177         db_ctdb->transaction = NULL;
1178         db_ctdb->db = result;
1179
1180         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1181                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1182                 TALLOC_FREE(result);
1183                 return NULL;
1184         }
1185
1186         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1187
1188         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1189
1190         /* only pass through specific flags */
1191         tdb_flags &= TDB_SEQNUM;
1192
1193         /* honor permissions if user has specified O_CREAT */
1194         if (open_flags & O_CREAT) {
1195                 chmod(db_path, mode);
1196         }
1197
1198         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1199         if (db_ctdb->wtdb == NULL) {
1200                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1201                 TALLOC_FREE(result);
1202                 return NULL;
1203         }
1204         talloc_free(db_path);
1205
1206         result->private_data = (void *)db_ctdb;
1207         result->fetch_locked = db_ctdb_fetch_locked;
1208         result->fetch = db_ctdb_fetch;
1209         result->traverse = db_ctdb_traverse;
1210         result->traverse_read = db_ctdb_traverse_read;
1211         result->get_seqnum = db_ctdb_get_seqnum;
1212         result->transaction_start = db_ctdb_transaction_start;
1213         result->transaction_commit = db_ctdb_transaction_commit;
1214         result->transaction_cancel = db_ctdb_transaction_cancel;
1215
1216         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1217                  name, db_ctdb->db_id));
1218
1219         return result;
1220 }
1221 #endif