Merge branch 'master' of /home/tridge/samba/git/combined
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "includes.h"
33 #include "lib/ldb/include/ldb.h"
34 #include "lib/ldb/include/ldb_errors.h"
35 #include "lib/ldb/include/ldb_module.h"
36 #include "lib/ldb/include/ldb_private.h"
37 #include "dsdb/samdb/samdb.h"
38
39 struct dsdb_partition { /* one backend partition handled by this module */
40         struct ldb_module *module; /* module that requests and transaction calls for this partition are handed to */
41         struct dsdb_control_current_partition *ctrl; /* holds the partition base DN (ctrl->dn); attached to forwarded requests as the current-partition control */
42 };
43
44 struct partition_private_data { /* module private data, read from module->private_data */
45         struct dsdb_partition **partitions; /* NULL-terminated array of known partitions; may itself be NULL */
46         struct ldb_dn **replicate; /* NULL-terminated array of DNs whose non-search requests are sent to every backend; may be NULL */
47 };
48
49 struct part_request { /* one prepared sub-request and where to send it */
50         struct ldb_module *module; /* module at which the search for a handler starts */
51         struct ldb_request *req; /* the sub-request built from the caller's request */
52 };
53
54 struct partition_context { /* per-request state shared by all sub-requests of one caller request */
55         struct ldb_module *module; /* the partition module instance handling the request */
56         struct ldb_request *req; /* the caller's original request; replies are reported against it */
57         bool got_success; /* set once any sub-request finished with LDB_SUCCESS */
58
59         struct part_request *part_req; /* array of prepared sub-requests, grown one entry at a time */
60         int num_requests; /* number of entries in part_req */
61         int finished_requests; /* count of completed sub-requests; also the index of the next one to fire */
62 };
63
64 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
65 {
66         struct partition_context *ac;
67
68         ac = talloc_zero(req, struct partition_context);
69         if (ac == NULL) {
70                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
71                 return NULL;
72         }
73
74         ac->module = module;
75         ac->req = req;
76
77         return ac;
78 }
79
/*
 * Advance 'module' (an lvalue) along its ->next chain until it points
 * at a module whose ops table provides 'op', or becomes NULL when no
 * module in the chain implements it.  No error is reported.
 *
 * The 'module' argument is parenthesised everywhere it is used so that
 * lvalue expressions such as '*pmodule' expand correctly.
 */
#define PARTITION_FIND_OP_NOERROR(module, op) do { \
        while ((module) != NULL && (module)->ops->op == NULL) { \
                (module) = (module)->next; \
        } \
} while (0)
83
/*
 * Like PARTITION_FIND_OP_NOERROR, but when no module in the chain
 * implements 'op' this sets an ldb error string and makes the
 * ENCLOSING FUNCTION return LDB_ERR_OPERATIONS_ERROR.
 *
 * The ldb context is captured before walking the chain: once the walk
 * fails 'module' is NULL and can no longer be used to look it up.
 */
#define PARTITION_FIND_OP(module, op) do { \
        struct ldb_context *partition_find_op_ldb = \
                ((module) != NULL) ? ldb_module_get_ctx(module) : NULL; \
        PARTITION_FIND_OP_NOERROR(module, op); \
        if ((module) == NULL) { \
                if (partition_find_op_ldb != NULL) { \
                        ldb_asprintf_errstring(partition_find_op_ldb, \
                                "Unable to find backend operation for " #op ); \
                } \
                return LDB_ERR_OPERATIONS_ERROR; \
        } \
} while (0)
92
93 /*
94  *    helper functions to call the next module in chain
95  *    */
96
97 static int partition_request(struct ldb_module *module, struct ldb_request *request)
98 {
99         int ret;
100         switch (request->operation) {
101         case LDB_SEARCH:
102                 PARTITION_FIND_OP(module, search);
103                 ret = module->ops->search(module, request);
104                 break;
105         case LDB_ADD:
106                 PARTITION_FIND_OP(module, add);
107                 ret = module->ops->add(module, request);
108                 break;
109         case LDB_MODIFY:
110                 PARTITION_FIND_OP(module, modify);
111                 ret = module->ops->modify(module, request);
112                 break;
113         case LDB_DELETE:
114                 PARTITION_FIND_OP(module, del);
115                 ret = module->ops->del(module, request);
116                 break;
117         case LDB_RENAME:
118                 PARTITION_FIND_OP(module, rename);
119                 ret = module->ops->rename(module, request);
120                 break;
121         case LDB_EXTENDED:
122                 PARTITION_FIND_OP(module, extended);
123                 ret = module->ops->extended(module, request);
124                 break;
125         default:
126                 PARTITION_FIND_OP(module, request);
127                 ret = module->ops->request(module, request);
128                 break;
129         }
130         if (ret == LDB_SUCCESS) {
131                 return ret;
132         }
133         if (!ldb_errstring(ldb_module_get_ctx(module))) {
134                 /* Set a default error string, to place the blame somewhere */
135                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
136                                         "error in module %s: %s (%d)",
137                                         module->ops->name,
138                                         ldb_strerror(ret), ret);
139         }
140         return ret;
141 }
142
143 static struct dsdb_partition *find_partition(struct partition_private_data *data,
144                                              struct ldb_dn *dn,
145                                              struct ldb_request *req)
146 {
147         int i;
148         struct ldb_control *partition_ctrl;
149
150         /* see if the request has the partition DN specified in a
151          * control. The repl_meta_data module can specify this to
152          * ensure that replication happens to the right partition
153          */
154         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
155         if (partition_ctrl) {
156                 const struct dsdb_control_current_partition *partition;
157                 partition = talloc_get_type(partition_ctrl->data,
158                                             struct dsdb_control_current_partition);
159                 if (partition != NULL) {
160                         dn = partition->dn;
161                 }
162         }
163
164         /* Look at base DN */
165         /* Figure out which partition it is under */
166         /* Skip the lot if 'data' isn't here yet (initialisation) */
167         for (i=0; data && data->partitions && data->partitions[i]; i++) {
168                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
169                         return data->partitions[i];
170                 }
171         }
172
173         return NULL;
174 }
175
176 /**
177  * fire the caller's callback for every entry, but only send 'done' once.
178  */
179 static int partition_req_callback(struct ldb_request *req,
180                                   struct ldb_reply *ares)
181 {
182         struct partition_context *ac;
183         struct ldb_module *module;
184         struct ldb_request *nreq;
185         int ret;
186
187         ac = talloc_get_type(req->context, struct partition_context);
188
189         if (!ares) {
190                 return ldb_module_done(ac->req, NULL, NULL,
191                                         LDB_ERR_OPERATIONS_ERROR);
192         }
193
194         if (ares->error != LDB_SUCCESS && !ac->got_success) {
195                 return ldb_module_done(ac->req, ares->controls,
196                                         ares->response, ares->error);
197         }
198
199         switch (ares->type) {
200         case LDB_REPLY_REFERRAL:
201                 /* ignore referrals for now */
202                 break;
203
204         case LDB_REPLY_ENTRY:
205                 if (ac->req->operation != LDB_SEARCH) {
206                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
207                                 "partition_req_callback:"
208                                 " Unsupported reply type for this request");
209                         return ldb_module_done(ac->req, NULL, NULL,
210                                                 LDB_ERR_OPERATIONS_ERROR);
211                 }
212                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
213
214         case LDB_REPLY_DONE:
215                 if (ares->error == LDB_SUCCESS) {
216                         ac->got_success = true;
217                 }
218                 if (ac->req->operation == LDB_EXTENDED) {
219                         /* FIXME: check for ares->response, replmd does not fill it ! */
220                         if (ares->response) {
221                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
222                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
223                                                           "partition_req_callback:"
224                                                           " Unknown extended reply, "
225                                                           "only supports START_TLS");
226                                         talloc_free(ares);
227                                         return ldb_module_done(ac->req, NULL, NULL,
228                                                                 LDB_ERR_OPERATIONS_ERROR);
229                                 }
230                         }
231                 }
232
233                 ac->finished_requests++;
234                 if (ac->finished_requests == ac->num_requests) {
235                         /* this was the last one, call callback */
236                         return ldb_module_done(ac->req, ares->controls,
237                                                ares->response, 
238                                                ac->got_success?LDB_SUCCESS:ares->error);
239                 }
240
241                 /* not the last, now call the next one */
242                 module = ac->part_req[ac->finished_requests].module;
243                 nreq = ac->part_req[ac->finished_requests].req;
244
245                 ret = partition_request(module, nreq);
246                 if (ret != LDB_SUCCESS) {
247                         talloc_free(ares);
248                         return ldb_module_done(ac->req, NULL, NULL, ret);
249                 }
250
251                 break;
252         }
253
254         talloc_free(ares);
255         return LDB_SUCCESS;
256 }
257
258 static int partition_prep_request(struct partition_context *ac,
259                                   struct dsdb_partition *partition)
260 {
261         int ret;
262         struct ldb_request *req;
263
264         ac->part_req = talloc_realloc(ac, ac->part_req,
265                                         struct part_request,
266                                         ac->num_requests + 1);
267         if (ac->part_req == NULL) {
268                 ldb_oom(ldb_module_get_ctx(ac->module));
269                 return LDB_ERR_OPERATIONS_ERROR;
270         }
271
272         switch (ac->req->operation) {
273         case LDB_SEARCH:
274                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
275                                         ac->part_req,
276                                         ac->req->op.search.base,
277                                         ac->req->op.search.scope,
278                                         ac->req->op.search.tree,
279                                         ac->req->op.search.attrs,
280                                         ac->req->controls,
281                                         ac, partition_req_callback,
282                                         ac->req);
283                 break;
284         case LDB_ADD:
285                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
286                                         ac->req->op.add.message,
287                                         ac->req->controls,
288                                         ac, partition_req_callback,
289                                         ac->req);
290                 break;
291         case LDB_MODIFY:
292                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
293                                         ac->req->op.mod.message,
294                                         ac->req->controls,
295                                         ac, partition_req_callback,
296                                         ac->req);
297                 break;
298         case LDB_DELETE:
299                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
300                                         ac->req->op.del.dn,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 break;
305         case LDB_RENAME:
306                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
307                                         ac->req->op.rename.olddn,
308                                         ac->req->op.rename.newdn,
309                                         ac->req->controls,
310                                         ac, partition_req_callback,
311                                         ac->req);
312                 break;
313         case LDB_EXTENDED:
314                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
315                                         ac->part_req,
316                                         ac->req->op.extended.oid,
317                                         ac->req->op.extended.data,
318                                         ac->req->controls,
319                                         ac, partition_req_callback,
320                                         ac->req);
321                 break;
322         default:
323                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
324                                   "Unsupported request type!");
325                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
326         }
327
328         if (ret != LDB_SUCCESS) {
329                 return ret;
330         }
331
332         ac->part_req[ac->num_requests].req = req;
333
334         if (ac->req->controls) {
335                 req->controls = talloc_memdup(req, ac->req->controls,
336                                         talloc_get_size(ac->req->controls));
337                 if (req->controls == NULL) {
338                         ldb_oom(ldb_module_get_ctx(ac->module));
339                         return LDB_ERR_OPERATIONS_ERROR;
340                 }
341         }
342
343         if (partition) {
344                 ac->part_req[ac->num_requests].module = partition->module;
345
346                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
347                         ret = ldb_request_add_control(req,
348                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
349                                                       false, partition->ctrl);
350                         if (ret != LDB_SUCCESS) {
351                                 return ret;
352                         }
353                 }
354
355                 if (req->operation == LDB_SEARCH) {
356                         /* If the search is for 'more' than this partition,
357                          * then change the basedn, so a remote LDAP server
358                          * doesn't object */
359                         if (ldb_dn_compare_base(partition->ctrl->dn,
360                                                 req->op.search.base) != 0) {
361                                 req->op.search.base = partition->ctrl->dn;
362                         }
363                 }
364
365         } else {
366                 /* make sure you put the NEXT module here, or
367                  * partition_request() will simply loop forever on itself */
368                 ac->part_req[ac->num_requests].module = ac->module->next;
369         }
370
371         ac->num_requests++;
372
373         return LDB_SUCCESS;
374 }
375
376 static int partition_call_first(struct partition_context *ac)
377 {
378         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
379 }
380
381 /**
382  * Send a request down to all the partitions
383  */
384 static int partition_send_all(struct ldb_module *module, 
385                               struct partition_context *ac, 
386                               struct ldb_request *req) 
387 {
388         int i;
389         struct partition_private_data *data = talloc_get_type(module->private_data, 
390                                                               struct partition_private_data);
391         int ret = partition_prep_request(ac, NULL);
392         if (ret != LDB_SUCCESS) {
393                 return ret;
394         }
395         for (i=0; data && data->partitions && data->partitions[i]; i++) {
396                 ret = partition_prep_request(ac, data->partitions[i]);
397                 if (ret != LDB_SUCCESS) {
398                         return ret;
399                 }
400         }
401
402         /* fire the first one */
403         return partition_call_first(ac);
404 }
405
406 /**
407  * Figure out which backend a request needs to be aimed at.  Some
408  * requests must be replicated to all backends
409  */
410 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
411 {
412         struct partition_context *ac;
413         unsigned i;
414         int ret;
415         struct dsdb_partition *partition;
416         struct partition_private_data *data = talloc_get_type(module->private_data, 
417                                                               struct partition_private_data);
418         if (!data || !data->partitions) {
419                 return ldb_next_request(module, req);
420         }
421         
422         if (req->operation != LDB_SEARCH) {
423                 /* Is this a special DN, we need to replicate to every backend? */
424                 for (i=0; data->replicate && data->replicate[i]; i++) {
425                         if (ldb_dn_compare(data->replicate[i], 
426                                            dn) == 0) {
427                                 
428                                 ac = partition_init_ctx(module, req);
429                                 if (!ac) {
430                                         return LDB_ERR_OPERATIONS_ERROR;
431                                 }
432                                 
433                                 return partition_send_all(module, ac, req);
434                         }
435                 }
436         }
437
438         /* Otherwise, we need to find the partition to fire it to */
439
440         /* Find partition */
441         partition = find_partition(data, dn, req);
442         if (!partition) {
443                 /*
444                  * if we haven't found a matching partition
445                  * pass the request to the main ldb
446                  *
447                  * TODO: we should maybe return an error here
448                  *       if it's not a special dn
449                  */
450
451                 return ldb_next_request(module, req);
452         }
453
454         ac = partition_init_ctx(module, req);
455         if (!ac) {
456                 return LDB_ERR_OPERATIONS_ERROR;
457         }
458
459         /* we need to add a control but we never touch the original request */
460         ret = partition_prep_request(ac, partition);
461         if (ret != LDB_SUCCESS) {
462                 return ret;
463         }
464
465         /* fire the first one */
466         return partition_call_first(ac);
467 }
468
469 /* search */
470 static int partition_search(struct ldb_module *module, struct ldb_request *req)
471 {
472         struct ldb_control **saved_controls;
473         
474         /* Find backend */
475         struct partition_private_data *data = talloc_get_type(module->private_data, 
476                                                               struct partition_private_data);
477         /* issue request */
478
479         /* (later) consider if we should be searching multiple
480          * partitions (for 'invisible' partition behaviour */
481
482         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
483         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
484         
485         struct ldb_search_options_control *search_options = NULL;
486         if (search_control) {
487                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
488         }
489
490         /* Remove the domain_scope control, so we don't confuse a backend server */
491         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
492                 ldb_oom(ldb_module_get_ctx(module));
493                 return LDB_ERR_OPERATIONS_ERROR;
494         }
495
496         /*
497          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
498          * down as uncritical to make windows 2008 dcpromo happy.
499          */
500         if (search_control) {
501                 search_control->critical = 0;
502         }
503
504         /* TODO:
505            Generate referrals (look for a partition under this DN) if we don't have the above control specified
506         */
507         
508         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
509                 int ret, i;
510                 struct partition_context *ac;
511                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
512                         /* We have processed this flag, so we are done with this control now */
513
514                         /* Remove search control, so we don't confuse a backend server */
515                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
516                                 ldb_oom(ldb_module_get_ctx(module));
517                                 return LDB_ERR_OPERATIONS_ERROR;
518                         }
519                 }
520                 ac = partition_init_ctx(module, req);
521                 if (!ac) {
522                         return LDB_ERR_OPERATIONS_ERROR;
523                 }
524
525                 /* Search from the base DN */
526                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
527                         return partition_send_all(module, ac, req);
528                 }
529                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
530                         bool match = false, stop = false;
531                         /* Find all partitions under the search base 
532                            
533                            we match if:
534
535                               1) the DN we are looking for exactly matches the partition
536                              or
537                               2) the DN we are looking for is a parent of the partition and it isn't
538                                  a scope base search
539                              or
540                               3) the DN we are looking for is a child of the partition
541                          */
542                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
543                                 match = true;
544                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
545                                         stop = true;
546                                 }
547                         }
548                         if (!match && 
549                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
550                              req->op.search.scope != LDB_SCOPE_BASE)) {
551                                 match = true;
552                         }
553                         if (!match &&
554                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
555                                 match = true;
556                                 stop = true; /* note that this relies on partition ordering */
557                         }
558                         if (match) {
559                                 ret = partition_prep_request(ac, data->partitions[i]);
560                                 if (ret != LDB_SUCCESS) {
561                                         return ret;
562                                 }
563                         }
564                         if (stop) break;
565                 }
566
567                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
568                 if (ac->num_requests == 0) {
569                         talloc_free(ac);
570                         return ldb_next_request(module, req);
571                 }
572
573                 /* fire the first one */
574                 return partition_call_first(ac);
575
576         } else {
577                 /* Handle this like all other requests */
578                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
579                         /* We have processed this flag, so we are done with this control now */
580
581                         /* Remove search control, so we don't confuse a backend server */
582                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
583                                 ldb_oom(ldb_module_get_ctx(module));
584                                 return LDB_ERR_OPERATIONS_ERROR;
585                         }
586                 }
587
588                 return partition_replicate(module, req, req->op.search.base);
589         }
590 }
591
592 /* add */
593 static int partition_add(struct ldb_module *module, struct ldb_request *req)
594 {
595         return partition_replicate(module, req, req->op.add.message->dn);
596 }
597
598 /* modify */
599 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
600 {
601         return partition_replicate(module, req, req->op.mod.message->dn);
602 }
603
604 /* delete */
605 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
606 {
607         return partition_replicate(module, req, req->op.del.dn);
608 }
609
610 /* rename */
611 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
612 {
613         /* Find backend */
614         struct dsdb_partition *backend, *backend2;
615         
616         struct partition_private_data *data = talloc_get_type(module->private_data, 
617                                                               struct partition_private_data);
618
619         /* Skip the lot if 'data' isn't here yet (initialisation) */
620         if (!data) {
621                 return LDB_ERR_OPERATIONS_ERROR;
622         }
623
624         backend = find_partition(data, req->op.rename.olddn, req);
625         backend2 = find_partition(data, req->op.rename.newdn, req);
626
627         if ((backend && !backend2) || (!backend && backend2)) {
628                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
629         }
630
631         if (backend != backend2) {
632                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
633                                        "Cannot rename from %s in %s to %s in %s: %s",
634                                        ldb_dn_get_linearized(req->op.rename.olddn),
635                                        ldb_dn_get_linearized(backend->ctrl->dn),
636                                        ldb_dn_get_linearized(req->op.rename.newdn),
637                                        ldb_dn_get_linearized(backend2->ctrl->dn),
638                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
639                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
640         }
641
642         return partition_replicate(module, req, req->op.rename.olddn);
643 }
644
645 /* start a transaction */
646 static int partition_start_trans(struct ldb_module *module)
647 {
648         int i, ret;
649         struct partition_private_data *data = talloc_get_type(module->private_data, 
650                                                               struct partition_private_data);
651         /* Look at base DN */
652         /* Figure out which partition it is under */
653         /* Skip the lot if 'data' isn't here yet (initialization) */
654         ret = ldb_next_start_trans(module);
655         if (ret != LDB_SUCCESS) {
656                 return ret;
657         }
658
659         for (i=0; data && data->partitions && data->partitions[i]; i++) {
660                 struct ldb_module *next = data->partitions[i]->module;
661                 PARTITION_FIND_OP(next, start_transaction);
662
663                 ret = next->ops->start_transaction(next);
664                 if (ret != LDB_SUCCESS) {
665                         /* Back it out, if it fails on one */
666                         for (i--; i >= 0; i--) {
667                                 next = data->partitions[i]->module;
668                                 PARTITION_FIND_OP(next, del_transaction);
669
670                                 next->ops->del_transaction(next);
671                         }
672                         ldb_next_del_trans(module);
673                         return ret;
674                 }
675         }
676         return LDB_SUCCESS;
677 }
678
679 /* prepare for a commit */
680 static int partition_prepare_commit(struct ldb_module *module)
681 {
682         int i;
683         struct partition_private_data *data = talloc_get_type(module->private_data, 
684                                                               struct partition_private_data);
685
686         for (i=0; data && data->partitions && data->partitions[i]; i++) {
687                 struct ldb_module *next_prepare = data->partitions[i]->module;
688                 int ret;
689
690                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
691                 if (next_prepare == NULL) {
692                         continue;
693                 }
694
695                 ret = next_prepare->ops->prepare_commit(next_prepare);
696                 if (ret != LDB_SUCCESS) {
697                         return ret;
698                 }
699         }
700
701         return ldb_next_prepare_commit(module);
702 }
703
704
705 /* end a transaction */
706 static int partition_end_trans(struct ldb_module *module)
707 {
708         int i;
709         struct partition_private_data *data = talloc_get_type(module->private_data, 
710                                                               struct partition_private_data);
711         for (i=0; data && data->partitions && data->partitions[i]; i++) {
712                 struct ldb_module *next_end = data->partitions[i]->module;
713                 int ret;
714
715                 PARTITION_FIND_OP(next_end, end_transaction);
716
717                 ret = next_end->ops->end_transaction(next_end);
718                 if (ret != LDB_SUCCESS) {
719                         return ret;
720                 }
721         }
722
723         return ldb_next_end_trans(module);
724 }
725
726 /* delete a transaction */
727 static int partition_del_trans(struct ldb_module *module)
728 {
729         int i, ret, final_ret = LDB_SUCCESS;
730         struct partition_private_data *data = talloc_get_type(module->private_data, 
731                                                               struct partition_private_data);
732         for (i=0; data && data->partitions && data->partitions[i]; i++) {
733                 struct ldb_module *next = data->partitions[i]->module;
734                 PARTITION_FIND_OP(next, del_transaction);
735
736                 ret = next->ops->del_transaction(next);
737                 if (ret != LDB_SUCCESS) {
738                         final_ret = ret;
739                 }
740         }       
741
742         ret = ldb_next_del_trans(module);
743         if (ret != LDB_SUCCESS) {
744                 final_ret = ret;
745         }
746         return final_ret;
747 }
748
749
750 /* FIXME: This function is still semi-async */
751 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
752 {
753         int i, ret;
754         uint64_t seq_number = 0;
755         uint64_t timestamp_sequence = 0;
756         uint64_t timestamp = 0;
757         struct partition_private_data *data = talloc_get_type(module->private_data, 
758                                                               struct partition_private_data);
759         struct ldb_seqnum_request *seq;
760         struct ldb_seqnum_result *seqr;
761         struct ldb_request *treq;
762         struct ldb_seqnum_request *tseq;
763         struct ldb_seqnum_result *tseqr;
764         struct ldb_extended *ext;
765         struct ldb_result *res;
766         struct dsdb_partition *p;
767
768         p = find_partition(NULL, NULL, req);
769         if (p != NULL) {
770                 /* the caller specified what partition they want the
771                  * sequence number operation on - just pass it on
772                  */
773                 return ldb_next_request(p->module, req);                
774         }
775
776         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
777
778         switch (seq->type) {
779         case LDB_SEQ_NEXT:
780         case LDB_SEQ_HIGHEST_SEQ:
781                 res = talloc_zero(req, struct ldb_result);
782                 if (res == NULL) {
783                         return LDB_ERR_OPERATIONS_ERROR;
784                 }
785                 tseq = talloc_zero(res, struct ldb_seqnum_request);
786                 if (tseq == NULL) {
787                         talloc_free(res);
788                         return LDB_ERR_OPERATIONS_ERROR;
789                 }
790                 tseq->type = seq->type;
791
792                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
793                                              LDB_EXTENDED_SEQUENCE_NUMBER,
794                                              tseq,
795                                              NULL,
796                                              res,
797                                              ldb_extended_default_callback,
798                                              NULL);
799                 ret = ldb_next_request(module, treq);
800                 if (ret == LDB_SUCCESS) {
801                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
802                 }
803                 if (ret != LDB_SUCCESS) {
804                         talloc_free(res);
805                         return ret;
806                 }
807                 seqr = talloc_get_type(res->extended->data,
808                                         struct ldb_seqnum_result);
809                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
810                         timestamp_sequence = seqr->seq_num;
811                 } else {
812                         seq_number += seqr->seq_num;
813                 }
814                 talloc_free(res);
815
816                 /* Skip the lot if 'data' isn't here yet (initialisation) */
817                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
818
819                         res = talloc_zero(req, struct ldb_result);
820                         if (res == NULL) {
821                                 return LDB_ERR_OPERATIONS_ERROR;
822                         }
823                         tseq = talloc_zero(res, struct ldb_seqnum_request);
824                         if (tseq == NULL) {
825                                 talloc_free(res);
826                                 return LDB_ERR_OPERATIONS_ERROR;
827                         }
828                         tseq->type = seq->type;
829
830                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
831                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
832                                                      tseq,
833                                                      NULL,
834                                                      res,
835                                                      ldb_extended_default_callback,
836                                                      NULL);
837                         if (ret != LDB_SUCCESS) {
838                                 talloc_free(res);
839                                 return ret;
840                         }
841
842                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
843                                 ret = ldb_request_add_control(treq,
844                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
845                                                               false, data->partitions[i]->ctrl);
846                                 if (ret != LDB_SUCCESS) {
847                                         talloc_free(res);
848                                         return ret;
849                                 }
850                         }
851
852                         ret = partition_request(data->partitions[i]->module, treq);
853                         if (ret != LDB_SUCCESS) {
854                                 talloc_free(res);
855                                 return ret;
856                         }
857                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
858                         if (ret != LDB_SUCCESS) {
859                                 talloc_free(res);
860                                 return ret;
861                         }
862                         tseqr = talloc_get_type(res->extended->data,
863                                                 struct ldb_seqnum_result);
864                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
865                                 timestamp_sequence = MAX(timestamp_sequence,
866                                                          tseqr->seq_num);
867                         } else {
868                                 seq_number += tseqr->seq_num;
869                         }
870                         talloc_free(res);
871                 }
872                 /* fall through */
873         case LDB_SEQ_HIGHEST_TIMESTAMP:
874
875                 res = talloc_zero(req, struct ldb_result);
876                 if (res == NULL) {
877                         return LDB_ERR_OPERATIONS_ERROR;
878                 }
879
880                 tseq = talloc_zero(res, struct ldb_seqnum_request);
881                 if (tseq == NULL) {
882                         talloc_free(res);
883                         return LDB_ERR_OPERATIONS_ERROR;
884                 }
885                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
886
887                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
888                                              LDB_EXTENDED_SEQUENCE_NUMBER,
889                                              tseq,
890                                              NULL,
891                                              res,
892                                              ldb_extended_default_callback,
893                                              NULL);
894                 if (ret != LDB_SUCCESS) {
895                         talloc_free(res);
896                         return ret;
897                 }
898
899                 ret = ldb_next_request(module, treq);
900                 if (ret != LDB_SUCCESS) {
901                         talloc_free(res);
902                         return ret;
903                 }
904                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
905                 if (ret != LDB_SUCCESS) {
906                         talloc_free(res);
907                         return ret;
908                 }
909
910                 tseqr = talloc_get_type(res->extended->data,
911                                            struct ldb_seqnum_result);
912                 timestamp = tseqr->seq_num;
913
914                 talloc_free(res);
915
916                 /* Skip the lot if 'data' isn't here yet (initialisation) */
917                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
918
919                         res = talloc_zero(req, struct ldb_result);
920                         if (res == NULL) {
921                                 return LDB_ERR_OPERATIONS_ERROR;
922                         }
923
924                         tseq = talloc_zero(res, struct ldb_seqnum_request);
925                         if (tseq == NULL) {
926                                 talloc_free(res);
927                                 return LDB_ERR_OPERATIONS_ERROR;
928                         }
929                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
930
931                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
932                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
933                                                      tseq,
934                                                      NULL,
935                                                      res,
936                                                      ldb_extended_default_callback,
937                                                      NULL);
938                         if (ret != LDB_SUCCESS) {
939                                 talloc_free(res);
940                                 return ret;
941                         }
942
943                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
944                                 ret = ldb_request_add_control(treq,
945                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
946                                                               false, data->partitions[i]->ctrl);
947                                 if (ret != LDB_SUCCESS) {
948                                         talloc_free(res);
949                                         return ret;
950                                 }
951                         }
952
953                         ret = partition_request(data->partitions[i]->module, treq);
954                         if (ret != LDB_SUCCESS) {
955                                 talloc_free(res);
956                                 return ret;
957                         }
958                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
959                         if (ret != LDB_SUCCESS) {
960                                 talloc_free(res);
961                                 return ret;
962                         }
963
964                         tseqr = talloc_get_type(res->extended->data,
965                                                   struct ldb_seqnum_result);
966                         timestamp = MAX(timestamp, tseqr->seq_num);
967
968                         talloc_free(res);
969                 }
970
971                 break;
972         }
973
974         ext = talloc_zero(req, struct ldb_extended);
975         if (!ext) {
976                 return LDB_ERR_OPERATIONS_ERROR;
977         }
978         seqr = talloc_zero(ext, struct ldb_seqnum_result);
979         if (seqr == NULL) {
980                 talloc_free(ext);
981                 return LDB_ERR_OPERATIONS_ERROR;
982         }
983         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
984         ext->data = seqr;
985
986         switch (seq->type) {
987         case LDB_SEQ_NEXT:
988         case LDB_SEQ_HIGHEST_SEQ:
989
990                 /* Has someone above set a timebase sequence? */
991                 if (timestamp_sequence) {
992                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
993                 } else {
994                         seqr->seq_num = seq_number;
995                 }
996
997                 if (timestamp_sequence > seqr->seq_num) {
998                         seqr->seq_num = timestamp_sequence;
999                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1000                 }
1001
1002                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1003                 break;
1004         case LDB_SEQ_HIGHEST_TIMESTAMP:
1005                 seqr->seq_num = timestamp;
1006                 break;
1007         }
1008
1009         if (seq->type == LDB_SEQ_NEXT) {
1010                 seqr->seq_num++;
1011         }
1012
1013         /* send request done */
1014         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1015 }
1016
1017 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1018 {
1019         struct dsdb_partition *partition;
1020         struct partition_private_data *data;
1021         struct ldb_dn *schema_dn;
1022         struct partition_context *ac;
1023         int ret;
1024
1025         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1026         if (!schema_dn) {
1027                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1028                 return LDB_ERR_PROTOCOL_ERROR;
1029         }
1030
1031         data = talloc_get_type(module->private_data, struct partition_private_data);
1032         if (!data) {
1033                 return LDB_ERR_OPERATIONS_ERROR;
1034         }
1035         
1036         partition = find_partition( data, schema_dn, req);
1037         if (!partition) {
1038                 return ldb_next_request(module, req);
1039         }
1040
1041         ac = partition_init_ctx(module, req);
1042         if (!ac) {
1043                 return LDB_ERR_OPERATIONS_ERROR;
1044         }
1045
1046         /* we need to add a control but we never touch the original request */
1047         ret = partition_prep_request(ac, partition);
1048         if (ret != LDB_SUCCESS) {
1049                 return ret;
1050         }
1051
1052         /* fire the first one */
1053         ret =  partition_call_first(ac);
1054
1055         if (ret != LDB_SUCCESS){
1056                 return ret;
1057         }
1058
1059         return ldb_request_done(req, ret);
1060 }
1061
1062
1063 /* extended */
1064 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1065 {
1066         struct partition_private_data *data;
1067         struct partition_context *ac;
1068
1069         data = talloc_get_type(module->private_data, struct partition_private_data);
1070         if (!data || !data->partitions) {
1071                 return ldb_next_request(module, req);
1072         }
1073
1074         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1075                 return partition_sequence_number(module, req);
1076         }
1077
1078         /* forward schemaUpdateNow operation to schema_fsmo module*/
1079         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1080                 return partition_extended_schema_update_now( module, req );
1081         }       
1082
1083         /* 
1084          * as the extended operation has no dn
1085          * we need to send it to all partitions
1086          */
1087
1088         ac = partition_init_ctx(module, req);
1089         if (!ac) {
1090                 return LDB_ERR_OPERATIONS_ERROR;
1091         }
1092
1093         return partition_send_all(module, ac, req);
1094 }
1095
1096 static int partition_sort_compare(const void *v1, const void *v2)
1097 {
1098         const struct dsdb_partition *p1;
1099         const struct dsdb_partition *p2;
1100
1101         p1 = *((struct dsdb_partition * const*)v1);
1102         p2 = *((struct dsdb_partition * const*)v2);
1103
1104         return ldb_dn_compare(p1->ctrl->dn, p2->ctrl->dn);
1105 }
1106
1107 static int partition_init(struct ldb_module *module)
1108 {
1109         int ret, i;
1110         TALLOC_CTX *mem_ctx = talloc_new(module);
1111         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1112         struct ldb_result *res;
1113         struct ldb_message *msg;
1114         struct ldb_message_element *partition_attributes;
1115         struct ldb_message_element *replicate_attributes;
1116         struct ldb_message_element *modules_attributes;
1117
1118         struct partition_private_data *data;
1119
1120         if (!mem_ctx) {
1121                 return LDB_ERR_OPERATIONS_ERROR;
1122         }
1123
1124         data = talloc(mem_ctx, struct partition_private_data);
1125         if (data == NULL) {
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128
1129         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1130                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1131                          LDB_SCOPE_BASE, attrs, NULL);
1132         if (ret != LDB_SUCCESS) {
1133                 talloc_free(mem_ctx);
1134                 return ret;
1135         }
1136         if (res->count == 0) {
1137                 talloc_free(mem_ctx);
1138                 return ldb_next_init(module);
1139         }
1140
1141         if (res->count > 1) {
1142                 talloc_free(mem_ctx);
1143                 return LDB_ERR_CONSTRAINT_VIOLATION;
1144         }
1145
1146         msg = res->msgs[0];
1147
1148         partition_attributes = ldb_msg_find_element(msg, "partition");
1149         if (!partition_attributes) {
1150                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1151                 talloc_free(mem_ctx);
1152                 return LDB_ERR_CONSTRAINT_VIOLATION;
1153         }
1154         data->partitions = talloc_array(data, struct dsdb_partition *, partition_attributes->num_values + 1);
1155         if (!data->partitions) {
1156                 talloc_free(mem_ctx);
1157                 return LDB_ERR_OPERATIONS_ERROR;
1158         }
1159         for (i=0; i < partition_attributes->num_values; i++) {
1160                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1161                 char *p = strchr(base, ':');
1162                 const char *backend;
1163
1164                 if (!p) {
1165                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1166                                                 "partition_init: "
1167                                                 "invalid form for partition record (missing ':'): %s", base);
1168                         talloc_free(mem_ctx);
1169                         return LDB_ERR_CONSTRAINT_VIOLATION;
1170                 }
1171                 p[0] = '\0';
1172                 p++;
1173                 if (!p[0]) {
1174                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1175                                                 "partition_init: "
1176                                                 "invalid form for partition record (missing backend database): %s", base);
1177                         talloc_free(mem_ctx);
1178                         return LDB_ERR_CONSTRAINT_VIOLATION;
1179                 }
1180                 data->partitions[i] = talloc(data->partitions, struct dsdb_partition);
1181                 if (!data->partitions[i]) {
1182                         talloc_free(mem_ctx);
1183                         return LDB_ERR_OPERATIONS_ERROR;
1184                 }
1185                 data->partitions[i]->ctrl = talloc(data->partitions[i], struct dsdb_control_current_partition);
1186                 if (!data->partitions[i]->ctrl) {
1187                         talloc_free(mem_ctx);
1188                         return LDB_ERR_OPERATIONS_ERROR;
1189                 }
1190                 data->partitions[i]->ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1191                 data->partitions[i]->ctrl->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1192                 if (!data->partitions[i]->ctrl->dn) {
1193                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1194                                                 "partition_init: invalid DN in partition record: %s", base);
1195                         talloc_free(mem_ctx);
1196                         return LDB_ERR_CONSTRAINT_VIOLATION;
1197                 }
1198
1199                 backend = samdb_relative_path(ldb_module_get_ctx(module), 
1200                                                                    data->partitions[i], 
1201                                                                    p);
1202                 if (!backend) {
1203                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1204                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1205                         talloc_free(mem_ctx);                   
1206                 }
1207                 ret = ldb_connect_backend(ldb_module_get_ctx(module), backend, NULL, &data->partitions[i]->module);
1208                 if (ret != LDB_SUCCESS) {
1209                         talloc_free(mem_ctx);
1210                         return ret;
1211                 }
1212         }
1213         data->partitions[i] = NULL;
1214
1215         /* sort these into order, most to least specific */
1216         qsort(data->partitions, partition_attributes->num_values,
1217               sizeof(*data->partitions), partition_sort_compare);
1218
1219         for (i=0; data->partitions[i]; i++) {
1220                 struct ldb_request *req;
1221                 req = talloc_zero(mem_ctx, struct ldb_request);
1222                 if (req == NULL) {
1223                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1224                         talloc_free(mem_ctx);
1225                         return LDB_ERR_OPERATIONS_ERROR;
1226                 }
1227                 
1228                 req->operation = LDB_REQ_REGISTER_PARTITION;
1229                 req->op.reg_partition.dn = data->partitions[i]->ctrl->dn;
1230                 req->callback = ldb_op_default_callback;
1231
1232                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1233
1234                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1235                 if (req->handle == NULL) {
1236                         return LDB_ERR_OPERATIONS_ERROR;
1237                 }
1238                 
1239                 ret = ldb_request(ldb_module_get_ctx(module), req);
1240                 if (ret == LDB_SUCCESS) {
1241                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1242                 }
1243                 if (ret != LDB_SUCCESS) {
1244                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1245                         talloc_free(mem_ctx);
1246                         return LDB_ERR_OTHER;
1247                 }
1248                 talloc_free(req);
1249         }
1250
1251         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1252         if (!replicate_attributes) {
1253                 data->replicate = NULL;
1254         } else {
1255                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1256                 if (!data->replicate) {
1257                         talloc_free(mem_ctx);
1258                         return LDB_ERR_OPERATIONS_ERROR;
1259                 }
1260
1261                 for (i=0; i < replicate_attributes->num_values; i++) {
1262                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1263                         if (!ldb_dn_validate(data->replicate[i])) {
1264                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1265                                                         "partition_init: "
1266                                                         "invalid DN in partition replicate record: %s", 
1267                                                         replicate_attributes->values[i].data);
1268                                 talloc_free(mem_ctx);
1269                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1270                         }
1271                 }
1272                 data->replicate[i] = NULL;
1273         }
1274
1275         /* Make the private data available to any searches the modules may trigger in initialisation */
1276         module->private_data = data;
1277         talloc_steal(module, data);
1278         
1279         modules_attributes = ldb_msg_find_element(msg, "modules");
1280         if (modules_attributes) {
1281                 for (i=0; i < modules_attributes->num_values; i++) {
1282                         struct ldb_dn *base_dn;
1283                         int partition_idx;
1284                         struct dsdb_partition *partition = NULL;
1285                         const char **modules = NULL;
1286
1287                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1288                         char *p = strchr(base, ':');
1289                         if (!p) {
1290                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1291                                                         "partition_init: "
1292                                                         "invalid form for partition module record (missing ':'): %s", base);
1293                                 talloc_free(mem_ctx);
1294                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1295                         }
1296                         p[0] = '\0';
1297                         p++;
1298                         if (!p[0]) {
1299                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1300                                                         "partition_init: "
1301                                                         "invalid form for partition module record (missing backend database): %s", base);
1302                                 talloc_free(mem_ctx);
1303                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1304                         }
1305
1306                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1307                                                                p);
1308                         
1309                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1310                         if (!ldb_dn_validate(base_dn)) {
1311                                 talloc_free(mem_ctx);
1312                                 return LDB_ERR_OPERATIONS_ERROR;
1313                         }
1314                         
1315                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1316                                 if (ldb_dn_compare(data->partitions[partition_idx]->ctrl->dn, base_dn) == 0) {
1317                                         partition = data->partitions[partition_idx];
1318                                         break;
1319                                 }
1320                         }
1321                         
1322                         if (!partition) {
1323                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1324                                                         "partition_init: "
1325                                                         "invalid form for partition module record (no such partition): %s", base);
1326                                 talloc_free(mem_ctx);
1327                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1328                         }
1329                         
1330                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1331                         if (ret != LDB_SUCCESS) {
1332                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1333                                                        "partition_init: "
1334                                                        "loading backend for %s failed: %s", 
1335                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1336                                 talloc_free(mem_ctx);
1337                                 return ret;
1338                         }
1339                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1340                         if (ret != LDB_SUCCESS) {
1341                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1342                                                        "partition_init: "
1343                                                        "initialising backend for %s failed: %s", 
1344                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1345                                 talloc_free(mem_ctx);
1346                                 return ret;
1347                         }
1348                 }
1349         }
1350
1351         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1352         if (ret != LDB_SUCCESS) {
1353                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1354                         "partition: Unable to register control with rootdse!\n");
1355                 return LDB_ERR_OPERATIONS_ERROR;
1356         }
1357
1358         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1359         if (ret != LDB_SUCCESS) {
1360                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1361                         "partition: Unable to register control with rootdse!\n");
1362                 return LDB_ERR_OPERATIONS_ERROR;
1363         }
1364
1365         talloc_free(mem_ctx);
1366         return ldb_next_init(module);
1367 }
1368
1369 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1370         .name              = "partition",
1371         .init_context      = partition_init,
1372         .search            = partition_search,
1373         .add               = partition_add,
1374         .modify            = partition_modify,
1375         .del               = partition_delete,
1376         .rename            = partition_rename,
1377         .extended          = partition_extended,
1378         .start_transaction = partition_start_trans,
1379         .prepare_commit    = partition_prepare_commit,
1380         .end_transaction   = partition_end_trans,
1381         .del_transaction   = partition_del_trans,
1382 };