cope with the control failing completely without returning a status
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct db_context *db;
38         struct tdb_wrap *wtdb;
39         uint32 db_id;
40         struct db_ctdb_transaction_handle *transaction;
41 };
42
43 struct db_ctdb_rec {
44         struct db_ctdb_ctx *ctdb_ctx;
45         struct ctdb_ltdb_header header;
46 };
47
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
49                                                TALLOC_CTX *mem_ctx,
50                                                TDB_DATA key,
51                                                bool persistent);
52
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
54 {
55         NTSTATUS status;
56         enum TDB_ERROR tret = tdb_error(tdb);
57
58         switch (tret) {
59         case TDB_ERR_EXISTS:
60                 status = NT_STATUS_OBJECT_NAME_COLLISION;
61                 break;
62         case TDB_ERR_NOEXIST:
63                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
64                 break;
65         default:
66                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
67                 break;
68         }
69
70         return status;
71 }
72
73
74
75 /*
76   form a ctdb_rec_data record from a key/data pair
77   
78   note that header may be NULL. If not NULL then it is included in the data portion
79   of the record
80  */
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
82                                                   TDB_DATA key, 
83                                                   struct ctdb_ltdb_header *header,
84                                                   TDB_DATA data)
85 {
86         size_t length;
87         struct ctdb_rec_data *d;
88
89         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
90                 data.dsize + (header?sizeof(*header):0);
91         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
92         if (d == NULL) {
93                 return NULL;
94         }
95         d->length = length;
96         d->reqid = reqid;
97         d->keylen = key.dsize;
98         memcpy(&d->data[0], key.dptr, key.dsize);
99         if (header) {
100                 d->datalen = data.dsize + sizeof(*header);
101                 memcpy(&d->data[key.dsize], header, sizeof(*header));
102                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103         } else {
104                 d->datalen = data.dsize;
105                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
106         }
107         return d;
108 }
109
110
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
113                                                struct ctdb_marshall_buffer *m,
114                                                uint64_t db_id,
115                                                uint32_t reqid,
116                                                TDB_DATA key,
117                                                struct ctdb_ltdb_header *header,
118                                                TDB_DATA data)
119 {
120         struct ctdb_rec_data *r;
121         size_t m_size, r_size;
122         struct ctdb_marshall_buffer *m2;
123
124         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
125         if (r == NULL) {
126                 talloc_free(m);
127                 return NULL;
128         }
129
130         if (m == NULL) {
131                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
132                 if (m == NULL) {
133                         return NULL;
134                 }
135                 m->db_id = db_id;
136         }
137
138         m_size = talloc_get_size(m);
139         r_size = talloc_get_size(r);
140
141         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
142         if (m2 == NULL) {
143                 talloc_free(m);
144                 return NULL;
145         }
146
147         memcpy(m_size + (uint8_t *)m2, r, r_size);
148
149         talloc_free(r);
150
151         m2->count++;
152
153         return m2;
154 }
155
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
158 {
159         TDB_DATA data;
160         data.dptr = (uint8_t *)m;
161         data.dsize = talloc_get_size(m);
162         return data;
163 }
164
165 /* 
166    loop over a marshalling buffer 
167    
168      - pass r==NULL to start
169      - loop the number of times indicated by m->count
170 */
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172                                                      uint32_t *reqid,
173                                                      struct ctdb_ltdb_header *header,
174                                                      TDB_DATA *key, TDB_DATA *data)
175 {
176         if (r == NULL) {
177                 r = (struct ctdb_rec_data *)&m->data[0];
178         } else {
179                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
180         }
181
182         if (reqid != NULL) {
183                 *reqid = r->reqid;
184         }
185         
186         if (key != NULL) {
187                 key->dptr   = &r->data[0];
188                 key->dsize  = r->keylen;
189         }
190         if (data != NULL) {
191                 data->dptr  = &r->data[r->keylen];
192                 data->dsize = r->datalen;
193                 if (header != NULL) {
194                         data->dptr += sizeof(*header);
195                         data->dsize -= sizeof(*header);
196                 }
197         }
198
199         if (header != NULL) {
200                 if (r->datalen < sizeof(*header)) {
201                         return NULL;
202                 }
203                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
204         }
205
206         return r;
207 }
208
209
210
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 {
214         tdb_transaction_cancel(h->ctx->wtdb->tdb);
215         return 0;
216 }
217
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 {
221         struct db_record *rh;
222         TDB_DATA key;
223         TALLOC_CTX *tmp_ctx;
224         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225         int ret;
226         struct db_ctdb_ctx *ctx = h->ctx;
227         TDB_DATA data;
228
229         key.dptr = discard_const(keyname);
230         key.dsize = strlen(keyname);
231
232 again:
233         tmp_ctx = talloc_new(h);
234
235         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236         if (rh == NULL) {
237                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241         talloc_free(rh);
242
243         ret = tdb_transaction_start(ctx->wtdb->tdb);
244         if (ret != 0) {
245                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         data = tdb_fetch(ctx->wtdb->tdb, key);
251         if ((data.dptr == NULL) ||
252             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254                 SAFE_FREE(data.dptr);
255                 tdb_transaction_cancel(ctx->wtdb->tdb);
256                 talloc_free(tmp_ctx);
257                 goto again;
258         }
259
260         SAFE_FREE(data.dptr);
261         talloc_free(tmp_ctx);
262
263         return 0;
264 }
265
266
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
269 {
270         struct db_ctdb_transaction_handle *h;
271         int ret;
272         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
273                                                         struct db_ctdb_ctx);
274
275         if (!db->persistent) {
276                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
277                          ctx->db_id));
278                 return -1;
279         }
280
281         if (ctx->transaction) {
282                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
283                 return -1;
284         }
285
286         h = talloc_zero(db, struct db_ctdb_transaction_handle);
287         if (h == NULL) {
288                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
289                 return -1;
290         }
291
292         h->ctx = ctx;
293
294         ret = db_ctdb_transaction_fetch_start(h);
295         if (ret != 0) {
296                 talloc_free(h);
297                 return -1;
298         }
299
300         talloc_set_destructor(h, db_ctdb_transaction_destructor);
301
302         ctx->transaction = h;
303
304         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
305
306         return 0;
307 }
308
309
310
311 /*
312   fetch a record inside a transaction
313  */
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
315                                      TALLOC_CTX *mem_ctx, 
316                                      TDB_DATA key, TDB_DATA *data)
317 {
318         struct db_ctdb_transaction_handle *h = db->transaction;
319
320         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321
322         if (data->dptr != NULL) {
323                 uint8_t *oldptr = (uint8_t *)data->dptr;
324                 data->dsize -= sizeof(struct ctdb_ltdb_header);
325                 if (data->dsize == 0) {
326                         data->dptr = NULL;
327                 } else {
328                         data->dptr = (uint8 *)
329                                 talloc_memdup(
330                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
331                                         data->dsize);
332                 }
333                 SAFE_FREE(oldptr);
334                 if (data->dptr == NULL && data->dsize != 0) {
335                         return -1;
336                 }
337         }
338
339         if (!h->in_replay) {
340                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341                 if (h->m_all == NULL) {
342                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343                         data->dsize = 0;
344                         talloc_free(data->dptr);
345                         return -1;
346                 }
347         }
348
349         return 0;
350 }
351
352
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
355
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
357                                                           TALLOC_CTX *mem_ctx,
358                                                           TDB_DATA key)
359 {
360         struct db_record *result;
361         TDB_DATA ctdb_data;
362
363         if (!(result = talloc(mem_ctx, struct db_record))) {
364                 DEBUG(0, ("talloc failed\n"));
365                 return NULL;
366         }
367
368         result->private_data = ctx->transaction;
369
370         result->key.dsize = key.dsize;
371         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372         if (result->key.dptr == NULL) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375                 return NULL;
376         }
377
378         result->store = db_ctdb_store_transaction;
379         result->delete_rec = db_ctdb_delete_transaction;
380
381         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382         if (ctdb_data.dptr == NULL) {
383                 /* create the record */
384                 result->value = tdb_null;
385                 return result;
386         }
387
388         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389         result->value.dptr = NULL;
390
391         if ((result->value.dsize != 0)
392             && !(result->value.dptr = (uint8 *)talloc_memdup(
393                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394                          result->value.dsize))) {
395                 DEBUG(0, ("talloc failed\n"));
396                 TALLOC_FREE(result);
397         }
398
399         SAFE_FREE(ctdb_data.dptr);
400
401         return result;
402 }
403
404 static int db_ctdb_record_destructor(struct db_record *rec)
405 {
406         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407                 rec->private_data, struct db_ctdb_transaction_handle);
408         int ret = h->ctx->db->transaction_commit(h->ctx->db);
409         if (ret != 0) {
410                 DEBUG(0,(__location__ " transaction_commit failed\n"));
411         }
412         return 0;
413 }
414
415 /*
416   auto-create a transaction for persistent databases
417  */
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
419                                                          TALLOC_CTX *mem_ctx,
420                                                          TDB_DATA key)
421 {
422         int res;
423         struct db_record *rec;
424
425         res = db_ctdb_transaction_start(ctx->db);
426         if (res == -1) {
427                 return NULL;
428         }
429
430         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
431         if (rec == NULL) {
432                 ctx->db->transaction_cancel(ctx->db);           
433                 return NULL;
434         }
435
436         /* destroy this transaction when we release the lock */
437         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
438         return rec;
439 }
440
441
442 /*
443   stores a record inside a transaction
444  */
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
446                                      TDB_DATA key, TDB_DATA data)
447 {
448         TALLOC_CTX *tmp_ctx = talloc_new(h);
449         int ret;
450         TDB_DATA rec;
451         struct ctdb_ltdb_header header;
452
453         /* we need the header so we can update the RSN */
454         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455         if (rec.dptr == NULL) {
456                 /* the record doesn't exist - create one with us as dmaster.
457                    This is only safe because we are in a transaction and this
458                    is a persistent database */
459                 ZERO_STRUCT(header);
460                 header.dmaster = get_my_vnn();
461         } else {
462                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
464                 /* a special case, we are writing the same data that is there now */
465                 if (data.dsize == rec.dsize &&
466                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
467                         SAFE_FREE(rec.dptr);
468                         talloc_free(tmp_ctx);
469                         return 0;
470                 }
471                 SAFE_FREE(rec.dptr);
472         }
473
474         header.rsn++;
475
476         if (!h->in_replay) {
477                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
478                 if (h->m_all == NULL) {
479                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
480                         talloc_free(tmp_ctx);
481                         return -1;
482                 }
483         }
484                 
485         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
486         if (h->m_write == NULL) {
487                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
488                 talloc_free(tmp_ctx);
489                 return -1;
490         }
491         
492         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
493         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
494         if (rec.dptr == NULL) {
495                 DEBUG(0,(__location__ " Failed to alloc record\n"));
496                 talloc_free(tmp_ctx);
497                 return -1;
498         }
499         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
500         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
501
502         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
503
504         talloc_free(tmp_ctx);
505         
506         return ret;
507 }
508
509
510 /* 
511    a record store inside a transaction
512  */
513 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
514 {
515         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
516                 rec->private_data, struct db_ctdb_transaction_handle);
517         int ret;
518
519         ret = db_ctdb_transaction_store(h, rec->key, data);
520         if (ret != 0) {
521                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
522         }
523         return NT_STATUS_OK;
524 }
525
526 /* 
527    a record delete inside a transaction
528  */
529 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
530 {
531         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
532                 rec->private_data, struct db_ctdb_transaction_handle);
533         int ret;
534
535         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
536         if (ret != 0) {
537                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
538         }
539         return NT_STATUS_OK;
540 }
541
542
543 /*
544   replay a transaction
545  */
546 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
547 {
548         int ret, i;
549         struct ctdb_rec_data *rec = NULL;
550
551         h->in_replay = true;
552         talloc_free(h->m_write);
553         h->m_write = NULL;
554
555         ret = db_ctdb_transaction_fetch_start(h);
556         if (ret != 0) {
557                 return ret;
558         }
559
560         for (i=0;i<h->m_all->count;i++) {
561                 TDB_DATA key, data;
562
563                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
564                 if (rec == NULL) {
565                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
566                         goto failed;
567                 }
568
569                 if (rec->reqid == 0) {
570                         /* its a store */
571                         if (db_ctdb_transaction_store(h, key, data) != 0) {
572                                 goto failed;
573                         }
574                 } else {
575                         TDB_DATA data2;
576                         TALLOC_CTX *tmp_ctx = talloc_new(h);
577
578                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
579                                 talloc_free(tmp_ctx);
580                                 goto failed;
581                         }
582                         if (data2.dsize != data.dsize ||
583                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
584                                 /* the record has changed on us - we have to give up */
585                                 talloc_free(tmp_ctx);
586                                 goto failed;
587                         }
588                         talloc_free(tmp_ctx);
589                 }
590         }
591         
592         return 0;
593
594 failed:
595         tdb_transaction_cancel(h->ctx->wtdb->tdb);
596         return -1;
597 }
598
599
600 /*
601   commit a transaction
602  */
603 static int db_ctdb_transaction_commit(struct db_context *db)
604 {
605         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
606                                                         struct db_ctdb_ctx);
607         NTSTATUS rets;
608         int ret;
609         int status;
610         int retries = 0;
611         struct db_ctdb_transaction_handle *h = ctx->transaction;
612         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
613
614         if (h == NULL) {
615                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
616                 return -1;
617         }
618
619         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
620
621         talloc_set_destructor(h, NULL);
622
623         /* our commit strategy is quite complex.
624
625            - we first try to commit the changes to all other nodes
626
627            - if that works, then we commit locally and we are done
628
629            - if a commit on another node fails, then we need to cancel
630              the transaction, then restart the transaction (thus
631              opening a window of time for a pending recovery to
632              complete), then replay the transaction, checking all the
633              reads and writes (checking that reads give the same data,
634              and writes succeed). Then we retry the transaction to the
635              other nodes
636         */
637
638 again:
639         if (h->m_write == NULL) {
640                 /* no changes were made, potentially after a retry */
641                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
642                 talloc_free(h);
643                 ctx->transaction = NULL;
644                 return 0;
645         }
646
647         /* tell ctdbd to commit to the other nodes */
648         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
649                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
650                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
651         if (!NT_STATUS_IS_OK(rets) || status != 0) {
652                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
653                 sleep(1);
654
655                 if (!NT_STATUS_IS_OK(rets)) {
656                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
657                 } else {
658                         /* work out what error code we will give if we 
659                            have to fail the operation */
660                         switch ((enum ctdb_trans2_commit_error)status) {
661                         case CTDB_TRANS2_COMMIT_SUCCESS:
662                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
663                         case CTDB_TRANS2_COMMIT_TIMEOUT:
664                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
665                                 break;
666                         case CTDB_TRANS2_COMMIT_ALLFAIL:
667                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
668                                 break;
669                         }
670                 }
671
672                 if (ctdb_replay_transaction(h) != 0) {
673                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
674                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
675                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
676                                             tdb_null, NULL, NULL, NULL);
677                         h->ctx->transaction = NULL;
678                         talloc_free(h);
679                         ctx->transaction = NULL;
680                         return -1;
681                 }
682                 if (++retries == 10) {
683                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries\n", 
684                                  h->ctx->db_id, retries));
685                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
686                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
687                                             tdb_null, NULL, NULL, NULL);
688                         h->ctx->transaction = NULL;
689                         talloc_free(h);
690                         ctx->transaction = NULL;
691                         return -1;                      
692                 }
693                 goto again;
694         } else {
695                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
696         }
697
698         /* do the real commit locally */
699         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
700         if (ret != 0) {
701                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
702                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
703                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
704                 h->ctx->transaction = NULL;
705                 talloc_free(h);
706                 return ret;
707         }
708
709         /* tell ctdbd that we are finished with our local commit */
710         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
711                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
712                             tdb_null, NULL, NULL, NULL);
713         h->ctx->transaction = NULL;
714         talloc_free(h);
715         return 0;
716 }
717
718
719 /*
720   cancel a transaction
721  */
722 static int db_ctdb_transaction_cancel(struct db_context *db)
723 {
724         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
725                                                         struct db_ctdb_ctx);
726         struct db_ctdb_transaction_handle *h = ctx->transaction;
727
728         if (h == NULL) {
729                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
730                 return -1;
731         }
732
733         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
734
735         ctx->transaction = NULL;
736         talloc_free(h);
737         return 0;
738 }
739
740
741 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
742 {
743         struct db_ctdb_rec *crec = talloc_get_type_abort(
744                 rec->private_data, struct db_ctdb_rec);
745         TDB_DATA cdata;
746         int ret;
747
748         cdata.dsize = sizeof(crec->header) + data.dsize;
749
750         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
751                 return NT_STATUS_NO_MEMORY;
752         }
753
754         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
755         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
756
757         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
758
759         SAFE_FREE(cdata.dptr);
760
761         return (ret == 0) ? NT_STATUS_OK
762                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
763 }
764
765
766
767 static NTSTATUS db_ctdb_delete(struct db_record *rec)
768 {
769         TDB_DATA data;
770
771         /*
772          * We have to store the header with empty data. TODO: Fix the
773          * tdb-level cleanup
774          */
775
776         ZERO_STRUCT(data);
777
778         return db_ctdb_store(rec, data, 0);
779
780 }
781
782 static int db_ctdb_record_destr(struct db_record* data)
783 {
784         struct db_ctdb_rec *crec = talloc_get_type_abort(
785                 data->private_data, struct db_ctdb_rec);
786
787         DEBUG(10, (DEBUGLEVEL > 10
788                    ? "Unlocking db %u key %s\n"
789                    : "Unlocking db %u key %.20s\n",
790                    (int)crec->ctdb_ctx->db_id,
791                    hex_encode(data, (unsigned char *)data->key.dptr,
792                               data->key.dsize)));
793
794         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
795                 DEBUG(0, ("tdb_chainunlock failed\n"));
796                 return -1;
797         }
798
799         return 0;
800 }
801
802 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
803                                                TALLOC_CTX *mem_ctx,
804                                                TDB_DATA key,
805                                                bool persistent)
806 {
807         struct db_record *result;
808         struct db_ctdb_rec *crec;
809         NTSTATUS status;
810         TDB_DATA ctdb_data;
811         int migrate_attempts = 0;
812
813         if (!(result = talloc(mem_ctx, struct db_record))) {
814                 DEBUG(0, ("talloc failed\n"));
815                 return NULL;
816         }
817
818         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
819                 DEBUG(0, ("talloc failed\n"));
820                 TALLOC_FREE(result);
821                 return NULL;
822         }
823
824         result->private_data = (void *)crec;
825         crec->ctdb_ctx = ctx;
826
827         result->key.dsize = key.dsize;
828         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
829         if (result->key.dptr == NULL) {
830                 DEBUG(0, ("talloc failed\n"));
831                 TALLOC_FREE(result);
832                 return NULL;
833         }
834
835         /*
836          * Do a blocking lock on the record
837          */
838 again:
839
840         if (DEBUGLEVEL >= 10) {
841                 char *keystr = hex_encode(result, key.dptr, key.dsize);
842                 DEBUG(10, (DEBUGLEVEL > 10
843                            ? "Locking db %u key %s\n"
844                            : "Locking db %u key %.20s\n",
845                            (int)crec->ctdb_ctx->db_id, keystr));
846                 TALLOC_FREE(keystr);
847         }
848         
849         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
850                 DEBUG(3, ("tdb_chainlock failed\n"));
851                 TALLOC_FREE(result);
852                 return NULL;
853         }
854
855         result->store = db_ctdb_store;
856         result->delete_rec = db_ctdb_delete;
857         talloc_set_destructor(result, db_ctdb_record_destr);
858
859         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
860
861         /*
862          * See if we have a valid record and we are the dmaster. If so, we can
863          * take the shortcut and just return it.
864          */
865
866         if ((ctdb_data.dptr == NULL) ||
867             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
868             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
869 #if 0
870             || (random() % 2 != 0)
871 #endif
872 ) {
873                 SAFE_FREE(ctdb_data.dptr);
874                 tdb_chainunlock(ctx->wtdb->tdb, key);
875                 talloc_set_destructor(result, NULL);
876
877                 migrate_attempts += 1;
878
879                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
880                            ctdb_data.dptr, ctdb_data.dptr ?
881                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
882                            get_my_vnn()));
883
884                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
885                 if (!NT_STATUS_IS_OK(status)) {
886                         DEBUG(5, ("ctdb_migrate failed: %s\n",
887                                   nt_errstr(status)));
888                         TALLOC_FREE(result);
889                         return NULL;
890                 }
891                 /* now its migrated, try again */
892                 goto again;
893         }
894
895         if (migrate_attempts > 10) {
896                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
897                           migrate_attempts));
898         }
899
900         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
901
902         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
903         result->value.dptr = NULL;
904
905         if ((result->value.dsize != 0)
906             && !(result->value.dptr = (uint8 *)talloc_memdup(
907                          result, ctdb_data.dptr + sizeof(crec->header),
908                          result->value.dsize))) {
909                 DEBUG(0, ("talloc failed\n"));
910                 TALLOC_FREE(result);
911         }
912
913         SAFE_FREE(ctdb_data.dptr);
914
915         return result;
916 }
917
918 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
919                                               TALLOC_CTX *mem_ctx,
920                                               TDB_DATA key)
921 {
922         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
923                                                         struct db_ctdb_ctx);
924
925         if (ctx->transaction != NULL) {
926                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
927         }
928
929         if (db->persistent) {
930                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
931         }
932
933         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
934 }
935
936 /*
937   fetch (unlocked, no migration) operation on ctdb
938  */
939 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
940                          TDB_DATA key, TDB_DATA *data)
941 {
942         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
943                                                         struct db_ctdb_ctx);
944         NTSTATUS status;
945         TDB_DATA ctdb_data;
946
947         if (ctx->transaction) {
948                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
949         }
950
951         /* try a direct fetch */
952         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
953
954         /*
955          * See if we have a valid record and we are the dmaster. If so, we can
956          * take the shortcut and just return it.
957          * we bypass the dmaster check for persistent databases
958          */
959         if ((ctdb_data.dptr != NULL) &&
960             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
961             (db->persistent ||
962              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
963                 /* we are the dmaster - avoid the ctdb protocol op */
964
965                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
966                 if (data->dsize == 0) {
967                         SAFE_FREE(ctdb_data.dptr);
968                         data->dptr = NULL;
969                         return 0;
970                 }
971
972                 data->dptr = (uint8 *)talloc_memdup(
973                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
974                         data->dsize);
975
976                 SAFE_FREE(ctdb_data.dptr);
977
978                 if (data->dptr == NULL) {
979                         return -1;
980                 }
981                 return 0;
982         }
983
984         SAFE_FREE(ctdb_data.dptr);
985
986         /* we weren't able to get it locally - ask ctdb to fetch it for us */
987         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
988         if (!NT_STATUS_IS_OK(status)) {
989                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
990                 return -1;
991         }
992
993         return 0;
994 }
995
996 struct traverse_state {
997         struct db_context *db;
998         int (*fn)(struct db_record *rec, void *private_data);
999         void *private_data;
1000 };
1001
1002 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1003 {
1004         struct traverse_state *state = (struct traverse_state *)private_data;
1005         struct db_record *rec;
1006         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1007         /* we have to give them a locked record to prevent races */
1008         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1009         if (rec && rec->value.dsize > 0) {
1010                 state->fn(rec, state->private_data);
1011         }
1012         talloc_free(tmp_ctx);
1013 }
1014
1015 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1016                                         void *private_data)
1017 {
1018         struct traverse_state *state = (struct traverse_state *)private_data;
1019         struct db_record *rec;
1020         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1021         int ret = 0;
1022         /* we have to give them a locked record to prevent races */
1023         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1024         if (rec && rec->value.dsize > 0) {
1025                 ret = state->fn(rec, state->private_data);
1026         }
1027         talloc_free(tmp_ctx);
1028         return ret;
1029 }
1030
1031 static int db_ctdb_traverse(struct db_context *db,
1032                             int (*fn)(struct db_record *rec,
1033                                       void *private_data),
1034                             void *private_data)
1035 {
1036         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1037                                                         struct db_ctdb_ctx);
1038         struct traverse_state state;
1039
1040         state.db = db;
1041         state.fn = fn;
1042         state.private_data = private_data;
1043
1044         if (db->persistent) {
1045                 /* for persistent databases we don't need to do a ctdb traverse,
1046                    we can do a faster local traverse */
1047                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1048         }
1049
1050
1051         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1052         return 0;
1053 }
1054
1055 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1056 {
1057         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1058 }
1059
1060 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1061 {
1062         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1063 }
1064
1065 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1066 {
1067         struct traverse_state *state = (struct traverse_state *)private_data;
1068         struct db_record rec;
1069         rec.key = key;
1070         rec.value = data;
1071         rec.store = db_ctdb_store_deny;
1072         rec.delete_rec = db_ctdb_delete_deny;
1073         rec.private_data = state->db;
1074         state->fn(&rec, state->private_data);
1075 }
1076
1077 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1078                                         void *private_data)
1079 {
1080         struct traverse_state *state = (struct traverse_state *)private_data;
1081         struct db_record rec;
1082         rec.key = kbuf;
1083         rec.value = dbuf;
1084         rec.store = db_ctdb_store_deny;
1085         rec.delete_rec = db_ctdb_delete_deny;
1086         rec.private_data = state->db;
1087
1088         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1089                 /* a deleted record */
1090                 return 0;
1091         }
1092         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1093         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1094
1095         return state->fn(&rec, state->private_data);
1096 }
1097
1098 static int db_ctdb_traverse_read(struct db_context *db,
1099                                  int (*fn)(struct db_record *rec,
1100                                            void *private_data),
1101                                  void *private_data)
1102 {
1103         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1104                                                         struct db_ctdb_ctx);
1105         struct traverse_state state;
1106
1107         state.db = db;
1108         state.fn = fn;
1109         state.private_data = private_data;
1110
1111         if (db->persistent) {
1112                 /* for persistent databases we don't need to do a ctdb traverse,
1113                    we can do a faster local traverse */
1114                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1115         }
1116
1117         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1118         return 0;
1119 }
1120
1121 static int db_ctdb_get_seqnum(struct db_context *db)
1122 {
1123         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1124                                                         struct db_ctdb_ctx);
1125         return tdb_get_seqnum(ctx->wtdb->tdb);
1126 }
1127
1128 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1129                                 const char *name,
1130                                 int hash_size, int tdb_flags,
1131                                 int open_flags, mode_t mode)
1132 {
1133         struct db_context *result;
1134         struct db_ctdb_ctx *db_ctdb;
1135         char *db_path;
1136
1137         if (!lp_clustering()) {
1138                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1139                 return NULL;
1140         }
1141
1142         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1143                 DEBUG(0, ("talloc failed\n"));
1144                 TALLOC_FREE(result);
1145                 return NULL;
1146         }
1147
1148         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1149                 DEBUG(0, ("talloc failed\n"));
1150                 TALLOC_FREE(result);
1151                 return NULL;
1152         }
1153
1154         db_ctdb->transaction = NULL;
1155         db_ctdb->db = result;
1156
1157         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1158                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1159                 TALLOC_FREE(result);
1160                 return NULL;
1161         }
1162
1163         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1164
1165         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1166
1167         /* only pass through specific flags */
1168         tdb_flags &= TDB_SEQNUM;
1169
1170         /* honor permissions if user has specified O_CREAT */
1171         if (open_flags & O_CREAT) {
1172                 chmod(db_path, mode);
1173         }
1174
1175         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1176         if (db_ctdb->wtdb == NULL) {
1177                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1178                 TALLOC_FREE(result);
1179                 return NULL;
1180         }
1181         talloc_free(db_path);
1182
1183         result->private_data = (void *)db_ctdb;
1184         result->fetch_locked = db_ctdb_fetch_locked;
1185         result->fetch = db_ctdb_fetch;
1186         result->traverse = db_ctdb_traverse;
1187         result->traverse_read = db_ctdb_traverse_read;
1188         result->get_seqnum = db_ctdb_get_seqnum;
1189         result->transaction_start = db_ctdb_transaction_start;
1190         result->transaction_commit = db_ctdb_transaction_commit;
1191         result->transaction_cancel = db_ctdb_transaction_cancel;
1192
1193         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1194                  name, db_ctdb->db_id));
1195
1196         return result;
1197 }
1198 #endif