r20729: add a version number to struct dsdb_extended_replicated_objects
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v2 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 2 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 */
25
26 /*
27  *  Name: ldb
28  *
29  *  Component: ldb partitions module
30  *
31  *  Description: Implement LDAP partitions
32  *
33  *  Author: Andrew Bartlett
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb/include/includes.h"
39 #include "dsdb/samdb/samdb.h"
40
41 struct partition {
42         struct ldb_module *module;
43         const char *backend;
44         struct ldb_dn *dn;
45 };
46 struct partition_private_data {
47         struct partition **partitions;
48         struct ldb_dn **replicate;
49 };
50
51 struct partition_context {
52         struct ldb_module *module;
53         struct ldb_handle *handle;
54         struct ldb_request *orig_req;
55
56         struct ldb_request **down_req;
57         int num_requests;
58         int finished_requests;
59 };
60
61 static struct partition_context *partition_init_handle(struct ldb_request *req, struct ldb_module *module)
62 {
63         struct partition_context *ac;
64         struct ldb_handle *h;
65
66         h = talloc_zero(req, struct ldb_handle);
67         if (h == NULL) {
68                 ldb_set_errstring(module->ldb, "Out of Memory");
69                 return NULL;
70         }
71
72         h->module = module;
73
74         ac = talloc_zero(h, struct partition_context);
75         if (ac == NULL) {
76                 ldb_set_errstring(module->ldb, "Out of Memory");
77                 talloc_free(h);
78                 return NULL;
79         }
80
81         h->private_data = ac;
82
83         ac->module = module;
84         ac->handle = h;
85         ac->orig_req = req;
86
87         req->handle = h;
88
89         return ac;
90 }
91
92 struct ldb_module *make_module_for_next_request(TALLOC_CTX *mem_ctx, 
93                                                 struct ldb_context *ldb,
94                                                 struct ldb_module *module)
95 {
96         struct ldb_module *current;
97         static const struct ldb_module_ops ops; /* zero */
98         current = talloc_zero(mem_ctx, struct ldb_module);
99         if (current == NULL) {
100                 return module;
101         }
102         
103         current->ldb = ldb;
104         current->ops = &ops;
105         current->prev = NULL;
106         current->next = module;
107         return current;
108 }
109
110 struct ldb_module *find_backend(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
111 {
112         int i;
113         struct partition_private_data *data = talloc_get_type(module->private_data, 
114                                                               struct partition_private_data);
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialistion) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
120                         return make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
121                 }
122         }
123
124         return module;
125 };
126
127
128 /*
129   fire the caller's callback for every entry, but only send 'done' once.
130 */
131 static int partition_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134
135         if (!context || !ares) {
136                 ldb_set_errstring(ldb, "partition_search_callback: NULL Context or Result in 'search' callback");
137                 goto error;
138         }
139
140         ac = talloc_get_type(context, struct partition_context);
141
142         if (ares->type == LDB_REPLY_ENTRY) {
143                 return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
144         } else {
145                 ac->finished_requests++;
146                 if (ac->finished_requests == ac->num_requests) {
147                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
148                 } else {
149                         talloc_free(ares);
150                         return LDB_SUCCESS;
151                 }
152         }
153 error:
154         talloc_free(ares);
155         return LDB_ERR_OPERATIONS_ERROR;
156 }
157
158 /*
159   only fire the 'last' callback, and only for START-TLS for now 
160 */
161 static int partition_other_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
162 {
163         struct partition_context *ac;
164
165         if (!context) {
166                 ldb_set_errstring(ldb, "partition_other_callback: NULL Context in 'other' callback");
167                 goto error;
168         }
169
170         ac = talloc_get_type(context, struct partition_context);
171
172         if (!ac->orig_req->callback) {
173                 talloc_free(ares);
174                 return LDB_SUCCESS;
175         }
176
177         if (!ares 
178             || (ares->type == LDB_REPLY_EXTENDED 
179                 && strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID))) {
180                 ac->finished_requests++;
181                 if (ac->finished_requests == ac->num_requests) {
182                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
183                 }
184                 talloc_free(ares);
185                 return LDB_SUCCESS;
186         }
187         ldb_set_errstring(ldb, "partition_other_callback: Unknown reply type, only supports START_TLS");
188 error:
189         talloc_free(ares);
190         return LDB_ERR_OPERATIONS_ERROR;
191 }
192
193
194 static int partition_send_request(struct partition_context *ac, struct ldb_module *partition, 
195                                   struct ldb_dn *partition_base_dn)
196 {
197         int ret;
198         struct ldb_module *next = make_module_for_next_request(ac->module, ac->module->ldb, partition);
199         struct ldb_request *req;
200         ac->down_req = talloc_realloc(ac, ac->down_req, 
201                                         struct ldb_request *, ac->num_requests + 1);
202         if (!ac->down_req) {
203                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206         req = ac->down_req[ac->num_requests] = talloc(ac, struct ldb_request);
207         if (req == NULL) {
208                 ldb_set_errstring(ac->module->ldb, "Out of Memory");
209                 return LDB_ERR_OPERATIONS_ERROR;
210         }
211         
212         *ac->down_req[ac->num_requests] = *ac->orig_req; /* copy the request */
213
214         if (req->operation == LDB_SEARCH) {
215                 /* If the search is for 'more' than this partition,
216                  * then change the basedn, so a remote LDAP server
217                  * doesn't object */
218                 if (ldb_dn_compare_base(partition_base_dn, req->op.search.base) != 0) {
219                         req->op.search.base = partition_base_dn;
220                 }
221                 req->callback = partition_search_callback;
222                 req->context = ac;
223         } else {
224                 req->callback = partition_other_callback;
225                 req->context = ac;
226         }
227
228         /* Spray off search requests to all backends */
229         ret = ldb_next_request(next, req); 
230         if (ret != LDB_SUCCESS) {
231                 return ret;
232         }
233         
234         ac->num_requests++;
235         return LDB_SUCCESS;
236 }
237
238 /* Send a request down to all the partitions */
239 static int partition_send_all(struct ldb_module *module, 
240                               struct partition_context *ac, struct ldb_request *req) 
241 {
242         int i;
243         struct partition_private_data *data = talloc_get_type(module->private_data, 
244                                                               struct partition_private_data);
245         int ret = partition_send_request(ac, module->next, NULL);
246         if (ret != LDB_SUCCESS) {
247                 return ret;
248         }
249         for (i=0; data && data->partitions && data->partitions[i]; i++) {
250                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
251                 if (ret != LDB_SUCCESS) {
252                         return ret;
253                 }
254         }
255         return LDB_SUCCESS;
256 }
257
258 /* Figure out which backend a request needs to be aimed at.  Some
259  * requests must be replicated to all backends */
260 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
261 {
262         int i;
263         struct ldb_module *backend;
264         struct partition_private_data *data = talloc_get_type(module->private_data, 
265                                                               struct partition_private_data);
266         
267         /* Is this a special DN, we need to replicate to every backend? */
268         for (i=0; data->replicate && data->replicate[i]; i++) {
269                 if (ldb_dn_compare(data->replicate[i], 
270                                    dn) == 0) {
271                         struct partition_context *ac;
272                         
273                         ac = partition_init_handle(req, module);
274                         if (!ac) {
275                                 return LDB_ERR_OPERATIONS_ERROR;
276                         }
277                         
278                         return partition_send_all(module, ac, req);
279                 }
280         }
281
282         /* Otherwise, we need to find the backend to fire it to */
283
284         /* Find backend */
285         backend = find_backend(module, req, dn);
286         
287         /* issue request */
288         return ldb_next_request(backend, req);
289         
290 }
291
292 /* search */
293 static int partition_search(struct ldb_module *module, struct ldb_request *req)
294 {
295         /* Find backend */
296         struct partition_private_data *data = talloc_get_type(module->private_data, 
297                                                               struct partition_private_data);
298         /* issue request */
299
300         /* (later) consider if we should be searching multiple
301          * partitions (for 'invisible' partition behaviour */
302         if (ldb_get_opaque(module->ldb, "global_catalog")) {
303                 int ret, i;
304                 struct partition_context *ac;
305                 
306                 ac = partition_init_handle(req, module);
307                 if (!ac) {
308                         return LDB_ERR_OPERATIONS_ERROR;
309                 }
310
311                 /* Search from the base DN */
312                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
313                         return partition_send_all(module, ac, req);
314                 }
315                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
316                         /* Find all partitions under the search base */
317                         if (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0) {
318                                 ret = partition_send_request(ac, data->partitions[i]->module, data->partitions[i]->dn);
319                                 if (ret != LDB_SUCCESS) {
320                                         return ret;
321                                 }
322                         }
323                 }
324
325                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
326                 if (ac->num_requests == 0) {
327                         talloc_free(ac);
328                         return ldb_next_request(module, req);
329                 }
330                 
331                 return LDB_SUCCESS;
332         } else {
333                 struct ldb_module *backend = find_backend(module, req, req->op.search.base);
334         
335                 return ldb_next_request(backend, req);
336         }
337 }
338
339 /* add */
340 static int partition_add(struct ldb_module *module, struct ldb_request *req)
341 {
342         return partition_replicate(module, req, req->op.add.message->dn);
343 }
344
345 /* modify */
346 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
347 {
348         return partition_replicate(module, req, req->op.mod.message->dn);
349 }
350
351 /* delete */
352 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
353 {
354         return partition_replicate(module, req, req->op.del.dn);
355 }
356
357 /* rename */
358 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
359 {
360         /* Find backend */
361         struct ldb_module *backend = find_backend(module, req, req->op.rename.olddn);
362         struct ldb_module *backend2 = find_backend(module, req, req->op.rename.newdn);
363
364         if (backend->next != backend2->next) {
365                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
366         }
367
368         return partition_replicate(module, req, req->op.rename.olddn);
369 }
370
371 /* start a transaction */
372 static int partition_start_trans(struct ldb_module *module)
373 {
374         int i, ret;
375         struct partition_private_data *data = talloc_get_type(module->private_data, 
376                                                               struct partition_private_data);
377         /* Look at base DN */
378         /* Figure out which partition it is under */
379         /* Skip the lot if 'data' isn't here yet (initialistion) */
380         ret = ldb_next_start_trans(module);
381         if (ret != LDB_SUCCESS) {
382                 return ret;
383         }
384
385         for (i=0; data && data->partitions && data->partitions[i]; i++) {
386                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
387
388                 ret = ldb_next_start_trans(next);
389                 talloc_free(next);
390                 if (ret != LDB_SUCCESS) {
391                         /* Back it out, if it fails on one */
392                         for (i--; i >= 0; i--) {
393                                 next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
394                                 ldb_next_del_trans(next);
395                                 talloc_free(next);
396                         }
397                         return ret;
398                 }
399         }
400         return LDB_SUCCESS;
401 }
402
403 /* end a transaction */
404 static int partition_end_trans(struct ldb_module *module)
405 {
406         int i, ret, ret2 = LDB_SUCCESS;
407         struct partition_private_data *data = talloc_get_type(module->private_data, 
408                                                               struct partition_private_data);
409         ret = ldb_next_end_trans(module);
410         if (ret != LDB_SUCCESS) {
411                 return ret;
412         }
413
414         /* Look at base DN */
415         /* Figure out which partition it is under */
416         /* Skip the lot if 'data' isn't here yet (initialistion) */
417         for (i=0; data && data->partitions && data->partitions[i]; i++) {
418                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
419                 
420                 ret = ldb_next_end_trans(next);
421                 talloc_free(next);
422                 if (ret != LDB_SUCCESS) {
423                         ret2 = ret;
424                 }
425         }
426
427         if (ret != LDB_SUCCESS) {
428                 /* Back it out, if it fails on one */
429                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
430                         struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
431                         ldb_next_del_trans(next);
432                         talloc_free(next);
433                 }
434         }
435         return ret;
436 }
437
438 /* delete a transaction */
439 static int partition_del_trans(struct ldb_module *module)
440 {
441         int i, ret, ret2 = LDB_SUCCESS;
442         struct partition_private_data *data = talloc_get_type(module->private_data, 
443                                                               struct partition_private_data);
444         ret = ldb_next_del_trans(module);
445         if (ret != LDB_SUCCESS) {
446                 ret2 = ret;
447         }
448
449         /* Look at base DN */
450         /* Figure out which partition it is under */
451         /* Skip the lot if 'data' isn't here yet (initialistion) */
452         for (i=0; data && data->partitions && data->partitions[i]; i++) {
453                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
454                 
455                 ret = ldb_next_del_trans(next);
456                 talloc_free(next);
457                 if (ret != LDB_SUCCESS) {
458                         ret2 = ret;
459                 }
460         }
461         return ret2;
462 }
463
464 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
465 {
466         int i, ret;
467         uint64_t seq_number = 0;
468         uint64_t timestamp_sequence = 0;
469         uint64_t timestamp = 0;
470         struct partition_private_data *data = talloc_get_type(module->private_data, 
471                                                               struct partition_private_data);
472
473         switch (req->op.seq_num.type) {
474         case LDB_SEQ_NEXT:
475         case LDB_SEQ_HIGHEST_SEQ:
476                 ret = ldb_next_request(module, req);
477                 if (ret != LDB_SUCCESS) {
478                         return ret;
479                 }
480                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
481                         timestamp_sequence = req->op.seq_num.seq_num;
482                 } else {
483                         seq_number = seq_number + req->op.seq_num.seq_num;
484                 }
485
486                 /* Look at base DN */
487                 /* Figure out which partition it is under */
488                 /* Skip the lot if 'data' isn't here yet (initialistion) */
489                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
490                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
491                         
492                         ret = ldb_next_request(next, req);
493                         talloc_free(next);
494                         if (ret != LDB_SUCCESS) {
495                                 return ret;
496                         }
497                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
498                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
499                         } else {
500                                 seq_number = seq_number + req->op.seq_num.seq_num;
501                         }
502                 }
503                 /* fall though */
504         case LDB_SEQ_HIGHEST_TIMESTAMP:
505         {
506                 struct ldb_request *date_req = talloc(req, struct ldb_request);
507                 if (!date_req) {
508                         return LDB_ERR_OPERATIONS_ERROR;
509                 }
510                 *date_req = *req;
511                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
512
513                 ret = ldb_next_request(module, date_req);
514                 if (ret != LDB_SUCCESS) {
515                         return ret;
516                 }
517                 timestamp = date_req->op.seq_num.seq_num;
518                 
519                 /* Look at base DN */
520                 /* Figure out which partition it is under */
521                 /* Skip the lot if 'data' isn't here yet (initialistion) */
522                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
523                         struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
524                         
525                         ret = ldb_next_request(next, date_req);
526                         talloc_free(next);
527                         if (ret != LDB_SUCCESS) {
528                                 return ret;
529                         }
530                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
531                 }
532                 break;
533         }
534         }
535
536         switch (req->op.seq_num.flags) {
537         case LDB_SEQ_NEXT:
538         case LDB_SEQ_HIGHEST_SEQ:
539                 
540                 req->op.seq_num.flags = 0;
541
542                 /* Has someone above set a timebase sequence? */
543                 if (timestamp_sequence) {
544                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
545                 } else {
546                         req->op.seq_num.seq_num = seq_number;
547                 }
548
549                 if (timestamp_sequence > req->op.seq_num.seq_num) {
550                         req->op.seq_num.seq_num = timestamp_sequence;
551                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
552                 }
553
554                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
555                 break;
556         case LDB_SEQ_HIGHEST_TIMESTAMP:
557                 req->op.seq_num.seq_num = timestamp;
558                 break;
559         }
560
561         switch (req->op.seq_num.flags) {
562         case LDB_SEQ_NEXT:
563                 req->op.seq_num.seq_num++;
564         }
565         return LDB_SUCCESS;
566 }
567
568 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
569 {
570         struct dsdb_extended_replicated_objects *ext;
571
572         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
573         if (!ext) {
574                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
575                 return LDB_ERR_PROTOCOL_ERROR;
576         }
577
578         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
579                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
580                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
581                 return LDB_ERR_PROTOCOL_ERROR;
582         }
583
584         return partition_replicate(module, req, ext->partition_dn);
585 }
586
587 /* extended */
588 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
589 {
590         struct partition_context *ac;
591
592         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
593                 return partition_extended_replicated_objects(module, req);
594         }
595
596         /* 
597          * as the extended operation has no dn
598          * we need to send it to all partitions
599          */
600
601         ac = partition_init_handle(req, module);
602         if (!ac) {
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605                         
606         return partition_send_all(module, ac, req);
607 }
608
609 static int sort_compare(void *void1,
610                         void *void2, void *opaque)
611 {
612         struct partition **pp1 = void1;
613         struct partition **pp2 = void2;
614         struct partition *partition1 = talloc_get_type(*pp1, struct partition);
615         struct partition *partition2 = talloc_get_type(*pp2, struct partition);
616
617         return ldb_dn_compare(partition1->dn, partition2->dn);
618 }
619
620 static int partition_init(struct ldb_module *module)
621 {
622         int ret, i;
623         TALLOC_CTX *mem_ctx = talloc_new(module);
624         static const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
625         struct ldb_result *res;
626         struct ldb_message *msg;
627         struct ldb_message_element *partition_attributes;
628         struct ldb_message_element *replicate_attributes;
629         struct ldb_message_element *modules_attributes;
630
631         struct partition_private_data *data;
632
633         if (!mem_ctx) {
634                 return LDB_ERR_OPERATIONS_ERROR;
635         }
636
637         data = talloc(mem_ctx, struct partition_private_data);
638         if (data == NULL) {
639                 return LDB_ERR_OPERATIONS_ERROR;
640         }
641
642         ret = ldb_search(module->ldb, ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
643                          LDB_SCOPE_BASE,
644                          NULL, attrs,
645                          &res);
646         if (ret != LDB_SUCCESS) {
647                 talloc_free(mem_ctx);
648                 return ret;
649         }
650         talloc_steal(mem_ctx, res);
651         if (res->count == 0) {
652                 talloc_free(mem_ctx);
653                 return ldb_next_init(module);
654         }
655
656         if (res->count > 1) {
657                 talloc_free(mem_ctx);
658                 return LDB_ERR_CONSTRAINT_VIOLATION;
659         }
660
661         msg = res->msgs[0];
662
663         partition_attributes = ldb_msg_find_element(msg, "partition");
664         if (!partition_attributes) {
665                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
666                 talloc_free(mem_ctx);
667                 return LDB_ERR_CONSTRAINT_VIOLATION;
668         }
669         data->partitions = talloc_array(data, struct partition *, partition_attributes->num_values + 1);
670         if (!data->partitions) {
671                 talloc_free(mem_ctx);
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674         for (i=0; i < partition_attributes->num_values; i++) {
675                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
676                 char *p = strchr(base, ':');
677                 if (!p) {
678                         ldb_asprintf_errstring(module->ldb, 
679                                                 "partition_init: "
680                                                 "invalid form for partition record (missing ':'): %s", base);
681                         talloc_free(mem_ctx);
682                         return LDB_ERR_CONSTRAINT_VIOLATION;
683                 }
684                 p[0] = '\0';
685                 p++;
686                 if (!p[0]) {
687                         ldb_asprintf_errstring(module->ldb, 
688                                                 "partition_init: "
689                                                 "invalid form for partition record (missing backend database): %s", base);
690                         talloc_free(mem_ctx);
691                         return LDB_ERR_CONSTRAINT_VIOLATION;
692                 }
693                 data->partitions[i] = talloc(data->partitions, struct partition);
694                 if (!data->partitions[i]) {
695                         talloc_free(mem_ctx);
696                         return LDB_ERR_OPERATIONS_ERROR;
697                 }
698
699                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
700                 if (!data->partitions[i]->dn) {
701                         ldb_asprintf_errstring(module->ldb, 
702                                                 "partition_init: invalid DN in partition record: %s", base);
703                         talloc_free(mem_ctx);
704                         return LDB_ERR_CONSTRAINT_VIOLATION;
705                 }
706
707                 data->partitions[i]->backend = private_path(data->partitions[i], p);
708                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
709                 if (ret != LDB_SUCCESS) {
710                         talloc_free(mem_ctx);
711                         return ret;
712                 }
713         }
714         data->partitions[i] = NULL;
715
716         /* sort these into order, most to least specific */
717         ldb_qsort(data->partitions, partition_attributes->num_values, sizeof(*data->partitions), 
718                   module->ldb, sort_compare);
719
720         for (i=0; data->partitions[i]; i++) {
721                 struct ldb_request *req;
722                 req = talloc_zero(mem_ctx, struct ldb_request);
723                 if (req == NULL) {
724                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
725                         talloc_free(mem_ctx);
726                         return LDB_ERR_OPERATIONS_ERROR;
727                 }
728                 
729                 req->operation = LDB_REQ_REGISTER_PARTITION;
730                 req->op.reg_partition.dn = data->partitions[i]->dn;
731                 
732                 ret = ldb_request(module->ldb, req);
733                 if (ret != LDB_SUCCESS) {
734                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
735                         talloc_free(mem_ctx);
736                         return LDB_ERR_OTHER;
737                 }
738                 talloc_free(req);
739         }
740
741         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
742         if (!replicate_attributes) {
743                 data->replicate = NULL;
744         } else {
745                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
746                 if (!data->replicate) {
747                         talloc_free(mem_ctx);
748                         return LDB_ERR_OPERATIONS_ERROR;
749                 }
750                 
751                 for (i=0; i < replicate_attributes->num_values; i++) {
752                         data->replicate[i] = ldb_dn_new(data->replicate, module->ldb, (const char *)replicate_attributes->values[i].data);
753                         if (!ldb_dn_validate(data->replicate[i])) {
754                                 ldb_asprintf_errstring(module->ldb, 
755                                                         "partition_init: "
756                                                         "invalid DN in partition replicate record: %s", 
757                                                         replicate_attributes->values[i].data);
758                                 talloc_free(mem_ctx);
759                                 return LDB_ERR_CONSTRAINT_VIOLATION;
760                         }
761                 }
762                 data->replicate[i] = NULL;
763         }
764
765         /* Make the private data available to any searches the modules may trigger in initialisation */
766         module->private_data = data;
767         talloc_steal(module, data);
768         
769         modules_attributes = ldb_msg_find_element(msg, "modules");
770         if (modules_attributes) {
771                 for (i=0; i < modules_attributes->num_values; i++) {
772                         struct ldb_dn *base_dn;
773                         int partition_idx;
774                         struct partition *partition = NULL;
775                         const char **modules = NULL;
776
777                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
778                         char *p = strchr(base, ':');
779                         if (!p) {
780                                 ldb_asprintf_errstring(module->ldb, 
781                                                         "partition_init: "
782                                                         "invalid form for partition module record (missing ':'): %s", base);
783                                 talloc_free(mem_ctx);
784                                 return LDB_ERR_CONSTRAINT_VIOLATION;
785                         }
786                         p[0] = '\0';
787                         p++;
788                         if (!p[0]) {
789                                 ldb_asprintf_errstring(module->ldb, 
790                                                         "partition_init: "
791                                                         "invalid form for partition module record (missing backend database): %s", base);
792                                 talloc_free(mem_ctx);
793                                 return LDB_ERR_CONSTRAINT_VIOLATION;
794                         }
795
796                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
797                                                                p);
798                         
799                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
800                         if (!ldb_dn_validate(base_dn)) {
801                                 talloc_free(mem_ctx);
802                                 return LDB_ERR_OPERATIONS_ERROR;
803                         }
804                         
805                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
806                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
807                                         partition = data->partitions[partition_idx];
808                                         break;
809                                 }
810                         }
811                         
812                         if (!partition) {
813                                 ldb_asprintf_errstring(module->ldb, 
814                                                         "partition_init: "
815                                                         "invalid form for partition module record (no such partition): %s", base);
816                                 talloc_free(mem_ctx);
817                                 return LDB_ERR_CONSTRAINT_VIOLATION;
818                         }
819                         
820                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
821                         if (ret != LDB_SUCCESS) {
822                                 ldb_asprintf_errstring(module->ldb, 
823                                                        "partition_init: "
824                                                        "loading backend for %s failed: %s", 
825                                                        base, ldb_errstring(module->ldb));
826                                 talloc_free(mem_ctx);
827                                 return ret;
828                         }
829                         ret = ldb_init_module_chain(module->ldb, partition->module);
830                         if (ret != LDB_SUCCESS) {
831                                 ldb_asprintf_errstring(module->ldb, 
832                                                        "partition_init: "
833                                                        "initialising backend for %s failed: %s", 
834                                                        base, ldb_errstring(module->ldb));
835                                 talloc_free(mem_ctx);
836                                 return ret;
837                         }
838                 }
839         }
840
841         talloc_free(mem_ctx);
842         return ldb_next_init(module);
843 }
844
845 static int partition_wait_none(struct ldb_handle *handle) {
846         struct partition_context *ac;
847         int ret;
848         int i;
849     
850         if (!handle || !handle->private_data) {
851                 return LDB_ERR_OPERATIONS_ERROR;
852         }
853
854         if (handle->state == LDB_ASYNC_DONE) {
855                 return handle->status;
856         }
857
858         handle->state = LDB_ASYNC_PENDING;
859         handle->status = LDB_SUCCESS;
860
861         ac = talloc_get_type(handle->private_data, struct partition_context);
862
863         for (i=0; i < ac->num_requests; i++) {
864                 ret = ldb_wait(ac->down_req[i]->handle, LDB_WAIT_NONE);
865                 
866                 if (ret != LDB_SUCCESS) {
867                         handle->status = ret;
868                         goto done;
869                 }
870                 if (ac->down_req[i]->handle->status != LDB_SUCCESS) {
871                         handle->status = ac->down_req[i]->handle->status;
872                         goto done;
873                 }
874                 
875                 if (ac->down_req[i]->handle->state != LDB_ASYNC_DONE) {
876                         return LDB_SUCCESS;
877                 }
878         }
879
880         ret = LDB_SUCCESS;
881
882 done:
883         handle->state = LDB_ASYNC_DONE;
884         return ret;
885 }
886
887
888 static int partition_wait_all(struct ldb_handle *handle) {
889
890         int ret;
891
892         while (handle->state != LDB_ASYNC_DONE) {
893                 ret = partition_wait_none(handle);
894                 if (ret != LDB_SUCCESS) {
895                         return ret;
896                 }
897         }
898
899         return handle->status;
900 }
901
902 static int partition_wait(struct ldb_handle *handle, enum ldb_wait_type type)
903 {
904         if (type == LDB_WAIT_ALL) {
905                 return partition_wait_all(handle);
906         } else {
907                 return partition_wait_none(handle);
908         }
909 }
910
911 static const struct ldb_module_ops partition_ops = {
912         .name              = "partition",
913         .init_context      = partition_init,
914         .search            = partition_search,
915         .add               = partition_add,
916         .modify            = partition_modify,
917         .del               = partition_delete,
918         .rename            = partition_rename,
919         .extended          = partition_extended,
920         .sequence_number   = partition_sequence_number,
921         .start_transaction = partition_start_trans,
922         .end_transaction   = partition_end_trans,
923         .del_transaction   = partition_del_trans,
924         .wait              = partition_wait
925 };
926
927 int ldb_partition_init(void)
928 {
929         return ldb_register_module(&partition_ops);
930 }