335e7d540ef0b31695231c702708bc9e050b8634
[metze/samba/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006
7    
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb_tdb
30  *
31  *  Component: ldb tdb backend
32  *
33  *  Description: core functions for tdb backend
34  *
35  *  Author: Andrew Tridgell
36  *  Author: Stefan Metzmacher
37  *
38  *  Modifications:
39  *
40  *  - description: make the module use asyncronous calls
41  *    date: Feb 2006
42  *    Author: Simo Sorce
43  */
44
45 #include "ldb_includes.h"
46
47 #include "ldb_tdb.h"
48
49
50 /*
51   map a tdb error code to a ldb error code
52 */
53 static int ltdb_err_map(enum TDB_ERROR tdb_code)
54 {
55         switch (tdb_code) {
56         case TDB_SUCCESS:
57                 return LDB_SUCCESS;
58         case TDB_ERR_CORRUPT:
59         case TDB_ERR_OOM:
60         case TDB_ERR_EINVAL:
61                 return LDB_ERR_OPERATIONS_ERROR;
62         case TDB_ERR_IO:
63                 return LDB_ERR_PROTOCOL_ERROR;
64         case TDB_ERR_LOCK:
65         case TDB_ERR_NOLOCK:
66                 return LDB_ERR_BUSY;
67         case TDB_ERR_LOCK_TIMEOUT:
68                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
69         case TDB_ERR_EXISTS:
70                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
71         case TDB_ERR_NOEXIST:
72                 return LDB_ERR_NO_SUCH_OBJECT;
73         case TDB_ERR_RDONLY:
74                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
75         }
76         return LDB_ERR_OTHER;
77 }
78
79
80 struct ldb_handle *init_ltdb_handle(struct ltdb_private *ltdb, struct ldb_module *module,
81                                     struct ldb_request *req)
82 {
83         struct ltdb_context *ac;
84         struct ldb_handle *h;
85
86         h = talloc_zero(req, struct ldb_handle);
87         if (h == NULL) {
88                 ldb_set_errstring(module->ldb, "Out of Memory");
89                 return NULL;
90         }
91
92         h->module = module;
93
94         ac = talloc_zero(h, struct ltdb_context);
95         if (ac == NULL) {
96                 ldb_set_errstring(module->ldb, "Out of Memory");
97                 talloc_free(h);
98                 return NULL;
99         }
100
101         h->private_data = (void *)ac;
102
103         h->state = LDB_ASYNC_INIT;
104         h->status = LDB_SUCCESS;
105
106         ac->module = module;
107         ac->context = req->context;
108         ac->callback = req->callback;
109
110         return h;
111 }
112
113 /*
114   form a TDB_DATA for a record key
115   caller frees
116
117   note that the key for a record can depend on whether the 
118   dn refers to a case sensitive index record or not
119 */
120 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
121 {
122         struct ldb_context *ldb = module->ldb;
123         TDB_DATA key;
124         char *key_str = NULL;
125         const char *dn_folded = NULL;
126
127         /*
128           most DNs are case insensitive. The exception is index DNs for
129           case sensitive attributes
130
131           there are 3 cases dealt with in this code:
132
133           1) if the dn doesn't start with @ then uppercase the attribute
134              names and the attributes values of case insensitive attributes
135           2) if the dn starts with @ then leave it alone - the indexing code handles
136              the rest
137         */
138
139         dn_folded = ldb_dn_get_casefold(dn);
140         if (!dn_folded) {
141                 goto failed;
142         }
143
144         key_str = talloc_strdup(ldb, "DN=");
145         if (!key_str) {
146                 goto failed;
147         }
148
149         key_str = talloc_append_string(ldb, key_str, dn_folded);
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key.dptr = (uint8_t *)key_str;
155         key.dsize = strlen(key_str) + 1;
156
157         return key;
158
159 failed:
160         errno = ENOMEM;
161         key.dptr = NULL;
162         key.dsize = 0;
163         return key;
164 }
165
166 /*
167   check special dn's have valid attributes
168   currently only @ATTRIBUTES is checked
169 */
170 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
171 {
172         int i, j;
173  
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return 0;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 for (j = 0; j < msg->elements[i].num_values; j++) {
183                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
184                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
185                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
186                         }
187                 }
188         }
189
190         return 0;
191 }
192
193
194 /*
195   we've made a modification to a dn - possibly reindex and 
196   update sequence number
197 */
198 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
199 {
200         int ret = LDB_SUCCESS;
201
202         if (ldb_dn_is_special(dn) &&
203             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
204              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
205                 ret = ltdb_reindex(module);
206         }
207
208         if (ret == LDB_SUCCESS &&
209             !(ldb_dn_is_special(dn) &&
210               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
211                 ret = ltdb_increase_sequence_number(module);
212         }
213
214         return ret;
215 }
216
217 /*
218   store a record into the db
219 */
220 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
221 {
222         struct ltdb_private *ltdb =
223                 talloc_get_type(module->private_data, struct ltdb_private);
224         TDB_DATA tdb_key, tdb_data;
225         int ret;
226
227         tdb_key = ltdb_key(module, msg->dn);
228         if (!tdb_key.dptr) {
229                 return LDB_ERR_OTHER;
230         }
231
232         ret = ltdb_pack_data(module, msg, &tdb_data);
233         if (ret == -1) {
234                 talloc_free(tdb_key.dptr);
235                 return LDB_ERR_OTHER;
236         }
237
238         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
239         if (ret == -1) {
240                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
241                 goto done;
242         }
243         
244         ret = ltdb_index_add(module, msg);
245         if (ret != LDB_SUCCESS) {
246                 tdb_delete(ltdb->tdb, tdb_key);
247         }
248
249 done:
250         talloc_free(tdb_key.dptr);
251         talloc_free(tdb_data.dptr);
252
253         return ret;
254 }
255
256
257 static int ltdb_add_internal(struct ldb_module *module, const struct ldb_message *msg)
258 {
259         int ret;
260         
261         ret = ltdb_check_special_dn(module, msg);
262         if (ret != LDB_SUCCESS) {
263                 return ret;
264         }
265         
266         if (ltdb_cache_load(module) != 0) {
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269
270         ret = ltdb_store(module, msg, TDB_INSERT);
271
272         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
273                 ldb_asprintf_errstring(module->ldb, "Entry %s already exists", ldb_dn_get_linearized(msg->dn));
274                 return ret;
275         }
276         
277         if (ret == LDB_SUCCESS) {
278                 ret = ltdb_index_one(module, msg, 1);
279                 if (ret != LDB_SUCCESS) {
280                         return ret;
281                 }
282
283                 ret = ltdb_modified(module, msg->dn);
284                 if (ret != LDB_SUCCESS) {
285                         return ret;
286                 }
287         }
288
289         return ret;
290 }
291
292 /*
293   add a record to the database
294 */
295 static int ltdb_add(struct ldb_module *module, struct ldb_request *req)
296 {
297         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
298         struct ltdb_context *ltdb_ac;
299         int tret, ret = LDB_SUCCESS;
300
301         if (check_critical_controls(req->controls)) {
302                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
303         }
304         
305         req->handle = init_ltdb_handle(ltdb, module, req);
306         if (req->handle == NULL) {
307                 return LDB_ERR_OPERATIONS_ERROR;
308         }
309         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
310
311         tret = ltdb_add_internal(module, req->op.add.message);
312         if (tret != LDB_SUCCESS) {
313                 req->handle->status = tret;
314                 goto done;
315         }
316         
317         if (ltdb_ac->callback) {
318                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
319         }
320 done:
321         req->handle->state = LDB_ASYNC_DONE;
322         return ret;
323 }
324
325 /*
326   delete a record from the database, not updating indexes (used for deleting
327   index records)
328 */
329 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
330 {
331         struct ltdb_private *ltdb =
332                 talloc_get_type(module->private_data, struct ltdb_private);
333         TDB_DATA tdb_key;
334         int ret;
335
336         tdb_key = ltdb_key(module, dn);
337         if (!tdb_key.dptr) {
338                 return LDB_ERR_OTHER;
339         }
340
341         ret = tdb_delete(ltdb->tdb, tdb_key);
342         talloc_free(tdb_key.dptr);
343
344         if (ret != 0) {
345                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
346         }
347
348         return ret;
349 }
350
351 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
352 {
353         struct ldb_message *msg;
354         int ret;
355
356         msg = talloc(module, struct ldb_message);
357         if (msg == NULL) {
358                 return LDB_ERR_OPERATIONS_ERROR;
359         }
360
361         /* in case any attribute of the message was indexed, we need
362            to fetch the old record */
363         ret = ltdb_search_dn1(module, dn, msg);
364         if (ret != LDB_SUCCESS) {
365                 /* not finding the old record is an error */
366                 goto done;
367         }
368
369         ret = ltdb_delete_noindex(module, dn);
370         if (ret != LDB_SUCCESS) {
371                 goto done;
372         }
373
374         /* remove one level attribute */
375         ret = ltdb_index_one(module, msg, 0);
376         if (ret != LDB_SUCCESS) {
377                 goto done;
378         }
379
380         /* remove any indexed attributes */
381         ret = ltdb_index_del(module, msg);
382         if (ret != LDB_SUCCESS) {
383                 goto done;
384         }
385
386         ret = ltdb_modified(module, dn);
387         if (ret != LDB_SUCCESS) {
388                 goto done;
389         }
390
391 done:
392         talloc_free(msg);
393         return ret;
394 }
395
396 /*
397   delete a record from the database
398 */
399 static int ltdb_delete(struct ldb_module *module, struct ldb_request *req)
400 {
401         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
402         struct ltdb_context *ltdb_ac;
403         int tret, ret = LDB_SUCCESS;
404
405         if (check_critical_controls(req->controls)) {
406                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
407         }
408         
409         req->handle = NULL;
410
411         if (ltdb_cache_load(module) != 0) {
412                 return LDB_ERR_OPERATIONS_ERROR;
413         }
414
415         req->handle = init_ltdb_handle(ltdb, module, req);
416         if (req->handle == NULL) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
420
421         tret = ltdb_delete_internal(module, req->op.del.dn);
422         if (tret != LDB_SUCCESS) {
423                 req->handle->status = tret; 
424                 goto done;
425         }
426
427         if (ltdb_ac->callback) {
428                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
429         }
430 done:
431         req->handle->state = LDB_ASYNC_DONE;
432         return ret;
433 }
434
435 /*
436   find an element by attribute name. At the moment this does a linear search, it should
437   be re-coded to use a binary search once all places that modify records guarantee
438   sorted order
439
440   return the index of the first matching element if found, otherwise -1
441 */
442 static int find_element(const struct ldb_message *msg, const char *name)
443 {
444         unsigned int i;
445         for (i=0;i<msg->num_elements;i++) {
446                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
447                         return i;
448                 }
449         }
450         return -1;
451 }
452
453
454 /*
455   add an element to an existing record. Assumes a elements array that we
456   can call re-alloc on, and assumed that we can re-use the data pointers from the 
457   passed in additional values. Use with care!
458
459   returns 0 on success, -1 on failure (and sets errno)
460 */
461 static int msg_add_element(struct ldb_context *ldb,
462                            struct ldb_message *msg, struct ldb_message_element *el)
463 {
464         struct ldb_message_element *e2;
465         unsigned int i;
466
467         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
468                               msg->num_elements+1);
469         if (!e2) {
470                 errno = ENOMEM;
471                 return -1;
472         }
473
474         msg->elements = e2;
475
476         e2 = &msg->elements[msg->num_elements];
477
478         e2->name = el->name;
479         e2->flags = el->flags;
480         e2->values = NULL;
481         if (el->num_values != 0) {
482                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
483                 if (!e2->values) {
484                         errno = ENOMEM;
485                         return -1;
486                 }
487         }
488         for (i=0;i<el->num_values;i++) {
489                 e2->values[i] = el->values[i];
490         }
491         e2->num_values = el->num_values;
492
493         msg->num_elements++;
494
495         return 0;
496 }
497
498 /*
499   delete all elements having a specified attribute name
500 */
501 static int msg_delete_attribute(struct ldb_module *module,
502                                 struct ldb_context *ldb,
503                                 struct ldb_message *msg, const char *name)
504 {
505         const char *dn;
506         unsigned int i, j;
507
508         dn = ldb_dn_get_linearized(msg->dn);
509         if (dn == NULL) {
510                 return -1;
511         }
512
513         for (i=0;i<msg->num_elements;i++) {
514                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
515                         for (j=0;j<msg->elements[i].num_values;j++) {
516                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
517                         }
518                         talloc_free(msg->elements[i].values);
519                         if (msg->num_elements > (i+1)) {
520                                 memmove(&msg->elements[i], 
521                                         &msg->elements[i+1], 
522                                         sizeof(struct ldb_message_element)*
523                                         (msg->num_elements - (i+1)));
524                         }
525                         msg->num_elements--;
526                         i--;
527                         msg->elements = talloc_realloc(msg, msg->elements, 
528                                                          struct ldb_message_element, 
529                                                          msg->num_elements);
530                 }
531         }
532
533         return 0;
534 }
535
536 /*
537   delete all elements matching an attribute name/value 
538
539   return 0 on success, -1 on failure
540 */
541 static int msg_delete_element(struct ldb_module *module,
542                               struct ldb_message *msg, 
543                               const char *name,
544                               const struct ldb_val *val)
545 {
546         struct ldb_context *ldb = module->ldb;
547         unsigned int i;
548         int found;
549         struct ldb_message_element *el;
550         const struct ldb_schema_attribute *a;
551
552         found = find_element(msg, name);
553         if (found == -1) {
554                 return -1;
555         }
556
557         el = &msg->elements[found];
558
559         a = ldb_schema_attribute_by_name(ldb, el->name);
560
561         for (i=0;i<el->num_values;i++) {
562                 if (a->syntax->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
563                         if (i<el->num_values-1) {
564                                 memmove(&el->values[i], &el->values[i+1],
565                                         sizeof(el->values[i])*(el->num_values-(i+1)));
566                         }
567                         el->num_values--;
568                         if (el->num_values == 0) {
569                                 return msg_delete_attribute(module, ldb, msg, name);
570                         }
571                         return 0;
572                 }
573         }
574
575         return -1;
576 }
577
578
579 /*
580   modify a record - internal interface
581
582   yuck - this is O(n^2). Luckily n is usually small so we probably
583   get away with it, but if we ever have really large attribute lists 
584   then we'll need to look at this again
585 */
586 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
587 {
588         struct ldb_context *ldb = module->ldb;
589         struct ltdb_private *ltdb =
590                 talloc_get_type(module->private_data, struct ltdb_private);
591         TDB_DATA tdb_key, tdb_data;
592         struct ldb_message *msg2;
593         unsigned i, j;
594         int ret, idx;
595
596         tdb_key = ltdb_key(module, msg->dn);
597         if (!tdb_key.dptr) {
598                 return LDB_ERR_OTHER;
599         }
600
601         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
602         if (!tdb_data.dptr) {
603                 talloc_free(tdb_key.dptr);
604                 return ltdb_err_map(tdb_error(ltdb->tdb));
605         }
606
607         msg2 = talloc(tdb_key.dptr, struct ldb_message);
608         if (msg2 == NULL) {
609                 talloc_free(tdb_key.dptr);
610                 return LDB_ERR_OTHER;
611         }
612
613         ret = ltdb_unpack_data(module, &tdb_data, msg2);
614         if (ret == -1) {
615                 ret = LDB_ERR_OTHER;
616                 goto failed;
617         }
618
619         if (!msg2->dn) {
620                 msg2->dn = msg->dn;
621         }
622
623         for (i=0;i<msg->num_elements;i++) {
624                 struct ldb_message_element *el = &msg->elements[i];
625                 struct ldb_message_element *el2;
626                 struct ldb_val *vals;
627                 const char *dn;
628
629                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
630
631                 case LDB_FLAG_MOD_ADD:
632                         /* add this element to the message. fail if it
633                            already exists */
634                         idx = find_element(msg2, el->name);
635
636                         if (idx == -1) {
637                                 if (msg_add_element(ldb, msg2, el) != 0) {
638                                         ret = LDB_ERR_OTHER;
639                                         goto failed;
640                                 }
641                                 continue;
642                         }
643
644                         el2 = &msg2->elements[idx];
645
646                         /* An attribute with this name already exists,
647                          * add all values if they don't already exist
648                          * (check both the other elements to be added,
649                          * and those already in the db). */
650
651                         for (j=0;j<el->num_values;j++) {
652                                 if (ldb_msg_find_val(el2, &el->values[j])) {
653                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d already exists", el->name, j);
654                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
655                                         goto failed;
656                                 }
657                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
658                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
659                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
660                                         goto failed;
661                                 }
662                         }
663
664                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
665                                                 el2->num_values + el->num_values);
666
667                         if (vals == NULL) {
668                                 ret = LDB_ERR_OTHER;
669                                 goto failed;
670                         }
671
672                         for (j=0;j<el->num_values;j++) {
673                                 vals[el2->num_values + j] =
674                                         ldb_val_dup(vals, &el->values[j]);
675                         }
676
677                         el2->values = vals;
678                         el2->num_values += el->num_values;
679
680                         break;
681
682                 case LDB_FLAG_MOD_REPLACE:
683                         /* replace all elements of this attribute name with the elements
684                            listed. The attribute not existing is not an error */
685                         msg_delete_attribute(module, ldb, msg2, el->name);
686
687                         for (j=0;j<el->num_values;j++) {
688                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
689                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
690                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
691                                         goto failed;
692                                 }
693                         }
694
695                         /* add the replacement element, if not empty */
696                         if (el->num_values != 0 &&
697                             msg_add_element(ldb, msg2, el) != 0) {
698                                 ret = LDB_ERR_OTHER;
699                                 goto failed;
700                         }
701                         break;
702
703                 case LDB_FLAG_MOD_DELETE:
704
705                         dn = ldb_dn_get_linearized(msg->dn);
706                         if (dn == NULL) {
707                                 ret = LDB_ERR_OTHER;
708                                 goto failed;
709                         }
710
711                         /* we could be being asked to delete all
712                            values or just some values */
713                         if (msg->elements[i].num_values == 0) {
714                                 if (msg_delete_attribute(module, ldb, msg2, 
715                                                          msg->elements[i].name) != 0) {
716                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
717                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
718                                         goto failed;
719                                 }
720                                 break;
721                         }
722                         for (j=0;j<msg->elements[i].num_values;j++) {
723                                 if (msg_delete_element(module,
724                                                        msg2, 
725                                                        msg->elements[i].name,
726                                                        &msg->elements[i].values[j]) != 0) {
727                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
728                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
729                                         goto failed;
730                                 }
731                                 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
732                                 if (ret != LDB_SUCCESS) {
733                                         goto failed;
734                                 }
735                         }
736                         break;
737                 default:
738                         ldb_asprintf_errstring(module->ldb, "Invalid ldb_modify flags on %s: 0x%x", 
739                                                              msg->elements[i].name, 
740                                                              msg->elements[i].flags & LDB_FLAG_MOD_MASK);
741                         ret = LDB_ERR_PROTOCOL_ERROR;
742                         goto failed;
743                 }
744         }
745
746         /* we've made all the mods - save the modified record back into the database */
747         ret = ltdb_store(module, msg2, TDB_MODIFY);
748         if (ret != LDB_SUCCESS) {
749                 goto failed;
750         }
751
752         ret = ltdb_modified(module, msg->dn);
753         if (ret != LDB_SUCCESS) {
754                 goto failed;
755         }
756
757         talloc_free(tdb_key.dptr);
758         free(tdb_data.dptr);
759         return ret;
760
761 failed:
762         talloc_free(tdb_key.dptr);
763         free(tdb_data.dptr);
764         return ret;
765 }
766
767 /*
768   modify a record
769 */
770 static int ltdb_modify(struct ldb_module *module, struct ldb_request *req)
771 {
772         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
773         struct ltdb_context *ltdb_ac;
774         int tret, ret = LDB_SUCCESS;
775
776         if (check_critical_controls(req->controls)) {
777                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
778         }
779         
780         req->handle = NULL;
781
782         req->handle = init_ltdb_handle(ltdb, module, req);
783         if (req->handle == NULL) {
784                 return LDB_ERR_OPERATIONS_ERROR;
785         }
786         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
787
788         tret = ltdb_check_special_dn(module, req->op.mod.message);
789         if (tret != LDB_SUCCESS) {
790                 req->handle->status = tret;
791                 goto done;
792         }
793         
794         if (ltdb_cache_load(module) != 0) {
795                 ret = LDB_ERR_OPERATIONS_ERROR;
796                 goto done;
797         }
798
799         tret = ltdb_modify_internal(module, req->op.mod.message);
800         if (tret != LDB_SUCCESS) {
801                 req->handle->status = tret;
802                 goto done;
803         }
804
805         if (ltdb_ac->callback) {
806                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
807         }
808 done:
809         req->handle->state = LDB_ASYNC_DONE;
810         return ret;
811 }
812
813 /*
814   rename a record
815 */
816 static int ltdb_rename(struct ldb_module *module, struct ldb_request *req)
817 {
818         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
819         struct ltdb_context *ltdb_ac;
820         struct ldb_message *msg;
821         int tret, ret = LDB_SUCCESS;
822
823         if (check_critical_controls(req->controls)) {
824                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
825         }
826         
827         req->handle = NULL;
828
829         if (ltdb_cache_load(module) != 0) {
830                 return LDB_ERR_OPERATIONS_ERROR;
831         }
832
833         req->handle = init_ltdb_handle(ltdb, module, req);
834         if (req->handle == NULL) {
835                 return LDB_ERR_OPERATIONS_ERROR;
836         }
837         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
838
839         msg = talloc(ltdb_ac, struct ldb_message);
840         if (msg == NULL) {
841                 ret = LDB_ERR_OPERATIONS_ERROR;
842                 goto done;
843         }
844
845         /* in case any attribute of the message was indexed, we need
846            to fetch the old record */
847         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
848         if (tret != LDB_SUCCESS) {
849                 /* not finding the old record is an error */
850                 req->handle->status = tret;
851                 goto done;
852         }
853
854         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
855         if (!msg->dn) {
856                 ret = LDB_ERR_OPERATIONS_ERROR;
857                 goto done;
858         }
859
860         ret = ltdb_add_internal(module, msg);
861         if (ret != LDB_SUCCESS) {
862                 goto done;
863         }
864
865         tret = ltdb_delete_internal(module, req->op.rename.olddn);
866         if (tret != LDB_SUCCESS) {
867                 ltdb_delete_internal(module, req->op.rename.newdn);
868                 ret = LDB_ERR_OPERATIONS_ERROR;
869                 goto done;
870         }
871
872         if (ltdb_ac->callback) {
873                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
874         }
875 done:
876         req->handle->state = LDB_ASYNC_DONE;
877         return ret;
878 }
879
880 static int ltdb_start_trans(struct ldb_module *module)
881 {
882         struct ltdb_private *ltdb =
883                 talloc_get_type(module->private_data, struct ltdb_private);
884
885         if (tdb_transaction_start(ltdb->tdb) != 0) {
886                 return ltdb_err_map(tdb_error(ltdb->tdb));
887         }
888
889         return LDB_SUCCESS;
890 }
891
892 static int ltdb_end_trans(struct ldb_module *module)
893 {
894         struct ltdb_private *ltdb =
895                 talloc_get_type(module->private_data, struct ltdb_private);
896
897         if (tdb_transaction_commit(ltdb->tdb) != 0) {
898                 return ltdb_err_map(tdb_error(ltdb->tdb));
899         }
900
901         return LDB_SUCCESS;
902 }
903
904 static int ltdb_del_trans(struct ldb_module *module)
905 {
906         struct ltdb_private *ltdb =
907                 talloc_get_type(module->private_data, struct ltdb_private);
908
909         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
910                 return ltdb_err_map(tdb_error(ltdb->tdb));
911         }
912
913         return LDB_SUCCESS;
914 }
915
916 static int ltdb_wait(struct ldb_handle *handle, enum ldb_wait_type type)
917 {
918         return handle->status;
919 }
920
921 static int ltdb_request(struct ldb_module *module, struct ldb_request *req)
922 {
923         /* check for oustanding critical controls and return an error if found */
924         if (check_critical_controls(req->controls)) {
925                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
926         }
927         
928         /* search, add, modify, delete, rename are handled by their own, no other op supported */
929         return LDB_ERR_OPERATIONS_ERROR;
930 }
931
932 /*
933   return sequenceNumber from @BASEINFO
934 */
935 static int ltdb_sequence_number(struct ldb_module *module, struct ldb_request *req)
936 {
937         TALLOC_CTX *tmp_ctx = talloc_new(req);
938         struct ldb_message *msg = NULL;
939         struct ldb_dn *dn = ldb_dn_new(tmp_ctx, module->ldb, LTDB_BASEINFO);
940         int tret;
941
942         if (tmp_ctx == NULL) {
943                 talloc_free(tmp_ctx);
944                 return LDB_ERR_OPERATIONS_ERROR;
945         }
946
947         msg = talloc(tmp_ctx, struct ldb_message);
948         if (msg == NULL) {
949                 talloc_free(tmp_ctx);
950                 return LDB_ERR_OPERATIONS_ERROR;
951         }
952
953         req->op.seq_num.flags = 0;
954
955         tret = ltdb_search_dn1(module, dn, msg);
956         if (tret != LDB_SUCCESS) {
957                 talloc_free(tmp_ctx);
958                 /* zero is as good as anything when we don't know */
959                 req->op.seq_num.seq_num = 0;
960                 return LDB_SUCCESS;
961         }
962
963         switch (req->op.seq_num.type) {
964         case LDB_SEQ_HIGHEST_SEQ:
965                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
966                 break;
967         case LDB_SEQ_NEXT:
968                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
969                 req->op.seq_num.seq_num++;
970                 break;
971         case LDB_SEQ_HIGHEST_TIMESTAMP:
972         {
973                 const char *date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
974                 if (date) {
975                         req->op.seq_num.seq_num = ldb_string_to_time(date);
976                 } else {
977                         req->op.seq_num.seq_num = 0;
978                         /* zero is as good as anything when we don't know */
979                 }
980                 break;
981         }
982         }
983         talloc_free(tmp_ctx);
984         return LDB_SUCCESS;
985 }
986
987 static const struct ldb_module_ops ltdb_ops = {
988         .name              = "tdb",
989         .search            = ltdb_search,
990         .add               = ltdb_add,
991         .modify            = ltdb_modify,
992         .del               = ltdb_delete,
993         .rename            = ltdb_rename,
994         .request           = ltdb_request,
995         .start_transaction = ltdb_start_trans,
996         .end_transaction   = ltdb_end_trans,
997         .del_transaction   = ltdb_del_trans,
998         .wait              = ltdb_wait,
999         .sequence_number   = ltdb_sequence_number
1000 };
1001
1002 /*
1003   connect to the database
1004 */
1005 static int ltdb_connect(struct ldb_context *ldb, const char *url, 
1006                         unsigned int flags, const char *options[],
1007                         struct ldb_module **module)
1008 {
1009         const char *path;
1010         int tdb_flags, open_flags;
1011         struct ltdb_private *ltdb;
1012
1013         /* parse the url */
1014         if (strchr(url, ':')) {
1015                 if (strncmp(url, "tdb://", 6) != 0) {
1016                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
1017                         return -1;
1018                 }
1019                 path = url+6;
1020         } else {
1021                 path = url;
1022         }
1023
1024         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1025
1026         /* check for the 'nosync' option */
1027         if (flags & LDB_FLG_NOSYNC) {
1028                 tdb_flags |= TDB_NOSYNC;
1029         }
1030
1031         /* and nommap option */
1032         if (flags & LDB_FLG_NOMMAP) {
1033                 tdb_flags |= TDB_NOMMAP;
1034         }
1035
1036         if (flags & LDB_FLG_RDONLY) {
1037                 open_flags = O_RDONLY;
1038         } else {
1039                 open_flags = O_CREAT | O_RDWR;
1040         }
1041
1042         ltdb = talloc_zero(ldb, struct ltdb_private);
1043         if (!ltdb) {
1044                 ldb_oom(ldb);
1045                 return -1;
1046         }
1047
1048         /* note that we use quite a large default hash size */
1049         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, 
1050                                    tdb_flags, open_flags, 
1051                                    ldb->create_perms, ldb);
1052         if (!ltdb->tdb) {
1053                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
1054                 talloc_free(ltdb);
1055                 return -1;
1056         }
1057
1058         ltdb->sequence_number = 0;
1059
1060         *module = talloc(ldb, struct ldb_module);
1061         if (!module) {
1062                 ldb_oom(ldb);
1063                 talloc_free(ltdb);
1064                 return -1;
1065         }
1066         talloc_set_name_const(*module, "ldb_tdb backend");
1067         (*module)->ldb = ldb;
1068         (*module)->prev = (*module)->next = NULL;
1069         (*module)->private_data = ltdb;
1070         (*module)->ops = &ltdb_ops;
1071
1072         if (ltdb_cache_load(*module) != 0) {
1073                 talloc_free(*module);
1074                 talloc_free(ltdb);
1075                 return -1;
1076         }
1077
1078         return 0;
1079 }
1080
1081 int ldb_tdb_init(void)
1082 {
1083         return ldb_register_backend("tdb", ltdb_connect);
1084 }