s4-ldb: added a bunch more debug for DC join
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
  one node of a doubly linked list of linked attribute records
 */
struct la_entry {
	struct la_entry *next;
	struct la_entry *prev;
	/* the linked attribute carried by this node */
	struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
  per-request context, created by replmd_ctx_init() as a talloc child
  of the request it belongs to
 */
struct replmd_replicated_request {
	/* module instance handling the request */
	struct ldb_module *module;
	/* the original request; completion is reported against it */
	struct ldb_request *req;

	/* schema in effect for this request */
	const struct dsdb_schema *schema;

	struct dsdb_extended_replicated_objects *objs;

	/* the controls we pass down */
	struct ldb_control **controls;

	uint32_t index_current;

	struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         /* this happens when the schema is first being
169                            setup */
170                         talloc_free(replmd_private->ncs);
171                         replmd_private->ncs = NULL;
172                         replmd_private->num_ncs = 0;
173                         talloc_free(tmp_ctx);
174                         return LDB_SUCCESS;
175                 }
176                 replmd_private->ncs[i].guid = 
177                         samdb_result_guid(res->msgs[0], "objectGUID");
178                 talloc_free(res);
179         }       
180
181         /* sort the NCs into order, most to least specific */
182         qsort(replmd_private->ncs, replmd_private->num_ncs,
183               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
184
185         
186         talloc_free(tmp_ctx);
187         
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193  * notify the repl task that a object has changed. The notifies are
194  * gathered up in the replmd_private structure then written to the
195  * @REPLCHANGED object in each partition during the prepare_commit
196  */
197 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
198 {
199         int ret, i;
200         struct replmd_private *replmd_private = 
201                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
202
203         ret = replmd_load_NCs(module);
204         if (ret != LDB_SUCCESS) {
205                 return ret;
206         }
207         if (replmd_private->num_ncs == 0) {
208                 return LDB_SUCCESS;
209         }
210
211         for (i=0; i<replmd_private->num_ncs; i++) {
212                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
213                         break;
214                 }
215         }
216         if (i == replmd_private->num_ncs) {
217                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
218                          ldb_dn_get_linearized(dn)));
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         if (uSN > replmd_private->ncs[i].mod_usn) {
223             replmd_private->ncs[i].mod_usn = uSN;
224         }
225
226         return LDB_SUCCESS;
227 }
228
229
230 /*
231  * update a @REPLCHANGED record in each partition if there have been
232  * any writes of replicated data in the partition
233  */
234 static int replmd_notify_store(struct ldb_module *module)
235 {
236         int i;
237         struct replmd_private *replmd_private = 
238                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
239         struct ldb_context *ldb = ldb_module_get_ctx(module);
240
241         for (i=0; i<replmd_private->num_ncs; i++) {
242                 int ret;
243
244                 if (replmd_private->ncs[i].mod_usn == 0) {
245                         /* this partition has not changed in this
246                            transaction */
247                         continue;
248                 }
249
250                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
251                                               replmd_private->ncs[i].mod_usn);
252                 if (ret != LDB_SUCCESS) {
253                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
254                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
255                         return ret;
256                 }
257         }
258
259         return LDB_SUCCESS;
260 }
261
262
263 /*
264   created a replmd_replicated_request context
265  */
266 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
267                                                          struct ldb_request *req)
268 {
269         struct ldb_context *ldb;
270         struct replmd_replicated_request *ac;
271
272         ldb = ldb_module_get_ctx(module);
273
274         ac = talloc_zero(req, struct replmd_replicated_request);
275         if (ac == NULL) {
276                 ldb_oom(ldb);
277                 return NULL;
278         }
279
280         ac->module = module;
281         ac->req = req;
282         return ac;
283 }
284
285 /*
286   add a time element to a record
287 */
288 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
289 {
290         struct ldb_message_element *el;
291         char *s;
292
293         if (ldb_msg_find_element(msg, attr) != NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         s = ldb_timestring(msg, t);
298         if (s == NULL) {
299                 return LDB_ERR_OPERATIONS_ERROR;
300         }
301
302         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305
306         el = ldb_msg_find_element(msg, attr);
307         /* always set as replace. This works because on add ops, the flag
308            is ignored */
309         el->flags = LDB_FLAG_MOD_REPLACE;
310
311         return LDB_SUCCESS;
312 }
313
314 /*
315   add a uint64_t element to a record
316 */
317 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
318 {
319         struct ldb_message_element *el;
320
321         if (ldb_msg_find_element(msg, attr) != NULL) {
322                 return LDB_SUCCESS;
323         }
324
325         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         el = ldb_msg_find_element(msg, attr);
330         /* always set as replace. This works because on add ops, the flag
331            is ignored */
332         el->flags = LDB_FLAG_MOD_REPLACE;
333
334         return LDB_SUCCESS;
335 }
336
337 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
338                                                    const struct replPropertyMetaData1 *m2,
339                                                    const uint32_t *rdn_attid)
340 {
341         if (m1->attid == m2->attid) {
342                 return 0;
343         }
344
345         /*
346          * the rdn attribute should be at the end!
347          * so we need to return a value greater than zero
348          * which means m1 is greater than m2
349          */
350         if (m1->attid == *rdn_attid) {
351                 return 1;
352         }
353
354         /*
355          * the rdn attribute should be at the end!
356          * so we need to return a value less than zero
357          * which means m2 is greater than m1
358          */
359         if (m2->attid == *rdn_attid) {
360                 return -1;
361         }
362
363         return m1->attid - m2->attid;
364 }
365
366 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
367                                                  const uint32_t *rdn_attid)
368 {
369         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
370                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
371 }
372
373 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
374                                                  const struct ldb_message_element *e2,
375                                                  const struct dsdb_schema *schema)
376 {
377         const struct dsdb_attribute *a1;
378         const struct dsdb_attribute *a2;
379
380         /* 
381          * TODO: make this faster by caching the dsdb_attribute pointer
382          *       on the ldb_messag_element
383          */
384
385         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
386         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
387
388         /*
389          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
390          *       in the schema
391          */
392         if (!a1 || !a2) {
393                 return strcasecmp(e1->name, e2->name);
394         }
395
396         return a1->attributeID_id - a2->attributeID_id;
397 }
398
399 static void replmd_ldb_message_sort(struct ldb_message *msg,
400                                     const struct dsdb_schema *schema)
401 {
402         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
403                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
404 }
405
406 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
407 {
408         struct ldb_context *ldb;
409         struct replmd_replicated_request *ac;
410
411         ac = talloc_get_type(req->context, struct replmd_replicated_request);
412         ldb = ldb_module_get_ctx(ac->module);
413
414         if (!ares) {
415                 return ldb_module_done(ac->req, NULL, NULL,
416                                         LDB_ERR_OPERATIONS_ERROR);
417         }
418         if (ares->error != LDB_SUCCESS) {
419                 return ldb_module_done(ac->req, ares->controls,
420                                         ares->response, ares->error);
421         }
422
423         if (ares->type != LDB_REPLY_DONE) {
424                 ldb_set_errstring(ldb,
425                                   "invalid ldb_reply_type in callback");
426                 talloc_free(ares);
427                 return ldb_module_done(ac->req, NULL, NULL,
428                                         LDB_ERR_OPERATIONS_ERROR);
429         }
430
431         return ldb_module_done(ac->req, ares->controls,
432                                 ares->response, LDB_SUCCESS);
433 }
434
435 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
436 {
437         struct ldb_context *ldb;
438         struct replmd_replicated_request *ac;
439         const struct dsdb_schema *schema;
440         enum ndr_err_code ndr_err;
441         struct ldb_request *down_req;
442         struct ldb_message *msg;
443         const struct dsdb_attribute *rdn_attr = NULL;
444         struct GUID guid;
445         struct ldb_val guid_value;
446         struct replPropertyMetaDataBlob nmd;
447         struct ldb_val nmd_value;
448         uint64_t seq_num;
449         const struct GUID *our_invocation_id;
450         time_t t = time(NULL);
451         NTTIME now;
452         char *time_str;
453         int ret;
454         uint32_t i, ni=0;
455
456         /* do not manipulate our control entries */
457         if (ldb_dn_is_special(req->op.add.message->dn)) {
458                 return ldb_next_request(module, req);
459         }
460
461         ldb = ldb_module_get_ctx(module);
462
463         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
464
465         schema = dsdb_get_schema(ldb);
466         if (!schema) {
467                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
468                               "replmd_add: no dsdb_schema loaded");
469                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
470                 return LDB_ERR_CONSTRAINT_VIOLATION;
471         }
472
473         ac = replmd_ctx_init(module, req);
474         if (!ac) {
475                 return LDB_ERR_OPERATIONS_ERROR;
476         }
477
478         ac->schema = schema;
479
480         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
481                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
482                               "replmd_add: it's not allowed to add an object with objectGUID\n");
483                 return LDB_ERR_UNWILLING_TO_PERFORM;
484         }
485
486         /* Get a sequence number from the backend */
487         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
488         if (ret != LDB_SUCCESS) {
489                 return ret;
490         }
491
492         /* a new GUID */
493         guid = GUID_random();
494
495         /* get our invocationId */
496         our_invocation_id = samdb_ntds_invocation_id(ldb);
497         if (!our_invocation_id) {
498                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
499                               "replmd_add: unable to find invocationId\n");
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502
503         /* we have to copy the message as the caller might have it as a const */
504         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
505         if (msg == NULL) {
506                 ldb_oom(ldb);
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         /* generated times */
511         unix_to_nt_time(&now, t);
512         time_str = ldb_timestring(msg, t);
513         if (!time_str) {
514                 return LDB_ERR_OPERATIONS_ERROR;
515         }
516
517         /* 
518          * remove autogenerated attributes
519          */
520         ldb_msg_remove_attr(msg, "whenCreated");
521         ldb_msg_remove_attr(msg, "whenChanged");
522         ldb_msg_remove_attr(msg, "uSNCreated");
523         ldb_msg_remove_attr(msg, "uSNChanged");
524         ldb_msg_remove_attr(msg, "replPropertyMetaData");
525
526         /*
527          * readd replicated attributes
528          */
529         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
530         if (ret != LDB_SUCCESS) {
531                 ldb_oom(ldb);
532                 return LDB_ERR_OPERATIONS_ERROR;
533         }
534
535         /* build the replication meta_data */
536         ZERO_STRUCT(nmd);
537         nmd.version             = 1;
538         nmd.ctr.ctr1.count      = msg->num_elements;
539         nmd.ctr.ctr1.array      = talloc_array(msg,
540                                                struct replPropertyMetaData1,
541                                                nmd.ctr.ctr1.count);
542         if (!nmd.ctr.ctr1.array) {
543                 ldb_oom(ldb);
544                 return LDB_ERR_OPERATIONS_ERROR;
545         }
546
547         for (i=0; i < msg->num_elements; i++) {
548                 struct ldb_message_element *e = &msg->elements[i];
549                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
550                 const struct dsdb_attribute *sa;
551
552                 if (e->name[0] == '@') continue;
553
554                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
555                 if (!sa) {
556                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
557                                       "replmd_add: attribute '%s' not defined in schema\n",
558                                       e->name);
559                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
560                 }
561
562                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
563                         /* if the attribute is not replicated (0x00000001)
564                          * or constructed (0x00000004) it has no metadata
565                          */
566                         continue;
567                 }
568
569                 m->attid                        = sa->attributeID_id;
570                 m->version                      = 1;
571                 m->originating_change_time      = now;
572                 m->originating_invocation_id    = *our_invocation_id;
573                 m->originating_usn              = seq_num;
574                 m->local_usn                    = seq_num;
575                 ni++;
576
577                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
578                         rdn_attr = sa;
579                 }
580         }
581
582         /* fix meta data count */
583         nmd.ctr.ctr1.count = ni;
584
585         /*
586          * sort meta data array, and move the rdn attribute entry to the end
587          */
588         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
589
590         /* generated NDR encoded values */
591         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
592                                        NULL,
593                                        &guid,
594                                        (ndr_push_flags_fn_t)ndr_push_GUID);
595         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
596                 ldb_oom(ldb);
597                 return LDB_ERR_OPERATIONS_ERROR;
598         }
599         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
600                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
601                                        &nmd,
602                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
603         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
604                 ldb_oom(ldb);
605                 return LDB_ERR_OPERATIONS_ERROR;
606         }
607
608         /*
609          * add the autogenerated values
610          */
611         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
612         if (ret != LDB_SUCCESS) {
613                 ldb_oom(ldb);
614                 return LDB_ERR_OPERATIONS_ERROR;
615         }
616         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
617         if (ret != LDB_SUCCESS) {
618                 ldb_oom(ldb);
619                 return LDB_ERR_OPERATIONS_ERROR;
620         }
621         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
622         if (ret != LDB_SUCCESS) {
623                 ldb_oom(ldb);
624                 return LDB_ERR_OPERATIONS_ERROR;
625         }
626         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
627         if (ret != LDB_SUCCESS) {
628                 ldb_oom(ldb);
629                 return LDB_ERR_OPERATIONS_ERROR;
630         }
631         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
632         if (ret != LDB_SUCCESS) {
633                 ldb_oom(ldb);
634                 return LDB_ERR_OPERATIONS_ERROR;
635         }
636
637         /*
638          * sort the attributes by attid before storing the object
639          */
640         replmd_ldb_message_sort(msg, schema);
641
642         ret = ldb_build_add_req(&down_req, ldb, ac,
643                                 msg,
644                                 req->controls,
645                                 ac, replmd_op_callback,
646                                 req);
647         if (ret != LDB_SUCCESS) {
648                 return ret;
649         }
650
651         ret = replmd_notify(module, msg->dn, seq_num);
652         if (ret != LDB_SUCCESS) {
653                 return ret;
654         }
655
656         /* go on with the call chain */
657         return ldb_next_request(module, down_req);
658 }
659
660
661 /*
662  * update the replPropertyMetaData for one element
663  */
664 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
665                                       struct ldb_message *msg,
666                                       struct ldb_message_element *el,
667                                       struct replPropertyMetaDataBlob *omd,
668                                       struct dsdb_schema *schema,
669                                       uint64_t *seq_num,
670                                       const struct GUID *our_invocation_id,
671                                       NTTIME now)
672 {
673         int i;
674         const struct dsdb_attribute *a;
675         struct replPropertyMetaData1 *md1;
676
677         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
678         if (a == NULL) {
679                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
680                          el->name));
681                 return LDB_ERR_OPERATIONS_ERROR;
682         }
683
684         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
685                 /* if the attribute is not replicated (0x00000001)
686                  * or constructed (0x00000004) it has no metadata
687                  */
688                 return LDB_SUCCESS;
689         }
690
691         for (i=0; i<omd->ctr.ctr1.count; i++) {
692                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
693         }
694         if (i == omd->ctr.ctr1.count) {
695                 /* we need to add a new one */
696                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
697                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
698                 if (omd->ctr.ctr1.array == NULL) {
699                         ldb_oom(ldb);
700                         return LDB_ERR_OPERATIONS_ERROR;
701                 }
702                 omd->ctr.ctr1.count++;
703                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
704         }
705
706         /* Get a new sequence number from the backend. We only do this
707          * if we have a change that requires a new
708          * replPropertyMetaData element 
709          */
710         if (*seq_num == 0) {
711                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
712                 if (ret != LDB_SUCCESS) {
713                         return LDB_ERR_OPERATIONS_ERROR;
714                 }
715         }
716
717         md1 = &omd->ctr.ctr1.array[i];
718         md1->version++;
719         md1->attid                     = a->attributeID_id;
720         md1->originating_change_time   = now;
721         md1->originating_invocation_id = *our_invocation_id;
722         md1->originating_usn           = *seq_num;
723         md1->local_usn                 = *seq_num;
724         
725         return LDB_SUCCESS;
726 }
727
728 /*
729  * update the replPropertyMetaData object each time we modify an
730  * object. This is needed for DRS replication, as the merge on the
731  * client is based on this object 
732  */
733 static int replmd_update_rpmd(struct ldb_module *module, 
734                               struct ldb_message *msg, uint64_t *seq_num)
735 {
736         const struct ldb_val *omd_value;
737         enum ndr_err_code ndr_err;
738         struct replPropertyMetaDataBlob omd;
739         int i;
740         struct dsdb_schema *schema;
741         time_t t = time(NULL);
742         NTTIME now;
743         const struct GUID *our_invocation_id;
744         int ret;
745         const char *attrs[] = { "replPropertyMetaData" , NULL };
746         struct ldb_result *res;
747         struct ldb_context *ldb;
748
749         ldb = ldb_module_get_ctx(module);
750
751         our_invocation_id = samdb_ntds_invocation_id(ldb);
752         if (!our_invocation_id) {
753                 /* this happens during an initial vampire while
754                    updating the schema */
755                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
756                 return LDB_SUCCESS;
757         }
758
759         unix_to_nt_time(&now, t);
760
761         /* search for the existing replPropertyMetaDataBlob */
762         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
763         if (ret != LDB_SUCCESS || res->count < 1) {
764                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
765                          ldb_dn_get_linearized(msg->dn)));
766                 return LDB_ERR_OPERATIONS_ERROR;
767         }
768                 
769
770         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
771         if (!omd_value) {
772                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
773                          ldb_dn_get_linearized(msg->dn)));
774                 return LDB_ERR_OPERATIONS_ERROR;
775         }
776
777         ndr_err = ndr_pull_struct_blob(omd_value, msg,
778                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
779                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
780         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
781                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
782                          ldb_dn_get_linearized(msg->dn)));
783                 return LDB_ERR_OPERATIONS_ERROR;
784         }
785
786         if (omd.version != 1) {
787                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
788                          omd.version, ldb_dn_get_linearized(msg->dn)));
789                 return LDB_ERR_OPERATIONS_ERROR;
790         }
791
792         schema = dsdb_get_schema(ldb);
793
794         for (i=0; i<msg->num_elements; i++) {
795                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
796                                                  our_invocation_id, now);
797                 if (ret != LDB_SUCCESS) {
798                         return ret;
799                 }
800         }
801
802         /*
803          * replmd_update_rpmd_element has done an update if the
804          * seq_num is set
805          */
806         if (*seq_num != 0) {
807                 struct ldb_val *md_value;
808                 struct ldb_message_element *el;
809
810                 md_value = talloc(msg, struct ldb_val);
811                 if (md_value == NULL) {
812                         ldb_oom(ldb);
813                         return LDB_ERR_OPERATIONS_ERROR;
814                 }
815
816                 ndr_err = ndr_push_struct_blob(md_value, msg, 
817                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
818                                                &omd,
819                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
820                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
821                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
822                                  ldb_dn_get_linearized(msg->dn)));
823                         return LDB_ERR_OPERATIONS_ERROR;
824                 }
825
826                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
827                 if (ret != LDB_SUCCESS) {
828                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
829                                  ldb_dn_get_linearized(msg->dn)));
830                         return ret;
831                 }
832
833                 ret = replmd_notify(module, msg->dn, *seq_num);
834                 if (ret != LDB_SUCCESS) {
835                         return ret;
836                 }
837
838                 el->num_values = 1;
839                 el->values = md_value;
840         }
841
842         return LDB_SUCCESS;     
843 }
844
845
846 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
847 {
848         struct ldb_context *ldb;
849         struct replmd_replicated_request *ac;
850         const struct dsdb_schema *schema;
851         struct ldb_request *down_req;
852         struct ldb_message *msg;
853         int ret;
854         time_t t = time(NULL);
855         uint64_t seq_num = 0;
856
857         /* do not manipulate our control entries */
858         if (ldb_dn_is_special(req->op.mod.message->dn)) {
859                 return ldb_next_request(module, req);
860         }
861
862         ldb = ldb_module_get_ctx(module);
863
864         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
865
866         schema = dsdb_get_schema(ldb);
867         if (!schema) {
868                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
869                               "replmd_modify: no dsdb_schema loaded");
870                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
871                 return LDB_ERR_CONSTRAINT_VIOLATION;
872         }
873
874         ac = replmd_ctx_init(module, req);
875         if (!ac) {
876                 return LDB_ERR_OPERATIONS_ERROR;
877         }
878
879         ac->schema = schema;
880
881         /* we have to copy the message as the caller might have it as a const */
882         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
883         if (msg == NULL) {
884                 talloc_free(ac);
885                 return LDB_ERR_OPERATIONS_ERROR;
886         }
887
888         /* TODO:
889          * - get the whole old object
890          * - if the old object doesn't exist report an error
891          * - give an error when a readonly attribute should
892          *   be modified
893          * - merge the changed into the old object
894          *   if the caller set values to the same value
895          *   ignore the attribute, return success when no
896          *   attribute was changed
897          */
898
899         ret = replmd_update_rpmd(module, msg, &seq_num);
900         if (ret != LDB_SUCCESS) {
901                 return ret;
902         }
903
904         /* TODO:
905          * - sort the attributes by attid with replmd_ldb_message_sort()
906          * - replace the old object with the newly constructed one
907          */
908
909         ret = ldb_build_mod_req(&down_req, ldb, ac,
910                                 msg,
911                                 req->controls,
912                                 ac, replmd_op_callback,
913                                 req);
914         if (ret != LDB_SUCCESS) {
915                 return ret;
916         }
917         talloc_steal(down_req, msg);
918
919         /* we only change whenChanged and uSNChanged if the seq_num
920            has changed */
921         if (seq_num != 0) {
922                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
923                         talloc_free(ac);
924                         return LDB_ERR_OPERATIONS_ERROR;
925                 }
926
927                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
928                         talloc_free(ac);
929                         return LDB_ERR_OPERATIONS_ERROR;
930                 }
931         }
932
933         /* go on with the call chain */
934         return ldb_next_request(module, down_req);
935 }
936
937
938 /*
939   handle a rename request
940
941   On a rename we need to do an extra ldb_modify which sets the
942   whenChanged and uSNChanged attributes
943  */
944 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
945 {
946         struct ldb_context *ldb;
947         int ret, i;
948         time_t t = time(NULL);
949         uint64_t seq_num = 0;
950         struct ldb_message *msg;
951         struct replmd_private *replmd_private = 
952                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
953
954         /* do not manipulate our control entries */
955         if (ldb_dn_is_special(req->op.mod.message->dn)) {
956                 return ldb_next_request(module, req);
957         }
958
959         ldb = ldb_module_get_ctx(module);
960
961         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
962
963         /* Get a sequence number from the backend */
964         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
965         if (ret != LDB_SUCCESS) {
966                 return ret;
967         }
968
969         msg = ldb_msg_new(req);
970         if (msg == NULL) {
971                 ldb_oom(ldb);
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974
975         msg->dn = req->op.rename.olddn;
976
977         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
978                 talloc_free(msg);
979                 return LDB_ERR_OPERATIONS_ERROR;
980         }
981         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
982
983         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
984                 talloc_free(msg);
985                 return LDB_ERR_OPERATIONS_ERROR;
986         }
987         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
988
989         ret = ldb_modify(ldb, msg);
990         talloc_free(msg);
991         if (ret != LDB_SUCCESS) {
992                 return ret;
993         }
994
995         ret = replmd_load_NCs(module);
996         if (ret != 0) {
997                 return ret;
998         }
999
1000         /* now update the highest uSNs of the partitions that are
1001            affected. Note that two partitions could be changing */
1002         for (i=0; i<replmd_private->num_ncs; i++) {
1003                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1004                                         req->op.rename.olddn) == 0) {
1005                         break;
1006                 }
1007         }
1008         if (i == replmd_private->num_ncs) {
1009                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1010                          ldb_dn_get_linearized(req->op.rename.olddn)));
1011                 return LDB_ERR_OPERATIONS_ERROR;
1012         }
1013         replmd_private->ncs[i].mod_usn = seq_num;
1014
1015         for (i=0; i<replmd_private->num_ncs; i++) {
1016                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1017                                         req->op.rename.newdn) == 0) {
1018                         break;
1019                 }
1020         }
1021         if (i == replmd_private->num_ncs) {
1022                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1023                          ldb_dn_get_linearized(req->op.rename.newdn)));
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026         replmd_private->ncs[i].mod_usn = seq_num;
1027         
1028         /* go on with the call chain */
1029         return ldb_next_request(module, req);
1030 }
1031
1032
/*
  map an ldb error seen while applying replicated objects to the
  value returned to the caller; currently the code is handed back
  unchanged and the request context is not consulted
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        (void)ar;

        return ret;
}
1037
1038 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1039 {
1040         int ret = LDB_ERR_OTHER;
1041         /* TODO: do some error mapping */
1042         return ret;
1043 }
1044
1045 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1046
1047 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1048                                                 struct ldb_reply *ares)
1049 {
1050         struct ldb_context *ldb;
1051         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1052                                                struct replmd_replicated_request);
1053         int ret;
1054
1055         ldb = ldb_module_get_ctx(ar->module);
1056
1057         if (!ares) {
1058                 return ldb_module_done(ar->req, NULL, NULL,
1059                                         LDB_ERR_OPERATIONS_ERROR);
1060         }
1061         if (ares->error != LDB_SUCCESS) {
1062                 return ldb_module_done(ar->req, ares->controls,
1063                                         ares->response, ares->error);
1064         }
1065
1066         if (ares->type != LDB_REPLY_DONE) {
1067                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1068                 return ldb_module_done(ar->req, NULL, NULL,
1069                                         LDB_ERR_OPERATIONS_ERROR);
1070         }
1071
1072         talloc_free(ares);
1073         ar->index_current++;
1074
1075         ret = replmd_replicated_apply_next(ar);
1076         if (ret != LDB_SUCCESS) {
1077                 return ldb_module_done(ar->req, NULL, NULL, ret);
1078         }
1079
1080         return LDB_SUCCESS;
1081 }
1082
1083 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1084 {
1085         struct ldb_context *ldb;
1086         struct ldb_request *change_req;
1087         enum ndr_err_code ndr_err;
1088         struct ldb_message *msg;
1089         struct replPropertyMetaDataBlob *md;
1090         struct ldb_val md_value;
1091         uint32_t i;
1092         uint64_t seq_num;
1093         int ret;
1094
1095         /*
1096          * TODO: check if the parent object exist
1097          */
1098
1099         /*
1100          * TODO: handle the conflict case where an object with the
1101          *       same name exist
1102          */
1103
1104         ldb = ldb_module_get_ctx(ar->module);
1105         msg = ar->objs->objects[ar->index_current].msg;
1106         md = ar->objs->objects[ar->index_current].meta_data;
1107
1108         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1109         if (ret != LDB_SUCCESS) {
1110                 return replmd_replicated_request_error(ar, ret);
1111         }
1112
1113         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1114         if (ret != LDB_SUCCESS) {
1115                 return replmd_replicated_request_error(ar, ret);
1116         }
1117
1118         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1119         if (ret != LDB_SUCCESS) {
1120                 return replmd_replicated_request_error(ar, ret);
1121         }
1122
1123         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1124         if (ret != LDB_SUCCESS) {
1125                 return replmd_replicated_request_error(ar, ret);
1126         }
1127
1128         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1129         if (ret != LDB_SUCCESS) {
1130                 return replmd_replicated_request_error(ar, ret);
1131         }
1132
1133         ret = replmd_notify(ar->module, msg->dn, seq_num);
1134         if (ret != LDB_SUCCESS) {
1135                 return replmd_replicated_request_error(ar, ret);
1136         }
1137
1138         /*
1139          * the meta data array is already sorted by the caller
1140          */
1141         for (i=0; i < md->ctr.ctr1.count; i++) {
1142                 md->ctr.ctr1.array[i].local_usn = seq_num;
1143         }
1144         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1145                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1146                                        md,
1147                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1148         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1149                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1150                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1151         }
1152         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1153         if (ret != LDB_SUCCESS) {
1154                 return replmd_replicated_request_error(ar, ret);
1155         }
1156
1157         replmd_ldb_message_sort(msg, ar->schema);
1158
1159         if (DEBUGLVL(4)) {
1160                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1161                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1162                 talloc_free(s);
1163         }
1164
1165         ret = ldb_build_add_req(&change_req,
1166                                 ldb,
1167                                 ar,
1168                                 msg,
1169                                 ar->controls,
1170                                 ar,
1171                                 replmd_replicated_apply_add_callback,
1172                                 ar->req);
1173         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1174
1175         return ldb_next_request(ar->module, change_req);
1176 }
1177
1178 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1179                                                          struct replPropertyMetaData1 *m2)
1180 {
1181         int ret;
1182
1183         if (m1->version != m2->version) {
1184                 return m1->version - m2->version;
1185         }
1186
1187         if (m1->originating_change_time != m2->originating_change_time) {
1188                 return m1->originating_change_time - m2->originating_change_time;
1189         }
1190
1191         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1192         if (ret != 0) {
1193                 return ret;
1194         }
1195
1196         return m1->originating_usn - m2->originating_usn;
1197 }
1198
1199 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1200                                                   struct ldb_reply *ares)
1201 {
1202         struct ldb_context *ldb;
1203         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1204                                                struct replmd_replicated_request);
1205         int ret;
1206
1207         ldb = ldb_module_get_ctx(ar->module);
1208
1209         if (!ares) {
1210                 return ldb_module_done(ar->req, NULL, NULL,
1211                                         LDB_ERR_OPERATIONS_ERROR);
1212         }
1213         if (ares->error != LDB_SUCCESS) {
1214                 return ldb_module_done(ar->req, ares->controls,
1215                                         ares->response, ares->error);
1216         }
1217
1218         if (ares->type != LDB_REPLY_DONE) {
1219                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1220                 return ldb_module_done(ar->req, NULL, NULL,
1221                                         LDB_ERR_OPERATIONS_ERROR);
1222         }
1223
1224         talloc_free(ares);
1225         ar->index_current++;
1226
1227         ret = replmd_replicated_apply_next(ar);
1228         if (ret != LDB_SUCCESS) {
1229                 return ldb_module_done(ar->req, NULL, NULL, ret);
1230         }
1231
1232         return LDB_SUCCESS;
1233 }
1234
1235 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1236 {
1237         struct ldb_context *ldb;
1238         struct ldb_request *change_req;
1239         enum ndr_err_code ndr_err;
1240         struct ldb_message *msg;
1241         struct replPropertyMetaDataBlob *rmd;
1242         struct replPropertyMetaDataBlob omd;
1243         const struct ldb_val *omd_value;
1244         struct replPropertyMetaDataBlob nmd;
1245         struct ldb_val nmd_value;
1246         uint32_t i,j,ni=0;
1247         uint32_t removed_attrs = 0;
1248         uint64_t seq_num;
1249         int ret;
1250
1251         ldb = ldb_module_get_ctx(ar->module);
1252         msg = ar->objs->objects[ar->index_current].msg;
1253         rmd = ar->objs->objects[ar->index_current].meta_data;
1254         ZERO_STRUCT(omd);
1255         omd.version = 1;
1256
1257         /*
1258          * TODO: check repl data is correct after a rename
1259          */
1260         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1261                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1262                           ldb_dn_get_linearized(ar->search_msg->dn),
1263                           ldb_dn_get_linearized(msg->dn));
1264                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1265                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1266                                   ldb_dn_get_linearized(ar->search_msg->dn),
1267                                   ldb_dn_get_linearized(msg->dn),
1268                                   ldb_errstring(ldb));
1269                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1270                 }
1271         }
1272
1273         /* find existing meta data */
1274         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1275         if (omd_value) {
1276                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1277                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1278                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1279                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1280                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1281                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1282                 }
1283
1284                 if (omd.version != 1) {
1285                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1286                 }
1287         }
1288
1289         ZERO_STRUCT(nmd);
1290         nmd.version = 1;
1291         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1292         nmd.ctr.ctr1.array = talloc_array(ar,
1293                                           struct replPropertyMetaData1,
1294                                           nmd.ctr.ctr1.count);
1295         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1296
1297         /* first copy the old meta data */
1298         for (i=0; i < omd.ctr.ctr1.count; i++) {
1299                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1300                 ni++;
1301         }
1302
1303         /* now merge in the new meta data */
1304         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1305                 bool found = false;
1306
1307                 for (j=0; j < ni; j++) {
1308                         int cmp;
1309
1310                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1311                                 continue;
1312                         }
1313
1314                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1315                                                                             &nmd.ctr.ctr1.array[j]);
1316                         if (cmp > 0) {
1317                                 /* replace the entry */
1318                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1319                                 found = true;
1320                                 break;
1321                         }
1322
1323                         /* we don't want to apply this change so remove the attribute */
1324                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1325                         removed_attrs++;
1326
1327                         found = true;
1328                         break;
1329                 }
1330
1331                 if (found) continue;
1332
1333                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1334                 ni++;
1335         }
1336
1337         /*
1338          * finally correct the size of the meta_data array
1339          */
1340         nmd.ctr.ctr1.count = ni;
1341
1342         /*
1343          * the rdn attribute (the alias for the name attribute),
1344          * 'cn' for most objects is the last entry in the meta data array
1345          * we have stored
1346          *
1347          * sort the new meta data array
1348          */
1349         {
1350                 struct replPropertyMetaData1 *rdn_p;
1351                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1352
1353                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1354                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1355         }
1356
1357         /*
1358          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1359          */
1360         if (msg->num_elements == 0) {
1361                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1362                           ar->index_current);
1363
1364                 ar->index_current++;
1365                 return replmd_replicated_apply_next(ar);
1366         }
1367
1368         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1369                   ar->index_current, msg->num_elements);
1370
1371         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1372         if (ret != LDB_SUCCESS) {
1373                 return replmd_replicated_request_error(ar, ret);
1374         }
1375
1376         for (i=0; i<ni; i++) {
1377                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1378         }
1379
1380         /* create the meta data value */
1381         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1382                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1383                                        &nmd,
1384                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1385         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1386                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1387                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1388         }
1389
1390         /*
1391          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1392          * and replPopertyMetaData attributes
1393          */
1394         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1395         if (ret != LDB_SUCCESS) {
1396                 return replmd_replicated_request_error(ar, ret);
1397         }
1398         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1399         if (ret != LDB_SUCCESS) {
1400                 return replmd_replicated_request_error(ar, ret);
1401         }
1402         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1403         if (ret != LDB_SUCCESS) {
1404                 return replmd_replicated_request_error(ar, ret);
1405         }
1406
1407         replmd_ldb_message_sort(msg, ar->schema);
1408
1409         /* we want to replace the old values */
1410         for (i=0; i < msg->num_elements; i++) {
1411                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1412         }
1413
1414         ret = replmd_notify(ar->module, msg->dn, seq_num);
1415         if (ret != LDB_SUCCESS) {
1416                 return replmd_replicated_request_error(ar, ret);
1417         }
1418
1419         if (DEBUGLVL(4)) {
1420                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1421                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1422                 talloc_free(s);
1423         }
1424
1425         ret = ldb_build_mod_req(&change_req,
1426                                 ldb,
1427                                 ar,
1428                                 msg,
1429                                 ar->controls,
1430                                 ar,
1431                                 replmd_replicated_apply_merge_callback,
1432                                 ar->req);
1433         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1434
1435         return ldb_next_request(ar->module, change_req);
1436 }
1437
1438 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1439                                                    struct ldb_reply *ares)
1440 {
1441         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1442                                                struct replmd_replicated_request);
1443         int ret;
1444
1445         if (!ares) {
1446                 return ldb_module_done(ar->req, NULL, NULL,
1447                                         LDB_ERR_OPERATIONS_ERROR);
1448         }
1449         if (ares->error != LDB_SUCCESS &&
1450             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1451                 return ldb_module_done(ar->req, ares->controls,
1452                                         ares->response, ares->error);
1453         }
1454
1455         switch (ares->type) {
1456         case LDB_REPLY_ENTRY:
1457                 ar->search_msg = talloc_steal(ar, ares->message);
1458                 break;
1459
1460         case LDB_REPLY_REFERRAL:
1461                 /* we ignore referrals */
1462                 break;
1463
1464         case LDB_REPLY_DONE:
1465                 if (ar->search_msg != NULL) {
1466                         ret = replmd_replicated_apply_merge(ar);
1467                 } else {
1468                         ret = replmd_replicated_apply_add(ar);
1469                 }
1470                 if (ret != LDB_SUCCESS) {
1471                         return ldb_module_done(ar->req, NULL, NULL, ret);
1472                 }
1473         }
1474
1475         talloc_free(ares);
1476         return LDB_SUCCESS;
1477 }
1478
1479 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1480
1481 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1482 {
1483         struct ldb_context *ldb;
1484         int ret;
1485         char *tmp_str;
1486         char *filter;
1487         struct ldb_request *search_req;
1488
1489         if (ar->index_current >= ar->objs->num_objects) {
1490                 /* done with it, go to next stage */
1491                 return replmd_replicated_uptodate_vector(ar);
1492         }
1493
1494         ldb = ldb_module_get_ctx(ar->module);
1495         ar->search_msg = NULL;
1496
1497         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1498         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1499
1500         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1501         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1502         talloc_free(tmp_str);
1503
1504         ret = ldb_build_search_req(&search_req,
1505                                    ldb,
1506                                    ar,
1507                                    ar->objs->partition_dn,
1508                                    LDB_SCOPE_SUBTREE,
1509                                    filter,
1510                                    NULL,
1511                                    NULL,
1512                                    ar,
1513                                    replmd_replicated_apply_search_callback,
1514                                    ar->req);
1515         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1516
1517         return ldb_next_request(ar->module, search_req);
1518 }
1519
1520 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1521                                                       struct ldb_reply *ares)
1522 {
1523         struct ldb_context *ldb;
1524         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1525                                                struct replmd_replicated_request);
1526         ldb = ldb_module_get_ctx(ar->module);
1527
1528         if (!ares) {
1529                 return ldb_module_done(ar->req, NULL, NULL,
1530                                         LDB_ERR_OPERATIONS_ERROR);
1531         }
1532         if (ares->error != LDB_SUCCESS) {
1533                 return ldb_module_done(ar->req, ares->controls,
1534                                         ares->response, ares->error);
1535         }
1536
1537         if (ares->type != LDB_REPLY_DONE) {
1538                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1539                 return ldb_module_done(ar->req, NULL, NULL,
1540                                         LDB_ERR_OPERATIONS_ERROR);
1541         }
1542
1543         talloc_free(ares);
1544
1545         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1546 }
1547
1548 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1549 {
1550         struct ldb_context *ldb;
1551         struct ldb_request *change_req;
1552         enum ndr_err_code ndr_err;
1553         struct ldb_message *msg;
1554         struct replUpToDateVectorBlob ouv;
1555         const struct ldb_val *ouv_value;
1556         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1557         struct replUpToDateVectorBlob nuv;
1558         struct ldb_val nuv_value;
1559         struct ldb_message_element *nuv_el = NULL;
1560         const struct GUID *our_invocation_id;
1561         struct ldb_message_element *orf_el = NULL;
1562         struct repsFromToBlob nrf;
1563         struct ldb_val *nrf_value = NULL;
1564         struct ldb_message_element *nrf_el = NULL;
1565         uint32_t i,j,ni=0;
1566         bool found = false;
1567         time_t t = time(NULL);
1568         NTTIME now;
1569         int ret;
1570
1571         ldb = ldb_module_get_ctx(ar->module);
1572         ruv = ar->objs->uptodateness_vector;
1573         ZERO_STRUCT(ouv);
1574         ouv.version = 2;
1575         ZERO_STRUCT(nuv);
1576         nuv.version = 2;
1577
1578         unix_to_nt_time(&now, t);
1579
1580         /*
1581          * first create the new replUpToDateVector
1582          */
1583         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1584         if (ouv_value) {
1585                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1586                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1587                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1588                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1589                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1590                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1591                 }
1592
1593                 if (ouv.version != 2) {
1594                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1595                 }
1596         }
1597
1598         /*
1599          * the new uptodateness vector will at least
1600          * contain 1 entry, one for the source_dsa
1601          *
1602          * plus optional values from our old vector and the one from the source_dsa
1603          */
1604         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1605         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1606         nuv.ctr.ctr2.cursors = talloc_array(ar,
1607                                             struct drsuapi_DsReplicaCursor2,
1608                                             nuv.ctr.ctr2.count);
1609         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1610
1611         /* first copy the old vector */
1612         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1613                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1614                 ni++;
1615         }
1616
1617         /* get our invocation_id if we have one already attached to the ldb */
1618         our_invocation_id = samdb_ntds_invocation_id(ldb);
1619
1620         /* merge in the source_dsa vector is available */
1621         for (i=0; (ruv && i < ruv->count); i++) {
1622                 found = false;
1623
1624                 if (our_invocation_id &&
1625                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1626                                our_invocation_id)) {
1627                         continue;
1628                 }
1629
1630                 for (j=0; j < ni; j++) {
1631                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1632                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1633                                 continue;
1634                         }
1635
1636                         found = true;
1637
1638                         /*
1639                          * we update only the highest_usn and not the latest_sync_success time,
1640                          * because the last success stands for direct replication
1641                          */
1642                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1643                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1644                         }
1645                         break;                  
1646                 }
1647
1648                 if (found) continue;
1649
1650                 /* if it's not there yet, add it */
1651                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1652                 ni++;
1653         }
1654
1655         /*
1656          * merge in the current highwatermark for the source_dsa
1657          */
1658         found = false;
1659         for (j=0; j < ni; j++) {
1660                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1661                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1662                         continue;
1663                 }
1664
1665                 found = true;
1666
1667                 /*
1668                  * here we update the highest_usn and last_sync_success time
1669                  * because we're directly replicating from the source_dsa
1670                  *
1671                  * and use the tmp_highest_usn because this is what we have just applied
1672                  * to our ldb
1673                  */
1674                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1675                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1676                 break;
1677         }
1678         if (!found) {
1679                 /*
1680                  * here we update the highest_usn and last_sync_success time
1681                  * because we're directly replicating from the source_dsa
1682                  *
1683                  * and use the tmp_highest_usn because this is what we have just applied
1684                  * to our ldb
1685                  */
1686                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1687                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1688                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1689                 ni++;
1690         }
1691
1692         /*
1693          * finally correct the size of the cursors array
1694          */
1695         nuv.ctr.ctr2.count = ni;
1696
1697         /*
1698          * sort the cursors
1699          */
1700         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1701               sizeof(struct drsuapi_DsReplicaCursor2),
1702               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1703
1704         /*
1705          * create the change ldb_message
1706          */
1707         msg = ldb_msg_new(ar);
1708         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1709         msg->dn = ar->search_msg->dn;
1710
1711         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1712                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1713                                        &nuv,
1714                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1715         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1716                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1717                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1718         }
1719         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1720         if (ret != LDB_SUCCESS) {
1721                 return replmd_replicated_request_error(ar, ret);
1722         }
1723         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1724
1725         /*
1726          * now create the new repsFrom value from the given repsFromTo1 structure
1727          */
1728         ZERO_STRUCT(nrf);
1729         nrf.version                                     = 1;
1730         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1731         /* and fix some values... */
1732         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1733         nrf.ctr.ctr1.last_success                       = now;
1734         nrf.ctr.ctr1.last_attempt                       = now;
1735         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1736         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1737
1738         /*
1739          * first see if we already have a repsFrom value for the current source dsa
1740          * if so we'll later replace this value
1741          */
1742         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1743         if (orf_el) {
1744                 for (i=0; i < orf_el->num_values; i++) {
1745                         struct repsFromToBlob *trf;
1746
1747                         trf = talloc(ar, struct repsFromToBlob);
1748                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1749
1750                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1751                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1752                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1753                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1754                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1755                         }
1756
1757                         if (trf->version != 1) {
1758                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1759                         }
1760
1761                         /*
1762                          * we compare the source dsa objectGUID not the invocation_id
1763                          * because we want only one repsFrom value per source dsa
1764                          * and when the invocation_id of the source dsa has changed we don't need 
1765                          * the old repsFrom with the old invocation_id
1766                          */
1767                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1768                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1769                                 talloc_free(trf);
1770                                 continue;
1771                         }
1772
1773                         talloc_free(trf);
1774                         nrf_value = &orf_el->values[i];
1775                         break;
1776                 }
1777
1778                 /*
1779                  * copy over all old values to the new ldb_message
1780                  */
1781                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1782                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1783                 *nrf_el = *orf_el;
1784         }
1785
1786         /*
1787          * if we haven't found an old repsFrom value for the current source dsa
1788          * we'll add a new value
1789          */
1790         if (!nrf_value) {
1791                 struct ldb_val zero_value;
1792                 ZERO_STRUCT(zero_value);
1793                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1794                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1795
1796                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1797         }
1798
1799         /* we now fill the value which is already attached to ldb_message */
1800         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1801                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1802                                        &nrf,
1803                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1804         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1805                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1806                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1807         }
1808
1809         /* 
1810          * the ldb_message_element for the attribute, has all the old values and the new one
1811          * so we'll replace the whole attribute with all values
1812          */
1813         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1814
1815         if (DEBUGLVL(4)) {
1816                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1817                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1818                 talloc_free(s);
1819         }
1820
1821         /* prepare the ldb_modify() request */
1822         ret = ldb_build_mod_req(&change_req,
1823                                 ldb,
1824                                 ar,
1825                                 msg,
1826                                 ar->controls,
1827                                 ar,
1828                                 replmd_replicated_uptodate_modify_callback,
1829                                 ar->req);
1830         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1831
1832         return ldb_next_request(ar->module, change_req);
1833 }
1834
1835 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1836                                                       struct ldb_reply *ares)
1837 {
1838         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1839                                                struct replmd_replicated_request);
1840         int ret;
1841
1842         if (!ares) {
1843                 return ldb_module_done(ar->req, NULL, NULL,
1844                                         LDB_ERR_OPERATIONS_ERROR);
1845         }
1846         if (ares->error != LDB_SUCCESS &&
1847             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1848                 return ldb_module_done(ar->req, ares->controls,
1849                                         ares->response, ares->error);
1850         }
1851
1852         switch (ares->type) {
1853         case LDB_REPLY_ENTRY:
1854                 ar->search_msg = talloc_steal(ar, ares->message);
1855                 break;
1856
1857         case LDB_REPLY_REFERRAL:
1858                 /* we ignore referrals */
1859                 break;
1860
1861         case LDB_REPLY_DONE:
1862                 if (ar->search_msg == NULL) {
1863                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1864                 } else {
1865                         ret = replmd_replicated_uptodate_modify(ar);
1866                 }
1867                 if (ret != LDB_SUCCESS) {
1868                         return ldb_module_done(ar->req, NULL, NULL, ret);
1869                 }
1870         }
1871
1872         talloc_free(ares);
1873         return LDB_SUCCESS;
1874 }
1875
1876
1877 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1878 {
1879         struct ldb_context *ldb;
1880         int ret;
1881         static const char *attrs[] = {
1882                 "replUpToDateVector",
1883                 "repsFrom",
1884                 NULL
1885         };
1886         struct ldb_request *search_req;
1887
1888         ldb = ldb_module_get_ctx(ar->module);
1889         ar->search_msg = NULL;
1890
1891         ret = ldb_build_search_req(&search_req,
1892                                    ldb,
1893                                    ar,
1894                                    ar->objs->partition_dn,
1895                                    LDB_SCOPE_BASE,
1896                                    "(objectClass=*)",
1897                                    attrs,
1898                                    NULL,
1899                                    ar,
1900                                    replmd_replicated_uptodate_search_callback,
1901                                    ar->req);
1902         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1903
1904         return ldb_next_request(ar->module, search_req);
1905 }
1906
1907
1908
1909 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1910 {
1911         struct ldb_context *ldb;
1912         struct dsdb_extended_replicated_objects *objs;
1913         struct replmd_replicated_request *ar;
1914         struct ldb_control **ctrls;
1915         int ret, i;
1916         struct dsdb_control_current_partition *partition_ctrl;
1917         struct replmd_private *replmd_private = 
1918                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1919
1920         ldb = ldb_module_get_ctx(module);
1921
1922         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1923
1924         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1925         if (!objs) {
1926                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1927                 return LDB_ERR_PROTOCOL_ERROR;
1928         }
1929
1930         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1931                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1932                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1933                 return LDB_ERR_PROTOCOL_ERROR;
1934         }
1935
1936         ar = replmd_ctx_init(module, req);
1937         if (!ar)
1938                 return LDB_ERR_OPERATIONS_ERROR;
1939
1940         ar->objs = objs;
1941         ar->schema = dsdb_get_schema(ldb);
1942         if (!ar->schema) {
1943                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1944                 talloc_free(ar);
1945                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1946                 return LDB_ERR_CONSTRAINT_VIOLATION;
1947         }
1948
1949         ctrls = req->controls;
1950
1951         if (req->controls) {
1952                 req->controls = talloc_memdup(ar, req->controls,
1953                                               talloc_get_size(req->controls));
1954                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1955         }
1956
1957         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1958         if (ret != LDB_SUCCESS) {
1959                 return ret;
1960         }
1961
1962         /*
1963           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1964           tells the partition module which partition this request is
1965           directed at. That is important as the partition roots appear
1966           twice in the directory, once as mount points in the top
1967           level store, and once as the roots of each partition. The
1968           replication code wants to operate on the root of the
1969           partitions, not the top level mount points
1970          */
1971         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1972         if (partition_ctrl == NULL) {
1973                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1974         }
1975         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1976         partition_ctrl->dn = objs->partition_dn;
1977
1978         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1979         if (ret != LDB_SUCCESS) {
1980                 return ret;
1981         }
1982
1983         ar->controls = req->controls;
1984         req->controls = ctrls;
1985
1986         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1987
1988         /* save away the linked attributes for the end of the
1989            transaction */
1990         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1991                 struct la_entry *la_entry;
1992
1993                 if (replmd_private->la_ctx == NULL) {
1994                         replmd_private->la_ctx = talloc_new(replmd_private);
1995                 }
1996                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
1997                 if (la_entry == NULL) {
1998                         ldb_oom(ldb);
1999                         return LDB_ERR_OPERATIONS_ERROR;
2000                 }
2001                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2002                 if (la_entry->la == NULL) {
2003                         talloc_free(la_entry);
2004                         ldb_oom(ldb);
2005                         return LDB_ERR_OPERATIONS_ERROR;
2006                 }
2007                 *la_entry->la = ar->objs->linked_attributes[i];
2008
2009                 /* we need to steal the non-scalars so they stay
2010                    around until the end of the transaction */
2011                 talloc_steal(la_entry->la, la_entry->la->identifier);
2012                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2013
2014                 DLIST_ADD(replmd_private->la_list, la_entry);
2015         }
2016
2017         return replmd_replicated_apply_next(ar);
2018 }
2019
2020 /*
2021   process one linked attribute structure
2022  */
2023 static int replmd_process_linked_attribute(struct ldb_module *module,
2024                                            struct la_entry *la_entry)
2025 {                                          
2026         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2027         struct ldb_context *ldb = ldb_module_get_ctx(module);
2028         struct drsuapi_DsReplicaObjectIdentifier3 target;
2029         struct ldb_message *msg;
2030         struct ldb_message_element *ret_el;
2031         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2032         enum ndr_err_code ndr_err;
2033         char *target_dn;
2034         struct ldb_request *mod_req;
2035         int ret;
2036         const struct dsdb_attribute *attr;
2037
2038 /*
2039 linked_attributes[0]:                                                     
2040      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2041         identifier               : *                                      
2042             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2043                 __ndr_size               : 0x0000003a (58)                
2044                 __ndr_size_sid           : 0x00000000 (0)                 
2045                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2046                 sid                      : S-0-0                               
2047                 __ndr_size_dn            : 0x00000000 (0)                      
2048                 dn                       : ''                                  
2049         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2050         value: struct drsuapi_DsAttributeValue                                 
2051             __ndr_size               : 0x0000007e (126)                        
2052             blob                     : *                                       
2053                 blob                     : DATA_BLOB length=126                
2054         flags                    : 0x00000001 (1)                              
2055                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2056         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2057         meta_data: struct drsuapi_DsReplicaMetaData                            
2058             version                  : 0x00000015 (21)                         
2059             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2060             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2061             originating_usn          : 0x000000000001e19c (123292)             
2062      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2063         __ndr_size               : 0x0000007e (126)                            
2064         __ndr_size_sid           : 0x0000001c (28)                             
2065         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2066         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2067         __ndr_size_dn            : 0x00000022 (34)                                
2068         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2069  */
2070         if (DEBUGLVL(4)) {
2071                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2072         }
2073         
2074         /* decode the target of the link */
2075         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2076                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2077                                        &target,
2078                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2079         if (ndr_err != NDR_ERR_SUCCESS) {
2080                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2081                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2082                 talloc_free(tmp_ctx);
2083                 return LDB_ERR_OPERATIONS_ERROR;
2084         }
2085         if (DEBUGLVL(4)) {
2086                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2087         }
2088
2089         /* construct a modify request for this attribute change */
2090         msg = ldb_msg_new(tmp_ctx);
2091         if (!msg) {
2092                 ldb_oom(ldb);
2093                 talloc_free(tmp_ctx);
2094                 return LDB_ERR_OPERATIONS_ERROR;
2095         }
2096
2097         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2098                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2099         if (ret != LDB_SUCCESS) {
2100                 talloc_free(tmp_ctx);
2101                 return ret;
2102         }
2103
2104         /* find the attribute being modified */
2105         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2106         if (attr == NULL) {
2107                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2108                 talloc_free(tmp_ctx);
2109                 return LDB_ERR_OPERATIONS_ERROR;
2110         }
2111
2112         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2113                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2114                                         LDB_FLAG_MOD_ADD, &ret_el);
2115         } else {
2116                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2117                                         LDB_FLAG_MOD_DELETE, &ret_el);
2118         }
2119         if (ret != LDB_SUCCESS) {
2120                 talloc_free(tmp_ctx);
2121                 return ret;
2122         }
2123         /* we allocate two entries here, in case we need a remove/add
2124            pair */
2125         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2126         if (!ret_el->values) {
2127                 ldb_oom(ldb);
2128                 talloc_free(tmp_ctx);
2129                 return LDB_ERR_OPERATIONS_ERROR;
2130         }
2131         ret_el->num_values = 1;
2132
2133         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2134                                     GUID_string(tmp_ctx, &target.guid),
2135                                     dom_sid_string(tmp_ctx, &target.sid),
2136                                     target.dn);
2137         if (target_dn == NULL) {
2138                 ldb_oom(ldb);
2139                 talloc_free(tmp_ctx);
2140                 return LDB_ERR_OPERATIONS_ERROR;
2141         }
2142         ret_el->values[0] = data_blob_string_const(target_dn);
2143
2144         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2145                                 msg,
2146                                 NULL,
2147                                 NULL, 
2148                                 ldb_op_default_callback,
2149                                 NULL);
2150         if (ret != LDB_SUCCESS) {
2151                 talloc_free(tmp_ctx);
2152                 return ret;
2153         }
2154         talloc_steal(mod_req, msg);
2155
2156         if (DEBUGLVL(4)) {
2157                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2158                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2159         }
2160
2161         /* Run the new request */
2162         ret = ldb_next_request(module, mod_req);
2163
2164         /* we need to wait for this to finish, as we are being called
2165            from the synchronous end_transaction hook of this module */
2166         if (ret == LDB_SUCCESS) {
2167                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2168         }
2169
2170         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2171                 /* the link destination exists, we need to update it
2172                  * by deleting the old one for the same DN then adding
2173                  * the new one */
2174                 msg->elements = talloc_realloc(msg, msg->elements,
2175                                                struct ldb_message_element,
2176                                                msg->num_elements+1);
2177                 if (msg->elements == NULL) {
2178                         ldb_oom(ldb);
2179                         talloc_free(tmp_ctx);
2180                         return LDB_ERR_OPERATIONS_ERROR;
2181                 }
2182                 /* this relies on the backend matching the old entry
2183                    only by the DN portion of the extended DN */
2184                 msg->elements[1] = msg->elements[0];
2185                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2186                 msg->num_elements++;
2187
2188                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2189                                         msg,
2190                                         NULL,
2191                                         NULL, 
2192                                         ldb_op_default_callback,
2193                                         NULL);
2194                 if (ret != LDB_SUCCESS) {
2195                         talloc_free(tmp_ctx);
2196                         return ret;
2197                 }
2198
2199                 /* Run the new request */
2200                 ret = ldb_next_request(module, mod_req);
2201                 
2202                 if (ret == LDB_SUCCESS) {
2203                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2204                 }
2205         }
2206
2207         if (ret != LDB_SUCCESS) {
2208                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2209                           ldb_errstring(ldb),
2210                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2211                 ret = LDB_SUCCESS;
2212         }
2213         
2214         talloc_free(tmp_ctx);
2215
2216         return ret;     
2217 }
2218
2219 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2220 {
2221         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2222                 return replmd_extended_replicated_objects(module, req);
2223         }
2224
2225         return ldb_next_request(module, req);
2226 }
2227
2228
2229 /*
2230   we hook into the transaction operations to allow us to 
2231   perform the linked attribute updates at the end of the whole
2232   transaction. This allows a forward linked attribute to be created
2233   before the object is created. During a vampire, w2k8 sends us linked
2234   attributes before the objects they are part of.
2235  */
2236 static int replmd_start_transaction(struct ldb_module *module)
2237 {
2238         /* create our private structure for this transaction */
2239         int i;
2240         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2241                                                                 struct replmd_private);
2242         talloc_free(replmd_private->la_ctx);
2243         replmd_private->la_list = NULL;
2244         replmd_private->la_ctx = NULL;
2245
2246         for (i=0; i<replmd_private->num_ncs; i++) {
2247                 replmd_private->ncs[i].mod_usn = 0;
2248         }
2249
2250         return ldb_next_start_trans(module);
2251 }
2252
2253 /*
2254   on prepare commit we loop over our queued la_context structures and
2255   apply each of them  
2256  */
2257 static int replmd_prepare_commit(struct ldb_module *module)
2258 {
2259         struct replmd_private *replmd_private = 
2260                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2261         struct la_entry *la, *prev;
2262         int ret;
2263
2264         /* walk the list backwards, to do the first entry first, as we
2265          * added the entries with DLIST_ADD() which puts them at the
2266          * start of the list */
2267         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2268
2269         for (; la; la=prev) {
2270                 prev = la->prev;
2271                 DLIST_REMOVE(replmd_private->la_list, la);
2272                 ret = replmd_process_linked_attribute(module, la);
2273                 if (ret != LDB_SUCCESS) {
2274                         return ret;
2275                 }
2276         }
2277
2278         talloc_free(replmd_private->la_ctx);
2279         replmd_private->la_list = NULL;
2280         replmd_private->la_ctx = NULL;
2281
2282         /* possibly change @REPLCHANGED */
2283         ret = replmd_notify_store(module);
2284         if (ret != LDB_SUCCESS) {
2285                 return ret;
2286         }
2287         
2288         return ldb_next_prepare_commit(module);
2289 }
2290
2291 static int replmd_del_transaction(struct ldb_module *module)
2292 {
2293         struct replmd_private *replmd_private = 
2294                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2295         talloc_free(replmd_private->la_ctx);
2296         replmd_private->la_list = NULL;
2297         replmd_private->la_ctx = NULL;
2298         return ldb_next_del_trans(module);
2299 }
2300
2301
2302 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2303         .name          = "repl_meta_data",
2304         .init_context      = replmd_init,
2305         .add               = replmd_add,
2306         .modify            = replmd_modify,
2307         .rename            = replmd_rename,
2308         .extended          = replmd_extended,
2309         .start_transaction = replmd_start_transaction,
2310         .prepare_commit    = replmd_prepare_commit,
2311         .del_transaction   = replmd_del_transaction,
2312 };