r20826: make the dsdb_control_current_partition struct public and allocate an oid...
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v2 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 2 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 */
25
26 /*
27  *  Name: ldb
28  *
29  *  Component: ldb partitions module
30  *
31  *  Description: Implement LDAP partitions
32  *
33  *  Author: Andrew Bartlett
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb/include/includes.h"
39 #include "dsdb/samdb/samdb.h"
40
41 struct partition_private_data {
42         struct dsdb_control_current_partition **partitions;
43         struct ldb_dn **replicate;
44 };
45
46 struct partition_context {
47         struct ldb_module *module;
48         struct ldb_handle *handle;
49         struct ldb_request *orig_req;
50
51         struct ldb_request **down_req;
52         int num_requests;
53         int finished_requests;
54 };
55
56 static struct partition_context *partition_init_handle(struct ldb_request *req, struct ldb_module *module)
57 {
58         struct partition_context *ac;
59         struct ldb_handle *h;
60
61         h = talloc_zero(req, struct ldb_handle);
62         if (h == NULL) {
63                 ldb_set_errstring(module->ldb, "Out of Memory");
64                 return NULL;
65         }
66
67         h->module = module;
68
69         ac = talloc_zero(h, struct partition_context);
70         if (ac == NULL) {
71                 ldb_set_errstring(module->ldb, "Out of Memory");
72                 talloc_free(h);
73                 return NULL;
74         }
75
76         h->private_data = ac;
77
78         ac->module = module;
79         ac->handle = h;
80         ac->orig_req = req;
81
82         req->handle = h;
83
84         return ac;
85 }
86
87 struct ldb_module *make_module_for_next_request(TALLOC_CTX *mem_ctx, 
88                                                 struct ldb_context *ldb,
89                                                 struct ldb_module *module)
90 {
91         struct ldb_module *current;
92         static const struct ldb_module_ops ops; /* zero */
93         current = talloc_zero(mem_ctx, struct ldb_module);
94         if (current == NULL) {
95                 return module;
96         }
97         
98         current->ldb = ldb;
99         current->ops = &ops;
100         current->prev = NULL;
101         current->next = module;
102         return current;
103 }
104
105 struct ldb_module *find_backend(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
106 {
107         int i;
108         struct partition_private_data *data = talloc_get_type(module->private_data, 
109                                                               struct partition_private_data);
110         /* Look at base DN */
111         /* Figure out which partition it is under */
112         /* Skip the lot if 'data' isn't here yet (initialistion) */
113         for (i=0; data && data->partitions && data->partitions[i]; i++) {
114                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
115                         return make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
116                 }
117         }
118
119         return module;
120 };
121
122
123 /*
124   fire the caller's callback for every entry, but only send 'done' once.
125 */
126 static int partition_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
127 {
128         struct partition_context *ac;
129
130         if (!context || !ares) {
131                 ldb_set_errstring(ldb, "partition_search_callback: NULL Context or Result in 'search' callback");
132                 goto error;
133         }
134
135         ac = talloc_get_type(context, struct partition_context);
136
137         if (ares->type == LDB_REPLY_ENTRY) {
138                 return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
139         } else {
140                 ac->finished_requests++;
141                 if (ac->finished_requests == ac->num_requests) {
142                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
143                 } else {
144                         talloc_free(ares);
145                         return LDB_SUCCESS;
146                 }
147         }
148 error:
149         talloc_free(ares);
150         return LDB_ERR_OPERATIONS_ERROR;
151 }
152
153 /*
154   only fire the 'last' callback, and only for START-TLS for now 
155 */
156 static int partition_other_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159
160         if (!context) {
161                 ldb_set_errstring(ldb, "partition_other_callback: NULL Context in 'other' callback");
162                 goto error;
163         }
164
165         ac = talloc_get_type(context, struct partition_context);
166
167         if (!ac->orig_req->callback) {
168                 talloc_free(ares);
169                 return LDB_SUCCESS;
170         }
171
172         if (!ares 
173             || (ares->type == LDB_REPLY_EXTENDED 
174                 && strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID))) {
175                 ac->finished_requests++;
176                 if (ac->finished_requests == ac->num_requests) {
177                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
178                 }
179                 talloc_free(ares);
180                 return LDB_SUCCESS;
181         }
182         ldb_set_errstring(ldb, "partition_other_callback: Unknown reply type, only supports START_TLS");
183 error:
184         talloc_free(ares);
185         return LDB_ERR_OPERATIONS_ERROR;
186 }
187
188
189 static int partition_send_request(struct partition_context *ac, struct ldb_module *partition, 
190                                   struct ldb_dn *partition_base_dn)
191 {
192         int ret;
193         struct ldb_module *next = make_module_for_next_request(ac->module, ac->module->ldb, partition);
194         struct ldb_request *req;
195         ac->down_req = talloc_realloc(ac, ac->down_req, 
196                                         struct ldb_request *, ac->num_requests + 1);
197         if (!ac->down_req) {
198                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
199                 return LDB_ERR_OPERATIONS_ERROR;
200         }
201         req = ac->down_req[ac->num_requests] = talloc(ac, struct ldb_request);
202         if (req == NULL) {
203                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206         
207         *ac->down_req[ac->num_requests] = *ac->orig_req; /* copy the request */
208
209         if (req->operation == LDB_SEARCH) {
210                 /* If the search is for 'more' than this partition,
211                  * then change the basedn, so a remote LDAP server
212                  * doesn't object */
213                 if (ldb_dn_compare_base(partition_base_dn, req->op.search.base) != 0) {
214                         req->op.search.base = partition_base_dn;
215                 }
216                 req->callback = partition_search_callback;
217                 req->context = ac;
218         } else {
219                 req->callback = partition_other_callback;
220                 req->context = ac;
221         }
222
223         /* Spray off search requests to all backends */
224         ret = ldb_next_request(next, req); 
225         if (ret != LDB_SUCCESS) {
226                 return ret;
227         }
228         
229         ac->num_requests++;
230         return LDB_SUCCESS;
231 }
232
233 /* Send a request down to all the partitions */
234 static int partition_send_all(struct ldb_module *module, 
235                               struct partition_context *ac, struct ldb_request *req) 
236 {
237         int i;
238         struct partition_private_data *data = talloc_get_type(module->private_data, 
239                                                               struct partition_private_data);
240         int ret = partition_send_request(ac, module->next, NULL);
241         if (ret != LDB_SUCCESS) {
242                 return ret;
243         }
244         for (i=0; data && data->partitions && data->partitions[i]; i++) {
245                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
246                 if (ret != LDB_SUCCESS) {
247                         return ret;
248                 }
249         }
250         return LDB_SUCCESS;
251 }
252
253 /* Figure out which backend a request needs to be aimed at.  Some
254  * requests must be replicated to all backends */
255 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
256 {
257         int i;
258         struct ldb_module *backend;
259         struct partition_private_data *data = talloc_get_type(module->private_data, 
260                                                               struct partition_private_data);
261         
262         /* Is this a special DN, we need to replicate to every backend? */
263         for (i=0; data->replicate && data->replicate[i]; i++) {
264                 if (ldb_dn_compare(data->replicate[i], 
265                                    dn) == 0) {
266                         struct partition_context *ac;
267                         
268                         ac = partition_init_handle(req, module);
269                         if (!ac) {
270                                 return LDB_ERR_OPERATIONS_ERROR;
271                         }
272                         
273                         return partition_send_all(module, ac, req);
274                 }
275         }
276
277         /* Otherwise, we need to find the backend to fire it to */
278
279         /* Find backend */
280         backend = find_backend(module, req, dn);
281         
282         /* issue request */
283         return ldb_next_request(backend, req);
284         
285 }
286
287 /* search */
288 static int partition_search(struct ldb_module *module, struct ldb_request *req)
289 {
290         /* Find backend */
291         struct partition_private_data *data = talloc_get_type(module->private_data, 
292                                                               struct partition_private_data);
293         /* issue request */
294
295         /* (later) consider if we should be searching multiple
296          * partitions (for 'invisible' partition behaviour */
297         if (ldb_get_opaque(module->ldb, "global_catalog")) {
298                 int ret, i;
299                 struct partition_context *ac;
300                 
301                 ac = partition_init_handle(req, module);
302                 if (!ac) {
303                         return LDB_ERR_OPERATIONS_ERROR;
304                 }
305
306                 /* Search from the base DN */
307                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
308                         return partition_send_all(module, ac, req);
309                 }
310                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
311                         /* Find all partitions under the search base */
312                         if (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0) {
313                                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
314                                 if (ret != LDB_SUCCESS) {
315                                         return ret;
316                                 }
317                         }
318                 }
319
320                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
321                 if (ac->num_requests == 0) {
322                         talloc_free(ac);
323                         return ldb_next_request(module, req);
324                 }
325                 
326                 return LDB_SUCCESS;
327         } else {
328                 struct ldb_module *backend = find_backend(module, req, req->op.search.base);
329         
330                 return ldb_next_request(backend, req);
331         }
332 }
333
334 /* add */
335 static int partition_add(struct ldb_module *module, struct ldb_request *req)
336 {
337         return partition_replicate(module, req, req->op.add.message->dn);
338 }
339
340 /* modify */
341 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
342 {
343         return partition_replicate(module, req, req->op.mod.message->dn);
344 }
345
346 /* delete */
347 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
348 {
349         return partition_replicate(module, req, req->op.del.dn);
350 }
351
352 /* rename */
353 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
354 {
355         /* Find backend */
356         struct ldb_module *backend = find_backend(module, req, req->op.rename.olddn);
357         struct ldb_module *backend2 = find_backend(module, req, req->op.rename.newdn);
358
359         if (backend->next != backend2->next) {
360                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
361         }
362
363         return partition_replicate(module, req, req->op.rename.olddn);
364 }
365
366 /* start a transaction */
367 static int partition_start_trans(struct ldb_module *module)
368 {
369         int i, ret;
370         struct partition_private_data *data = talloc_get_type(module->private_data, 
371                                                               struct partition_private_data);
372         /* Look at base DN */
373         /* Figure out which partition it is under */
374         /* Skip the lot if 'data' isn't here yet (initialistion) */
375         ret = ldb_next_start_trans(module);
376         if (ret != LDB_SUCCESS) {
377                 return ret;
378         }
379
380         for (i=0; data && data->partitions && data->partitions[i]; i++) {
381                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
382
383                 ret = ldb_next_start_trans(next);
384                 talloc_free(next);
385                 if (ret != LDB_SUCCESS) {
386                         /* Back it out, if it fails on one */
387                         for (i--; i >= 0; i--) {
388                                 next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
389                                 ldb_next_del_trans(next);
390                                 talloc_free(next);
391                         }
392                         return ret;
393                 }
394         }
395         return LDB_SUCCESS;
396 }
397
398 /* end a transaction */
399 static int partition_end_trans(struct ldb_module *module)
400 {
401         int i, ret, ret2 = LDB_SUCCESS;
402         struct partition_private_data *data = talloc_get_type(module->private_data, 
403                                                               struct partition_private_data);
404         ret = ldb_next_end_trans(module);
405         if (ret != LDB_SUCCESS) {
406                 return ret;
407         }
408
409         /* Look at base DN */
410         /* Figure out which partition it is under */
411         /* Skip the lot if 'data' isn't here yet (initialistion) */
412         for (i=0; data && data->partitions && data->partitions[i]; i++) {
413                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
414                 
415                 ret = ldb_next_end_trans(next);
416                 talloc_free(next);
417                 if (ret != LDB_SUCCESS) {
418                         ret2 = ret;
419                 }
420         }
421
422         if (ret != LDB_SUCCESS) {
423                 /* Back it out, if it fails on one */
424                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
425                         struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
426                         ldb_next_del_trans(next);
427                         talloc_free(next);
428                 }
429         }
430         return ret;
431 }
432
433 /* delete a transaction */
434 static int partition_del_trans(struct ldb_module *module)
435 {
436         int i, ret, ret2 = LDB_SUCCESS;
437         struct partition_private_data *data = talloc_get_type(module->private_data, 
438                                                               struct partition_private_data);
439         ret = ldb_next_del_trans(module);
440         if (ret != LDB_SUCCESS) {
441                 ret2 = ret;
442         }
443
444         /* Look at base DN */
445         /* Figure out which partition it is under */
446         /* Skip the lot if 'data' isn't here yet (initialistion) */
447         for (i=0; data && data->partitions && data->partitions[i]; i++) {
448                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
449                 
450                 ret = ldb_next_del_trans(next);
451                 talloc_free(next);
452                 if (ret != LDB_SUCCESS) {
453                         ret2 = ret;
454                 }
455         }
456         return ret2;
457 }
458
459 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
460 {
461         int i, ret;
462         uint64_t seq_number = 0;
463         uint64_t timestamp_sequence = 0;
464         uint64_t timestamp = 0;
465         struct partition_private_data *data = talloc_get_type(module->private_data, 
466                                                               struct partition_private_data);
467
468         switch (req->op.seq_num.type) {
469         case LDB_SEQ_NEXT:
470         case LDB_SEQ_HIGHEST_SEQ:
471                 ret = ldb_next_request(module, req);
472                 if (ret != LDB_SUCCESS) {
473                         return ret;
474                 }
475                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
476                         timestamp_sequence = req->op.seq_num.seq_num;
477                 } else {
478                         seq_number = seq_number + req->op.seq_num.seq_num;
479                 }
480
481                 /* Look at base DN */
482                 /* Figure out which partition it is under */
483                 /* Skip the lot if 'data' isn't here yet (initialistion) */
484                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
485                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
486                         
487                         ret = ldb_next_request(next, req);
488                         talloc_free(next);
489                         if (ret != LDB_SUCCESS) {
490                                 return ret;
491                         }
492                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
493                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
494                         } else {
495                                 seq_number = seq_number + req->op.seq_num.seq_num;
496                         }
497                 }
498                 /* fall though */
499         case LDB_SEQ_HIGHEST_TIMESTAMP:
500         {
501                 struct ldb_request *date_req = talloc(req, struct ldb_request);
502                 if (!date_req) {
503                         return LDB_ERR_OPERATIONS_ERROR;
504                 }
505                 *date_req = *req;
506                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
507
508                 ret = ldb_next_request(module, date_req);
509                 if (ret != LDB_SUCCESS) {
510                         return ret;
511                 }
512                 timestamp = date_req->op.seq_num.seq_num;
513                 
514                 /* Look at base DN */
515                 /* Figure out which partition it is under */
516                 /* Skip the lot if 'data' isn't here yet (initialistion) */
517                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
518                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
519                         
520                         ret = ldb_next_request(next, date_req);
521                         talloc_free(next);
522                         if (ret != LDB_SUCCESS) {
523                                 return ret;
524                         }
525                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
526                 }
527                 break;
528         }
529         }
530
531         switch (req->op.seq_num.flags) {
532         case LDB_SEQ_NEXT:
533         case LDB_SEQ_HIGHEST_SEQ:
534                 
535                 req->op.seq_num.flags = 0;
536
537                 /* Has someone above set a timebase sequence? */
538                 if (timestamp_sequence) {
539                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
540                 } else {
541                         req->op.seq_num.seq_num = seq_number;
542                 }
543
544                 if (timestamp_sequence > req->op.seq_num.seq_num) {
545                         req->op.seq_num.seq_num = timestamp_sequence;
546                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
547                 }
548
549                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
550                 break;
551         case LDB_SEQ_HIGHEST_TIMESTAMP:
552                 req->op.seq_num.seq_num = timestamp;
553                 break;
554         }
555
556         switch (req->op.seq_num.flags) {
557         case LDB_SEQ_NEXT:
558                 req->op.seq_num.seq_num++;
559         }
560         return LDB_SUCCESS;
561 }
562
563 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
564 {
565         struct dsdb_extended_replicated_objects *ext;
566
567         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
568         if (!ext) {
569                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
570                 return LDB_ERR_PROTOCOL_ERROR;
571         }
572
573         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
574                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
575                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
576                 return LDB_ERR_PROTOCOL_ERROR;
577         }
578
579         return partition_replicate(module, req, ext->partition_dn);
580 }
581
582 /* extended */
583 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
584 {
585         struct partition_context *ac;
586
587         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
588                 return partition_extended_replicated_objects(module, req);
589         }
590
591         /* 
592          * as the extended operation has no dn
593          * we need to send it to all partitions
594          */
595
596         ac = partition_init_handle(req, module);
597         if (!ac) {
598                 return LDB_ERR_OPERATIONS_ERROR;
599         }
600                         
601         return partition_send_all(module, ac, req);
602 }
603
604 static int sort_compare(void *void1,
605                         void *void2, void *opaque)
606 {
607         struct dsdb_control_current_partition **pp1 = void1;
608         struct dsdb_control_current_partition **pp2 = void2;
609         struct dsdb_control_current_partition *partition1 = talloc_get_type(*pp1,
610                                                             struct dsdb_control_current_partition);
611         struct dsdb_control_current_partition *partition2 = talloc_get_type(*pp2,
612                                                             struct dsdb_control_current_partition);
613
614         return ldb_dn_compare(partition1->dn, partition2->dn);
615 }
616
617 static int partition_init(struct ldb_module *module)
618 {
619         int ret, i;
620         TALLOC_CTX *mem_ctx = talloc_new(module);
621         static const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
622         struct ldb_result *res;
623         struct ldb_message *msg;
624         struct ldb_message_element *partition_attributes;
625         struct ldb_message_element *replicate_attributes;
626         struct ldb_message_element *modules_attributes;
627
628         struct partition_private_data *data;
629
630         if (!mem_ctx) {
631                 return LDB_ERR_OPERATIONS_ERROR;
632         }
633
634         data = talloc(mem_ctx, struct partition_private_data);
635         if (data == NULL) {
636                 return LDB_ERR_OPERATIONS_ERROR;
637         }
638
639         ret = ldb_search(module->ldb, ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
640                          LDB_SCOPE_BASE,
641                          NULL, attrs,
642                          &res);
643         if (ret != LDB_SUCCESS) {
644                 talloc_free(mem_ctx);
645                 return ret;
646         }
647         talloc_steal(mem_ctx, res);
648         if (res->count == 0) {
649                 talloc_free(mem_ctx);
650                 return ldb_next_init(module);
651         }
652
653         if (res->count > 1) {
654                 talloc_free(mem_ctx);
655                 return LDB_ERR_CONSTRAINT_VIOLATION;
656         }
657
658         msg = res->msgs[0];
659
660         partition_attributes = ldb_msg_find_element(msg, "partition");
661         if (!partition_attributes) {
662                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
663                 talloc_free(mem_ctx);
664                 return LDB_ERR_CONSTRAINT_VIOLATION;
665         }
666         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
667         if (!data->partitions) {
668                 talloc_free(mem_ctx);
669                 return LDB_ERR_OPERATIONS_ERROR;
670         }
671         for (i=0; i < partition_attributes->num_values; i++) {
672                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
673                 char *p = strchr(base, ':');
674                 if (!p) {
675                         ldb_asprintf_errstring(module->ldb, 
676                                                 "partition_init: "
677                                                 "invalid form for partition record (missing ':'): %s", base);
678                         talloc_free(mem_ctx);
679                         return LDB_ERR_CONSTRAINT_VIOLATION;
680                 }
681                 p[0] = '\0';
682                 p++;
683                 if (!p[0]) {
684                         ldb_asprintf_errstring(module->ldb, 
685                                                 "partition_init: "
686                                                 "invalid form for partition record (missing backend database): %s", base);
687                         talloc_free(mem_ctx);
688                         return LDB_ERR_CONSTRAINT_VIOLATION;
689                 }
690                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
691                 if (!data->partitions[i]) {
692                         talloc_free(mem_ctx);
693                         return LDB_ERR_OPERATIONS_ERROR;
694                 }
695                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
696
697                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
698                 if (!data->partitions[i]->dn) {
699                         ldb_asprintf_errstring(module->ldb, 
700                                                 "partition_init: invalid DN in partition record: %s", base);
701                         talloc_free(mem_ctx);
702                         return LDB_ERR_CONSTRAINT_VIOLATION;
703                 }
704
705                 data->partitions[i]->backend = private_path(data->partitions[i], p);
706                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
707                 if (ret != LDB_SUCCESS) {
708                         talloc_free(mem_ctx);
709                         return ret;
710                 }
711         }
712         data->partitions[i] = NULL;
713
714         /* sort these into order, most to least specific */
715         ldb_qsort(data->partitions, partition_attributes->num_values, sizeof(*data->partitions), 
716                   module->ldb, sort_compare);
717
718         for (i=0; data->partitions[i]; i++) {
719                 struct ldb_request *req;
720                 req = talloc_zero(mem_ctx, struct ldb_request);
721                 if (req == NULL) {
722                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
723                         talloc_free(mem_ctx);
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726                 
727                 req->operation = LDB_REQ_REGISTER_PARTITION;
728                 req->op.reg_partition.dn = data->partitions[i]->dn;
729                 
730                 ret = ldb_request(module->ldb, req);
731                 if (ret != LDB_SUCCESS) {
732                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
733                         talloc_free(mem_ctx);
734                         return LDB_ERR_OTHER;
735                 }
736                 talloc_free(req);
737         }
738
739         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
740         if (!replicate_attributes) {
741                 data->replicate = NULL;
742         } else {
743                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
744                 if (!data->replicate) {
745                         talloc_free(mem_ctx);
746                         return LDB_ERR_OPERATIONS_ERROR;
747                 }
748                 
749                 for (i=0; i < replicate_attributes->num_values; i++) {
750                         data->replicate[i] = ldb_dn_new(data->replicate, module->ldb, (const char *)replicate_attributes->values[i].data);
751                         if (!ldb_dn_validate(data->replicate[i])) {
752                                 ldb_asprintf_errstring(module->ldb, 
753                                                         "partition_init: "
754                                                         "invalid DN in partition replicate record: %s", 
755                                                         replicate_attributes->values[i].data);
756                                 talloc_free(mem_ctx);
757                                 return LDB_ERR_CONSTRAINT_VIOLATION;
758                         }
759                 }
760                 data->replicate[i] = NULL;
761         }
762
763         /* Make the private data available to any searches the modules may trigger in initialisation */
764         module->private_data = data;
765         talloc_steal(module, data);
766         
767         modules_attributes = ldb_msg_find_element(msg, "modules");
768         if (modules_attributes) {
769                 for (i=0; i < modules_attributes->num_values; i++) {
770                         struct ldb_dn *base_dn;
771                         int partition_idx;
772                         struct dsdb_control_current_partition *partition = NULL;
773                         const char **modules = NULL;
774
775                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
776                         char *p = strchr(base, ':');
777                         if (!p) {
778                                 ldb_asprintf_errstring(module->ldb, 
779                                                         "partition_init: "
780                                                         "invalid form for partition module record (missing ':'): %s", base);
781                                 talloc_free(mem_ctx);
782                                 return LDB_ERR_CONSTRAINT_VIOLATION;
783                         }
784                         p[0] = '\0';
785                         p++;
786                         if (!p[0]) {
787                                 ldb_asprintf_errstring(module->ldb, 
788                                                         "partition_init: "
789                                                         "invalid form for partition module record (missing backend database): %s", base);
790                                 talloc_free(mem_ctx);
791                                 return LDB_ERR_CONSTRAINT_VIOLATION;
792                         }
793
794                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
795                                                                p);
796                         
797                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
798                         if (!ldb_dn_validate(base_dn)) {
799                                 talloc_free(mem_ctx);
800                                 return LDB_ERR_OPERATIONS_ERROR;
801                         }
802                         
803                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
804                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
805                                         partition = data->partitions[partition_idx];
806                                         break;
807                                 }
808                         }
809                         
810                         if (!partition) {
811                                 ldb_asprintf_errstring(module->ldb, 
812                                                         "partition_init: "
813                                                         "invalid form for partition module record (no such partition): %s", base);
814                                 talloc_free(mem_ctx);
815                                 return LDB_ERR_CONSTRAINT_VIOLATION;
816                         }
817                         
818                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
819                         if (ret != LDB_SUCCESS) {
820                                 ldb_asprintf_errstring(module->ldb, 
821                                                        "partition_init: "
822                                                        "loading backend for %s failed: %s", 
823                                                        base, ldb_errstring(module->ldb));
824                                 talloc_free(mem_ctx);
825                                 return ret;
826                         }
827                         ret = ldb_init_module_chain(module->ldb, partition->module);
828                         if (ret != LDB_SUCCESS) {
829                                 ldb_asprintf_errstring(module->ldb, 
830                                                        "partition_init: "
831                                                        "initialising backend for %s failed: %s", 
832                                                        base, ldb_errstring(module->ldb));
833                                 talloc_free(mem_ctx);
834                                 return ret;
835                         }
836                 }
837         }
838
839         talloc_free(mem_ctx);
840         return ldb_next_init(module);
841 }
842
843 static int partition_wait_none(struct ldb_handle *handle) {
844         struct partition_context *ac;
845         int ret;
846         int i;
847     
848         if (!handle || !handle->private_data) {
849                 return LDB_ERR_OPERATIONS_ERROR;
850         }
851
852         if (handle->state == LDB_ASYNC_DONE) {
853                 return handle->status;
854         }
855
856         handle->state = LDB_ASYNC_PENDING;
857         handle->status = LDB_SUCCESS;
858
859         ac = talloc_get_type(handle->private_data, struct partition_context);
860
861         for (i=0; i < ac->num_requests; i++) {
862                 ret = ldb_wait(ac->down_req[i]->handle, LDB_WAIT_NONE);
863                 
864                 if (ret != LDB_SUCCESS) {
865                         handle->status = ret;
866                         goto done;
867                 }
868                 if (ac->down_req[i]->handle->status != LDB_SUCCESS) {
869                         handle->status = ac->down_req[i]->handle->status;
870                         goto done;
871                 }
872                 
873                 if (ac->down_req[i]->handle->state != LDB_ASYNC_DONE) {
874                         return LDB_SUCCESS;
875                 }
876         }
877
878         ret = LDB_SUCCESS;
879
880 done:
881         handle->state = LDB_ASYNC_DONE;
882         return ret;
883 }
884
885
886 static int partition_wait_all(struct ldb_handle *handle) {
887
888         int ret;
889
890         while (handle->state != LDB_ASYNC_DONE) {
891                 ret = partition_wait_none(handle);
892                 if (ret != LDB_SUCCESS) {
893                         return ret;
894                 }
895         }
896
897         return handle->status;
898 }
899
900 static int partition_wait(struct ldb_handle *handle, enum ldb_wait_type type)
901 {
902         if (type == LDB_WAIT_ALL) {
903                 return partition_wait_all(handle);
904         } else {
905                 return partition_wait_none(handle);
906         }
907 }
908
909 static const struct ldb_module_ops partition_ops = {
910         .name              = "partition",
911         .init_context      = partition_init,
912         .search            = partition_search,
913         .add               = partition_add,
914         .modify            = partition_modify,
915         .del               = partition_delete,
916         .rename            = partition_rename,
917         .extended          = partition_extended,
918         .sequence_number   = partition_sequence_number,
919         .start_transaction = partition_start_trans,
920         .end_transaction   = partition_end_trans,
921         .del_transaction   = partition_del_trans,
922         .wait              = partition_wait
923 };
924
925 int ldb_partition_init(void)
926 {
927         return ldb_register_module(&partition_ops);
928 }