af8fa475d1d980771f93e69e4b2ba5efd08e61a2
[kamenim/samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5
6    * NOTICE: this module is NOT released under the GNU LGPL license as
7    * other ldb code. This module is release under the GNU GPL v2 or
8    * later license.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 struct partition {
39         struct ldb_module *module;
40         const char *backend;
41         struct ldb_dn *dn;
42 };
43 struct partition_private_data {
44         struct partition **partitions;
45         struct ldb_dn **replicate;
46 };
47
48 struct partition_context {
49         struct ldb_module *module;
50         struct ldb_request *orig_req;
51
52         struct ldb_request **down_req;
53         int num_requests;
54         int finished_requests;
55 };
56
57 static struct ldb_handle *partition_init_handle(struct ldb_request *req, struct ldb_module *module)
58 {
59         struct partition_context *ac;
60         struct ldb_handle *h;
61
62         h = talloc_zero(req, struct ldb_handle);
63         if (h == NULL) {
64                 ldb_set_errstring(module->ldb, talloc_asprintf(module, "Out of Memory"));
65                 return NULL;
66         }
67
68         h->module = module;
69
70         ac = talloc_zero(h, struct partition_context);
71         if (ac == NULL) {
72                 ldb_set_errstring(module->ldb, talloc_asprintf(module, "Out of Memory"));
73                 talloc_free(h);
74                 return NULL;
75         }
76
77         h->private_data = (void *)ac;
78
79         ac->module = module;
80         ac->orig_req = req;
81
82         return h;
83 }
84
85 struct ldb_module *make_module_for_next_request(TALLOC_CTX *mem_ctx, 
86                                                 struct ldb_context *ldb,
87                                                 struct ldb_module *module) 
88 {
89         struct ldb_module *current;
90         static const struct ldb_module_ops ops; /* zero */
91         current = talloc_zero(mem_ctx, struct ldb_module);
92         if (current == NULL) {
93                 return module;
94         }
95         
96         current->ldb = ldb;
97         current->ops = &ops;
98         current->prev = NULL;
99         current->next = module;
100         return current;
101 }
102
103 struct ldb_module *find_backend(struct ldb_module *module, struct ldb_request *req, const struct ldb_dn *dn)
104 {
105         int i;
106         struct partition_private_data *data = talloc_get_type(module->private_data, 
107                                                               struct partition_private_data);
108         /* Look at base DN */
109         /* Figure out which partition it is under */
110         /* Skip the lot if 'data' isn't here yet (initialistion) */
111         for (i=0; data && data->partitions && data->partitions[i]; i++) {
112                 if (ldb_dn_compare_base(module->ldb, 
113                                         data->partitions[i]->dn, 
114                                         dn) == 0) {
115                         return make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
116                 }
117         }
118
119         return module;
120 };
121
122
123 /*
124   fire the caller's callback for every entry, but only send 'done' once.
125 */
126 static int partition_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
127 {
128         struct partition_context *ac;
129
130         if (!context || !ares) {
131                 ldb_set_errstring(ldb, talloc_asprintf(ldb, "partition_search_callback: NULL Context or Result in 'search' callback"));
132                 goto error;
133         }
134
135         ac = talloc_get_type(context, struct partition_context);
136
137         if (ares->type == LDB_REPLY_ENTRY) {
138                 return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
139         } else {
140                 ac->finished_requests++;
141                 if (ac->finished_requests == ac->num_requests) {
142                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
143                 } else {
144                         talloc_free(ares);
145                         return LDB_SUCCESS;
146                 }
147         }
148 error:
149         talloc_free(ares);
150         return LDB_ERR_OPERATIONS_ERROR;
151 }
152
153 /*
154   only fire the 'last' callback, and only for START-TLS for now 
155 */
156 static int partition_other_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159
160         if (!context) {
161                 ldb_set_errstring(ldb, talloc_asprintf(ldb, "partition_other_callback: NULL Context in 'other' callback"));
162                 goto error;
163         }
164
165         ac = talloc_get_type(context, struct partition_context);
166
167         if (!ac->orig_req->callback) {
168                 talloc_free(ares);
169                 return LDB_SUCCESS;
170         }
171
172         if (!ares 
173             || (ares->type == LDB_REPLY_EXTENDED 
174                 && strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID))) {
175                 ac->finished_requests++;
176                 if (ac->finished_requests == ac->num_requests) {
177                         return ac->orig_req->callback(ldb, ac->orig_req->context, ares);
178                 }
179                 talloc_free(ares);
180                 return LDB_SUCCESS;
181         }
182         ldb_set_errstring(ldb, talloc_asprintf(ldb, "partition_other_callback: Unknown reply type, only supports START_TLS"));
183 error:
184         talloc_free(ares);
185         return LDB_ERR_OPERATIONS_ERROR;
186 }
187
188
189 static int partition_send_request(struct partition_context *ac, struct ldb_module *partition)
190 {
191         int ret;
192         struct ldb_module *next = make_module_for_next_request(ac->module, ac->module->ldb, partition);
193         
194         ac->down_req = talloc_realloc(ac, ac->down_req, 
195                                         struct ldb_request *, ac->num_requests + 1);
196         if (!ac->down_req) {
197                 ldb_set_errstring(ac->module->ldb, talloc_asprintf(ac->module->ldb, "Out of memory!"));
198                 return LDB_ERR_OPERATIONS_ERROR;
199         }
200         ac->down_req[ac->num_requests] = talloc(ac, struct ldb_request);
201         if (ac->down_req[ac->num_requests] == NULL) {
202                 ldb_set_errstring(ac->module->ldb, talloc_asprintf(ac->module->ldb, "Out of memory!"));
203                 return LDB_ERR_OPERATIONS_ERROR;
204         }
205         
206         *ac->down_req[ac->num_requests] = *ac->orig_req; /* copy the request */
207         
208         if (ac->down_req[ac->num_requests]->operation == LDB_SEARCH) {
209                 ac->down_req[ac->num_requests]->callback = partition_search_callback;
210                 ac->down_req[ac->num_requests]->context = ac;
211         } else {
212                 ac->down_req[ac->num_requests]->callback = partition_other_callback;
213                 ac->down_req[ac->num_requests]->context = ac;
214         }
215
216         /* Spray off search requests to all backends */
217         ret = ldb_next_request(next, ac->down_req[ac->num_requests]); 
218         if (ret != LDB_SUCCESS) {
219                 return ret;
220         }
221         
222         ac->num_requests++;
223         return LDB_SUCCESS;
224 }
225
226 /* Send a request down to all the partitions */
227 static int partition_send_all(struct ldb_module *module, 
228                               struct partition_context *ac, struct ldb_request *req) 
229 {
230         int i;
231         struct partition_private_data *data = talloc_get_type(module->private_data, 
232                                                               struct partition_private_data);
233         int ret = partition_send_request(ac, module->next);
234         if (ret != LDB_SUCCESS) {
235                 return ret;
236         }
237         for (i=0; data && data->partitions && data->partitions[i]; i++) {
238                 ret = partition_send_request(ac, data->partitions[i]->module);
239                 if (ret != LDB_SUCCESS) {
240                         return ret;
241                 }
242         }
243         return LDB_SUCCESS;
244 }
245
246 /* Figure out which backend a request needs to be aimed at.  Some
247  * requests must be replicated to all backends */
248 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, const struct ldb_dn *dn) 
249 {
250         int i;
251         struct ldb_module *backend;
252         struct partition_private_data *data = talloc_get_type(module->private_data, 
253                                                               struct partition_private_data);
254         
255         /* Is this a special DN, we need to replicate to every backend? */
256         for (i=0; data->replicate && data->replicate[i]; i++) {
257                 if (ldb_dn_compare(module->ldb, 
258                                    data->replicate[i], 
259                                    dn) == 0) {
260                         struct ldb_handle *h;
261                         struct partition_context *ac;
262                         
263                         h = partition_init_handle(req, module);
264                         if (!h) {
265                                 return LDB_ERR_OPERATIONS_ERROR;
266                         }
267                         /* return our own handle to deal with this call */
268                         req->handle = h;
269                         
270                         ac = talloc_get_type(h->private_data, struct partition_context);
271                         
272                         return partition_send_all(module, ac, req);
273                 }
274         }
275
276         /* Otherwise, we need to find the backend to fire it to */
277
278         /* Find backend */
279         backend = find_backend(module, req, dn);
280         
281         /* issue request */
282         return ldb_next_request(backend, req);
283         
284 }
285
286 /* search */
287 static int partition_search(struct ldb_module *module, struct ldb_request *req)
288 {
289         /* Find backend */
290         struct partition_private_data *data = talloc_get_type(module->private_data, 
291                                                               struct partition_private_data);
292         /* issue request */
293
294         /* (later) consider if we should be searching multiple
295          * partitions (for 'invisible' partition behaviour */
296         if (ldb_get_opaque(module->ldb, "global_catalog")) {
297                 int ret, i;
298                 struct ldb_handle *h;
299                 struct partition_context *ac;
300                 
301                 h = partition_init_handle(req, module);
302                 if (!h) {
303                         return LDB_ERR_OPERATIONS_ERROR;
304                 }
305                 /* return our own handle to deal with this call */
306                 req->handle = h;
307                 
308                 ac = talloc_get_type(h->private_data, struct partition_context);
309                 
310                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
311                         /* Find all partitions under the search base */
312                         if (ldb_dn_compare_base(module->ldb, 
313                                                 req->op.search.base,
314                                                 data->partitions[i]->dn) == 0) {
315                                 ret = partition_send_request(ac, data->partitions[i]->module);
316                                 if (ret != LDB_SUCCESS) {
317                                         return ret;
318                                 }
319                         }
320                 }
321
322                 /* Perhaps we didn't match any partitions.  Try the main partition, then all partitions */
323                 if (ac->num_requests == 0) {
324                         return partition_send_all(module, ac, req);
325                 }
326                 
327                 return LDB_SUCCESS;
328         } else {
329                 struct ldb_module *backend = find_backend(module, req, req->op.search.base);
330         
331                 return ldb_next_request(backend, req);
332         }
333 }
334
335 /* add */
336 static int partition_add(struct ldb_module *module, struct ldb_request *req)
337 {
338         return partition_replicate(module, req, req->op.add.message->dn);
339 }
340
341 /* modify */
342 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
343 {
344         return partition_replicate(module, req, req->op.mod.message->dn);
345 }
346
347 /* delete */
348 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
349 {
350         return partition_replicate(module, req, req->op.del.dn);
351 }
352
353 /* rename */
354 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
355 {
356         /* Find backend */
357         struct ldb_module *backend = find_backend(module, req, req->op.rename.olddn);
358         struct ldb_module *backend2 = find_backend(module, req, req->op.rename.newdn);
359
360         if (backend->next != backend2->next) {
361                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
362         }
363
364         return partition_replicate(module, req, req->op.rename.olddn);
365 }
366
367 /* start a transaction */
368 static int partition_start_trans(struct ldb_module *module)
369 {
370         int i, ret;
371         struct partition_private_data *data = talloc_get_type(module->private_data, 
372                                                               struct partition_private_data);
373         /* Look at base DN */
374         /* Figure out which partition it is under */
375         /* Skip the lot if 'data' isn't here yet (initialistion) */
376         ret = ldb_next_start_trans(module);
377         if (ret != LDB_SUCCESS) {
378                 return ret;
379         }
380
381         for (i=0; data && data->partitions && data->partitions[i]; i++) {
382                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
383
384                 ret = ldb_next_start_trans(next);
385                 talloc_free(next);
386                 if (ret != LDB_SUCCESS) {
387                         /* Back it out, if it fails on one */
388                         for (i--; i >= 0; i--) {
389                                 next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
390                                 ldb_next_del_trans(next);
391                                 talloc_free(next);
392                         }
393                         return ret;
394                 }
395         }
396         return LDB_SUCCESS;
397 }
398
399 /* end a transaction */
400 static int partition_end_trans(struct ldb_module *module)
401 {
402         int i, ret, ret2 = LDB_SUCCESS;
403         struct partition_private_data *data = talloc_get_type(module->private_data, 
404                                                               struct partition_private_data);
405         ret = ldb_next_end_trans(module);
406         if (ret != LDB_SUCCESS) {
407                 return ret;
408         }
409
410         /* Look at base DN */
411         /* Figure out which partition it is under */
412         /* Skip the lot if 'data' isn't here yet (initialistion) */
413         for (i=0; data && data->partitions && data->partitions[i]; i++) {
414                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
415                 
416                 ret = ldb_next_end_trans(next);
417                 talloc_free(next);
418                 if (ret != LDB_SUCCESS) {
419                         ret2 = ret;
420                 }
421         }
422
423         if (ret != LDB_SUCCESS) {
424                 /* Back it out, if it fails on one */
425                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
426                         struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
427                         ldb_next_del_trans(next);
428                         talloc_free(next);
429                 }
430         }
431         return ret;
432 }
433
434 /* delete a transaction */
435 static int partition_del_trans(struct ldb_module *module)
436 {
437         int i, ret, ret2 = LDB_SUCCESS;
438         struct partition_private_data *data = talloc_get_type(module->private_data, 
439                                                               struct partition_private_data);
440         ret = ldb_next_del_trans(module);
441         if (ret != LDB_SUCCESS) {
442                 ret2 = ret;
443         }
444
445         /* Look at base DN */
446         /* Figure out which partition it is under */
447         /* Skip the lot if 'data' isn't here yet (initialistion) */
448         for (i=0; data && data->partitions && data->partitions[i]; i++) {
449                 struct ldb_module *next = make_module_for_next_request(module, module->ldb, data->partitions[i]->module);
450                 
451                 ret = ldb_next_del_trans(next);
452                 talloc_free(next);
453                 if (ret != LDB_SUCCESS) {
454                         ret2 = ret;
455                 }
456         }
457         return ret2;
458 }
459
460 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
461 {
462         int i, ret;
463         uint64_t seq_number = 0;
464         struct partition_private_data *data = talloc_get_type(module->private_data, 
465                                                               struct partition_private_data);
466         ret = ldb_next_request(module, req);
467         if (ret != LDB_SUCCESS) {
468                 return ret;
469         }
470         seq_number = seq_number + req->op.seq_num.seq_num;
471
472         /* Look at base DN */
473         /* Figure out which partition it is under */
474         /* Skip the lot if 'data' isn't here yet (initialistion) */
475         for (i=0; data && data->partitions && data->partitions[i]; i++) {
476                 struct ldb_module *next = make_module_for_next_request(req, module->ldb, data->partitions[i]->module);
477                 
478                 ret = ldb_next_request(next, req);
479                 talloc_free(next);
480                 if (ret != LDB_SUCCESS) {
481                         return ret;
482                 }
483                 seq_number = seq_number + req->op.seq_num.seq_num;
484         }
485         req->op.seq_num.seq_num = seq_number;
486         return LDB_SUCCESS;
487 }
488
489 static int sort_compare(void *void1,
490                         void *void2, void *opaque)
491 {
492         struct ldb_context *ldb = talloc_get_type(opaque, struct ldb_context);
493         struct partition **pp1 = void1;
494         struct partition **pp2 = void2;
495         struct partition *partition1 = talloc_get_type(*pp1, struct partition);
496         struct partition *partition2 = talloc_get_type(*pp2, struct partition);
497
498         return ldb_dn_compare(ldb, partition1->dn, partition2->dn);
499 }
500
501 static int partition_init(struct ldb_module *module)
502 {
503         int ret, i;
504         TALLOC_CTX *mem_ctx = talloc_new(module);
505         static const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
506         struct ldb_result *res;
507         struct ldb_message *msg;
508         struct ldb_message_element *partition_attributes;
509         struct ldb_message_element *replicate_attributes;
510         struct ldb_message_element *modules_attributes;
511
512         struct partition_private_data *data;
513
514         if (!mem_ctx) {
515                 return LDB_ERR_OPERATIONS_ERROR;
516         }
517
518         data = talloc(mem_ctx, struct partition_private_data);
519         if (data == NULL) {
520                 return LDB_ERR_OPERATIONS_ERROR;
521         }
522
523         ret = ldb_search(module->ldb, ldb_dn_explode(mem_ctx, "@PARTITION"),
524                          LDB_SCOPE_BASE,
525                          NULL, attrs,
526                          &res);
527         if (ret != LDB_SUCCESS) {
528                 talloc_free(mem_ctx);
529                 return ret;
530         }
531         talloc_steal(mem_ctx, res);
532         if (res->count == 0) {
533                 talloc_free(mem_ctx);
534                 return ldb_next_init(module);
535         }
536
537         if (res->count > 1) {
538                 talloc_free(mem_ctx);
539                 return LDB_ERR_CONSTRAINT_VIOLATION;
540         }
541
542         msg = res->msgs[0];
543
544         partition_attributes = ldb_msg_find_element(msg, "partition");
545         if (!partition_attributes) {
546                 ldb_set_errstring(module->ldb, 
547                                   talloc_asprintf(module, "partition_init: "
548                                                   "no partitions specified"));
549                 talloc_free(mem_ctx);
550                 return LDB_ERR_CONSTRAINT_VIOLATION;
551         }
552         data->partitions = talloc_array(data, struct partition *, partition_attributes->num_values + 1);
553         if (!data->partitions) {
554                 talloc_free(mem_ctx);
555                 return LDB_ERR_OPERATIONS_ERROR;
556         }
557         for (i=0; i < partition_attributes->num_values; i++) {
558                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
559                 char *p = strchr(base, ':');
560                 if (!p) {
561                         ldb_set_errstring(module->ldb, 
562                                           talloc_asprintf(module, "partition_init: "
563                                                           "invalid form for partition record (missing ':'): %s", base));
564                         talloc_free(mem_ctx);
565                         return LDB_ERR_CONSTRAINT_VIOLATION;
566                 }
567                 p[0] = '\0';
568                 p++;
569                 if (!p[0]) {
570                         ldb_set_errstring(module->ldb, 
571                                           talloc_asprintf(module, "partition_init: "
572                                                           "invalid form for partition record (missing backend database): %s", base));
573                         talloc_free(mem_ctx);
574                         return LDB_ERR_CONSTRAINT_VIOLATION;
575                 }
576                 data->partitions[i] = talloc(data->partitions, struct partition);
577                 if (!data->partitions[i]) {
578                         talloc_free(mem_ctx);
579                         return LDB_ERR_OPERATIONS_ERROR;
580                 }
581
582                 data->partitions[i]->dn = ldb_dn_explode(data->partitions[i], base);
583                 if (!data->partitions[i]->dn) {
584                         ldb_set_errstring(module->ldb, 
585                                           talloc_asprintf(module, "partition_init: "
586                                                           "invalid DN in partition record: %s", base));
587                         talloc_free(mem_ctx);
588                         return LDB_ERR_CONSTRAINT_VIOLATION;
589                 }
590
591                 data->partitions[i]->backend = private_path(data->partitions[i], p);
592                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
593                 if (ret != LDB_SUCCESS) {
594                         talloc_free(mem_ctx);
595                         return ret;
596                 }
597         }
598         data->partitions[i] = NULL;
599
600         /* sort these into order, most to least specific */
601         ldb_qsort(data->partitions, partition_attributes->num_values, sizeof(*data->partitions), 
602                   module->ldb, sort_compare);
603
604         for (i=0; data->partitions[i]; i++) {
605                 struct ldb_request *req;
606                 req = talloc_zero(mem_ctx, struct ldb_request);
607                 if (req == NULL) {
608                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
609                         talloc_free(mem_ctx);
610                         return LDB_ERR_OPERATIONS_ERROR;
611                 }
612                 
613                 req->operation = LDB_REQ_REGISTER_PARTITION;
614                 req->op.reg_partition.dn = data->partitions[i]->dn;
615                 
616                 ret = ldb_request(module->ldb, req);
617                 if (ret != LDB_SUCCESS) {
618                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
619                         talloc_free(mem_ctx);
620                         return LDB_ERR_OTHER;
621                 }
622                 talloc_free(req);
623         }
624
625         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
626         if (!replicate_attributes) {
627                 data->replicate = NULL;
628         } else {
629                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
630                 if (!data->replicate) {
631                         talloc_free(mem_ctx);
632                         return LDB_ERR_OPERATIONS_ERROR;
633                 }
634                 
635                 for (i=0; i < replicate_attributes->num_values; i++) {
636                         data->replicate[i] = ldb_dn_explode(data->replicate, replicate_attributes->values[i].data);
637                         if (!data->replicate[i]) {
638                                 ldb_set_errstring(module->ldb, 
639                                                   talloc_asprintf(module, "partition_init: "
640                                                                   "invalid DN in partition replicate record: %s", 
641                                                                   replicate_attributes->values[i].data));
642                                 talloc_free(mem_ctx);
643                                 return LDB_ERR_CONSTRAINT_VIOLATION;
644                         }
645                 }
646                 data->replicate[i] = NULL;
647         }
648
649         modules_attributes = ldb_msg_find_element(msg, "modules");
650         if (modules_attributes) {
651                 for (i=0; i < modules_attributes->num_values; i++) {
652                         struct ldb_dn *base_dn;
653                         int partition_idx;
654                         struct partition *partition = NULL;
655                         const char **modules = NULL;
656
657                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
658                         char *p = strchr(base, ':');
659                         if (!p) {
660                                 ldb_set_errstring(module->ldb, 
661                                                   talloc_asprintf(mem_ctx, "partition_init: "
662                                                                   "invalid form for partition module record (missing ':'): %s", base));
663                                 talloc_free(mem_ctx);
664                                 return LDB_ERR_CONSTRAINT_VIOLATION;
665                         }
666                         p[0] = '\0';
667                         p++;
668                         if (!p[0]) {
669                                 ldb_set_errstring(module->ldb, 
670                                                   talloc_asprintf(mem_ctx, "partition_init: "
671                                                                   "invalid form for partition module record (missing backend database): %s", base));
672                                 talloc_free(mem_ctx);
673                                 return LDB_ERR_CONSTRAINT_VIOLATION;
674                         }
675
676                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
677                                                                p);
678                         
679                         base_dn = ldb_dn_explode(mem_ctx, base);
680                         if (!base_dn) {
681                                 talloc_free(mem_ctx);
682                                 return LDB_ERR_OPERATIONS_ERROR;
683                         }
684                         
685                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
686                                 if (ldb_dn_compare(module->ldb, data->partitions[partition_idx]->dn, 
687                                                    base_dn) == 0) {
688                                         partition = data->partitions[partition_idx];
689                                         break;
690                                 }
691                         }
692                         
693                         if (!partition) {
694                                 ldb_set_errstring(module->ldb, 
695                                                   talloc_asprintf(mem_ctx, "partition_init: "
696                                                                   "invalid form for partition module record (no such partition): %s", base));
697                                 talloc_free(mem_ctx);
698                                 return LDB_ERR_CONSTRAINT_VIOLATION;
699                         }
700                         
701                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
702                         if (ret != LDB_SUCCESS) {
703                                 talloc_free(mem_ctx);
704                                 return ret;
705                         }
706                         ret = ldb_init_module_chain(module->ldb, partition->module);
707                         if (ret != LDB_SUCCESS) {
708                                 talloc_free(mem_ctx);
709                                 return ret;
710                         }
711                 }
712         }
713
714         module->private_data = data;
715         talloc_steal(module, data);
716         
717         talloc_free(mem_ctx);
718         return ldb_next_init(module);
719 }
720
721 static int partition_wait_none(struct ldb_handle *handle) {
722         struct partition_context *ac;
723         int ret;
724         int i;
725     
726         if (!handle || !handle->private_data) {
727                 return LDB_ERR_OPERATIONS_ERROR;
728         }
729
730         if (handle->state == LDB_ASYNC_DONE) {
731                 return handle->status;
732         }
733
734         handle->state = LDB_ASYNC_PENDING;
735         handle->status = LDB_SUCCESS;
736
737         ac = talloc_get_type(handle->private_data, struct partition_context);
738
739         for (i=0; i < ac->num_requests; i++) {
740                 ret = ldb_wait(ac->down_req[i]->handle, LDB_WAIT_NONE);
741                 
742                 if (ret != LDB_SUCCESS) {
743                         handle->status = ret;
744                         goto done;
745                 }
746                 if (ac->down_req[i]->handle->status != LDB_SUCCESS) {
747                         handle->status = ac->down_req[i]->handle->status;
748                         goto done;
749                 }
750                 
751                 if (ac->down_req[i]->handle->state != LDB_ASYNC_DONE) {
752                         return LDB_SUCCESS;
753                 }
754         }
755
756         ret = LDB_SUCCESS;
757
758 done:
759         handle->state = LDB_ASYNC_DONE;
760         return ret;
761 }
762
763
764 static int partition_wait_all(struct ldb_handle *handle) {
765
766         int ret;
767
768         while (handle->state != LDB_ASYNC_DONE) {
769                 ret = partition_wait_none(handle);
770                 if (ret != LDB_SUCCESS) {
771                         return ret;
772                 }
773         }
774
775         return handle->status;
776 }
777
778 static int partition_wait(struct ldb_handle *handle, enum ldb_wait_type type)
779 {
780         if (type == LDB_WAIT_ALL) {
781                 return partition_wait_all(handle);
782         } else {
783                 return partition_wait_none(handle);
784         }
785 }
786
787 static const struct ldb_module_ops partition_ops = {
788         .name              = "partition",
789         .init_context      = partition_init,
790         .search            = partition_search,
791         .add               = partition_add,
792         .modify            = partition_modify,
793         .del               = partition_delete,
794         .rename            = partition_rename,
795         .start_transaction = partition_start_trans,
796         .end_transaction   = partition_end_trans,
797         .del_transaction   = partition_del_trans,
798         .sequence_number   = partition_sequence_number,
799         .wait              = partition_wait
800 };
801
802 int ldb_partition_init(void)
803 {
804         return ldb_register_module(&partition_ops);
805 }