4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
35 struct ldb_module *module;
36 struct ldb_request *req;
39 struct partition_context {
40 struct ldb_module *module;
41 struct ldb_request *req;
43 struct part_request *part_req;
44 unsigned int num_requests;
45 unsigned int finished_requests;
47 const char **referrals;
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
92 struct ldb_request *req)
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
137 struct partition_private_data *data;
138 struct ldb_control *partition_ctrl;
140 ac = talloc_get_type(req->context, struct partition_context);
141 data = talloc_get_type(ldb_module_get_private(ac->module), struct partition_private_data);
144 return ldb_module_done(ac->req, NULL, NULL,
145 LDB_ERR_OPERATIONS_ERROR);
148 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150 /* If we didn't fan this request out to mulitple partitions,
151 * or this is an individual search result, we can
152 * deterministically tell the caller what partition this was
153 * written to (repl_meta_data likes to know) */
154 ret = ldb_reply_add_control(ares,
155 DSDB_CONTROL_CURRENT_PARTITION_OID,
156 false, partition_ctrl->data);
157 if (ret != LDB_SUCCESS) {
158 return ldb_module_done(ac->req, NULL, NULL,
163 if (ares->error != LDB_SUCCESS) {
164 return ldb_module_done(ac->req, ares->controls,
165 ares->response, ares->error);
168 switch (ares->type) {
169 case LDB_REPLY_REFERRAL:
170 return ldb_module_send_referral(ac->req, ares->referral);
172 case LDB_REPLY_ENTRY:
173 if (ac->req->operation != LDB_SEARCH) {
174 ldb_set_errstring(ldb_module_get_ctx(ac->module),
175 "partition_req_callback:"
176 " Unsupported reply type for this request");
177 return ldb_module_done(ac->req, NULL, NULL,
178 LDB_ERR_OPERATIONS_ERROR);
181 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
184 if (ac->req->operation == LDB_EXTENDED) {
185 /* FIXME: check for ares->response, replmd does not fill it ! */
186 if (ares->response) {
187 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188 ldb_set_errstring(ldb_module_get_ctx(ac->module),
189 "partition_req_callback:"
190 " Unknown extended reply, "
191 "only supports START_TLS");
193 return ldb_module_done(ac->req, NULL, NULL,
194 LDB_ERR_OPERATIONS_ERROR);
199 ac->finished_requests++;
200 if (ac->finished_requests == ac->num_requests) {
201 /* Send back referrals if they do exist (search ops) */
202 if (ac->referrals != NULL) {
204 for (ref = ac->referrals; *ref != NULL; ++ref) {
205 ret = ldb_module_send_referral(ac->req,
206 talloc_strdup(ac->req, *ref));
207 if (ret != LDB_SUCCESS) {
208 return ldb_module_done(ac->req, NULL, NULL,
214 /* this was the last one, call callback */
215 return ldb_module_done(ac->req, ares->controls,
220 /* not the last, now call the next one */
221 module = ac->part_req[ac->finished_requests].module;
222 nreq = ac->part_req[ac->finished_requests].req;
224 ret = partition_request(module, nreq);
225 if (ret != LDB_SUCCESS) {
227 return ldb_module_done(ac->req, NULL, NULL, ret);
237 static int partition_prep_request(struct partition_context *ac,
238 struct dsdb_partition *partition)
241 struct ldb_request *req;
242 struct ldb_control *partition_ctrl = NULL;
244 ac->part_req = talloc_realloc(ac, ac->part_req,
246 ac->num_requests + 1);
247 if (ac->part_req == NULL) {
248 return ldb_oom(ldb_module_get_ctx(ac->module));
251 switch (ac->req->operation) {
253 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
255 ac->req->op.search.base,
256 ac->req->op.search.scope,
257 ac->req->op.search.tree,
258 ac->req->op.search.attrs,
260 ac, partition_req_callback,
262 LDB_REQ_SET_LOCATION(req);
265 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
266 ac->req->op.add.message,
268 ac, partition_req_callback,
270 LDB_REQ_SET_LOCATION(req);
273 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
274 ac->req->op.mod.message,
276 ac, partition_req_callback,
278 LDB_REQ_SET_LOCATION(req);
281 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
284 ac, partition_req_callback,
286 LDB_REQ_SET_LOCATION(req);
289 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
290 ac->req->op.rename.olddn,
291 ac->req->op.rename.newdn,
293 ac, partition_req_callback,
295 LDB_REQ_SET_LOCATION(req);
298 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
300 ac->req->op.extended.oid,
301 ac->req->op.extended.data,
303 ac, partition_req_callback,
305 LDB_REQ_SET_LOCATION(req);
308 ldb_set_errstring(ldb_module_get_ctx(ac->module),
309 "Unsupported request type!");
310 ret = LDB_ERR_UNWILLING_TO_PERFORM;
313 if (ret != LDB_SUCCESS) {
317 ac->part_req[ac->num_requests].req = req;
319 if (ac->req->controls) {
320 /* Duplicate everything beside the current partition control */
321 partition_ctrl = ldb_request_get_control(ac->req,
322 DSDB_CONTROL_CURRENT_PARTITION_OID);
323 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
324 return ldb_module_oom(ac->module);
329 void *part_data = partition->ctrl;
331 ac->part_req[ac->num_requests].module = partition->module;
333 if (partition_ctrl != NULL) {
334 if (partition_ctrl->data != NULL) {
335 part_data = partition_ctrl->data;
339 * If the provided current partition control is without
340 * data then use the calculated one.
342 ret = ldb_request_add_control(req,
343 DSDB_CONTROL_CURRENT_PARTITION_OID,
345 if (ret != LDB_SUCCESS) {
350 if (req->operation == LDB_SEARCH) {
351 /* If the search is for 'more' than this partition,
352 * then change the basedn, so a remote LDAP server
354 if (ldb_dn_compare_base(partition->ctrl->dn,
355 req->op.search.base) != 0) {
356 req->op.search.base = partition->ctrl->dn;
361 /* make sure you put the module here, or
362 * or ldb_next_request() will skip a module */
363 ac->part_req[ac->num_requests].module = ac->module;
371 static int partition_call_first(struct partition_context *ac)
373 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
377 * Send a request down to all the partitions
379 static int partition_send_all(struct ldb_module *module,
380 struct partition_context *ac,
381 struct ldb_request *req)
384 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
385 struct partition_private_data);
386 int ret = partition_prep_request(ac, NULL);
387 if (ret != LDB_SUCCESS) {
390 for (i=0; data && data->partitions && data->partitions[i]; i++) {
391 ret = partition_prep_request(ac, data->partitions[i]);
392 if (ret != LDB_SUCCESS) {
397 /* fire the first one */
398 return partition_call_first(ac);
403 * send an operation to the top partition, then copy the resulting
404 * object to all other partitions
406 static int partition_copy_all(struct ldb_module *module,
407 struct partition_context *ac,
408 struct ldb_request *req,
412 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
413 struct partition_private_data);
415 struct ldb_result *res;
417 /* do the request on the top level sam.ldb synchronously */
418 ret = ldb_next_request(module, req);
419 if (ret != LDB_SUCCESS) {
422 ret = ldb_wait(req->handle, LDB_WAIT_ALL);
423 if (ret != LDB_SUCCESS) {
427 /* now fetch the resulting object, and then copy it to all the
428 * other partitions. We need this approach to cope with the
429 * partitions getting out of sync. If for example the
430 * @ATTRIBUTES object exists on one partition but not the
431 * others then just doing each of the partitions in turn will
434 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435 if (search_ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
439 /* now delete the object in the other partitions. Once that is
440 done we will re-add the object, if search_ret was not
441 LDB_ERR_NO_SUCH_OBJECT
443 for (i=0; data->partitions && data->partitions[i]; i++) {
445 pret = dsdb_module_del(data->partitions[i]->module, dn, DSDB_FLAG_NEXT_MODULE, req);
446 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
447 /* we should only get success or no
448 such object from the other partitions */
454 if (search_ret != LDB_ERR_NO_SUCH_OBJECT) {
455 /* now re-add in the other partitions */
456 for (i=0; data->partitions && data->partitions[i]; i++) {
458 pret = dsdb_module_add(data->partitions[i]->module, res->msgs[0], DSDB_FLAG_NEXT_MODULE, req);
459 if (pret != LDB_SUCCESS) {
465 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
469 * Figure out which backend a request needs to be aimed at. Some
470 * requests must be replicated to all backends
472 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
474 struct partition_context *ac;
477 struct dsdb_partition *partition;
478 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
479 struct partition_private_data);
481 /* if we aren't initialised yet go further */
482 if (!data || !data->partitions) {
483 return ldb_next_request(module, req);
486 if (ldb_dn_is_special(dn)) {
487 /* Is this a special DN, we need to replicate to every backend? */
488 for (i=0; data->replicate && data->replicate[i]; i++) {
489 if (ldb_dn_compare(data->replicate[i],
492 ac = partition_init_ctx(module, req);
494 return ldb_operr(ldb_module_get_ctx(module));
497 return partition_copy_all(module, ac, req, dn);
502 /* Otherwise, we need to find the partition to fire it to */
505 partition = find_partition(data, dn, req);
508 * if we haven't found a matching partition
509 * pass the request to the main ldb
511 * TODO: we should maybe return an error here
512 * if it's not a special dn
515 return ldb_next_request(module, req);
518 ac = partition_init_ctx(module, req);
520 return ldb_operr(ldb_module_get_ctx(module));
523 /* we need to add a control but we never touch the original request */
524 ret = partition_prep_request(ac, partition);
525 if (ret != LDB_SUCCESS) {
529 /* fire the first one */
530 return partition_call_first(ac);
534 static int partition_search(struct ldb_module *module, struct ldb_request *req)
536 struct ldb_control **saved_controls;
538 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
539 struct partition_private_data);
540 struct partition_context *ac;
541 struct ldb_context *ldb;
542 struct loadparm_context *lp_ctx;
544 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
545 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
546 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
548 struct ldb_search_options_control *search_options = NULL;
549 struct dsdb_partition *p;
552 bool domain_scope = false, phantom_root = false;
554 /* see if we are still up-to-date */
555 ret = partition_reload_if_required(module, data, req);
556 if (ret != LDB_SUCCESS) {
560 p = find_partition(data, NULL, req);
562 /* the caller specified what partition they want the
563 * search - just pass it on
565 return ldb_next_request(p->module, req);
568 /* Get back the search options from the search control, and mark it as
569 * non-critical (to make backends and also dcpromo happy).
571 if (search_control) {
572 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
573 search_control->critical = 0;
577 /* Remove the "domain_scope" control, so we don't confuse a backend
579 if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
580 return ldb_oom(ldb_module_get_ctx(module));
583 /* if we aren't initialised yet go further */
584 if (!data || !data->partitions) {
585 return ldb_next_request(module, req);
588 /* Locate the options */
589 domain_scope = (search_options
590 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
591 || domain_scope_control;
592 phantom_root = search_options
593 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
595 /* Remove handled options from the search control flag */
596 if (search_options) {
597 search_options->search_options = search_options->search_options
598 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
599 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
602 ac = partition_init_ctx(module, req);
604 return ldb_operr(ldb_module_get_ctx(module));
607 ldb = ldb_module_get_ctx(ac->module);
608 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
609 struct loadparm_context);
611 /* Search from the base DN */
612 if (ldb_dn_is_null(req->op.search.base)) {
614 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
616 return partition_send_all(module, ac, req);
619 for (i=0; data->partitions[i]; i++) {
620 bool match = false, stop = false;
622 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
623 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
624 req->op.search.base) == 0) {
625 /* base DN is in a partial replica
626 with the NO_GLOBAL_CATALOG
627 control. This partition is invisible */
628 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
634 /* Phantom root: Find all partitions under the
635 * search base. We match if:
637 * 1) the DN we are looking for exactly matches a
638 * certain partition and always stop
639 * 2) the DN we are looking for is a parent of certain
640 * partitions and it isn't a scope base search
641 * 3) the DN we are looking for is a child of a certain
642 * partition and always stop
643 * - we don't need to go any further up in the
646 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
647 req->op.search.base) == 0) {
652 (ldb_dn_compare_base(req->op.search.base,
653 data->partitions[i]->ctrl->dn) == 0 &&
654 req->op.search.scope != LDB_SCOPE_BASE)) {
658 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
659 req->op.search.base) == 0) {
661 stop = true; /* note that this relies on partition ordering */
664 /* Domain scope: Find all partitions under the search
667 * We generate referral candidates if we haven't
668 * specified the domain scope control, haven't a base
669 * search* scope and the DN we are looking for is a real
670 * predecessor of certain partitions. When a new
671 * referral candidate is nearer to the DN than an
672 * existing one delete the latter (we want to have only
673 * the closest ones). When we checked this for all
674 * candidates we have the final referrals.
676 * We match if the DN we are looking for is a child of
677 * a certain partition or the partition
678 * DN itself - we don't need to go any further
679 * up in the hierarchy!
681 if ((!domain_scope) &&
682 (req->op.search.scope != LDB_SCOPE_BASE) &&
683 (ldb_dn_compare_base(req->op.search.base,
684 data->partitions[i]->ctrl->dn) == 0) &&
685 (ldb_dn_compare(req->op.search.base,
686 data->partitions[i]->ctrl->dn) != 0)) {
687 char *ref = talloc_asprintf(ac,
689 lpcfg_dnsdomain(lp_ctx),
690 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
691 req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
697 /* Initialise the referrals list */
698 if (ac->referrals == NULL) {
699 ac->referrals = (const char **) str_list_make_empty(ac);
700 if (ac->referrals == NULL) {
705 /* Check if the new referral candidate is
706 * closer to the base DN than already
707 * saved ones and delete the latters */
709 while (ac->referrals[j] != NULL) {
710 if (strstr(ac->referrals[j],
711 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
712 str_list_remove(ac->referrals,
719 /* Add our new candidate */
720 ac->referrals = str_list_add(ac->referrals, ref);
724 if (ac->referrals == NULL) {
728 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
730 stop = true; /* note that this relies on partition ordering */
735 ret = partition_prep_request(ac, data->partitions[i]);
736 if (ret != LDB_SUCCESS) {
744 /* Perhaps we didn't match any partitions. Try the main partition */
745 if (ac->num_requests == 0) {
747 return ldb_next_request(module, req);
750 /* fire the first one */
751 return partition_call_first(ac);
755 static int partition_add(struct ldb_module *module, struct ldb_request *req)
757 return partition_replicate(module, req, req->op.add.message->dn);
761 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
763 return partition_replicate(module, req, req->op.mod.message->dn);
767 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
769 return partition_replicate(module, req, req->op.del.dn);
773 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
776 struct dsdb_partition *backend, *backend2;
778 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
779 struct partition_private_data);
781 /* Skip the lot if 'data' isn't here yet (initialisation) */
783 return ldb_operr(ldb_module_get_ctx(module));
786 backend = find_partition(data, req->op.rename.olddn, req);
787 backend2 = find_partition(data, req->op.rename.newdn, req);
789 if ((backend && !backend2) || (!backend && backend2)) {
790 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
793 if (backend != backend2) {
794 ldb_asprintf_errstring(ldb_module_get_ctx(module),
795 "Cannot rename from %s in %s to %s in %s: %s",
796 ldb_dn_get_linearized(req->op.rename.olddn),
797 ldb_dn_get_linearized(backend->ctrl->dn),
798 ldb_dn_get_linearized(req->op.rename.newdn),
799 ldb_dn_get_linearized(backend2->ctrl->dn),
800 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
801 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
804 return partition_replicate(module, req, req->op.rename.olddn);
807 /* start a transaction */
808 static int partition_start_trans(struct ldb_module *module)
812 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
813 struct partition_private_data);
814 /* Look at base DN */
815 /* Figure out which partition it is under */
816 /* Skip the lot if 'data' isn't here yet (initialization) */
817 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
818 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
820 ret = ldb_next_start_trans(module);
821 if (ret != LDB_SUCCESS) {
825 ret = partition_reload_if_required(module, data, NULL);
826 if (ret != LDB_SUCCESS) {
830 for (i=0; data && data->partitions && data->partitions[i]; i++) {
831 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
832 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
833 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
835 ret = ldb_next_start_trans(data->partitions[i]->module);
836 if (ret != LDB_SUCCESS) {
837 /* Back it out, if it fails on one */
838 for (i--; i >= 0; i--) {
839 ldb_next_del_trans(data->partitions[i]->module);
841 ldb_next_del_trans(module);
846 data->in_transaction++;
851 /* prepare for a commit */
852 static int partition_prepare_commit(struct ldb_module *module)
855 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
856 struct partition_private_data);
858 for (i=0; data && data->partitions && data->partitions[i]; i++) {
861 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
862 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
863 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
865 ret = ldb_next_prepare_commit(data->partitions[i]->module);
866 if (ret != LDB_SUCCESS) {
867 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
868 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
869 ldb_errstring(ldb_module_get_ctx(module)));
874 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
875 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
877 return ldb_next_prepare_commit(module);
881 /* end a transaction */
882 static int partition_end_trans(struct ldb_module *module)
886 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
887 struct partition_private_data);
891 if (data->in_transaction == 0) {
892 DEBUG(0,("partition end transaction mismatch\n"));
893 ret = LDB_ERR_OPERATIONS_ERROR;
895 data->in_transaction--;
898 for (i=0; data && data->partitions && data->partitions[i]; i++) {
899 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
900 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
901 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
903 ret2 = ldb_next_end_trans(data->partitions[i]->module);
904 if (ret2 != LDB_SUCCESS) {
905 ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
906 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
907 ldb_errstring(ldb_module_get_ctx(module)));
912 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
913 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
915 ret2 = ldb_next_end_trans(module);
916 if (ret2 != LDB_SUCCESS) {
922 /* delete a transaction */
923 static int partition_del_trans(struct ldb_module *module)
925 int ret, final_ret = LDB_SUCCESS;
927 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
928 struct partition_private_data);
929 for (i=0; data && data->partitions && data->partitions[i]; i++) {
930 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
931 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
932 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
934 ret = ldb_next_del_trans(data->partitions[i]->module);
935 if (ret != LDB_SUCCESS) {
936 ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
937 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
938 ldb_errstring(ldb_module_get_ctx(module)));
943 if (data->in_transaction == 0) {
944 DEBUG(0,("partition del transaction mismatch\n"));
945 return ldb_operr(ldb_module_get_ctx(module));
947 data->in_transaction--;
949 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
950 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
952 ret = ldb_next_del_trans(module);
953 if (ret != LDB_SUCCESS) {
959 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
960 enum ldb_sequence_type type, uint64_t *seq_number)
963 struct ldb_result *res;
964 struct ldb_seqnum_request *tseq;
965 struct ldb_request *treq;
966 struct ldb_seqnum_result *seqr;
967 res = talloc_zero(mem_ctx, struct ldb_result);
969 return ldb_oom(ldb_module_get_ctx(module));
971 tseq = talloc_zero(res, struct ldb_seqnum_request);
974 return ldb_oom(ldb_module_get_ctx(module));
978 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
979 LDB_EXTENDED_SEQUENCE_NUMBER,
983 ldb_extended_default_callback,
985 LDB_REQ_SET_LOCATION(treq);
986 if (ret != LDB_SUCCESS) {
991 ret = ldb_next_request(module, treq);
992 if (ret != LDB_SUCCESS) {
996 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
997 if (ret != LDB_SUCCESS) {
1002 seqr = talloc_get_type(res->extended->data,
1003 struct ldb_seqnum_result);
1004 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1005 ret = LDB_ERR_OPERATIONS_ERROR;
1006 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
1010 *seq_number = seqr->seq_num;
1016 /* FIXME: This function is still semi-async */
1017 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1021 uint64_t seq_number = 0;
1022 uint64_t timestamp_sequence = 0;
1023 uint64_t timestamp = 0;
1024 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1025 struct partition_private_data);
1026 struct ldb_seqnum_request *seq;
1027 struct ldb_seqnum_result *seqr;
1028 struct ldb_request *treq;
1029 struct ldb_seqnum_request *tseq;
1030 struct ldb_seqnum_result *tseqr;
1031 struct ldb_extended *ext;
1032 struct ldb_result *res;
1033 struct dsdb_partition *p;
1035 p = find_partition(data, NULL, req);
1037 /* the caller specified what partition they want the
1038 * sequence number operation on - just pass it on
1040 return ldb_next_request(p->module, req);
1043 seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
1045 switch (seq->type) {
1047 case LDB_SEQ_HIGHEST_SEQ:
1049 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
1050 if (ret != LDB_SUCCESS) {
1054 /* Skip the lot if 'data' isn't here yet (initialisation) */
1055 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1057 res = talloc_zero(req, struct ldb_result);
1059 return ldb_oom(ldb_module_get_ctx(module));
1061 tseq = talloc_zero(res, struct ldb_seqnum_request);
1064 return ldb_oom(ldb_module_get_ctx(module));
1066 tseq->type = seq->type;
1068 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1069 LDB_EXTENDED_SEQUENCE_NUMBER,
1073 ldb_extended_default_callback,
1075 LDB_REQ_SET_LOCATION(treq);
1076 if (ret != LDB_SUCCESS) {
1081 ret = ldb_request_add_control(treq,
1082 DSDB_CONTROL_CURRENT_PARTITION_OID,
1083 false, data->partitions[i]->ctrl);
1084 if (ret != LDB_SUCCESS) {
1089 ret = partition_request(data->partitions[i]->module, treq);
1090 if (ret != LDB_SUCCESS) {
1094 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1095 if (ret != LDB_SUCCESS) {
1099 tseqr = talloc_get_type(res->extended->data,
1100 struct ldb_seqnum_result);
1101 if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1102 timestamp_sequence = MAX(timestamp_sequence,
1105 seq_number += tseqr->seq_num;
1110 case LDB_SEQ_HIGHEST_TIMESTAMP:
1112 res = talloc_zero(req, struct ldb_result);
1114 return ldb_oom(ldb_module_get_ctx(module));
1117 tseq = talloc_zero(res, struct ldb_seqnum_request);
1120 return ldb_oom(ldb_module_get_ctx(module));
1122 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1124 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1125 LDB_EXTENDED_SEQUENCE_NUMBER,
1129 ldb_extended_default_callback,
1131 LDB_REQ_SET_LOCATION(treq);
1132 if (ret != LDB_SUCCESS) {
1137 ret = ldb_next_request(module, treq);
1138 if (ret != LDB_SUCCESS) {
1142 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1143 if (ret != LDB_SUCCESS) {
1148 tseqr = talloc_get_type(res->extended->data,
1149 struct ldb_seqnum_result);
1150 timestamp = tseqr->seq_num;
1154 /* Skip the lot if 'data' isn't here yet (initialisation) */
1155 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1157 res = talloc_zero(req, struct ldb_result);
1159 return ldb_oom(ldb_module_get_ctx(module));
1162 tseq = talloc_zero(res, struct ldb_seqnum_request);
1165 return ldb_oom(ldb_module_get_ctx(module));
1167 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1169 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1170 LDB_EXTENDED_SEQUENCE_NUMBER,
1174 ldb_extended_default_callback,
1176 LDB_REQ_SET_LOCATION(treq);
1177 if (ret != LDB_SUCCESS) {
1182 ret = ldb_request_add_control(treq,
1183 DSDB_CONTROL_CURRENT_PARTITION_OID,
1184 false, data->partitions[i]->ctrl);
1185 if (ret != LDB_SUCCESS) {
1190 ret = partition_request(data->partitions[i]->module, treq);
1191 if (ret != LDB_SUCCESS) {
1195 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1196 if (ret != LDB_SUCCESS) {
1201 tseqr = talloc_get_type(res->extended->data,
1202 struct ldb_seqnum_result);
1203 timestamp = MAX(timestamp, tseqr->seq_num);
1211 ext = talloc_zero(req, struct ldb_extended);
1213 return ldb_oom(ldb_module_get_ctx(module));
1215 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1218 return ldb_oom(ldb_module_get_ctx(module));
1220 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1223 switch (seq->type) {
1225 case LDB_SEQ_HIGHEST_SEQ:
1227 /* Has someone above set a timebase sequence? */
1228 if (timestamp_sequence) {
1229 seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1231 seqr->seq_num = seq_number;
1234 if (timestamp_sequence > seqr->seq_num) {
1235 seqr->seq_num = timestamp_sequence;
1236 seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1239 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1241 case LDB_SEQ_HIGHEST_TIMESTAMP:
1242 seqr->seq_num = timestamp;
1246 if (seq->type == LDB_SEQ_NEXT) {
1250 /* send request done */
1251 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1255 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1257 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1258 struct partition_private_data);
1259 struct partition_context *ac;
1262 /* if we aren't initialised yet go further */
1264 return ldb_next_request(module, req);
1267 /* see if we are still up-to-date */
1268 ret = partition_reload_if_required(module, data, req);
1269 if (ret != LDB_SUCCESS) {
1273 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1274 return partition_sequence_number(module, req);
1277 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1278 return partition_create(module, req);
1282 * as the extended operation has no dn
1283 * we need to send it to all partitions
1286 ac = partition_init_ctx(module, req);
1288 return ldb_operr(ldb_module_get_ctx(module));
1291 return partition_send_all(module, ac, req);
1294 static const struct ldb_module_ops ldb_partition_module_ops = {
1295 .name = "partition",
1296 .init_context = partition_init,
1297 .search = partition_search,
1298 .add = partition_add,
1299 .modify = partition_modify,
1300 .del = partition_delete,
1301 .rename = partition_rename,
1302 .extended = partition_extended,
1303 .start_transaction = partition_start_trans,
1304 .prepare_commit = partition_prepare_commit,
1305 .end_transaction = partition_end_trans,
1306 .del_transaction = partition_del_trans,
1309 int ldb_partition_module_init(const char *version)
1311 LDB_MODULE_CHECK_VERSION(version);
1312 return ldb_register_module(&ldb_partition_module_ops);