r20269: merge -r20264:20267 from SAMBA_3_0_24
[ddiss/samba.git] / source3 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006
7    
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb_tdb
30  *
31  *  Component: ldb tdb backend
32  *
33  *  Description: core functions for tdb backend
34  *
35  *  Author: Andrew Tridgell
36  *  Author: Stefan Metzmacher
37  *
38  *  Modifications:
39  *
40  *  - description: make the module use asyncronous calls
41  *    date: Feb 2006
42  *    Author: Simo Sorce
43  */
44
45 #include "includes.h"
46 #include "ldb/include/includes.h"
47
48 #include "ldb/ldb_tdb/ldb_tdb.h"
49
50 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg);
51
52 /*
53   map a tdb error code to a ldb error code
54 */
55 static int ltdb_err_map(enum TDB_ERROR tdb_code)
56 {
57         switch (tdb_code) {
58         case TDB_SUCCESS:
59                 return LDB_SUCCESS;
60         case TDB_ERR_CORRUPT:
61         case TDB_ERR_OOM:
62         case TDB_ERR_EINVAL:
63                 return LDB_ERR_OPERATIONS_ERROR;
64         case TDB_ERR_IO:
65                 return LDB_ERR_PROTOCOL_ERROR;
66         case TDB_ERR_LOCK:
67         case TDB_ERR_NOLOCK:
68                 return LDB_ERR_BUSY;
69         case TDB_ERR_LOCK_TIMEOUT:
70                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
71         case TDB_ERR_EXISTS:
72                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
73         case TDB_ERR_NOEXIST:
74                 return LDB_ERR_NO_SUCH_OBJECT;
75         case TDB_ERR_RDONLY:
76                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
77         }
78         return LDB_ERR_OTHER;
79 }
80
81
82 struct ldb_handle *init_ltdb_handle(struct ltdb_private *ltdb, struct ldb_module *module,
83                                     struct ldb_request *req)
84 {
85         struct ltdb_context *ac;
86         struct ldb_handle *h;
87
88         h = talloc_zero(req, struct ldb_handle);
89         if (h == NULL) {
90                 ldb_set_errstring(module->ldb, "Out of Memory");
91                 return NULL;
92         }
93
94         h->module = module;
95
96         ac = talloc_zero(h, struct ltdb_context);
97         if (ac == NULL) {
98                 ldb_set_errstring(module->ldb, "Out of Memory");
99                 talloc_free(h);
100                 return NULL;
101         }
102
103         h->private_data = (void *)ac;
104
105         h->state = LDB_ASYNC_INIT;
106         h->status = LDB_SUCCESS;
107
108         ac->module = module;
109         ac->context = req->context;
110         ac->callback = req->callback;
111
112         return h;
113 }
114
115 /*
116   form a TDB_DATA for a record key
117   caller frees
118
119   note that the key for a record can depend on whether the 
120   dn refers to a case sensitive index record or not
121 */
122 struct TDB_DATA ltdb_key(struct ldb_module *module, const struct ldb_dn *dn)
123 {
124         struct ldb_context *ldb = module->ldb;
125         TDB_DATA key;
126         char *key_str = NULL;
127         char *dn_folded = NULL;
128
129         /*
130           most DNs are case insensitive. The exception is index DNs for
131           case sensitive attributes
132
133           there are 3 cases dealt with in this code:
134
135           1) if the dn doesn't start with @ then uppercase the attribute
136              names and the attributes values of case insensitive attributes
137           2) if the dn starts with @ then leave it alone - the indexing code handles
138              the rest
139         */
140
141         dn_folded = ldb_dn_linearize_casefold(ldb, ldb, dn);
142         if (!dn_folded) {
143                 goto failed;
144         }
145
146         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
147
148         talloc_free(dn_folded);
149
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key.dptr = (char *)key_str;
155         key.dsize = strlen(key_str) + 1;
156
157         return key;
158
159 failed:
160         errno = ENOMEM;
161         key.dptr = NULL;
162         key.dsize = 0;
163         return key;
164 }
165
166 /*
167   check special dn's have valid attributes
168   currently only @ATTRIBUTES is checked
169 */
170 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
171 {
172         int i, j;
173  
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return 0;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 for (j = 0; j < msg->elements[i].num_values; j++) {
183                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
184                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
185                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
186                         }
187                 }
188         }
189
190         return 0;
191 }
192
193
194 /*
195   we've made a modification to a dn - possibly reindex and 
196   update sequence number
197 */
198 static int ltdb_modified(struct ldb_module *module, const struct ldb_dn *dn)
199 {
200         int ret = 0;
201
202         if (ldb_dn_is_special(dn) &&
203             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
204              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
205                 ret = ltdb_reindex(module);
206         }
207
208         if (ret == 0 &&
209             !(ldb_dn_is_special(dn) &&
210               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
211                 ret = ltdb_increase_sequence_number(module);
212         }
213
214         return ret;
215 }
216
217 /*
218   store a record into the db
219 */
220 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
221 {
222         struct ltdb_private *ltdb =
223                 talloc_get_type(module->private_data, struct ltdb_private);
224         TDB_DATA tdb_key, tdb_data;
225         int ret;
226
227         tdb_key = ltdb_key(module, msg->dn);
228         if (!tdb_key.dptr) {
229                 return LDB_ERR_OTHER;
230         }
231
232         ret = ltdb_pack_data(module, msg, &tdb_data);
233         if (ret == -1) {
234                 talloc_free(tdb_key.dptr);
235                 return LDB_ERR_OTHER;
236         }
237
238         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
239         if (ret == -1) {
240                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
241                 goto done;
242         }
243         
244         ret = ltdb_index_add(module, msg);
245         if (ret == -1) {
246                 tdb_delete(ltdb->tdb, tdb_key);
247         }
248
249 done:
250         talloc_free(tdb_key.dptr);
251         talloc_free(tdb_data.dptr);
252
253         return ret;
254 }
255
256
257 static int ltdb_add_internal(struct ldb_module *module, const struct ldb_message *msg)
258 {
259         int ret;
260         
261         ret = ltdb_check_special_dn(module, msg);
262         if (ret != LDB_SUCCESS) {
263                 return ret;
264         }
265         
266         if (ltdb_cache_load(module) != 0) {
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269
270         ret = ltdb_store(module, msg, TDB_INSERT);
271
272         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
273                 char *dn;
274
275                 dn = ldb_dn_linearize(module, msg->dn);
276                 if (!dn) {
277                         return ret;
278                 }
279                 ldb_asprintf_errstring(module->ldb, "Entry %s already exists", dn);
280                 talloc_free(dn);
281                 return ret;
282         }
283         
284         if (ret == LDB_SUCCESS) {
285                 ret = ltdb_modified(module, msg->dn);
286                 if (ret != LDB_SUCCESS) {
287                         return LDB_ERR_OPERATIONS_ERROR;
288                 }
289         }
290
291         return ret;
292 }
293
294 /*
295   add a record to the database
296 */
297 static int ltdb_add(struct ldb_module *module, struct ldb_request *req)
298 {
299         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
300         struct ltdb_context *ltdb_ac;
301         int tret, ret = LDB_SUCCESS;
302
303         if (req->controls != NULL) {
304                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
305                 if (check_critical_controls(req->controls)) {
306                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
307                 }
308         }
309         
310         req->handle = init_ltdb_handle(ltdb, module, req);
311         if (req->handle == NULL) {
312                 return LDB_ERR_OPERATIONS_ERROR;
313         }
314         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
315
316         tret = ltdb_add_internal(module, req->op.add.message);
317         if (tret != LDB_SUCCESS) {
318                 req->handle->status = tret;
319                 goto done;
320         }
321         
322         if (ltdb_ac->callback) {
323                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
324         }
325 done:
326         req->handle->state = LDB_ASYNC_DONE;
327         return ret;
328 }
329
330 /*
331   delete a record from the database, not updating indexes (used for deleting
332   index records)
333 */
334 int ltdb_delete_noindex(struct ldb_module *module, const struct ldb_dn *dn)
335 {
336         struct ltdb_private *ltdb =
337                 talloc_get_type(module->private_data, struct ltdb_private);
338         TDB_DATA tdb_key;
339         int ret;
340
341         tdb_key = ltdb_key(module, dn);
342         if (!tdb_key.dptr) {
343                 return LDB_ERR_OTHER;
344         }
345
346         ret = tdb_delete(ltdb->tdb, tdb_key);
347         talloc_free(tdb_key.dptr);
348
349         if (ret != 0) {
350                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
351         }
352
353         return ret;
354 }
355
356 static int ltdb_delete_internal(struct ldb_module *module, const struct ldb_dn *dn)
357 {
358         struct ldb_message *msg;
359         int ret;
360
361         msg = talloc(module, struct ldb_message);
362         if (msg == NULL) {
363                 return LDB_ERR_OPERATIONS_ERROR;
364         }
365
366         /* in case any attribute of the message was indexed, we need
367            to fetch the old record */
368         ret = ltdb_search_dn1(module, dn, msg);
369         if (ret != 1) {
370                 /* not finding the old record is an error */
371                 talloc_free(msg);
372                 return LDB_ERR_NO_SUCH_OBJECT;
373         }
374
375         ret = ltdb_delete_noindex(module, dn);
376         if (ret != LDB_SUCCESS) {
377                 talloc_free(msg);
378                 return LDB_ERR_NO_SUCH_OBJECT;
379         }
380
381         /* remove any indexed attributes */
382         ret = ltdb_index_del(module, msg);
383         if (ret != LDB_SUCCESS) {
384                 talloc_free(msg);
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_modified(module, dn);
389         if (ret != LDB_SUCCESS) {
390                 talloc_free(msg);
391                 return LDB_ERR_OPERATIONS_ERROR;
392         }
393
394         talloc_free(msg);
395         return LDB_SUCCESS;
396 }
397
398 /*
399   delete a record from the database
400 */
401 static int ltdb_delete(struct ldb_module *module, struct ldb_request *req)
402 {
403         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
404         struct ltdb_context *ltdb_ac;
405         int tret, ret = LDB_SUCCESS;
406
407         if (req->controls != NULL) {
408                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
409                 if (check_critical_controls(req->controls)) {
410                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
411                 }
412         }
413         
414         req->handle = NULL;
415
416         if (ltdb_cache_load(module) != 0) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419
420         req->handle = init_ltdb_handle(ltdb, module, req);
421         if (req->handle == NULL) {
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
425
426         tret = ltdb_delete_internal(module, req->op.del.dn);
427         if (tret != LDB_SUCCESS) {
428                 req->handle->status = tret; 
429                 goto done;
430         }
431
432         if (ltdb_ac->callback) {
433                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
434         }
435 done:
436         req->handle->state = LDB_ASYNC_DONE;
437         return ret;
438 }
439
440 /*
441   find an element by attribute name. At the moment this does a linear search, it should
442   be re-coded to use a binary search once all places that modify records guarantee
443   sorted order
444
445   return the index of the first matching element if found, otherwise -1
446 */
447 static int find_element(const struct ldb_message *msg, const char *name)
448 {
449         unsigned int i;
450         for (i=0;i<msg->num_elements;i++) {
451                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
452                         return i;
453                 }
454         }
455         return -1;
456 }
457
458
459 /*
460   add an element to an existing record. Assumes a elements array that we
461   can call re-alloc on, and assumed that we can re-use the data pointers from the 
462   passed in additional values. Use with care!
463
464   returns 0 on success, -1 on failure (and sets errno)
465 */
466 static int msg_add_element(struct ldb_context *ldb,
467                            struct ldb_message *msg, struct ldb_message_element *el)
468 {
469         struct ldb_message_element *e2;
470         unsigned int i;
471
472         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
473                               msg->num_elements+1);
474         if (!e2) {
475                 errno = ENOMEM;
476                 return -1;
477         }
478
479         msg->elements = e2;
480
481         e2 = &msg->elements[msg->num_elements];
482
483         e2->name = el->name;
484         e2->flags = el->flags;
485         e2->values = NULL;
486         if (el->num_values != 0) {
487                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
488                 if (!e2->values) {
489                         errno = ENOMEM;
490                         return -1;
491                 }
492         }
493         for (i=0;i<el->num_values;i++) {
494                 e2->values[i] = el->values[i];
495         }
496         e2->num_values = el->num_values;
497
498         msg->num_elements++;
499
500         return 0;
501 }
502
503 /*
504   delete all elements having a specified attribute name
505 */
506 static int msg_delete_attribute(struct ldb_module *module,
507                                 struct ldb_context *ldb,
508                                 struct ldb_message *msg, const char *name)
509 {
510         char *dn;
511         unsigned int i, j;
512
513         dn = ldb_dn_linearize(ldb, msg->dn);
514         if (dn == NULL) {
515                 return -1;
516         }
517
518         for (i=0;i<msg->num_elements;i++) {
519                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
520                         for (j=0;j<msg->elements[i].num_values;j++) {
521                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
522                         }
523                         talloc_free(msg->elements[i].values);
524                         if (msg->num_elements > (i+1)) {
525                                 memmove(&msg->elements[i], 
526                                         &msg->elements[i+1], 
527                                         sizeof(struct ldb_message_element)*
528                                         (msg->num_elements - (i+1)));
529                         }
530                         msg->num_elements--;
531                         i--;
532                         msg->elements = talloc_realloc(msg, msg->elements, 
533                                                          struct ldb_message_element, 
534                                                          msg->num_elements);
535                 }
536         }
537
538         talloc_free(dn);
539         return 0;
540 }
541
542 /*
543   delete all elements matching an attribute name/value 
544
545   return 0 on success, -1 on failure
546 */
547 static int msg_delete_element(struct ldb_module *module,
548                               struct ldb_message *msg, 
549                               const char *name,
550                               const struct ldb_val *val)
551 {
552         struct ldb_context *ldb = module->ldb;
553         unsigned int i;
554         int found;
555         struct ldb_message_element *el;
556         const struct ldb_attrib_handler *h;
557
558         found = find_element(msg, name);
559         if (found == -1) {
560                 return -1;
561         }
562
563         el = &msg->elements[found];
564
565         h = ldb_attrib_handler(ldb, el->name);
566
567         for (i=0;i<el->num_values;i++) {
568                 if (h->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
569                         if (i<el->num_values-1) {
570                                 memmove(&el->values[i], &el->values[i+1],
571                                         sizeof(el->values[i])*(el->num_values-(i+1)));
572                         }
573                         el->num_values--;
574                         if (el->num_values == 0) {
575                                 return msg_delete_attribute(module, ldb, msg, name);
576                         }
577                         return 0;
578                 }
579         }
580
581         return -1;
582 }
583
584
585 /*
586   modify a record - internal interface
587
588   yuck - this is O(n^2). Luckily n is usually small so we probably
589   get away with it, but if we ever have really large attribute lists 
590   then we'll need to look at this again
591 */
592 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
593 {
594         struct ldb_context *ldb = module->ldb;
595         struct ltdb_private *ltdb =
596                 talloc_get_type(module->private_data, struct ltdb_private);
597         TDB_DATA tdb_key, tdb_data;
598         struct ldb_message *msg2;
599         unsigned i, j;
600         int ret;
601
602         tdb_key = ltdb_key(module, msg->dn);
603         if (!tdb_key.dptr) {
604                 return LDB_ERR_OTHER;
605         }
606
607         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
608         if (!tdb_data.dptr) {
609                 talloc_free(tdb_key.dptr);
610                 return ltdb_err_map(tdb_error(ltdb->tdb));
611         }
612
613         msg2 = talloc(tdb_key.dptr, struct ldb_message);
614         if (msg2 == NULL) {
615                 talloc_free(tdb_key.dptr);
616                 return LDB_ERR_OTHER;
617         }
618
619         ret = ltdb_unpack_data(module, &tdb_data, msg2);
620         if (ret == -1) {
621                 ret = LDB_ERR_OTHER;
622                 goto failed;
623         }
624
625         if (!msg2->dn) {
626                 msg2->dn = msg->dn;
627         }
628
629         for (i=0;i<msg->num_elements;i++) {
630                 struct ldb_message_element *el = &msg->elements[i];
631                 struct ldb_message_element *el2;
632                 struct ldb_val *vals;
633                 char *dn;
634
635                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
636
637                 case LDB_FLAG_MOD_ADD:
638                         /* add this element to the message. fail if it
639                            already exists */
640                         ret = find_element(msg2, el->name);
641
642                         if (ret == -1) {
643                                 if (msg_add_element(ldb, msg2, el) != 0) {
644                                         ret = LDB_ERR_OTHER;
645                                         goto failed;
646                                 }
647                                 continue;
648                         }
649
650                         el2 = &msg2->elements[ret];
651
652                         /* An attribute with this name already exists, add all
653                          * values if they don't already exist. */
654
655                         for (j=0;j<el->num_values;j++) {
656                                 if (ldb_msg_find_val(el2, &el->values[j])) {
657                                         ldb_set_errstring(module->ldb, "Type or value exists");
658                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
659                                         goto failed;
660                                 }
661                         }
662
663                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
664                                                 el2->num_values + el->num_values);
665
666                         if (vals == NULL) {
667                                 ret = LDB_ERR_OTHER;
668                                 goto failed;
669                         }
670
671                         for (j=0;j<el->num_values;j++) {
672                                 vals[el2->num_values + j] =
673                                         ldb_val_dup(vals, &el->values[j]);
674                         }
675
676                         el2->values = vals;
677                         el2->num_values += el->num_values;
678
679                         break;
680
681                 case LDB_FLAG_MOD_REPLACE:
682                         /* replace all elements of this attribute name with the elements
683                            listed. The attribute not existing is not an error */
684                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
685
686                         /* add the replacement element, if not empty */
687                         if (msg->elements[i].num_values != 0 &&
688                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
689                                 ret = LDB_ERR_OTHER;
690                                 goto failed;
691                         }
692                         break;
693
694                 case LDB_FLAG_MOD_DELETE:
695
696                         dn = ldb_dn_linearize(msg2, msg->dn);
697                         if (dn == NULL) {
698                                 ret = LDB_ERR_OTHER;
699                                 goto failed;
700                         }
701
702                         /* we could be being asked to delete all
703                            values or just some values */
704                         if (msg->elements[i].num_values == 0) {
705                                 if (msg_delete_attribute(module, ldb, msg2, 
706                                                          msg->elements[i].name) != 0) {
707                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
708                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
709                                         goto failed;
710                                 }
711                                 break;
712                         }
713                         for (j=0;j<msg->elements[i].num_values;j++) {
714                                 if (msg_delete_element(module,
715                                                        msg2, 
716                                                        msg->elements[i].name,
717                                                        &msg->elements[i].values[j]) != 0) {
718                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
719                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
720                                         goto failed;
721                                 }
722                                 if (ltdb_index_del_value(module, dn, &msg->elements[i], j) != 0) {
723                                         ret = LDB_ERR_OTHER;
724                                         goto failed;
725                                 }
726                         }
727                         break;
728                 default:
729                         ldb_asprintf_errstring(module->ldb, "Invalid ldb_modify flags on %s: 0x%x", 
730                                                              msg->elements[i].name, 
731                                                              msg->elements[i].flags & LDB_FLAG_MOD_MASK);
732                         ret = LDB_ERR_PROTOCOL_ERROR;
733                         goto failed;
734                 }
735         }
736
737         /* we've made all the mods - save the modified record back into the database */
738         ret = ltdb_store(module, msg2, TDB_MODIFY);
739         if (ret != LDB_SUCCESS) {
740                 goto failed;
741         }
742
743         if (ltdb_modified(module, msg->dn) != LDB_SUCCESS) {
744                 ret = LDB_ERR_OPERATIONS_ERROR;
745                 goto failed;
746         }
747
748         talloc_free(tdb_key.dptr);
749         free(tdb_data.dptr);
750         return ret;
751
752 failed:
753         talloc_free(tdb_key.dptr);
754         free(tdb_data.dptr);
755         return ret;
756 }
757
758 /*
759   modify a record
760 */
761 static int ltdb_modify(struct ldb_module *module, struct ldb_request *req)
762 {
763         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
764         struct ltdb_context *ltdb_ac;
765         int tret, ret = LDB_SUCCESS;
766
767         if (req->controls != NULL) {
768                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
769                 if (check_critical_controls(req->controls)) {
770                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
771                 }
772         }
773         
774         req->handle = NULL;
775
776         req->handle = init_ltdb_handle(ltdb, module, req);
777         if (req->handle == NULL) {
778                 return LDB_ERR_OPERATIONS_ERROR;
779         }
780         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
781
782         tret = ltdb_check_special_dn(module, req->op.mod.message);
783         if (tret != LDB_SUCCESS) {
784                 req->handle->status = tret;
785                 goto done;
786         }
787         
788         if (ltdb_cache_load(module) != 0) {
789                 ret = LDB_ERR_OPERATIONS_ERROR;
790                 goto done;
791         }
792
793         tret = ltdb_modify_internal(module, req->op.mod.message);
794         if (tret != LDB_SUCCESS) {
795                 req->handle->status = tret;
796                 goto done;
797         }
798
799         if (ltdb_ac->callback) {
800                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
801         }
802 done:
803         req->handle->state = LDB_ASYNC_DONE;
804         return ret;
805 }
806
807 /*
808   rename a record
809 */
810 static int ltdb_rename(struct ldb_module *module, struct ldb_request *req)
811 {
812         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
813         struct ltdb_context *ltdb_ac;
814         struct ldb_message *msg;
815         int tret, ret = LDB_SUCCESS;
816
817         if (req->controls != NULL) {
818                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
819                 if (check_critical_controls(req->controls)) {
820                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
821                 }
822         }
823         
824         req->handle = NULL;
825
826         if (ltdb_cache_load(module) != 0) {
827                 return LDB_ERR_OPERATIONS_ERROR;
828         }
829
830         req->handle = init_ltdb_handle(ltdb, module, req);
831         if (req->handle == NULL) {
832                 return LDB_ERR_OPERATIONS_ERROR;
833         }
834         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
835
836         msg = talloc(ltdb_ac, struct ldb_message);
837         if (msg == NULL) {
838                 ret = LDB_ERR_OPERATIONS_ERROR;
839                 goto done;
840         }
841
842         /* in case any attribute of the message was indexed, we need
843            to fetch the old record */
844         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
845         if (tret != 1) {
846                 /* not finding the old record is an error */
847                 req->handle->status = LDB_ERR_NO_SUCH_OBJECT;
848                 goto done;
849         }
850
851         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
852         if (!msg->dn) {
853                 ret = LDB_ERR_OPERATIONS_ERROR;
854                 goto done;
855         }
856
857         tret = ltdb_add_internal(module, msg);
858         if (tret != LDB_SUCCESS) {
859                 ret = LDB_ERR_OPERATIONS_ERROR;
860                 goto done;
861         }
862
863         tret = ltdb_delete_internal(module, req->op.rename.olddn);
864         if (tret != LDB_SUCCESS) {
865                 ltdb_delete_internal(module, req->op.rename.newdn);
866                 ret = LDB_ERR_OPERATIONS_ERROR;
867                 goto done;
868         }
869
870         if (ltdb_ac->callback) {
871                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
872         }
873 done:
874         req->handle->state = LDB_ASYNC_DONE;
875         return ret;
876 }
877
878 static int ltdb_start_trans(struct ldb_module *module)
879 {
880         struct ltdb_private *ltdb =
881                 talloc_get_type(module->private_data, struct ltdb_private);
882
883         if (tdb_transaction_start(ltdb->tdb) != 0) {
884                 return ltdb_err_map(tdb_error(ltdb->tdb));
885         }
886
887         return LDB_SUCCESS;
888 }
889
890 static int ltdb_end_trans(struct ldb_module *module)
891 {
892         struct ltdb_private *ltdb =
893                 talloc_get_type(module->private_data, struct ltdb_private);
894
895         if (tdb_transaction_commit(ltdb->tdb) != 0) {
896                 return ltdb_err_map(tdb_error(ltdb->tdb));
897         }
898
899         return LDB_SUCCESS;
900 }
901
902 static int ltdb_del_trans(struct ldb_module *module)
903 {
904         struct ltdb_private *ltdb =
905                 talloc_get_type(module->private_data, struct ltdb_private);
906
907         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
908                 return ltdb_err_map(tdb_error(ltdb->tdb));
909         }
910
911         return LDB_SUCCESS;
912 }
913
914 static int ltdb_wait(struct ldb_handle *handle, enum ldb_wait_type type)
915 {
916         return handle->status;
917 }
918
919 static int ltdb_request(struct ldb_module *module, struct ldb_request *req)
920 {
921         /* check for oustanding critical controls and return an error if found */
922         if (req->controls != NULL) {
923                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
924                 if (check_critical_controls(req->controls)) {
925                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
926                 }
927         }
928         
929         /* search, add, modify, delete, rename are handled by their own, no other op supported */
930         return LDB_ERR_OPERATIONS_ERROR;
931 }
932
933 /*
934   return sequenceNumber from @BASEINFO
935 */
936 static int ltdb_sequence_number(struct ldb_module *module, struct ldb_request *req)
937 {
938         TALLOC_CTX *tmp_ctx = talloc_new(req);
939         struct ldb_message *msg = NULL;
940         struct ldb_dn *dn = ldb_dn_explode(tmp_ctx, LTDB_BASEINFO);
941         int tret;
942
943         if (tmp_ctx == NULL) {
944                 talloc_free(tmp_ctx);
945                 return LDB_ERR_OPERATIONS_ERROR;
946         }
947
948         msg = talloc(tmp_ctx, struct ldb_message);
949         if (msg == NULL) {
950                 talloc_free(tmp_ctx);
951                 return LDB_ERR_OPERATIONS_ERROR;
952         }
953
954         req->op.seq_num.flags = 0;
955
956         tret = ltdb_search_dn1(module, dn, msg);
957         if (tret != 1) {
958                 talloc_free(tmp_ctx);
959                 req->op.seq_num.seq_num = 0;
960                 /* zero is as good as anything when we don't know */
961                 return LDB_SUCCESS;
962         }
963
964         switch (req->op.seq_num.type) {
965         case LDB_SEQ_HIGHEST_SEQ:
966                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
967                 break;
968         case LDB_SEQ_NEXT:
969                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
970                 req->op.seq_num.seq_num++;
971                 break;
972         case LDB_SEQ_HIGHEST_TIMESTAMP:
973         {
974                 const char *date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
975                 if (date) {
976                         req->op.seq_num.seq_num = ldb_string_to_time(date);
977                 } else {
978                         req->op.seq_num.seq_num = 0;
979                         /* zero is as good as anything when we don't know */
980                 }
981                 break;
982         }
983         }
984         talloc_free(tmp_ctx);
985         return LDB_SUCCESS;
986 }
987
988 static const struct ldb_module_ops ltdb_ops = {
989         .name              = "tdb",
990         .search            = ltdb_search,
991         .add               = ltdb_add,
992         .modify            = ltdb_modify,
993         .del               = ltdb_delete,
994         .rename            = ltdb_rename,
995         .request           = ltdb_request,
996         .start_transaction = ltdb_start_trans,
997         .end_transaction   = ltdb_end_trans,
998         .del_transaction   = ltdb_del_trans,
999         .wait              = ltdb_wait,
1000         .sequence_number   = ltdb_sequence_number
1001 };
1002
1003 /*
1004   connect to the database
1005 */
1006 static int ltdb_connect(struct ldb_context *ldb, const char *url, 
1007                         unsigned int flags, const char *options[],
1008                         struct ldb_module **module)
1009 {
1010         const char *path;
1011         int tdb_flags, open_flags;
1012         struct ltdb_private *ltdb;
1013
1014         /* parse the url */
1015         if (strchr(url, ':')) {
1016                 if (strncmp(url, "tdb://", 6) != 0) {
1017                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
1018                         return -1;
1019                 }
1020                 path = url+6;
1021         } else {
1022                 path = url;
1023         }
1024
1025         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1026
1027         /* check for the 'nosync' option */
1028         if (flags & LDB_FLG_NOSYNC) {
1029                 tdb_flags |= TDB_NOSYNC;
1030         }
1031
1032         if (flags & LDB_FLG_RDONLY) {
1033                 open_flags = O_RDONLY;
1034         } else {
1035                 open_flags = O_CREAT | O_RDWR;
1036         }
1037
1038         ltdb = talloc_zero(ldb, struct ltdb_private);
1039         if (!ltdb) {
1040                 ldb_oom(ldb);
1041                 return -1;
1042         }
1043
1044         /* note that we use quite a large default hash size */
1045         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, 
1046                                    tdb_flags, open_flags, 
1047                                    ldb->create_perms, ldb);
1048         if (!ltdb->tdb) {
1049                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
1050                 talloc_free(ltdb);
1051                 return -1;
1052         }
1053
1054         ltdb->sequence_number = 0;
1055
1056         *module = talloc(ldb, struct ldb_module);
1057         if (!module) {
1058                 ldb_oom(ldb);
1059                 talloc_free(ltdb);
1060                 return -1;
1061         }
1062         talloc_set_name_const(*module, "ldb_tdb backend");
1063         (*module)->ldb = ldb;
1064         (*module)->prev = (*module)->next = NULL;
1065         (*module)->private_data = ltdb;
1066         (*module)->ops = &ltdb_ops;
1067
1068         if (ltdb_cache_load(*module) != 0) {
1069                 talloc_free(*module);
1070                 talloc_free(ltdb);
1071                 return -1;
1072         }
1073
1074         return 0;
1075 }
1076
1077 int ldb_tdb_init(void)
1078 {
1079         return ldb_register_backend("tdb", ltdb_connect);
1080 }