Fix some nonempty blank lines
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34         uint32_t nesting;
35         bool nested_cancel;
36 };
37
38 struct db_ctdb_ctx {
39         struct db_context *db;
40         struct tdb_wrap *wtdb;
41         uint32 db_id;
42         struct db_ctdb_transaction_handle *transaction;
43 };
44
45 struct db_ctdb_rec {
46         struct db_ctdb_ctx *ctdb_ctx;
47         struct ctdb_ltdb_header header;
48 };
49
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51                                                TALLOC_CTX *mem_ctx,
52                                                TDB_DATA key,
53                                                bool persistent);
54
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 {
57         NTSTATUS status;
58         enum TDB_ERROR tret = tdb_error(tdb);
59
60         switch (tret) {
61         case TDB_ERR_EXISTS:
62                 status = NT_STATUS_OBJECT_NAME_COLLISION;
63                 break;
64         case TDB_ERR_NOEXIST:
65                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66                 break;
67         default:
68                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69                 break;
70         }
71
72         return status;
73 }
74
75
76
77 /*
78   form a ctdb_rec_data record from a key/data pair
79
80   note that header may be NULL. If not NULL then it is included in the data portion
81   of the record
82  */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
84                                                   TDB_DATA key, 
85                                                   struct ctdb_ltdb_header *header,
86                                                   TDB_DATA data)
87 {
88         size_t length;
89         struct ctdb_rec_data *d;
90
91         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
92                 data.dsize + (header?sizeof(*header):0);
93         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94         if (d == NULL) {
95                 return NULL;
96         }
97         d->length = length;
98         d->reqid = reqid;
99         d->keylen = key.dsize;
100         memcpy(&d->data[0], key.dptr, key.dsize);
101         if (header) {
102                 d->datalen = data.dsize + sizeof(*header);
103                 memcpy(&d->data[key.dsize], header, sizeof(*header));
104                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105         } else {
106                 d->datalen = data.dsize;
107                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108         }
109         return d;
110 }
111
112
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
115                                                struct ctdb_marshall_buffer *m,
116                                                uint64_t db_id,
117                                                uint32_t reqid,
118                                                TDB_DATA key,
119                                                struct ctdb_ltdb_header *header,
120                                                TDB_DATA data)
121 {
122         struct ctdb_rec_data *r;
123         size_t m_size, r_size;
124         struct ctdb_marshall_buffer *m2;
125
126         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127         if (r == NULL) {
128                 talloc_free(m);
129                 return NULL;
130         }
131
132         if (m == NULL) {
133                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135                 if (m == NULL) {
136                         return NULL;
137                 }
138                 m->db_id = db_id;
139         }
140
141         m_size = talloc_get_size(m);
142         r_size = talloc_get_size(r);
143
144         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145                 mem_ctx, m,  m_size + r_size);
146         if (m2 == NULL) {
147                 talloc_free(m);
148                 return NULL;
149         }
150
151         memcpy(m_size + (uint8_t *)m2, r, r_size);
152
153         talloc_free(r);
154
155         m2->count++;
156
157         return m2;
158 }
159
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
162 {
163         TDB_DATA data;
164         data.dptr = (uint8_t *)m;
165         data.dsize = talloc_get_size(m);
166         return data;
167 }
168
169 /* 
170    loop over a marshalling buffer 
171
172      - pass r==NULL to start
173      - loop the number of times indicated by m->count
174 */
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176                                                      uint32_t *reqid,
177                                                      struct ctdb_ltdb_header *header,
178                                                      TDB_DATA *key, TDB_DATA *data)
179 {
180         if (r == NULL) {
181                 r = (struct ctdb_rec_data *)&m->data[0];
182         } else {
183                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184         }
185
186         if (reqid != NULL) {
187                 *reqid = r->reqid;
188         }
189
190         if (key != NULL) {
191                 key->dptr   = &r->data[0];
192                 key->dsize  = r->keylen;
193         }
194         if (data != NULL) {
195                 data->dptr  = &r->data[r->keylen];
196                 data->dsize = r->datalen;
197                 if (header != NULL) {
198                         data->dptr += sizeof(*header);
199                         data->dsize -= sizeof(*header);
200                 }
201         }
202
203         if (header != NULL) {
204                 if (r->datalen < sizeof(*header)) {
205                         return NULL;
206                 }
207                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208         }
209
210         return r;
211 }
212
213
214
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
217 {
218         tdb_transaction_cancel(h->ctx->wtdb->tdb);
219         return 0;
220 }
221
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
224 {
225         struct db_record *rh;
226         TDB_DATA key;
227         TALLOC_CTX *tmp_ctx;
228         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229         int ret;
230         struct db_ctdb_ctx *ctx = h->ctx;
231         TDB_DATA data;
232
233         key.dptr = (uint8_t *)discard_const(keyname);
234         key.dsize = strlen(keyname);
235
236 again:
237         tmp_ctx = talloc_new(h);
238
239         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240         if (rh == NULL) {
241                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245         talloc_free(rh);
246
247         ret = tdb_transaction_start(ctx->wtdb->tdb);
248         if (ret != 0) {
249                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250                 talloc_free(tmp_ctx);
251                 return -1;
252         }
253
254         data = tdb_fetch(ctx->wtdb->tdb, key);
255         if ((data.dptr == NULL) ||
256             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258                 SAFE_FREE(data.dptr);
259                 tdb_transaction_cancel(ctx->wtdb->tdb);
260                 talloc_free(tmp_ctx);
261                 goto again;
262         }
263
264         SAFE_FREE(data.dptr);
265         talloc_free(tmp_ctx);
266
267         return 0;
268 }
269
270
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context *db)
273 {
274         struct db_ctdb_transaction_handle *h;
275         int ret;
276         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277                                                         struct db_ctdb_ctx);
278
279         if (!db->persistent) {
280                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
281                          ctx->db_id));
282                 return -1;
283         }
284
285         if (ctx->transaction) {
286                 ctx->transaction->nesting++;
287                 return 0;
288         }
289
290         h = talloc_zero(db, struct db_ctdb_transaction_handle);
291         if (h == NULL) {
292                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
293                 return -1;
294         }
295
296         h->ctx = ctx;
297
298         ret = db_ctdb_transaction_fetch_start(h);
299         if (ret != 0) {
300                 talloc_free(h);
301                 return -1;
302         }
303
304         talloc_set_destructor(h, db_ctdb_transaction_destructor);
305
306         ctx->transaction = h;
307
308         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
309
310         return 0;
311 }
312
313
314
315 /*
316   fetch a record inside a transaction
317  */
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
319                                      TALLOC_CTX *mem_ctx, 
320                                      TDB_DATA key, TDB_DATA *data)
321 {
322         struct db_ctdb_transaction_handle *h = db->transaction;
323
324         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
325
326         if (data->dptr != NULL) {
327                 uint8_t *oldptr = (uint8_t *)data->dptr;
328                 data->dsize -= sizeof(struct ctdb_ltdb_header);
329                 if (data->dsize == 0) {
330                         data->dptr = NULL;
331                 } else {
332                         data->dptr = (uint8 *)
333                                 talloc_memdup(
334                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335                                         data->dsize);
336                 }
337                 SAFE_FREE(oldptr);
338                 if (data->dptr == NULL && data->dsize != 0) {
339                         return -1;
340                 }
341         }
342
343         if (!h->in_replay) {
344                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345                 if (h->m_all == NULL) {
346                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347                         data->dsize = 0;
348                         talloc_free(data->dptr);
349                         return -1;
350                 }
351         }
352
353         return 0;
354 }
355
356
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
359
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361                                                           TALLOC_CTX *mem_ctx,
362                                                           TDB_DATA key)
363 {
364         struct db_record *result;
365         TDB_DATA ctdb_data;
366
367         if (!(result = talloc(mem_ctx, struct db_record))) {
368                 DEBUG(0, ("talloc failed\n"));
369                 return NULL;
370         }
371
372         result->private_data = ctx->transaction;
373
374         result->key.dsize = key.dsize;
375         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376         if (result->key.dptr == NULL) {
377                 DEBUG(0, ("talloc failed\n"));
378                 TALLOC_FREE(result);
379                 return NULL;
380         }
381
382         result->store = db_ctdb_store_transaction;
383         result->delete_rec = db_ctdb_delete_transaction;
384
385         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386         if (ctdb_data.dptr == NULL) {
387                 /* create the record */
388                 result->value = tdb_null;
389                 return result;
390         }
391
392         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393         result->value.dptr = NULL;
394
395         if ((result->value.dsize != 0)
396             && !(result->value.dptr = (uint8 *)talloc_memdup(
397                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398                          result->value.dsize))) {
399                 DEBUG(0, ("talloc failed\n"));
400                 TALLOC_FREE(result);
401         }
402
403         SAFE_FREE(ctdb_data.dptr);
404
405         return result;
406 }
407
408 static int db_ctdb_record_destructor(struct db_record *rec)
409 {
410         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
411                 rec->private_data, struct db_ctdb_transaction_handle);
412         int ret = h->ctx->db->transaction_commit(h->ctx->db);
413         if (ret != 0) {
414                 DEBUG(0,(__location__ " transaction_commit failed\n"));
415         }
416         return 0;
417 }
418
419 /*
420   auto-create a transaction for persistent databases
421  */
422 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
423                                                          TALLOC_CTX *mem_ctx,
424                                                          TDB_DATA key)
425 {
426         int res;
427         struct db_record *rec;
428
429         res = db_ctdb_transaction_start(ctx->db);
430         if (res == -1) {
431                 return NULL;
432         }
433
434         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
435         if (rec == NULL) {
436                 ctx->db->transaction_cancel(ctx->db);           
437                 return NULL;
438         }
439
440         /* destroy this transaction when we release the lock */
441         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
442         return rec;
443 }
444
445
446 /*
447   stores a record inside a transaction
448  */
449 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
450                                      TDB_DATA key, TDB_DATA data)
451 {
452         TALLOC_CTX *tmp_ctx = talloc_new(h);
453         int ret;
454         TDB_DATA rec;
455         struct ctdb_ltdb_header header;
456
457         /* we need the header so we can update the RSN */
458         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
459         if (rec.dptr == NULL) {
460                 /* the record doesn't exist - create one with us as dmaster.
461                    This is only safe because we are in a transaction and this
462                    is a persistent database */
463                 ZERO_STRUCT(header);
464                 header.dmaster = get_my_vnn();
465         } else {
466                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
467                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
468                 /* a special case, we are writing the same data that is there now */
469                 if (data.dsize == rec.dsize &&
470                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
471                         SAFE_FREE(rec.dptr);
472                         talloc_free(tmp_ctx);
473                         return 0;
474                 }
475                 SAFE_FREE(rec.dptr);
476         }
477
478         header.rsn++;
479
480         if (!h->in_replay) {
481                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
482                 if (h->m_all == NULL) {
483                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
484                         talloc_free(tmp_ctx);
485                         return -1;
486                 }
487         }
488
489         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
490         if (h->m_write == NULL) {
491                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
492                 talloc_free(tmp_ctx);
493                 return -1;
494         }
495
496         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
497         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
498         if (rec.dptr == NULL) {
499                 DEBUG(0,(__location__ " Failed to alloc record\n"));
500                 talloc_free(tmp_ctx);
501                 return -1;
502         }
503         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
504         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
505
506         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
507
508         talloc_free(tmp_ctx);
509
510         return ret;
511 }
512
513
514 /* 
515    a record store inside a transaction
516  */
517 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
518 {
519         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
520                 rec->private_data, struct db_ctdb_transaction_handle);
521         int ret;
522
523         ret = db_ctdb_transaction_store(h, rec->key, data);
524         if (ret != 0) {
525                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
526         }
527         return NT_STATUS_OK;
528 }
529
530 /* 
531    a record delete inside a transaction
532  */
533 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
534 {
535         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
536                 rec->private_data, struct db_ctdb_transaction_handle);
537         int ret;
538
539         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
540         if (ret != 0) {
541                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
542         }
543         return NT_STATUS_OK;
544 }
545
546
547 /*
548   replay a transaction
549  */
550 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
551 {
552         int ret, i;
553         struct ctdb_rec_data *rec = NULL;
554
555         h->in_replay = true;
556         talloc_free(h->m_write);
557         h->m_write = NULL;
558
559         ret = db_ctdb_transaction_fetch_start(h);
560         if (ret != 0) {
561                 return ret;
562         }
563
564         for (i=0;i<h->m_all->count;i++) {
565                 TDB_DATA key, data;
566
567                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
568                 if (rec == NULL) {
569                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
570                         goto failed;
571                 }
572
573                 if (rec->reqid == 0) {
574                         /* its a store */
575                         if (db_ctdb_transaction_store(h, key, data) != 0) {
576                                 goto failed;
577                         }
578                 } else {
579                         TDB_DATA data2;
580                         TALLOC_CTX *tmp_ctx = talloc_new(h);
581
582                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
583                                 talloc_free(tmp_ctx);
584                                 goto failed;
585                         }
586                         if (data2.dsize != data.dsize ||
587                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
588                                 /* the record has changed on us - we have to give up */
589                                 talloc_free(tmp_ctx);
590                                 goto failed;
591                         }
592                         talloc_free(tmp_ctx);
593                 }
594         }
595
596         return 0;
597
598 failed:
599         tdb_transaction_cancel(h->ctx->wtdb->tdb);
600         return -1;
601 }
602
603
604 /*
605   commit a transaction
606  */
607 static int db_ctdb_transaction_commit(struct db_context *db)
608 {
609         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
610                                                         struct db_ctdb_ctx);
611         NTSTATUS rets;
612         int ret;
613         int status;
614         int retries = 0;
615         struct db_ctdb_transaction_handle *h = ctx->transaction;
616         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
617
618         if (h == NULL) {
619                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
620                 return -1;
621         }
622
623         if (h->nested_cancel) {
624                 db->transaction_cancel(db);
625                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
626                 return -1;
627         }
628
629         if (h->nesting != 0) {
630                 h->nesting--;
631                 return 0;
632         }
633
634         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
635
636         talloc_set_destructor(h, NULL);
637
638         /* our commit strategy is quite complex.
639
640            - we first try to commit the changes to all other nodes
641
642            - if that works, then we commit locally and we are done
643
644            - if a commit on another node fails, then we need to cancel
645              the transaction, then restart the transaction (thus
646              opening a window of time for a pending recovery to
647              complete), then replay the transaction, checking all the
648              reads and writes (checking that reads give the same data,
649              and writes succeed). Then we retry the transaction to the
650              other nodes
651         */
652
653 again:
654         if (h->m_write == NULL) {
655                 /* no changes were made, potentially after a retry */
656                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
657                 talloc_free(h);
658                 ctx->transaction = NULL;
659                 return 0;
660         }
661
662         /* tell ctdbd to commit to the other nodes */
663         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
664                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
665                                    h->ctx->db_id, 0,
666                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
667         if (!NT_STATUS_IS_OK(rets) || status != 0) {
668                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
669                 sleep(1);
670
671                 if (!NT_STATUS_IS_OK(rets)) {
672                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
673                 } else {
674                         /* work out what error code we will give if we 
675                            have to fail the operation */
676                         switch ((enum ctdb_trans2_commit_error)status) {
677                         case CTDB_TRANS2_COMMIT_SUCCESS:
678                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
679                         case CTDB_TRANS2_COMMIT_TIMEOUT:
680                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
681                                 break;
682                         case CTDB_TRANS2_COMMIT_ALLFAIL:
683                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
684                                 break;
685                         }
686                 }
687
688                 if (++retries == 5) {
689                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
690                                  h->ctx->db_id, retries, (unsigned)failure_control));
691                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
692                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
693                                             tdb_null, NULL, NULL, NULL);
694                         h->ctx->transaction = NULL;
695                         talloc_free(h);
696                         ctx->transaction = NULL;
697                         return -1;                      
698                 }
699
700                 if (ctdb_replay_transaction(h) != 0) {
701                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
702                                  (unsigned)failure_control));
703                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
704                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
705                                             tdb_null, NULL, NULL, NULL);
706                         h->ctx->transaction = NULL;
707                         talloc_free(h);
708                         ctx->transaction = NULL;
709                         return -1;
710                 }
711                 goto again;
712         } else {
713                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
714         }
715
716         /* do the real commit locally */
717         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
718         if (ret != 0) {
719                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
720                          (unsigned)failure_control));
721                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
722                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
723                 h->ctx->transaction = NULL;
724                 talloc_free(h);
725                 return ret;
726         }
727
728         /* tell ctdbd that we are finished with our local commit */
729         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
730                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
731                             tdb_null, NULL, NULL, NULL);
732         h->ctx->transaction = NULL;
733         talloc_free(h);
734         return 0;
735 }
736
737
738 /*
739   cancel a transaction
740  */
741 static int db_ctdb_transaction_cancel(struct db_context *db)
742 {
743         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
744                                                         struct db_ctdb_ctx);
745         struct db_ctdb_transaction_handle *h = ctx->transaction;
746
747         if (h == NULL) {
748                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
749                 return -1;
750         }
751
752         if (h->nesting != 0) {
753                 h->nesting--;
754                 h->nested_cancel = true;
755                 return 0;
756         }
757
758         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
759
760         ctx->transaction = NULL;
761         talloc_free(h);
762         return 0;
763 }
764
765
766 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
767 {
768         struct db_ctdb_rec *crec = talloc_get_type_abort(
769                 rec->private_data, struct db_ctdb_rec);
770         TDB_DATA cdata;
771         int ret;
772
773         cdata.dsize = sizeof(crec->header) + data.dsize;
774
775         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
776                 return NT_STATUS_NO_MEMORY;
777         }
778
779         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
780         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
781
782         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
783
784         SAFE_FREE(cdata.dptr);
785
786         return (ret == 0) ? NT_STATUS_OK
787                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
788 }
789
790
791
792 static NTSTATUS db_ctdb_delete(struct db_record *rec)
793 {
794         TDB_DATA data;
795
796         /*
797          * We have to store the header with empty data. TODO: Fix the
798          * tdb-level cleanup
799          */
800
801         ZERO_STRUCT(data);
802
803         return db_ctdb_store(rec, data, 0);
804
805 }
806
807 static int db_ctdb_record_destr(struct db_record* data)
808 {
809         struct db_ctdb_rec *crec = talloc_get_type_abort(
810                 data->private_data, struct db_ctdb_rec);
811
812         DEBUG(10, (DEBUGLEVEL > 10
813                    ? "Unlocking db %u key %s\n"
814                    : "Unlocking db %u key %.20s\n",
815                    (int)crec->ctdb_ctx->db_id,
816                    hex_encode(data, (unsigned char *)data->key.dptr,
817                               data->key.dsize)));
818
819         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
820                 DEBUG(0, ("tdb_chainunlock failed\n"));
821                 return -1;
822         }
823
824         return 0;
825 }
826
827 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
828                                                TALLOC_CTX *mem_ctx,
829                                                TDB_DATA key,
830                                                bool persistent)
831 {
832         struct db_record *result;
833         struct db_ctdb_rec *crec;
834         NTSTATUS status;
835         TDB_DATA ctdb_data;
836         int migrate_attempts = 0;
837
838         if (!(result = talloc(mem_ctx, struct db_record))) {
839                 DEBUG(0, ("talloc failed\n"));
840                 return NULL;
841         }
842
843         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
844                 DEBUG(0, ("talloc failed\n"));
845                 TALLOC_FREE(result);
846                 return NULL;
847         }
848
849         result->private_data = (void *)crec;
850         crec->ctdb_ctx = ctx;
851
852         result->key.dsize = key.dsize;
853         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
854         if (result->key.dptr == NULL) {
855                 DEBUG(0, ("talloc failed\n"));
856                 TALLOC_FREE(result);
857                 return NULL;
858         }
859
860         /*
861          * Do a blocking lock on the record
862          */
863 again:
864
865         if (DEBUGLEVEL >= 10) {
866                 char *keystr = hex_encode(result, key.dptr, key.dsize);
867                 DEBUG(10, (DEBUGLEVEL > 10
868                            ? "Locking db %u key %s\n"
869                            : "Locking db %u key %.20s\n",
870                            (int)crec->ctdb_ctx->db_id, keystr));
871                 TALLOC_FREE(keystr);
872         }
873
874         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
875                 DEBUG(3, ("tdb_chainlock failed\n"));
876                 TALLOC_FREE(result);
877                 return NULL;
878         }
879
880         result->store = db_ctdb_store;
881         result->delete_rec = db_ctdb_delete;
882         talloc_set_destructor(result, db_ctdb_record_destr);
883
884         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
885
886         /*
887          * See if we have a valid record and we are the dmaster. If so, we can
888          * take the shortcut and just return it.
889          */
890
891         if ((ctdb_data.dptr == NULL) ||
892             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
893             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
894 #if 0
895             || (random() % 2 != 0)
896 #endif
897 ) {
898                 SAFE_FREE(ctdb_data.dptr);
899                 tdb_chainunlock(ctx->wtdb->tdb, key);
900                 talloc_set_destructor(result, NULL);
901
902                 migrate_attempts += 1;
903
904                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
905                            ctdb_data.dptr, ctdb_data.dptr ?
906                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
907                            get_my_vnn()));
908
909                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
910                 if (!NT_STATUS_IS_OK(status)) {
911                         DEBUG(5, ("ctdb_migrate failed: %s\n",
912                                   nt_errstr(status)));
913                         TALLOC_FREE(result);
914                         return NULL;
915                 }
916                 /* now its migrated, try again */
917                 goto again;
918         }
919
920         if (migrate_attempts > 10) {
921                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
922                           migrate_attempts));
923         }
924
925         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
926
927         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
928         result->value.dptr = NULL;
929
930         if ((result->value.dsize != 0)
931             && !(result->value.dptr = (uint8 *)talloc_memdup(
932                          result, ctdb_data.dptr + sizeof(crec->header),
933                          result->value.dsize))) {
934                 DEBUG(0, ("talloc failed\n"));
935                 TALLOC_FREE(result);
936         }
937
938         SAFE_FREE(ctdb_data.dptr);
939
940         return result;
941 }
942
943 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
944                                               TALLOC_CTX *mem_ctx,
945                                               TDB_DATA key)
946 {
947         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
948                                                         struct db_ctdb_ctx);
949
950         if (ctx->transaction != NULL) {
951                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
952         }
953
954         if (db->persistent) {
955                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
956         }
957
958         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
959 }
960
961 /*
962   fetch (unlocked, no migration) operation on ctdb
963  */
964 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
965                          TDB_DATA key, TDB_DATA *data)
966 {
967         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
968                                                         struct db_ctdb_ctx);
969         NTSTATUS status;
970         TDB_DATA ctdb_data;
971
972         if (ctx->transaction) {
973                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
974         }
975
976         /* try a direct fetch */
977         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
978
979         /*
980          * See if we have a valid record and we are the dmaster. If so, we can
981          * take the shortcut and just return it.
982          * we bypass the dmaster check for persistent databases
983          */
984         if ((ctdb_data.dptr != NULL) &&
985             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
986             (db->persistent ||
987              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
988                 /* we are the dmaster - avoid the ctdb protocol op */
989
990                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
991                 if (data->dsize == 0) {
992                         SAFE_FREE(ctdb_data.dptr);
993                         data->dptr = NULL;
994                         return 0;
995                 }
996
997                 data->dptr = (uint8 *)talloc_memdup(
998                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
999                         data->dsize);
1000
1001                 SAFE_FREE(ctdb_data.dptr);
1002
1003                 if (data->dptr == NULL) {
1004                         return -1;
1005                 }
1006                 return 0;
1007         }
1008
1009         SAFE_FREE(ctdb_data.dptr);
1010
1011         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1012         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1013         if (!NT_STATUS_IS_OK(status)) {
1014                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1015                 return -1;
1016         }
1017
1018         return 0;
1019 }
1020
1021 struct traverse_state {
1022         struct db_context *db;
1023         int (*fn)(struct db_record *rec, void *private_data);
1024         void *private_data;
1025 };
1026
1027 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1028 {
1029         struct traverse_state *state = (struct traverse_state *)private_data;
1030         struct db_record *rec;
1031         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1032         /* we have to give them a locked record to prevent races */
1033         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1034         if (rec && rec->value.dsize > 0) {
1035                 state->fn(rec, state->private_data);
1036         }
1037         talloc_free(tmp_ctx);
1038 }
1039
1040 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1041                                         void *private_data)
1042 {
1043         struct traverse_state *state = (struct traverse_state *)private_data;
1044         struct db_record *rec;
1045         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1046         int ret = 0;
1047         /* we have to give them a locked record to prevent races */
1048         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1049         if (rec && rec->value.dsize > 0) {
1050                 ret = state->fn(rec, state->private_data);
1051         }
1052         talloc_free(tmp_ctx);
1053         return ret;
1054 }
1055
1056 static int db_ctdb_traverse(struct db_context *db,
1057                             int (*fn)(struct db_record *rec,
1058                                       void *private_data),
1059                             void *private_data)
1060 {
1061         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1062                                                         struct db_ctdb_ctx);
1063         struct traverse_state state;
1064
1065         state.db = db;
1066         state.fn = fn;
1067         state.private_data = private_data;
1068
1069         if (db->persistent) {
1070                 /* for persistent databases we don't need to do a ctdb traverse,
1071                    we can do a faster local traverse */
1072                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1073         }
1074
1075
1076         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1077         return 0;
1078 }
1079
1080 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1081 {
1082         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1083 }
1084
1085 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1086 {
1087         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1088 }
1089
1090 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1091 {
1092         struct traverse_state *state = (struct traverse_state *)private_data;
1093         struct db_record rec;
1094         rec.key = key;
1095         rec.value = data;
1096         rec.store = db_ctdb_store_deny;
1097         rec.delete_rec = db_ctdb_delete_deny;
1098         rec.private_data = state->db;
1099         state->fn(&rec, state->private_data);
1100 }
1101
1102 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1103                                         void *private_data)
1104 {
1105         struct traverse_state *state = (struct traverse_state *)private_data;
1106         struct db_record rec;
1107         rec.key = kbuf;
1108         rec.value = dbuf;
1109         rec.store = db_ctdb_store_deny;
1110         rec.delete_rec = db_ctdb_delete_deny;
1111         rec.private_data = state->db;
1112
1113         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1114                 /* a deleted record */
1115                 return 0;
1116         }
1117         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1118         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1119
1120         return state->fn(&rec, state->private_data);
1121 }
1122
1123 static int db_ctdb_traverse_read(struct db_context *db,
1124                                  int (*fn)(struct db_record *rec,
1125                                            void *private_data),
1126                                  void *private_data)
1127 {
1128         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1129                                                         struct db_ctdb_ctx);
1130         struct traverse_state state;
1131
1132         state.db = db;
1133         state.fn = fn;
1134         state.private_data = private_data;
1135
1136         if (db->persistent) {
1137                 /* for persistent databases we don't need to do a ctdb traverse,
1138                    we can do a faster local traverse */
1139                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1140         }
1141
1142         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1143         return 0;
1144 }
1145
1146 static int db_ctdb_get_seqnum(struct db_context *db)
1147 {
1148         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1149                                                         struct db_ctdb_ctx);
1150         return tdb_get_seqnum(ctx->wtdb->tdb);
1151 }
1152
1153 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1154                                 const char *name,
1155                                 int hash_size, int tdb_flags,
1156                                 int open_flags, mode_t mode)
1157 {
1158         struct db_context *result;
1159         struct db_ctdb_ctx *db_ctdb;
1160         char *db_path;
1161
1162         if (!lp_clustering()) {
1163                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1164                 return NULL;
1165         }
1166
1167         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1168                 DEBUG(0, ("talloc failed\n"));
1169                 TALLOC_FREE(result);
1170                 return NULL;
1171         }
1172
1173         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1174                 DEBUG(0, ("talloc failed\n"));
1175                 TALLOC_FREE(result);
1176                 return NULL;
1177         }
1178
1179         db_ctdb->transaction = NULL;
1180         db_ctdb->db = result;
1181
1182         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1183                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1184                 TALLOC_FREE(result);
1185                 return NULL;
1186         }
1187
1188         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1189
1190         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1191
1192         /* only pass through specific flags */
1193         tdb_flags &= TDB_SEQNUM;
1194
1195         /* honor permissions if user has specified O_CREAT */
1196         if (open_flags & O_CREAT) {
1197                 chmod(db_path, mode);
1198         }
1199
1200         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1201         if (db_ctdb->wtdb == NULL) {
1202                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1203                 TALLOC_FREE(result);
1204                 return NULL;
1205         }
1206         talloc_free(db_path);
1207
1208         result->private_data = (void *)db_ctdb;
1209         result->fetch_locked = db_ctdb_fetch_locked;
1210         result->fetch = db_ctdb_fetch;
1211         result->traverse = db_ctdb_traverse;
1212         result->traverse_read = db_ctdb_traverse_read;
1213         result->get_seqnum = db_ctdb_get_seqnum;
1214         result->transaction_start = db_ctdb_transaction_start;
1215         result->transaction_commit = db_ctdb_transaction_commit;
1216         result->transaction_cancel = db_ctdb_transaction_cancel;
1217
1218         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1219                  name, db_ctdb->db_id));
1220
1221         return result;
1222 }
1223 #endif