r20727: implement basic merging of replicated objects when it already exist
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb
30  *
31  *  Component: ldb repl_meta_data module
32  *
33  *  Description: - add a unique objectGUID onto every new record,
34  *               - handle whenCreated, whenChanged timestamps
35  *               - handle uSNCreated, uSNChanged numbers
36  *               - handle replPropertyMetaData attribute
37  *
38  *  Author: Simo Sorce
39  *  Author: Stefan Metzmacher
40  */
41
42 #include "includes.h"
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50
/*
 * private state of one "apply replicated objects" operation
 *
 * It is allocated by replmd_replicated_init_handle() as a talloc child
 * of the ldb_handle that is returned to the caller.
 */
struct replmd_replicated_request {
        /* the module instance the request is running in */
        struct ldb_module *module;
        /* the handle stored in orig_req->handle */
        struct ldb_handle *handle;
        /* the request that triggered this operation */
        struct ldb_request *orig_req;

        /* the replicated objects that should be applied */
        struct dsdb_extended_replicated_objects *objs;

        /* index into objs->objects[] of the object currently being applied */
        uint32_t index_current;

        /*
         * state of the sub requests for the current object,
         * it is wiped with ZERO_STRUCT() once the object was handled
         */
        struct {
                TALLOC_CTX *mem_ctx;
                /* the search for an existing record with the same objectGUID */
                struct ldb_request *search_req;
                /* the record returned by the search, if there was one */
                struct ldb_message *search_msg;
                int search_ret;
                /* the add or modify request for the object */
                struct ldb_request *change_req;
                int change_ret;
        } sub;
};
69
70 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
71                                                                        struct ldb_request *req,
72                                                                        struct dsdb_extended_replicated_objects *objs)
73 {
74         struct replmd_replicated_request *ar;
75         struct ldb_handle *h;
76
77         h = talloc_zero(req, struct ldb_handle);
78         if (h == NULL) {
79                 ldb_set_errstring(module->ldb, "Out of Memory");
80                 return NULL;
81         }
82
83         h->module       = module;
84         h->state        = LDB_ASYNC_PENDING;
85         h->status       = LDB_SUCCESS;
86
87         ar = talloc_zero(h, struct replmd_replicated_request);
88         if (ar == NULL) {
89                 ldb_set_errstring(module->ldb, "Out of Memory");
90                 talloc_free(h);
91                 return NULL;
92         }
93
94         h->private_data = ar;
95
96         ar->module      = module;
97         ar->handle      = h;
98         ar->orig_req    = req;
99         ar->objs        = objs;
100
101         req->handle = h;
102
103         return ar;
104 }
105
106 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
107 {
108         int i;
109
110         for (i = 0; i < msg->num_elements; i++) {
111                 if (ldb_attr_cmp(name, msg->elements[i].name) == 0) {
112                         return &msg->elements[i];
113                 }
114         }
115
116         return NULL;
117 }
118
119 /*
120   add a time element to a record
121 */
122 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
123 {
124         struct ldb_message_element *el;
125         char *s;
126
127         if (ldb_msg_find_element(msg, attr) != NULL) {
128                 return 0;
129         }
130
131         s = ldb_timestring(msg, t);
132         if (s == NULL) {
133                 return -1;
134         }
135
136         if (ldb_msg_add_string(msg, attr, s) != 0) {
137                 return -1;
138         }
139
140         el = ldb_msg_find_element(msg, attr);
141         /* always set as replace. This works because on add ops, the flag
142            is ignored */
143         el->flags = LDB_FLAG_MOD_REPLACE;
144
145         return 0;
146 }
147
148 /*
149   add a uint64_t element to a record
150 */
151 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
152 {
153         struct ldb_message_element *el;
154
155         if (ldb_msg_find_element(msg, attr) != NULL) {
156                 return 0;
157         }
158
159         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
160                 return -1;
161         }
162
163         el = ldb_msg_find_element(msg, attr);
164         /* always set as replace. This works because on add ops, the flag
165            is ignored */
166         el->flags = LDB_FLAG_MOD_REPLACE;
167
168         return 0;
169 }
170
171 static int replmd_add_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
172 {
173         struct ldb_control **saved_ctrls;
174         int ret;
175
176         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_replicated\n");
177
178         if (!save_controls(ctrl, req, &saved_ctrls)) {
179                 return LDB_ERR_OPERATIONS_ERROR;
180         }
181
182         ret = ldb_next_request(module, req);
183         req->controls = saved_ctrls;
184
185         return ret;
186 }
187
188 static int replmd_add_originating(struct ldb_module *module, struct ldb_request *req)
189 {
190         struct ldb_request *down_req;
191         struct ldb_message_element *attribute;
192         struct ldb_message *msg;
193         struct ldb_val v;
194         struct GUID guid;
195         uint64_t seq_num;
196         NTSTATUS nt_status;
197         int ret;
198         time_t t = time(NULL);
199
200         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
201
202         if ((attribute = replmd_find_attribute(req->op.add.message, "objectGUID")) != NULL ) {
203                 return ldb_next_request(module, req);
204         }
205
206         down_req = talloc(req, struct ldb_request);
207         if (down_req == NULL) {
208                 return LDB_ERR_OPERATIONS_ERROR;
209         }
210
211         *down_req = *req;
212
213         /* we have to copy the message as the caller might have it as a const */
214         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
215         if (msg == NULL) {
216                 talloc_free(down_req);
217                 return LDB_ERR_OPERATIONS_ERROR;
218         }
219
220         /* a new GUID */
221         guid = GUID_random();
222
223         nt_status = ndr_push_struct_blob(&v, msg, &guid, 
224                                          (ndr_push_flags_fn_t)ndr_push_GUID);
225         if (!NT_STATUS_IS_OK(nt_status)) {
226                 talloc_free(down_req);
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         ret = ldb_msg_add_value(msg, "objectGUID", &v, NULL);
231         if (ret) {
232                 talloc_free(down_req);
233                 return ret;
234         }
235         
236         if (add_time_element(msg, "whenCreated", t) != 0 ||
237             add_time_element(msg, "whenChanged", t) != 0) {
238                 talloc_free(down_req);
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         /* Get a sequence number from the backend */
243         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
244         if (ret == LDB_SUCCESS) {
245                 if (add_uint64_element(msg, "uSNCreated", seq_num) != 0 ||
246                     add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
247                         talloc_free(down_req);
248                         return LDB_ERR_OPERATIONS_ERROR;
249                 }
250         }
251
252         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
253
254         /* go on with the call chain */
255         ret = ldb_next_request(module, down_req);
256
257         /* do not free down_req as the call results may be linked to it,
258          * it will be freed when the upper level request get freed */
259         if (ret == LDB_SUCCESS) {
260                 req->handle = down_req->handle;
261         }
262
263         return ret;
264 }
265
266 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
267 {
268         struct ldb_control *ctrl;
269
270         /* do not manipulate our control entries */
271         if (ldb_dn_is_special(req->op.add.message->dn)) {
272                 return ldb_next_request(module, req);
273         }
274
275         ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
276         if (ctrl) {
277                 /* handle replicated objects different */
278                 return replmd_add_replicated(module, req, ctrl);
279         }
280
281         return replmd_add_originating(module, req);
282 }
283
284 static int replmd_modify_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
285 {
286         struct ldb_control **saved_ctrls;
287         int ret;
288
289         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_replicated\n");
290
291         if (!save_controls(ctrl, req, &saved_ctrls)) {
292                 return LDB_ERR_OPERATIONS_ERROR;
293         }
294
295         ret = ldb_next_request(module, req);
296         req->controls = saved_ctrls;
297
298         return ret;
299 }
300
301 static int replmd_modify_originating(struct ldb_module *module, struct ldb_request *req)
302 {
303         struct ldb_request *down_req;
304         struct ldb_message *msg;
305         int ret;
306         time_t t = time(NULL);
307         uint64_t seq_num;
308
309         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
310
311         down_req = talloc(req, struct ldb_request);
312         if (down_req == NULL) {
313                 return LDB_ERR_OPERATIONS_ERROR;
314         }
315
316         *down_req = *req;
317
318         /* we have to copy the message as the caller might have it as a const */
319         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
320         if (msg == NULL) {
321                 talloc_free(down_req);
322                 return LDB_ERR_OPERATIONS_ERROR;
323         }
324
325         if (add_time_element(msg, "whenChanged", t) != 0) {
326                 talloc_free(down_req);
327                 return LDB_ERR_OPERATIONS_ERROR;
328         }
329
330         /* Get a sequence number from the backend */
331         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
332         if (ret == LDB_SUCCESS) {
333                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
334                         talloc_free(down_req);
335                         return LDB_ERR_OPERATIONS_ERROR;
336                 }
337         }
338
339         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
340
341         /* go on with the call chain */
342         ret = ldb_next_request(module, down_req);
343
344         /* do not free down_req as the call results may be linked to it,
345          * it will be freed when the upper level request get freed */
346         if (ret == LDB_SUCCESS) {
347                 req->handle = down_req->handle;
348         }
349
350         return ret;
351 }
352
353 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
354 {
355         struct ldb_control *ctrl;
356
357         /* do not manipulate our control entries */
358         if (ldb_dn_is_special(req->op.mod.message->dn)) {
359                 return ldb_next_request(module, req);
360         }
361
362         ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
363         if (ctrl) {
364                 /* handle replicated objects different */
365                 return replmd_modify_replicated(module, req, ctrl);
366         }
367
368         return replmd_modify_originating(module, req);
369 }
370
371 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
372 {
373         struct ldb_reply *ares = NULL;
374
375         ar->handle->status = ret;
376         ar->handle->state = LDB_ASYNC_DONE;
377
378         if (!ar->orig_req->callback) {
379                 return LDB_SUCCESS;
380         }
381         
382         /* we're done and need to report the success to the caller */
383         ares = talloc_zero(ar, struct ldb_reply);
384         if (!ares) {
385                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
386                 ar->handle->state = LDB_ASYNC_DONE;
387                 return LDB_ERR_OPERATIONS_ERROR;
388         }
389
390         ares->type      = LDB_REPLY_EXTENDED;
391         ares->response  = NULL;
392
393         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
394 }
395
396 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
397 {
398         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
399 }
400
/*
 * finish the operation with the ldb error code 'ret'
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        const int status = ret;

        return replmd_replicated_request_reply_helper(ar, status);
}
405
406 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
407 {
408         int ret = LDB_ERR_OTHER;
409         /* TODO: do some error mapping */
410         return replmd_replicated_request_reply_helper(ar, ret);
411 }
412
413 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
414
415 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
416                                                 void *private_data,
417                                                 struct ldb_reply *ares)
418 {
419 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
420         struct replmd_replicated_request *ar = talloc_get_type(private_data,
421                                                struct replmd_replicated_request);
422
423         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
424         if (ar->sub.change_ret != LDB_SUCCESS) {
425                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
426         }
427
428         talloc_free(ar->sub.mem_ctx);
429         ZERO_STRUCT(ar->sub);
430
431         ar->index_current++;
432
433         return replmd_replicated_apply_next(ar);
434 #else
435         return LDB_SUCCESS;
436 #endif
437 }
438
439 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
440 {
441         NTSTATUS nt_status;
442         struct ldb_message *msg;
443         struct replPropertyMetaDataBlob *md;
444         struct ldb_val md_value;
445         uint32_t i;
446         uint64_t seq_num;
447         int ret;
448
449         /*
450          * TODO: check if the parent object exist
451          */
452
453         /*
454          * TODO: handle the conflict case where an object with the
455          *       same name exist
456          */
457
458         msg = ar->objs->objects[ar->index_current].msg;
459         md = ar->objs->objects[ar->index_current].meta_data;
460
461         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
462         if (ret != LDB_SUCCESS) {
463                 return replmd_replicated_request_error(ar, ret);
464         }
465
466         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
467         if (ret != LDB_SUCCESS) {
468                 return replmd_replicated_request_error(ar, ret);
469         }
470
471         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
472         if (ret != LDB_SUCCESS) {
473                 return replmd_replicated_request_error(ar, ret);
474         }
475
476         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
477         if (ret != LDB_SUCCESS) {
478                 return replmd_replicated_request_error(ar, ret);
479         }
480
481         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
482         if (ret != LDB_SUCCESS) {
483                 return replmd_replicated_request_error(ar, ret);
484         }
485
486         md = ar->objs->objects[ar->index_current].meta_data;
487         for (i=0; i < md->ctr.ctr1.count; i++) {
488                 md->ctr.ctr1.array[i].local_usn = seq_num;
489         }
490         nt_status = ndr_push_struct_blob(&md_value, msg, md,
491                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
492         if (!NT_STATUS_IS_OK(nt_status)) {
493                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
494         }
495         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
496         if (ret != LDB_SUCCESS) {
497                 return replmd_replicated_request_error(ar, ret);
498         }
499
500         ret = ldb_build_add_req(&ar->sub.change_req,
501                                 ar->module->ldb,
502                                 ar->sub.mem_ctx,
503                                 msg,
504                                 NULL,
505                                 ar,
506                                 replmd_replicated_apply_add_callback);
507         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
508
509 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
510         return ldb_next_request(ar->module, ar->sub.change_req);
511 #else
512         ret = ldb_next_request(ar->module, ar->sub.change_req);
513         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
514
515         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
516         if (ar->sub.change_ret != LDB_SUCCESS) {
517                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
518         }
519
520         talloc_free(ar->sub.mem_ctx);
521         ZERO_STRUCT(ar->sub);
522
523         ar->index_current++;
524
525         return LDB_SUCCESS;
526 #endif
527 }
528
529 static int replmd_replPropertyMetaData1_attid_compare(struct replPropertyMetaData1 *m1,
530                                                       struct replPropertyMetaData1 *m2)
531 {
532         return m1->attid - m2->attid;
533 }
534
535 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
536                                                          struct replPropertyMetaData1 *m2)
537 {
538         int ret;
539
540         if (m1->version != m2->version) {
541                 return m1->version - m2->version;
542         }
543
544         if (m1->orginating_time != m2->orginating_time) {
545                 return m1->orginating_time - m2->orginating_time;
546         }
547
548         ret = GUID_compare(&m1->orginating_invocation_id, &m2->orginating_invocation_id);
549         if (ret != 0) {
550                 return ret;
551         }
552
553         return m1->orginating_usn - m2->orginating_usn;
554 }
555
556 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
557                                                   void *private_data,
558                                                   struct ldb_reply *ares)
559 {
560 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
561         struct replmd_replicated_request *ar = talloc_get_type(private_data,
562                                                struct replmd_replicated_request);
563
564         ret = ldb_next_request(ar->module, ar->sub.change_req);
565         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
566
567         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
568         if (ar->sub.change_ret != LDB_SUCCESS) {
569                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
570         }
571
572         talloc_free(ar->sub.mem_ctx);
573         ZERO_STRUCT(ar->sub);
574
575         ar->index_current++;
576
577         return LDB_SUCCESS;
578 #else
579         return LDB_SUCCESS;
580 #endif
581 }
582
583 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
584 {
585         NTSTATUS nt_status;
586         struct ldb_message *msg;
587         struct replPropertyMetaDataBlob *rmd;
588         struct replPropertyMetaDataBlob omd;
589         const struct ldb_val *omd_value;
590         struct replPropertyMetaDataBlob nmd;
591         struct ldb_val nmd_value;
592         uint32_t i,j,ni=0;
593         uint32_t removed_attrs = 0;
594         uint64_t seq_num;
595         int ret;
596
597         msg = ar->objs->objects[ar->index_current].msg;
598         rmd = ar->objs->objects[ar->index_current].meta_data;
599         ZERO_STRUCT(omd);
600         omd.version = 1;
601
602         /*
603          * TODO: add rename conflict handling
604          */
605         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
606                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
607                           ar->index_current);
608                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
609                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
610                           ldb_dn_get_linearized(msg->dn));
611                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
612         }
613
614         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
615         if (ret != LDB_SUCCESS) {
616                 return replmd_replicated_request_error(ar, ret);
617         }
618
619         /* find existing meta data */
620         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
621         if (omd_value) {
622                 nt_status = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, &omd,
623                                                  (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
624                 if (!NT_STATUS_IS_OK(nt_status)) {
625                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
626                 }
627
628                 if (omd.version != 1) {
629                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
630                 }
631         }
632
633         ZERO_STRUCT(nmd);
634         nmd.version = 1;
635         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
636         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
637                                           struct replPropertyMetaData1,
638                                           nmd.ctr.ctr1.count);
639         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
640
641         /* first copy the old meta data */
642         for (i=0; i < omd.ctr.ctr1.count; i++) {
643                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
644                 ni++;
645         }
646
647         /* now merge in the new meta data */
648         for (i=0; i < rmd->ctr.ctr1.count; i++) {
649                 bool found = false;
650
651                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
652
653                 for (j=0; j < ni; j++) {
654                         int cmp;
655
656                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
657                                 continue;
658                         }
659
660                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
661                                                                             &nmd.ctr.ctr1.array[j]);
662                         if (cmp > 0) {
663                                 /* replace the entry */
664                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
665                                 found = true;
666                                 break;
667                         }
668
669                         /* we don't want to apply this change so remove the attribute */
670                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
671                         removed_attrs++;
672
673                         found = true;
674                         break;
675                 }
676
677                 if (found) continue;
678
679                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
680                 ni++;
681         }
682
683         /*
684          * finally correct the size of the meta_data array
685          */
686         nmd.ctr.ctr1.count = ni;
687
688         /*
689          * the rdn attribute (the alias for the name attribute),
690          * 'cn' for most objects is the last entry in the meta data array
691          * we have stored
692          *
693          * as it should stay the last one in the new list, we move it to the end
694          */
695         {
696                 struct replPropertyMetaData1 *rdn_p, rdn, *last_p;
697                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
698                 uint32_t last_idx = ni - 1;
699
700                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
701                 rdn = *rdn_p;
702                 last_p = &nmd.ctr.ctr1.array[last_idx];
703
704                 if (last_idx > rdn_idx) {
705                         memmove(rdn_p, rdn_p+1, (last_idx - rdn_idx)*sizeof(rdn));
706                         *last_p = rdn;
707                 }
708         }
709
710         /*
711          * sort the meta data entries by attid, but skip the last one containing
712          * the rdn attribute
713          */
714         qsort(nmd.ctr.ctr1.array, nmd.ctr.ctr1.count - 1,
715               sizeof(struct replPropertyMetaData1),
716               (comparison_fn_t)replmd_replPropertyMetaData1_attid_compare);
717
718         /* create the meta data value */
719         nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
720                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
721         if (!NT_STATUS_IS_OK(nt_status)) {
722                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
723         }
724
725         /*
726          * check if some replicated attributes left, otherwise skip the ldb_modify() call
727          */
728         if (msg->num_elements == 0) {
729                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
730                           ar->index_current);
731                 goto next_object;
732         }
733
734         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
735                   ar->index_current, msg->num_elements);
736
737         /*
738          * when we now that we'll modify the record, add the whenChanged, uSNChanged
739          * and replPopertyMetaData attributes
740          */
741         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
742         if (ret != LDB_SUCCESS) {
743                 return replmd_replicated_request_error(ar, ret);
744         }
745         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
746         if (ret != LDB_SUCCESS) {
747                 return replmd_replicated_request_error(ar, ret);
748         }
749         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
750         if (ret != LDB_SUCCESS) {
751                 return replmd_replicated_request_error(ar, ret);
752         }
753
754         /* we want to replace the old values */
755         for (i=0; i < msg->num_elements; i++) {
756                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
757         }
758
759         ret = ldb_build_mod_req(&ar->sub.change_req,
760                                 ar->module->ldb,
761                                 ar->sub.mem_ctx,
762                                 msg,
763                                 NULL,
764                                 ar,
765                                 replmd_replicated_apply_merge_callback);
766         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
767
768 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
769         return ldb_next_request(ar->module, ar->sub.change_req);
770 #else
771         ret = ldb_next_request(ar->module, ar->sub.change_req);
772         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
773
774         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
775         if (ar->sub.change_ret != LDB_SUCCESS) {
776                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
777         }
778
779 next_object:
780         talloc_free(ar->sub.mem_ctx);
781         ZERO_STRUCT(ar->sub);
782
783         ar->index_current++;
784
785         return LDB_SUCCESS;
786 #endif
787 }
788
789 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
790                                                    void *private_data,
791                                                    struct ldb_reply *ares)
792 {
793         struct replmd_replicated_request *ar = talloc_get_type(private_data,
794                                                struct replmd_replicated_request);
795         bool is_done = false;
796
797         switch (ares->type) {
798         case LDB_REPLY_ENTRY:
799                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
800                 break;
801         case LDB_REPLY_REFERRAL:
802                 /* we ignore referrals */
803                 break;
804         case LDB_REPLY_EXTENDED:
805         case LDB_REPLY_DONE:
806                 is_done = true;
807         }
808
809         talloc_free(ares);
810
811 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
812         if (is_done) {
813                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
814                 if (ar->sub.search_ret != LDB_SUCCESS) {
815                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
816                 }
817                 if (ar->sub.search_msg) {
818                         return replmd_replicated_apply_merge(ar);
819                 }
820                 return replmd_replicated_apply_add(ar);
821         }
822 #endif
823         return LDB_SUCCESS;
824 }
825
826 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
827 {
828         int ret;
829         char *tmp_str;
830         char *filter;
831
832         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
833         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
834
835         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
836         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
837         talloc_free(tmp_str);
838
839         ret = ldb_build_search_req(&ar->sub.search_req,
840                                    ar->module->ldb,
841                                    ar->sub.mem_ctx,
842                                    ar->objs->partition_dn,
843                                    LDB_SCOPE_SUBTREE,
844                                    filter,
845                                    NULL,
846                                    NULL,
847                                    ar,
848                                    replmd_replicated_apply_search_callback);
849         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
850
851 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
852         return ldb_next_request(ar->module, ar->sub.search_req);
853 #else
854         ret = ldb_next_request(ar->module, ar->sub.search_req);
855         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
856
857         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
858         if (ar->sub.search_ret != LDB_SUCCESS) {
859                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
860         }
861         if (ar->sub.search_msg) {
862                 return replmd_replicated_apply_merge(ar);
863         }
864
865         return replmd_replicated_apply_add(ar);
866 #endif
867 }
868
869 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
870 {
871 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
872         if (ar->index_current >= ar->objs->num_objects) {
873                 return replmd_replicated_uptodate_vector(ar);
874         }
875 #endif
876
877         ar->sub.mem_ctx = talloc_new(ar);
878         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
879
880         return replmd_replicated_apply_search(ar);
881 }
882
883 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
884                                                       void *private_data,
885                                                       struct ldb_reply *ares)
886 {
887 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
888         struct replmd_replicated_request *ar = talloc_get_type(private_data,
889                                                struct replmd_replicated_request);
890
891         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
892         if (ar->sub.change_ret != LDB_SUCCESS) {
893                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
894         }
895
896         talloc_free(ar->sub.mem_ctx);
897         ZERO_STRUCT(ar->sub);
898
899         return replmd_replicated_request_done(ar);
900 #else
901         return LDB_SUCCESS;
902 #endif
903 }
904
905 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
906                                                    const struct drsuapi_DsReplicaCursor2 *c2)
907 {
908         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
909 }
910
911 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
912 {
913         NTSTATUS nt_status;
914         struct ldb_message *msg;
915         struct replUpToDateVectorBlob ouv;
916         const struct ldb_val *ouv_value;
917         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
918         struct replUpToDateVectorBlob nuv;
919         struct ldb_val nuv_value;
920         struct ldb_message_element *nuv_el = NULL;
921         const struct GUID *our_invocation_id;
922         struct ldb_message_element *orf_el = NULL;
923         struct repsFromToBlob nrf;
924         struct ldb_val *nrf_value = NULL;
925         struct ldb_message_element *nrf_el = NULL;
926         uint32_t i,j,ni=0;
927         uint64_t seq_num;
928         bool found = false;
929         time_t t = time(NULL);
930         NTTIME now;
931         int ret;
932
933         ruv = ar->objs->uptodateness_vector;
934         ZERO_STRUCT(ouv);
935         ouv.version = 2;
936         ZERO_STRUCT(nuv);
937         nuv.version = 2;
938
939         unix_to_nt_time(&now, t);
940
941         /* 
942          * we use the next sequence number for our own highest_usn
943          * because we will do a modify request and this will increment
944          * our highest_usn
945          */
946         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
947         if (ret != LDB_SUCCESS) {
948                 return replmd_replicated_request_error(ar, ret);
949         }
950
951         /*
952          * first create the new replUpToDateVector
953          */
954         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
955         if (ouv_value) {
956                 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
957                                                  (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
958                 if (!NT_STATUS_IS_OK(nt_status)) {
959                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
960                 }
961
962                 if (ouv.version != 2) {
963                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
964                 }
965         }
966
967         /*
968          * the new uptodateness vector will at least
969          * contain 2 entries, one for the source_dsa and one the local server
970          *
971          * plus optional values from our old vector and the one from the source_dsa
972          */
973         nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
974         if (ruv) nuv.ctr.ctr2.count += ruv->count;
975         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
976                                             struct drsuapi_DsReplicaCursor2,
977                                             nuv.ctr.ctr2.count);
978         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
979
980         /* first copy the old vector */
981         for (i=0; i < ouv.ctr.ctr2.count; i++) {
982                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
983                 ni++;
984         }
985
986         /* merge in the source_dsa vector is available */
987         for (i=0; (ruv && i < ruv->count); i++) {
988                 found = false;
989
990                 for (j=0; j < ni; j++) {
991                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
992                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
993                                 continue;
994                         }
995
996                         found = true;
997
998                         /*
999                          * we update only the highest_usn and not the latest_sync_success time,
1000                          * because the last success stands for direct replication
1001                          */
1002                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1003                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1004                         }
1005                         break;                  
1006                 }
1007
1008                 if (found) continue;
1009
1010                 /* if it's not there yet, add it */
1011                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1012                 ni++;
1013         }
1014
1015         /*
1016          * merge in the current highwatermark for the source_dsa
1017          */
1018         found = false;
1019         for (j=0; j < ni; j++) {
1020                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1021                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1022                         continue;
1023                 }
1024
1025                 found = true;
1026
1027                 /*
1028                  * here we update the highest_usn and last_sync_success time
1029                  * because we're directly replicating from the source_dsa
1030                  *
1031                  * and use the tmp_highest_usn because this is what we have just applied
1032                  * to our ldb
1033                  */
1034                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1035                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1036                 break;
1037         }
1038         if (!found) {
1039                 /*
1040                  * here we update the highest_usn and last_sync_success time
1041                  * because we're directly replicating from the source_dsa
1042                  *
1043                  * and use the tmp_highest_usn because this is what we have just applied
1044                  * to our ldb
1045                  */
1046                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1047                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1048                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1049                 ni++;
1050         }
1051
1052         /*
1053          * merge our own current values if we have a invocation_id already
1054          * attached to the ldb
1055          */
1056         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1057         if (our_invocation_id) {
1058                 found = false;
1059                 for (j=0; j < ni; j++) {
1060                         if (!GUID_equal(our_invocation_id,
1061                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1062                                 continue;
1063                         }
1064
1065                         found = true;
1066
1067                         /*
1068                          * here we update the highest_usn and last_sync_success time
1069                          * because it's our own entry
1070                          */
1071                         nuv.ctr.ctr2.cursors[j].highest_usn             = seq_num;
1072                         nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1073                         break;
1074                 }
1075                 if (!found) {
1076                         /*
1077                          * here we update the highest_usn and last_sync_success time
1078                          * because it's our own entry
1079                          */
1080                         nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
1081                         nuv.ctr.ctr2.cursors[ni].highest_usn            = seq_num;
1082                         nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1083                         ni++;
1084                 }
1085         }
1086
1087         /*
1088          * finally correct the size of the cursors array
1089          */
1090         nuv.ctr.ctr2.count = ni;
1091
1092         /*
1093          * sort the cursors
1094          */
1095         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1096               sizeof(struct drsuapi_DsReplicaCursor2),
1097               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1098
1099         /*
1100          * create the change ldb_message
1101          */
1102         msg = ldb_msg_new(ar->sub.mem_ctx);
1103         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1104         msg->dn = ar->sub.search_msg->dn;
1105
1106         nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
1107                                          (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1108         if (!NT_STATUS_IS_OK(nt_status)) {
1109                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1110         }
1111         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1112         if (ret != LDB_SUCCESS) {
1113                 return replmd_replicated_request_error(ar, ret);
1114         }
1115         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1116
1117         /*
1118          * now create the new repsFrom value from the given repsFromTo1 structure
1119          */
1120         ZERO_STRUCT(nrf);
1121         nrf.version                                     = 1;
1122         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1123         /* and fix some values... */
1124         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1125         nrf.ctr.ctr1.last_success                       = now;
1126         nrf.ctr.ctr1.last_attempt                       = now;
1127         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1128         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1129
1130         /*
1131          * first see if we already have a repsFrom value for the current source dsa
1132          * if so we'll later replace this value
1133          */
1134         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1135         if (orf_el) {
1136                 for (i=0; i < orf_el->num_values; i++) {
1137                         struct repsFromToBlob *trf;
1138
1139                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1140                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1141
1142                         nt_status = ndr_pull_struct_blob(&orf_el->values[i], trf, trf,
1143                                                          (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1144                         if (!NT_STATUS_IS_OK(nt_status)) {
1145                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1146                         }
1147
1148                         if (trf->version != 1) {
1149                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1150                         }
1151
1152                         /*
1153                          * we compare the source dsa objectGUID not the invocation_id
1154                          * because we want only one repsFrom value per source dsa
1155                          * and when the invocation_id of the source dsa has changed we don't need 
1156                          * the old repsFrom with the old invocation_id
1157                          */
1158                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1159                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1160                                 talloc_free(trf);
1161                                 continue;
1162                         }
1163
1164                         talloc_free(trf);
1165                         nrf_value = &orf_el->values[i];
1166                         break;
1167                 }
1168
1169                 /*
1170                  * copy over all old values to the new ldb_message
1171                  */
1172                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1173                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1174                 *nrf_el = *orf_el;
1175         }
1176
1177         /*
1178          * if we haven't found an old repsFrom value for the current source dsa
1179          * we'll add a new value
1180          */
1181         if (!nrf_value) {
1182                 struct ldb_val zero_value;
1183                 ZERO_STRUCT(zero_value);
1184                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1185                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1186
1187                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1188         }
1189
1190         /* we now fill the value which is already attached to ldb_message */
1191         nt_status = ndr_push_struct_blob(nrf_value, msg, &nrf,
1192                                          (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1193         if (!NT_STATUS_IS_OK(nt_status)) {
1194                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1195         }
1196
1197         /* 
1198          * the ldb_message_element for the attribute, has all the old values and the new one
1199          * so we'll replace the whole attribute with all values
1200          */
1201         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1202
1203         /* prepare the ldb_modify() request */
1204         ret = ldb_build_mod_req(&ar->sub.change_req,
1205                                 ar->module->ldb,
1206                                 ar->sub.mem_ctx,
1207                                 msg,
1208                                 NULL,
1209                                 ar,
1210                                 replmd_replicated_uptodate_modify_callback);
1211         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1212
1213 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1214         return ldb_next_request(ar->module, ar->sub.change_req);
1215 #else
1216         ret = ldb_next_request(ar->module, ar->sub.change_req);
1217         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1218
1219         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1220         if (ar->sub.change_ret != LDB_SUCCESS) {
1221                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1222         }
1223
1224         talloc_free(ar->sub.mem_ctx);
1225         ZERO_STRUCT(ar->sub);
1226
1227         return replmd_replicated_request_done(ar);
1228 #endif
1229 }
1230
1231 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1232                                                       void *private_data,
1233                                                       struct ldb_reply *ares)
1234 {
1235         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1236                                                struct replmd_replicated_request);
1237         bool is_done = false;
1238
1239         switch (ares->type) {
1240         case LDB_REPLY_ENTRY:
1241                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1242                 break;
1243         case LDB_REPLY_REFERRAL:
1244                 /* we ignore referrals */
1245                 break;
1246         case LDB_REPLY_EXTENDED:
1247         case LDB_REPLY_DONE:
1248                 is_done = true;
1249         }
1250
1251         talloc_free(ares);
1252
1253 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1254         if (is_done) {
1255                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1256                 if (ar->sub.search_ret != LDB_SUCCESS) {
1257                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1258                 }
1259                 if (!ar->sub.search_msg) {
1260                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1261                 }
1262
1263                 return replmd_replicated_uptodate_modify(ar);
1264         }
1265 #endif
1266         return LDB_SUCCESS;
1267 }
1268
1269 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1270 {
1271         int ret;
1272         static const char *attrs[] = {
1273                 "replUpToDateVector",
1274                 "repsFrom",
1275                 NULL
1276         };
1277
1278         ret = ldb_build_search_req(&ar->sub.search_req,
1279                                    ar->module->ldb,
1280                                    ar->sub.mem_ctx,
1281                                    ar->objs->partition_dn,
1282                                    LDB_SCOPE_BASE,
1283                                    "(objectClass=*)",
1284                                    attrs,
1285                                    NULL,
1286                                    ar,
1287                                    replmd_replicated_uptodate_search_callback);
1288         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1289
1290 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1291         return ldb_next_request(ar->module, ar->sub.search_req);
1292 #else
1293         ret = ldb_next_request(ar->module, ar->sub.search_req);
1294         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1295
1296         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1297         if (ar->sub.search_ret != LDB_SUCCESS) {
1298                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1299         }
1300         if (!ar->sub.search_msg) {
1301                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1302         }
1303
1304         return replmd_replicated_uptodate_modify(ar);
1305 #endif
1306 }
1307
1308 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1309 {
1310         ar->sub.mem_ctx = talloc_new(ar);
1311         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1312
1313         return replmd_replicated_uptodate_search(ar);
1314 }
1315
1316 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1317 {
1318         struct dsdb_extended_replicated_objects *objs;
1319         struct replmd_replicated_request *ar;
1320
1321         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1322
1323         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1324         if (!objs) {
1325                 return LDB_ERR_PROTOCOL_ERROR;
1326         }
1327
1328         ar = replmd_replicated_init_handle(module, req, objs);
1329         if (!ar) {
1330                 return LDB_ERR_OPERATIONS_ERROR;
1331         }
1332
1333 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1334         return replmd_replicated_apply_next(ar);
1335 #else
1336         while (ar->index_current < ar->objs->num_objects &&
1337                req->handle->state != LDB_ASYNC_DONE) { 
1338                 replmd_replicated_apply_next(ar);
1339         }
1340
1341         if (req->handle->state != LDB_ASYNC_DONE) {
1342                 replmd_replicated_uptodate_vector(ar);
1343         }
1344
1345         return LDB_SUCCESS;
1346 #endif
1347 }
1348
1349 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1350 {
1351         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1352                 return replmd_extended_replicated_objects(module, req);
1353         }
1354
1355         return ldb_next_request(module, req);
1356 }
1357
1358 static int replmd_wait_none(struct ldb_handle *handle) {
1359         struct replmd_replicated_request *ar;
1360     
1361         if (!handle || !handle->private_data) {
1362                 return LDB_ERR_OPERATIONS_ERROR;
1363         }
1364
1365         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1366         if (!ar) {
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         /* we do only sync calls */
1371         if (handle->state != LDB_ASYNC_DONE) {
1372                 return LDB_ERR_OPERATIONS_ERROR;
1373         }
1374
1375         return handle->status;
1376 }
1377
1378 static int replmd_wait_all(struct ldb_handle *handle) {
1379
1380         int ret;
1381
1382         while (handle->state != LDB_ASYNC_DONE) {
1383                 ret = replmd_wait_none(handle);
1384                 if (ret != LDB_SUCCESS) {
1385                         return ret;
1386                 }
1387         }
1388
1389         return handle->status;
1390 }
1391
1392 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1393 {
1394         if (type == LDB_WAIT_ALL) {
1395                 return replmd_wait_all(handle);
1396         } else {
1397                 return replmd_wait_none(handle);
1398         }
1399 }
1400
1401 static const struct ldb_module_ops replmd_ops = {
1402         .name          = "repl_meta_data",
1403         .add           = replmd_add,
1404         .modify        = replmd_modify,
1405         .extended      = replmd_extended,
1406         .wait          = replmd_wait
1407 };
1408
1409 int repl_meta_data_module_init(void)
1410 {
1411         return ldb_register_module(&replmd_ops);
1412 }