TODO test/sq sort meta data after adding isDeleted
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241
242         ac->part_req = talloc_realloc(ac, ac->part_req,
243                                         struct part_request,
244                                         ac->num_requests + 1);
245         if (ac->part_req == NULL) {
246                 return ldb_oom(ldb_module_get_ctx(ac->module));
247         }
248
249         switch (ac->req->operation) {
250         case LDB_SEARCH:
251                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252                                         ac->part_req,
253                                         ac->req->op.search.base,
254                                         ac->req->op.search.scope,
255                                         ac->req->op.search.tree,
256                                         ac->req->op.search.attrs,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 LDB_REQ_SET_LOCATION(req);
261                 break;
262         case LDB_ADD:
263                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264                                         ac->req->op.add.message,
265                                         ac->req->controls,
266                                         ac, partition_req_callback,
267                                         ac->req);
268                 LDB_REQ_SET_LOCATION(req);
269                 break;
270         case LDB_MODIFY:
271                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.mod.message,
273                                         ac->req->controls,
274                                         ac, partition_req_callback,
275                                         ac->req);
276                 LDB_REQ_SET_LOCATION(req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 LDB_REQ_SET_LOCATION(req);
285                 break;
286         case LDB_RENAME:
287                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.rename.olddn,
289                                         ac->req->op.rename.newdn,
290                                         ac->req->controls,
291                                         ac, partition_req_callback,
292                                         ac->req);
293                 LDB_REQ_SET_LOCATION(req);
294                 break;
295         case LDB_EXTENDED:
296                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297                                         ac->part_req,
298                                         ac->req->op.extended.oid,
299                                         ac->req->op.extended.data,
300                                         ac->req->controls,
301                                         ac, partition_req_callback,
302                                         ac->req);
303                 LDB_REQ_SET_LOCATION(req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 /* Duplicate everything beside the current partition control */
319                 partition_ctrl = ldb_request_get_control(ac->req,
320                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
321                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322                         return ldb_module_oom(ac->module);
323                 }
324         }
325
326         if (partition) {
327                 void *part_data = partition->ctrl;
328
329                 ac->part_req[ac->num_requests].module = partition->module;
330
331                 if (partition_ctrl != NULL) {
332                         if (partition_ctrl->data != NULL) {
333                                 part_data = partition_ctrl->data;
334                         }
335
336                         /*
337                          * If the provided current partition control is without
338                          * data then use the calculated one.
339                          */
340                         ret = ldb_request_add_control(req,
341                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                                       false, part_data);
343                         if (ret != LDB_SUCCESS) {
344                                 return ret;
345                         }
346                 }
347
348                 if (req->operation == LDB_SEARCH) {
349                         /* If the search is for 'more' than this partition,
350                          * then change the basedn, so a remote LDAP server
351                          * doesn't object */
352                         if (ldb_dn_compare_base(partition->ctrl->dn,
353                                                 req->op.search.base) != 0) {
354                                 req->op.search.base = partition->ctrl->dn;
355                         }
356                 }
357
358         } else {
359                 /* make sure you put the module here, or
360                  * or ldb_next_request() will skip a module */
361                 ac->part_req[ac->num_requests].module = ac->module;
362         }
363
364         ac->num_requests++;
365
366         return LDB_SUCCESS;
367 }
368
369 static int partition_call_first(struct partition_context *ac)
370 {
371         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
372 }
373
374 /**
375  * Send a request down to all the partitions
376  */
377 static int partition_send_all(struct ldb_module *module, 
378                               struct partition_context *ac, 
379                               struct ldb_request *req) 
380 {
381         unsigned int i;
382         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383                                                               struct partition_private_data);
384         int ret = partition_prep_request(ac, NULL);
385         if (ret != LDB_SUCCESS) {
386                 return ret;
387         }
388         for (i=0; data && data->partitions && data->partitions[i]; i++) {
389                 ret = partition_prep_request(ac, data->partitions[i]);
390                 if (ret != LDB_SUCCESS) {
391                         return ret;
392                 }
393         }
394
395         /* fire the first one */
396         return partition_call_first(ac);
397 }
398
399
400 /**
401  * send an operation to the top partition, then copy the resulting
402  * object to all other partitions
403  */
404 static int partition_copy_all(struct ldb_module *module,
405                               struct partition_context *ac,
406                               struct ldb_request *req,
407                               struct ldb_dn *dn)
408 {
409         unsigned int i;
410         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
411                                                               struct partition_private_data);
412         int ret, search_ret;
413         struct ldb_result *res;
414
415         /* do the request on the top level sam.ldb synchronously */
416         ret = ldb_next_request(module, req);
417         if (ret != LDB_SUCCESS) {
418                 return ret;
419         }
420         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
421         if (ret != LDB_SUCCESS) {
422                 return ret;
423         }
424
425         /* now fetch the resulting object, and then copy it to all the
426          * other partitions. We need this approach to cope with the
427          * partitions getting out of sync. If for example the
428          * @ATTRIBUTES object exists on one partition but not the
429          * others then just doing each of the partitions in turn will
430          * lead to an error
431          */
432         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
433         if (search_ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
434                 return search_ret;
435         }
436
437         /* now delete the object in the other partitions. Once that is
438            done we will re-add the object, if search_ret was not
439            LDB_ERR_NO_SUCH_OBJECT
440         */
441         for (i=0; data->partitions && data->partitions[i]; i++) {
442                 int pret;
443                 pret = dsdb_module_del(data->partitions[i]->module, dn, DSDB_FLAG_NEXT_MODULE, req);
444                 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
445                         /* we should only get success or no
446                            such object from the other partitions */
447                         return pret;
448                 }
449         }
450
451
452         if (search_ret != LDB_ERR_NO_SUCH_OBJECT) {
453                 /* now re-add in the other partitions */
454                 for (i=0; data->partitions && data->partitions[i]; i++) {
455                         int pret;
456                         pret = dsdb_module_add(data->partitions[i]->module, res->msgs[0], DSDB_FLAG_NEXT_MODULE, req);
457                         if (pret != LDB_SUCCESS) {
458                                 return pret;
459                         }
460                 }
461         }
462
463         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
464 }
465
466 /**
467  * Figure out which backend a request needs to be aimed at.  Some
468  * requests must be replicated to all backends
469  */
470 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
471 {
472         struct partition_context *ac;
473         unsigned int i;
474         int ret;
475         struct dsdb_partition *partition;
476         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
477                                                               struct partition_private_data);
478
479         /* if we aren't initialised yet go further */
480         if (!data || !data->partitions) {
481                 return ldb_next_request(module, req);
482         }
483
484         if (ldb_dn_is_special(dn)) {
485                 /* Is this a special DN, we need to replicate to every backend? */
486                 for (i=0; data->replicate && data->replicate[i]; i++) {
487                         if (ldb_dn_compare(data->replicate[i], 
488                                            dn) == 0) {
489                                 
490                                 ac = partition_init_ctx(module, req);
491                                 if (!ac) {
492                                         return ldb_operr(ldb_module_get_ctx(module));
493                                 }
494                                 
495                                 return partition_copy_all(module, ac, req, dn);
496                         }
497                 }
498         }
499
500         /* Otherwise, we need to find the partition to fire it to */
501
502         /* Find partition */
503         partition = find_partition(data, dn, req);
504         if (!partition) {
505                 /*
506                  * if we haven't found a matching partition
507                  * pass the request to the main ldb
508                  *
509                  * TODO: we should maybe return an error here
510                  *       if it's not a special dn
511                  */
512
513                 return ldb_next_request(module, req);
514         }
515
516         ac = partition_init_ctx(module, req);
517         if (!ac) {
518                 return ldb_operr(ldb_module_get_ctx(module));
519         }
520
521         /* we need to add a control but we never touch the original request */
522         ret = partition_prep_request(ac, partition);
523         if (ret != LDB_SUCCESS) {
524                 return ret;
525         }
526
527         /* fire the first one */
528         return partition_call_first(ac);
529 }
530
531 /* search */
532 static int partition_search(struct ldb_module *module, struct ldb_request *req)
533 {
534         struct ldb_control **saved_controls;
535         /* Find backend */
536         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
537                                                               struct partition_private_data);
538         struct partition_context *ac;
539         struct ldb_context *ldb;
540         struct loadparm_context *lp_ctx;
541
542         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
543         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
544         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
545         
546         struct ldb_search_options_control *search_options = NULL;
547         struct dsdb_partition *p;
548         unsigned int i, j;
549         int ret;
550         bool domain_scope = false, phantom_root = false;
551
552         /* see if we are still up-to-date */
553         ret = partition_reload_if_required(module, data, req);
554         if (ret != LDB_SUCCESS) {
555                 return ret;
556         }
557
558         p = find_partition(data, NULL, req);
559         if (p != NULL) {
560                 /* the caller specified what partition they want the
561                  * search - just pass it on
562                  */
563                 return ldb_next_request(p->module, req);
564         }
565
566         /* Get back the search options from the search control, and mark it as
567          * non-critical (to make backends and also dcpromo happy).
568          */
569         if (search_control) {
570                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
571                 search_control->critical = 0;
572
573         }
574
575         /* Remove the "domain_scope" control, so we don't confuse a backend
576          * server */
577         if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
578                 return ldb_oom(ldb_module_get_ctx(module));
579         }
580
581         /* if we aren't initialised yet go further */
582         if (!data || !data->partitions) {
583                 return ldb_next_request(module, req);
584         }
585
586         /* Special DNs without specified partition should go further */
587         if (ldb_dn_is_special(req->op.search.base)) {
588                 return ldb_next_request(module, req);
589         }
590
591         /* Locate the options */
592         domain_scope = (search_options
593                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
594                 || domain_scope_control;
595         phantom_root = search_options
596                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
597
598         /* Remove handled options from the search control flag */
599         if (search_options) {
600                 search_options->search_options = search_options->search_options
601                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
602                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
603         }
604
605         ac = partition_init_ctx(module, req);
606         if (!ac) {
607                 return ldb_operr(ldb_module_get_ctx(module));
608         }
609
610         ldb = ldb_module_get_ctx(ac->module);
611         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
612                                                 struct loadparm_context);
613
614         /* Search from the base DN */
615         if (ldb_dn_is_null(req->op.search.base)) {
616                 if (!phantom_root) {
617                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
618                 }
619                 return partition_send_all(module, ac, req);
620         }
621
622         for (i=0; data->partitions[i]; i++) {
623                 bool match = false, stop = false;
624
625                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
626                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
627                                                 req->op.search.base) == 0) {
628                                 /* base DN is in a partial replica
629                                    with the NO_GLOBAL_CATALOG
630                                    control. This partition is invisible */
631                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
632                                 continue;
633                         }
634                 }
635
636                 if (phantom_root) {
637                         /* Phantom root: Find all partitions under the
638                          * search base. We match if:
639                          *
640                          * 1) the DN we are looking for exactly matches a
641                          *    certain partition and always stop
642                          * 2) the DN we are looking for is a parent of certain
643                          *    partitions and it isn't a scope base search
644                          * 3) the DN we are looking for is a child of a certain
645                          *    partition and always stop
646                          *    - we don't need to go any further up in the
647                          *    hierarchy!
648                          */
649                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
650                                            req->op.search.base) == 0) {
651                                 match = true;
652                                 stop = true;
653                         }
654                         if (!match &&
655                             (ldb_dn_compare_base(req->op.search.base,
656                                                  data->partitions[i]->ctrl->dn) == 0 &&
657                              req->op.search.scope != LDB_SCOPE_BASE)) {
658                                 match = true;
659                         }
660                         if (!match &&
661                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
662                                                 req->op.search.base) == 0) {
663                                 match = true;
664                                 stop = true; /* note that this relies on partition ordering */
665                         }
666                 } else {
667                         /* Domain scope: Find all partitions under the search
668                          * base.
669                          *
670                          * We generate referral candidates if we haven't
671                          * specified the domain scope control, haven't a base
672                          * search* scope and the DN we are looking for is a real
673                          * predecessor of certain partitions. When a new
674                          * referral candidate is nearer to the DN than an
675                          * existing one delete the latter (we want to have only
676                          * the closest ones). When we checked this for all
677                          * candidates we have the final referrals.
678                          *
679                          * We match if the DN we are looking for is a child of
680                          * a certain partition or the partition
681                          * DN itself - we don't need to go any further
682                          * up in the hierarchy!
683                          */
684                         if ((!domain_scope) &&
685                             (req->op.search.scope != LDB_SCOPE_BASE) &&
686                             (ldb_dn_compare_base(req->op.search.base,
687                                                  data->partitions[i]->ctrl->dn) == 0) &&
688                             (ldb_dn_compare(req->op.search.base,
689                                             data->partitions[i]->ctrl->dn) != 0)) {
690                                 char *ref = talloc_asprintf(ac,
691                                                             "ldap://%s/%s%s",
692                                                             lpcfg_dnsdomain(lp_ctx),
693                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
694                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
695
696                                 if (ref == NULL) {
697                                         return ldb_oom(ldb);
698                                 }
699
700                                 /* Initialise the referrals list */
701                                 if (ac->referrals == NULL) {
702                                         ac->referrals = (const char **) str_list_make_empty(ac);
703                                         if (ac->referrals == NULL) {
704                                                 return ldb_oom(ldb);
705                                         }
706                                 }
707
708                                 /* Check if the new referral candidate is
709                                  * closer to the base DN than already
710                                  * saved ones and delete the latters */
711                                 j = 0;
712                                 while (ac->referrals[j] != NULL) {
713                                         if (strstr(ac->referrals[j],
714                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
715                                                 str_list_remove(ac->referrals,
716                                                                 ac->referrals[j]);
717                                         } else {
718                                                 ++j;
719                                         }
720                                 }
721
722                                 /* Add our new candidate */
723                                 ac->referrals = str_list_add(ac->referrals, ref);
724
725                                 talloc_free(ref);
726
727                                 if (ac->referrals == NULL) {
728                                         return ldb_oom(ldb);
729                                 }
730                         }
731                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
732                                 match = true;
733                                 stop = true; /* note that this relies on partition ordering */
734                         }
735                 }
736
737                 if (match) {
738                         ret = partition_prep_request(ac, data->partitions[i]);
739                         if (ret != LDB_SUCCESS) {
740                                 return ret;
741                         }
742                 }
743
744                 if (stop) break;
745         }
746
747         /* Perhaps we didn't match any partitions. Try the main partition */
748         if (ac->num_requests == 0) {
749                 talloc_free(ac);
750                 return ldb_next_request(module, req);
751         }
752
753         /* fire the first one */
754         return partition_call_first(ac);
755 }
756
757 /* add */
758 static int partition_add(struct ldb_module *module, struct ldb_request *req)
759 {
760         return partition_replicate(module, req, req->op.add.message->dn);
761 }
762
763 /* modify */
764 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
765 {
766         return partition_replicate(module, req, req->op.mod.message->dn);
767 }
768
769 /* delete */
770 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
771 {
772         return partition_replicate(module, req, req->op.del.dn);
773 }
774
775 /* rename */
776 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
777 {
778         /* Find backend */
779         struct dsdb_partition *backend, *backend2;
780         
781         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
782                                                               struct partition_private_data);
783
784         /* Skip the lot if 'data' isn't here yet (initialisation) */
785         if (!data) {
786                 return ldb_operr(ldb_module_get_ctx(module));
787         }
788
789         backend = find_partition(data, req->op.rename.olddn, req);
790         backend2 = find_partition(data, req->op.rename.newdn, req);
791
792         if ((backend && !backend2) || (!backend && backend2)) {
793                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
794         }
795
796         if (backend != backend2) {
797                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
798                                        "Cannot rename from %s in %s to %s in %s: %s",
799                                        ldb_dn_get_linearized(req->op.rename.olddn),
800                                        ldb_dn_get_linearized(backend->ctrl->dn),
801                                        ldb_dn_get_linearized(req->op.rename.newdn),
802                                        ldb_dn_get_linearized(backend2->ctrl->dn),
803                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
804                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
805         }
806
807         return partition_replicate(module, req, req->op.rename.olddn);
808 }
809
810 /* start a transaction */
811 static int partition_start_trans(struct ldb_module *module)
812 {
813         unsigned int i;
814         int ret;
815         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
816                                                               struct partition_private_data);
817         /* Look at base DN */
818         /* Figure out which partition it is under */
819         /* Skip the lot if 'data' isn't here yet (initialization) */
820         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
821                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
822         }
823         ret = ldb_next_start_trans(module);
824         if (ret != LDB_SUCCESS) {
825                 return ret;
826         }
827
828         ret = partition_reload_if_required(module, data, NULL);
829         if (ret != LDB_SUCCESS) {
830                 ldb_next_del_trans(module);
831                 return ret;
832         }
833
834         ret = partition_metadata_start_trans(module);
835         if (ret != LDB_SUCCESS) {
836                 ldb_next_del_trans(module);
837                 return ret;
838         }
839
840         for (i=0; data && data->partitions && data->partitions[i]; i++) {
841                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
842                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
843                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
844                 }
845                 ret = ldb_next_start_trans(data->partitions[i]->module);
846                 if (ret != LDB_SUCCESS) {
847                         /* Back it out, if it fails on one */
848                         for (i--; i >= 0; i--) {
849                                 ldb_next_del_trans(data->partitions[i]->module);
850                         }
851                         ldb_next_del_trans(module);
852                         partition_metadata_del_trans(module);
853                         return ret;
854                 }
855         }
856
857         data->in_transaction++;
858
859         return LDB_SUCCESS;
860 }
861
862 /* prepare for a commit */
863 static int partition_prepare_commit(struct ldb_module *module)
864 {
865         unsigned int i;
866         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
867                                                               struct partition_private_data);
868         int ret;
869
870         for (i=0; data && data->partitions && data->partitions[i]; i++) {
871                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
872                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
873                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
874                 }
875                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
876                 if (ret != LDB_SUCCESS) {
877                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
878                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
879                                                ldb_errstring(ldb_module_get_ctx(module)));
880                         return ret;
881                 }
882         }
883
884         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
885                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
886         }
887
888         ret = ldb_next_prepare_commit(module);
889         if (ret != LDB_SUCCESS) {
890                 return ret;
891         }
892
893         /* metadata prepare commit must come last, as other partitions could modify
894          * the database inside the prepare commit method of a module */
895         return partition_metadata_prepare_commit(module);
896 }
897
898
899 /* end a transaction */
900 static int partition_end_trans(struct ldb_module *module)
901 {
902         int ret, ret2;
903         unsigned int i;
904         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
905                                                               struct partition_private_data);
906
907         ret = LDB_SUCCESS;
908
909         if (data->in_transaction == 0) {
910                 DEBUG(0,("partition end transaction mismatch\n"));
911                 ret = LDB_ERR_OPERATIONS_ERROR;
912         } else {
913                 data->in_transaction--;
914         }
915
916         ret2 = partition_metadata_end_trans(module);
917         if (ret2 != LDB_SUCCESS) {
918                 ret = ret2;
919         }
920
921         for (i=0; data && data->partitions && data->partitions[i]; i++) {
922                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
923                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
924                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
925                 }
926                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
927                 if (ret2 != LDB_SUCCESS) {
928                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
929                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
930                                                ldb_errstring(ldb_module_get_ctx(module)));
931                         ret = ret2;
932                 }
933         }
934
935         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
936                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
937         }
938         ret2 = ldb_next_end_trans(module);
939         if (ret2 != LDB_SUCCESS) {
940                 ret = ret2;
941         }
942         return ret;
943 }
944
945 /* delete a transaction */
946 static int partition_del_trans(struct ldb_module *module)
947 {
948         int ret, final_ret = LDB_SUCCESS;
949         unsigned int i;
950         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
951                                                               struct partition_private_data);
952         ret = partition_metadata_del_trans(module);
953         if (ret != LDB_SUCCESS) {
954                 final_ret = ret;
955         }
956
957         for (i=0; data && data->partitions && data->partitions[i]; i++) {
958                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
959                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
960                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
961                 }
962                 ret = ldb_next_del_trans(data->partitions[i]->module);
963                 if (ret != LDB_SUCCESS) {
964                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
965                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
966                                                ldb_errstring(ldb_module_get_ctx(module)));
967                         final_ret = ret;
968                 }
969         }       
970
971         if (data->in_transaction == 0) {
972                 DEBUG(0,("partition del transaction mismatch\n"));
973                 return ldb_operr(ldb_module_get_ctx(module));
974         }
975         data->in_transaction--;
976
977         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
978                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
979         }
980         ret = ldb_next_del_trans(module);
981         if (ret != LDB_SUCCESS) {
982                 final_ret = ret;
983         }
984         return final_ret;
985 }
986
987 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
988                                       enum ldb_sequence_type type, uint64_t *seq_number,
989                                       struct ldb_request *parent)
990 {
991         int ret;
992         struct ldb_result *res;
993         struct ldb_seqnum_request *tseq;
994         struct ldb_seqnum_result *seqr;
995
996         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
997         if (tseq == NULL) {
998                 return ldb_oom(ldb_module_get_ctx(module));
999         }
1000         tseq->type = type;
1001         
1002         ret = dsdb_module_extended(module, tseq, &res,
1003                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1004                                    tseq,
1005                                    DSDB_FLAG_NEXT_MODULE,
1006                                    parent);
1007         if (ret != LDB_SUCCESS) {
1008                 talloc_free(tseq);
1009                 return ret;
1010         }
1011         
1012         seqr = talloc_get_type_abort(res->extended->data,
1013                                      struct ldb_seqnum_result);
1014         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1015                 talloc_free(res);
1016                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1017                         "Primary backend in partition module returned a timestamp based seq");
1018         }
1019
1020         *seq_number = seqr->seq_num;
1021         talloc_free(tseq);
1022         return LDB_SUCCESS;
1023 }
1024
1025
1026 /*
1027  * Older version of sequence number as sum of sequence numbers for each partition
1028  */
1029 int partition_sequence_number_from_partitions(struct ldb_module *module,
1030                                               struct ldb_request *req,
1031                                               struct ldb_extended **ext)
1032 {
1033         int ret;
1034         unsigned int i;
1035         uint64_t seq_number = 0;
1036         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1037                                                               struct partition_private_data);
1038         struct ldb_seqnum_request *seq;
1039         struct ldb_seqnum_result *seqr;
1040         struct ldb_request *treq;
1041         struct ldb_seqnum_request *tseq;
1042         struct ldb_seqnum_result *tseqr;
1043         struct ldb_result *res;
1044         struct dsdb_partition *p;
1045
1046         p = find_partition(data, NULL, req);
1047         if (p != NULL) {
1048                 /* the caller specified what partition they want the
1049                  * sequence number operation on - just pass it on
1050                  */
1051                 return ldb_next_request(p->module, req);                
1052         }
1053
1054         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
1055
1056         switch (seq->type) {
1057         case LDB_SEQ_NEXT:
1058         case LDB_SEQ_HIGHEST_SEQ:
1059
1060                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number, req);
1061                 if (ret != LDB_SUCCESS) {
1062                         return ret;
1063                 }
1064
1065                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1066                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1067
1068                         res = talloc_zero(req, struct ldb_result);
1069                         if (res == NULL) {
1070                                 return ldb_oom(ldb_module_get_ctx(module));
1071                         }
1072                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1073                         if (tseq == NULL) {
1074                                 talloc_free(res);
1075                                 return ldb_oom(ldb_module_get_ctx(module));
1076                         }
1077                         tseq->type = seq->type;
1078
1079                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1080                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1081                                                      tseq,
1082                                                      NULL,
1083                                                      res,
1084                                                      ldb_extended_default_callback,
1085                                                      req);
1086                         LDB_REQ_SET_LOCATION(treq);
1087                         if (ret != LDB_SUCCESS) {
1088                                 talloc_free(res);
1089                                 return ret;
1090                         }
1091
1092                         ret = ldb_request_add_control(treq,
1093                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
1094                                                       false, data->partitions[i]->ctrl);
1095                         if (ret != LDB_SUCCESS) {
1096                                 talloc_free(res);
1097                                 return ret;
1098                         }
1099
1100                         ret = partition_request(data->partitions[i]->module, treq);
1101                         if (ret != LDB_SUCCESS) {
1102                                 talloc_free(res);
1103                                 return ret;
1104                         }
1105                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1106                         if (ret != LDB_SUCCESS) {
1107                                 talloc_free(res);
1108                                 return ret;
1109                         }
1110                         tseqr = talloc_get_type(res->extended->data,
1111                                                 struct ldb_seqnum_result);
1112                         seq_number += tseqr->seq_num;
1113                         talloc_free(res);
1114                 }
1115                 break;
1116
1117         case LDB_SEQ_HIGHEST_TIMESTAMP:
1118                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR, "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1119         }
1120
1121         *ext = talloc_zero(req, struct ldb_extended);
1122         if (!*ext) {
1123                 return ldb_oom(ldb_module_get_ctx(module));
1124         }
1125         seqr = talloc_zero(*ext, struct ldb_seqnum_result);
1126         if (seqr == NULL) {
1127                 talloc_free(*ext);
1128                 return ldb_oom(ldb_module_get_ctx(module));
1129         }
1130         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1131         (*ext)->data = seqr;
1132
1133         seqr->seq_num = seq_number;
1134         if (seq->type == LDB_SEQ_NEXT) {
1135                 seqr->seq_num++;
1136         }
1137
1138         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1139         return LDB_SUCCESS;
1140 }
1141
1142
1143 /*
1144  * Newer version of sequence number using metadata tdb
1145  */
1146 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1147 {
1148         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1149                                                               struct partition_private_data);
1150         struct ldb_extended *ext;
1151         struct ldb_seqnum_request *seq;
1152         struct ldb_seqnum_result *seqr;
1153         uint64_t seq_number;
1154         struct dsdb_partition *p;
1155         int ret;
1156
1157         p = find_partition(data, NULL, req);
1158         if (p != NULL) {
1159                 /* the caller specified what partition they want the
1160                  * sequence number operation on - just pass it on
1161                  */
1162                 return ldb_next_request(p->module, req);
1163         }
1164
1165         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1166         switch (seq->type) {
1167         case LDB_SEQ_NEXT:
1168                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1169                 if (ret != LDB_SUCCESS) {
1170                         return ret;
1171                 }
1172                 break;
1173
1174         case LDB_SEQ_HIGHEST_SEQ:
1175                 ret = partition_metadata_sequence_number(module, &seq_number);
1176                 if (ret != LDB_SUCCESS) {
1177                         return ret;
1178                 }
1179                 break;
1180
1181         case LDB_SEQ_HIGHEST_TIMESTAMP:
1182                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1183                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1184         }
1185
1186         ext = talloc_zero(req, struct ldb_extended);
1187         if (!ext) {
1188                 return ldb_module_oom(module);
1189         }
1190         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1191         if (seqr == NULL) {
1192                 talloc_free(ext);
1193                 return ldb_module_oom(module);
1194         }
1195         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1196         ext->data = seqr;
1197
1198         seqr->seq_num = seq_number;
1199         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1200
1201         /* send request done */
1202         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1203 }
1204
1205 /* extended */
1206 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1207 {
1208         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1209                                                               struct partition_private_data);
1210         struct partition_context *ac;
1211         int ret;
1212
1213         /* if we aren't initialised yet go further */
1214         if (!data) {
1215                 return ldb_next_request(module, req);
1216         }
1217
1218         /* see if we are still up-to-date */
1219         ret = partition_reload_if_required(module, data, req);
1220         if (ret != LDB_SUCCESS) {
1221                 return ret;
1222         }
1223         
1224         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1225                 return partition_sequence_number(module, req);
1226         }
1227
1228         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1229                 return partition_create(module, req);
1230         }
1231
1232         /* 
1233          * as the extended operation has no dn
1234          * we need to send it to all partitions
1235          */
1236
1237         ac = partition_init_ctx(module, req);
1238         if (!ac) {
1239                 return ldb_operr(ldb_module_get_ctx(module));
1240         }
1241
1242         return partition_send_all(module, ac, req);
1243 }
1244
1245 static const struct ldb_module_ops ldb_partition_module_ops = {
1246         .name              = "partition",
1247         .init_context      = partition_init,
1248         .search            = partition_search,
1249         .add               = partition_add,
1250         .modify            = partition_modify,
1251         .del               = partition_delete,
1252         .rename            = partition_rename,
1253         .extended          = partition_extended,
1254         .start_transaction = partition_start_trans,
1255         .prepare_commit    = partition_prepare_commit,
1256         .end_transaction   = partition_end_trans,
1257         .del_transaction   = partition_del_trans,
1258 };
1259
1260 int ldb_partition_module_init(const char *version)
1261 {
1262         LDB_MODULE_CHECK_VERSION(version);
1263         return ldb_register_module(&ldb_partition_module_ops);
1264 }