Merge commit 'origin/v4-0-test' into 4-0-local
[samba.git] / source4 / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
129                             tdb_len_t len, int cv)
130 {
131         uint32_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / tdb->transaction->block_size;
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166         
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169         if (cv) {
170                 tdb_convert(buf, len);
171         }
172         return 0;
173
174 fail:
175         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176         tdb->ecode = TDB_ERR_IO;
177         tdb->transaction->transaction_error = 1;
178         return -1;
179 }
180
181
182 /*
183   write while in a transaction
184 */
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
186                              const void *buf, tdb_len_t len)
187 {
188         uint32_t blk;
189
190         /* if the write is to a hash head, then update the transaction
191            hash heads */
192         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196         }
197
198         /* break it up into block sized chunks */
199         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201                 if (transaction_write(tdb, off, buf, len2) != 0) {
202                         return -1;
203                 }
204                 len -= len2;
205                 off += len2;
206                 if (buf != NULL) {
207                         buf = (const void *)(len2 + (const char *)buf);
208                 }
209         }
210
211         if (len == 0) {
212                 return 0;
213         }
214
215         blk = off / tdb->transaction->block_size;
216         off = off % tdb->transaction->block_size;
217
218         if (tdb->transaction->num_blocks <= blk) {
219                 uint8_t **new_blocks;
220                 /* expand the blocks array */
221                 if (tdb->transaction->blocks == NULL) {
222                         new_blocks = (uint8_t **)malloc(
223                                 (blk+1)*sizeof(uint8_t *));
224                 } else {
225                         new_blocks = (uint8_t **)realloc(
226                                 tdb->transaction->blocks,
227                                 (blk+1)*sizeof(uint8_t *));
228                 }
229                 if (new_blocks == NULL) {
230                         tdb->ecode = TDB_ERR_OOM;
231                         goto fail;
232                 }
233                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
234                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235                 tdb->transaction->blocks = new_blocks;
236                 tdb->transaction->num_blocks = blk+1;
237                 tdb->transaction->last_block_size = 0;
238         }
239
240         /* allocate and fill a block? */
241         if (tdb->transaction->blocks[blk] == NULL) {
242                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243                 if (tdb->transaction->blocks[blk] == NULL) {
244                         tdb->ecode = TDB_ERR_OOM;
245                         tdb->transaction->transaction_error = 1;
246                         return -1;                      
247                 }
248                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249                         tdb_len_t len2 = tdb->transaction->block_size;
250                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
252                         }
253                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
254                                                                    tdb->transaction->blocks[blk], 
255                                                                    len2, 0) != 0) {
256                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
257                                 tdb->ecode = TDB_ERR_IO;
258                                 goto fail;
259                         }
260                         if (blk == tdb->transaction->num_blocks-1) {
261                                 tdb->transaction->last_block_size = len2;
262                         }                       
263                 }
264         }
265         
266         /* overwrite part of an existing block */
267         if (buf == NULL) {
268                 memset(tdb->transaction->blocks[blk] + off, 0, len);
269         } else {
270                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271         }
272         if (blk == tdb->transaction->num_blocks-1) {
273                 if (len + off > tdb->transaction->last_block_size) {
274                         tdb->transaction->last_block_size = len + off;
275                 }
276         }
277
278         return 0;
279
280 fail:
281         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
282                  (blk*tdb->transaction->block_size) + off, len));
283         tdb->transaction->transaction_error = 1;
284         return -1;
285 }
286
287
288 /*
289   write while in a transaction - this varient never expands the transaction blocks, it only
290   updates existing blocks. This means it cannot change the recovery size
291 */
292 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
293                                       const void *buf, tdb_len_t len)
294 {
295         uint32_t blk;
296
297         /* break it up into block sized chunks */
298         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
301                         return -1;
302                 }
303                 len -= len2;
304                 off += len2;
305                 if (buf != NULL) {
306                         buf = (const void *)(len2 + (const char *)buf);
307                 }
308         }
309
310         if (len == 0) {
311                 return 0;
312         }
313
314         blk = off / tdb->transaction->block_size;
315         off = off % tdb->transaction->block_size;
316
317         if (tdb->transaction->num_blocks <= blk ||
318             tdb->transaction->blocks[blk] == NULL) {
319                 return 0;
320         }
321
322         if (blk == tdb->transaction->num_blocks-1 &&
323             off + len > tdb->transaction->last_block_size) {
324                 len = tdb->transaction->last_block_size - off;
325         }
326
327         /* overwrite part of an existing block */
328         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
329
330         return 0;
331 }
332
333
334 /*
335   accelerated hash chain head search, using the cached hash heads
336 */
337 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
338 {
339         uint32_t h = *chain;
340         for (;h < tdb->header.hash_size;h++) {
341                 /* the +1 takes account of the freelist */
342                 if (0 != tdb->transaction->hash_heads[h+1]) {
343                         break;
344                 }
345         }
346         (*chain) = h;
347 }
348
349 /*
350   out of bounds check during a transaction
351 */
352 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
353 {
354         if (len <= tdb->map_size) {
355                 return 0;
356         }
357         return TDB_ERRCODE(TDB_ERR_IO, -1);
358 }
359
360 /*
361   transaction version of tdb_expand().
362 */
363 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
364                                    tdb_off_t addition)
365 {
366         /* add a write to the transaction elements, so subsequent
367            reads see the zero data */
368         if (transaction_write(tdb, size, NULL, addition) != 0) {
369                 return -1;
370         }
371
372         return 0;
373 }
374
375 /*
376   brlock during a transaction - ignore them
377 */
378 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
379                               int rw_type, int lck_type, int probe, size_t len)
380 {
381         return 0;
382 }
383
384 static const struct tdb_methods transaction_methods = {
385         transaction_read,
386         transaction_write,
387         transaction_next_hash_chain,
388         transaction_oob,
389         transaction_expand_file,
390         transaction_brlock
391 };
392
393
394 /*
395   start a tdb transaction. No token is returned, as only a single
396   transaction is allowed to be pending per tdb_context
397 */
398 int tdb_transaction_start(struct tdb_context *tdb)
399 {
400         /* some sanity checks */
401         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
402                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
403                 tdb->ecode = TDB_ERR_EINVAL;
404                 return -1;
405         }
406
407         /* cope with nested tdb_transaction_start() calls */
408         if (tdb->transaction != NULL) {
409                 tdb->transaction->nesting++;
410                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
411                          tdb->transaction->nesting));
412                 return 0;
413         }
414
415         if (tdb->num_locks != 0 || tdb->global_lock.count) {
416                 /* the caller must not have any locks when starting a
417                    transaction as otherwise we'll be screwed by lack
418                    of nested locks in posix */
419                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
420                 tdb->ecode = TDB_ERR_LOCK;
421                 return -1;
422         }
423
424         if (tdb->travlocks.next != NULL) {
425                 /* you cannot use transactions inside a traverse (although you can use
426                    traverse inside a transaction) as otherwise you can end up with
427                    deadlock */
428                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
429                 tdb->ecode = TDB_ERR_LOCK;
430                 return -1;
431         }
432
433         tdb->transaction = (struct tdb_transaction *)
434                 calloc(sizeof(struct tdb_transaction), 1);
435         if (tdb->transaction == NULL) {
436                 tdb->ecode = TDB_ERR_OOM;
437                 return -1;
438         }
439
440         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
441         tdb->transaction->block_size = tdb->page_size;
442
443         /* get the transaction write lock. This is a blocking lock. As
444            discussed with Volker, there are a number of ways we could
445            make this async, which we will probably do in the future */
446         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
447                 SAFE_FREE(tdb->transaction->blocks);
448                 SAFE_FREE(tdb->transaction);
449                 return -1;
450         }
451         
452         /* get a read lock from the freelist to the end of file. This
453            is upgraded to a write lock during the commit */
454         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
455                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
456                 tdb->ecode = TDB_ERR_LOCK;
457                 goto fail;
458         }
459
460         /* setup a copy of the hash table heads so the hash scan in
461            traverse can be fast */
462         tdb->transaction->hash_heads = (uint32_t *)
463                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
464         if (tdb->transaction->hash_heads == NULL) {
465                 tdb->ecode = TDB_ERR_OOM;
466                 goto fail;
467         }
468         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
469                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
470                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
471                 tdb->ecode = TDB_ERR_IO;
472                 goto fail;
473         }
474
475         /* make sure we know about any file expansions already done by
476            anyone else */
477         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
478         tdb->transaction->old_map_size = tdb->map_size;
479
480         /* finally hook the io methods, replacing them with
481            transaction specific methods */
482         tdb->transaction->io_methods = tdb->methods;
483         tdb->methods = &transaction_methods;
484
485         return 0;
486         
487 fail:
488         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
489         tdb_transaction_unlock(tdb);
490         SAFE_FREE(tdb->transaction->blocks);
491         SAFE_FREE(tdb->transaction->hash_heads);
492         SAFE_FREE(tdb->transaction);
493         return -1;
494 }
495
496
497 /*
498   cancel the current transaction
499 */
500 int tdb_transaction_cancel(struct tdb_context *tdb)
501 {       
502         int i;
503
504         if (tdb->transaction == NULL) {
505                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
506                 return -1;
507         }
508
509         if (tdb->transaction->nesting != 0) {
510                 tdb->transaction->transaction_error = 1;
511                 tdb->transaction->nesting--;
512                 return 0;
513         }               
514
515         tdb->map_size = tdb->transaction->old_map_size;
516
517         /* free all the transaction blocks */
518         for (i=0;i<tdb->transaction->num_blocks;i++) {
519                 if (tdb->transaction->blocks[i] != NULL) {
520                         free(tdb->transaction->blocks[i]);
521                 }
522         }
523         SAFE_FREE(tdb->transaction->blocks);
524
525         /* remove any global lock created during the transaction */
526         if (tdb->global_lock.count != 0) {
527                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
528                 tdb->global_lock.count = 0;
529         }
530
531         /* remove any locks created during the transaction */
532         if (tdb->num_locks != 0) {
533                 for (i=0;i<tdb->num_lockrecs;i++) {
534                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
535                                    F_UNLCK,F_SETLKW, 0, 1);
536                 }
537                 tdb->num_locks = 0;
538                 tdb->num_lockrecs = 0;
539                 SAFE_FREE(tdb->lockrecs);
540         }
541
542         /* restore the normal io methods */
543         tdb->methods = tdb->transaction->io_methods;
544
545         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
546         tdb_transaction_unlock(tdb);
547         SAFE_FREE(tdb->transaction->hash_heads);
548         SAFE_FREE(tdb->transaction);
549         
550         return 0;
551 }
552
553 /*
554   sync to disk
555 */
556 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
557 {       
558         if (fsync(tdb->fd) != 0) {
559                 tdb->ecode = TDB_ERR_IO;
560                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
561                 return -1;
562         }
563 #ifdef MS_SYNC
564         if (tdb->map_ptr) {
565                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
566                 if (msync(moffset + (char *)tdb->map_ptr, 
567                           length + (offset - moffset), MS_SYNC) != 0) {
568                         tdb->ecode = TDB_ERR_IO;
569                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
570                                  strerror(errno)));
571                         return -1;
572                 }
573         }
574 #endif
575         return 0;
576 }
577
578
579 /*
580   work out how much space the linearised recovery data will consume
581 */
582 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
583 {
584         tdb_len_t recovery_size = 0;
585         int i;
586
587         recovery_size = sizeof(uint32_t);
588         for (i=0;i<tdb->transaction->num_blocks;i++) {
589                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
590                         break;
591                 }
592                 if (tdb->transaction->blocks[i] == NULL) {
593                         continue;
594                 }
595                 recovery_size += 2*sizeof(tdb_off_t);
596                 if (i == tdb->transaction->num_blocks-1) {
597                         recovery_size += tdb->transaction->last_block_size;
598                 } else {
599                         recovery_size += tdb->transaction->block_size;
600                 }
601         }       
602
603         return recovery_size;
604 }
605
606 /*
607   allocate the recovery area, or use an existing recovery area if it is
608   large enough
609 */
610 static int tdb_recovery_allocate(struct tdb_context *tdb, 
611                                  tdb_len_t *recovery_size,
612                                  tdb_off_t *recovery_offset,
613                                  tdb_len_t *recovery_max_size)
614 {
615         struct list_struct rec;
616         const struct tdb_methods *methods = tdb->transaction->io_methods;
617         tdb_off_t recovery_head;
618
619         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
620                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
621                 return -1;
622         }
623
624         rec.rec_len = 0;
625
626         if (recovery_head != 0 && 
627             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
628                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
629                 return -1;
630         }
631
632         *recovery_size = tdb_recovery_size(tdb);
633
634         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
635                 /* it fits in the existing area */
636                 *recovery_max_size = rec.rec_len;
637                 *recovery_offset = recovery_head;
638                 return 0;
639         }
640
641         /* we need to free up the old recovery area, then allocate a
642            new one at the end of the file. Note that we cannot use
643            tdb_allocate() to allocate the new one as that might return
644            us an area that is being currently used (as of the start of
645            the transaction) */
646         if (recovery_head != 0) {
647                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
648                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
649                         return -1;
650                 }
651         }
652
653         /* the tdb_free() call might have increased the recovery size */
654         *recovery_size = tdb_recovery_size(tdb);
655
656         /* round up to a multiple of page size */
657         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
658         *recovery_offset = tdb->map_size;
659         recovery_head = *recovery_offset;
660
661         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
662                                      (tdb->map_size - tdb->transaction->old_map_size) +
663                                      sizeof(rec) + *recovery_max_size) == -1) {
664                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
665                 return -1;
666         }
667
668         /* remap the file (if using mmap) */
669         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
670
671         /* we have to reset the old map size so that we don't try to expand the file
672            again in the transaction commit, which would destroy the recovery area */
673         tdb->transaction->old_map_size = tdb->map_size;
674
675         /* write the recovery header offset and sync - we can sync without a race here
676            as the magic ptr in the recovery record has not been set */
677         CONVERT(recovery_head);
678         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
679                                &recovery_head, sizeof(tdb_off_t)) == -1) {
680                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
681                 return -1;
682         }
683         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
684                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
685                 return -1;
686         }
687
688         return 0;
689 }
690
691
692 /*
693   setup the recovery data that will be used on a crash during commit
694 */
695 static int transaction_setup_recovery(struct tdb_context *tdb, 
696                                       tdb_off_t *magic_offset)
697 {
698         tdb_len_t recovery_size;
699         unsigned char *data, *p;
700         const struct tdb_methods *methods = tdb->transaction->io_methods;
701         struct list_struct *rec;
702         tdb_off_t recovery_offset, recovery_max_size;
703         tdb_off_t old_map_size = tdb->transaction->old_map_size;
704         uint32_t magic, tailer;
705         int i;
706
707         /*
708           check that the recovery area has enough space
709         */
710         if (tdb_recovery_allocate(tdb, &recovery_size, 
711                                   &recovery_offset, &recovery_max_size) == -1) {
712                 return -1;
713         }
714
715         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
716         if (data == NULL) {
717                 tdb->ecode = TDB_ERR_OOM;
718                 return -1;
719         }
720
721         rec = (struct list_struct *)data;
722         memset(rec, 0, sizeof(*rec));
723
724         rec->magic    = 0;
725         rec->data_len = recovery_size;
726         rec->rec_len  = recovery_max_size;
727         rec->key_len  = old_map_size;
728         CONVERT(rec);
729
730         /* build the recovery data into a single blob to allow us to do a single
731            large write, which should be more efficient */
732         p = data + sizeof(*rec);
733         for (i=0;i<tdb->transaction->num_blocks;i++) {
734                 tdb_off_t offset;
735                 tdb_len_t length;
736
737                 if (tdb->transaction->blocks[i] == NULL) {
738                         continue;
739                 }
740
741                 offset = i * tdb->transaction->block_size;
742                 length = tdb->transaction->block_size;
743                 if (i == tdb->transaction->num_blocks-1) {
744                         length = tdb->transaction->last_block_size;
745                 }
746                 
747                 if (offset >= old_map_size) {
748                         continue;
749                 }
750                 if (offset + length > tdb->transaction->old_map_size) {
751                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
752                         free(data);
753                         tdb->ecode = TDB_ERR_CORRUPT;
754                         return -1;
755                 }
756                 memcpy(p, &offset, 4);
757                 memcpy(p+4, &length, 4);
758                 if (DOCONV()) {
759                         tdb_convert(p, 8);
760                 }
761                 /* the recovery area contains the old data, not the
762                    new data, so we have to call the original tdb_read
763                    method to get it */
764                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
765                         free(data);
766                         tdb->ecode = TDB_ERR_IO;
767                         return -1;
768                 }
769                 p += 8 + length;
770         }
771
772         /* and the tailer */
773         tailer = sizeof(*rec) + recovery_max_size;
774         memcpy(p, &tailer, 4);
775         CONVERT(p);
776
777         /* write the recovery data to the recovery area */
778         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
779                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
780                 free(data);
781                 tdb->ecode = TDB_ERR_IO;
782                 return -1;
783         }
784         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
785                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
786                 free(data);
787                 tdb->ecode = TDB_ERR_IO;
788                 return -1;
789         }
790
791         /* as we don't have ordered writes, we have to sync the recovery
792            data before we update the magic to indicate that the recovery
793            data is present */
794         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
795                 free(data);
796                 return -1;
797         }
798
799         free(data);
800
801         magic = TDB_RECOVERY_MAGIC;
802         CONVERT(magic);
803
804         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
805
806         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
807                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
808                 tdb->ecode = TDB_ERR_IO;
809                 return -1;
810         }
811         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
812                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
813                 tdb->ecode = TDB_ERR_IO;
814                 return -1;
815         }
816
817         /* ensure the recovery magic marker is on disk */
818         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
819                 return -1;
820         }
821
822         return 0;
823 }
824
825 /*
826   commit the current transaction
827 */
828 int tdb_transaction_commit(struct tdb_context *tdb)
829 {       
830         const struct tdb_methods *methods;
831         tdb_off_t magic_offset = 0;
832         uint32_t zero = 0;
833         int i;
834
835         if (tdb->transaction == NULL) {
836                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
837                 return -1;
838         }
839
840         if (tdb->transaction->transaction_error) {
841                 tdb->ecode = TDB_ERR_IO;
842                 tdb_transaction_cancel(tdb);
843                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
844                 return -1;
845         }
846
847
848         if (tdb->transaction->nesting != 0) {
849                 tdb->transaction->nesting--;
850                 return 0;
851         }               
852
853         /* check for a null transaction */
854         if (tdb->transaction->blocks == NULL) {
855                 tdb_transaction_cancel(tdb);
856                 return 0;
857         }
858
859         methods = tdb->transaction->io_methods;
860         
861         /* if there are any locks pending then the caller has not
862            nested their locks properly, so fail the transaction */
863         if (tdb->num_locks || tdb->global_lock.count) {
864                 tdb->ecode = TDB_ERR_LOCK;
865                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
866                 tdb_transaction_cancel(tdb);
867                 return -1;
868         }
869
870         /* upgrade the main transaction lock region to a write lock */
871         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
872                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
873                 tdb->ecode = TDB_ERR_LOCK;
874                 tdb_transaction_cancel(tdb);
875                 return -1;
876         }
877
878         /* get the global lock - this prevents new users attaching to the database
879            during the commit */
880         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
881                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
882                 tdb->ecode = TDB_ERR_LOCK;
883                 tdb_transaction_cancel(tdb);
884                 return -1;
885         }
886
887         if (!(tdb->flags & TDB_NOSYNC)) {
888                 /* write the recovery data to the end of the file */
889                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
890                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
891                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
892                         tdb_transaction_cancel(tdb);
893                         return -1;
894                 }
895         }
896
897         /* expand the file to the new size if needed */
898         if (tdb->map_size != tdb->transaction->old_map_size) {
899                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
900                                              tdb->map_size - 
901                                              tdb->transaction->old_map_size) == -1) {
902                         tdb->ecode = TDB_ERR_IO;
903                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
904                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
905                         tdb_transaction_cancel(tdb);
906                         return -1;
907                 }
908                 tdb->map_size = tdb->transaction->old_map_size;
909                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
910         }
911
912         /* perform all the writes */
913         for (i=0;i<tdb->transaction->num_blocks;i++) {
914                 tdb_off_t offset;
915                 tdb_len_t length;
916
917                 if (tdb->transaction->blocks[i] == NULL) {
918                         continue;
919                 }
920
921                 offset = i * tdb->transaction->block_size;
922                 length = tdb->transaction->block_size;
923                 if (i == tdb->transaction->num_blocks-1) {
924                         length = tdb->transaction->last_block_size;
925                 }
926
927                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
928                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
929                         
930                         /* we've overwritten part of the data and
931                            possibly expanded the file, so we need to
932                            run the crash recovery code */
933                         tdb->methods = methods;
934                         tdb_transaction_recover(tdb); 
935
936                         tdb_transaction_cancel(tdb);
937                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
938
939                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
940                         return -1;
941                 }
942                 SAFE_FREE(tdb->transaction->blocks[i]);
943         } 
944
945         SAFE_FREE(tdb->transaction->blocks);
946         tdb->transaction->num_blocks = 0;
947
948         if (!(tdb->flags & TDB_NOSYNC)) {
949                 /* ensure the new data is on disk */
950                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
951                         return -1;
952                 }
953
954                 /* remove the recovery marker */
955                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
956                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
957                         return -1;
958                 }
959
960                 /* ensure the recovery marker has been removed on disk */
961                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
962                         return -1;
963                 }
964         }
965
966         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
967
968         /*
969           TODO: maybe write to some dummy hdr field, or write to magic
970           offset without mmap, before the last sync, instead of the
971           utime() call
972         */
973
974         /* on some systems (like Linux 2.6.x) changes via mmap/msync
975            don't change the mtime of the file, this means the file may
976            not be backed up (as tdb rounding to block sizes means that
977            file size changes are quite rare too). The following forces
978            mtime changes when a transaction completes */
979 #ifdef HAVE_UTIME
980         utime(tdb->name, NULL);
981 #endif
982
983         /* use a transaction cancel to free memory and remove the
984            transaction locks */
985         tdb_transaction_cancel(tdb);
986
987         return 0;
988 }
989
990
991 /*
992   recover from an aborted transaction. Must be called with exclusive
993   database write access already established (including the global
994   lock to prevent new processes attaching)
995 */
996 int tdb_transaction_recover(struct tdb_context *tdb)
997 {
998         tdb_off_t recovery_head, recovery_eof;
999         unsigned char *data, *p;
1000         uint32_t zero = 0;
1001         struct list_struct rec;
1002
1003         /* find the recovery area */
1004         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1005                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1006                 tdb->ecode = TDB_ERR_IO;
1007                 return -1;
1008         }
1009
1010         if (recovery_head == 0) {
1011                 /* we have never allocated a recovery record */
1012                 return 0;
1013         }
1014
1015         /* read the recovery record */
1016         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1017                                    sizeof(rec), DOCONV()) == -1) {
1018                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1019                 tdb->ecode = TDB_ERR_IO;
1020                 return -1;
1021         }
1022
1023         if (rec.magic != TDB_RECOVERY_MAGIC) {
1024                 /* there is no valid recovery data */
1025                 return 0;
1026         }
1027
1028         if (tdb->read_only) {
1029                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1030                 tdb->ecode = TDB_ERR_CORRUPT;
1031                 return -1;
1032         }
1033
1034         recovery_eof = rec.key_len;
1035
1036         data = (unsigned char *)malloc(rec.data_len);
1037         if (data == NULL) {
1038                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1039                 tdb->ecode = TDB_ERR_OOM;
1040                 return -1;
1041         }
1042
1043         /* read the full recovery data */
1044         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1045                                    rec.data_len, 0) == -1) {
1046                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1047                 tdb->ecode = TDB_ERR_IO;
1048                 return -1;
1049         }
1050
1051         /* recover the file data */
1052         p = data;
1053         while (p+8 < data + rec.data_len) {
1054                 uint32_t ofs, len;
1055                 if (DOCONV()) {
1056                         tdb_convert(p, 8);
1057                 }
1058                 memcpy(&ofs, p, 4);
1059                 memcpy(&len, p+4, 4);
1060
1061                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1062                         free(data);
1063                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1064                         tdb->ecode = TDB_ERR_IO;
1065                         return -1;
1066                 }
1067                 p += 8 + len;
1068         }
1069
1070         free(data);
1071
1072         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1073                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1074                 tdb->ecode = TDB_ERR_IO;
1075                 return -1;
1076         }
1077
1078         /* if the recovery area is after the recovered eof then remove it */
1079         if (recovery_eof <= recovery_head) {
1080                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1081                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1082                         tdb->ecode = TDB_ERR_IO;
1083                         return -1;                      
1084                 }
1085         }
1086
1087         /* remove the recovery magic */
1088         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1089                           &zero) == -1) {
1090                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1091                 tdb->ecode = TDB_ERR_IO;
1092                 return -1;                      
1093         }
1094         
1095         /* reduce the file size to the old size */
1096         tdb_munmap(tdb);
1097         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1098                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1099                 tdb->ecode = TDB_ERR_IO;
1100                 return -1;                      
1101         }
1102         tdb->map_size = recovery_eof;
1103         tdb_mmap(tdb);
1104
1105         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1106                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1107                 tdb->ecode = TDB_ERR_IO;
1108                 return -1;
1109         }
1110
1111         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1112                  recovery_eof));
1113
1114         /* all done */
1115         return 0;
1116 }