remove all '\n' from ldb_debug
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006-2008
7
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  */
47
48 #include "ldb_tdb.h"
49
50
51 /*
52   map a tdb error code to a ldb error code
53 */
54 static int ltdb_err_map(enum TDB_ERROR tdb_code)
55 {
56         switch (tdb_code) {
57         case TDB_SUCCESS:
58                 return LDB_SUCCESS;
59         case TDB_ERR_CORRUPT:
60         case TDB_ERR_OOM:
61         case TDB_ERR_EINVAL:
62                 return LDB_ERR_OPERATIONS_ERROR;
63         case TDB_ERR_IO:
64                 return LDB_ERR_PROTOCOL_ERROR;
65         case TDB_ERR_LOCK:
66         case TDB_ERR_NOLOCK:
67                 return LDB_ERR_BUSY;
68         case TDB_ERR_LOCK_TIMEOUT:
69                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
70         case TDB_ERR_EXISTS:
71                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
72         case TDB_ERR_NOEXIST:
73                 return LDB_ERR_NO_SUCH_OBJECT;
74         case TDB_ERR_RDONLY:
75                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
76         }
77         return LDB_ERR_OTHER;
78 }
79
80 /*
81   lock the database for read - use by ltdb_search and ltdb_sequence_number
82 */
83 int ltdb_lock_read(struct ldb_module *module)
84 {
85         void *data = ldb_module_get_private(module);
86         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
87         if (ltdb->in_transaction == 0) {
88                 return tdb_lockall_read(ltdb->tdb);
89         }
90         return 0;
91 }
92
93 /*
94   unlock the database after a ltdb_lock_read()
95 */
96 int ltdb_unlock_read(struct ldb_module *module)
97 {
98         void *data = ldb_module_get_private(module);
99         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
100         if (ltdb->in_transaction == 0) {
101                 return tdb_unlockall_read(ltdb->tdb);
102         }
103         return 0;
104 }
105
106
107 /*
108   form a TDB_DATA for a record key
109   caller frees
110
111   note that the key for a record can depend on whether the
112   dn refers to a case sensitive index record or not
113 */
114 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
115 {
116         struct ldb_context *ldb = ldb_module_get_ctx(module);
117         TDB_DATA key;
118         char *key_str = NULL;
119         const char *dn_folded = NULL;
120
121         /*
122           most DNs are case insensitive. The exception is index DNs for
123           case sensitive attributes
124
125           there are 3 cases dealt with in this code:
126
127           1) if the dn doesn't start with @ then uppercase the attribute
128              names and the attributes values of case insensitive attributes
129           2) if the dn starts with @ then leave it alone -
130              the indexing code handles the rest
131         */
132
133         dn_folded = ldb_dn_get_casefold(dn);
134         if (!dn_folded) {
135                 goto failed;
136         }
137
138         key_str = talloc_strdup(ldb, "DN=");
139         if (!key_str) {
140                 goto failed;
141         }
142
143         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
144         if (!key_str) {
145                 goto failed;
146         }
147
148         key.dptr = (uint8_t *)key_str;
149         key.dsize = strlen(key_str) + 1;
150
151         return key;
152
153 failed:
154         errno = ENOMEM;
155         key.dptr = NULL;
156         key.dsize = 0;
157         return key;
158 }
159
160 /*
161   check special dn's have valid attributes
162   currently only @ATTRIBUTES is checked
163 */
164 static int ltdb_check_special_dn(struct ldb_module *module,
165                           const struct ldb_message *msg)
166 {
167         struct ldb_context *ldb = ldb_module_get_ctx(module);
168         int i, j;
169
170         if (! ldb_dn_is_special(msg->dn) ||
171             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
172                 return 0;
173         }
174
175         /* we have @ATTRIBUTES, let's check attributes are fine */
176         /* should we check that we deny multivalued attributes ? */
177         for (i = 0; i < msg->num_elements; i++) {
178                 for (j = 0; j < msg->elements[i].num_values; j++) {
179                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
180                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
181                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
182                         }
183                 }
184         }
185
186         return 0;
187 }
188
189
190 /*
191   we've made a modification to a dn - possibly reindex and
192   update sequence number
193 */
194 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
195 {
196         int ret = LDB_SUCCESS;
197
198         if (ldb_dn_is_special(dn) &&
199             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
200              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
201                 ret = ltdb_reindex(module);
202         }
203
204         if (ret == LDB_SUCCESS &&
205             !(ldb_dn_is_special(dn) &&
206               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
207                 ret = ltdb_increase_sequence_number(module);
208         }
209
210         return ret;
211 }
212
213 /*
214   store a record into the db
215 */
216 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
217 {
218         void *data = ldb_module_get_private(module);
219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
220         TDB_DATA tdb_key, tdb_data;
221         int ret;
222
223         tdb_key = ltdb_key(module, msg->dn);
224         if (!tdb_key.dptr) {
225                 return LDB_ERR_OTHER;
226         }
227
228         ret = ltdb_pack_data(module, msg, &tdb_data);
229         if (ret == -1) {
230                 talloc_free(tdb_key.dptr);
231                 return LDB_ERR_OTHER;
232         }
233
234         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
235         if (ret == -1) {
236                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
237                 goto done;
238         }
239
240         ret = ltdb_index_add(module, msg);
241         if (ret != LDB_SUCCESS) {
242                 tdb_delete(ltdb->tdb, tdb_key);
243         }
244
245 done:
246         talloc_free(tdb_key.dptr);
247         talloc_free(tdb_data.dptr);
248
249         return ret;
250 }
251
252
253 static int ltdb_add_internal(struct ldb_module *module,
254                              const struct ldb_message *msg)
255 {
256         struct ldb_context *ldb = ldb_module_get_ctx(module);
257         int ret;
258
259         ret = ltdb_check_special_dn(module, msg);
260         if (ret != LDB_SUCCESS) {
261                 return ret;
262         }
263
264         if (ltdb_cache_load(module) != 0) {
265                 return LDB_ERR_OPERATIONS_ERROR;
266         }
267
268         ret = ltdb_store(module, msg, TDB_INSERT);
269
270         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
271                 ldb_asprintf_errstring(ldb,
272                                         "Entry %s already exists",
273                                         ldb_dn_get_linearized(msg->dn));
274                 return ret;
275         }
276
277         if (ret == LDB_SUCCESS) {
278                 ret = ltdb_index_one(module, msg, 1);
279                 if (ret != LDB_SUCCESS) {
280                         return ret;
281                 }
282
283                 ret = ltdb_modified(module, msg->dn);
284                 if (ret != LDB_SUCCESS) {
285                         return ret;
286                 }
287         }
288
289         return ret;
290 }
291
292 /*
293   add a record to the database
294 */
295 static int ltdb_add(struct ltdb_context *ctx)
296 {
297         struct ldb_module *module = ctx->module;
298         struct ldb_request *req = ctx->req;
299         int tret;
300
301         ldb_request_set_state(req, LDB_ASYNC_PENDING);
302
303         tret = ltdb_add_internal(module, req->op.add.message);
304         if (tret != LDB_SUCCESS) {
305                 return tret;
306         }
307
308         return LDB_SUCCESS;
309 }
310
311 /*
312   delete a record from the database, not updating indexes (used for deleting
313   index records)
314 */
315 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
316 {
317         void *data = ldb_module_get_private(module);
318         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
319         TDB_DATA tdb_key;
320         int ret;
321
322         tdb_key = ltdb_key(module, dn);
323         if (!tdb_key.dptr) {
324                 return LDB_ERR_OTHER;
325         }
326
327         ret = tdb_delete(ltdb->tdb, tdb_key);
328         talloc_free(tdb_key.dptr);
329
330         if (ret != 0) {
331                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
332         }
333
334         return ret;
335 }
336
337 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
338 {
339         struct ldb_message *msg;
340         int ret;
341
342         msg = talloc(module, struct ldb_message);
343         if (msg == NULL) {
344                 return LDB_ERR_OPERATIONS_ERROR;
345         }
346
347         /* in case any attribute of the message was indexed, we need
348            to fetch the old record */
349         ret = ltdb_search_dn1(module, dn, msg);
350         if (ret != LDB_SUCCESS) {
351                 /* not finding the old record is an error */
352                 goto done;
353         }
354
355         ret = ltdb_delete_noindex(module, dn);
356         if (ret != LDB_SUCCESS) {
357                 goto done;
358         }
359
360         /* remove one level attribute */
361         ret = ltdb_index_one(module, msg, 0);
362         if (ret != LDB_SUCCESS) {
363                 goto done;
364         }
365
366         /* remove any indexed attributes */
367         ret = ltdb_index_del(module, msg);
368         if (ret != LDB_SUCCESS) {
369                 goto done;
370         }
371
372         ret = ltdb_modified(module, dn);
373         if (ret != LDB_SUCCESS) {
374                 goto done;
375         }
376
377 done:
378         talloc_free(msg);
379         return ret;
380 }
381
382 /*
383   delete a record from the database
384 */
385 static int ltdb_delete(struct ltdb_context *ctx)
386 {
387         struct ldb_module *module = ctx->module;
388         struct ldb_request *req = ctx->req;
389         int tret;
390
391         ldb_request_set_state(req, LDB_ASYNC_PENDING);
392
393         if (ltdb_cache_load(module) != 0) {
394                 return LDB_ERR_OPERATIONS_ERROR;
395         }
396
397         tret = ltdb_delete_internal(module, req->op.del.dn);
398         if (tret != LDB_SUCCESS) {
399                 return tret;
400         }
401
402         return LDB_SUCCESS;
403 }
404
405 /*
406   find an element by attribute name. At the moment this does a linear search,
407   it should be re-coded to use a binary search once all places that modify
408   records guarantee sorted order
409
410   return the index of the first matching element if found, otherwise -1
411 */
412 static int find_element(const struct ldb_message *msg, const char *name)
413 {
414         unsigned int i;
415         for (i=0;i<msg->num_elements;i++) {
416                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
417                         return i;
418                 }
419         }
420         return -1;
421 }
422
423
424 /*
425   add an element to an existing record. Assumes a elements array that we
426   can call re-alloc on, and assumed that we can re-use the data pointers from
427   the passed in additional values. Use with care!
428
429   returns 0 on success, -1 on failure (and sets errno)
430 */
431 static int msg_add_element(struct ldb_context *ldb,
432                            struct ldb_message *msg,
433                            struct ldb_message_element *el)
434 {
435         struct ldb_message_element *e2;
436         unsigned int i;
437
438         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
439                               msg->num_elements+1);
440         if (!e2) {
441                 errno = ENOMEM;
442                 return -1;
443         }
444
445         msg->elements = e2;
446
447         e2 = &msg->elements[msg->num_elements];
448
449         e2->name = el->name;
450         e2->flags = el->flags;
451         e2->values = NULL;
452         if (el->num_values != 0) {
453                 e2->values = talloc_array(msg->elements,
454                                           struct ldb_val, el->num_values);
455                 if (!e2->values) {
456                         errno = ENOMEM;
457                         return -1;
458                 }
459         }
460         for (i=0;i<el->num_values;i++) {
461                 e2->values[i] = el->values[i];
462         }
463         e2->num_values = el->num_values;
464
465         msg->num_elements++;
466
467         return 0;
468 }
469
470 /*
471   delete all elements having a specified attribute name
472 */
473 static int msg_delete_attribute(struct ldb_module *module,
474                                 struct ldb_context *ldb,
475                                 struct ldb_message *msg, const char *name)
476 {
477         const char *dn;
478         unsigned int i, j;
479
480         dn = ldb_dn_get_linearized(msg->dn);
481         if (dn == NULL) {
482                 return -1;
483         }
484
485         for (i=0;i<msg->num_elements;i++) {
486                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
487                         for (j=0;j<msg->elements[i].num_values;j++) {
488                                 ltdb_index_del_value(module, dn,
489                                                      &msg->elements[i], j);
490                         }
491                         talloc_free(msg->elements[i].values);
492                         if (msg->num_elements > (i+1)) {
493                                 memmove(&msg->elements[i],
494                                         &msg->elements[i+1],
495                                         sizeof(struct ldb_message_element)*
496                                         (msg->num_elements - (i+1)));
497                         }
498                         msg->num_elements--;
499                         i--;
500                         msg->elements = talloc_realloc(msg, msg->elements,
501                                                         struct ldb_message_element,
502                                                         msg->num_elements);
503                 }
504         }
505
506         return 0;
507 }
508
509 /*
510   delete all elements matching an attribute name/value
511
512   return 0 on success, -1 on failure
513 */
514 static int msg_delete_element(struct ldb_module *module,
515                               struct ldb_message *msg,
516                               const char *name,
517                               const struct ldb_val *val)
518 {
519         struct ldb_context *ldb = ldb_module_get_ctx(module);
520         unsigned int i;
521         int found;
522         struct ldb_message_element *el;
523         const struct ldb_schema_attribute *a;
524
525         found = find_element(msg, name);
526         if (found == -1) {
527                 return -1;
528         }
529
530         el = &msg->elements[found];
531
532         a = ldb_schema_attribute_by_name(ldb, el->name);
533
534         for (i=0;i<el->num_values;i++) {
535                 if (a->syntax->comparison_fn(ldb, ldb,
536                                                 &el->values[i], val) == 0) {
537                         if (i<el->num_values-1) {
538                                 memmove(&el->values[i], &el->values[i+1],
539                                         sizeof(el->values[i])*
540                                                 (el->num_values-(i+1)));
541                         }
542                         el->num_values--;
543                         if (el->num_values == 0) {
544                                 return msg_delete_attribute(module, ldb,
545                                                             msg, name);
546                         }
547                         return 0;
548                 }
549         }
550
551         return -1;
552 }
553
554
555 /*
556   modify a record - internal interface
557
558   yuck - this is O(n^2). Luckily n is usually small so we probably
559   get away with it, but if we ever have really large attribute lists
560   then we'll need to look at this again
561 */
562 int ltdb_modify_internal(struct ldb_module *module,
563                          const struct ldb_message *msg)
564 {
565         struct ldb_context *ldb = ldb_module_get_ctx(module);
566         void *data = ldb_module_get_private(module);
567         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
568         TDB_DATA tdb_key, tdb_data;
569         struct ldb_message *msg2;
570         unsigned i, j;
571         int ret, idx;
572
573         tdb_key = ltdb_key(module, msg->dn);
574         if (!tdb_key.dptr) {
575                 return LDB_ERR_OTHER;
576         }
577
578         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
579         if (!tdb_data.dptr) {
580                 talloc_free(tdb_key.dptr);
581                 return ltdb_err_map(tdb_error(ltdb->tdb));
582         }
583
584         msg2 = talloc(tdb_key.dptr, struct ldb_message);
585         if (msg2 == NULL) {
586                 talloc_free(tdb_key.dptr);
587                 return LDB_ERR_OTHER;
588         }
589
590         ret = ltdb_unpack_data(module, &tdb_data, msg2);
591         if (ret == -1) {
592                 ret = LDB_ERR_OTHER;
593                 goto failed;
594         }
595
596         if (!msg2->dn) {
597                 msg2->dn = msg->dn;
598         }
599
600         for (i=0;i<msg->num_elements;i++) {
601                 struct ldb_message_element *el = &msg->elements[i];
602                 struct ldb_message_element *el2;
603                 struct ldb_val *vals;
604                 const char *dn;
605
606                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
607
608                 case LDB_FLAG_MOD_ADD:
609                         /* add this element to the message. fail if it
610                            already exists */
611                         idx = find_element(msg2, el->name);
612
613                         if (idx == -1) {
614                                 if (msg_add_element(ldb, msg2, el) != 0) {
615                                         ret = LDB_ERR_OTHER;
616                                         goto failed;
617                                 }
618                                 continue;
619                         }
620
621                         el2 = &msg2->elements[idx];
622
623                         /* An attribute with this name already exists,
624                          * add all values if they don't already exist
625                          * (check both the other elements to be added,
626                          * and those already in the db). */
627
628                         for (j=0;j<el->num_values;j++) {
629                                 if (ldb_msg_find_val(el2, &el->values[j])) {
630                                         ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
631                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
632                                         goto failed;
633                                 }
634                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
635                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
636                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
637                                         goto failed;
638                                 }
639                         }
640
641                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
642                                                 el2->num_values + el->num_values);
643
644                         if (vals == NULL) {
645                                 ret = LDB_ERR_OTHER;
646                                 goto failed;
647                         }
648
649                         for (j=0;j<el->num_values;j++) {
650                                 vals[el2->num_values + j] =
651                                         ldb_val_dup(vals, &el->values[j]);
652                         }
653
654                         el2->values = vals;
655                         el2->num_values += el->num_values;
656
657                         break;
658
659                 case LDB_FLAG_MOD_REPLACE:
660                         /* replace all elements of this attribute name with the elements
661                            listed. The attribute not existing is not an error */
662                         msg_delete_attribute(module, ldb, msg2, el->name);
663
664                         for (j=0;j<el->num_values;j++) {
665                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
666                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
667                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
668                                         goto failed;
669                                 }
670                         }
671
672                         /* add the replacement element, if not empty */
673                         if (el->num_values != 0 &&
674                             msg_add_element(ldb, msg2, el) != 0) {
675                                 ret = LDB_ERR_OTHER;
676                                 goto failed;
677                         }
678                         break;
679
680                 case LDB_FLAG_MOD_DELETE:
681
682                         dn = ldb_dn_get_linearized(msg->dn);
683                         if (dn == NULL) {
684                                 ret = LDB_ERR_OTHER;
685                                 goto failed;
686                         }
687
688                         /* we could be being asked to delete all
689                            values or just some values */
690                         if (msg->elements[i].num_values == 0) {
691                                 if (msg_delete_attribute(module, ldb, msg2, 
692                                                          msg->elements[i].name) != 0) {
693                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
694                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
695                                         goto failed;
696                                 }
697                                 break;
698                         }
699                         for (j=0;j<msg->elements[i].num_values;j++) {
700                                 if (msg_delete_element(module,
701                                                        msg2, 
702                                                        msg->elements[i].name,
703                                                        &msg->elements[i].values[j]) != 0) {
704                                         ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
705                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
706                                         goto failed;
707                                 }
708                                 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
709                                 if (ret != LDB_SUCCESS) {
710                                         goto failed;
711                                 }
712                         }
713                         break;
714                 default:
715                         ldb_asprintf_errstring(ldb,
716                                 "Invalid ldb_modify flags on %s: 0x%x",
717                                 msg->elements[i].name,
718                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
719                         ret = LDB_ERR_PROTOCOL_ERROR;
720                         goto failed;
721                 }
722         }
723
724         /* we've made all the mods
725          * save the modified record back into the database */
726         ret = ltdb_store(module, msg2, TDB_MODIFY);
727         if (ret != LDB_SUCCESS) {
728                 goto failed;
729         }
730
731         ret = ltdb_modified(module, msg->dn);
732         if (ret != LDB_SUCCESS) {
733                 goto failed;
734         }
735
736         talloc_free(tdb_key.dptr);
737         free(tdb_data.dptr);
738         return ret;
739
740 failed:
741         talloc_free(tdb_key.dptr);
742         free(tdb_data.dptr);
743         return ret;
744 }
745
746 /*
747   modify a record
748 */
749 static int ltdb_modify(struct ltdb_context *ctx)
750 {
751         struct ldb_module *module = ctx->module;
752         struct ldb_request *req = ctx->req;
753         int tret;
754
755         ldb_request_set_state(req, LDB_ASYNC_PENDING);
756
757         tret = ltdb_check_special_dn(module, req->op.mod.message);
758         if (tret != LDB_SUCCESS) {
759                 return tret;
760         }
761
762         if (ltdb_cache_load(module) != 0) {
763                 return LDB_ERR_OPERATIONS_ERROR;
764         }
765
766         tret = ltdb_modify_internal(module, req->op.mod.message);
767         if (tret != LDB_SUCCESS) {
768                 return tret;
769         }
770
771         return LDB_SUCCESS;
772 }
773
774 /*
775   rename a record
776 */
777 static int ltdb_rename(struct ltdb_context *ctx)
778 {
779         struct ldb_module *module = ctx->module;
780         struct ldb_request *req = ctx->req;
781         struct ldb_message *msg;
782         int tret;
783
784         ldb_request_set_state(req, LDB_ASYNC_PENDING);
785
786         if (ltdb_cache_load(ctx->module) != 0) {
787                 return LDB_ERR_OPERATIONS_ERROR;
788         }
789
790         msg = talloc(ctx, struct ldb_message);
791         if (msg == NULL) {
792                 return LDB_ERR_OPERATIONS_ERROR;
793         }
794
795         /* in case any attribute of the message was indexed, we need
796            to fetch the old record */
797         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
798         if (tret != LDB_SUCCESS) {
799                 /* not finding the old record is an error */
800                 return tret;
801         }
802
803         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
804         if (!msg->dn) {
805                 return LDB_ERR_OPERATIONS_ERROR;
806         }
807
808         /* Always delete first then add, to avoid conflicts with
809          * unique indexes. We rely on the transaction to make this
810          * atomic
811          */
812         tret = ltdb_delete_internal(module, req->op.rename.olddn);
813         if (tret != LDB_SUCCESS) {
814                 return tret;
815         }
816
817         tret = ltdb_add_internal(module, msg);
818         if (tret != LDB_SUCCESS) {
819                 return tret;
820         }
821
822         return LDB_SUCCESS;
823 }
824
825 static int ltdb_start_trans(struct ldb_module *module)
826 {
827         void *data = ldb_module_get_private(module);
828         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
829
830         if (tdb_transaction_start(ltdb->tdb) != 0) {
831                 return ltdb_err_map(tdb_error(ltdb->tdb));
832         }
833
834         ltdb->in_transaction++;
835
836         ltdb_index_transaction_start(module);
837
838         return LDB_SUCCESS;
839 }
840
841 static int ltdb_prepare_commit(struct ldb_module *module)
842 {
843         void *data = ldb_module_get_private(module);
844         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
845
846         if (ltdb->in_transaction != 1) {
847                 return LDB_SUCCESS;
848         }
849
850         if (ltdb_index_transaction_commit(module) != 0) {
851                 tdb_transaction_cancel(ltdb->tdb);
852                 ltdb->in_transaction--;
853                 return ltdb_err_map(tdb_error(ltdb->tdb));
854         }
855
856         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
857                 ltdb->in_transaction--;
858                 return ltdb_err_map(tdb_error(ltdb->tdb));
859         }
860
861         ltdb->prepared_commit = true;
862
863         return LDB_SUCCESS;
864 }
865
866 static int ltdb_end_trans(struct ldb_module *module)
867 {
868         void *data = ldb_module_get_private(module);
869         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
870
871         if (!ltdb->prepared_commit) {
872                 int ret = ltdb_prepare_commit(module);
873                 if (ret != LDB_SUCCESS) {
874                         return ret;
875                 }
876         }
877
878         ltdb->in_transaction--;
879         ltdb->prepared_commit = false;
880
881         if (tdb_transaction_commit(ltdb->tdb) != 0) {
882                 return ltdb_err_map(tdb_error(ltdb->tdb));
883         }
884
885         return LDB_SUCCESS;
886 }
887
888 static int ltdb_del_trans(struct ldb_module *module)
889 {
890         void *data = ldb_module_get_private(module);
891         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
892
893         ltdb->in_transaction--;
894
895         if (ltdb_index_transaction_cancel(module) != 0) {
896                 tdb_transaction_cancel(ltdb->tdb);
897                 return ltdb_err_map(tdb_error(ltdb->tdb));
898         }
899
900         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
901                 return ltdb_err_map(tdb_error(ltdb->tdb));
902         }
903
904         return LDB_SUCCESS;
905 }
906
907 /*
908   return sequenceNumber from @BASEINFO
909 */
910 static int ltdb_sequence_number(struct ltdb_context *ctx,
911                                 struct ldb_extended **ext)
912 {
913         struct ldb_context *ldb;
914         struct ldb_module *module = ctx->module;
915         struct ldb_request *req = ctx->req;
916         TALLOC_CTX *tmp_ctx;
917         struct ldb_seqnum_request *seq;
918         struct ldb_seqnum_result *res;
919         struct ldb_message *msg = NULL;
920         struct ldb_dn *dn;
921         const char *date;
922         int ret;
923
924         ldb = ldb_module_get_ctx(module);
925
926         seq = talloc_get_type(req->op.extended.data,
927                                 struct ldb_seqnum_request);
928         if (seq == NULL) {
929                 return LDB_ERR_OPERATIONS_ERROR;
930         }
931
932         ldb_request_set_state(req, LDB_ASYNC_PENDING);
933
934         if (ltdb_lock_read(module) != 0) {
935                 return LDB_ERR_OPERATIONS_ERROR;
936         }
937
938         res = talloc_zero(req, struct ldb_seqnum_result);
939         if (res == NULL) {
940                 ret = LDB_ERR_OPERATIONS_ERROR;
941                 goto done;
942         }
943         tmp_ctx = talloc_new(req);
944         if (tmp_ctx == NULL) {
945                 ret = LDB_ERR_OPERATIONS_ERROR;
946                 goto done;
947         }
948
949         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
950
951         msg = talloc(tmp_ctx, struct ldb_message);
952         if (msg == NULL) {
953                 ret = LDB_ERR_OPERATIONS_ERROR;
954                 goto done;
955         }
956
957         ret = ltdb_search_dn1(module, dn, msg);
958         if (ret != LDB_SUCCESS) {
959                 goto done;
960         }
961
962         switch (seq->type) {
963         case LDB_SEQ_HIGHEST_SEQ:
964                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
965                 break;
966         case LDB_SEQ_NEXT:
967                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
968                 res->seq_num++;
969                 break;
970         case LDB_SEQ_HIGHEST_TIMESTAMP:
971                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
972                 if (date) {
973                         res->seq_num = ldb_string_to_time(date);
974                 } else {
975                         res->seq_num = 0;
976                         /* zero is as good as anything when we don't know */
977                 }
978                 break;
979         }
980
981         *ext = talloc_zero(req, struct ldb_extended);
982         if (*ext == NULL) {
983                 ret = LDB_ERR_OPERATIONS_ERROR;
984                 goto done;
985         }
986         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
987         (*ext)->data = talloc_steal(*ext, res);
988
989         ret = LDB_SUCCESS;
990
991 done:
992         talloc_free(tmp_ctx);
993         ltdb_unlock_read(module);
994         return ret;
995 }
996
997 static void ltdb_request_done(struct ltdb_context *ctx, int error)
998 {
999         struct ldb_context *ldb;
1000         struct ldb_request *req;
1001         struct ldb_reply *ares;
1002
1003         ldb = ldb_module_get_ctx(ctx->module);
1004         req = ctx->req;
1005
1006         /* if we already returned an error just return */
1007         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1008                 return;
1009         }
1010
1011         ares = talloc_zero(req, struct ldb_reply);
1012         if (!ares) {
1013                 ldb_oom(ldb);
1014                 req->callback(req, NULL);
1015                 return;
1016         }
1017         ares->type = LDB_REPLY_DONE;
1018         ares->error = error;
1019
1020         req->callback(req, ares);
1021 }
1022
1023 static void ltdb_timeout(struct tevent_context *ev,
1024                           struct tevent_timer *te,
1025                           struct timeval t,
1026                           void *private_data)
1027 {
1028         struct ltdb_context *ctx;
1029         ctx = talloc_get_type(private_data, struct ltdb_context);
1030
1031         if (!ctx->request_terminated) {
1032                 /* request is done now */
1033                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1034         }
1035
1036         if (!ctx->request_terminated) {
1037                 /* neutralize the spy */
1038                 ctx->spy->ctx = NULL;
1039         }
1040         talloc_free(ctx);
1041 }
1042
1043 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1044                                         struct ldb_extended *ext,
1045                                         int error)
1046 {
1047         struct ldb_context *ldb;
1048         struct ldb_request *req;
1049         struct ldb_reply *ares;
1050
1051         ldb = ldb_module_get_ctx(ctx->module);
1052         req = ctx->req;
1053
1054         /* if we already returned an error just return */
1055         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1056                 return;
1057         }
1058
1059         ares = talloc_zero(req, struct ldb_reply);
1060         if (!ares) {
1061                 ldb_oom(ldb);
1062                 req->callback(req, NULL);
1063                 return;
1064         }
1065         ares->type = LDB_REPLY_DONE;
1066         ares->response = ext;
1067         ares->error = error;
1068
1069         req->callback(req, ares);
1070 }
1071
1072 static void ltdb_handle_extended(struct ltdb_context *ctx)
1073 {
1074         struct ldb_extended *ext = NULL;
1075         int ret;
1076
1077         if (strcmp(ctx->req->op.extended.oid,
1078                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1079                 /* get sequence number */
1080                 ret = ltdb_sequence_number(ctx, &ext);
1081         } else {
1082                 /* not recognized */
1083                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1084         }
1085
1086         ltdb_request_extended_done(ctx, ext, ret);
1087 }
1088
1089 static void ltdb_callback(struct tevent_context *ev,
1090                           struct tevent_timer *te,
1091                           struct timeval t,
1092                           void *private_data)
1093 {
1094         struct ltdb_context *ctx;
1095         int ret;
1096
1097         ctx = talloc_get_type(private_data, struct ltdb_context);
1098
1099         if (ctx->request_terminated) {
1100                 goto done;
1101         }
1102
1103         switch (ctx->req->operation) {
1104         case LDB_SEARCH:
1105                 ret = ltdb_search(ctx);
1106                 break;
1107         case LDB_ADD:
1108                 ret = ltdb_add(ctx);
1109                 break;
1110         case LDB_MODIFY:
1111                 ret = ltdb_modify(ctx);
1112                 break;
1113         case LDB_DELETE:
1114                 ret = ltdb_delete(ctx);
1115                 break;
1116         case LDB_RENAME:
1117                 ret = ltdb_rename(ctx);
1118                 break;
1119         case LDB_EXTENDED:
1120                 ltdb_handle_extended(ctx);
1121                 goto done;
1122         default:
1123                 /* no other op supported */
1124                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1125         }
1126
1127         if (!ctx->request_terminated) {
1128                 /* request is done now */
1129                 ltdb_request_done(ctx, ret);
1130         }
1131
1132 done:
1133         if (!ctx->request_terminated) {
1134                 /* neutralize the spy */
1135                 ctx->spy->ctx = NULL;
1136         }
1137         talloc_free(ctx);
1138 }
1139
1140 static int ltdb_request_destructor(void *ptr)
1141 {
1142         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1143
1144         if (spy->ctx != NULL) {
1145                 spy->ctx->request_terminated = true;
1146         }
1147
1148         return 0;
1149 }
1150
1151 static int ltdb_handle_request(struct ldb_module *module,
1152                                struct ldb_request *req)
1153 {
1154         struct ldb_context *ldb;
1155         struct tevent_context *ev;
1156         struct ltdb_context *ac;
1157         struct tevent_timer *te;
1158         struct timeval tv;
1159
1160         if (check_critical_controls(req->controls)) {
1161                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1162         }
1163
1164         ldb = ldb_module_get_ctx(module);
1165
1166         if (req->starttime == 0 || req->timeout == 0) {
1167                 ldb_set_errstring(ldb, "Invalid timeout settings");
1168                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1169         }
1170
1171         ev = ldb_get_event_context(ldb);
1172
1173         ac = talloc_zero(ldb, struct ltdb_context);
1174         if (ac == NULL) {
1175                 ldb_set_errstring(ldb, "Out of Memory");
1176                 return LDB_ERR_OPERATIONS_ERROR;
1177         }
1178
1179         ac->module = module;
1180         ac->req = req;
1181
1182         tv.tv_sec = 0;
1183         tv.tv_usec = 0;
1184         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1185         if (NULL == te) {
1186                 talloc_free(ac);
1187                 return LDB_ERR_OPERATIONS_ERROR;
1188         }
1189
1190         tv.tv_sec = req->starttime + req->timeout;
1191         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1192         if (NULL == ac->timeout_event) {
1193                 talloc_free(ac);
1194                 return LDB_ERR_OPERATIONS_ERROR;
1195         }
1196
1197         /* set a spy so that we do not try to use the request context
1198          * if it is freed before ltdb_callback fires */
1199         ac->spy = talloc(req, struct ltdb_req_spy);
1200         if (NULL == ac->spy) {
1201                 talloc_free(ac);
1202                 return LDB_ERR_OPERATIONS_ERROR;
1203         }
1204         ac->spy->ctx = ac;
1205
1206         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1207
1208         return LDB_SUCCESS;
1209 }
1210
1211 static const struct ldb_module_ops ltdb_ops = {
1212         .name              = "tdb",
1213         .search            = ltdb_handle_request,
1214         .add               = ltdb_handle_request,
1215         .modify            = ltdb_handle_request,
1216         .del               = ltdb_handle_request,
1217         .rename            = ltdb_handle_request,
1218         .extended          = ltdb_handle_request,
1219         .start_transaction = ltdb_start_trans,
1220         .end_transaction   = ltdb_end_trans,
1221         .prepare_commit    = ltdb_prepare_commit,
1222         .del_transaction   = ltdb_del_trans,
1223 };
1224
1225 /*
1226   connect to the database
1227 */
1228 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1229                         unsigned int flags, const char *options[],
1230                         struct ldb_module **_module)
1231 {
1232         struct ldb_module *module;
1233         const char *path;
1234         int tdb_flags, open_flags;
1235         struct ltdb_private *ltdb;
1236
1237         /* parse the url */
1238         if (strchr(url, ':')) {
1239                 if (strncmp(url, "tdb://", 6) != 0) {
1240                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1241                                   "Invalid tdb URL '%s'", url);
1242                         return -1;
1243                 }
1244                 path = url+6;
1245         } else {
1246                 path = url;
1247         }
1248
1249         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1250
1251         /* check for the 'nosync' option */
1252         if (flags & LDB_FLG_NOSYNC) {
1253                 tdb_flags |= TDB_NOSYNC;
1254         }
1255
1256         /* and nommap option */
1257         if (flags & LDB_FLG_NOMMAP) {
1258                 tdb_flags |= TDB_NOMMAP;
1259         }
1260
1261         if (flags & LDB_FLG_RDONLY) {
1262                 open_flags = O_RDONLY;
1263         } else {
1264                 open_flags = O_CREAT | O_RDWR;
1265         }
1266
1267         ltdb = talloc_zero(ldb, struct ltdb_private);
1268         if (!ltdb) {
1269                 ldb_oom(ldb);
1270                 return -1;
1271         }
1272
1273         /* note that we use quite a large default hash size */
1274         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1275                                    tdb_flags, open_flags,
1276                                    ldb_get_create_perms(ldb), ldb);
1277         if (!ltdb->tdb) {
1278                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1279                           "Unable to open tdb '%s'", path);
1280                 talloc_free(ltdb);
1281                 return -1;
1282         }
1283
1284         ltdb->sequence_number = 0;
1285
1286         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1287         if (!module) {
1288                 talloc_free(ltdb);
1289                 return -1;
1290         }
1291         ldb_module_set_private(module, ltdb);
1292
1293         if (ltdb_cache_load(module) != 0) {
1294                 talloc_free(module);
1295                 talloc_free(ltdb);
1296                 return -1;
1297         }
1298
1299         *_module = module;
1300         return 0;
1301 }
1302
1303 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1304         .name = "tdb",
1305         .connect_fn = ltdb_connect
1306 };