bd636c7a0b55844c529345e682f360f0a0107918
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to multiple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241         void *part_data = NULL;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 return ldb_oom(ldb_module_get_ctx(ac->module));
248         }
249
250         switch (ac->req->operation) {
251         case LDB_SEARCH:
252                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253                                         ac->part_req,
254                                         ac->req->op.search.base,
255                                         ac->req->op.search.scope,
256                                         ac->req->op.search.tree,
257                                         ac->req->op.search.attrs,
258                                         ac->req->controls,
259                                         ac, partition_req_callback,
260                                         ac->req);
261                 LDB_REQ_SET_LOCATION(req);
262                 break;
263         case LDB_ADD:
264                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.add.message,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 LDB_REQ_SET_LOCATION(req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 LDB_REQ_SET_LOCATION(req);
278                 break;
279         case LDB_DELETE:
280                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.del.dn,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 LDB_REQ_SET_LOCATION(req);
286                 break;
287         case LDB_RENAME:
288                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289                                         ac->req->op.rename.olddn,
290                                         ac->req->op.rename.newdn,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 LDB_REQ_SET_LOCATION(req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 LDB_REQ_SET_LOCATION(req);
305                 break;
306         default:
307                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308                                   "Unsupported request type!");
309                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
310         }
311
312         if (ret != LDB_SUCCESS) {
313                 return ret;
314         }
315
316         ac->part_req[ac->num_requests].req = req;
317
318         if (ac->req->controls) {
319                 /* Duplicate everything beside the current partition control */
320                 partition_ctrl = ldb_request_get_control(ac->req,
321                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
322                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
323                         return ldb_module_oom(ac->module);
324                 }
325         }
326
327         part_data = partition->ctrl;
328
329         ac->part_req[ac->num_requests].module = partition->module;
330
331         if (partition_ctrl != NULL) {
332                 if (partition_ctrl->data != NULL) {
333                         part_data = partition_ctrl->data;
334                 }
335
336                 /*
337                  * If the provided current partition control is without
338                  * data then use the calculated one.
339                  */
340                 ret = ldb_request_add_control(req,
341                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                               false, part_data);
343                 if (ret != LDB_SUCCESS) {
344                         return ret;
345                 }
346         }
347
348         if (req->operation == LDB_SEARCH) {
349                 /*
350                  * If the search is for 'more' than this partition,
351                  * then change the basedn, so the check of the BASE DN
352                  * still passes in the ldb_key_value layer
353                  */
354                 if (ldb_dn_compare_base(partition->ctrl->dn,
355                                         req->op.search.base) != 0) {
356                         req->op.search.base = partition->ctrl->dn;
357                 }
358         }
359
360         ac->num_requests++;
361
362         return LDB_SUCCESS;
363 }
364
365 static int partition_call_first(struct partition_context *ac)
366 {
367         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
368 }
369
370 /**
371  * Send a request down to all the partitions (but not the sam.ldb file)
372  */
373 static int partition_send_all(struct ldb_module *module, 
374                               struct partition_context *ac, 
375                               struct ldb_request *req) 
376 {
377         unsigned int i;
378         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
379                                                               struct partition_private_data);
380         int ret;
381
382         for (i=0; data && data->partitions && data->partitions[i]; i++) {
383                 ret = partition_prep_request(ac, data->partitions[i]);
384                 if (ret != LDB_SUCCESS) {
385                         return ret;
386                 }
387         }
388
389         /* fire the first one */
390         return partition_call_first(ac);
391 }
392
393 struct partition_copy_context {
394         struct ldb_module *module;
395         struct partition_context *partition_context;
396         struct ldb_request *request;
397         struct ldb_dn *dn;
398 };
399
400 /*
401  * A special DN has been updated in the primary partition. Now propagate those
402  * changes to the remaining partitions.
403  *
404  * Note: that the operations are asynchronous and this function is called
405  *       from partition_copy_all_callback_handler in response to an async
406  *       callback.
407  */
408 static int partition_copy_all_callback_action(
409         struct ldb_module *module,
410         struct partition_context *ac,
411         struct ldb_request *req,
412         struct ldb_dn *dn)
413
414 {
415
416         unsigned int i;
417         struct partition_private_data *data =
418                 talloc_get_type(
419                         ldb_module_get_private(module),
420                         struct partition_private_data);
421         int search_ret;
422         struct ldb_result *res;
423         /* now fetch the resulting object, and then copy it to all the
424          * other partitions. We need this approach to cope with the
425          * partitions getting out of sync. If for example the
426          * @ATTRIBUTES object exists on one partition but not the
427          * others then just doing each of the partitions in turn will
428          * lead to an error
429          */
430         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
431         if (search_ret != LDB_SUCCESS && search_ret != LDB_ERR_NO_SUCH_OBJECT) {
432                 return search_ret;
433         }
434
435         /* now delete the object in the other partitions, if required
436         */
437         if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
438                 for (i=0; data->partitions && data->partitions[i]; i++) {
439                         int pret;
440                         pret = dsdb_module_del(data->partitions[i]->module,
441                                                dn,
442                                                DSDB_FLAG_NEXT_MODULE,
443                                                req);
444                         if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
445                                 /* we should only get success or no
446                                    such object from the other partitions */
447                                 return pret;
448                         }
449                 }
450
451                 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
452         }
453
454         /* now add/modify in the other partitions */
455         for (i=0; data->partitions && data->partitions[i]; i++) {
456                 struct ldb_message *modify_msg = NULL;
457                 int pret;
458                 unsigned int el_idx;
459
460                 pret = dsdb_module_add(data->partitions[i]->module,
461                                        res->msgs[0],
462                                        DSDB_FLAG_NEXT_MODULE,
463                                        req);
464                 if (pret == LDB_SUCCESS) {
465                         continue;
466                 }
467
468                 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
469                         return pret;
470                 }
471
472                 modify_msg = ldb_msg_copy(req, res->msgs[0]);
473                 if (modify_msg == NULL) {
474                         return ldb_module_oom(module);
475                 }
476
477                 /*
478                  * mark all the message elements as
479                  * LDB_FLAG_MOD_REPLACE
480                  */
481                 for (el_idx=0;
482                      el_idx < modify_msg->num_elements;
483                      el_idx++) {
484                         modify_msg->elements[el_idx].flags
485                                 = LDB_FLAG_MOD_REPLACE;
486                 }
487
488                 if (req->operation == LDB_MODIFY) {
489                         const struct ldb_message *req_msg = req->op.mod.message;
490                         /*
491                          * mark elements to be removed, if these were
492                          * deleted entirely above we need to delete
493                          * them here too
494                          */
495                         for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
496                                 if (LDB_FLAG_MOD_TYPE(req_msg->elements[el_idx].flags) == LDB_FLAG_MOD_DELETE
497                                     || ((LDB_FLAG_MOD_TYPE(req_msg->elements[el_idx].flags) == LDB_FLAG_MOD_REPLACE) &&
498                                         req_msg->elements[el_idx].num_values == 0)) {
499                                         if (ldb_msg_find_element(modify_msg,
500                                                                  req_msg->elements[el_idx].name) != NULL) {
501                                                 continue;
502                                         }
503                                         pret = ldb_msg_add_empty(
504                                                 modify_msg,
505                                                 req_msg->elements[el_idx].name,
506                                                 LDB_FLAG_MOD_REPLACE,
507                                                 NULL);
508                                         if (pret != LDB_SUCCESS) {
509                                                 return pret;
510                                         }
511                                 }
512                         }
513                 }
514
515                 pret = dsdb_module_modify(data->partitions[i]->module,
516                                           modify_msg,
517                                           DSDB_FLAG_NEXT_MODULE,
518                                           req);
519
520                 if (pret != LDB_SUCCESS) {
521                         return pret;
522                 }
523         }
524
525         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
526 }
527
528
529 /*
530  * @brief call back function for the ldb operations on special DN's.
531  *
532  * As the LDB operations are async, and we wish to use the result
533  * the operations, a callback needs to be registered to process the results
534  * of the LDB operations.
535  *
536  * @param req the ldb request
537  * @param res the result of the operation
538  *
539  * @return the LDB_STATUS
540  */
541 static int partition_copy_all_callback_handler(
542         struct ldb_request *req,
543         struct ldb_reply *ares)
544 {
545         struct partition_copy_context *ac = NULL;
546
547         ac = talloc_get_type(
548                 req->context,
549                 struct partition_copy_context);
550
551         if (!ares) {
552                 return ldb_module_done(
553                         ac->request,
554                         NULL,
555                         NULL,
556                         LDB_ERR_OPERATIONS_ERROR);
557         }
558
559         /* pass on to the callback */
560         switch (ares->type) {
561         case LDB_REPLY_ENTRY:
562                 return ldb_module_send_entry(
563                         ac->request,
564                         ares->message,
565                         ares->controls);
566
567         case LDB_REPLY_REFERRAL:
568                 return ldb_module_send_referral(
569                         ac->request,
570                         ares->referral);
571
572         case LDB_REPLY_DONE: {
573                 int error = ares->error;
574                 if (error == LDB_SUCCESS) {
575                         error = partition_copy_all_callback_action(
576                                 ac->module,
577                                 ac->partition_context,
578                                 ac->request,
579                                 ac->dn);
580                 }
581                 return ldb_module_done(
582                         ac->request,
583                         ares->controls,
584                         ares->response,
585                         error);
586         }
587
588         default:
589                 /* Can't happen */
590                 return LDB_ERR_OPERATIONS_ERROR;
591         }
592 }
593
594 /**
595  * send an operation to the top partition, then copy the resulting
596  * object to all other partitions.
597  */
598 static int partition_copy_all(
599         struct ldb_module *module,
600         struct partition_context *partition_context,
601         struct ldb_request *req,
602         struct ldb_dn *dn)
603 {
604         struct ldb_request *new_req = NULL;
605         struct ldb_context *ldb = NULL;
606         struct partition_copy_context *context = NULL;
607
608         int ret;
609
610         ldb = ldb_module_get_ctx(module);
611
612         context = talloc_zero(req, struct partition_copy_context);
613         if (context == NULL) {
614                 return ldb_oom(ldb);
615         }
616         context->module = module;
617         context->request = req;
618         context->dn = dn;
619         context->partition_context = partition_context;
620
621         switch (req->operation) {
622         case LDB_ADD:
623                 ret = ldb_build_add_req(
624                         &new_req,
625                         ldb,
626                         req,
627                         req->op.add.message,
628                         req->controls,
629                         context,
630                         partition_copy_all_callback_handler,
631                         req);
632                 break;
633         case LDB_MODIFY:
634                 ret = ldb_build_mod_req(
635                         &new_req,
636                         ldb,
637                         req,
638                         req->op.mod.message,
639                         req->controls,
640                         context,
641                         partition_copy_all_callback_handler,
642                         req);
643                 break;
644         case LDB_DELETE:
645                 ret = ldb_build_del_req(
646                         &new_req,
647                         ldb,
648                         req,
649                         req->op.del.dn,
650                         req->controls,
651                         context,
652                         partition_copy_all_callback_handler,
653                         req);
654                 break;
655         case LDB_RENAME:
656                 ret = ldb_build_rename_req(
657                         &new_req,
658                         ldb,
659                         req,
660                         req->op.rename.olddn,
661                         req->op.rename.newdn,
662                         req->controls,
663                         context,
664                         partition_copy_all_callback_handler,
665                         req);
666                 break;
667         default:
668                 /*
669                  * Shouldn't happen.
670                  */
671                 ldb_debug(
672                         ldb,
673                         LDB_DEBUG_ERROR,
674                         "Unexpected operation type (%d)\n", req->operation);
675                 ret = LDB_ERR_OPERATIONS_ERROR;
676                 break;
677         }
678         if (ret != LDB_SUCCESS) {
679                 return ret;
680         }
681         return ldb_next_request(module, new_req);
682 }
683 /**
684  * Figure out which backend a request needs to be aimed at.  Some
685  * requests must be replicated to all backends
686  */
687 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
688 {
689         struct partition_context *ac;
690         unsigned int i;
691         int ret;
692         struct dsdb_partition *partition;
693         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
694                                                               struct partition_private_data);
695
696         /* if we aren't initialised yet go further */
697         if (!data || !data->partitions) {
698                 return ldb_next_request(module, req);
699         }
700
701         if (ldb_dn_is_special(dn)) {
702                 /* Is this a special DN, we need to replicate to every backend? */
703                 for (i=0; data->replicate && data->replicate[i]; i++) {
704                         if (ldb_dn_compare(data->replicate[i], 
705                                            dn) == 0) {
706                                 
707                                 ac = partition_init_ctx(module, req);
708                                 if (!ac) {
709                                         return ldb_operr(ldb_module_get_ctx(module));
710                                 }
711                                 
712                                 return partition_copy_all(module, ac, req, dn);
713                         }
714                 }
715         }
716
717         /* Otherwise, we need to find the partition to fire it to */
718
719         /* Find partition */
720         partition = find_partition(data, dn, req);
721         if (!partition) {
722                 /*
723                  * if we haven't found a matching partition
724                  * pass the request to the main ldb
725                  *
726                  * TODO: we should maybe return an error here
727                  *       if it's not a special dn
728                  */
729
730                 return ldb_next_request(module, req);
731         }
732
733         ac = partition_init_ctx(module, req);
734         if (!ac) {
735                 return ldb_operr(ldb_module_get_ctx(module));
736         }
737
738         /* we need to add a control but we never touch the original request */
739         ret = partition_prep_request(ac, partition);
740         if (ret != LDB_SUCCESS) {
741                 return ret;
742         }
743
744         /* fire the first one */
745         return partition_call_first(ac);
746 }
747
748 /* search */
749 static int partition_search(struct ldb_module *module, struct ldb_request *req)
750 {
751         /* Find backend */
752         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
753                                                               struct partition_private_data);
754         struct partition_context *ac;
755         struct ldb_context *ldb;
756         struct loadparm_context *lp_ctx;
757
758         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
759         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
760         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
761         
762         struct ldb_search_options_control *search_options = NULL;
763         struct dsdb_partition *p;
764         unsigned int i, j;
765         int ret;
766         bool domain_scope = false, phantom_root = false;
767
768         p = find_partition(data, NULL, req);
769         if (p != NULL) {
770                 /* the caller specified what partition they want the
771                  * search - just pass it on
772                  */
773                 return ldb_next_request(p->module, req);
774         }
775
776         /* Get back the search options from the search control, and mark it as
777          * non-critical (to make backends and also dcpromo happy).
778          */
779         if (search_control) {
780                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
781                 search_control->critical = 0;
782
783         }
784
785         /* if we aren't initialised yet go further */
786         if (!data || !data->partitions) {
787                 return ldb_next_request(module, req);
788         }
789
790         /* Special DNs without specified partition should go further */
791         if (ldb_dn_is_special(req->op.search.base)) {
792                 return ldb_next_request(module, req);
793         }
794
795         /* Locate the options */
796         domain_scope = (search_options
797                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
798                 || domain_scope_control;
799         phantom_root = search_options
800                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
801
802         /* Remove handled options from the search control flag */
803         if (search_options) {
804                 search_options->search_options = search_options->search_options
805                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
806                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
807         }
808
809         ac = partition_init_ctx(module, req);
810         if (!ac) {
811                 return ldb_operr(ldb_module_get_ctx(module));
812         }
813
814         ldb = ldb_module_get_ctx(ac->module);
815         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
816                                                 struct loadparm_context);
817
818         /* Search from the base DN */
819         if (ldb_dn_is_null(req->op.search.base)) {
820                 if (!phantom_root) {
821                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
822                 }
823                 return partition_send_all(module, ac, req);
824         }
825
826         for (i=0; data->partitions[i]; i++) {
827                 bool match = false, stop = false;
828
829                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
830                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
831                                                 req->op.search.base) == 0) {
832                                 /* base DN is in a partial replica
833                                    with the NO_GLOBAL_CATALOG
834                                    control. This partition is invisible */
835                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
836                                 continue;
837                         }
838                 }
839
840                 if (phantom_root) {
841                         /* Phantom root: Find all partitions under the
842                          * search base. We match if:
843                          *
844                          * 1) the DN we are looking for exactly matches a
845                          *    certain partition and always stop
846                          * 2) the DN we are looking for is a parent of certain
847                          *    partitions and it isn't a scope base search
848                          * 3) the DN we are looking for is a child of a certain
849                          *    partition and always stop
850                          *    - we don't need to go any further up in the
851                          *    hierarchy!
852                          */
853                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
854                                            req->op.search.base) == 0) {
855                                 match = true;
856                                 stop = true;
857                         }
858                         if (!match &&
859                             (ldb_dn_compare_base(req->op.search.base,
860                                                  data->partitions[i]->ctrl->dn) == 0 &&
861                              req->op.search.scope != LDB_SCOPE_BASE)) {
862                                 match = true;
863                         }
864                         if (!match &&
865                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
866                                                 req->op.search.base) == 0) {
867                                 match = true;
868                                 stop = true; /* note that this relies on partition ordering */
869                         }
870                 } else {
871                         /* Domain scope: Find all partitions under the search
872                          * base.
873                          *
874                          * We generate referral candidates if we haven't
875                          * specified the domain scope control, haven't a base
876                          * search* scope and the DN we are looking for is a real
877                          * predecessor of certain partitions. When a new
878                          * referral candidate is nearer to the DN than an
879                          * existing one delete the latter (we want to have only
880                          * the closest ones). When we checked this for all
881                          * candidates we have the final referrals.
882                          *
883                          * We match if the DN we are looking for is a child of
884                          * a certain partition or the partition
885                          * DN itself - we don't need to go any further
886                          * up in the hierarchy!
887                          */
888                         if ((!domain_scope) &&
889                             (req->op.search.scope != LDB_SCOPE_BASE) &&
890                             (ldb_dn_compare_base(req->op.search.base,
891                                                  data->partitions[i]->ctrl->dn) == 0) &&
892                             (ldb_dn_compare(req->op.search.base,
893                                             data->partitions[i]->ctrl->dn) != 0)) {
894                                 const char *scheme = ldb_get_opaque(
895                                     ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
896                                 char *ref = talloc_asprintf(
897                                         ac,
898                                         "%s://%s/%s%s",
899                                         scheme == NULL ? "ldap" : scheme,
900                                         lpcfg_dnsdomain(lp_ctx),
901                                         ldb_dn_get_linearized(
902                                             data->partitions[i]->ctrl->dn),
903                                         req->op.search.scope ==
904                                             LDB_SCOPE_ONELEVEL ? "??base" : "");
905
906                                 if (ref == NULL) {
907                                         return ldb_oom(ldb);
908                                 }
909
910                                 /* Initialise the referrals list */
911                                 if (ac->referrals == NULL) {
912                                         char **l = str_list_make_empty(ac);
913                                         ac->referrals = discard_const_p(const char *, l);
914                                         if (ac->referrals == NULL) {
915                                                 return ldb_oom(ldb);
916                                         }
917                                 }
918
919                                 /* Check if the new referral candidate is
920                                  * closer to the base DN than already
921                                  * saved ones and delete the latters */
922                                 j = 0;
923                                 while (ac->referrals[j] != NULL) {
924                                         if (strstr(ac->referrals[j],
925                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
926                                                 str_list_remove(ac->referrals,
927                                                                 ac->referrals[j]);
928                                         } else {
929                                                 ++j;
930                                         }
931                                 }
932
933                                 /* Add our new candidate */
934                                 ac->referrals = str_list_add(ac->referrals, ref);
935
936                                 talloc_free(ref);
937
938                                 if (ac->referrals == NULL) {
939                                         return ldb_oom(ldb);
940                                 }
941                         }
942                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
943                                 match = true;
944                                 stop = true; /* note that this relies on partition ordering */
945                         }
946                 }
947
948                 if (match) {
949                         ret = partition_prep_request(ac, data->partitions[i]);
950                         if (ret != LDB_SUCCESS) {
951                                 return ret;
952                         }
953                 }
954
955                 if (stop) break;
956         }
957
958         /* Perhaps we didn't match any partitions. Try the main partition */
959         if (ac->num_requests == 0) {
960                 talloc_free(ac);
961                 return ldb_next_request(module, req);
962         }
963
964         /* fire the first one */
965         return partition_call_first(ac);
966 }
967
968 /* add */
969 static int partition_add(struct ldb_module *module, struct ldb_request *req)
970 {
971         return partition_replicate(module, req, req->op.add.message->dn);
972 }
973
974 /* modify */
975 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
976 {
977         return partition_replicate(module, req, req->op.mod.message->dn);
978 }
979
980 /* delete */
981 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
982 {
983         return partition_replicate(module, req, req->op.del.dn);
984 }
985
986 /* rename */
987 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
988 {
989         /* Find backend */
990         struct dsdb_partition *backend, *backend2;
991         
992         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
993                                                               struct partition_private_data);
994
995         /* Skip the lot if 'data' isn't here yet (initialisation) */
996         if (!data) {
997                 return ldb_operr(ldb_module_get_ctx(module));
998         }
999
1000         backend = find_partition(data, req->op.rename.olddn, req);
1001         backend2 = find_partition(data, req->op.rename.newdn, req);
1002
1003         if ((backend && !backend2) || (!backend && backend2)) {
1004                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1005         }
1006
1007         if (backend != backend2) {
1008                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1009                                        "Cannot rename from %s in %s to %s in %s: %s",
1010                                        ldb_dn_get_linearized(req->op.rename.olddn),
1011                                        ldb_dn_get_linearized(backend->ctrl->dn),
1012                                        ldb_dn_get_linearized(req->op.rename.newdn),
1013                                        ldb_dn_get_linearized(backend2->ctrl->dn),
1014                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1015                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1016         }
1017
1018         return partition_replicate(module, req, req->op.rename.olddn);
1019 }
1020
1021 /* start a transaction */
1022 int partition_start_trans(struct ldb_module *module)
1023 {
1024         int i = 0;
1025         int ret = 0;
1026         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1027                                                               struct partition_private_data);
1028         /* Look at base DN */
1029         /* Figure out which partition it is under */
1030         /* Skip the lot if 'data' isn't here yet (initialization) */
1031         if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1032                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1033         }
1034
1035         /*
1036          * We start a transaction on metadata.tdb first and end it last in
1037          * end_trans. This makes locking semantics follow TDB rather than MDB,
1038          * and effectively locks all partitions at once.
1039          * Detail:
1040          * Samba AD is special in that the partitions module (this file)
1041          * combines multiple independently locked databases into one overall
1042          * transaction. Changes across multiple partition DBs in a single
1043          * transaction must ALL be either visible or invisible.
1044          * The way this is achieved is by taking out a write lock on
1045          * metadata.tdb at the start of prepare_commit, while unlocking it at
1046          * the end of end_trans. This is matched by read_lock, ensuring it
1047          * can't progress until that write lock is released.
1048          *
1049          * metadata.tdb needs to be a TDB file because MDB uses independent
1050          * locks, which means a read lock and a write lock can be held at the
1051          * same time, whereas in TDB, the two locks block each other. The TDB
1052          * behaviour is required to implement the functionality described
1053          * above.
1054          *
1055          * An important additional detail here is that if prepare_commit is
1056          * called on a TDB without any changes being made, no write lock is
1057          * taken. We address this by storing a sequence number in metadata.tdb
1058          * which is updated every time a replicated attribute is modified.
1059          * The possibility of a few unreplicated attributes being out of date
1060          * turns out not to be a problem.
1061          * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1062          * the same end as locking metadata.tdb, unless we made a modification
1063          * to the @ records found there before every prepare_commit.
1064          */
1065         ret = partition_metadata_start_trans(module);
1066         if (ret != LDB_SUCCESS) {
1067                 return ret;
1068         }
1069
1070         ret = ldb_next_start_trans(module);
1071         if (ret != LDB_SUCCESS) {
1072                 partition_metadata_del_trans(module);
1073                 return ret;
1074         }
1075
1076         ret = partition_reload_if_required(module, data, NULL);
1077         if (ret != LDB_SUCCESS) {
1078                 ldb_next_del_trans(module);
1079                 partition_metadata_del_trans(module);
1080                 return ret;
1081         }
1082
1083         /*
1084          * The following per partition locks are required mostly because TDB
1085          * and MDB require locks before read and write ops are permitted.
1086          */
1087         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1088                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1089                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1090                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1091                 }
1092                 ret = ldb_next_start_trans(data->partitions[i]->module);
1093                 if (ret != LDB_SUCCESS) {
1094                         /* Back it out, if it fails on one */
1095                         for (i--; i >= 0; i--) {
1096                                 ldb_next_del_trans(data->partitions[i]->module);
1097                         }
1098                         ldb_next_del_trans(module);
1099                         partition_metadata_del_trans(module);
1100                         return ret;
1101                 }
1102         }
1103
1104         data->in_transaction++;
1105
1106         return LDB_SUCCESS;
1107 }
1108
1109 /* prepare for a commit */
1110 int partition_prepare_commit(struct ldb_module *module)
1111 {
1112         unsigned int i;
1113         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1114                                                               struct partition_private_data);
1115         int ret;
1116
1117         /*
1118          * Order of prepare_commit calls must match that in
1119          * partition_start_trans. See comment in that function for detail.
1120          */
1121         ret = partition_metadata_prepare_commit(module);
1122         if (ret != LDB_SUCCESS) {
1123                 return ret;
1124         }
1125
1126         ret = ldb_next_prepare_commit(module);
1127         if (ret != LDB_SUCCESS) {
1128                 return ret;
1129         }
1130
1131         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1132                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1133                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1134                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1135                 }
1136                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1137                 if (ret != LDB_SUCCESS) {
1138                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1139                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1140                                                ldb_errstring(ldb_module_get_ctx(module)));
1141                         return ret;
1142                 }
1143         }
1144
1145         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1146                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1147         }
1148
1149         return LDB_SUCCESS;
1150 }
1151
1152
1153 /* end a transaction */
1154 int partition_end_trans(struct ldb_module *module)
1155 {
1156         int ret, ret2;
1157         int i;
1158         struct ldb_context *ldb = ldb_module_get_ctx(module);
1159         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1160                                                               struct partition_private_data);
1161         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1162
1163         ret = LDB_SUCCESS;
1164
1165         if (data->in_transaction == 0) {
1166                 DEBUG(0,("partition end transaction mismatch\n"));
1167                 ret = LDB_ERR_OPERATIONS_ERROR;
1168         } else {
1169                 data->in_transaction--;
1170         }
1171
1172         /*
1173          * Order of end_trans calls must be the reverse of that in
1174          * partition_start_trans. See comment in that function for detail.
1175          */
1176         if (data && data->partitions) {
1177                 /* Just counting the partitions */
1178                 for (i=0; data->partitions[i]; i++) {}
1179
1180                 /* now walk them backwards */
1181                 for (i--; i>=0; i--) {
1182                         struct dsdb_partition *p = data->partitions[i];
1183                         if (trace) {
1184                                 ldb_debug(ldb,
1185                                           LDB_DEBUG_TRACE,
1186                                           "partition_end_trans() -> %s",
1187                                           ldb_dn_get_linearized(p->ctrl->dn));
1188                         }
1189                         ret2 = ldb_next_end_trans(p->module);
1190                         if (ret2 != LDB_SUCCESS) {
1191                                 ldb_asprintf_errstring(ldb,
1192                                         "end_trans error on %s: %s",
1193                                         ldb_dn_get_linearized(p->ctrl->dn),
1194                                         ldb_errstring(ldb));
1195                                 ret = ret2;
1196                         }
1197                 }
1198         }
1199
1200         if (trace) {
1201                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1202         }
1203         ret2 = ldb_next_end_trans(module);
1204         if (ret2 != LDB_SUCCESS) {
1205                 ret = ret2;
1206         }
1207
1208         ret2 = partition_metadata_end_trans(module);
1209         if (ret2 != LDB_SUCCESS) {
1210                 ret = ret2;
1211         }
1212
1213         return ret;
1214 }
1215
1216 /* delete a transaction */
1217 int partition_del_trans(struct ldb_module *module)
1218 {
1219         int ret, final_ret = LDB_SUCCESS;
1220         int i;
1221         struct ldb_context *ldb = ldb_module_get_ctx(module);
1222         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1223                                                               struct partition_private_data);
1224         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1225
1226         if (data == NULL) {
1227                 DEBUG(0,("partition delete transaction with no private data\n"));
1228                 return ldb_operr(ldb);
1229         }
1230
1231         /*
1232          * Order of del_trans calls must be the reverse of that in
1233          * partition_start_trans. See comment in that function for detail.
1234          */
1235         if (data->partitions) {
1236                 /* Just counting the partitions */
1237                 for (i=0; data->partitions[i]; i++) {}
1238
1239                 /* now walk them backwards */
1240                 for (i--; i>=0; i--) {
1241                         struct dsdb_partition *p = data->partitions[i];
1242                         if (trace) {
1243                                 ldb_debug(ldb,
1244                                           LDB_DEBUG_TRACE,
1245                                           "partition_del_trans() -> %s",
1246                                           ldb_dn_get_linearized(p->ctrl->dn));
1247                         }
1248                         ret = ldb_next_del_trans(p->module);
1249                         if (ret != LDB_SUCCESS) {
1250                                 ldb_asprintf_errstring(ldb,
1251                                         "del_trans error on %s: %s",
1252                                         ldb_dn_get_linearized(p->ctrl->dn),
1253                                         ldb_errstring(ldb));
1254                                 final_ret = ret;
1255                         }
1256                 }
1257         }
1258
1259         if (trace) {
1260                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1261         }
1262         ret = ldb_next_del_trans(module);
1263         if (ret != LDB_SUCCESS) {
1264                 final_ret = ret;
1265         }
1266
1267         ret = partition_metadata_del_trans(module);
1268         if (ret != LDB_SUCCESS) {
1269                 final_ret = ret;
1270         }
1271
1272         if (data->in_transaction == 0) {
1273                 DEBUG(0,("partition del transaction mismatch\n"));
1274                 return ldb_operr(ldb_module_get_ctx(module));
1275         }
1276         data->in_transaction--;
1277
1278         return final_ret;
1279 }
1280
1281 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
1282                                       uint64_t *seq_number,
1283                                       struct ldb_request *parent)
1284 {
1285         int ret;
1286         struct ldb_result *res;
1287         struct ldb_seqnum_request *tseq;
1288         struct ldb_seqnum_result *seqr;
1289
1290         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1291         if (tseq == NULL) {
1292                 return ldb_oom(ldb_module_get_ctx(module));
1293         }
1294         tseq->type = LDB_SEQ_HIGHEST_SEQ;
1295         
1296         ret = dsdb_module_extended(module, tseq, &res,
1297                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1298                                    tseq,
1299                                    DSDB_FLAG_NEXT_MODULE,
1300                                    parent);
1301         if (ret != LDB_SUCCESS) {
1302                 talloc_free(tseq);
1303                 return ret;
1304         }
1305         
1306         seqr = talloc_get_type_abort(res->extended->data,
1307                                      struct ldb_seqnum_result);
1308         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1309                 talloc_free(res);
1310                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1311                         "Primary backend in partition module returned a timestamp based seq");
1312         }
1313
1314         *seq_number = seqr->seq_num;
1315         talloc_free(tseq);
1316         return LDB_SUCCESS;
1317 }
1318
1319
1320 /*
1321  * Older version of sequence number as sum of sequence numbers for each partition
1322  */
1323 int partition_sequence_number_from_partitions(struct ldb_module *module,
1324                                               uint64_t *seqr)
1325 {
1326         int ret;
1327         unsigned int i;
1328         uint64_t seq_number = 0;
1329         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1330                                                               struct partition_private_data);
1331
1332         ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1333         if (ret != LDB_SUCCESS) {
1334                 return ret;
1335         }
1336         
1337         /* Skip the lot if 'data' isn't here yet (initialisation) */
1338         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1339                 struct ldb_seqnum_request *tseq;
1340                 struct ldb_seqnum_result *tseqr;
1341                 struct ldb_request *treq;
1342                 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1343                 if (res == NULL) {
1344                         return ldb_oom(ldb_module_get_ctx(module));
1345                 }
1346                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1347                 if (tseq == NULL) {
1348                         talloc_free(res);
1349                         return ldb_oom(ldb_module_get_ctx(module));
1350                 }
1351                 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1352                 
1353                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1354                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1355                                              tseq,
1356                                              NULL,
1357                                              res,
1358                                              ldb_extended_default_callback,
1359                                              NULL);
1360                 LDB_REQ_SET_LOCATION(treq);
1361                 if (ret != LDB_SUCCESS) {
1362                         talloc_free(res);
1363                         return ret;
1364                 }
1365                 
1366                 ret = partition_request(data->partitions[i]->module, treq);
1367                 if (ret != LDB_SUCCESS) {
1368                         talloc_free(res);
1369                         return ret;
1370                 }
1371                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1372                 if (ret != LDB_SUCCESS) {
1373                         talloc_free(res);
1374                         return ret;
1375                 }
1376                 tseqr = talloc_get_type(res->extended->data,
1377                                         struct ldb_seqnum_result);
1378                 seq_number += tseqr->seq_num;
1379                 talloc_free(res);
1380         }
1381
1382         *seqr = seq_number;
1383         return LDB_SUCCESS;
1384 }
1385
1386
1387 /*
1388  * Newer version of sequence number using metadata tdb
1389  */
1390 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1391 {
1392         struct ldb_extended *ext;
1393         struct ldb_seqnum_request *seq;
1394         struct ldb_seqnum_result *seqr;
1395         uint64_t seq_number;
1396         int ret;
1397
1398         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1399         switch (seq->type) {
1400         case LDB_SEQ_NEXT:
1401                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1402                 if (ret != LDB_SUCCESS) {
1403                         return ret;
1404                 }
1405                 break;
1406
1407         case LDB_SEQ_HIGHEST_SEQ:
1408                 ret = partition_metadata_sequence_number(module, &seq_number);
1409                 if (ret != LDB_SUCCESS) {
1410                         return ret;
1411                 }
1412                 break;
1413
1414         case LDB_SEQ_HIGHEST_TIMESTAMP:
1415                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1416                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1417         }
1418
1419         ext = talloc_zero(req, struct ldb_extended);
1420         if (!ext) {
1421                 return ldb_module_oom(module);
1422         }
1423         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1424         if (seqr == NULL) {
1425                 talloc_free(ext);
1426                 return ldb_module_oom(module);
1427         }
1428         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1429         ext->data = seqr;
1430
1431         seqr->seq_num = seq_number;
1432         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1433
1434         /* send request done */
1435         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1436 }
1437
1438 /* lock all the backends */
1439 int partition_read_lock(struct ldb_module *module)
1440 {
1441         int i = 0;
1442         int ret = 0;
1443         int ret2 = 0;
1444         struct ldb_context *ldb = ldb_module_get_ctx(module);
1445         struct partition_private_data *data = \
1446                 talloc_get_type(ldb_module_get_private(module),
1447                                 struct partition_private_data);
1448
1449         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1450                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1451                           "partition_read_lock() -> (metadata partition)");
1452         }
1453
1454         /*
1455          * It is important to only do this for LOCK because:
1456          * - we don't want to unlock what we did not lock
1457          *
1458          * - we don't want to make a new lock on the sam.ldb
1459          *   (triggered inside this routine due to the seq num check)
1460          *   during an unlock phase as that will violate the lock
1461          *   ordering
1462          */
1463
1464         if (data == NULL) {
1465                 TALLOC_CTX *mem_ctx = talloc_new(module);
1466
1467                 data = talloc_zero(mem_ctx, struct partition_private_data);
1468                 if (data == NULL) {
1469                         talloc_free(mem_ctx);
1470                         return ldb_operr(ldb);
1471                 }
1472
1473                 /*
1474                  * When used from Samba4, this message is set by the
1475                  * samba4 module, as a fixed value not read from the
1476                  * DB.  This avoids listing modules in the DB
1477                  */
1478                 data->forced_module_msg = talloc_get_type(
1479                         ldb_get_opaque(ldb,
1480                                        DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1481                         struct ldb_message);
1482
1483                 ldb_module_set_private(module, talloc_steal(module,
1484                                                             data));
1485                 talloc_free(mem_ctx);
1486         }
1487
1488         /*
1489          * This will lock sam.ldb and will also call event loops,
1490          * so we do it before we get the whole db lock.
1491          */
1492         ret = partition_reload_if_required(module, data, NULL);
1493         if (ret != LDB_SUCCESS) {
1494                 return ret;
1495         }
1496
1497         /*
1498          * Order of read_lock calls must match that in partition_start_trans.
1499          * See comment in that function for detail.
1500          */
1501         ret = partition_metadata_read_lock(module);
1502         if (ret != LDB_SUCCESS) {
1503                 goto failed;
1504         }
1505
1506         /*
1507          * The top level DB (sam.ldb) lock is not enough to block another
1508          * process in prepare_commit(), because if nothing was changed in the
1509          * specific backend, then prepare_commit() is a no-op. Therefore the
1510          * metadata.tdb lock is taken out above, as it is the best we can do
1511          * right now.
1512          */
1513         ret = ldb_next_read_lock(module);
1514         if (ret != LDB_SUCCESS) {
1515                 ldb_debug_set(ldb,
1516                               LDB_DEBUG_FATAL,
1517                               "Failed to lock db: %s / %s for metadata partition",
1518                               ldb_errstring(ldb),
1519                               ldb_strerror(ret));
1520
1521                 return ret;
1522         }
1523
1524         /*
1525          * The following per partition locks are required mostly because TDB
1526          * and MDB require locks before reads are permitted.
1527          */
1528         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1529                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1530                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1531                                   "partition_read_lock() -> %s",
1532                                   ldb_dn_get_linearized(
1533                                           data->partitions[i]->ctrl->dn));
1534                 }
1535                 ret = ldb_next_read_lock(data->partitions[i]->module);
1536                 if (ret == LDB_SUCCESS) {
1537                         continue;
1538                 }
1539
1540                 ldb_debug_set(ldb,
1541                               LDB_DEBUG_FATAL,
1542                               "Failed to lock db: %s / %s for %s",
1543                               ldb_errstring(ldb),
1544                               ldb_strerror(ret),
1545                               ldb_dn_get_linearized(
1546                                       data->partitions[i]->ctrl->dn));
1547
1548                 goto failed;
1549         }
1550
1551         return LDB_SUCCESS;
1552
1553 failed:
1554         /* Back it out, if it fails on one */
1555         for (i--; i >= 0; i--) {
1556                 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1557                 if (ret2 != LDB_SUCCESS) {
1558                         ldb_debug(ldb,
1559                                   LDB_DEBUG_FATAL,
1560                                   "Failed to unlock db: %s / %s",
1561                                   ldb_errstring(ldb),
1562                                   ldb_strerror(ret2));
1563                 }
1564         }
1565         ret2 = ldb_next_read_unlock(module);
1566         if (ret2 != LDB_SUCCESS) {
1567                 ldb_debug(ldb,
1568                           LDB_DEBUG_FATAL,
1569                           "Failed to unlock db: %s / %s",
1570                           ldb_errstring(ldb),
1571                           ldb_strerror(ret2));
1572         }
1573         return ret;
1574 }
1575
1576 /* unlock all the backends */
1577 int partition_read_unlock(struct ldb_module *module)
1578 {
1579         int i;
1580         int ret = LDB_SUCCESS;
1581         int ret2;
1582         struct ldb_context *ldb = ldb_module_get_ctx(module);
1583         struct partition_private_data *data = \
1584                 talloc_get_type(ldb_module_get_private(module),
1585                                 struct partition_private_data);
1586         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1587
1588         /*
1589          * Order of read_unlock calls must be the reverse of that in
1590          * partition_start_trans. See comment in that function for detail.
1591          */
1592         if (data && data->partitions) {
1593                 /* Just counting the partitions */
1594                 for (i=0; data->partitions[i]; i++) {}
1595
1596                 /* now walk them backwards */
1597                 for (i--; i>=0; i--) {
1598                         struct dsdb_partition *p = data->partitions[i];
1599                         if (trace) {
1600                                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1601                                           "partition_read_unlock() -> %s",
1602                                           ldb_dn_get_linearized(p->ctrl->dn));
1603                         }
1604                         ret2 = ldb_next_read_unlock(p->module);
1605                         if (ret2 != LDB_SUCCESS) {
1606                                 ldb_debug_set(ldb,
1607                                            LDB_DEBUG_FATAL,
1608                                            "Failed to lock db: %s / %s for %s",
1609                                            ldb_errstring(ldb),
1610                                            ldb_strerror(ret2),
1611                                            ldb_dn_get_linearized(p->ctrl->dn));
1612
1613                                 /*
1614                                  * Don't overwrite the original failure code
1615                                  * if there was one
1616                                  */
1617                                 if (ret == LDB_SUCCESS) {
1618                                         ret = ret2;
1619                                 }
1620                         }
1621                 }
1622         }
1623
1624         if (trace) {
1625                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1626                           "partition_read_unlock() -> (metadata partition)");
1627         }
1628
1629         ret2 = ldb_next_read_unlock(module);
1630         if (ret2 != LDB_SUCCESS) {
1631                 ldb_debug_set(ldb,
1632                               LDB_DEBUG_FATAL,
1633                               "Failed to unlock db: %s / %s for metadata partition",
1634                               ldb_errstring(ldb),
1635                               ldb_strerror(ret2));
1636
1637                 /*
1638                  * Don't overwrite the original failure code
1639                  * if there was one
1640                  */
1641                 if (ret == LDB_SUCCESS) {
1642                         ret = ret2;
1643                 }
1644         }
1645
1646         ret2 = partition_metadata_read_unlock(module);
1647
1648         /*
1649          * Don't overwrite the original failure code
1650          * if there was one
1651          */
1652         if (ret == LDB_SUCCESS) {
1653                 ret = ret2;
1654         }
1655
1656         return ret;
1657 }
1658
1659 /* extended */
1660 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1661 {
1662         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1663                                                               struct partition_private_data);
1664         struct partition_context *ac;
1665         int ret;
1666
1667         /* if we aren't initialised yet go further */
1668         if (!data) {
1669                 return ldb_next_request(module, req);
1670         }
1671
1672         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1673                 /* Update the metadata.tdb to increment the schema version if needed*/
1674                 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1675                 ret = partition_metadata_inc_schema_sequence(module);
1676                 return ldb_module_done(req, NULL, NULL, ret);
1677         }
1678         
1679         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1680                 return partition_sequence_number(module, req);
1681         }
1682
1683         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1684                 return partition_create(module, req);
1685         }
1686
1687         /* 
1688          * as the extended operation has no dn
1689          * we need to send it to all partitions
1690          */
1691
1692         ac = partition_init_ctx(module, req);
1693         if (!ac) {
1694                 return ldb_operr(ldb_module_get_ctx(module));
1695         }
1696
1697         return partition_send_all(module, ac, req);
1698 }
1699
1700 static const struct ldb_module_ops ldb_partition_module_ops = {
1701         .name              = "partition",
1702         .init_context      = partition_init,
1703         .search            = partition_search,
1704         .add               = partition_add,
1705         .modify            = partition_modify,
1706         .del               = partition_delete,
1707         .rename            = partition_rename,
1708         .extended          = partition_extended,
1709         .start_transaction = partition_start_trans,
1710         .prepare_commit    = partition_prepare_commit,
1711         .end_transaction   = partition_end_trans,
1712         .del_transaction   = partition_del_trans,
1713         .read_lock         = partition_read_lock,
1714         .read_unlock       = partition_read_unlock
1715 };
1716
1717 int ldb_partition_module_init(const char *version)
1718 {
1719         LDB_MODULE_CHECK_VERSION(version);
1720         return ldb_register_module(&ldb_partition_module_ops);
1721 }