Add tdb_transaction_prepare_commit()
[metze/samba/wip.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125 };
126
127
128 /*
129   read while in a transaction. We need to check first if the data is in our list
130   of transaction elements, then if not do a real read
131 */
132 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
133                             tdb_len_t len, int cv)
134 {
135         uint32_t blk;
136
137         /* Only a commit is allowed on a prepared transaction */
138         if (tdb->transaction->prepared) {
139                 tdb->ecode = TDB_ERR_EINVAL;
140                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
141                 tdb->transaction->transaction_error = 1;
142                 return -1;
143         }
144
145         /* break it down into block sized ops */
146         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
147                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
148                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
149                         return -1;
150                 }
151                 len -= len2;
152                 off += len2;
153                 buf = (void *)(len2 + (char *)buf);
154         }
155
156         if (len == 0) {
157                 return 0;
158         }
159
160         blk = off / tdb->transaction->block_size;
161
162         /* see if we have it in the block list */
163         if (tdb->transaction->num_blocks <= blk ||
164             tdb->transaction->blocks[blk] == NULL) {
165                 /* nope, do a real read */
166                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
167                         goto fail;
168                 }
169                 return 0;
170         }
171
172         /* it is in the block list. Now check for the last block */
173         if (blk == tdb->transaction->num_blocks-1) {
174                 if (len > tdb->transaction->last_block_size) {
175                         goto fail;
176                 }
177         }
178         
179         /* now copy it out of this block */
180         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
181         if (cv) {
182                 tdb_convert(buf, len);
183         }
184         return 0;
185
186 fail:
187         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
188         tdb->ecode = TDB_ERR_IO;
189         tdb->transaction->transaction_error = 1;
190         return -1;
191 }
192
193
194 /*
195   write while in a transaction
196 */
197 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
198                              const void *buf, tdb_len_t len)
199 {
200         uint32_t blk;
201
202         /* Only a commit is allowed on a prepared transaction */
203         if (tdb->transaction->prepared) {
204                 tdb->ecode = TDB_ERR_EINVAL;
205                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
206                 tdb->transaction->transaction_error = 1;
207                 return -1;
208         }
209
210         /* if the write is to a hash head, then update the transaction
211            hash heads */
212         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
213             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
214                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
215                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
216         }
217
218         /* break it up into block sized chunks */
219         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
220                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
221                 if (transaction_write(tdb, off, buf, len2) != 0) {
222                         return -1;
223                 }
224                 len -= len2;
225                 off += len2;
226                 if (buf != NULL) {
227                         buf = (const void *)(len2 + (const char *)buf);
228                 }
229         }
230
231         if (len == 0) {
232                 return 0;
233         }
234
235         blk = off / tdb->transaction->block_size;
236         off = off % tdb->transaction->block_size;
237
238         if (tdb->transaction->num_blocks <= blk) {
239                 uint8_t **new_blocks;
240                 /* expand the blocks array */
241                 if (tdb->transaction->blocks == NULL) {
242                         new_blocks = (uint8_t **)malloc(
243                                 (blk+1)*sizeof(uint8_t *));
244                 } else {
245                         new_blocks = (uint8_t **)realloc(
246                                 tdb->transaction->blocks,
247                                 (blk+1)*sizeof(uint8_t *));
248                 }
249                 if (new_blocks == NULL) {
250                         tdb->ecode = TDB_ERR_OOM;
251                         goto fail;
252                 }
253                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
254                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
255                 tdb->transaction->blocks = new_blocks;
256                 tdb->transaction->num_blocks = blk+1;
257                 tdb->transaction->last_block_size = 0;
258         }
259
260         /* allocate and fill a block? */
261         if (tdb->transaction->blocks[blk] == NULL) {
262                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
263                 if (tdb->transaction->blocks[blk] == NULL) {
264                         tdb->ecode = TDB_ERR_OOM;
265                         tdb->transaction->transaction_error = 1;
266                         return -1;                      
267                 }
268                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
269                         tdb_len_t len2 = tdb->transaction->block_size;
270                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
271                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
272                         }
273                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
274                                                                    tdb->transaction->blocks[blk], 
275                                                                    len2, 0) != 0) {
276                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
277                                 tdb->ecode = TDB_ERR_IO;
278                                 goto fail;
279                         }
280                         if (blk == tdb->transaction->num_blocks-1) {
281                                 tdb->transaction->last_block_size = len2;
282                         }                       
283                 }
284         }
285         
286         /* overwrite part of an existing block */
287         if (buf == NULL) {
288                 memset(tdb->transaction->blocks[blk] + off, 0, len);
289         } else {
290                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
291         }
292         if (blk == tdb->transaction->num_blocks-1) {
293                 if (len + off > tdb->transaction->last_block_size) {
294                         tdb->transaction->last_block_size = len + off;
295                 }
296         }
297
298         return 0;
299
300 fail:
301         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
302                  (blk*tdb->transaction->block_size) + off, len));
303         tdb->transaction->transaction_error = 1;
304         return -1;
305 }
306
307
308 /*
309   write while in a transaction - this varient never expands the transaction blocks, it only
310   updates existing blocks. This means it cannot change the recovery size
311 */
312 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
313                                       const void *buf, tdb_len_t len)
314 {
315         uint32_t blk;
316
317         /* break it up into block sized chunks */
318         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
319                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
320                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
321                         return -1;
322                 }
323                 len -= len2;
324                 off += len2;
325                 if (buf != NULL) {
326                         buf = (const void *)(len2 + (const char *)buf);
327                 }
328         }
329
330         if (len == 0) {
331                 return 0;
332         }
333
334         blk = off / tdb->transaction->block_size;
335         off = off % tdb->transaction->block_size;
336
337         if (tdb->transaction->num_blocks <= blk ||
338             tdb->transaction->blocks[blk] == NULL) {
339                 return 0;
340         }
341
342         if (blk == tdb->transaction->num_blocks-1 &&
343             off + len > tdb->transaction->last_block_size) {
344                 if (off >= tdb->transaction->last_block_size) {
345                         return 0;
346                 }
347                 len = tdb->transaction->last_block_size - off;
348         }
349
350         /* overwrite part of an existing block */
351         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
352
353         return 0;
354 }
355
356
357 /*
358   accelerated hash chain head search, using the cached hash heads
359 */
360 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
361 {
362         uint32_t h = *chain;
363         for (;h < tdb->header.hash_size;h++) {
364                 /* the +1 takes account of the freelist */
365                 if (0 != tdb->transaction->hash_heads[h+1]) {
366                         break;
367                 }
368         }
369         (*chain) = h;
370 }
371
372 /*
373   out of bounds check during a transaction
374 */
375 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
376 {
377         if (len <= tdb->map_size) {
378                 return 0;
379         }
380         return TDB_ERRCODE(TDB_ERR_IO, -1);
381 }
382
383 /*
384   transaction version of tdb_expand().
385 */
386 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
387                                    tdb_off_t addition)
388 {
389         /* add a write to the transaction elements, so subsequent
390            reads see the zero data */
391         if (transaction_write(tdb, size, NULL, addition) != 0) {
392                 return -1;
393         }
394
395         return 0;
396 }
397
398 /*
399   brlock during a transaction - ignore them
400 */
401 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
402                               int rw_type, int lck_type, int probe, size_t len)
403 {
404         return 0;
405 }
406
407 static const struct tdb_methods transaction_methods = {
408         transaction_read,
409         transaction_write,
410         transaction_next_hash_chain,
411         transaction_oob,
412         transaction_expand_file,
413         transaction_brlock
414 };
415
416
417 /*
418   start a tdb transaction. No token is returned, as only a single
419   transaction is allowed to be pending per tdb_context
420 */
421 int tdb_transaction_start(struct tdb_context *tdb)
422 {
423         /* some sanity checks */
424         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
425                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
426                 tdb->ecode = TDB_ERR_EINVAL;
427                 return -1;
428         }
429
430         /* cope with nested tdb_transaction_start() calls */
431         if (tdb->transaction != NULL) {
432                 tdb->transaction->nesting++;
433                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
434                          tdb->transaction->nesting));
435                 return 0;
436         }
437
438         if (tdb->num_locks != 0 || tdb->global_lock.count) {
439                 /* the caller must not have any locks when starting a
440                    transaction as otherwise we'll be screwed by lack
441                    of nested locks in posix */
442                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
443                 tdb->ecode = TDB_ERR_LOCK;
444                 return -1;
445         }
446
447         if (tdb->travlocks.next != NULL) {
448                 /* you cannot use transactions inside a traverse (although you can use
449                    traverse inside a transaction) as otherwise you can end up with
450                    deadlock */
451                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
452                 tdb->ecode = TDB_ERR_LOCK;
453                 return -1;
454         }
455
456         tdb->transaction = (struct tdb_transaction *)
457                 calloc(sizeof(struct tdb_transaction), 1);
458         if (tdb->transaction == NULL) {
459                 tdb->ecode = TDB_ERR_OOM;
460                 return -1;
461         }
462
463         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
464         tdb->transaction->block_size = tdb->page_size;
465
466         /* get the transaction write lock. This is a blocking lock. As
467            discussed with Volker, there are a number of ways we could
468            make this async, which we will probably do in the future */
469         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
470                 SAFE_FREE(tdb->transaction->blocks);
471                 SAFE_FREE(tdb->transaction);
472                 return -1;
473         }
474         
475         /* get a read lock from the freelist to the end of file. This
476            is upgraded to a write lock during the commit */
477         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
478                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
479                 tdb->ecode = TDB_ERR_LOCK;
480                 goto fail;
481         }
482
483         /* setup a copy of the hash table heads so the hash scan in
484            traverse can be fast */
485         tdb->transaction->hash_heads = (uint32_t *)
486                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
487         if (tdb->transaction->hash_heads == NULL) {
488                 tdb->ecode = TDB_ERR_OOM;
489                 goto fail;
490         }
491         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
492                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
493                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
494                 tdb->ecode = TDB_ERR_IO;
495                 goto fail;
496         }
497
498         /* make sure we know about any file expansions already done by
499            anyone else */
500         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
501         tdb->transaction->old_map_size = tdb->map_size;
502
503         /* finally hook the io methods, replacing them with
504            transaction specific methods */
505         tdb->transaction->io_methods = tdb->methods;
506         tdb->methods = &transaction_methods;
507
508         return 0;
509         
510 fail:
511         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
512         tdb_transaction_unlock(tdb);
513         SAFE_FREE(tdb->transaction->blocks);
514         SAFE_FREE(tdb->transaction->hash_heads);
515         SAFE_FREE(tdb->transaction);
516         return -1;
517 }
518
519
520 /*
521   sync to disk
522 */
523 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
524 {       
525         if (fsync(tdb->fd) != 0) {
526                 tdb->ecode = TDB_ERR_IO;
527                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
528                 return -1;
529         }
530 #ifdef HAVE_MMAP
531         if (tdb->map_ptr) {
532                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
533                 if (msync(moffset + (char *)tdb->map_ptr, 
534                           length + (offset - moffset), MS_SYNC) != 0) {
535                         tdb->ecode = TDB_ERR_IO;
536                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
537                                  strerror(errno)));
538                         return -1;
539                 }
540         }
541 #endif
542         return 0;
543 }
544
545
546 /*
547   cancel the current transaction
548 */
549 int tdb_transaction_cancel(struct tdb_context *tdb)
550 {       
551         int i, ret = 0;
552
553         if (tdb->transaction == NULL) {
554                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
555                 return -1;
556         }
557
558         if (tdb->transaction->nesting != 0) {
559                 tdb->transaction->transaction_error = 1;
560                 tdb->transaction->nesting--;
561                 return 0;
562         }               
563
564         tdb->map_size = tdb->transaction->old_map_size;
565
566         /* free all the transaction blocks */
567         for (i=0;i<tdb->transaction->num_blocks;i++) {
568                 if (tdb->transaction->blocks[i] != NULL) {
569                         free(tdb->transaction->blocks[i]);
570                 }
571         }
572         SAFE_FREE(tdb->transaction->blocks);
573
574         if (tdb->transaction->magic_offset) {
575                 const struct tdb_methods *methods = tdb->transaction->io_methods;
576                 uint32_t zero = 0;
577
578                 /* remove the recovery marker */
579                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
580                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
581                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
582                         ret = -1;
583                 }
584         }
585
586         /* remove any global lock created during the transaction */
587         if (tdb->global_lock.count != 0) {
588                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
589                 tdb->global_lock.count = 0;
590         }
591
592         /* remove any locks created during the transaction */
593         if (tdb->num_locks != 0) {
594                 for (i=0;i<tdb->num_lockrecs;i++) {
595                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
596                                    F_UNLCK,F_SETLKW, 0, 1);
597                 }
598                 tdb->num_locks = 0;
599                 tdb->num_lockrecs = 0;
600                 SAFE_FREE(tdb->lockrecs);
601         }
602
603         /* restore the normal io methods */
604         tdb->methods = tdb->transaction->io_methods;
605
606         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
607         tdb_transaction_unlock(tdb);
608         SAFE_FREE(tdb->transaction->hash_heads);
609         SAFE_FREE(tdb->transaction);
610         
611         return ret;
612 }
613
614
615 /*
616   work out how much space the linearised recovery data will consume
617 */
618 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
619 {
620         tdb_len_t recovery_size = 0;
621         int i;
622
623         recovery_size = sizeof(uint32_t);
624         for (i=0;i<tdb->transaction->num_blocks;i++) {
625                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
626                         break;
627                 }
628                 if (tdb->transaction->blocks[i] == NULL) {
629                         continue;
630                 }
631                 recovery_size += 2*sizeof(tdb_off_t);
632                 if (i == tdb->transaction->num_blocks-1) {
633                         recovery_size += tdb->transaction->last_block_size;
634                 } else {
635                         recovery_size += tdb->transaction->block_size;
636                 }
637         }       
638
639         return recovery_size;
640 }
641
642 /*
643   allocate the recovery area, or use an existing recovery area if it is
644   large enough
645 */
646 static int tdb_recovery_allocate(struct tdb_context *tdb, 
647                                  tdb_len_t *recovery_size,
648                                  tdb_off_t *recovery_offset,
649                                  tdb_len_t *recovery_max_size)
650 {
651         struct list_struct rec;
652         const struct tdb_methods *methods = tdb->transaction->io_methods;
653         tdb_off_t recovery_head;
654
655         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
656                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
657                 return -1;
658         }
659
660         rec.rec_len = 0;
661
662         if (recovery_head != 0 && 
663             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
664                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
665                 return -1;
666         }
667
668         *recovery_size = tdb_recovery_size(tdb);
669
670         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
671                 /* it fits in the existing area */
672                 *recovery_max_size = rec.rec_len;
673                 *recovery_offset = recovery_head;
674                 return 0;
675         }
676
677         /* we need to free up the old recovery area, then allocate a
678            new one at the end of the file. Note that we cannot use
679            tdb_allocate() to allocate the new one as that might return
680            us an area that is being currently used (as of the start of
681            the transaction) */
682         if (recovery_head != 0) {
683                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
684                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
685                         return -1;
686                 }
687         }
688
689         /* the tdb_free() call might have increased the recovery size */
690         *recovery_size = tdb_recovery_size(tdb);
691
692         /* round up to a multiple of page size */
693         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
694         *recovery_offset = tdb->map_size;
695         recovery_head = *recovery_offset;
696
697         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
698                                      (tdb->map_size - tdb->transaction->old_map_size) +
699                                      sizeof(rec) + *recovery_max_size) == -1) {
700                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
701                 return -1;
702         }
703
704         /* remap the file (if using mmap) */
705         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
706
707         /* we have to reset the old map size so that we don't try to expand the file
708            again in the transaction commit, which would destroy the recovery area */
709         tdb->transaction->old_map_size = tdb->map_size;
710
711         /* write the recovery header offset and sync - we can sync without a race here
712            as the magic ptr in the recovery record has not been set */
713         CONVERT(recovery_head);
714         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
715                                &recovery_head, sizeof(tdb_off_t)) == -1) {
716                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
717                 return -1;
718         }
719         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
720                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
721                 return -1;
722         }
723
724         return 0;
725 }
726
727
728 /*
729   setup the recovery data that will be used on a crash during commit
730 */
731 static int transaction_setup_recovery(struct tdb_context *tdb, 
732                                       tdb_off_t *magic_offset)
733 {
734         tdb_len_t recovery_size;
735         unsigned char *data, *p;
736         const struct tdb_methods *methods = tdb->transaction->io_methods;
737         struct list_struct *rec;
738         tdb_off_t recovery_offset, recovery_max_size;
739         tdb_off_t old_map_size = tdb->transaction->old_map_size;
740         uint32_t magic, tailer;
741         int i;
742
743         /*
744           check that the recovery area has enough space
745         */
746         if (tdb_recovery_allocate(tdb, &recovery_size, 
747                                   &recovery_offset, &recovery_max_size) == -1) {
748                 return -1;
749         }
750
751         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
752         if (data == NULL) {
753                 tdb->ecode = TDB_ERR_OOM;
754                 return -1;
755         }
756
757         rec = (struct list_struct *)data;
758         memset(rec, 0, sizeof(*rec));
759
760         rec->magic    = 0;
761         rec->data_len = recovery_size;
762         rec->rec_len  = recovery_max_size;
763         rec->key_len  = old_map_size;
764         CONVERT(rec);
765
766         /* build the recovery data into a single blob to allow us to do a single
767            large write, which should be more efficient */
768         p = data + sizeof(*rec);
769         for (i=0;i<tdb->transaction->num_blocks;i++) {
770                 tdb_off_t offset;
771                 tdb_len_t length;
772
773                 if (tdb->transaction->blocks[i] == NULL) {
774                         continue;
775                 }
776
777                 offset = i * tdb->transaction->block_size;
778                 length = tdb->transaction->block_size;
779                 if (i == tdb->transaction->num_blocks-1) {
780                         length = tdb->transaction->last_block_size;
781                 }
782                 
783                 if (offset >= old_map_size) {
784                         continue;
785                 }
786                 if (offset + length > tdb->transaction->old_map_size) {
787                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
788                         free(data);
789                         tdb->ecode = TDB_ERR_CORRUPT;
790                         return -1;
791                 }
792                 memcpy(p, &offset, 4);
793                 memcpy(p+4, &length, 4);
794                 if (DOCONV()) {
795                         tdb_convert(p, 8);
796                 }
797                 /* the recovery area contains the old data, not the
798                    new data, so we have to call the original tdb_read
799                    method to get it */
800                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
801                         free(data);
802                         tdb->ecode = TDB_ERR_IO;
803                         return -1;
804                 }
805                 p += 8 + length;
806         }
807
808         /* and the tailer */
809         tailer = sizeof(*rec) + recovery_max_size;
810         memcpy(p, &tailer, 4);
811         CONVERT(p);
812
813         /* write the recovery data to the recovery area */
814         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
815                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
816                 free(data);
817                 tdb->ecode = TDB_ERR_IO;
818                 return -1;
819         }
820         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
821                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
822                 free(data);
823                 tdb->ecode = TDB_ERR_IO;
824                 return -1;
825         }
826
827         /* as we don't have ordered writes, we have to sync the recovery
828            data before we update the magic to indicate that the recovery
829            data is present */
830         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
831                 free(data);
832                 return -1;
833         }
834
835         free(data);
836
837         magic = TDB_RECOVERY_MAGIC;
838         CONVERT(magic);
839
840         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
841
842         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
843                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
844                 tdb->ecode = TDB_ERR_IO;
845                 return -1;
846         }
847         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
848                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
849                 tdb->ecode = TDB_ERR_IO;
850                 return -1;
851         }
852
853         /* ensure the recovery magic marker is on disk */
854         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
855                 return -1;
856         }
857
858         return 0;
859 }
860
861 /*
862   prepare to commit the current transaction
863 */
864 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
865 {       
866         const struct tdb_methods *methods;
867         int i;
868
869         if (tdb->transaction == NULL) {
870                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
871                 return -1;
872         }
873
874         if (tdb->transaction->prepared) {
875                 tdb->ecode = TDB_ERR_EINVAL;
876                 tdb_transaction_cancel(tdb);
877                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
878                 return -1;
879         }
880
881         if (tdb->transaction->transaction_error) {
882                 tdb->ecode = TDB_ERR_IO;
883                 tdb_transaction_cancel(tdb);
884                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
885                 return -1;
886         }
887
888
889         if (tdb->transaction->nesting != 0) {
890                 return 0;
891         }               
892
893         /* check for a null transaction */
894         if (tdb->transaction->blocks == NULL) {
895                 return 0;
896         }
897
898         methods = tdb->transaction->io_methods;
899         
900         /* if there are any locks pending then the caller has not
901            nested their locks properly, so fail the transaction */
902         if (tdb->num_locks || tdb->global_lock.count) {
903                 tdb->ecode = TDB_ERR_LOCK;
904                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
905                 tdb_transaction_cancel(tdb);
906                 return -1;
907         }
908
909         /* upgrade the main transaction lock region to a write lock */
910         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
911                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
912                 tdb->ecode = TDB_ERR_LOCK;
913                 tdb_transaction_cancel(tdb);
914                 return -1;
915         }
916
917         /* get the global lock - this prevents new users attaching to the database
918            during the commit */
919         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
920                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
921                 tdb->ecode = TDB_ERR_LOCK;
922                 tdb_transaction_cancel(tdb);
923                 return -1;
924         }
925
926         if (!(tdb->flags & TDB_NOSYNC)) {
927                 /* write the recovery data to the end of the file */
928                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
929                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
930                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
931                         tdb_transaction_cancel(tdb);
932                         return -1;
933                 }
934         }
935
936         tdb->transaction->prepared = true;
937
938         /* expand the file to the new size if needed */
939         if (tdb->map_size != tdb->transaction->old_map_size) {
940                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
941                                              tdb->map_size - 
942                                              tdb->transaction->old_map_size) == -1) {
943                         tdb->ecode = TDB_ERR_IO;
944                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
945                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
946                         tdb_transaction_cancel(tdb);
947                         return -1;
948                 }
949                 tdb->map_size = tdb->transaction->old_map_size;
950                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
951         }
952
953         /* Keep the global lock until the actual commit */
954
955         return 0;
956 }
957
958 /*
959   commit the current transaction
960 */
961 int tdb_transaction_commit(struct tdb_context *tdb)
962 {       
963         const struct tdb_methods *methods;
964         int i;
965
966         if (tdb->transaction == NULL) {
967                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
968                 return -1;
969         }
970
971         if (tdb->transaction->transaction_error) {
972                 tdb->ecode = TDB_ERR_IO;
973                 tdb_transaction_cancel(tdb);
974                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
975                 return -1;
976         }
977
978
979         if (tdb->transaction->nesting != 0) {
980                 tdb->transaction->nesting--;
981                 return 0;
982         }
983
984         /* check for a null transaction */
985         if (tdb->transaction->blocks == NULL) {
986                 tdb_transaction_cancel(tdb);
987                 return 0;
988         }
989
990         if (!tdb->transaction->prepared) {
991                 int ret = tdb_transaction_prepare_commit(tdb);
992                 if (ret)
993                         return ret;
994         }
995
996         methods = tdb->transaction->io_methods;
997
998         /* perform all the writes */
999         for (i=0;i<tdb->transaction->num_blocks;i++) {
1000                 tdb_off_t offset;
1001                 tdb_len_t length;
1002
1003                 if (tdb->transaction->blocks[i] == NULL) {
1004                         continue;
1005                 }
1006
1007                 offset = i * tdb->transaction->block_size;
1008                 length = tdb->transaction->block_size;
1009                 if (i == tdb->transaction->num_blocks-1) {
1010                         length = tdb->transaction->last_block_size;
1011                 }
1012
1013                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1014                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1015                         
1016                         /* we've overwritten part of the data and
1017                            possibly expanded the file, so we need to
1018                            run the crash recovery code */
1019                         tdb->methods = methods;
1020                         tdb_transaction_recover(tdb); 
1021
1022                         tdb_transaction_cancel(tdb);
1023                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1024
1025                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1026                         return -1;
1027                 }
1028                 SAFE_FREE(tdb->transaction->blocks[i]);
1029         } 
1030
1031         SAFE_FREE(tdb->transaction->blocks);
1032         tdb->transaction->num_blocks = 0;
1033
1034         if (!(tdb->flags & TDB_NOSYNC)) {
1035                 /* ensure the new data is on disk */
1036                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1037                         return -1;
1038                 }
1039         }
1040
1041         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1042
1043         /*
1044           TODO: maybe write to some dummy hdr field, or write to magic
1045           offset without mmap, before the last sync, instead of the
1046           utime() call
1047         */
1048
1049         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1050            don't change the mtime of the file, this means the file may
1051            not be backed up (as tdb rounding to block sizes means that
1052            file size changes are quite rare too). The following forces
1053            mtime changes when a transaction completes */
1054 #ifdef HAVE_UTIME
1055         utime(tdb->name, NULL);
1056 #endif
1057
1058         /* use a transaction cancel to free memory and remove the
1059            transaction locks */
1060         tdb_transaction_cancel(tdb);
1061
1062         return 0;
1063 }
1064
1065
1066 /*
1067   recover from an aborted transaction. Must be called with exclusive
1068   database write access already established (including the global
1069   lock to prevent new processes attaching)
1070 */
1071 int tdb_transaction_recover(struct tdb_context *tdb)
1072 {
1073         tdb_off_t recovery_head, recovery_eof;
1074         unsigned char *data, *p;
1075         uint32_t zero = 0;
1076         struct list_struct rec;
1077
1078         /* find the recovery area */
1079         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1080                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1081                 tdb->ecode = TDB_ERR_IO;
1082                 return -1;
1083         }
1084
1085         if (recovery_head == 0) {
1086                 /* we have never allocated a recovery record */
1087                 return 0;
1088         }
1089
1090         /* read the recovery record */
1091         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1092                                    sizeof(rec), DOCONV()) == -1) {
1093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1094                 tdb->ecode = TDB_ERR_IO;
1095                 return -1;
1096         }
1097
1098         if (rec.magic != TDB_RECOVERY_MAGIC) {
1099                 /* there is no valid recovery data */
1100                 return 0;
1101         }
1102
1103         if (tdb->read_only) {
1104                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1105                 tdb->ecode = TDB_ERR_CORRUPT;
1106                 return -1;
1107         }
1108
1109         recovery_eof = rec.key_len;
1110
1111         data = (unsigned char *)malloc(rec.data_len);
1112         if (data == NULL) {
1113                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1114                 tdb->ecode = TDB_ERR_OOM;
1115                 return -1;
1116         }
1117
1118         /* read the full recovery data */
1119         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1120                                    rec.data_len, 0) == -1) {
1121                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1122                 tdb->ecode = TDB_ERR_IO;
1123                 return -1;
1124         }
1125
1126         /* recover the file data */
1127         p = data;
1128         while (p+8 < data + rec.data_len) {
1129                 uint32_t ofs, len;
1130                 if (DOCONV()) {
1131                         tdb_convert(p, 8);
1132                 }
1133                 memcpy(&ofs, p, 4);
1134                 memcpy(&len, p+4, 4);
1135
1136                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1137                         free(data);
1138                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1139                         tdb->ecode = TDB_ERR_IO;
1140                         return -1;
1141                 }
1142                 p += 8 + len;
1143         }
1144
1145         free(data);
1146
1147         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1148                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1149                 tdb->ecode = TDB_ERR_IO;
1150                 return -1;
1151         }
1152
1153         /* if the recovery area is after the recovered eof then remove it */
1154         if (recovery_eof <= recovery_head) {
1155                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1156                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1157                         tdb->ecode = TDB_ERR_IO;
1158                         return -1;                      
1159                 }
1160         }
1161
1162         /* remove the recovery magic */
1163         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1164                           &zero) == -1) {
1165                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1166                 tdb->ecode = TDB_ERR_IO;
1167                 return -1;                      
1168         }
1169         
1170         /* reduce the file size to the old size */
1171         tdb_munmap(tdb);
1172         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1173                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1174                 tdb->ecode = TDB_ERR_IO;
1175                 return -1;                      
1176         }
1177         tdb->map_size = recovery_eof;
1178         tdb_mmap(tdb);
1179
1180         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1181                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1182                 tdb->ecode = TDB_ERR_IO;
1183                 return -1;
1184         }
1185
1186         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1187                  recovery_eof));
1188
1189         /* all done */
1190         return 0;
1191 }