s4:partition DSDB module - Cosmetic fixups
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
/*
 * One prepared sub-request: the request itself plus the module that
 * is handed to partition_request() when the sub-request is fired.
 */
struct part_request {
        /* module the sub-request is sent through */
        struct ldb_module *module;
        /* the sub-request built by partition_prep_request() */
        struct ldb_request *req;
};
38
/*
 * Per-request state shared by all sub-requests that were generated
 * for one original request.
 */
struct partition_context {
        /* the partition module instance and the caller's original request */
        struct ldb_module *module;
        struct ldb_request *req;

        /* array of prepared sub-requests, 'num_requests' entries long */
        struct part_request *part_req;
        int num_requests;
        /* number of sub-requests that have already reported LDB_REPLY_DONE */
        int finished_requests;
};
47
48 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
49 {
50         struct partition_context *ac;
51
52         ac = talloc_zero(req, struct partition_context);
53         if (ac == NULL) {
54                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
55                 return NULL;
56         }
57
58         ac->module = module;
59         ac->req = req;
60
61         return ac;
62 }
63
64 /*
65  * helper functions to call the next module in chain
66  */
67 int partition_request(struct ldb_module *module, struct ldb_request *request)
68 {
69         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
70                 const struct dsdb_control_current_partition *partition = NULL;
71                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
72                 if (partition_ctrl) {
73                         partition = talloc_get_type(partition_ctrl->data,
74                                                     struct dsdb_control_current_partition);
75                 }
76
77                 if (partition != NULL) {
78                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
79                                   ldb_dn_get_linearized(partition->dn));                        
80                 } else {
81                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
82                 }
83         }
84
85         return ldb_next_request(module, request);
86 }
87
88 static struct dsdb_partition *find_partition(struct partition_private_data *data,
89                                              struct ldb_dn *dn,
90                                              struct ldb_request *req)
91 {
92         int i;
93         struct ldb_control *partition_ctrl;
94
95         /* see if the request has the partition DN specified in a
96          * control. The repl_meta_data module can specify this to
97          * ensure that replication happens to the right partition
98          */
99         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
100         if (partition_ctrl) {
101                 const struct dsdb_control_current_partition *partition;
102                 partition = talloc_get_type(partition_ctrl->data,
103                                             struct dsdb_control_current_partition);
104                 if (partition != NULL) {
105                         dn = partition->dn;
106                 }
107         }
108
109         if (dn == NULL) {
110                 return NULL;
111         }
112
113         /* Look at base DN */
114         /* Figure out which partition it is under */
115         /* Skip the lot if 'data' isn't here yet (initialisation) */
116         for (i=0; data && data->partitions && data->partitions[i]; i++) {
117                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
118                         return data->partitions[i];
119                 }
120         }
121
122         return NULL;
123 }
124
125 /**
126  * fire the caller's callback for every entry, but only send 'done' once.
127  */
128 static int partition_req_callback(struct ldb_request *req,
129                                   struct ldb_reply *ares)
130 {
131         struct partition_context *ac;
132         struct ldb_module *module;
133         struct ldb_request *nreq;
134         int ret;
135         struct partition_private_data *data;
136         struct ldb_control *partition_ctrl;
137
138         ac = talloc_get_type(req->context, struct partition_context);
139         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministily tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 /* ignore referrals for now */
169                 break;
170
171         case LDB_REPLY_ENTRY:
172                 if (ac->req->operation != LDB_SEARCH) {
173                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
174                                 "partition_req_callback:"
175                                 " Unsupported reply type for this request");
176                         return ldb_module_done(ac->req, NULL, NULL,
177                                                 LDB_ERR_OPERATIONS_ERROR);
178                 }
179                 
180                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
181
182         case LDB_REPLY_DONE:
183                 if (ac->req->operation == LDB_EXTENDED) {
184                         /* FIXME: check for ares->response, replmd does not fill it ! */
185                         if (ares->response) {
186                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
187                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
188                                                           "partition_req_callback:"
189                                                           " Unknown extended reply, "
190                                                           "only supports START_TLS");
191                                         talloc_free(ares);
192                                         return ldb_module_done(ac->req, NULL, NULL,
193                                                                 LDB_ERR_OPERATIONS_ERROR);
194                                 }
195                         }
196                 }
197
198                 ac->finished_requests++;
199                 if (ac->finished_requests == ac->num_requests) {
200                         /* this was the last one, call callback */
201                         return ldb_module_done(ac->req, ares->controls,
202                                                ares->response, 
203                                                ares->error);
204                 }
205
206                 /* not the last, now call the next one */
207                 module = ac->part_req[ac->finished_requests].module;
208                 nreq = ac->part_req[ac->finished_requests].req;
209
210                 ret = partition_request(module, nreq);
211                 if (ret != LDB_SUCCESS) {
212                         talloc_free(ares);
213                         return ldb_module_done(ac->req, NULL, NULL, ret);
214                 }
215
216                 break;
217         }
218
219         talloc_free(ares);
220         return LDB_SUCCESS;
221 }
222
223 static int partition_prep_request(struct partition_context *ac,
224                                   struct dsdb_partition *partition)
225 {
226         int ret;
227         struct ldb_request *req;
228
229         ac->part_req = talloc_realloc(ac, ac->part_req,
230                                         struct part_request,
231                                         ac->num_requests + 1);
232         if (ac->part_req == NULL) {
233                 ldb_oom(ldb_module_get_ctx(ac->module));
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         switch (ac->req->operation) {
238         case LDB_SEARCH:
239                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
240                                         ac->part_req,
241                                         ac->req->op.search.base,
242                                         ac->req->op.search.scope,
243                                         ac->req->op.search.tree,
244                                         ac->req->op.search.attrs,
245                                         ac->req->controls,
246                                         ac, partition_req_callback,
247                                         ac->req);
248                 break;
249         case LDB_ADD:
250                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
251                                         ac->req->op.add.message,
252                                         ac->req->controls,
253                                         ac, partition_req_callback,
254                                         ac->req);
255                 break;
256         case LDB_MODIFY:
257                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
258                                         ac->req->op.mod.message,
259                                         ac->req->controls,
260                                         ac, partition_req_callback,
261                                         ac->req);
262                 break;
263         case LDB_DELETE:
264                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.del.dn,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 break;
270         case LDB_RENAME:
271                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.rename.olddn,
273                                         ac->req->op.rename.newdn,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 break;
278         case LDB_EXTENDED:
279                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
280                                         ac->part_req,
281                                         ac->req->op.extended.oid,
282                                         ac->req->op.extended.data,
283                                         ac->req->controls,
284                                         ac, partition_req_callback,
285                                         ac->req);
286                 break;
287         default:
288                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
289                                   "Unsupported request type!");
290                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
291         }
292
293         if (ret != LDB_SUCCESS) {
294                 return ret;
295         }
296
297         ac->part_req[ac->num_requests].req = req;
298
299         if (ac->req->controls) {
300                 req->controls = talloc_memdup(req, ac->req->controls,
301                                         talloc_get_size(ac->req->controls));
302                 if (req->controls == NULL) {
303                         ldb_oom(ldb_module_get_ctx(ac->module));
304                         return LDB_ERR_OPERATIONS_ERROR;
305                 }
306         }
307
308         if (partition) {
309                 ac->part_req[ac->num_requests].module = partition->module;
310
311                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
312                         ret = ldb_request_add_control(req,
313                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
314                                                       false, partition->ctrl);
315                         if (ret != LDB_SUCCESS) {
316                                 return ret;
317                         }
318                 }
319
320                 if (req->operation == LDB_SEARCH) {
321                         /* If the search is for 'more' than this partition,
322                          * then change the basedn, so a remote LDAP server
323                          * doesn't object */
324                         if (ldb_dn_compare_base(partition->ctrl->dn,
325                                                 req->op.search.base) != 0) {
326                                 req->op.search.base = partition->ctrl->dn;
327                         }
328                 }
329
330         } else {
331                 /* make sure you put the module here, or
332                  * or ldb_next_request() will skip a module */
333                 ac->part_req[ac->num_requests].module = ac->module;
334         }
335
336         ac->num_requests++;
337
338         return LDB_SUCCESS;
339 }
340
341 static int partition_call_first(struct partition_context *ac)
342 {
343         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
344 }
345
346 /**
347  * Send a request down to all the partitions
348  */
349 static int partition_send_all(struct ldb_module *module, 
350                               struct partition_context *ac, 
351                               struct ldb_request *req) 
352 {
353         int i;
354         struct partition_private_data *data = talloc_get_type(module->private_data, 
355                                                               struct partition_private_data);
356         int ret = partition_prep_request(ac, NULL);
357         if (ret != LDB_SUCCESS) {
358                 return ret;
359         }
360         for (i=0; data && data->partitions && data->partitions[i]; i++) {
361                 ret = partition_prep_request(ac, data->partitions[i]);
362                 if (ret != LDB_SUCCESS) {
363                         return ret;
364                 }
365         }
366
367         /* fire the first one */
368         return partition_call_first(ac);
369 }
370
371 /**
372  * Figure out which backend a request needs to be aimed at.  Some
373  * requests must be replicated to all backends
374  */
375 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
376 {
377         struct partition_context *ac;
378         unsigned i;
379         int ret;
380         struct dsdb_partition *partition;
381         struct partition_private_data *data = talloc_get_type(module->private_data, 
382                                                               struct partition_private_data);
383         if (!data || !data->partitions) {
384                 return ldb_next_request(module, req);
385         }
386
387         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
388                 /* Is this a special DN, we need to replicate to every backend? */
389                 for (i=0; data->replicate && data->replicate[i]; i++) {
390                         if (ldb_dn_compare(data->replicate[i], 
391                                            dn) == 0) {
392                                 
393                                 ac = partition_init_ctx(module, req);
394                                 if (!ac) {
395                                         return LDB_ERR_OPERATIONS_ERROR;
396                                 }
397                                 
398                                 return partition_send_all(module, ac, req);
399                         }
400                 }
401         }
402
403         /* Otherwise, we need to find the partition to fire it to */
404
405         /* Find partition */
406         partition = find_partition(data, dn, req);
407         if (!partition) {
408                 /*
409                  * if we haven't found a matching partition
410                  * pass the request to the main ldb
411                  *
412                  * TODO: we should maybe return an error here
413                  *       if it's not a special dn
414                  */
415
416                 return ldb_next_request(module, req);
417         }
418
419         ac = partition_init_ctx(module, req);
420         if (!ac) {
421                 return LDB_ERR_OPERATIONS_ERROR;
422         }
423
424         /* we need to add a control but we never touch the original request */
425         ret = partition_prep_request(ac, partition);
426         if (ret != LDB_SUCCESS) {
427                 return ret;
428         }
429
430         /* fire the first one */
431         return partition_call_first(ac);
432 }
433
434 /* search */
435 static int partition_search(struct ldb_module *module, struct ldb_request *req)
436 {
437         int ret;
438         struct ldb_control **saved_controls;
439         /* Find backend */
440         struct partition_private_data *data = talloc_get_type(module->private_data, 
441                                                               struct partition_private_data);
442
443         /* issue request */
444
445         /* (later) consider if we should be searching multiple
446          * partitions (for 'invisible' partition behaviour */
447
448         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
449         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
450         
451         struct ldb_search_options_control *search_options = NULL;
452         struct dsdb_partition *p;
453         
454         ret = partition_reload_if_required(module, data);
455         if (ret != LDB_SUCCESS) {
456                 return ret;
457         }
458
459         p = find_partition(data, NULL, req);
460         if (p != NULL) {
461                 /* the caller specified what partition they want the
462                  * search - just pass it on
463                  */
464                 return ldb_next_request(p->module, req);                
465         }
466
467
468         if (search_control) {
469                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
470         }
471
472         /* Remove the domain_scope control, so we don't confuse a backend server */
473         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
474                 ldb_oom(ldb_module_get_ctx(module));
475                 return LDB_ERR_OPERATIONS_ERROR;
476         }
477
478         /*
479          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
480          * down as uncritical to make windows 2008 dcpromo happy.
481          */
482         if (search_control) {
483                 search_control->critical = 0;
484         }
485
486         /* TODO:
487          * Generate referrals (look for a partition under this DN) if we don't
488          * have the LDB_CONTROL_DOMAIN_SCOPE_OID control specified.
489          */
490         
491         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
492                 int i;
493                 struct partition_context *ac;
494
495                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
496                         /* We have processed this flag, so we are done with this control now */
497
498                         /* Remove search control, so we don't confuse a backend server */
499                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
500                                 ldb_oom(ldb_module_get_ctx(module));
501                                 return LDB_ERR_OPERATIONS_ERROR;
502                         }
503                 }
504
505                 ac = partition_init_ctx(module, req);
506                 if (!ac) {
507                         return LDB_ERR_OPERATIONS_ERROR;
508                 }
509
510                 /* Search from the base DN */
511                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
512                         return partition_send_all(module, ac, req);
513                 }
514                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
515                         bool match = false, stop = false;
516                         /* Find all partitions under the search base 
517                            
518                            we match if:
519
520                               1) the DN we are looking for exactly matches the partition
521                              or
522                               2) the DN we are looking for is a parent of the partition and it isn't
523                                  a scope base search
524                              or
525                               3) the DN we are looking for is a child of the partition
526                          */
527                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
528                                 match = true;
529                                 stop = true;
530                         }
531                         if (!match && 
532                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
533                              req->op.search.scope != LDB_SCOPE_BASE)) {
534                                 match = true;
535                         }
536                         if (!match &&
537                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
538                                 match = true;
539                                 stop = true; /* note that this relies on partition ordering */
540                         }
541                         if (match) {
542                                 ret = partition_prep_request(ac, data->partitions[i]);
543                                 if (ret != LDB_SUCCESS) {
544                                         return ret;
545                                 }
546                         }
547                         if (stop) break;
548                 }
549
550                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
551                 if (ac->num_requests == 0) {
552                         talloc_free(ac);
553                         return ldb_next_request(module, req);
554                 }
555
556                 /* fire the first one */
557                 return partition_call_first(ac);
558         } else {
559                 /* Handle this like all other requests */
560                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
561                         /* We have processed this flag, so we are done with this control now */
562
563                         /* Remove search control, so we don't confuse a backend server */
564                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
565                                 ldb_oom(ldb_module_get_ctx(module));
566                                 return LDB_ERR_OPERATIONS_ERROR;
567                         }
568                 }
569
570                 return partition_replicate(module, req, req->op.search.base);
571         }
572 }
573
574 /* add */
575 static int partition_add(struct ldb_module *module, struct ldb_request *req)
576 {
577         return partition_replicate(module, req, req->op.add.message->dn);
578 }
579
580 /* modify */
581 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
582 {
583         return partition_replicate(module, req, req->op.mod.message->dn);
584 }
585
586 /* delete */
587 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
588 {
589         return partition_replicate(module, req, req->op.del.dn);
590 }
591
592 /* rename */
593 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
594 {
595         /* Find backend */
596         struct dsdb_partition *backend, *backend2;
597         
598         struct partition_private_data *data = talloc_get_type(module->private_data, 
599                                                               struct partition_private_data);
600
601         /* Skip the lot if 'data' isn't here yet (initialisation) */
602         if (!data) {
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605
606         backend = find_partition(data, req->op.rename.olddn, req);
607         backend2 = find_partition(data, req->op.rename.newdn, req);
608
609         if ((backend && !backend2) || (!backend && backend2)) {
610                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
611         }
612
613         if (backend != backend2) {
614                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
615                                        "Cannot rename from %s in %s to %s in %s: %s",
616                                        ldb_dn_get_linearized(req->op.rename.olddn),
617                                        ldb_dn_get_linearized(backend->ctrl->dn),
618                                        ldb_dn_get_linearized(req->op.rename.newdn),
619                                        ldb_dn_get_linearized(backend2->ctrl->dn),
620                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
621                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
622         }
623
624         return partition_replicate(module, req, req->op.rename.olddn);
625 }
626
627 /* start a transaction */
628 static int partition_start_trans(struct ldb_module *module)
629 {
630         int i, ret;
631         struct partition_private_data *data = talloc_get_type(module->private_data, 
632                                                               struct partition_private_data);
633         /* Look at base DN */
634         /* Figure out which partition it is under */
635         /* Skip the lot if 'data' isn't here yet (initialization) */
636         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
637                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
638         }
639         ret = ldb_next_start_trans(module);
640         if (ret != LDB_SUCCESS) {
641                 return ret;
642         }
643
644         ret = partition_reload_if_required(module, data);
645         if (ret != LDB_SUCCESS) {
646                 return ret;
647         }
648
649         for (i=0; data && data->partitions && data->partitions[i]; i++) {
650                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
651                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
652                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
653                 }
654                 ret = ldb_next_start_trans(data->partitions[i]->module);
655                 if (ret != LDB_SUCCESS) {
656                         /* Back it out, if it fails on one */
657                         for (i--; i >= 0; i--) {
658                                 ldb_next_del_trans(data->partitions[i]->module);
659                         }
660                         ldb_next_del_trans(module);
661                         return ret;
662                 }
663         }
664
665         data->in_transaction++;
666
667         return LDB_SUCCESS;
668 }
669
670 /* prepare for a commit */
671 static int partition_prepare_commit(struct ldb_module *module)
672 {
673         int i;
674         struct partition_private_data *data = talloc_get_type(module->private_data, 
675                                                               struct partition_private_data);
676
677         for (i=0; data && data->partitions && data->partitions[i]; i++) {
678                 int ret;
679
680                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
681                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
682                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
683                 }
684                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
685                 if (ret != LDB_SUCCESS) {
686                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
687                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
688                                                ldb_errstring(module->ldb));
689                         return ret;
690                 }
691         }
692
693         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
694                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
695         }
696         return ldb_next_prepare_commit(module);
697 }
698
699
700 /* end a transaction */
701 static int partition_end_trans(struct ldb_module *module)
702 {
703         int i, ret, ret2;
704         struct partition_private_data *data = talloc_get_type(module->private_data, 
705                                                               struct partition_private_data);
706
707         ret = LDB_SUCCESS;
708
709         if (data->in_transaction == 0) {
710                 DEBUG(0,("partition end transaction mismatch\n"));
711                 ret = LDB_ERR_OPERATIONS_ERROR;
712         } else {
713                 data->in_transaction--;
714         }
715
716         for (i=0; data && data->partitions && data->partitions[i]; i++) {
717                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
718                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
719                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
720                 }
721                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
722                 if (ret2 != LDB_SUCCESS) {
723                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
724                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
725                                                ldb_errstring(module->ldb));
726                         ret = ret2;
727                 }
728         }
729
730         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
731                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
732         }
733         ret2 = ldb_next_end_trans(module);
734         if (ret2 != LDB_SUCCESS) {
735                 ret = ret2;
736         }
737         return ret;
738 }
739
740 /* delete a transaction */
741 static int partition_del_trans(struct ldb_module *module)
742 {
743         int i, ret, final_ret = LDB_SUCCESS;
744         struct partition_private_data *data = talloc_get_type(module->private_data, 
745                                                               struct partition_private_data);
746         for (i=0; data && data->partitions && data->partitions[i]; i++) {
747                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
748                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
749                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
750                 }
751                 ret = ldb_next_del_trans(data->partitions[i]->module);
752                 if (ret != LDB_SUCCESS) {
753                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
754                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
755                                                ldb_errstring(module->ldb));
756                         final_ret = ret;
757                 }
758         }       
759
760         if (data->in_transaction == 0) {
761                 DEBUG(0,("partition del transaction mismatch\n"));
762                 return LDB_ERR_OPERATIONS_ERROR;
763         }
764         data->in_transaction--;
765
766         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
767                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
768         }
769         ret = ldb_next_del_trans(module);
770         if (ret != LDB_SUCCESS) {
771                 final_ret = ret;
772         }
773         return final_ret;
774 }
775
776 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
777                                      enum ldb_sequence_type type, uint64_t *seq_number) 
778 {
779         int ret;
780         struct ldb_result *res;
781         struct ldb_seqnum_request *tseq;
782         struct ldb_request *treq;
783         struct ldb_seqnum_result *seqr;
784         res = talloc_zero(mem_ctx, struct ldb_result);
785         if (res == NULL) {
786                 return LDB_ERR_OPERATIONS_ERROR;
787         }
788         tseq = talloc_zero(res, struct ldb_seqnum_request);
789         if (tseq == NULL) {
790                 talloc_free(res);
791                 return LDB_ERR_OPERATIONS_ERROR;
792         }
793         tseq->type = type;
794         
795         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
796                                      LDB_EXTENDED_SEQUENCE_NUMBER,
797                                      tseq,
798                                      NULL,
799                                      res,
800                                      ldb_extended_default_callback,
801                                      NULL);
802         if (ret != LDB_SUCCESS) {
803                 talloc_free(res);
804                 return ret;
805         }
806         
807         ret = ldb_next_request(module, treq);
808         if (ret != LDB_SUCCESS) {
809                 talloc_free(res);
810                 return ret;
811         }
812         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
813         if (ret != LDB_SUCCESS) {
814                 talloc_free(res);
815                 return ret;
816         }
817         
818         seqr = talloc_get_type(res->extended->data,
819                                struct ldb_seqnum_result);
820         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
821                 ret = LDB_ERR_OPERATIONS_ERROR;
822                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
823                 talloc_free(res);
824                 return ret;
825         } else {
826                 *seq_number = seqr->seq_num;
827         }
828         talloc_free(res);
829         return LDB_SUCCESS;
830 }
831
832 /* FIXME: This function is still semi-async */
833 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
834 {
835         int i, ret;
836         uint64_t seq_number = 0;
837         uint64_t timestamp_sequence = 0;
838         uint64_t timestamp = 0;
839         struct partition_private_data *data = talloc_get_type(module->private_data, 
840                                                               struct partition_private_data);
841         struct ldb_seqnum_request *seq;
842         struct ldb_seqnum_result *seqr;
843         struct ldb_request *treq;
844         struct ldb_seqnum_request *tseq;
845         struct ldb_seqnum_result *tseqr;
846         struct ldb_extended *ext;
847         struct ldb_result *res;
848         struct dsdb_partition *p;
849
850         p = find_partition(data, NULL, req);
851         if (p != NULL) {
852                 /* the caller specified what partition they want the
853                  * sequence number operation on - just pass it on
854                  */
855                 return ldb_next_request(p->module, req);                
856         }
857
858         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
859
860         switch (seq->type) {
861         case LDB_SEQ_NEXT:
862         case LDB_SEQ_HIGHEST_SEQ:
863
864                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
865                 if (ret != LDB_SUCCESS) {
866                         return ret;
867                 }
868
869                 /* Skip the lot if 'data' isn't here yet (initialisation) */
870                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
871
872                         res = talloc_zero(req, struct ldb_result);
873                         if (res == NULL) {
874                                 return LDB_ERR_OPERATIONS_ERROR;
875                         }
876                         tseq = talloc_zero(res, struct ldb_seqnum_request);
877                         if (tseq == NULL) {
878                                 talloc_free(res);
879                                 return LDB_ERR_OPERATIONS_ERROR;
880                         }
881                         tseq->type = seq->type;
882
883                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
884                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
885                                                      tseq,
886                                                      NULL,
887                                                      res,
888                                                      ldb_extended_default_callback,
889                                                      NULL);
890                         if (ret != LDB_SUCCESS) {
891                                 talloc_free(res);
892                                 return ret;
893                         }
894
895                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
896                                 ret = ldb_request_add_control(treq,
897                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
898                                                               false, data->partitions[i]->ctrl);
899                                 if (ret != LDB_SUCCESS) {
900                                         talloc_free(res);
901                                         return ret;
902                                 }
903                         }
904
905                         ret = partition_request(data->partitions[i]->module, treq);
906                         if (ret != LDB_SUCCESS) {
907                                 talloc_free(res);
908                                 return ret;
909                         }
910                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
911                         if (ret != LDB_SUCCESS) {
912                                 talloc_free(res);
913                                 return ret;
914                         }
915                         tseqr = talloc_get_type(res->extended->data,
916                                                 struct ldb_seqnum_result);
917                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
918                                 timestamp_sequence = MAX(timestamp_sequence,
919                                                          tseqr->seq_num);
920                         } else {
921                                 seq_number += tseqr->seq_num;
922                         }
923                         talloc_free(res);
924                 }
925                 /* fall through */
926         case LDB_SEQ_HIGHEST_TIMESTAMP:
927
928                 res = talloc_zero(req, struct ldb_result);
929                 if (res == NULL) {
930                         return LDB_ERR_OPERATIONS_ERROR;
931                 }
932
933                 tseq = talloc_zero(res, struct ldb_seqnum_request);
934                 if (tseq == NULL) {
935                         talloc_free(res);
936                         return LDB_ERR_OPERATIONS_ERROR;
937                 }
938                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
939
940                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
941                                              LDB_EXTENDED_SEQUENCE_NUMBER,
942                                              tseq,
943                                              NULL,
944                                              res,
945                                              ldb_extended_default_callback,
946                                              NULL);
947                 if (ret != LDB_SUCCESS) {
948                         talloc_free(res);
949                         return ret;
950                 }
951
952                 ret = ldb_next_request(module, treq);
953                 if (ret != LDB_SUCCESS) {
954                         talloc_free(res);
955                         return ret;
956                 }
957                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
958                 if (ret != LDB_SUCCESS) {
959                         talloc_free(res);
960                         return ret;
961                 }
962
963                 tseqr = talloc_get_type(res->extended->data,
964                                            struct ldb_seqnum_result);
965                 timestamp = tseqr->seq_num;
966
967                 talloc_free(res);
968
969                 /* Skip the lot if 'data' isn't here yet (initialisation) */
970                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
971
972                         res = talloc_zero(req, struct ldb_result);
973                         if (res == NULL) {
974                                 return LDB_ERR_OPERATIONS_ERROR;
975                         }
976
977                         tseq = talloc_zero(res, struct ldb_seqnum_request);
978                         if (tseq == NULL) {
979                                 talloc_free(res);
980                                 return LDB_ERR_OPERATIONS_ERROR;
981                         }
982                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
983
984                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
985                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
986                                                      tseq,
987                                                      NULL,
988                                                      res,
989                                                      ldb_extended_default_callback,
990                                                      NULL);
991                         if (ret != LDB_SUCCESS) {
992                                 talloc_free(res);
993                                 return ret;
994                         }
995
996                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
997                                 ret = ldb_request_add_control(treq,
998                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
999                                                               false, data->partitions[i]->ctrl);
1000                                 if (ret != LDB_SUCCESS) {
1001                                         talloc_free(res);
1002                                         return ret;
1003                                 }
1004                         }
1005
1006                         ret = partition_request(data->partitions[i]->module, treq);
1007                         if (ret != LDB_SUCCESS) {
1008                                 talloc_free(res);
1009                                 return ret;
1010                         }
1011                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1012                         if (ret != LDB_SUCCESS) {
1013                                 talloc_free(res);
1014                                 return ret;
1015                         }
1016
1017                         tseqr = talloc_get_type(res->extended->data,
1018                                                   struct ldb_seqnum_result);
1019                         timestamp = MAX(timestamp, tseqr->seq_num);
1020
1021                         talloc_free(res);
1022                 }
1023
1024                 break;
1025         }
1026
1027         ext = talloc_zero(req, struct ldb_extended);
1028         if (!ext) {
1029                 return LDB_ERR_OPERATIONS_ERROR;
1030         }
1031         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1032         if (seqr == NULL) {
1033                 talloc_free(ext);
1034                 return LDB_ERR_OPERATIONS_ERROR;
1035         }
1036         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1037         ext->data = seqr;
1038
1039         switch (seq->type) {
1040         case LDB_SEQ_NEXT:
1041         case LDB_SEQ_HIGHEST_SEQ:
1042
1043                 /* Has someone above set a timebase sequence? */
1044                 if (timestamp_sequence) {
1045                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1046                 } else {
1047                         seqr->seq_num = seq_number;
1048                 }
1049
1050                 if (timestamp_sequence > seqr->seq_num) {
1051                         seqr->seq_num = timestamp_sequence;
1052                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1053                 }
1054
1055                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1056                 break;
1057         case LDB_SEQ_HIGHEST_TIMESTAMP:
1058                 seqr->seq_num = timestamp;
1059                 break;
1060         }
1061
1062         if (seq->type == LDB_SEQ_NEXT) {
1063                 seqr->seq_num++;
1064         }
1065
1066         /* send request done */
1067         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1068 }
1069
1070 /* extended */
1071 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1072 {
1073         struct partition_private_data *data;
1074         struct partition_context *ac;
1075         int ret;
1076
1077         data = talloc_get_type(module->private_data, struct partition_private_data);
1078         if (!data) {
1079                 return ldb_next_request(module, req);
1080         }
1081
1082         ret = partition_reload_if_required(module, data);
1083         if (ret != LDB_SUCCESS) {
1084                 return ret;
1085         }
1086         
1087         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1088                 return partition_sequence_number(module, req);
1089         }
1090
1091         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1092                 return partition_create(module, req);
1093         }
1094
1095         /* 
1096          * as the extended operation has no dn
1097          * we need to send it to all partitions
1098          */
1099
1100         ac = partition_init_ctx(module, req);
1101         if (!ac) {
1102                 return LDB_ERR_OPERATIONS_ERROR;
1103         }
1104
1105         return partition_send_all(module, ac, req);
1106 }
1107
1108 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1109         .name              = "partition",
1110         .init_context      = partition_init,
1111         .search            = partition_search,
1112         .add               = partition_add,
1113         .modify            = partition_modify,
1114         .del               = partition_delete,
1115         .rename            = partition_rename,
1116         .extended          = partition_extended,
1117         .start_transaction = partition_start_trans,
1118         .prepare_commit    = partition_prepare_commit,
1119         .end_transaction   = partition_end_trans,
1120         .del_transaction   = partition_del_trans,
1121 };