Fix for schemaUpdateNow command
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "lib/ldb/include/ldb.h"
38 #include "lib/ldb/include/ldb_errors.h"
39 #include "lib/ldb/include/ldb_module.h"
40 #include "lib/ldb/include/ldb_private.h"
41 #include "dsdb/samdb/samdb.h"
42
43 struct partition_private_data {
44         struct dsdb_control_current_partition **partitions;
45         struct ldb_dn **replicate;
46 };
47
48 struct part_request {
49         struct ldb_module *module;
50         struct ldb_request *req;
51 };
52
53 struct partition_context {
54         struct ldb_module *module;
55         struct ldb_request *req;
56         bool got_success;
57
58         struct part_request *part_req;
59         int num_requests;
60         int finished_requests;
61 };
62
63 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
64 {
65         struct partition_context *ac;
66
67         ac = talloc_zero(req, struct partition_context);
68         if (ac == NULL) {
69                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
70                 return NULL;
71         }
72
73         ac->module = module;
74         ac->req = req;
75
76         return ac;
77 }
78
79 #define PARTITION_FIND_OP_NOERROR(module, op) do { \
80         while (module && module->ops->op == NULL) module = module->next; \
81 } while (0)
82
83 #define PARTITION_FIND_OP(module, op) do { \
84         PARTITION_FIND_OP_NOERROR(module, op); \
85         if (module == NULL) { \
86                 ldb_asprintf_errstring(ldb_module_get_ctx(module), \
87                         "Unable to find backend operation for " #op ); \
88                 return LDB_ERR_OPERATIONS_ERROR; \
89         } \
90 } while (0)
91
92 /*
93  *    helper functions to call the next module in chain
94  *    */
95
96 static int partition_request(struct ldb_module *module, struct ldb_request *request)
97 {
98         int ret;
99         switch (request->operation) {
100         case LDB_SEARCH:
101                 PARTITION_FIND_OP(module, search);
102                 ret = module->ops->search(module, request);
103                 break;
104         case LDB_ADD:
105                 PARTITION_FIND_OP(module, add);
106                 ret = module->ops->add(module, request);
107                 break;
108         case LDB_MODIFY:
109                 PARTITION_FIND_OP(module, modify);
110                 ret = module->ops->modify(module, request);
111                 break;
112         case LDB_DELETE:
113                 PARTITION_FIND_OP(module, del);
114                 ret = module->ops->del(module, request);
115                 break;
116         case LDB_RENAME:
117                 PARTITION_FIND_OP(module, rename);
118                 ret = module->ops->rename(module, request);
119                 break;
120         case LDB_EXTENDED:
121                 PARTITION_FIND_OP(module, extended);
122                 ret = module->ops->extended(module, request);
123                 break;
124         default:
125                 PARTITION_FIND_OP(module, request);
126                 ret = module->ops->request(module, request);
127                 break;
128         }
129         if (ret == LDB_SUCCESS) {
130                 return ret;
131         }
132         if (!ldb_errstring(ldb_module_get_ctx(module))) {
133                 /* Set a default error string, to place the blame somewhere */
134                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
135                                         "error in module %s: %s (%d)",
136                                         module->ops->name,
137                                         ldb_strerror(ret), ret);
138         }
139         return ret;
140 }
141
142 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
143                                                              struct ldb_dn *dn)
144 {
145         int i;
146
147         /* Look at base DN */
148         /* Figure out which partition it is under */
149         /* Skip the lot if 'data' isn't here yet (initialisation) */
150         for (i=0; data && data->partitions && data->partitions[i]; i++) {
151                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
152                         return data->partitions[i];
153                 }
154         }
155
156         return NULL;
157 }
158
159 /**
160  * fire the caller's callback for every entry, but only send 'done' once.
161  */
162 static int partition_req_callback(struct ldb_request *req,
163                                   struct ldb_reply *ares)
164 {
165         struct partition_context *ac;
166         struct ldb_module *module;
167         struct ldb_request *nreq;
168         int ret;
169
170         ac = talloc_get_type(req->context, struct partition_context);
171
172         if (!ares) {
173                 return ldb_module_done(ac->req, NULL, NULL,
174                                         LDB_ERR_OPERATIONS_ERROR);
175         }
176
177         if (ares->error != LDB_SUCCESS && !ac->got_success) {
178                 return ldb_module_done(ac->req, ares->controls,
179                                         ares->response, ares->error);
180         }
181
182         switch (ares->type) {
183         case LDB_REPLY_REFERRAL:
184                 /* ignore referrals for now */
185                 break;
186
187         case LDB_REPLY_ENTRY:
188                 if (ac->req->operation != LDB_SEARCH) {
189                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
190                                 "partition_req_callback:"
191                                 " Unsupported reply type for this request");
192                         return ldb_module_done(ac->req, NULL, NULL,
193                                                 LDB_ERR_OPERATIONS_ERROR);
194                 }
195                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
196
197         case LDB_REPLY_DONE:
198                 if (ares->error == LDB_SUCCESS) {
199                         ac->got_success = true;
200                 }
201                 if (ac->req->operation == LDB_EXTENDED) {
202                         /* FIXME: check for ares->response, replmd does not fill it ! */
203                         if (ares->response) {
204                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
205                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
206                                                           "partition_req_callback:"
207                                                           " Unknown extended reply, "
208                                                           "only supports START_TLS");
209                                         talloc_free(ares);
210                                         return ldb_module_done(ac->req, NULL, NULL,
211                                                                 LDB_ERR_OPERATIONS_ERROR);
212                                 }
213                         }
214                 }
215
216                 ac->finished_requests++;
217                 if (ac->finished_requests == ac->num_requests) {
218                         /* this was the last one, call callback */
219                         return ldb_module_done(ac->req, ares->controls,
220                                                ares->response, 
221                                                ac->got_success?LDB_SUCCESS:ares->error);
222                 }
223
224                 /* not the last, now call the next one */
225                 module = ac->part_req[ac->finished_requests].module;
226                 nreq = ac->part_req[ac->finished_requests].req;
227
228                 ret = partition_request(module, nreq);
229                 if (ret != LDB_SUCCESS) {
230                         talloc_free(ares);
231                         return ldb_module_done(ac->req, NULL, NULL, ret);
232                 }
233
234                 break;
235         }
236
237         talloc_free(ares);
238         return LDB_SUCCESS;
239 }
240
241 static int partition_prep_request(struct partition_context *ac,
242                                   struct dsdb_control_current_partition *partition)
243 {
244         int ret;
245         struct ldb_request *req;
246
247         ac->part_req = talloc_realloc(ac, ac->part_req,
248                                         struct part_request,
249                                         ac->num_requests + 1);
250         if (ac->part_req == NULL) {
251                 ldb_oom(ldb_module_get_ctx(ac->module));
252                 return LDB_ERR_OPERATIONS_ERROR;
253         }
254
255         switch (ac->req->operation) {
256         case LDB_SEARCH:
257                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
258                                         ac->part_req,
259                                         ac->req->op.search.base,
260                                         ac->req->op.search.scope,
261                                         ac->req->op.search.tree,
262                                         ac->req->op.search.attrs,
263                                         ac->req->controls,
264                                         ac, partition_req_callback,
265                                         ac->req);
266                 break;
267         case LDB_ADD:
268                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
269                                         ac->req->op.add.message,
270                                         ac->req->controls,
271                                         ac, partition_req_callback,
272                                         ac->req);
273                 break;
274         case LDB_MODIFY:
275                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
276                                         ac->req->op.mod.message,
277                                         ac->req->controls,
278                                         ac, partition_req_callback,
279                                         ac->req);
280                 break;
281         case LDB_DELETE:
282                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
283                                         ac->req->op.del.dn,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         case LDB_RENAME:
289                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
290                                         ac->req->op.rename.olddn,
291                                         ac->req->op.rename.newdn,
292                                         ac->req->controls,
293                                         ac, partition_req_callback,
294                                         ac->req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 req->controls = talloc_memdup(req, ac->req->controls,
319                                         talloc_get_size(ac->req->controls));
320                 if (req->controls == NULL) {
321                         ldb_oom(ldb_module_get_ctx(ac->module));
322                         return LDB_ERR_OPERATIONS_ERROR;
323                 }
324         }
325
326         if (partition) {
327                 ac->part_req[ac->num_requests].module = partition->module;
328
329                 ret = ldb_request_add_control(req,
330                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
331                                         false, partition);
332                 if (ret != LDB_SUCCESS) {
333                         return ret;
334                 }
335
336                 if (req->operation == LDB_SEARCH) {
337                         /* If the search is for 'more' than this partition,
338                          * then change the basedn, so a remote LDAP server
339                          * doesn't object */
340                         if (ldb_dn_compare_base(partition->dn,
341                                                 req->op.search.base) != 0) {
342                                 req->op.search.base = partition->dn;
343                         }
344                 }
345
346         } else {
347                 /* make sure you put the NEXT module here, or
348                  * partition_request() will simply loop forever on itself */
349                 ac->part_req[ac->num_requests].module = ac->module->next;
350         }
351
352         ac->num_requests++;
353
354         return LDB_SUCCESS;
355 }
356
357 static int partition_call_first(struct partition_context *ac)
358 {
359         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
360 }
361
362 /**
363  * Send a request down to all the partitions
364  */
365 static int partition_send_all(struct ldb_module *module, 
366                               struct partition_context *ac, 
367                               struct ldb_request *req) 
368 {
369         int i;
370         struct partition_private_data *data = talloc_get_type(module->private_data, 
371                                                               struct partition_private_data);
372         int ret = partition_prep_request(ac, NULL);
373         if (ret != LDB_SUCCESS) {
374                 return ret;
375         }
376         for (i=0; data && data->partitions && data->partitions[i]; i++) {
377                 ret = partition_prep_request(ac, data->partitions[i]);
378                 if (ret != LDB_SUCCESS) {
379                         return ret;
380                 }
381         }
382
383         /* fire the first one */
384         return partition_call_first(ac);
385 }
386
387 /**
388  * Figure out which backend a request needs to be aimed at.  Some
389  * requests must be replicated to all backends
390  */
391 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
392 {
393         struct partition_context *ac;
394         unsigned i;
395         int ret;
396         struct dsdb_control_current_partition *partition;
397         struct partition_private_data *data = talloc_get_type(module->private_data, 
398                                                               struct partition_private_data);
399         if (!data || !data->partitions) {
400                 return ldb_next_request(module, req);
401         }
402         
403         if (req->operation != LDB_SEARCH) {
404                 /* Is this a special DN, we need to replicate to every backend? */
405                 for (i=0; data->replicate && data->replicate[i]; i++) {
406                         if (ldb_dn_compare(data->replicate[i], 
407                                            dn) == 0) {
408                                 
409                                 ac = partition_init_ctx(module, req);
410                                 if (!ac) {
411                                         return LDB_ERR_OPERATIONS_ERROR;
412                                 }
413                                 
414                                 return partition_send_all(module, ac, req);
415                         }
416                 }
417         }
418
419         /* Otherwise, we need to find the partition to fire it to */
420
421         /* Find partition */
422         partition = find_partition(data, dn);
423         if (!partition) {
424                 /*
425                  * if we haven't found a matching partition
426                  * pass the request to the main ldb
427                  *
428                  * TODO: we should maybe return an error here
429                  *       if it's not a special dn
430                  */
431
432                 return ldb_next_request(module, req);
433         }
434
435         ac = partition_init_ctx(module, req);
436         if (!ac) {
437                 return LDB_ERR_OPERATIONS_ERROR;
438         }
439
440         /* we need to add a control but we never touch the original request */
441         ret = partition_prep_request(ac, partition);
442         if (ret != LDB_SUCCESS) {
443                 return ret;
444         }
445
446         /* fire the first one */
447         return partition_call_first(ac);
448 }
449
450 /* search */
451 static int partition_search(struct ldb_module *module, struct ldb_request *req)
452 {
453         struct ldb_control **saved_controls;
454         
455         /* Find backend */
456         struct partition_private_data *data = talloc_get_type(module->private_data, 
457                                                               struct partition_private_data);
458         /* issue request */
459
460         /* (later) consider if we should be searching multiple
461          * partitions (for 'invisible' partition behaviour */
462
463         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
464         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
465         
466         struct ldb_search_options_control *search_options = NULL;
467         if (search_control) {
468                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
469         }
470
471         /* Remove the domain_scope control, so we don't confuse a backend server */
472         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
473                 ldb_oom(ldb_module_get_ctx(module));
474                 return LDB_ERR_OPERATIONS_ERROR;
475         }
476
477         /*
478          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
479          * down as uncritical to make windows 2008 dcpromo happy.
480          */
481         if (search_control) {
482                 search_control->critical = 0;
483         }
484
485         /* TODO:
486            Generate referrals (look for a partition under this DN) if we don't have the above control specified
487         */
488         
489         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
490                 int ret, i;
491                 struct partition_context *ac;
492                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
493                         /* We have processed this flag, so we are done with this control now */
494
495                         /* Remove search control, so we don't confuse a backend server */
496                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
497                                 ldb_oom(ldb_module_get_ctx(module));
498                                 return LDB_ERR_OPERATIONS_ERROR;
499                         }
500                 }
501                 ac = partition_init_ctx(module, req);
502                 if (!ac) {
503                         return LDB_ERR_OPERATIONS_ERROR;
504                 }
505
506                 /* Search from the base DN */
507                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
508                         return partition_send_all(module, ac, req);
509                 }
510                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
511                         bool match = false, stop = false;
512                         /* Find all partitions under the search base 
513                            
514                            we match if:
515
516                               1) the DN we are looking for exactly matches the partition
517                              or
518                               2) the DN we are looking for is a parent of the partition and it isn't
519                                  a scope base search
520                              or
521                               3) the DN we are looking for is a child of the partition
522                          */
523                         if (ldb_dn_compare(data->partitions[i]->dn, req->op.search.base) == 0) {
524                                 match = true;
525                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
526                                         stop = true;
527                                 }
528                         }
529                         if (!match && 
530                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0 &&
531                              req->op.search.scope != LDB_SCOPE_BASE)) {
532                                 match = true;
533                         }
534                         if (!match &&
535                             ldb_dn_compare_base(data->partitions[i]->dn, req->op.search.base) == 0) {
536                                 match = true;
537                                 stop = true; /* note that this relies on partition ordering */
538                         }
539                         if (match) {
540                                 ret = partition_prep_request(ac, data->partitions[i]);
541                                 if (ret != LDB_SUCCESS) {
542                                         return ret;
543                                 }
544                         }
545                         if (stop) break;
546                 }
547
548                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
549                 if (ac->num_requests == 0) {
550                         talloc_free(ac);
551                         return ldb_next_request(module, req);
552                 }
553
554                 /* fire the first one */
555                 return partition_call_first(ac);
556
557         } else {
558                 /* Handle this like all other requests */
559                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
560                         /* We have processed this flag, so we are done with this control now */
561
562                         /* Remove search control, so we don't confuse a backend server */
563                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
564                                 ldb_oom(ldb_module_get_ctx(module));
565                                 return LDB_ERR_OPERATIONS_ERROR;
566                         }
567                 }
568
569                 return partition_replicate(module, req, req->op.search.base);
570         }
571 }
572
573 /* add */
574 static int partition_add(struct ldb_module *module, struct ldb_request *req)
575 {
576         return partition_replicate(module, req, req->op.add.message->dn);
577 }
578
579 /* modify */
580 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
581 {
582         return partition_replicate(module, req, req->op.mod.message->dn);
583 }
584
585 /* delete */
586 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
587 {
588         return partition_replicate(module, req, req->op.del.dn);
589 }
590
591 /* rename */
592 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
593 {
594         /* Find backend */
595         struct dsdb_control_current_partition *backend, *backend2;
596         
597         struct partition_private_data *data = talloc_get_type(module->private_data, 
598                                                               struct partition_private_data);
599
600         /* Skip the lot if 'data' isn't here yet (initialisation) */
601         if (!data) {
602                 return LDB_ERR_OPERATIONS_ERROR;
603         }
604
605         backend = find_partition(data, req->op.rename.olddn);
606         backend2 = find_partition(data, req->op.rename.newdn);
607
608         if ((backend && !backend2) || (!backend && backend2)) {
609                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
610         }
611
612         if (backend != backend2) {
613                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
614                                        "Cannot rename from %s in %s to %s in %s: %s",
615                                        ldb_dn_get_linearized(req->op.rename.olddn),
616                                        ldb_dn_get_linearized(backend->dn),
617                                        ldb_dn_get_linearized(req->op.rename.newdn),
618                                        ldb_dn_get_linearized(backend2->dn),
619                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
620                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
621         }
622
623         return partition_replicate(module, req, req->op.rename.olddn);
624 }
625
626 /* start a transaction */
627 static int partition_start_trans(struct ldb_module *module)
628 {
629         int i, ret;
630         struct partition_private_data *data = talloc_get_type(module->private_data, 
631                                                               struct partition_private_data);
632         /* Look at base DN */
633         /* Figure out which partition it is under */
634         /* Skip the lot if 'data' isn't here yet (initialization) */
635         ret = ldb_next_start_trans(module);
636         if (ret != LDB_SUCCESS) {
637                 return ret;
638         }
639
640         for (i=0; data && data->partitions && data->partitions[i]; i++) {
641                 struct ldb_module *next = data->partitions[i]->module;
642                 PARTITION_FIND_OP(next, start_transaction);
643
644                 ret = next->ops->start_transaction(next);
645                 if (ret != LDB_SUCCESS) {
646                         /* Back it out, if it fails on one */
647                         for (i--; i >= 0; i--) {
648                                 next = data->partitions[i]->module;
649                                 PARTITION_FIND_OP(next, del_transaction);
650
651                                 next->ops->del_transaction(next);
652                         }
653                         ldb_next_del_trans(module);
654                         return ret;
655                 }
656         }
657         return LDB_SUCCESS;
658 }
659
660 /* end a transaction */
661 static int partition_end_trans(struct ldb_module *module)
662 {
663         int i, ret, final_ret;
664         struct partition_private_data *data = talloc_get_type(module->private_data, 
665                                                               struct partition_private_data);
666         ret = ldb_next_end_trans(module);
667         if (ret != LDB_SUCCESS) {
668                 return ret;
669         }
670
671         /* if the backend has a prepare_commit op then use that, to ensure
672            that all partitions are committed safely together */
673         for (i=0; data && data->partitions && data->partitions[i]; i++) {
674                 struct ldb_module *next_end = data->partitions[i]->module;
675                 struct ldb_module *next_prepare = data->partitions[i]->module;
676                 struct ldb_module *next_del = data->partitions[i]->module;
677
678                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
679                 if (next_prepare == NULL) {
680                         continue;
681                 }
682
683                 PARTITION_FIND_OP(next_end, end_transaction);
684                 PARTITION_FIND_OP(next_del, del_transaction);
685
686                 if (next_end != next_prepare || next_del != next_end) {
687                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "ERROR: Mismatch between prepare and commit ops in ldb module");
688                         return LDB_ERR_OPERATIONS_ERROR;
689                 }
690                 
691                 ret = next_prepare->ops->prepare_commit(next_prepare);
692                 if (ret != LDB_SUCCESS) {
693                         /* if one fails, cancel all but this one */
694                         int j;
695                         for (j=0; data->partitions[j]; j++) {
696                                 if (j == i) continue;
697                                 next_del = data->partitions[j]->module;
698                                 PARTITION_FIND_OP(next_del, del_transaction);
699                                 next_del->ops->del_transaction(next_del);
700                         }
701                         ldb_next_del_trans(module);
702                         return ret;
703                 }
704         }
705
706         /* Look at base DN */
707         /* Figure out which partition it is under */
708         /* Skip the lot if 'data' isn't here yet (initialisation) */
709         final_ret = LDB_SUCCESS;
710
711         for (i=0; data && data->partitions && data->partitions[i]; i++) {
712                 struct ldb_module *next = data->partitions[i]->module;
713                 PARTITION_FIND_OP(next, end_transaction);
714
715                 ret = next->ops->end_transaction(next);
716                 if (ret != LDB_SUCCESS) {
717                         /* this should only be happening if we had a serious 
718                            OS or hardware error */
719                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "ERROR: partition commit error");
720                         final_ret = ret;
721                 }
722         }
723
724         return final_ret;
725 }
726
727 /* delete a transaction */
728 static int partition_del_trans(struct ldb_module *module)
729 {
730         int i, ret, ret2 = LDB_SUCCESS;
731         struct partition_private_data *data = talloc_get_type(module->private_data, 
732                                                               struct partition_private_data);
733         ret = ldb_next_del_trans(module);
734         if (ret != LDB_SUCCESS) {
735                 ret2 = ret;
736         }
737
738         /* Look at base DN */
739         /* Figure out which partition it is under */
740         /* Skip the lot if 'data' isn't here yet (initialistion) */
741         for (i=0; data && data->partitions && data->partitions[i]; i++) {
742                 struct ldb_module *next = data->partitions[i]->module;
743                 PARTITION_FIND_OP(next, del_transaction);
744
745                 ret = next->ops->del_transaction(next);
746                 if (ret != LDB_SUCCESS) {
747                         ret2 = ret;
748                 }
749         }
750         return ret2;
751 }
752
753
754 /* FIXME: This function is still semi-async */
755 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
756 {
757         int i, ret;
758         uint64_t seq_number = 0;
759         uint64_t timestamp_sequence = 0;
760         uint64_t timestamp = 0;
761         struct partition_private_data *data = talloc_get_type(module->private_data, 
762                                                               struct partition_private_data);
763         struct ldb_seqnum_request *seq;
764         struct ldb_seqnum_result *seqr;
765         struct ldb_request *treq;
766         struct ldb_seqnum_request *tseq;
767         struct ldb_seqnum_result *tseqr;
768         struct ldb_extended *ext;
769         struct ldb_result *res;
770
771         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
772
773         switch (seq->type) {
774         case LDB_SEQ_NEXT:
775         case LDB_SEQ_HIGHEST_SEQ:
776                 res = talloc_zero(req, struct ldb_result);
777                 if (res == NULL) {
778                         return LDB_ERR_OPERATIONS_ERROR;
779                 }
780                 tseq = talloc_zero(res, struct ldb_seqnum_request);
781                 if (tseq == NULL) {
782                         talloc_free(res);
783                         return LDB_ERR_OPERATIONS_ERROR;
784                 }
785                 tseq->type = seq->type;
786
787                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
788                                              LDB_EXTENDED_SEQUENCE_NUMBER,
789                                              tseq,
790                                              NULL,
791                                              res,
792                                              ldb_extended_default_callback,
793                                              NULL);
794                 ret = ldb_next_request(module, treq);
795                 if (ret == LDB_SUCCESS) {
796                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
797                 }
798                 if (ret != LDB_SUCCESS) {
799                         talloc_free(res);
800                         return ret;
801                 }
802                 seqr = talloc_get_type(res->extended->data,
803                                         struct ldb_seqnum_result);
804                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
805                         timestamp_sequence = seqr->seq_num;
806                 } else {
807                         seq_number += seqr->seq_num;
808                 }
809                 talloc_free(res);
810
811                 /* Skip the lot if 'data' isn't here yet (initialisation) */
812                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
813
814                         res = talloc_zero(req, struct ldb_result);
815                         if (res == NULL) {
816                                 return LDB_ERR_OPERATIONS_ERROR;
817                         }
818                         tseq = talloc_zero(res, struct ldb_seqnum_request);
819                         if (tseq == NULL) {
820                                 talloc_free(res);
821                                 return LDB_ERR_OPERATIONS_ERROR;
822                         }
823                         tseq->type = seq->type;
824
825                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
826                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
827                                                      tseq,
828                                                      NULL,
829                                                      res,
830                                                      ldb_extended_default_callback,
831                                                      NULL);
832                         if (ret != LDB_SUCCESS) {
833                                 talloc_free(res);
834                                 return ret;
835                         }
836
837                         ret = ldb_request_add_control(treq,
838                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
839                                                       false, data->partitions[i]);
840                         if (ret != LDB_SUCCESS) {
841                                 talloc_free(res);
842                                 return ret;
843                         }
844
845                         ret = partition_request(data->partitions[i]->module, treq);
846                         if (ret != LDB_SUCCESS) {
847                                 talloc_free(res);
848                                 return ret;
849                         }
850                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
851                         if (ret != LDB_SUCCESS) {
852                                 talloc_free(res);
853                                 return ret;
854                         }
855                         tseqr = talloc_get_type(res->extended->data,
856                                                 struct ldb_seqnum_result);
857                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
858                                 timestamp_sequence = MAX(timestamp_sequence,
859                                                          tseqr->seq_num);
860                         } else {
861                                 seq_number += tseqr->seq_num;
862                         }
863                         talloc_free(res);
864                 }
865                 /* fall through */
866         case LDB_SEQ_HIGHEST_TIMESTAMP:
867
868                 res = talloc_zero(req, struct ldb_result);
869                 if (res == NULL) {
870                         return LDB_ERR_OPERATIONS_ERROR;
871                 }
872
873                 tseq = talloc_zero(res, struct ldb_seqnum_request);
874                 if (tseq == NULL) {
875                         talloc_free(res);
876                         return LDB_ERR_OPERATIONS_ERROR;
877                 }
878                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
879
880                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
881                                              LDB_EXTENDED_SEQUENCE_NUMBER,
882                                              tseq,
883                                              NULL,
884                                              res,
885                                              ldb_extended_default_callback,
886                                              NULL);
887                 if (ret != LDB_SUCCESS) {
888                         talloc_free(res);
889                         return ret;
890                 }
891
892                 ret = ldb_next_request(module, treq);
893                 if (ret != LDB_SUCCESS) {
894                         talloc_free(res);
895                         return ret;
896                 }
897                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
898                 if (ret != LDB_SUCCESS) {
899                         talloc_free(res);
900                         return ret;
901                 }
902
903                 tseqr = talloc_get_type(res->extended->data,
904                                            struct ldb_seqnum_result);
905                 timestamp = tseqr->seq_num;
906
907                 talloc_free(res);
908
909                 /* Skip the lot if 'data' isn't here yet (initialisation) */
910                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
911
912                         res = talloc_zero(req, struct ldb_result);
913                         if (res == NULL) {
914                                 return LDB_ERR_OPERATIONS_ERROR;
915                         }
916
917                         tseq = talloc_zero(res, struct ldb_seqnum_request);
918                         if (tseq == NULL) {
919                                 talloc_free(res);
920                                 return LDB_ERR_OPERATIONS_ERROR;
921                         }
922                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
923
924                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
925                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
926                                                      tseq,
927                                                      NULL,
928                                                      res,
929                                                      ldb_extended_default_callback,
930                                                      NULL);
931                         if (ret != LDB_SUCCESS) {
932                                 talloc_free(res);
933                                 return ret;
934                         }
935
936                         ret = ldb_request_add_control(treq,
937                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
938                                                       false, data->partitions[i]);
939                         if (ret != LDB_SUCCESS) {
940                                 talloc_free(res);
941                                 return ret;
942                         }
943
944                         ret = partition_request(data->partitions[i]->module, treq);
945                         if (ret != LDB_SUCCESS) {
946                                 talloc_free(res);
947                                 return ret;
948                         }
949                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
950                         if (ret != LDB_SUCCESS) {
951                                 talloc_free(res);
952                                 return ret;
953                         }
954
955                         tseqr = talloc_get_type(res->extended->data,
956                                                   struct ldb_seqnum_result);
957                         timestamp = MAX(timestamp, tseqr->seq_num);
958
959                         talloc_free(res);
960                 }
961
962                 break;
963         }
964
965         ext = talloc_zero(req, struct ldb_extended);
966         if (!ext) {
967                 return LDB_ERR_OPERATIONS_ERROR;
968         }
969         seqr = talloc_zero(ext, struct ldb_seqnum_result);
970         if (seqr == NULL) {
971                 talloc_free(ext);
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
975         ext->data = seqr;
976
977         switch (seq->type) {
978         case LDB_SEQ_NEXT:
979         case LDB_SEQ_HIGHEST_SEQ:
980
981                 /* Has someone above set a timebase sequence? */
982                 if (timestamp_sequence) {
983                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
984                 } else {
985                         seqr->seq_num = seq_number;
986                 }
987
988                 if (timestamp_sequence > seqr->seq_num) {
989                         seqr->seq_num = timestamp_sequence;
990                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
991                 }
992
993                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
994                 break;
995         case LDB_SEQ_HIGHEST_TIMESTAMP:
996                 seqr->seq_num = timestamp;
997                 break;
998         }
999
1000         if (seq->type == LDB_SEQ_NEXT) {
1001                 seqr->seq_num++;
1002         }
1003
1004         /* send request done */
1005         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1006 }
1007
1008 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1009 {
1010         struct dsdb_extended_replicated_objects *ext;
1011
1012         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1013         if (!ext) {
1014                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
1015                 return LDB_ERR_PROTOCOL_ERROR;
1016         }
1017
1018         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1019                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1020                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1021                 return LDB_ERR_PROTOCOL_ERROR;
1022         }
1023
1024         return partition_replicate(module, req, ext->partition_dn);
1025 }
1026
1027 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1028 {
1029         struct dsdb_control_current_partition *partition;
1030         struct partition_private_data *data;
1031         struct ldb_dn *schema_dn;
1032         struct partition_context *ac;
1033         int ret;
1034
1035         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1036         if (!schema_dn) {
1037                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1038                 return LDB_ERR_PROTOCOL_ERROR;
1039         }
1040
1041         data = talloc_get_type(module->private_data, struct partition_private_data);
1042         if (!data) {
1043                 return LDB_ERR_OPERATIONS_ERROR;
1044         }
1045         
1046         partition = find_partition( data, schema_dn );
1047         if (!partition) {
1048                 return ldb_next_request(module, req);
1049         }
1050
1051         ac = partition_init_ctx(module, req);
1052         if (!ac) {
1053                 return LDB_ERR_OPERATIONS_ERROR;
1054         }
1055
1056         /* we need to add a control but we never touch the original request */
1057         ret = partition_prep_request(ac, partition);
1058         if (ret != LDB_SUCCESS) {
1059                 return ret;
1060         }
1061
1062         /* fire the first one */
1063         ret =  partition_call_first(ac);
1064
1065         if (ret != LDB_SUCCESS){
1066                 return ret;
1067         }
1068
1069         return ldb_request_done(req, ret);
1070 }
1071
1072
1073 /* extended */
1074 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1075 {
1076         struct partition_private_data *data;
1077         struct partition_context *ac;
1078
1079         data = talloc_get_type(module->private_data, struct partition_private_data);
1080         if (!data || !data->partitions) {
1081                 return ldb_next_request(module, req);
1082         }
1083
1084         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1085                 return partition_sequence_number(module, req);
1086         }
1087
1088         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1089                 return partition_extended_replicated_objects(module, req);
1090         }
1091
1092         /* forward schemaUpdateNow operation to schema_fsmo module*/
1093         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1094                 return partition_extended_schema_update_now( module, req );
1095         }       
1096
1097         /* 
1098          * as the extended operation has no dn
1099          * we need to send it to all partitions
1100          */
1101
1102         ac = partition_init_ctx(module, req);
1103         if (!ac) {
1104                 return LDB_ERR_OPERATIONS_ERROR;
1105         }
1106
1107         return partition_send_all(module, ac, req);
1108 }
1109
1110 static int partition_sort_compare(const void *v1, const void *v2)
1111 {
1112         const struct dsdb_control_current_partition *p1;
1113         const struct dsdb_control_current_partition *p2;
1114
1115         p1 = *((struct dsdb_control_current_partition * const*)v1);
1116         p2 = *((struct dsdb_control_current_partition * const*)v2);
1117
1118         return ldb_dn_compare(p1->dn, p2->dn);
1119 }
1120
1121 static int partition_init(struct ldb_module *module)
1122 {
1123         int ret, i;
1124         TALLOC_CTX *mem_ctx = talloc_new(module);
1125         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1126         struct ldb_result *res;
1127         struct ldb_message *msg;
1128         struct ldb_message_element *partition_attributes;
1129         struct ldb_message_element *replicate_attributes;
1130         struct ldb_message_element *modules_attributes;
1131
1132         struct partition_private_data *data;
1133
1134         if (!mem_ctx) {
1135                 return LDB_ERR_OPERATIONS_ERROR;
1136         }
1137
1138         data = talloc(mem_ctx, struct partition_private_data);
1139         if (data == NULL) {
1140                 return LDB_ERR_OPERATIONS_ERROR;
1141         }
1142
1143         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1144                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1145                          LDB_SCOPE_BASE, attrs, NULL);
1146         if (ret != LDB_SUCCESS) {
1147                 talloc_free(mem_ctx);
1148                 return ret;
1149         }
1150         if (res->count == 0) {
1151                 talloc_free(mem_ctx);
1152                 return ldb_next_init(module);
1153         }
1154
1155         if (res->count > 1) {
1156                 talloc_free(mem_ctx);
1157                 return LDB_ERR_CONSTRAINT_VIOLATION;
1158         }
1159
1160         msg = res->msgs[0];
1161
1162         partition_attributes = ldb_msg_find_element(msg, "partition");
1163         if (!partition_attributes) {
1164                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1165                 talloc_free(mem_ctx);
1166                 return LDB_ERR_CONSTRAINT_VIOLATION;
1167         }
1168         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
1169         if (!data->partitions) {
1170                 talloc_free(mem_ctx);
1171                 return LDB_ERR_OPERATIONS_ERROR;
1172         }
1173         for (i=0; i < partition_attributes->num_values; i++) {
1174                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1175                 char *p = strchr(base, ':');
1176                 if (!p) {
1177                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1178                                                 "partition_init: "
1179                                                 "invalid form for partition record (missing ':'): %s", base);
1180                         talloc_free(mem_ctx);
1181                         return LDB_ERR_CONSTRAINT_VIOLATION;
1182                 }
1183                 p[0] = '\0';
1184                 p++;
1185                 if (!p[0]) {
1186                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1187                                                 "partition_init: "
1188                                                 "invalid form for partition record (missing backend database): %s", base);
1189                         talloc_free(mem_ctx);
1190                         return LDB_ERR_CONSTRAINT_VIOLATION;
1191                 }
1192                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
1193                 if (!data->partitions[i]) {
1194                         talloc_free(mem_ctx);
1195                         return LDB_ERR_OPERATIONS_ERROR;
1196                 }
1197                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1198
1199                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1200                 if (!data->partitions[i]->dn) {
1201                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1202                                                 "partition_init: invalid DN in partition record: %s", base);
1203                         talloc_free(mem_ctx);
1204                         return LDB_ERR_CONSTRAINT_VIOLATION;
1205                 }
1206
1207                 data->partitions[i]->backend = samdb_relative_path(ldb_module_get_ctx(module), 
1208                                                                    data->partitions[i], 
1209                                                                    p);
1210                 if (!data->partitions[i]->backend) {
1211                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1212                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1213                         talloc_free(mem_ctx);                   
1214                 }
1215                 ret = ldb_connect_backend(ldb_module_get_ctx(module), data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1216                 if (ret != LDB_SUCCESS) {
1217                         talloc_free(mem_ctx);
1218                         return ret;
1219                 }
1220         }
1221         data->partitions[i] = NULL;
1222
1223         /* sort these into order, most to least specific */
1224         qsort(data->partitions, partition_attributes->num_values,
1225               sizeof(*data->partitions), partition_sort_compare);
1226
1227         for (i=0; data->partitions[i]; i++) {
1228                 struct ldb_request *req;
1229                 req = talloc_zero(mem_ctx, struct ldb_request);
1230                 if (req == NULL) {
1231                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1232                         talloc_free(mem_ctx);
1233                         return LDB_ERR_OPERATIONS_ERROR;
1234                 }
1235                 
1236                 req->operation = LDB_REQ_REGISTER_PARTITION;
1237                 req->op.reg_partition.dn = data->partitions[i]->dn;
1238                 req->callback = ldb_op_default_callback;
1239
1240                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1241
1242                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1243                 if (req->handle == NULL) {
1244                         return LDB_ERR_OPERATIONS_ERROR;
1245                 }
1246                 
1247                 ret = ldb_request(ldb_module_get_ctx(module), req);
1248                 if (ret == LDB_SUCCESS) {
1249                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1250                 }
1251                 if (ret != LDB_SUCCESS) {
1252                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1253                         talloc_free(mem_ctx);
1254                         return LDB_ERR_OTHER;
1255                 }
1256                 talloc_free(req);
1257         }
1258
1259         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1260         if (!replicate_attributes) {
1261                 data->replicate = NULL;
1262         } else {
1263                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1264                 if (!data->replicate) {
1265                         talloc_free(mem_ctx);
1266                         return LDB_ERR_OPERATIONS_ERROR;
1267                 }
1268
1269                 for (i=0; i < replicate_attributes->num_values; i++) {
1270                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1271                         if (!ldb_dn_validate(data->replicate[i])) {
1272                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1273                                                         "partition_init: "
1274                                                         "invalid DN in partition replicate record: %s", 
1275                                                         replicate_attributes->values[i].data);
1276                                 talloc_free(mem_ctx);
1277                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1278                         }
1279                 }
1280                 data->replicate[i] = NULL;
1281         }
1282
1283         /* Make the private data available to any searches the modules may trigger in initialisation */
1284         module->private_data = data;
1285         talloc_steal(module, data);
1286         
1287         modules_attributes = ldb_msg_find_element(msg, "modules");
1288         if (modules_attributes) {
1289                 for (i=0; i < modules_attributes->num_values; i++) {
1290                         struct ldb_dn *base_dn;
1291                         int partition_idx;
1292                         struct dsdb_control_current_partition *partition = NULL;
1293                         const char **modules = NULL;
1294
1295                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1296                         char *p = strchr(base, ':');
1297                         if (!p) {
1298                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1299                                                         "partition_init: "
1300                                                         "invalid form for partition module record (missing ':'): %s", base);
1301                                 talloc_free(mem_ctx);
1302                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1303                         }
1304                         p[0] = '\0';
1305                         p++;
1306                         if (!p[0]) {
1307                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1308                                                         "partition_init: "
1309                                                         "invalid form for partition module record (missing backend database): %s", base);
1310                                 talloc_free(mem_ctx);
1311                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1312                         }
1313
1314                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1315                                                                p);
1316                         
1317                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1318                         if (!ldb_dn_validate(base_dn)) {
1319                                 talloc_free(mem_ctx);
1320                                 return LDB_ERR_OPERATIONS_ERROR;
1321                         }
1322                         
1323                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1324                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1325                                         partition = data->partitions[partition_idx];
1326                                         break;
1327                                 }
1328                         }
1329                         
1330                         if (!partition) {
1331                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1332                                                         "partition_init: "
1333                                                         "invalid form for partition module record (no such partition): %s", base);
1334                                 talloc_free(mem_ctx);
1335                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1336                         }
1337                         
1338                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1339                         if (ret != LDB_SUCCESS) {
1340                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1341                                                        "partition_init: "
1342                                                        "loading backend for %s failed: %s", 
1343                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1344                                 talloc_free(mem_ctx);
1345                                 return ret;
1346                         }
1347                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1348                         if (ret != LDB_SUCCESS) {
1349                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1350                                                        "partition_init: "
1351                                                        "initialising backend for %s failed: %s", 
1352                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1353                                 talloc_free(mem_ctx);
1354                                 return ret;
1355                         }
1356                 }
1357         }
1358
1359         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1360         if (ret != LDB_SUCCESS) {
1361                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1362                         "partition: Unable to register control with rootdse!\n");
1363                 return LDB_ERR_OPERATIONS_ERROR;
1364         }
1365
1366         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1367         if (ret != LDB_SUCCESS) {
1368                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1369                         "partition: Unable to register control with rootdse!\n");
1370                 return LDB_ERR_OPERATIONS_ERROR;
1371         }
1372
1373         talloc_free(mem_ctx);
1374         return ldb_next_init(module);
1375 }
1376
1377 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1378         .name              = "partition",
1379         .init_context      = partition_init,
1380         .search            = partition_search,
1381         .add               = partition_add,
1382         .modify            = partition_modify,
1383         .del               = partition_delete,
1384         .rename            = partition_rename,
1385         .extended          = partition_extended,
1386         .start_transaction = partition_start_trans,
1387         .end_transaction   = partition_end_trans,
1388         .del_transaction   = partition_del_trans,
1389 };