r19314: Commit tridge's fixes for a big mem leak in ltdb I introduced
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006
7    
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb_tdb
30  *
31  *  Component: ldb tdb backend
32  *
33  *  Description: core functions for tdb backend
34  *
35  *  Author: Andrew Tridgell
36  *  Author: Stefan Metzmacher
37  *
38  *  Modifications:
39  *
40  *  - description: make the module use asyncronous calls
41  *    date: Feb 2006
42  *    Author: Simo Sorce
43  */
44
45 #include "includes.h"
46 #include "ldb/include/includes.h"
47
48 #include "ldb/ldb_tdb/ldb_tdb.h"
49
50
51 /*
52   map a tdb error code to a ldb error code
53 */
54 static int ltdb_err_map(enum TDB_ERROR tdb_code)
55 {
56         switch (tdb_code) {
57         case TDB_SUCCESS:
58                 return LDB_SUCCESS;
59         case TDB_ERR_CORRUPT:
60         case TDB_ERR_OOM:
61         case TDB_ERR_EINVAL:
62                 return LDB_ERR_OPERATIONS_ERROR;
63         case TDB_ERR_IO:
64                 return LDB_ERR_PROTOCOL_ERROR;
65         case TDB_ERR_LOCK:
66         case TDB_ERR_NOLOCK:
67                 return LDB_ERR_BUSY;
68         case TDB_ERR_LOCK_TIMEOUT:
69                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
70         case TDB_ERR_EXISTS:
71                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
72         case TDB_ERR_NOEXIST:
73                 return LDB_ERR_NO_SUCH_OBJECT;
74         case TDB_ERR_RDONLY:
75                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
76         }
77         return LDB_ERR_OTHER;
78 }
79
80
81 struct ldb_handle *init_ltdb_handle(struct ltdb_private *ltdb, struct ldb_module *module,
82                                     struct ldb_request *req)
83 {
84         struct ltdb_context *ac;
85         struct ldb_handle *h;
86
87         h = talloc_zero(req, struct ldb_handle);
88         if (h == NULL) {
89                 ldb_set_errstring(module->ldb, "Out of Memory");
90                 return NULL;
91         }
92
93         h->module = module;
94
95         ac = talloc_zero(h, struct ltdb_context);
96         if (ac == NULL) {
97                 ldb_set_errstring(module->ldb, "Out of Memory");
98                 talloc_free(h);
99                 return NULL;
100         }
101
102         h->private_data = (void *)ac;
103
104         h->state = LDB_ASYNC_INIT;
105         h->status = LDB_SUCCESS;
106
107         ac->module = module;
108         ac->context = req->context;
109         ac->callback = req->callback;
110
111         return h;
112 }
113
114 /*
115   form a TDB_DATA for a record key
116   caller frees
117
118   note that the key for a record can depend on whether the 
119   dn refers to a case sensitive index record or not
120 */
121 struct TDB_DATA ltdb_key(struct ldb_module *module, const struct ldb_dn *dn)
122 {
123         struct ldb_context *ldb = module->ldb;
124         TDB_DATA key;
125         char *key_str = NULL;
126         char *dn_folded = NULL;
127
128         /*
129           most DNs are case insensitive. The exception is index DNs for
130           case sensitive attributes
131
132           there are 3 cases dealt with in this code:
133
134           1) if the dn doesn't start with @ then uppercase the attribute
135              names and the attributes values of case insensitive attributes
136           2) if the dn starts with @ then leave it alone - the indexing code handles
137              the rest
138         */
139
140         dn_folded = ldb_dn_linearize_casefold(ldb, ldb, dn);
141         if (!dn_folded) {
142                 goto failed;
143         }
144
145         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
146
147         talloc_free(dn_folded);
148
149         if (!key_str) {
150                 goto failed;
151         }
152
153         key.dptr = (uint8_t *)key_str;
154         key.dsize = strlen(key_str) + 1;
155
156         return key;
157
158 failed:
159         errno = ENOMEM;
160         key.dptr = NULL;
161         key.dsize = 0;
162         return key;
163 }
164
165 /*
166   check special dn's have valid attributes
167   currently only @ATTRIBUTES is checked
168 */
169 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
170 {
171         int i, j;
172  
173         if (! ldb_dn_is_special(msg->dn) ||
174             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
175                 return 0;
176         }
177
178         /* we have @ATTRIBUTES, let's check attributes are fine */
179         /* should we check that we deny multivalued attributes ? */
180         for (i = 0; i < msg->num_elements; i++) {
181                 for (j = 0; j < msg->elements[i].num_values; j++) {
182                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
183                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
184                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
185                         }
186                 }
187         }
188
189         return 0;
190 }
191
192
193 /*
194   we've made a modification to a dn - possibly reindex and 
195   update sequence number
196 */
197 static int ltdb_modified(struct ldb_module *module, const struct ldb_dn *dn)
198 {
199         int ret = 0;
200
201         if (ldb_dn_is_special(dn) &&
202             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
203              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
204                 ret = ltdb_reindex(module);
205         }
206
207         if (ret == 0 &&
208             !(ldb_dn_is_special(dn) &&
209               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
210                 ret = ltdb_increase_sequence_number(module);
211         }
212
213         return ret;
214 }
215
216 /*
217   store a record into the db
218 */
219 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
220 {
221         struct ltdb_private *ltdb =
222                 talloc_get_type(module->private_data, struct ltdb_private);
223         TDB_DATA tdb_key, tdb_data;
224         int ret;
225
226         tdb_key = ltdb_key(module, msg->dn);
227         if (!tdb_key.dptr) {
228                 return LDB_ERR_OTHER;
229         }
230
231         ret = ltdb_pack_data(module, msg, &tdb_data);
232         if (ret == -1) {
233                 talloc_free(tdb_key.dptr);
234                 return LDB_ERR_OTHER;
235         }
236
237         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
238         if (ret == -1) {
239                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
240                 goto done;
241         }
242         
243         ret = ltdb_index_add(module, msg);
244         if (ret == -1) {
245                 tdb_delete(ltdb->tdb, tdb_key);
246         }
247
248 done:
249         talloc_free(tdb_key.dptr);
250         talloc_free(tdb_data.dptr);
251
252         return ret;
253 }
254
255
256 static int ltdb_add_internal(struct ldb_module *module, const struct ldb_message *msg)
257 {
258         int ret;
259         
260         ret = ltdb_check_special_dn(module, msg);
261         if (ret != LDB_SUCCESS) {
262                 return ret;
263         }
264         
265         if (ltdb_cache_load(module) != 0) {
266                 return LDB_ERR_OPERATIONS_ERROR;
267         }
268
269         ret = ltdb_store(module, msg, TDB_INSERT);
270
271         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
272                 char *dn;
273
274                 dn = ldb_dn_linearize(module, msg->dn);
275                 if (!dn) {
276                         return ret;
277                 }
278                 ldb_asprintf_errstring(module->ldb, "Entry %s already exists", dn);
279                 talloc_free(dn);
280                 return ret;
281         }
282         
283         if (ret == LDB_SUCCESS) {
284                 ret = ltdb_modified(module, msg->dn);
285                 if (ret != LDB_SUCCESS) {
286                         return LDB_ERR_OPERATIONS_ERROR;
287                 }
288         }
289
290         return ret;
291 }
292
293 /*
294   add a record to the database
295 */
296 static int ltdb_add(struct ldb_module *module, struct ldb_request *req)
297 {
298         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
299         struct ltdb_context *ltdb_ac;
300         int tret, ret = LDB_SUCCESS;
301
302         if (req->controls != NULL) {
303                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
304                 if (check_critical_controls(req->controls)) {
305                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
306                 }
307         }
308         
309         req->handle = init_ltdb_handle(ltdb, module, req);
310         if (req->handle == NULL) {
311                 return LDB_ERR_OPERATIONS_ERROR;
312         }
313         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
314
315         tret = ltdb_add_internal(module, req->op.add.message);
316         if (tret != LDB_SUCCESS) {
317                 req->handle->status = tret;
318                 goto done;
319         }
320         
321         if (ltdb_ac->callback) {
322                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
323         }
324 done:
325         req->handle->state = LDB_ASYNC_DONE;
326         return ret;
327 }
328
329 /*
330   delete a record from the database, not updating indexes (used for deleting
331   index records)
332 */
333 int ltdb_delete_noindex(struct ldb_module *module, const struct ldb_dn *dn)
334 {
335         struct ltdb_private *ltdb =
336                 talloc_get_type(module->private_data, struct ltdb_private);
337         TDB_DATA tdb_key;
338         int ret;
339
340         tdb_key = ltdb_key(module, dn);
341         if (!tdb_key.dptr) {
342                 return LDB_ERR_OTHER;
343         }
344
345         ret = tdb_delete(ltdb->tdb, tdb_key);
346         talloc_free(tdb_key.dptr);
347
348         if (ret != 0) {
349                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
350         }
351
352         return ret;
353 }
354
355 static int ltdb_delete_internal(struct ldb_module *module, const struct ldb_dn *dn)
356 {
357         struct ldb_message *msg;
358         int ret;
359
360         msg = talloc(module, struct ldb_message);
361         if (msg == NULL) {
362                 return LDB_ERR_OPERATIONS_ERROR;
363         }
364
365         /* in case any attribute of the message was indexed, we need
366            to fetch the old record */
367         ret = ltdb_search_dn1(module, dn, msg);
368         if (ret != 1) {
369                 /* not finding the old record is an error */
370                 talloc_free(msg);
371                 return LDB_ERR_NO_SUCH_OBJECT;
372         }
373
374         ret = ltdb_delete_noindex(module, dn);
375         if (ret != LDB_SUCCESS) {
376                 talloc_free(msg);
377                 return LDB_ERR_NO_SUCH_OBJECT;
378         }
379
380         /* remove any indexed attributes */
381         ret = ltdb_index_del(module, msg);
382         if (ret != LDB_SUCCESS) {
383                 talloc_free(msg);
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         ret = ltdb_modified(module, dn);
388         if (ret != LDB_SUCCESS) {
389                 return LDB_ERR_OPERATIONS_ERROR;
390         }
391
392         talloc_free(msg);
393         return LDB_SUCCESS;
394 }
395
396 /*
397   delete a record from the database
398 */
399 static int ltdb_delete(struct ldb_module *module, struct ldb_request *req)
400 {
401         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
402         struct ltdb_context *ltdb_ac;
403         int tret, ret = LDB_SUCCESS;
404
405         if (req->controls != NULL) {
406                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
407                 if (check_critical_controls(req->controls)) {
408                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
409                 }
410         }
411         
412         req->handle = NULL;
413
414         if (ltdb_cache_load(module) != 0) {
415                 return LDB_ERR_OPERATIONS_ERROR;
416         }
417
418         req->handle = init_ltdb_handle(ltdb, module, req);
419         if (req->handle == NULL) {
420                 return LDB_ERR_OPERATIONS_ERROR;
421         }
422         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
423
424         tret = ltdb_delete_internal(module, req->op.del.dn);
425         if (tret != LDB_SUCCESS) {
426                 req->handle->status = tret; 
427                 goto done;
428         }
429
430         if (ltdb_ac->callback) {
431                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
432         }
433 done:
434         req->handle->state = LDB_ASYNC_DONE;
435         return ret;
436 }
437
438 /*
439   find an element by attribute name. At the moment this does a linear search, it should
440   be re-coded to use a binary search once all places that modify records guarantee
441   sorted order
442
443   return the index of the first matching element if found, otherwise -1
444 */
445 static int find_element(const struct ldb_message *msg, const char *name)
446 {
447         unsigned int i;
448         for (i=0;i<msg->num_elements;i++) {
449                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
450                         return i;
451                 }
452         }
453         return -1;
454 }
455
456
457 /*
458   add an element to an existing record. Assumes a elements array that we
459   can call re-alloc on, and assumed that we can re-use the data pointers from the 
460   passed in additional values. Use with care!
461
462   returns 0 on success, -1 on failure (and sets errno)
463 */
464 static int msg_add_element(struct ldb_context *ldb,
465                            struct ldb_message *msg, struct ldb_message_element *el)
466 {
467         struct ldb_message_element *e2;
468         unsigned int i;
469
470         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
471                               msg->num_elements+1);
472         if (!e2) {
473                 errno = ENOMEM;
474                 return -1;
475         }
476
477         msg->elements = e2;
478
479         e2 = &msg->elements[msg->num_elements];
480
481         e2->name = el->name;
482         e2->flags = el->flags;
483         e2->values = NULL;
484         if (el->num_values != 0) {
485                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
486                 if (!e2->values) {
487                         errno = ENOMEM;
488                         return -1;
489                 }
490         }
491         for (i=0;i<el->num_values;i++) {
492                 e2->values[i] = el->values[i];
493         }
494         e2->num_values = el->num_values;
495
496         msg->num_elements++;
497
498         return 0;
499 }
500
501 /*
502   delete all elements having a specified attribute name
503 */
504 static int msg_delete_attribute(struct ldb_module *module,
505                                 struct ldb_context *ldb,
506                                 struct ldb_message *msg, const char *name)
507 {
508         char *dn;
509         unsigned int i, j;
510
511         dn = ldb_dn_linearize(ldb, msg->dn);
512         if (dn == NULL) {
513                 return -1;
514         }
515
516         for (i=0;i<msg->num_elements;i++) {
517                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
518                         for (j=0;j<msg->elements[i].num_values;j++) {
519                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
520                         }
521                         talloc_free(msg->elements[i].values);
522                         if (msg->num_elements > (i+1)) {
523                                 memmove(&msg->elements[i], 
524                                         &msg->elements[i+1], 
525                                         sizeof(struct ldb_message_element)*
526                                         (msg->num_elements - (i+1)));
527                         }
528                         msg->num_elements--;
529                         i--;
530                         msg->elements = talloc_realloc(msg, msg->elements, 
531                                                          struct ldb_message_element, 
532                                                          msg->num_elements);
533                 }
534         }
535
536         talloc_free(dn);
537         return 0;
538 }
539
540 /*
541   delete all elements matching an attribute name/value 
542
543   return 0 on success, -1 on failure
544 */
545 static int msg_delete_element(struct ldb_module *module,
546                               struct ldb_message *msg, 
547                               const char *name,
548                               const struct ldb_val *val)
549 {
550         struct ldb_context *ldb = module->ldb;
551         unsigned int i;
552         int found;
553         struct ldb_message_element *el;
554         const struct ldb_attrib_handler *h;
555
556         found = find_element(msg, name);
557         if (found == -1) {
558                 return -1;
559         }
560
561         el = &msg->elements[found];
562
563         h = ldb_attrib_handler(ldb, el->name);
564
565         for (i=0;i<el->num_values;i++) {
566                 if (h->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
567                         if (i<el->num_values-1) {
568                                 memmove(&el->values[i], &el->values[i+1],
569                                         sizeof(el->values[i])*(el->num_values-(i+1)));
570                         }
571                         el->num_values--;
572                         if (el->num_values == 0) {
573                                 return msg_delete_attribute(module, ldb, msg, name);
574                         }
575                         return 0;
576                 }
577         }
578
579         return -1;
580 }
581
582
583 /*
584   modify a record - internal interface
585
586   yuck - this is O(n^2). Luckily n is usually small so we probably
587   get away with it, but if we ever have really large attribute lists 
588   then we'll need to look at this again
589 */
590 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
591 {
592         struct ldb_context *ldb = module->ldb;
593         struct ltdb_private *ltdb =
594                 talloc_get_type(module->private_data, struct ltdb_private);
595         TDB_DATA tdb_key, tdb_data;
596         struct ldb_message *msg2;
597         unsigned i, j;
598         int ret;
599
600         tdb_key = ltdb_key(module, msg->dn);
601         if (!tdb_key.dptr) {
602                 return LDB_ERR_OTHER;
603         }
604
605         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
606         if (!tdb_data.dptr) {
607                 talloc_free(tdb_key.dptr);
608                 return ltdb_err_map(tdb_error(ltdb->tdb));
609         }
610
611         msg2 = talloc(tdb_key.dptr, struct ldb_message);
612         if (msg2 == NULL) {
613                 talloc_free(tdb_key.dptr);
614                 return LDB_ERR_OTHER;
615         }
616
617         ret = ltdb_unpack_data(module, &tdb_data, msg2);
618         if (ret == -1) {
619                 ret = LDB_ERR_OTHER;
620                 goto failed;
621         }
622
623         if (!msg2->dn) {
624                 msg2->dn = msg->dn;
625         }
626
627         for (i=0;i<msg->num_elements;i++) {
628                 struct ldb_message_element *el = &msg->elements[i];
629                 struct ldb_message_element *el2;
630                 struct ldb_val *vals;
631                 char *dn;
632
633                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
634
635                 case LDB_FLAG_MOD_ADD:
636                         /* add this element to the message. fail if it
637                            already exists */
638                         ret = find_element(msg2, el->name);
639
640                         if (ret == -1) {
641                                 if (msg_add_element(ldb, msg2, el) != 0) {
642                                         ret = LDB_ERR_OTHER;
643                                         goto failed;
644                                 }
645                                 continue;
646                         }
647
648                         el2 = &msg2->elements[ret];
649
650                         /* An attribute with this name already exists, add all
651                          * values if they don't already exist. */
652
653                         for (j=0;j<el->num_values;j++) {
654                                 if (ldb_msg_find_val(el2, &el->values[j])) {
655                                         ldb_set_errstring(module->ldb, "Type or value exists");
656                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
657                                         goto failed;
658                                 }
659                         }
660
661                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
662                                                 el2->num_values + el->num_values);
663
664                         if (vals == NULL) {
665                                 ret = LDB_ERR_OTHER;
666                                 goto failed;
667                         }
668
669                         for (j=0;j<el->num_values;j++) {
670                                 vals[el2->num_values + j] =
671                                         ldb_val_dup(vals, &el->values[j]);
672                         }
673
674                         el2->values = vals;
675                         el2->num_values += el->num_values;
676
677                         break;
678
679                 case LDB_FLAG_MOD_REPLACE:
680                         /* replace all elements of this attribute name with the elements
681                            listed. The attribute not existing is not an error */
682                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
683
684                         /* add the replacement element, if not empty */
685                         if (msg->elements[i].num_values != 0 &&
686                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
687                                 ret = LDB_ERR_OTHER;
688                                 goto failed;
689                         }
690                         break;
691
692                 case LDB_FLAG_MOD_DELETE:
693
694                         dn = ldb_dn_linearize(msg2, msg->dn);
695                         if (dn == NULL) {
696                                 ret = LDB_ERR_OTHER;
697                                 goto failed;
698                         }
699
700                         /* we could be being asked to delete all
701                            values or just some values */
702                         if (msg->elements[i].num_values == 0) {
703                                 if (msg_delete_attribute(module, ldb, msg2, 
704                                                          msg->elements[i].name) != 0) {
705                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
706                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
707                                         goto failed;
708                                 }
709                                 break;
710                         }
711                         for (j=0;j<msg->elements[i].num_values;j++) {
712                                 if (msg_delete_element(module,
713                                                        msg2, 
714                                                        msg->elements[i].name,
715                                                        &msg->elements[i].values[j]) != 0) {
716                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
717                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
718                                         goto failed;
719                                 }
720                                 if (ltdb_index_del_value(module, dn, &msg->elements[i], j) != 0) {
721                                         ret = LDB_ERR_OTHER;
722                                         goto failed;
723                                 }
724                         }
725                         break;
726                 default:
727                         ldb_asprintf_errstring(module->ldb, "Invalid ldb_modify flags on %s: 0x%x", 
728                                                              msg->elements[i].name, 
729                                                              msg->elements[i].flags & LDB_FLAG_MOD_MASK);
730                         ret = LDB_ERR_PROTOCOL_ERROR;
731                         goto failed;
732                 }
733         }
734
735         /* we've made all the mods - save the modified record back into the database */
736         ret = ltdb_store(module, msg2, TDB_MODIFY);
737         if (ret != LDB_SUCCESS) {
738                 goto failed;
739         }
740
741         if (ltdb_modified(module, msg->dn) != LDB_SUCCESS) {
742                 ret = LDB_ERR_OPERATIONS_ERROR;
743                 goto failed;
744         }
745
746         talloc_free(tdb_key.dptr);
747         free(tdb_data.dptr);
748         return ret;
749
750 failed:
751         talloc_free(tdb_key.dptr);
752         free(tdb_data.dptr);
753         return ret;
754 }
755
756 /*
757   modify a record
758 */
759 static int ltdb_modify(struct ldb_module *module, struct ldb_request *req)
760 {
761         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
762         struct ltdb_context *ltdb_ac;
763         int tret, ret = LDB_SUCCESS;
764
765         if (req->controls != NULL) {
766                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
767                 if (check_critical_controls(req->controls)) {
768                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
769                 }
770         }
771         
772         req->handle = NULL;
773
774         req->handle = init_ltdb_handle(ltdb, module, req);
775         if (req->handle == NULL) {
776                 return LDB_ERR_OPERATIONS_ERROR;
777         }
778         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
779
780         tret = ltdb_check_special_dn(module, req->op.mod.message);
781         if (tret != LDB_SUCCESS) {
782                 req->handle->status = tret;
783                 goto done;
784         }
785         
786         if (ltdb_cache_load(module) != 0) {
787                 ret = LDB_ERR_OPERATIONS_ERROR;
788                 goto done;
789         }
790
791         tret = ltdb_modify_internal(module, req->op.mod.message);
792         if (tret != LDB_SUCCESS) {
793                 req->handle->status = tret;
794                 goto done;
795         }
796
797         if (ltdb_ac->callback) {
798                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
799         }
800 done:
801         req->handle->state = LDB_ASYNC_DONE;
802         return ret;
803 }
804
805 /*
806   rename a record
807 */
808 static int ltdb_rename(struct ldb_module *module, struct ldb_request *req)
809 {
810         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
811         struct ltdb_context *ltdb_ac;
812         struct ldb_message *msg;
813         int tret, ret = LDB_SUCCESS;
814
815         if (req->controls != NULL) {
816                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
817                 if (check_critical_controls(req->controls)) {
818                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
819                 }
820         }
821         
822         req->handle = NULL;
823
824         if (ltdb_cache_load(module) != 0) {
825                 return LDB_ERR_OPERATIONS_ERROR;
826         }
827
828         req->handle = init_ltdb_handle(ltdb, module, req);
829         if (req->handle == NULL) {
830                 return LDB_ERR_OPERATIONS_ERROR;
831         }
832         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
833
834         msg = talloc(ltdb_ac, struct ldb_message);
835         if (msg == NULL) {
836                 ret = LDB_ERR_OPERATIONS_ERROR;
837                 goto done;
838         }
839
840         /* in case any attribute of the message was indexed, we need
841            to fetch the old record */
842         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
843         if (tret != 1) {
844                 /* not finding the old record is an error */
845                 req->handle->status = LDB_ERR_NO_SUCH_OBJECT;
846                 goto done;
847         }
848
849         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
850         if (!msg->dn) {
851                 ret = LDB_ERR_OPERATIONS_ERROR;
852                 goto done;
853         }
854
855         tret = ltdb_add_internal(module, msg);
856         if (tret != LDB_SUCCESS) {
857                 ret = LDB_ERR_OPERATIONS_ERROR;
858                 goto done;
859         }
860
861         tret = ltdb_delete_internal(module, req->op.rename.olddn);
862         if (tret != LDB_SUCCESS) {
863                 ltdb_delete_internal(module, req->op.rename.newdn);
864                 ret = LDB_ERR_OPERATIONS_ERROR;
865                 goto done;
866         }
867
868         if (ltdb_ac->callback) {
869                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
870         }
871 done:
872         req->handle->state = LDB_ASYNC_DONE;
873         return ret;
874 }
875
876 static int ltdb_start_trans(struct ldb_module *module)
877 {
878         struct ltdb_private *ltdb =
879                 talloc_get_type(module->private_data, struct ltdb_private);
880
881         if (tdb_transaction_start(ltdb->tdb) != 0) {
882                 return ltdb_err_map(tdb_error(ltdb->tdb));
883         }
884
885         return LDB_SUCCESS;
886 }
887
888 static int ltdb_end_trans(struct ldb_module *module)
889 {
890         struct ltdb_private *ltdb =
891                 talloc_get_type(module->private_data, struct ltdb_private);
892
893         if (tdb_transaction_commit(ltdb->tdb) != 0) {
894                 return ltdb_err_map(tdb_error(ltdb->tdb));
895         }
896
897         return LDB_SUCCESS;
898 }
899
900 static int ltdb_del_trans(struct ldb_module *module)
901 {
902         struct ltdb_private *ltdb =
903                 talloc_get_type(module->private_data, struct ltdb_private);
904
905         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
906                 return ltdb_err_map(tdb_error(ltdb->tdb));
907         }
908
909         return LDB_SUCCESS;
910 }
911
912 static int ltdb_wait(struct ldb_handle *handle, enum ldb_wait_type type)
913 {
914         return handle->status;
915 }
916
917 static int ltdb_request(struct ldb_module *module, struct ldb_request *req)
918 {
919         /* check for oustanding critical controls and return an error if found */
920         if (req->controls != NULL) {
921                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
922                 if (check_critical_controls(req->controls)) {
923                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
924                 }
925         }
926         
927         /* search, add, modify, delete, rename are handled by their own, no other op supported */
928         return LDB_ERR_OPERATIONS_ERROR;
929 }
930
931 /*
932   return sequenceNumber from @BASEINFO
933 */
934 static int ltdb_sequence_number(struct ldb_module *module, struct ldb_request *req)
935 {
936         TALLOC_CTX *tmp_ctx = talloc_new(req);
937         struct ldb_message *msg = NULL;
938         struct ldb_dn *dn = ldb_dn_explode(tmp_ctx, LTDB_BASEINFO);
939         int tret;
940
941         if (tmp_ctx == NULL) {
942                 talloc_free(tmp_ctx);
943                 return LDB_ERR_OPERATIONS_ERROR;
944         }
945
946         msg = talloc(tmp_ctx, struct ldb_message);
947         if (msg == NULL) {
948                 talloc_free(tmp_ctx);
949                 return LDB_ERR_OPERATIONS_ERROR;
950         }
951
952         req->op.seq_num.flags = 0;
953
954         tret = ltdb_search_dn1(module, dn, msg);
955         if (tret != 1) {
956                 talloc_free(tmp_ctx);
957                 req->op.seq_num.seq_num = 0;
958                 /* zero is as good as anything when we don't know */
959                 return LDB_SUCCESS;
960         }
961
962         switch (req->op.seq_num.type) {
963         case LDB_SEQ_HIGHEST_SEQ:
964                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
965                 break;
966         case LDB_SEQ_NEXT:
967                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
968                 req->op.seq_num.seq_num++;
969                 break;
970         case LDB_SEQ_HIGHEST_TIMESTAMP:
971         {
972                 const char *date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
973                 if (date) {
974                         req->op.seq_num.seq_num = ldb_string_to_time(date);
975                 } else {
976                         req->op.seq_num.seq_num = 0;
977                         /* zero is as good as anything when we don't know */
978                 }
979                 break;
980         }
981         }
982         talloc_free(tmp_ctx);
983         return LDB_SUCCESS;
984 }
985
986 static const struct ldb_module_ops ltdb_ops = {
987         .name              = "tdb",
988         .search            = ltdb_search,
989         .add               = ltdb_add,
990         .modify            = ltdb_modify,
991         .del               = ltdb_delete,
992         .rename            = ltdb_rename,
993         .request           = ltdb_request,
994         .start_transaction = ltdb_start_trans,
995         .end_transaction   = ltdb_end_trans,
996         .del_transaction   = ltdb_del_trans,
997         .wait              = ltdb_wait,
998         .sequence_number   = ltdb_sequence_number
999 };
1000
1001 /*
1002   connect to the database
1003 */
1004 static int ltdb_connect(struct ldb_context *ldb, const char *url, 
1005                         unsigned int flags, const char *options[],
1006                         struct ldb_module **module)
1007 {
1008         const char *path;
1009         int tdb_flags, open_flags;
1010         struct ltdb_private *ltdb;
1011
1012         /* parse the url */
1013         if (strchr(url, ':')) {
1014                 if (strncmp(url, "tdb://", 6) != 0) {
1015                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
1016                         return -1;
1017                 }
1018                 path = url+6;
1019         } else {
1020                 path = url;
1021         }
1022
1023         tdb_flags = TDB_DEFAULT;
1024
1025         /* check for the 'nosync' option */
1026         if (flags & LDB_FLG_NOSYNC) {
1027                 tdb_flags |= TDB_NOSYNC;
1028         }
1029
1030         if (flags & LDB_FLG_RDONLY) {
1031                 open_flags = O_RDONLY;
1032         } else {
1033                 open_flags = O_CREAT | O_RDWR;
1034         }
1035
1036         ltdb = talloc_zero(ldb, struct ltdb_private);
1037         if (!ltdb) {
1038                 ldb_oom(ldb);
1039                 return -1;
1040         }
1041
1042         /* note that we use quite a large default hash size */
1043         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, 
1044                                    tdb_flags, open_flags, 
1045                                    ldb->create_perms, ldb);
1046         if (!ltdb->tdb) {
1047                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
1048                 talloc_free(ltdb);
1049                 return -1;
1050         }
1051
1052         ltdb->sequence_number = 0;
1053
1054         *module = talloc(ldb, struct ldb_module);
1055         if (!module) {
1056                 ldb_oom(ldb);
1057                 talloc_free(ltdb);
1058                 return -1;
1059         }
1060         (*module)->ldb = ldb;
1061         (*module)->prev = (*module)->next = NULL;
1062         (*module)->private_data = ltdb;
1063         (*module)->ops = &ltdb_ops;
1064
1065         return 0;
1066 }
1067
1068 int ldb_tdb_init(void)
1069 {
1070         return ldb_register_backend("tdb", ltdb_connect);
1071 }