504d0f681a8be4c60810cc2a23566b12b98e807e
[sahlberg/ctdb.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb_add_flags() transaction nesting is enabled.
90     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
91     The default is that transaction nesting is allowed.
92     Note: this default may change in future versions of tdb.
93
94     Beware: when transactions are nested, a transaction successfully
95     completed with tdb_transaction_commit() can be silently unrolled later.
96
97   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
98     tdb_add_flags() transaction nesting is disabled.
99     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
100     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
101     The default is that transaction nesting is allowed.
102     Note: this default may change in future versions of tdb.
103 */
104
105
106 /*
107   hold the context of any current transaction
108 */
109 struct tdb_transaction {
110         /* we keep a mirrored copy of the tdb hash heads here so
111            tdb_next_hash_chain() can operate efficiently */
112         uint32_t *hash_heads;
113
114         /* the original io methods - used to do IOs to the real db */
115         const struct tdb_methods *io_methods;
116
117         /* the list of transaction blocks. When a block is first
118            written to, it gets created in this list */
119         uint8_t **blocks;
120         uint32_t num_blocks;
121         uint32_t block_size;      /* bytes in each block */
122         uint32_t last_block_size; /* number of valid bytes in the last block */
123
124         /* non-zero when an internal transaction error has
125            occurred. All write operations will then fail until the
126            transaction is ended */
127         int transaction_error;
128
129         /* when inside a transaction we need to keep track of any
130            nested tdb_transaction_start() calls, as these are allowed,
131            but don't create a new transaction */
132         int nesting;
133
134         /* set when a prepare has already occurred */
135         bool prepared;
136         tdb_off_t magic_offset;
137
138         /* old file size before transaction */
139         tdb_len_t old_map_size;
140
141         /* we should re-pack on commit */
142         bool need_repack;
143 };
144
145
146 /*
147   read while in a transaction. We need to check first if the data is in our list
148   of transaction elements, then if not do a real read
149 */
150 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
151                             tdb_len_t len, int cv)
152 {
153         uint32_t blk;
154
155         /* break it down into block sized ops */
156         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
157                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
158                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
159                         return -1;
160                 }
161                 len -= len2;
162                 off += len2;
163                 buf = (void *)(len2 + (char *)buf);
164         }
165
166         if (len == 0) {
167                 return 0;
168         }
169
170         blk = off / tdb->transaction->block_size;
171
172         /* see if we have it in the block list */
173         if (tdb->transaction->num_blocks <= blk ||
174             tdb->transaction->blocks[blk] == NULL) {
175                 /* nope, do a real read */
176                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
177                         goto fail;
178                 }
179                 return 0;
180         }
181
182         /* it is in the block list. Now check for the last block */
183         if (blk == tdb->transaction->num_blocks-1) {
184                 if (len > tdb->transaction->last_block_size) {
185                         goto fail;
186                 }
187         }
188
189         /* now copy it out of this block */
190         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
191         if (cv) {
192                 tdb_convert(buf, len);
193         }
194         return 0;
195
196 fail:
197         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
198         tdb->ecode = TDB_ERR_IO;
199         tdb->transaction->transaction_error = 1;
200         return -1;
201 }
202
203
204 /*
205   write while in a transaction
206 */
207 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
208                              const void *buf, tdb_len_t len)
209 {
210         uint32_t blk;
211
212         /* Only a commit is allowed on a prepared transaction */
213         if (tdb->transaction->prepared) {
214                 tdb->ecode = TDB_ERR_EINVAL;
215                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
216                 tdb->transaction->transaction_error = 1;
217                 return -1;
218         }
219
220         /* if the write is to a hash head, then update the transaction
221            hash heads */
222         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
223             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
224                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
225                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
226         }
227
228         /* break it up into block sized chunks */
229         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
230                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
231                 if (transaction_write(tdb, off, buf, len2) != 0) {
232                         return -1;
233                 }
234                 len -= len2;
235                 off += len2;
236                 if (buf != NULL) {
237                         buf = (const void *)(len2 + (const char *)buf);
238                 }
239         }
240
241         if (len == 0) {
242                 return 0;
243         }
244
245         blk = off / tdb->transaction->block_size;
246         off = off % tdb->transaction->block_size;
247
248         if (tdb->transaction->num_blocks <= blk) {
249                 uint8_t **new_blocks;
250                 /* expand the blocks array */
251                 if (tdb->transaction->blocks == NULL) {
252                         new_blocks = (uint8_t **)malloc(
253                                 (blk+1)*sizeof(uint8_t *));
254                 } else {
255                         new_blocks = (uint8_t **)realloc(
256                                 tdb->transaction->blocks,
257                                 (blk+1)*sizeof(uint8_t *));
258                 }
259                 if (new_blocks == NULL) {
260                         tdb->ecode = TDB_ERR_OOM;
261                         goto fail;
262                 }
263                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
264                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
265                 tdb->transaction->blocks = new_blocks;
266                 tdb->transaction->num_blocks = blk+1;
267                 tdb->transaction->last_block_size = 0;
268         }
269
270         /* allocate and fill a block? */
271         if (tdb->transaction->blocks[blk] == NULL) {
272                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
273                 if (tdb->transaction->blocks[blk] == NULL) {
274                         tdb->ecode = TDB_ERR_OOM;
275                         tdb->transaction->transaction_error = 1;
276                         return -1;                      
277                 }
278                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
279                         tdb_len_t len2 = tdb->transaction->block_size;
280                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
281                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
282                         }
283                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
284                                                                    tdb->transaction->blocks[blk], 
285                                                                    len2, 0) != 0) {
286                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
287                                 tdb->ecode = TDB_ERR_IO;
288                                 goto fail;
289                         }
290                         if (blk == tdb->transaction->num_blocks-1) {
291                                 tdb->transaction->last_block_size = len2;
292                         }                       
293                 }
294         }
295
296         /* overwrite part of an existing block */
297         if (buf == NULL) {
298                 memset(tdb->transaction->blocks[blk] + off, 0, len);
299         } else {
300                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
301         }
302         if (blk == tdb->transaction->num_blocks-1) {
303                 if (len + off > tdb->transaction->last_block_size) {
304                         tdb->transaction->last_block_size = len + off;
305                 }
306         }
307
308         return 0;
309
310 fail:
311         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
312                  (blk*tdb->transaction->block_size) + off, len));
313         tdb->transaction->transaction_error = 1;
314         return -1;
315 }
316
317
318 /*
319   write while in a transaction - this varient never expands the transaction blocks, it only
320   updates existing blocks. This means it cannot change the recovery size
321 */
322 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
323                                       const void *buf, tdb_len_t len)
324 {
325         uint32_t blk;
326
327         /* break it up into block sized chunks */
328         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
329                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
330                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
331                         return -1;
332                 }
333                 len -= len2;
334                 off += len2;
335                 if (buf != NULL) {
336                         buf = (const void *)(len2 + (const char *)buf);
337                 }
338         }
339
340         if (len == 0) {
341                 return 0;
342         }
343
344         blk = off / tdb->transaction->block_size;
345         off = off % tdb->transaction->block_size;
346
347         if (tdb->transaction->num_blocks <= blk ||
348             tdb->transaction->blocks[blk] == NULL) {
349                 return 0;
350         }
351
352         if (blk == tdb->transaction->num_blocks-1 &&
353             off + len > tdb->transaction->last_block_size) {
354                 if (off >= tdb->transaction->last_block_size) {
355                         return 0;
356                 }
357                 len = tdb->transaction->last_block_size - off;
358         }
359
360         /* overwrite part of an existing block */
361         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
362
363         return 0;
364 }
365
366
367 /*
368   accelerated hash chain head search, using the cached hash heads
369 */
370 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
371 {
372         uint32_t h = *chain;
373         for (;h < tdb->header.hash_size;h++) {
374                 /* the +1 takes account of the freelist */
375                 if (0 != tdb->transaction->hash_heads[h+1]) {
376                         break;
377                 }
378         }
379         (*chain) = h;
380 }
381
382 /*
383   out of bounds check during a transaction
384 */
385 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
386 {
387         if (len <= tdb->map_size) {
388                 return 0;
389         }
390         tdb->ecode = TDB_ERR_IO;
391         return -1;
392 }
393
394 /*
395   transaction version of tdb_expand().
396 */
397 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
398                                    tdb_off_t addition)
399 {
400         /* add a write to the transaction elements, so subsequent
401            reads see the zero data */
402         if (transaction_write(tdb, size, NULL, addition) != 0) {
403                 return -1;
404         }
405
406         tdb->transaction->need_repack = true;
407
408         return 0;
409 }
410
411 static const struct tdb_methods transaction_methods = {
412         transaction_read,
413         transaction_write,
414         transaction_next_hash_chain,
415         transaction_oob,
416         transaction_expand_file,
417 };
418
419
420 /*
421   start a tdb transaction. No token is returned, as only a single
422   transaction is allowed to be pending per tdb_context
423 */
424 int tdb_transaction_start(struct tdb_context *tdb)
425 {
426         /* some sanity checks */
427         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
428                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
429                 tdb->ecode = TDB_ERR_EINVAL;
430                 return -1;
431         }
432
433         /* cope with nested tdb_transaction_start() calls */
434         if (tdb->transaction != NULL) {
435                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
436                         tdb->ecode = TDB_ERR_NESTING;
437                         return -1;
438                 }
439                 tdb->transaction->nesting++;
440                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
441                          tdb->transaction->nesting));
442                 return 0;
443         }
444
445         if (tdb_have_extra_locks(tdb)) {
446                 /* the caller must not have any locks when starting a
447                    transaction as otherwise we'll be screwed by lack
448                    of nested locks in posix */
449                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
450                 tdb->ecode = TDB_ERR_LOCK;
451                 return -1;
452         }
453
454         if (tdb->travlocks.next != NULL) {
455                 /* you cannot use transactions inside a traverse (although you can use
456                    traverse inside a transaction) as otherwise you can end up with
457                    deadlock */
458                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
459                 tdb->ecode = TDB_ERR_LOCK;
460                 return -1;
461         }
462
463         tdb->transaction = (struct tdb_transaction *)
464                 calloc(sizeof(struct tdb_transaction), 1);
465         if (tdb->transaction == NULL) {
466                 tdb->ecode = TDB_ERR_OOM;
467                 return -1;
468         }
469
470         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
471         tdb->transaction->block_size = tdb->page_size;
472
473         /* get the transaction write lock. This is a blocking lock. As
474            discussed with Volker, there are a number of ways we could
475            make this async, which we will probably do in the future */
476         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
477                 SAFE_FREE(tdb->transaction->blocks);
478                 SAFE_FREE(tdb->transaction);
479                 return -1;
480         }
481
482         /* get a read lock from the freelist to the end of file. This
483            is upgraded to a write lock during the commit */
484         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
485                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
486                 goto fail_allrecord_lock;
487         }
488
489         /* setup a copy of the hash table heads so the hash scan in
490            traverse can be fast */
491         tdb->transaction->hash_heads = (uint32_t *)
492                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
493         if (tdb->transaction->hash_heads == NULL) {
494                 tdb->ecode = TDB_ERR_OOM;
495                 goto fail;
496         }
497         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
498                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
499                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
500                 tdb->ecode = TDB_ERR_IO;
501                 goto fail;
502         }
503
504         /* make sure we know about any file expansions already done by
505            anyone else */
506         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
507         tdb->transaction->old_map_size = tdb->map_size;
508
509         /* finally hook the io methods, replacing them with
510            transaction specific methods */
511         tdb->transaction->io_methods = tdb->methods;
512         tdb->methods = &transaction_methods;
513
514         /* Trace at the end, so we get sequence number correct. */
515         tdb_trace(tdb, "tdb_transaction_start");
516         return 0;
517
518 fail:
519         tdb_allrecord_unlock(tdb, F_RDLCK, false);
520 fail_allrecord_lock:
521         tdb_transaction_unlock(tdb, F_WRLCK);
522         SAFE_FREE(tdb->transaction->blocks);
523         SAFE_FREE(tdb->transaction->hash_heads);
524         SAFE_FREE(tdb->transaction);
525         return -1;
526 }
527
528
529 /*
530   sync to disk
531 */
532 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
533 {       
534         if (tdb->flags & TDB_NOSYNC) {
535                 return 0;
536         }
537
538         if (fdatasync(tdb->fd) != 0) {
539                 tdb->ecode = TDB_ERR_IO;
540                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
541                 return -1;
542         }
543 #ifdef HAVE_MMAP
544         if (tdb->map_ptr) {
545                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
546                 if (msync(moffset + (char *)tdb->map_ptr, 
547                           length + (offset - moffset), MS_SYNC) != 0) {
548                         tdb->ecode = TDB_ERR_IO;
549                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
550                                  strerror(errno)));
551                         return -1;
552                 }
553         }
554 #endif
555         return 0;
556 }
557
558
559 static int _tdb_transaction_cancel(struct tdb_context *tdb)
560 {       
561         int i, ret = 0;
562
563         if (tdb->transaction == NULL) {
564                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
565                 return -1;
566         }
567
568         if (tdb->transaction->nesting != 0) {
569                 tdb->transaction->transaction_error = 1;
570                 tdb->transaction->nesting--;
571                 return 0;
572         }               
573
574         tdb->map_size = tdb->transaction->old_map_size;
575
576         /* free all the transaction blocks */
577         for (i=0;i<tdb->transaction->num_blocks;i++) {
578                 if (tdb->transaction->blocks[i] != NULL) {
579                         free(tdb->transaction->blocks[i]);
580                 }
581         }
582         SAFE_FREE(tdb->transaction->blocks);
583
584         if (tdb->transaction->magic_offset) {
585                 const struct tdb_methods *methods = tdb->transaction->io_methods;
586                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
587
588                 /* remove the recovery marker */
589                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
590                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
591                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
592                         ret = -1;
593                 }
594         }
595
596         /* This also removes the OPEN_LOCK, if we have it. */
597         tdb_release_transaction_locks(tdb);
598
599         /* restore the normal io methods */
600         tdb->methods = tdb->transaction->io_methods;
601
602         SAFE_FREE(tdb->transaction->hash_heads);
603         SAFE_FREE(tdb->transaction);
604
605         return ret;
606 }
607
/*
  cancel the current transaction: emit the trace record, then hand
  over to _tdb_transaction_cancel() and return its result
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_cancel");
	ret = _tdb_transaction_cancel(tdb);

	return ret;
}
616
617 /*
618   work out how much space the linearised recovery data will consume
619 */
620 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
621 {
622         tdb_len_t recovery_size = 0;
623         int i;
624
625         recovery_size = sizeof(uint32_t);
626         for (i=0;i<tdb->transaction->num_blocks;i++) {
627                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
628                         break;
629                 }
630                 if (tdb->transaction->blocks[i] == NULL) {
631                         continue;
632                 }
633                 recovery_size += 2*sizeof(tdb_off_t);
634                 if (i == tdb->transaction->num_blocks-1) {
635                         recovery_size += tdb->transaction->last_block_size;
636                 } else {
637                         recovery_size += tdb->transaction->block_size;
638                 }
639         }       
640
641         return recovery_size;
642 }
643
644 /*
645   allocate the recovery area, or use an existing recovery area if it is
646   large enough
647 */
648 static int tdb_recovery_allocate(struct tdb_context *tdb, 
649                                  tdb_len_t *recovery_size,
650                                  tdb_off_t *recovery_offset,
651                                  tdb_len_t *recovery_max_size)
652 {
653         struct tdb_record rec;
654         const struct tdb_methods *methods = tdb->transaction->io_methods;
655         tdb_off_t recovery_head;
656
657         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
658                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
659                 return -1;
660         }
661
662         rec.rec_len = 0;
663
664         if (recovery_head != 0) {
665                 if (methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
666                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
667                         return -1;
668                 }
669                 /* ignore invalid recovery regions: can happen in crash */
670                 if (rec.magic != TDB_RECOVERY_MAGIC &&
671                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
672                         recovery_head = 0;
673                 }
674         }
675
676         *recovery_size = tdb_recovery_size(tdb);
677
678         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
679                 /* it fits in the existing area */
680                 *recovery_max_size = rec.rec_len;
681                 *recovery_offset = recovery_head;
682                 return 0;
683         }
684
685         /* we need to free up the old recovery area, then allocate a
686            new one at the end of the file. Note that we cannot use
687            tdb_allocate() to allocate the new one as that might return
688            us an area that is being currently used (as of the start of
689            the transaction) */
690         if (recovery_head != 0) {
691                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
692                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
693                         return -1;
694                 }
695         }
696
697         /* the tdb_free() call might have increased the recovery size */
698         *recovery_size = tdb_recovery_size(tdb);
699
700         /* round up to a multiple of page size */
701         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
702         *recovery_offset = tdb->map_size;
703         recovery_head = *recovery_offset;
704
705         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
706                                      (tdb->map_size - tdb->transaction->old_map_size) +
707                                      sizeof(rec) + *recovery_max_size) == -1) {
708                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
709                 return -1;
710         }
711
712         /* remap the file (if using mmap) */
713         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
714
715         /* we have to reset the old map size so that we don't try to expand the file
716            again in the transaction commit, which would destroy the recovery area */
717         tdb->transaction->old_map_size = tdb->map_size;
718
719         /* write the recovery header offset and sync - we can sync without a race here
720            as the magic ptr in the recovery record has not been set */
721         CONVERT(recovery_head);
722         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
723                                &recovery_head, sizeof(tdb_off_t)) == -1) {
724                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
725                 return -1;
726         }
727         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
728                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
729                 return -1;
730         }
731
732         return 0;
733 }
734
735
736 /*
737   setup the recovery data that will be used on a crash during commit
738 */
739 static int transaction_setup_recovery(struct tdb_context *tdb, 
740                                       tdb_off_t *magic_offset)
741 {
742         tdb_len_t recovery_size;
743         unsigned char *data, *p;
744         const struct tdb_methods *methods = tdb->transaction->io_methods;
745         struct tdb_record *rec;
746         tdb_off_t recovery_offset, recovery_max_size;
747         tdb_off_t old_map_size = tdb->transaction->old_map_size;
748         uint32_t magic, tailer;
749         int i;
750
751         /*
752           check that the recovery area has enough space
753         */
754         if (tdb_recovery_allocate(tdb, &recovery_size, 
755                                   &recovery_offset, &recovery_max_size) == -1) {
756                 return -1;
757         }
758
759         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
760         if (data == NULL) {
761                 tdb->ecode = TDB_ERR_OOM;
762                 return -1;
763         }
764
765         rec = (struct tdb_record *)data;
766         memset(rec, 0, sizeof(*rec));
767
768         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
769         rec->data_len = recovery_size;
770         rec->rec_len  = recovery_max_size;
771         rec->key_len  = old_map_size;
772         CONVERT(rec);
773
774         /* build the recovery data into a single blob to allow us to do a single
775            large write, which should be more efficient */
776         p = data + sizeof(*rec);
777         for (i=0;i<tdb->transaction->num_blocks;i++) {
778                 tdb_off_t offset;
779                 tdb_len_t length;
780
781                 if (tdb->transaction->blocks[i] == NULL) {
782                         continue;
783                 }
784
785                 offset = i * tdb->transaction->block_size;
786                 length = tdb->transaction->block_size;
787                 if (i == tdb->transaction->num_blocks-1) {
788                         length = tdb->transaction->last_block_size;
789                 }
790
791                 if (offset >= old_map_size) {
792                         continue;
793                 }
794                 if (offset + length > tdb->transaction->old_map_size) {
795                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
796                         free(data);
797                         tdb->ecode = TDB_ERR_CORRUPT;
798                         return -1;
799                 }
800                 memcpy(p, &offset, 4);
801                 memcpy(p+4, &length, 4);
802                 if (DOCONV()) {
803                         tdb_convert(p, 8);
804                 }
805                 /* the recovery area contains the old data, not the
806                    new data, so we have to call the original tdb_read
807                    method to get it */
808                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
809                         free(data);
810                         tdb->ecode = TDB_ERR_IO;
811                         return -1;
812                 }
813                 p += 8 + length;
814         }
815
816         /* and the tailer */
817         tailer = sizeof(*rec) + recovery_max_size;
818         memcpy(p, &tailer, 4);
819         CONVERT(p);
820
821         /* write the recovery data to the recovery area */
822         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
823                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
824                 free(data);
825                 tdb->ecode = TDB_ERR_IO;
826                 return -1;
827         }
828         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
829                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
830                 free(data);
831                 tdb->ecode = TDB_ERR_IO;
832                 return -1;
833         }
834
835         /* as we don't have ordered writes, we have to sync the recovery
836            data before we update the magic to indicate that the recovery
837            data is present */
838         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
839                 free(data);
840                 return -1;
841         }
842
843         free(data);
844
845         magic = TDB_RECOVERY_MAGIC;
846         CONVERT(magic);
847
848         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
849
850         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
851                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
852                 tdb->ecode = TDB_ERR_IO;
853                 return -1;
854         }
855         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
856                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
857                 tdb->ecode = TDB_ERR_IO;
858                 return -1;
859         }
860
861         /* ensure the recovery magic marker is on disk */
862         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
863                 return -1;
864         }
865
866         return 0;
867 }
868
869 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
870 {       
871         const struct tdb_methods *methods;
872
873         if (tdb->transaction == NULL) {
874                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
875                 return -1;
876         }
877
878         if (tdb->transaction->prepared) {
879                 tdb->ecode = TDB_ERR_EINVAL;
880                 _tdb_transaction_cancel(tdb);
881                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
882                 return -1;
883         }
884
885         if (tdb->transaction->transaction_error) {
886                 tdb->ecode = TDB_ERR_IO;
887                 _tdb_transaction_cancel(tdb);
888                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
889                 return -1;
890         }
891
892
893         if (tdb->transaction->nesting != 0) {
894                 return 0;
895         }               
896
897         /* check for a null transaction */
898         if (tdb->transaction->blocks == NULL) {
899                 return 0;
900         }
901
902         methods = tdb->transaction->io_methods;
903
904         /* if there are any locks pending then the caller has not
905            nested their locks properly, so fail the transaction */
906         if (tdb_have_extra_locks(tdb)) {
907                 tdb->ecode = TDB_ERR_LOCK;
908                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
909                 _tdb_transaction_cancel(tdb);
910                 return -1;
911         }
912
913         /* upgrade the main transaction lock region to a write lock */
914         if (tdb_allrecord_upgrade(tdb) == -1) {
915                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
916                 _tdb_transaction_cancel(tdb);
917                 return -1;
918         }
919
920         /* get the open lock - this prevents new users attaching to the database
921            during the commit */
922         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
923                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
924                 _tdb_transaction_cancel(tdb);
925                 return -1;
926         }
927
928         if (!(tdb->flags & TDB_NOSYNC)) {
929                 /* write the recovery data to the end of the file */
930                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
931                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
932                         _tdb_transaction_cancel(tdb);
933                         return -1;
934                 }
935         }
936
937         tdb->transaction->prepared = true;
938
939         /* expand the file to the new size if needed */
940         if (tdb->map_size != tdb->transaction->old_map_size) {
941                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
942                                              tdb->map_size - 
943                                              tdb->transaction->old_map_size) == -1) {
944                         tdb->ecode = TDB_ERR_IO;
945                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
946                         _tdb_transaction_cancel(tdb);
947                         return -1;
948                 }
949                 tdb->map_size = tdb->transaction->old_map_size;
950                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
951         }
952
953         /* Keep the open lock until the actual commit */
954
955         return 0;
956 }
957
/*
   prepare to commit the current transaction

   Emits a trace entry and hands over to
   _tdb_transaction_prepare_commit(), whose result is returned unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int rc;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	rc = _tdb_transaction_prepare_commit(tdb);

	return rc;
}
966
967 /*
968   commit the current transaction
969 */
970 int tdb_transaction_commit(struct tdb_context *tdb)
971 {       
972         const struct tdb_methods *methods;
973         int i;
974         bool need_repack;
975
976         if (tdb->transaction == NULL) {
977                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
978                 return -1;
979         }
980
981         tdb_trace(tdb, "tdb_transaction_commit");
982
983         if (tdb->transaction->transaction_error) {
984                 tdb->ecode = TDB_ERR_IO;
985                 _tdb_transaction_cancel(tdb);
986                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
987                 return -1;
988         }
989
990
991         if (tdb->transaction->nesting != 0) {
992                 tdb->transaction->nesting--;
993                 return 0;
994         }
995
996         /* check for a null transaction */
997         if (tdb->transaction->blocks == NULL) {
998                 _tdb_transaction_cancel(tdb);
999                 return 0;
1000         }
1001
1002         if (!tdb->transaction->prepared) {
1003                 int ret = _tdb_transaction_prepare_commit(tdb);
1004                 if (ret)
1005                         return ret;
1006         }
1007
1008         methods = tdb->transaction->io_methods;
1009
1010         /* perform all the writes */
1011         for (i=0;i<tdb->transaction->num_blocks;i++) {
1012                 tdb_off_t offset;
1013                 tdb_len_t length;
1014
1015                 if (tdb->transaction->blocks[i] == NULL) {
1016                         continue;
1017                 }
1018
1019                 offset = i * tdb->transaction->block_size;
1020                 length = tdb->transaction->block_size;
1021                 if (i == tdb->transaction->num_blocks-1) {
1022                         length = tdb->transaction->last_block_size;
1023                 }
1024
1025                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1026                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1027
1028                         /* we've overwritten part of the data and
1029                            possibly expanded the file, so we need to
1030                            run the crash recovery code */
1031                         tdb->methods = methods;
1032                         tdb_transaction_recover(tdb); 
1033
1034                         _tdb_transaction_cancel(tdb);
1035
1036                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1037                         return -1;
1038                 }
1039                 SAFE_FREE(tdb->transaction->blocks[i]);
1040         } 
1041
1042         SAFE_FREE(tdb->transaction->blocks);
1043         tdb->transaction->num_blocks = 0;
1044
1045         /* ensure the new data is on disk */
1046         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1047                 return -1;
1048         }
1049
1050         /*
1051           TODO: maybe write to some dummy hdr field, or write to magic
1052           offset without mmap, before the last sync, instead of the
1053           utime() call
1054         */
1055
1056         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1057            don't change the mtime of the file, this means the file may
1058            not be backed up (as tdb rounding to block sizes means that
1059            file size changes are quite rare too). The following forces
1060            mtime changes when a transaction completes */
1061 #ifdef HAVE_UTIME
1062         utime(tdb->name, NULL);
1063 #endif
1064
1065         need_repack = tdb->transaction->need_repack;
1066
1067         /* use a transaction cancel to free memory and remove the
1068            transaction locks */
1069         _tdb_transaction_cancel(tdb);
1070
1071         if (need_repack) {
1072                 return tdb_repack(tdb);
1073         }
1074
1075         return 0;
1076 }
1077
1078
1079 /*
1080   recover from an aborted transaction. Must be called with exclusive
1081   database write access already established (including the open
1082   lock to prevent new processes attaching)
1083 */
1084 int tdb_transaction_recover(struct tdb_context *tdb)
1085 {
1086         tdb_off_t recovery_head, recovery_eof;
1087         unsigned char *data, *p;
1088         uint32_t zero = 0;
1089         struct tdb_record rec;
1090
1091         /* find the recovery area */
1092         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1094                 tdb->ecode = TDB_ERR_IO;
1095                 return -1;
1096         }
1097
1098         if (recovery_head == 0) {
1099                 /* we have never allocated a recovery record */
1100                 return 0;
1101         }
1102
1103         /* read the recovery record */
1104         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1105                                    sizeof(rec), DOCONV()) == -1) {
1106                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1107                 tdb->ecode = TDB_ERR_IO;
1108                 return -1;
1109         }
1110
1111         if (rec.magic != TDB_RECOVERY_MAGIC) {
1112                 /* there is no valid recovery data */
1113                 return 0;
1114         }
1115
1116         if (tdb->read_only) {
1117                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1118                 tdb->ecode = TDB_ERR_CORRUPT;
1119                 return -1;
1120         }
1121
1122         recovery_eof = rec.key_len;
1123
1124         data = (unsigned char *)malloc(rec.data_len);
1125         if (data == NULL) {
1126                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1127                 tdb->ecode = TDB_ERR_OOM;
1128                 return -1;
1129         }
1130
1131         /* read the full recovery data */
1132         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1133                                    rec.data_len, 0) == -1) {
1134                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1135                 tdb->ecode = TDB_ERR_IO;
1136                 return -1;
1137         }
1138
1139         /* recover the file data */
1140         p = data;
1141         while (p+8 < data + rec.data_len) {
1142                 uint32_t ofs, len;
1143                 if (DOCONV()) {
1144                         tdb_convert(p, 8);
1145                 }
1146                 memcpy(&ofs, p, 4);
1147                 memcpy(&len, p+4, 4);
1148
1149                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1150                         free(data);
1151                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1152                         tdb->ecode = TDB_ERR_IO;
1153                         return -1;
1154                 }
1155                 p += 8 + len;
1156         }
1157
1158         free(data);
1159
1160         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1161                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1162                 tdb->ecode = TDB_ERR_IO;
1163                 return -1;
1164         }
1165
1166         /* if the recovery area is after the recovered eof then remove it */
1167         if (recovery_eof <= recovery_head) {
1168                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1169                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1170                         tdb->ecode = TDB_ERR_IO;
1171                         return -1;                      
1172                 }
1173         }
1174
1175         /* remove the recovery magic */
1176         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1177                           &zero) == -1) {
1178                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1179                 tdb->ecode = TDB_ERR_IO;
1180                 return -1;                      
1181         }
1182
1183         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1184                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1185                 tdb->ecode = TDB_ERR_IO;
1186                 return -1;
1187         }
1188
1189         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1190                  recovery_eof));
1191
1192         /* all done */
1193         return 0;
1194 }
1195
1196 /* Any I/O failures we say "needs recovery". */
1197 bool tdb_needs_recovery(struct tdb_context *tdb)
1198 {
1199         tdb_off_t recovery_head;
1200         struct tdb_record rec;
1201
1202         /* find the recovery area */
1203         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1204                 return true;
1205         }
1206
1207         if (recovery_head == 0) {
1208                 /* we have never allocated a recovery record */
1209                 return false;
1210         }
1211
1212         /* read the recovery record */
1213         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1214                                    sizeof(rec), DOCONV()) == -1) {
1215                 return true;
1216         }
1217
1218         return (rec.magic == TDB_RECOVERY_MAGIC);
1219 }