9763664940b79d2c5705a999073ebe284f0e3055
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is released under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "lib/ldb/include/ldb.h"
38 #include "lib/ldb/include/ldb_errors.h"
39 #include "lib/ldb/include/ldb_module.h"
40 #include "lib/ldb/include/ldb_private.h"
41 #include "dsdb/samdb/samdb.h"
42
/* Module-private state hung off module->private_data. */
struct partition_private_data {
        /* NULL-terminated array of the known partitions */
        struct dsdb_control_current_partition **partitions;
        /* NULL-terminated array of DNs; a non-search request whose DN
         * equals one of these is sent to every partition */
        struct ldb_dn **replicate;
};
47
/* One prepared sub-request and the module it will be handed to. */
struct part_request {
        /* module at which partition_request() starts looking for the op */
        struct ldb_module *module;
        /* request built by partition_prep_request() */
        struct ldb_request *req;
};
52
/* Per-request state, allocated as a talloc child of the caller's request. */
struct partition_context {
        /* the partition module instance handling the request */
        struct ldb_module *module;
        /* the caller's original request */
        struct ldb_request *req;
        /* set once any sub-request has completed with LDB_SUCCESS */
        bool got_success;

        /* array of prepared sub-requests, num_requests entries long */
        struct part_request *part_req;
        int num_requests;
        /* count of completed sub-requests; also the index of the next
         * sub-request to fire */
        int finished_requests;
};
62
63 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
64 {
65         struct partition_context *ac;
66
67         ac = talloc_zero(req, struct partition_context);
68         if (ac == NULL) {
69                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
70                 return NULL;
71         }
72
73         ac->module = module;
74         ac->req = req;
75
76         return ac;
77 }
78
/*
 * Advance 'module' along its ->next chain until it points at a module
 * whose ops table provides 'op'.  'module' is updated in place and is
 * left NULL when no module in the chain implements the operation.
 */
#define PARTITION_FIND_OP_NOERROR(module, op) do { \
        for (; (module) != NULL && (module)->ops->op == NULL; \
             (module) = (module)->next) { \
                /* keep walking down the chain */ \
        } \
} while (0)
82
/*
 * Like PARTITION_FIND_OP_NOERROR, but when no module in the chain
 * implements 'op' this sets an ldb error string and makes the
 * *calling* function return LDB_ERR_OPERATIONS_ERROR.
 *
 * The ldb context is looked up before the walk: on the failure path
 * 'module' has already become NULL and must not be passed to
 * ldb_module_get_ctx().  If 'module' was NULL to begin with there is
 * no context to report on, so only the error code is returned.
 */
#define PARTITION_FIND_OP(module, op) do { \
        struct ldb_context *partition_find_op_ldb_ = \
                ((module) != NULL) ? ldb_module_get_ctx(module) : NULL; \
        PARTITION_FIND_OP_NOERROR(module, op); \
        if ((module) == NULL) { \
                if (partition_find_op_ldb_ != NULL) { \
                        ldb_asprintf_errstring(partition_find_op_ldb_, \
                                "Unable to find backend operation for " #op ); \
                } \
                return LDB_ERR_OPERATIONS_ERROR; \
        } \
} while (0)
91
92 /*
93  *    helper functions to call the next module in chain
94  *    */
95
96 static int partition_request(struct ldb_module *module, struct ldb_request *request)
97 {
98         int ret;
99         switch (request->operation) {
100         case LDB_SEARCH:
101                 PARTITION_FIND_OP(module, search);
102                 ret = module->ops->search(module, request);
103                 break;
104         case LDB_ADD:
105                 PARTITION_FIND_OP(module, add);
106                 ret = module->ops->add(module, request);
107                 break;
108         case LDB_MODIFY:
109                 PARTITION_FIND_OP(module, modify);
110                 ret = module->ops->modify(module, request);
111                 break;
112         case LDB_DELETE:
113                 PARTITION_FIND_OP(module, del);
114                 ret = module->ops->del(module, request);
115                 break;
116         case LDB_RENAME:
117                 PARTITION_FIND_OP(module, rename);
118                 ret = module->ops->rename(module, request);
119                 break;
120         case LDB_EXTENDED:
121                 PARTITION_FIND_OP(module, extended);
122                 ret = module->ops->extended(module, request);
123                 break;
124         default:
125                 PARTITION_FIND_OP(module, request);
126                 ret = module->ops->request(module, request);
127                 break;
128         }
129         if (ret == LDB_SUCCESS) {
130                 return ret;
131         }
132         if (!ldb_errstring(ldb_module_get_ctx(module))) {
133                 /* Set a default error string, to place the blame somewhere */
134                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
135                                         "error in module %s: %s (%d)",
136                                         module->ops->name,
137                                         ldb_strerror(ret), ret);
138         }
139         return ret;
140 }
141
142 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
143                                                              struct ldb_dn *dn)
144 {
145         int i;
146
147         /* Look at base DN */
148         /* Figure out which partition it is under */
149         /* Skip the lot if 'data' isn't here yet (initialisation) */
150         for (i=0; data && data->partitions && data->partitions[i]; i++) {
151                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
152                         return data->partitions[i];
153                 }
154         }
155
156         return NULL;
157 }
158
159 /**
160  * fire the caller's callback for every entry, but only send 'done' once.
161  */
162 static int partition_req_callback(struct ldb_request *req,
163                                   struct ldb_reply *ares)
164 {
165         struct partition_context *ac;
166         struct ldb_module *module;
167         struct ldb_request *nreq;
168         int ret;
169
170         ac = talloc_get_type(req->context, struct partition_context);
171
172         if (!ares) {
173                 return ldb_module_done(ac->req, NULL, NULL,
174                                         LDB_ERR_OPERATIONS_ERROR);
175         }
176
177         if (ares->error != LDB_SUCCESS && !ac->got_success) {
178                 return ldb_module_done(ac->req, ares->controls,
179                                         ares->response, ares->error);
180         }
181
182         switch (ares->type) {
183         case LDB_REPLY_REFERRAL:
184                 /* ignore referrals for now */
185                 break;
186
187         case LDB_REPLY_ENTRY:
188                 if (ac->req->operation != LDB_SEARCH) {
189                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
190                                 "partition_req_callback:"
191                                 " Unsupported reply type for this request");
192                         return ldb_module_done(ac->req, NULL, NULL,
193                                                 LDB_ERR_OPERATIONS_ERROR);
194                 }
195                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
196
197         case LDB_REPLY_DONE:
198                 if (ares->error == LDB_SUCCESS) {
199                         ac->got_success = true;
200                 }
201                 if (ac->req->operation == LDB_EXTENDED) {
202                         /* FIXME: check for ares->response, replmd does not fill it ! */
203                         if (ares->response) {
204                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
205                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
206                                                           "partition_req_callback:"
207                                                           " Unknown extended reply, "
208                                                           "only supports START_TLS");
209                                         talloc_free(ares);
210                                         return ldb_module_done(ac->req, NULL, NULL,
211                                                                 LDB_ERR_OPERATIONS_ERROR);
212                                 }
213                         }
214                 }
215
216                 ac->finished_requests++;
217                 if (ac->finished_requests == ac->num_requests) {
218                         /* this was the last one, call callback */
219                         return ldb_module_done(ac->req, ares->controls,
220                                                ares->response, 
221                                                ac->got_success?LDB_SUCCESS:ares->error);
222                 }
223
224                 /* not the last, now call the next one */
225                 module = ac->part_req[ac->finished_requests].module;
226                 nreq = ac->part_req[ac->finished_requests].req;
227
228                 ret = partition_request(module, nreq);
229                 if (ret != LDB_SUCCESS) {
230                         talloc_free(ares);
231                         return ldb_module_done(ac->req, NULL, NULL, ret);
232                 }
233
234                 break;
235         }
236
237         talloc_free(ares);
238         return LDB_SUCCESS;
239 }
240
241 static int partition_prep_request(struct partition_context *ac,
242                                   struct dsdb_control_current_partition *partition)
243 {
244         int ret;
245         struct ldb_request *req;
246
247         ac->part_req = talloc_realloc(ac, ac->part_req,
248                                         struct part_request,
249                                         ac->num_requests + 1);
250         if (ac->part_req == NULL) {
251                 ldb_oom(ldb_module_get_ctx(ac->module));
252                 return LDB_ERR_OPERATIONS_ERROR;
253         }
254
255         switch (ac->req->operation) {
256         case LDB_SEARCH:
257                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
258                                         ac->part_req,
259                                         ac->req->op.search.base,
260                                         ac->req->op.search.scope,
261                                         ac->req->op.search.tree,
262                                         ac->req->op.search.attrs,
263                                         ac->req->controls,
264                                         ac, partition_req_callback,
265                                         ac->req);
266                 break;
267         case LDB_ADD:
268                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
269                                         ac->req->op.add.message,
270                                         ac->req->controls,
271                                         ac, partition_req_callback,
272                                         ac->req);
273                 break;
274         case LDB_MODIFY:
275                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
276                                         ac->req->op.mod.message,
277                                         ac->req->controls,
278                                         ac, partition_req_callback,
279                                         ac->req);
280                 break;
281         case LDB_DELETE:
282                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
283                                         ac->req->op.del.dn,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         case LDB_RENAME:
289                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
290                                         ac->req->op.rename.olddn,
291                                         ac->req->op.rename.newdn,
292                                         ac->req->controls,
293                                         ac, partition_req_callback,
294                                         ac->req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 req->controls = talloc_memdup(req, ac->req->controls,
319                                         talloc_get_size(ac->req->controls));
320                 if (req->controls == NULL) {
321                         ldb_oom(ldb_module_get_ctx(ac->module));
322                         return LDB_ERR_OPERATIONS_ERROR;
323                 }
324         }
325
326         if (partition) {
327                 ac->part_req[ac->num_requests].module = partition->module;
328
329                 ret = ldb_request_add_control(req,
330                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
331                                         false, partition);
332                 if (ret != LDB_SUCCESS) {
333                         return ret;
334                 }
335
336                 if (req->operation == LDB_SEARCH) {
337                         /* If the search is for 'more' than this partition,
338                          * then change the basedn, so a remote LDAP server
339                          * doesn't object */
340                         if (ldb_dn_compare_base(partition->dn,
341                                                 req->op.search.base) != 0) {
342                                 req->op.search.base = partition->dn;
343                         }
344                 }
345
346         } else {
347                 /* make sure you put the NEXT module here, or
348                  * partition_request() will simply loop forever on itself */
349                 ac->part_req[ac->num_requests].module = ac->module->next;
350         }
351
352         ac->num_requests++;
353
354         return LDB_SUCCESS;
355 }
356
357 static int partition_call_first(struct partition_context *ac)
358 {
359         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
360 }
361
362 /**
363  * Send a request down to all the partitions
364  */
365 static int partition_send_all(struct ldb_module *module, 
366                               struct partition_context *ac, 
367                               struct ldb_request *req) 
368 {
369         int i;
370         struct partition_private_data *data = talloc_get_type(module->private_data, 
371                                                               struct partition_private_data);
372         int ret = partition_prep_request(ac, NULL);
373         if (ret != LDB_SUCCESS) {
374                 return ret;
375         }
376         for (i=0; data && data->partitions && data->partitions[i]; i++) {
377                 ret = partition_prep_request(ac, data->partitions[i]);
378                 if (ret != LDB_SUCCESS) {
379                         return ret;
380                 }
381         }
382
383         /* fire the first one */
384         return partition_call_first(ac);
385 }
386
387 /**
388  * Figure out which backend a request needs to be aimed at.  Some
389  * requests must be replicated to all backends
390  */
391 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
392 {
393         struct partition_context *ac;
394         unsigned i;
395         int ret;
396         struct dsdb_control_current_partition *partition;
397         struct partition_private_data *data = talloc_get_type(module->private_data, 
398                                                               struct partition_private_data);
399         if (!data || !data->partitions) {
400                 return ldb_next_request(module, req);
401         }
402         
403         if (req->operation != LDB_SEARCH) {
404                 /* Is this a special DN, we need to replicate to every backend? */
405                 for (i=0; data->replicate && data->replicate[i]; i++) {
406                         if (ldb_dn_compare(data->replicate[i], 
407                                            dn) == 0) {
408                                 
409                                 ac = partition_init_ctx(module, req);
410                                 if (!ac) {
411                                         return LDB_ERR_OPERATIONS_ERROR;
412                                 }
413                                 
414                                 return partition_send_all(module, ac, req);
415                         }
416                 }
417         }
418
419         /* Otherwise, we need to find the partition to fire it to */
420
421         /* Find partition */
422         partition = find_partition(data, dn);
423         if (!partition) {
424                 /*
425                  * if we haven't found a matching partition
426                  * pass the request to the main ldb
427                  *
428                  * TODO: we should maybe return an error here
429                  *       if it's not a special dn
430                  */
431
432                 return ldb_next_request(module, req);
433         }
434
435         ac = partition_init_ctx(module, req);
436         if (!ac) {
437                 return LDB_ERR_OPERATIONS_ERROR;
438         }
439
440         /* we need to add a control but we never touch the original request */
441         ret = partition_prep_request(ac, partition);
442         if (ret != LDB_SUCCESS) {
443                 return ret;
444         }
445
446         /* fire the first one */
447         return partition_call_first(ac);
448 }
449
450 /* search */
451 static int partition_search(struct ldb_module *module, struct ldb_request *req)
452 {
453         struct ldb_control **saved_controls;
454         
455         /* Find backend */
456         struct partition_private_data *data = talloc_get_type(module->private_data, 
457                                                               struct partition_private_data);
458         /* issue request */
459
460         /* (later) consider if we should be searching multiple
461          * partitions (for 'invisible' partition behaviour */
462
463         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
464         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
465         
466         struct ldb_search_options_control *search_options = NULL;
467         if (search_control) {
468                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
469         }
470
471         /* Remove the domain_scope control, so we don't confuse a backend server */
472         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
473                 ldb_oom(ldb_module_get_ctx(module));
474                 return LDB_ERR_OPERATIONS_ERROR;
475         }
476
477         /*
478          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
479          * down as uncritical to make windows 2008 dcpromo happy.
480          */
481         if (search_control) {
482                 search_control->critical = 0;
483         }
484
485         /* TODO:
486            Generate referrals (look for a partition under this DN) if we don't have the above control specified
487         */
488         
489         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
490                 int ret, i;
491                 struct partition_context *ac;
492                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
493                         /* We have processed this flag, so we are done with this control now */
494
495                         /* Remove search control, so we don't confuse a backend server */
496                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
497                                 ldb_oom(ldb_module_get_ctx(module));
498                                 return LDB_ERR_OPERATIONS_ERROR;
499                         }
500                 }
501                 ac = partition_init_ctx(module, req);
502                 if (!ac) {
503                         return LDB_ERR_OPERATIONS_ERROR;
504                 }
505
506                 /* Search from the base DN */
507                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
508                         return partition_send_all(module, ac, req);
509                 }
510                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
511                         bool match = false, stop = false;
512                         /* Find all partitions under the search base 
513                            
514                            we match if:
515
516                               1) the DN we are looking for exactly matches the partition
517                              or
518                               2) the DN we are looking for is a parent of the partition and it isn't
519                                  a scope base search
520                              or
521                               3) the DN we are looking for is a child of the partition
522                          */
523                         if (ldb_dn_compare(data->partitions[i]->dn, req->op.search.base) == 0) {
524                                 match = true;
525                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
526                                         stop = true;
527                                 }
528                         }
529                         if (!match && 
530                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0 &&
531                              req->op.search.scope != LDB_SCOPE_BASE)) {
532                                 match = true;
533                         }
534                         if (!match &&
535                             ldb_dn_compare_base(data->partitions[i]->dn, req->op.search.base) == 0) {
536                                 match = true;
537                                 stop = true; /* note that this relies on partition ordering */
538                         }
539                         if (match) {
540                                 ret = partition_prep_request(ac, data->partitions[i]);
541                                 if (ret != LDB_SUCCESS) {
542                                         return ret;
543                                 }
544                         }
545                         if (stop) break;
546                 }
547
548                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
549                 if (ac->num_requests == 0) {
550                         talloc_free(ac);
551                         return ldb_next_request(module, req);
552                 }
553
554                 /* fire the first one */
555                 return partition_call_first(ac);
556
557         } else {
558                 /* Handle this like all other requests */
559                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
560                         /* We have processed this flag, so we are done with this control now */
561
562                         /* Remove search control, so we don't confuse a backend server */
563                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
564                                 ldb_oom(ldb_module_get_ctx(module));
565                                 return LDB_ERR_OPERATIONS_ERROR;
566                         }
567                 }
568
569                 return partition_replicate(module, req, req->op.search.base);
570         }
571 }
572
573 /* add */
574 static int partition_add(struct ldb_module *module, struct ldb_request *req)
575 {
576         return partition_replicate(module, req, req->op.add.message->dn);
577 }
578
579 /* modify */
580 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
581 {
582         return partition_replicate(module, req, req->op.mod.message->dn);
583 }
584
585 /* delete */
586 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
587 {
588         return partition_replicate(module, req, req->op.del.dn);
589 }
590
591 /* rename */
592 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
593 {
594         /* Find backend */
595         struct dsdb_control_current_partition *backend, *backend2;
596         
597         struct partition_private_data *data = talloc_get_type(module->private_data, 
598                                                               struct partition_private_data);
599
600         /* Skip the lot if 'data' isn't here yet (initialisation) */
601         if (!data) {
602                 return LDB_ERR_OPERATIONS_ERROR;
603         }
604
605         backend = find_partition(data, req->op.rename.olddn);
606         backend2 = find_partition(data, req->op.rename.newdn);
607
608         if ((backend && !backend2) || (!backend && backend2)) {
609                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
610         }
611
612         if (backend != backend2) {
613                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
614                                        "Cannot rename from %s in %s to %s in %s: %s",
615                                        ldb_dn_get_linearized(req->op.rename.olddn),
616                                        ldb_dn_get_linearized(backend->dn),
617                                        ldb_dn_get_linearized(req->op.rename.newdn),
618                                        ldb_dn_get_linearized(backend2->dn),
619                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
620                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
621         }
622
623         return partition_replicate(module, req, req->op.rename.olddn);
624 }
625
626 /* start a transaction */
627 static int partition_start_trans(struct ldb_module *module)
628 {
629         int i, ret;
630         struct partition_private_data *data = talloc_get_type(module->private_data, 
631                                                               struct partition_private_data);
632         /* Look at base DN */
633         /* Figure out which partition it is under */
634         /* Skip the lot if 'data' isn't here yet (initialization) */
635         ret = ldb_next_start_trans(module);
636         if (ret != LDB_SUCCESS) {
637                 return ret;
638         }
639
640         for (i=0; data && data->partitions && data->partitions[i]; i++) {
641                 struct ldb_module *next = data->partitions[i]->module;
642                 PARTITION_FIND_OP(next, start_transaction);
643
644                 ret = next->ops->start_transaction(next);
645                 if (ret != LDB_SUCCESS) {
646                         /* Back it out, if it fails on one */
647                         for (i--; i >= 0; i--) {
648                                 next = data->partitions[i]->module;
649                                 PARTITION_FIND_OP(next, del_transaction);
650
651                                 next->ops->del_transaction(next);
652                         }
653                         ldb_next_del_trans(module);
654                         return ret;
655                 }
656         }
657         return LDB_SUCCESS;
658 }
659
660 /* end a transaction */
661 static int partition_end_trans(struct ldb_module *module)
662 {
663         int i, ret, final_ret;
664         struct partition_private_data *data = talloc_get_type(module->private_data, 
665                                                               struct partition_private_data);
666         ret = ldb_next_end_trans(module);
667         if (ret != LDB_SUCCESS) {
668                 return ret;
669         }
670
671         /* if the backend has a prepare_commit op then use that, to ensure
672            that all partitions are committed safely together */
673         for (i=0; data && data->partitions && data->partitions[i]; i++) {
674                 struct ldb_module *next_end = data->partitions[i]->module;
675                 struct ldb_module *next_prepare = data->partitions[i]->module;
676                 struct ldb_module *next_del = data->partitions[i]->module;
677
678                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
679                 if (next_prepare == NULL) {
680                         continue;
681                 }
682
683                 PARTITION_FIND_OP(next_end, end_transaction);
684                 PARTITION_FIND_OP(next_del, del_transaction);
685
686                 if (next_end != next_prepare || next_del != next_end) {
687                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "ERROR: Mismatch between prepare and commit ops in ldb module");
688                         return LDB_ERR_OPERATIONS_ERROR;
689                 }
690                 
691                 ret = next_prepare->ops->prepare_commit(next_prepare);
692                 if (ret != LDB_SUCCESS) {
693                         /* if one fails, cancel all but this one */
694                         int j;
695                         for (j=0; data->partitions[j]; j++) {
696                                 if (j == i) continue;
697                                 next_del = data->partitions[j]->module;
698                                 PARTITION_FIND_OP(next_del, del_transaction);
699                                 next_del->ops->del_transaction(next_del);
700                         }
701                         ldb_next_del_trans(module);
702                         return ret;
703                 }
704         }
705
706         /* Look at base DN */
707         /* Figure out which partition it is under */
708         /* Skip the lot if 'data' isn't here yet (initialisation) */
709         final_ret = LDB_SUCCESS;
710
711         for (i=0; data && data->partitions && data->partitions[i]; i++) {
712                 struct ldb_module *next = data->partitions[i]->module;
713                 PARTITION_FIND_OP(next, end_transaction);
714
715                 ret = next->ops->end_transaction(next);
716                 if (ret != LDB_SUCCESS) {
717                         /* this should only be happening if we had a serious 
718                            OS or hardware error */
719                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "ERROR: partition commit error");
720                         final_ret = ret;
721                 }
722         }
723
724         return final_ret;
725 }
726
727 /* delete a transaction */
728 static int partition_del_trans(struct ldb_module *module)
729 {
730         int i, ret, ret2 = LDB_SUCCESS;
731         struct partition_private_data *data = talloc_get_type(module->private_data, 
732                                                               struct partition_private_data);
733         ret = ldb_next_del_trans(module);
734         if (ret != LDB_SUCCESS) {
735                 ret2 = ret;
736         }
737
738         /* Look at base DN */
739         /* Figure out which partition it is under */
740         /* Skip the lot if 'data' isn't here yet (initialistion) */
741         for (i=0; data && data->partitions && data->partitions[i]; i++) {
742                 struct ldb_module *next = data->partitions[i]->module;
743                 PARTITION_FIND_OP(next, del_transaction);
744
745                 ret = next->ops->del_transaction(next);
746                 if (ret != LDB_SUCCESS) {
747                         ret2 = ret;
748                 }
749         }
750         return ret2;
751 }
752
753
754 /* FIXME: This function is still semi-async */
755 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
756 {
757         int i, ret;
758         uint64_t seq_number = 0;
759         uint64_t timestamp_sequence = 0;
760         uint64_t timestamp = 0;
761         struct partition_private_data *data = talloc_get_type(module->private_data, 
762                                                               struct partition_private_data);
763         struct ldb_seqnum_request *seq;
764         struct ldb_seqnum_result *seqr;
765         struct ldb_request *treq;
766         struct ldb_seqnum_request *tseq;
767         struct ldb_seqnum_result *tseqr;
768         struct ldb_extended *ext;
769         struct ldb_result *res;
770
771         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
772
773         switch (seq->type) {
774         case LDB_SEQ_NEXT:
775         case LDB_SEQ_HIGHEST_SEQ:
776                 res = talloc_zero(req, struct ldb_result);
777                 if (res == NULL) {
778                         return LDB_ERR_OPERATIONS_ERROR;
779                 }
780                 tseq = talloc_zero(res, struct ldb_seqnum_request);
781                 if (tseq == NULL) {
782                         talloc_free(res);
783                         return LDB_ERR_OPERATIONS_ERROR;
784                 }
785                 tseq->type = seq->type;
786
787                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
788                                              LDB_EXTENDED_SEQUENCE_NUMBER,
789                                              tseq,
790                                              NULL,
791                                              res,
792                                              ldb_extended_default_callback,
793                                              NULL);
794                 ret = ldb_next_request(module, treq);
795                 if (ret == LDB_SUCCESS) {
796                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
797                 }
798                 if (ret != LDB_SUCCESS) {
799                         talloc_free(res);
800                         return ret;
801                 }
802                 seqr = talloc_get_type(res->extended->data,
803                                         struct ldb_seqnum_result);
804                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
805                         timestamp_sequence = seqr->seq_num;
806                 } else {
807                         seq_number += seqr->seq_num;
808                 }
809                 talloc_free(res);
810
811                 /* Skip the lot if 'data' isn't here yet (initialisation) */
812                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
813
814                         res = talloc_zero(req, struct ldb_result);
815                         if (res == NULL) {
816                                 return LDB_ERR_OPERATIONS_ERROR;
817                         }
818                         tseq = talloc_zero(res, struct ldb_seqnum_request);
819                         if (tseq == NULL) {
820                                 talloc_free(res);
821                                 return LDB_ERR_OPERATIONS_ERROR;
822                         }
823                         tseq->type = seq->type;
824
825                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
826                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
827                                                      tseq,
828                                                      NULL,
829                                                      res,
830                                                      ldb_extended_default_callback,
831                                                      NULL);
832                         if (ret != LDB_SUCCESS) {
833                                 talloc_free(res);
834                                 return ret;
835                         }
836
837                         ret = ldb_request_add_control(treq,
838                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
839                                                       false, data->partitions[i]);
840                         if (ret != LDB_SUCCESS) {
841                                 talloc_free(res);
842                                 return ret;
843                         }
844
845                         ret = partition_request(data->partitions[i]->module, treq);
846                         if (ret != LDB_SUCCESS) {
847                                 talloc_free(res);
848                                 return ret;
849                         }
850                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
851                         if (ret != LDB_SUCCESS) {
852                                 talloc_free(res);
853                                 return ret;
854                         }
855                         tseqr = talloc_get_type(res->extended->data,
856                                                 struct ldb_seqnum_result);
857                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
858                                 timestamp_sequence = MAX(timestamp_sequence,
859                                                          tseqr->seq_num);
860                         } else {
861                                 seq_number += tseqr->seq_num;
862                         }
863                         talloc_free(res);
864                 }
865                 /* fall through */
866         case LDB_SEQ_HIGHEST_TIMESTAMP:
867
868                 res = talloc_zero(req, struct ldb_result);
869                 if (res == NULL) {
870                         return LDB_ERR_OPERATIONS_ERROR;
871                 }
872
873                 tseq = talloc_zero(res, struct ldb_seqnum_request);
874                 if (tseq == NULL) {
875                         talloc_free(res);
876                         return LDB_ERR_OPERATIONS_ERROR;
877                 }
878                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
879
880                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
881                                              LDB_EXTENDED_SEQUENCE_NUMBER,
882                                              tseq,
883                                              NULL,
884                                              res,
885                                              ldb_extended_default_callback,
886                                              NULL);
887                 if (ret != LDB_SUCCESS) {
888                         talloc_free(res);
889                         return ret;
890                 }
891
892                 ret = ldb_next_request(module, treq);
893                 if (ret != LDB_SUCCESS) {
894                         talloc_free(res);
895                         return ret;
896                 }
897                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
898                 if (ret != LDB_SUCCESS) {
899                         talloc_free(res);
900                         return ret;
901                 }
902
903                 tseqr = talloc_get_type(res->extended->data,
904                                            struct ldb_seqnum_result);
905                 timestamp = tseqr->seq_num;
906
907                 talloc_free(res);
908
909                 /* Skip the lot if 'data' isn't here yet (initialisation) */
910                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
911
912                         res = talloc_zero(req, struct ldb_result);
913                         if (res == NULL) {
914                                 return LDB_ERR_OPERATIONS_ERROR;
915                         }
916
917                         tseq = talloc_zero(res, struct ldb_seqnum_request);
918                         if (tseq == NULL) {
919                                 talloc_free(res);
920                                 return LDB_ERR_OPERATIONS_ERROR;
921                         }
922                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
923
924                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
925                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
926                                                      tseq,
927                                                      NULL,
928                                                      res,
929                                                      ldb_extended_default_callback,
930                                                      NULL);
931                         if (ret != LDB_SUCCESS) {
932                                 talloc_free(res);
933                                 return ret;
934                         }
935
936                         ret = ldb_request_add_control(treq,
937                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
938                                                       false, data->partitions[i]);
939                         if (ret != LDB_SUCCESS) {
940                                 talloc_free(res);
941                                 return ret;
942                         }
943
944                         ret = partition_request(data->partitions[i]->module, treq);
945                         if (ret != LDB_SUCCESS) {
946                                 talloc_free(res);
947                                 return ret;
948                         }
949                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
950                         if (ret != LDB_SUCCESS) {
951                                 talloc_free(res);
952                                 return ret;
953                         }
954
955                         tseqr = talloc_get_type(res->extended->data,
956                                                   struct ldb_seqnum_result);
957                         timestamp = MAX(timestamp, tseqr->seq_num);
958
959                         talloc_free(res);
960                 }
961
962                 break;
963         }
964
965         ext = talloc_zero(req, struct ldb_extended);
966         if (!ext) {
967                 return LDB_ERR_OPERATIONS_ERROR;
968         }
969         seqr = talloc_zero(ext, struct ldb_seqnum_result);
970         if (seqr == NULL) {
971                 talloc_free(ext);
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
975         ext->data = seqr;
976
977         switch (seq->type) {
978         case LDB_SEQ_NEXT:
979         case LDB_SEQ_HIGHEST_SEQ:
980
981                 /* Has someone above set a timebase sequence? */
982                 if (timestamp_sequence) {
983                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
984                 } else {
985                         seqr->seq_num = seq_number;
986                 }
987
988                 if (timestamp_sequence > seqr->seq_num) {
989                         seqr->seq_num = timestamp_sequence;
990                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
991                 }
992
993                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
994                 break;
995         case LDB_SEQ_HIGHEST_TIMESTAMP:
996                 seqr->seq_num = timestamp;
997                 break;
998         }
999
1000         if (seq->type == LDB_SEQ_NEXT) {
1001                 seqr->seq_num++;
1002         }
1003
1004         /* send request done */
1005         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1006 }
1007
1008 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1009 {
1010         struct dsdb_extended_replicated_objects *ext;
1011
1012         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1013         if (!ext) {
1014                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
1015                 return LDB_ERR_PROTOCOL_ERROR;
1016         }
1017
1018         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1019                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1020                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1021                 return LDB_ERR_PROTOCOL_ERROR;
1022         }
1023
1024         return partition_replicate(module, req, ext->partition_dn);
1025 }
1026
1027 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1028 {
1029         struct dsdb_control_current_partition *partition;
1030         struct partition_private_data *data;
1031         struct ldb_dn *schema_dn;
1032         struct partition_context *ac;
1033         int ret;
1034
1035         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1036         if (!schema_dn) {
1037                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1038                 return LDB_ERR_PROTOCOL_ERROR;
1039         }
1040
1041         data = talloc_get_type(module->private_data, struct partition_private_data);
1042         if (!data) {
1043                 return LDB_ERR_OPERATIONS_ERROR;
1044         }
1045         
1046         partition = find_partition( data, schema_dn );
1047         if (!partition) {
1048                 return ldb_next_request(module, req);
1049         }
1050
1051         ac = partition_init_ctx(module, req);
1052         if (!ac) {
1053                 return LDB_ERR_OPERATIONS_ERROR;
1054         }
1055
1056         /* we need to add a control but we never touch the original request */
1057         ret = partition_prep_request(ac, partition);
1058         if (ret != LDB_SUCCESS) {
1059                 return ret;
1060         }
1061
1062         /* fire the first one */
1063         return partition_call_first(ac);
1064 }
1065
1066
1067 /* extended */
1068 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1069 {
1070         struct partition_private_data *data;
1071         struct partition_context *ac;
1072
1073         data = talloc_get_type(module->private_data, struct partition_private_data);
1074         if (!data || !data->partitions) {
1075                 return ldb_next_request(module, req);
1076         }
1077
1078         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1079                 return partition_sequence_number(module, req);
1080         }
1081
1082         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1083                 return partition_extended_replicated_objects(module, req);
1084         }
1085
1086         /* forward schemaUpdateNow operation to schema_fsmo module*/
1087         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1088                 return partition_extended_schema_update_now( module, req );
1089         }       
1090
1091         /* 
1092          * as the extended operation has no dn
1093          * we need to send it to all partitions
1094          */
1095
1096         ac = partition_init_ctx(module, req);
1097         if (!ac) {
1098                 return LDB_ERR_OPERATIONS_ERROR;
1099         }
1100
1101         return partition_send_all(module, ac, req);
1102 }
1103
1104 static int partition_sort_compare(const void *v1, const void *v2)
1105 {
1106         const struct dsdb_control_current_partition *p1;
1107         const struct dsdb_control_current_partition *p2;
1108
1109         p1 = *((struct dsdb_control_current_partition * const*)v1);
1110         p2 = *((struct dsdb_control_current_partition * const*)v2);
1111
1112         return ldb_dn_compare(p1->dn, p2->dn);
1113 }
1114
1115 static int partition_init(struct ldb_module *module)
1116 {
1117         int ret, i;
1118         TALLOC_CTX *mem_ctx = talloc_new(module);
1119         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1120         struct ldb_result *res;
1121         struct ldb_message *msg;
1122         struct ldb_message_element *partition_attributes;
1123         struct ldb_message_element *replicate_attributes;
1124         struct ldb_message_element *modules_attributes;
1125
1126         struct partition_private_data *data;
1127
1128         if (!mem_ctx) {
1129                 return LDB_ERR_OPERATIONS_ERROR;
1130         }
1131
1132         data = talloc(mem_ctx, struct partition_private_data);
1133         if (data == NULL) {
1134                 return LDB_ERR_OPERATIONS_ERROR;
1135         }
1136
1137         ret = ldb_search(ldb_module_get_ctx(module), mem_ctx, &res,
1138                          ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), "@PARTITION"),
1139                          LDB_SCOPE_BASE, attrs, NULL);
1140         if (ret != LDB_SUCCESS) {
1141                 talloc_free(mem_ctx);
1142                 return ret;
1143         }
1144         if (res->count == 0) {
1145                 talloc_free(mem_ctx);
1146                 return ldb_next_init(module);
1147         }
1148
1149         if (res->count > 1) {
1150                 talloc_free(mem_ctx);
1151                 return LDB_ERR_CONSTRAINT_VIOLATION;
1152         }
1153
1154         msg = res->msgs[0];
1155
1156         partition_attributes = ldb_msg_find_element(msg, "partition");
1157         if (!partition_attributes) {
1158                 ldb_set_errstring(ldb_module_get_ctx(module), "partition_init: no partitions specified");
1159                 talloc_free(mem_ctx);
1160                 return LDB_ERR_CONSTRAINT_VIOLATION;
1161         }
1162         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
1163         if (!data->partitions) {
1164                 talloc_free(mem_ctx);
1165                 return LDB_ERR_OPERATIONS_ERROR;
1166         }
1167         for (i=0; i < partition_attributes->num_values; i++) {
1168                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1169                 char *p = strchr(base, ':');
1170                 if (!p) {
1171                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1172                                                 "partition_init: "
1173                                                 "invalid form for partition record (missing ':'): %s", base);
1174                         talloc_free(mem_ctx);
1175                         return LDB_ERR_CONSTRAINT_VIOLATION;
1176                 }
1177                 p[0] = '\0';
1178                 p++;
1179                 if (!p[0]) {
1180                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1181                                                 "partition_init: "
1182                                                 "invalid form for partition record (missing backend database): %s", base);
1183                         talloc_free(mem_ctx);
1184                         return LDB_ERR_CONSTRAINT_VIOLATION;
1185                 }
1186                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
1187                 if (!data->partitions[i]) {
1188                         talloc_free(mem_ctx);
1189                         return LDB_ERR_OPERATIONS_ERROR;
1190                 }
1191                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1192
1193                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], ldb_module_get_ctx(module), base);
1194                 if (!data->partitions[i]->dn) {
1195                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1196                                                 "partition_init: invalid DN in partition record: %s", base);
1197                         talloc_free(mem_ctx);
1198                         return LDB_ERR_CONSTRAINT_VIOLATION;
1199                 }
1200
1201                 data->partitions[i]->backend = samdb_relative_path(ldb_module_get_ctx(module), 
1202                                                                    data->partitions[i], 
1203                                                                    p);
1204                 if (!data->partitions[i]->backend) {
1205                         ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1206                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1207                         talloc_free(mem_ctx);                   
1208                 }
1209                 ret = ldb_connect_backend(ldb_module_get_ctx(module), data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1210                 if (ret != LDB_SUCCESS) {
1211                         talloc_free(mem_ctx);
1212                         return ret;
1213                 }
1214         }
1215         data->partitions[i] = NULL;
1216
1217         /* sort these into order, most to least specific */
1218         qsort(data->partitions, partition_attributes->num_values,
1219               sizeof(*data->partitions), partition_sort_compare);
1220
1221         for (i=0; data->partitions[i]; i++) {
1222                 struct ldb_request *req;
1223                 req = talloc_zero(mem_ctx, struct ldb_request);
1224                 if (req == NULL) {
1225                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1226                         talloc_free(mem_ctx);
1227                         return LDB_ERR_OPERATIONS_ERROR;
1228                 }
1229                 
1230                 req->operation = LDB_REQ_REGISTER_PARTITION;
1231                 req->op.reg_partition.dn = data->partitions[i]->dn;
1232                 req->callback = ldb_op_default_callback;
1233
1234                 ldb_set_timeout(ldb_module_get_ctx(module), req, 0);
1235
1236                 req->handle = ldb_handle_new(req, ldb_module_get_ctx(module));
1237                 if (req->handle == NULL) {
1238                         return LDB_ERR_OPERATIONS_ERROR;
1239                 }
1240                 
1241                 ret = ldb_request(ldb_module_get_ctx(module), req);
1242                 if (ret == LDB_SUCCESS) {
1243                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1244                 }
1245                 if (ret != LDB_SUCCESS) {
1246                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1247                         talloc_free(mem_ctx);
1248                         return LDB_ERR_OTHER;
1249                 }
1250                 talloc_free(req);
1251         }
1252
1253         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1254         if (!replicate_attributes) {
1255                 data->replicate = NULL;
1256         } else {
1257                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1258                 if (!data->replicate) {
1259                         talloc_free(mem_ctx);
1260                         return LDB_ERR_OPERATIONS_ERROR;
1261                 }
1262
1263                 for (i=0; i < replicate_attributes->num_values; i++) {
1264                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, ldb_module_get_ctx(module), &replicate_attributes->values[i]);
1265                         if (!ldb_dn_validate(data->replicate[i])) {
1266                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1267                                                         "partition_init: "
1268                                                         "invalid DN in partition replicate record: %s", 
1269                                                         replicate_attributes->values[i].data);
1270                                 talloc_free(mem_ctx);
1271                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1272                         }
1273                 }
1274                 data->replicate[i] = NULL;
1275         }
1276
1277         /* Make the private data available to any searches the modules may trigger in initialisation */
1278         module->private_data = data;
1279         talloc_steal(module, data);
1280         
1281         modules_attributes = ldb_msg_find_element(msg, "modules");
1282         if (modules_attributes) {
1283                 for (i=0; i < modules_attributes->num_values; i++) {
1284                         struct ldb_dn *base_dn;
1285                         int partition_idx;
1286                         struct dsdb_control_current_partition *partition = NULL;
1287                         const char **modules = NULL;
1288
1289                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1290                         char *p = strchr(base, ':');
1291                         if (!p) {
1292                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1293                                                         "partition_init: "
1294                                                         "invalid form for partition module record (missing ':'): %s", base);
1295                                 talloc_free(mem_ctx);
1296                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1297                         }
1298                         p[0] = '\0';
1299                         p++;
1300                         if (!p[0]) {
1301                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1302                                                         "partition_init: "
1303                                                         "invalid form for partition module record (missing backend database): %s", base);
1304                                 talloc_free(mem_ctx);
1305                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1306                         }
1307
1308                         modules = ldb_modules_list_from_string(ldb_module_get_ctx(module), mem_ctx,
1309                                                                p);
1310                         
1311                         base_dn = ldb_dn_new(mem_ctx, ldb_module_get_ctx(module), base);
1312                         if (!ldb_dn_validate(base_dn)) {
1313                                 talloc_free(mem_ctx);
1314                                 return LDB_ERR_OPERATIONS_ERROR;
1315                         }
1316                         
1317                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1318                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1319                                         partition = data->partitions[partition_idx];
1320                                         break;
1321                                 }
1322                         }
1323                         
1324                         if (!partition) {
1325                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1326                                                         "partition_init: "
1327                                                         "invalid form for partition module record (no such partition): %s", base);
1328                                 talloc_free(mem_ctx);
1329                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1330                         }
1331                         
1332                         ret = ldb_load_modules_list(ldb_module_get_ctx(module), modules, partition->module, &partition->module);
1333                         if (ret != LDB_SUCCESS) {
1334                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1335                                                        "partition_init: "
1336                                                        "loading backend for %s failed: %s", 
1337                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1338                                 talloc_free(mem_ctx);
1339                                 return ret;
1340                         }
1341                         ret = ldb_init_module_chain(ldb_module_get_ctx(module), partition->module);
1342                         if (ret != LDB_SUCCESS) {
1343                                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1344                                                        "partition_init: "
1345                                                        "initialising backend for %s failed: %s", 
1346                                                        base, ldb_errstring(ldb_module_get_ctx(module)));
1347                                 talloc_free(mem_ctx);
1348                                 return ret;
1349                         }
1350                 }
1351         }
1352
1353         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1354         if (ret != LDB_SUCCESS) {
1355                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1356                         "partition: Unable to register control with rootdse!\n");
1357                 return LDB_ERR_OPERATIONS_ERROR;
1358         }
1359
1360         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1361         if (ret != LDB_SUCCESS) {
1362                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,
1363                         "partition: Unable to register control with rootdse!\n");
1364                 return LDB_ERR_OPERATIONS_ERROR;
1365         }
1366
1367         talloc_free(mem_ctx);
1368         return ldb_next_init(module);
1369 }
1370
1371 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1372         .name              = "partition",
1373         .init_context      = partition_init,
1374         .search            = partition_search,
1375         .add               = partition_add,
1376         .modify            = partition_modify,
1377         .del               = partition_delete,
1378         .rename            = partition_rename,
1379         .extended          = partition_extended,
1380         .start_transaction = partition_start_trans,
1381         .end_transaction   = partition_end_trans,
1382         .del_transaction   = partition_del_trans,
1383 };