ntdb: fix recovery data write.
[ddiss/samba.git] / lib / ntdb / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the ntdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free((void *)x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocated the transaction recover record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     ntdb_free() the old record to place it on the normal ntdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all ntdb_write() calls. The hooked
50     transaction versions of ntdb_read() and ntdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to ntdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the ntdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of ntdb_write
68
69   - allow callers to mix transaction and non-transaction use of ntdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit stategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the ntdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct ntdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct ntdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested ntdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         ntdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         ntdb_len_t old_map_size;
120 };
121
122 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
123 #define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
130                                        void *buf, ntdb_len_t len)
131 {
132         size_t blk;
133         enum NTDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(ntdb, off, buf, len2);
139                 if (ecode != NTDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return NTDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (ntdb->transaction->num_blocks <= blk ||
155             ntdb->transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
158                 if (ecode != NTDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == ntdb->transaction->num_blocks-1) {
166                 if (len > ntdb->transaction->last_block_size) {
167                         ecode = NTDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, ntdb->transaction->blocks[blk] + (off % PAGESIZE), len);
174         return NTDB_SUCCESS;
175
176 fail:
177         ntdb->transaction->transaction_error = 1;
178         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
188                                         const void *buf, ntdb_len_t len)
189 {
190         size_t blk;
191         enum NTDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (ntdb->transaction->prepared) {
195                 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(ntdb, off, buf, len2);
205                 if (ecode != NTDB_SUCCESS) {
206                         return ecode;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return NTDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (ntdb->transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (ntdb->transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 ntdb->transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
240                        (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
241                 ntdb->transaction->blocks = new_blocks;
242                 ntdb->transaction->num_blocks = blk+1;
243                 ntdb->transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (ntdb->transaction->blocks[blk] == NULL) {
248                 ntdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (ntdb->transaction->blocks[blk] == NULL) {
250                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (ntdb->transaction->old_map_size > blk * PAGESIZE) {
256                         ntdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > ntdb->transaction->old_map_size) {
258                                 len2 = ntdb->transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = ntdb->transaction->io_methods->tread(ntdb,
261                                         blk * PAGESIZE,
262                                         ntdb->transaction->blocks[blk],
263                                         len2);
264                         if (ecode != NTDB_SUCCESS) {
265                                 ecode = ntdb_logerr(ntdb, ecode,
266                                                    NTDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(ntdb->transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == ntdb->transaction->num_blocks-1) {
275                                 ntdb->transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(ntdb->transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == ntdb->transaction->num_blocks-1) {
287                 if (len + off > ntdb->transaction->last_block_size) {
288                         ntdb->transaction->last_block_size = len + off;
289                 }
290         }
291
292         return NTDB_SUCCESS;
293
294 fail:
295         ntdb->transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
305                                        const void *buf, ntdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(ntdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (ntdb->transaction->num_blocks <= blk ||
328             ntdb->transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == ntdb->transaction->num_blocks-1 &&
333             off + len > ntdb->transaction->last_block_size) {
334                 if (off >= ntdb->transaction->last_block_size) {
335                         return;
336                 }
337                 len = ntdb->transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
349                                       ntdb_off_t off, ntdb_len_t len, bool probe)
350 {
351         if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
352                 return NTDB_SUCCESS;
353         }
354
355         ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
356                    "ntdb_oob len %lld beyond transaction size %lld",
357                    (long long)(off + len),
358                    (long long)ntdb->file->map_size);
359         return NTDB_ERR_IO;
360 }
361
362 /*
363   transaction version of ntdb_expand().
364 */
365 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
366                                               ntdb_off_t addition)
367 {
368         enum NTDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
373         if (ecode == NTDB_SUCCESS) {
374                 ntdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 ntdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= ntdb->transaction->num_blocks
392                     || ntdb->transaction->blocks[blk] == NULL) {
393                         ntdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return ntdb->transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         ntdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < ntdb->transaction->num_blocks
403             && ntdb->transaction->blocks[blk])
404                 return ntdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= ntdb->transaction->num_blocks)
409                         break;
410                 if (ntdb->transaction->blocks[blk]) {
411                         ntdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
417 }
418
419 static const struct ntdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
431                                        ntdb_off_t offset, ntdb_len_t length)
432 {
433         if (ntdb->flags & NTDB_NOSYNC) {
434                 return NTDB_SUCCESS;
435         }
436
437         if (fsync(ntdb->file->fd) != 0) {
438                 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
439                                   "ntdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (ntdb->file->map_ptr) {
444                 ntdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)ntdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
448                                           "ntdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return NTDB_SUCCESS;
454 }
455
456
457 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
458 {
459         int i;
460         enum NTDB_ERROR ecode;
461
462         if (ntdb->transaction == NULL) {
463                 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
464                            "ntdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (ntdb->transaction->nesting != 0) {
469                 ntdb->transaction->transaction_error = 1;
470                 ntdb->transaction->nesting--;
471                 return;
472         }
473
474         ntdb->file->map_size = ntdb->transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<ntdb->transaction->num_blocks;i++) {
478                 if (ntdb->transaction->blocks[i] != NULL) {
479                         free(ntdb->transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(ntdb->transaction->blocks);
483
484         if (ntdb->transaction->magic_offset) {
485                 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
486                 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == NTDB_SUCCESS)
492                         ecode = transaction_sync(ntdb,
493                                                  ntdb->transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != NTDB_SUCCESS) {
496                         ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
497                                    "ntdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (ntdb->file->allrecord_lock.count)
503                 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         ntdb->io = ntdb->transaction->io_methods;
507
508         ntdb_transaction_unlock(ntdb, F_WRLCK);
509
510         if (ntdb_has_open_lock(ntdb))
511                 ntdb_unlock_open(ntdb, F_WRLCK);
512
513         SAFE_FREE(ntdb->transaction);
514 }
515
516 /*
517   start a ntdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per ntdb_context
519 */
520 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
521 {
522         enum NTDB_ERROR ecode;
523
524         ntdb->stats.transactions++;
525         /* some sanity checks */
526         if (ntdb->flags & NTDB_INTERNAL) {
527                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
528                                    "ntdb_transaction_start:"
529                                    " cannot start a transaction on an"
530                                    " internal ntdb");
531         }
532
533         if (ntdb->flags & NTDB_RDONLY) {
534                 return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
535                                    "ntdb_transaction_start:"
536                                    " cannot start a transaction on a"
537                                    " read-only ntdb");
538         }
539
540         /* cope with nested ntdb_transaction_start() calls */
541         if (ntdb->transaction != NULL) {
542                 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
543                         return ntdb_logerr(ntdb, NTDB_ERR_IO,
544                                            NTDB_LOG_USE_ERROR,
545                                            "ntdb_transaction_start:"
546                                            " already inside transaction");
547                 }
548                 ntdb->transaction->nesting++;
549                 ntdb->stats.transaction_nest++;
550                 return 0;
551         }
552
553         if (ntdb_has_hash_locks(ntdb)) {
554                 /* the caller must not have any locks when starting a
555                    transaction as otherwise we'll be screwed by lack
556                    of nested locks in POSIX */
557                 return ntdb_logerr(ntdb, NTDB_ERR_LOCK,
558                                    NTDB_LOG_USE_ERROR,
559                                    "ntdb_transaction_start:"
560                                    " cannot start a transaction with locks"
561                                    " held");
562         }
563
564         ntdb->transaction = (struct ntdb_transaction *)
565                 calloc(sizeof(struct ntdb_transaction), 1);
566         if (ntdb->transaction == NULL) {
567                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
568                                    "ntdb_transaction_start:"
569                                    " cannot allocate");
570         }
571
572         /* get the transaction write lock. This is a blocking lock. As
573            discussed with Volker, there are a number of ways we could
574            make this async, which we will probably do in the future */
575         ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
576         if (ecode != NTDB_SUCCESS) {
577                 SAFE_FREE(ntdb->transaction->blocks);
578                 SAFE_FREE(ntdb->transaction);
579                 return ecode;
580         }
581
582         /* get a read lock over entire file. This is upgraded to a write
583            lock during the commit */
584         ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
585         if (ecode != NTDB_SUCCESS) {
586                 goto fail_allrecord_lock;
587         }
588
589         /* make sure we know about any file expansions already done by
590            anyone else */
591         ntdb->io->oob(ntdb, ntdb->file->map_size, 1, true);
592         ntdb->transaction->old_map_size = ntdb->file->map_size;
593
594         /* finally hook the io methods, replacing them with
595            transaction specific methods */
596         ntdb->transaction->io_methods = ntdb->io;
597         ntdb->io = &transaction_methods;
598         return NTDB_SUCCESS;
599
600 fail_allrecord_lock:
601         ntdb_transaction_unlock(ntdb, F_WRLCK);
602         SAFE_FREE(ntdb->transaction->blocks);
603         SAFE_FREE(ntdb->transaction);
604         return ecode;
605 }
606
607
608 /*
609   cancel the current transaction
610 */
611 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
612 {
613         ntdb->stats.transaction_cancel++;
614         _ntdb_transaction_cancel(ntdb);
615 }
616
617 /*
618   work out how much space the linearised recovery data will consume (worst case)
619 */
620 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
621 {
622         ntdb_len_t recovery_size = 0;
623         int i;
624
625         recovery_size = 0;
626         for (i=0;i<ntdb->transaction->num_blocks;i++) {
627                 if (i * PAGESIZE >= ntdb->transaction->old_map_size) {
628                         break;
629                 }
630                 if (ntdb->transaction->blocks[i] == NULL) {
631                         continue;
632                 }
633                 recovery_size += 2*sizeof(ntdb_off_t);
634                 if (i == ntdb->transaction->num_blocks-1) {
635                         recovery_size += ntdb->transaction->last_block_size;
636                 } else {
637                         recovery_size += PAGESIZE;
638                 }
639         }
640
641         return recovery_size;
642 }
643
644 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
645                                         const struct ntdb_methods *methods,
646                                         ntdb_off_t *recovery_offset,
647                                         struct ntdb_recovery_record *rec)
648 {
649         enum NTDB_ERROR ecode;
650
651         *recovery_offset = ntdb_read_off(ntdb,
652                                         offsetof(struct ntdb_header, recovery));
653         if (NTDB_OFF_IS_ERR(*recovery_offset)) {
654                 return NTDB_OFF_TO_ERR(*recovery_offset);
655         }
656
657         if (*recovery_offset == 0) {
658                 rec->max_len = 0;
659                 return NTDB_SUCCESS;
660         }
661
662         ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
663         if (ecode != NTDB_SUCCESS)
664                 return ecode;
665
666         ntdb_convert(ntdb, rec, sizeof(*rec));
667         /* ignore invalid recovery regions: can happen in crash */
668         if (rec->magic != NTDB_RECOVERY_MAGIC &&
669             rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
670                 *recovery_offset = 0;
671                 rec->max_len = 0;
672         }
673         return NTDB_SUCCESS;
674 }
675
676 static unsigned int same(const unsigned char *new,
677                          const unsigned char *old,
678                          unsigned int length)
679 {
680         unsigned int i;
681
682         for (i = 0; i < length; i++) {
683                 if (new[i] != old[i])
684                         break;
685         }
686         return i;
687 }
688
689 static unsigned int different(const unsigned char *new,
690                               const unsigned char *old,
691                               unsigned int length,
692                               unsigned int min_same,
693                               unsigned int *samelen)
694 {
695         unsigned int i;
696
697         *samelen = 0;
698         for (i = 0; i < length; i++) {
699                 if (new[i] == old[i]) {
700                         (*samelen)++;
701                 } else {
702                         if (*samelen >= min_same) {
703                                 return i - *samelen;
704                         }
705                         *samelen = 0;
706                 }
707         }
708
709         if (*samelen < min_same)
710                 *samelen = 0;
711         return length - *samelen;
712 }
713
714 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
715 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
716                                                   ntdb_len_t *len)
717 {
718         struct ntdb_recovery_record *rec;
719         size_t i;
720         enum NTDB_ERROR ecode;
721         unsigned char *p;
722         const struct ntdb_methods *old_methods = ntdb->io;
723
724         rec = malloc(sizeof(*rec) + ntdb_recovery_size(ntdb));
725         if (!rec) {
726                 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
727                            "transaction_setup_recovery:"
728                            " cannot allocate");
729                 return NTDB_ERR_PTR(NTDB_ERR_OOM);
730         }
731
732         /* We temporarily revert to the old I/O methods, so we can use
733          * ntdb_access_read */
734         ntdb->io = ntdb->transaction->io_methods;
735
736         /* build the recovery data into a single blob to allow us to do a single
737            large write, which should be more efficient */
738         p = (unsigned char *)(rec + 1);
739         for (i=0;i<ntdb->transaction->num_blocks;i++) {
740                 ntdb_off_t offset;
741                 ntdb_len_t length;
742                 unsigned int off;
743                 const unsigned char *buffer;
744
745                 if (ntdb->transaction->blocks[i] == NULL) {
746                         continue;
747                 }
748
749                 offset = i * PAGESIZE;
750                 length = PAGESIZE;
751                 if (i == ntdb->transaction->num_blocks-1) {
752                         length = ntdb->transaction->last_block_size;
753                 }
754
755                 if (offset >= ntdb->transaction->old_map_size) {
756                         continue;
757                 }
758
759                 if (offset + length > ntdb->file->map_size) {
760                         ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
761                                            "ntdb_transaction_setup_recovery:"
762                                            " transaction data over new region"
763                                            " boundary");
764                         goto fail;
765                 }
766                 if (offset + length > ntdb->transaction->old_map_size) {
767                         /* Short read at EOF. */
768                         length = ntdb->transaction->old_map_size - offset;
769                 }
770                 buffer = ntdb_access_read(ntdb, offset, length, false);
771                 if (NTDB_PTR_IS_ERR(buffer)) {
772                         ecode = NTDB_PTR_ERR(buffer);
773                         goto fail;
774                 }
775
776                 /* Skip over anything the same at the start. */
777                 off = same(ntdb->transaction->blocks[i], buffer, length);
778                 offset += off;
779
780                 while (off < length) {
781                         ntdb_len_t len1;
782                         unsigned int samelen;
783
784                         len1 = different(ntdb->transaction->blocks[i] + off,
785                                         buffer + off, length - off,
786                                         sizeof(offset) + sizeof(len1) + 1,
787                                         &samelen);
788
789                         memcpy(p, &offset, sizeof(offset));
790                         memcpy(p + sizeof(offset), &len1, sizeof(len1));
791                         ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
792                         p += sizeof(offset) + sizeof(len1);
793                         memcpy(p, buffer + off, len1);
794                         p += len1;
795                         off += len1 + samelen;
796                         offset += len1 + samelen;
797                 }
798                 ntdb_access_release(ntdb, buffer);
799         }
800
801         *len = p - (unsigned char *)(rec + 1);
802         ntdb->io = old_methods;
803         return rec;
804
805 fail:
806         free(rec);
807         ntdb->io = old_methods;
808         return NTDB_ERR_PTR(ecode);
809 }
810
811 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
812                                       ntdb_len_t rec_length,
813                                       struct ntdb_recovery_record *rec)
814 {
815         ntdb_off_t off, recovery_off;
816         ntdb_len_t addition;
817         enum NTDB_ERROR ecode;
818         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
819
820         /* round up to a multiple of page size. Overallocate, since each
821          * such allocation forces us to expand the file. */
822         rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
823
824         /* Round up to a page. */
825         rec->max_len = ((sizeof(*rec) + rec->max_len + PAGESIZE-1)
826                         & ~(PAGESIZE-1))
827                 - sizeof(*rec);
828
829         off = ntdb->file->map_size;
830
831         /* Restore ->map_size before calling underlying expand_file.
832            Also so that we don't try to expand the file again in the
833            transaction commit, which would destroy the recovery
834            area */
835         addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
836                 sizeof(*rec) + rec->max_len;
837         ntdb->file->map_size = ntdb->transaction->old_map_size;
838         ntdb->stats.transaction_expand_file++;
839         ecode = methods->expand_file(ntdb, addition);
840         if (ecode != NTDB_SUCCESS) {
841                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
842                            "ntdb_recovery_allocate:"
843                            " failed to create recovery area");
844                 return NTDB_ERR_TO_OFF(ecode);
845         }
846
847         /* we have to reset the old map size so that we don't try to
848            expand the file again in the transaction commit, which
849            would destroy the recovery area */
850         ntdb->transaction->old_map_size = ntdb->file->map_size;
851
852         /* write the recovery header offset and sync - we can sync without a race here
853            as the magic ptr in the recovery record has not been set */
854         recovery_off = off;
855         ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
856         ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
857                                 &recovery_off, sizeof(ntdb_off_t));
858         if (ecode != NTDB_SUCCESS) {
859                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
860                            "ntdb_recovery_allocate:"
861                            " failed to write recovery head");
862                 return NTDB_ERR_TO_OFF(ecode);
863         }
864         transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
865                                    &recovery_off,
866                                    sizeof(ntdb_off_t));
867         return off;
868 }
869
870 /*
871   setup the recovery data that will be used on a crash during commit
872 */
873 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
874 {
875         ntdb_len_t recovery_size = 0;
876         ntdb_off_t recovery_off = 0;
877         ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
878         struct ntdb_recovery_record *recovery;
879         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
880         uint64_t magic;
881         enum NTDB_ERROR ecode;
882
883         recovery = alloc_recovery(ntdb, &recovery_size);
884         if (NTDB_PTR_IS_ERR(recovery))
885                 return NTDB_PTR_ERR(recovery);
886
887         ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
888         if (ecode) {
889                 free(recovery);
890                 return ecode;
891         }
892
893         if (recovery->max_len < recovery_size) {
894                 /* Not large enough. Free up old recovery area. */
895                 if (recovery_off) {
896                         ntdb->stats.frees++;
897                         ecode = add_free_record(ntdb, recovery_off,
898                                                 sizeof(*recovery)
899                                                 + recovery->max_len,
900                                                 NTDB_LOCK_WAIT, true);
901                         free(recovery);
902                         if (ecode != NTDB_SUCCESS) {
903                                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
904                                                   "ntdb_recovery_allocate:"
905                                                   " failed to free previous"
906                                                   " recovery area");
907                         }
908
909                         /* Refresh recovery after add_free_record above. */
910                         recovery = alloc_recovery(ntdb, &recovery_size);
911                         if (NTDB_PTR_IS_ERR(recovery))
912                                 return NTDB_PTR_ERR(recovery);
913                 }
914
915                 recovery_off = create_recovery_area(ntdb, recovery_size,
916                                                     recovery);
917                 if (NTDB_OFF_IS_ERR(recovery_off)) {
918                         free(recovery);
919                         return NTDB_OFF_TO_ERR(recovery_off);
920                 }
921         }
922
923         /* Now we know size, convert rec header. */
924         recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
925         recovery->len = recovery_size;
926         recovery->eof = old_map_size;
927         ntdb_convert(ntdb, recovery, sizeof(*recovery));
928
929         /* write the recovery data to the recovery area */
930         ecode = methods->twrite(ntdb, recovery_off, recovery,
931                                 sizeof(*recovery) + recovery_size);
932         if (ecode != NTDB_SUCCESS) {
933                 free(recovery);
934                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
935                                   "ntdb_transaction_setup_recovery:"
936                                   " failed to write recovery data");
937         }
938         transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
939
940         free(recovery);
941
942         /* as we don't have ordered writes, we have to sync the recovery
943            data before we update the magic to indicate that the recovery
944            data is present */
945         ecode = transaction_sync(ntdb, recovery_off, recovery_size);
946         if (ecode != NTDB_SUCCESS)
947                 return ecode;
948
949         magic = NTDB_RECOVERY_MAGIC;
950         ntdb_convert(ntdb, &magic, sizeof(magic));
951
952         ntdb->transaction->magic_offset
953                 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
954
955         ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
956                                 &magic, sizeof(magic));
957         if (ecode != NTDB_SUCCESS) {
958                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
959                                   "ntdb_transaction_setup_recovery:"
960                                   " failed to write recovery magic");
961         }
962         transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
963                                    &magic, sizeof(magic));
964
965         /* ensure the recovery magic marker is on disk */
966         return transaction_sync(ntdb, ntdb->transaction->magic_offset,
967                                 sizeof(magic));
968 }
969
970 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
971 {
972         const struct ntdb_methods *methods;
973         enum NTDB_ERROR ecode;
974
975         if (ntdb->transaction == NULL) {
976                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
977                                   "ntdb_transaction_prepare_commit:"
978                                   " no transaction");
979         }
980
981         if (ntdb->transaction->prepared) {
982                 _ntdb_transaction_cancel(ntdb);
983                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
984                                   "ntdb_transaction_prepare_commit:"
985                                   " transaction already prepared");
986         }
987
988         if (ntdb->transaction->transaction_error) {
989                 _ntdb_transaction_cancel(ntdb);
990                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
991                                   "ntdb_transaction_prepare_commit:"
992                                   " transaction error pending");
993         }
994
995
996         if (ntdb->transaction->nesting != 0) {
997                 return NTDB_SUCCESS;
998         }
999
1000         /* check for a null transaction */
1001         if (ntdb->transaction->blocks == NULL) {
1002                 return NTDB_SUCCESS;
1003         }
1004
1005         methods = ntdb->transaction->io_methods;
1006
1007         /* upgrade the main transaction lock region to a write lock */
1008         ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
1009         if (ecode != NTDB_SUCCESS) {
1010                 return ecode;
1011         }
1012
1013         /* get the open lock - this prevents new users attaching to the database
1014            during the commit */
1015         ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
1016         if (ecode != NTDB_SUCCESS) {
1017                 return ecode;
1018         }
1019
1020         /* Since we have whole db locked, we don't need the expansion lock. */
1021         if (!(ntdb->flags & NTDB_NOSYNC)) {
1022                 /* Sets up ntdb->transaction->recovery and
1023                  * ntdb->transaction->magic_offset. */
1024                 ecode = transaction_setup_recovery(ntdb);
1025                 if (ecode != NTDB_SUCCESS) {
1026                         return ecode;
1027                 }
1028         }
1029
1030         ntdb->transaction->prepared = true;
1031
1032         /* expand the file to the new size if needed */
1033         if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
1034                 ntdb_len_t add;
1035
1036                 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1037                 /* Restore original map size for ntdb_expand_file */
1038                 ntdb->file->map_size = ntdb->transaction->old_map_size;
1039                 ecode = methods->expand_file(ntdb, add);
1040                 if (ecode != NTDB_SUCCESS) {
1041                         return ecode;
1042                 }
1043         }
1044
1045         /* Keep the open lock until the actual commit */
1046         return NTDB_SUCCESS;
1047 }
1048
1049 /*
1050    prepare to commit the current transaction
1051 */
1052 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1053 {
1054         return _ntdb_transaction_prepare_commit(ntdb);
1055 }
1056
1057 /*
1058   commit the current transaction
1059 */
1060 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1061 {
1062         const struct ntdb_methods *methods;
1063         int i;
1064         enum NTDB_ERROR ecode;
1065
1066         if (ntdb->transaction == NULL) {
1067                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
1068                                    "ntdb_transaction_commit:"
1069                                    " no transaction");
1070         }
1071
1072         ntdb_trace(ntdb, "ntdb_transaction_commit");
1073
1074         if (ntdb->transaction->nesting != 0) {
1075                 ntdb->transaction->nesting--;
1076                 return NTDB_SUCCESS;
1077         }
1078
1079         /* check for a null transaction */
1080         if (ntdb->transaction->blocks == NULL) {
1081                 _ntdb_transaction_cancel(ntdb);
1082                 return NTDB_SUCCESS;
1083         }
1084
1085         if (!ntdb->transaction->prepared) {
1086                 ecode = _ntdb_transaction_prepare_commit(ntdb);
1087                 if (ecode != NTDB_SUCCESS) {
1088                         _ntdb_transaction_cancel(ntdb);
1089                         return ecode;
1090                 }
1091         }
1092
1093         methods = ntdb->transaction->io_methods;
1094
1095         /* perform all the writes */
1096         for (i=0;i<ntdb->transaction->num_blocks;i++) {
1097                 ntdb_off_t offset;
1098                 ntdb_len_t length;
1099
1100                 if (ntdb->transaction->blocks[i] == NULL) {
1101                         continue;
1102                 }
1103
1104                 offset = i * PAGESIZE;
1105                 length = PAGESIZE;
1106                 if (i == ntdb->transaction->num_blocks-1) {
1107                         length = ntdb->transaction->last_block_size;
1108                 }
1109
1110                 ecode = methods->twrite(ntdb, offset,
1111                                         ntdb->transaction->blocks[i], length);
1112                 if (ecode != NTDB_SUCCESS) {
1113                         /* we've overwritten part of the data and
1114                            possibly expanded the file, so we need to
1115                            run the crash recovery code */
1116                         ntdb->io = methods;
1117                         ntdb_transaction_recover(ntdb);
1118
1119                         _ntdb_transaction_cancel(ntdb);
1120
1121                         return ecode;
1122                 }
1123                 SAFE_FREE(ntdb->transaction->blocks[i]);
1124         }
1125
1126         SAFE_FREE(ntdb->transaction->blocks);
1127         ntdb->transaction->num_blocks = 0;
1128
1129         /* ensure the new data is on disk */
1130         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1131         if (ecode != NTDB_SUCCESS) {
1132                 return ecode;
1133         }
1134
1135         /*
1136           TODO: maybe write to some dummy hdr field, or write to magic
1137           offset without mmap, before the last sync, instead of the
1138           utime() call
1139         */
1140
1141         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1142            don't change the mtime of the file, this means the file may
1143            not be backed up (as ntdb rounding to block sizes means that
1144            file size changes are quite rare too). The following forces
1145            mtime changes when a transaction completes */
1146 #if HAVE_UTIME
1147         utime(ntdb->name, NULL);
1148 #endif
1149
1150         /* use a transaction cancel to free memory and remove the
1151            transaction locks: it "restores" map_size, too. */
1152         ntdb->transaction->old_map_size = ntdb->file->map_size;
1153         _ntdb_transaction_cancel(ntdb);
1154
1155         return NTDB_SUCCESS;
1156 }
1157
1158
1159 /*
1160   recover from an aborted transaction. Must be called with exclusive
1161   database write access already established (including the open
1162   lock to prevent new processes attaching)
1163 */
1164 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1165 {
1166         ntdb_off_t recovery_head, recovery_eof;
1167         unsigned char *data, *p;
1168         struct ntdb_recovery_record rec;
1169         enum NTDB_ERROR ecode;
1170
1171         /* find the recovery area */
1172         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1173         if (NTDB_OFF_IS_ERR(recovery_head)) {
1174                 ecode = NTDB_OFF_TO_ERR(recovery_head);
1175                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1176                                   "ntdb_transaction_recover:"
1177                                   " failed to read recovery head");
1178         }
1179
1180         if (recovery_head == 0) {
1181                 /* we have never allocated a recovery record */
1182                 return NTDB_SUCCESS;
1183         }
1184
1185         /* read the recovery record */
1186         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1187         if (ecode != NTDB_SUCCESS) {
1188                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1189                                   "ntdb_transaction_recover:"
1190                                   " failed to read recovery record");
1191         }
1192
1193         if (rec.magic != NTDB_RECOVERY_MAGIC) {
1194                 /* there is no valid recovery data */
1195                 return NTDB_SUCCESS;
1196         }
1197
1198         if (ntdb->flags & NTDB_RDONLY) {
1199                 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1200                                   "ntdb_transaction_recover:"
1201                                   " attempt to recover read only database");
1202         }
1203
1204         recovery_eof = rec.eof;
1205
1206         data = (unsigned char *)malloc(rec.len);
1207         if (data == NULL) {
1208                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1209                                   "ntdb_transaction_recover:"
1210                                   " failed to allocate recovery data");
1211         }
1212
1213         /* read the full recovery data */
1214         ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1215                                     rec.len);
1216         if (ecode != NTDB_SUCCESS) {
1217                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1218                                   "ntdb_transaction_recover:"
1219                                   " failed to read recovery data");
1220         }
1221
1222         /* recover the file data */
1223         p = data;
1224         while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1225                 ntdb_off_t ofs;
1226                 ntdb_len_t len;
1227                 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1228                 memcpy(&ofs, p, sizeof(ofs));
1229                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1230                 p += sizeof(ofs) + sizeof(len);
1231
1232                 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1233                 if (ecode != NTDB_SUCCESS) {
1234                         free(data);
1235                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1236                                           "ntdb_transaction_recover:"
1237                                           " failed to recover %zu bytes"
1238                                           " at offset %zu",
1239                                           (size_t)len, (size_t)ofs);
1240                 }
1241                 p += len;
1242         }
1243
1244         free(data);
1245
1246         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1247         if (ecode != NTDB_SUCCESS) {
1248                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1249                                   "ntdb_transaction_recover:"
1250                                   " failed to sync recovery");
1251         }
1252
1253         /* if the recovery area is after the recovered eof then remove it */
1254         if (recovery_eof <= recovery_head) {
1255                 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1256                                                     recovery),
1257                                       0);
1258                 if (ecode != NTDB_SUCCESS) {
1259                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1260                                           "ntdb_transaction_recover:"
1261                                           " failed to remove recovery head");
1262                 }
1263         }
1264
1265         /* remove the recovery magic */
1266         ecode = ntdb_write_off(ntdb,
1267                               recovery_head
1268                               + offsetof(struct ntdb_recovery_record, magic),
1269                               NTDB_RECOVERY_INVALID_MAGIC);
1270         if (ecode != NTDB_SUCCESS) {
1271                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1272                                   "ntdb_transaction_recover:"
1273                                   " failed to remove recovery magic");
1274         }
1275
1276         ecode = transaction_sync(ntdb, 0, recovery_eof);
1277         if (ecode != NTDB_SUCCESS) {
1278                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1279                                   "ntdb_transaction_recover:"
1280                                   " failed to sync2 recovery");
1281         }
1282
1283         ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1284                    "ntdb_transaction_recover: recovered %zu byte database",
1285                    (size_t)recovery_eof);
1286
1287         /* all done */
1288         return NTDB_SUCCESS;
1289 }
1290
1291 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1292 {
1293         ntdb_off_t recovery_head;
1294         struct ntdb_recovery_record rec;
1295         enum NTDB_ERROR ecode;
1296
1297         /* find the recovery area */
1298         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1299         if (NTDB_OFF_IS_ERR(recovery_head)) {
1300                 return recovery_head;
1301         }
1302
1303         if (recovery_head == 0) {
1304                 /* we have never allocated a recovery record */
1305                 return false;
1306         }
1307
1308         /* read the recovery record */
1309         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1310         if (ecode != NTDB_SUCCESS) {
1311                 return NTDB_ERR_TO_OFF(ecode);
1312         }
1313
1314         return (rec.magic == NTDB_RECOVERY_MAGIC);
1315 }