4 Copyright (C) Simo Sorce 2004-2006
5 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6 Copyright (C) Andrew Tridgell 2005
7 Copyright (C) Stefan Metzmacher 2007
9 ** NOTE! The following LGPL license applies to the ldb
10 ** library. This does NOT imply that all of Samba is released
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 2 of the License, or (at your option) any later version.
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, write to the Free Software
25 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
31 * Component: ldb repl_meta_data module
33 * Description: - add a unique objectGUID onto every new record,
34 * - handle whenCreated, whenChanged timestamps
35 * - handle uSNCreated, uSNChanged numbers
36 * - handle replPropertyMetaData attribute
39 * Author: Stefan Metzmacher
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
51 struct replmd_replicated_request {
52 struct ldb_module *module;
53 struct ldb_handle *handle;
54 struct ldb_request *orig_req;
56 struct dsdb_extended_replicated_objects *objs;
58 uint32_t index_current;
62 struct ldb_request *search_req;
63 struct ldb_message *search_msg;
65 struct ldb_request *change_req;
70 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
71 struct ldb_request *req,
72 struct dsdb_extended_replicated_objects *objs)
74 struct replmd_replicated_request *ar;
77 h = talloc_zero(req, struct ldb_handle);
79 ldb_set_errstring(module->ldb, "Out of Memory");
84 h->state = LDB_ASYNC_PENDING;
85 h->status = LDB_SUCCESS;
87 ar = talloc_zero(h, struct replmd_replicated_request);
89 ldb_set_errstring(module->ldb, "Out of Memory");
106 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
110 for (i = 0; i < msg->num_elements; i++) {
111 if (ldb_attr_cmp(name, msg->elements[i].name) == 0) {
112 return &msg->elements[i];
120 add a time element to a record
122 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
124 struct ldb_message_element *el;
127 if (ldb_msg_find_element(msg, attr) != NULL) {
131 s = ldb_timestring(msg, t);
136 if (ldb_msg_add_string(msg, attr, s) != 0) {
140 el = ldb_msg_find_element(msg, attr);
141 /* always set as replace. This works because on add ops, the flag
143 el->flags = LDB_FLAG_MOD_REPLACE;
149 add a uint64_t element to a record
151 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
153 struct ldb_message_element *el;
155 if (ldb_msg_find_element(msg, attr) != NULL) {
159 if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
163 el = ldb_msg_find_element(msg, attr);
164 /* always set as replace. This works because on add ops, the flag
166 el->flags = LDB_FLAG_MOD_REPLACE;
171 static int replmd_add_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
173 struct ldb_control **saved_ctrls;
176 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_replicated\n");
178 if (!save_controls(ctrl, req, &saved_ctrls)) {
179 return LDB_ERR_OPERATIONS_ERROR;
182 ret = ldb_next_request(module, req);
183 req->controls = saved_ctrls;
188 static int replmd_add_originating(struct ldb_module *module, struct ldb_request *req)
190 struct ldb_request *down_req;
191 struct ldb_message_element *attribute;
192 struct ldb_message *msg;
198 time_t t = time(NULL);
200 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
202 if ((attribute = replmd_find_attribute(req->op.add.message, "objectGUID")) != NULL ) {
203 return ldb_next_request(module, req);
206 down_req = talloc(req, struct ldb_request);
207 if (down_req == NULL) {
208 return LDB_ERR_OPERATIONS_ERROR;
213 /* we have to copy the message as the caller might have it as a const */
214 down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
216 talloc_free(down_req);
217 return LDB_ERR_OPERATIONS_ERROR;
221 guid = GUID_random();
223 nt_status = ndr_push_struct_blob(&v, msg, &guid,
224 (ndr_push_flags_fn_t)ndr_push_GUID);
225 if (!NT_STATUS_IS_OK(nt_status)) {
226 talloc_free(down_req);
227 return LDB_ERR_OPERATIONS_ERROR;
230 ret = ldb_msg_add_value(msg, "objectGUID", &v, NULL);
232 talloc_free(down_req);
236 if (add_time_element(msg, "whenCreated", t) != 0 ||
237 add_time_element(msg, "whenChanged", t) != 0) {
238 talloc_free(down_req);
239 return LDB_ERR_OPERATIONS_ERROR;
242 /* Get a sequence number from the backend */
243 ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
244 if (ret == LDB_SUCCESS) {
245 if (add_uint64_element(msg, "uSNCreated", seq_num) != 0 ||
246 add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
247 talloc_free(down_req);
248 return LDB_ERR_OPERATIONS_ERROR;
252 ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
254 /* go on with the call chain */
255 ret = ldb_next_request(module, down_req);
257 /* do not free down_req as the call results may be linked to it,
258 * it will be freed when the upper level request get freed */
259 if (ret == LDB_SUCCESS) {
260 req->handle = down_req->handle;
266 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
268 struct ldb_control *ctrl;
270 /* do not manipulate our control entries */
271 if (ldb_dn_is_special(req->op.add.message->dn)) {
272 return ldb_next_request(module, req);
275 ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
277 /* handle replicated objects different */
278 return replmd_add_replicated(module, req, ctrl);
281 return replmd_add_originating(module, req);
284 static int replmd_modify_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
286 struct ldb_control **saved_ctrls;
289 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_replicated\n");
291 if (!save_controls(ctrl, req, &saved_ctrls)) {
292 return LDB_ERR_OPERATIONS_ERROR;
295 ret = ldb_next_request(module, req);
296 req->controls = saved_ctrls;
301 static int replmd_modify_originating(struct ldb_module *module, struct ldb_request *req)
303 struct ldb_request *down_req;
304 struct ldb_message *msg;
306 time_t t = time(NULL);
309 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
311 down_req = talloc(req, struct ldb_request);
312 if (down_req == NULL) {
313 return LDB_ERR_OPERATIONS_ERROR;
318 /* we have to copy the message as the caller might have it as a const */
319 down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
321 talloc_free(down_req);
322 return LDB_ERR_OPERATIONS_ERROR;
325 if (add_time_element(msg, "whenChanged", t) != 0) {
326 talloc_free(down_req);
327 return LDB_ERR_OPERATIONS_ERROR;
330 /* Get a sequence number from the backend */
331 ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
332 if (ret == LDB_SUCCESS) {
333 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
334 talloc_free(down_req);
335 return LDB_ERR_OPERATIONS_ERROR;
339 ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
341 /* go on with the call chain */
342 ret = ldb_next_request(module, down_req);
344 /* do not free down_req as the call results may be linked to it,
345 * it will be freed when the upper level request get freed */
346 if (ret == LDB_SUCCESS) {
347 req->handle = down_req->handle;
353 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
355 struct ldb_control *ctrl;
357 /* do not manipulate our control entries */
358 if (ldb_dn_is_special(req->op.mod.message->dn)) {
359 return ldb_next_request(module, req);
362 ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
364 /* handle replicated objects different */
365 return replmd_modify_replicated(module, req, ctrl);
368 return replmd_modify_originating(module, req);
371 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
373 struct ldb_reply *ares = NULL;
375 ar->handle->status = ret;
376 ar->handle->state = LDB_ASYNC_DONE;
378 if (!ar->orig_req->callback) {
382 /* we're done and need to report the success to the caller */
383 ares = talloc_zero(ar, struct ldb_reply);
385 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
386 ar->handle->state = LDB_ASYNC_DONE;
387 return LDB_ERR_OPERATIONS_ERROR;
390 ares->type = LDB_REPLY_EXTENDED;
391 ares->response = NULL;
393 return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
396 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
398 return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
401 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
403 return replmd_replicated_request_reply_helper(ar, ret);
406 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
408 int ret = LDB_ERR_OTHER;
409 /* TODO: do some error mapping */
410 return replmd_replicated_request_reply_helper(ar, ret);
413 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
415 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
417 struct ldb_reply *ares)
419 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
420 struct replmd_replicated_request *ar = talloc_get_type(private_data,
421 struct replmd_replicated_request);
423 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
424 if (ar->sub.change_ret != LDB_SUCCESS) {
425 return replmd_replicated_request_error(ar, ar->sub.change_ret);
428 talloc_free(ar->sub.mem_ctx);
429 ZERO_STRUCT(ar->sub);
433 return replmd_replicated_apply_next(ar);
439 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
442 struct ldb_message *msg;
443 struct replPropertyMetaDataBlob *md;
444 struct ldb_val md_value;
450 * TODO: check if the parent object exist
454 * TODO: handle the conflict case where an object with the
458 msg = ar->objs->objects[ar->index_current].msg;
459 md = ar->objs->objects[ar->index_current].meta_data;
461 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
462 if (ret != LDB_SUCCESS) {
463 return replmd_replicated_request_error(ar, ret);
466 ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
467 if (ret != LDB_SUCCESS) {
468 return replmd_replicated_request_error(ar, ret);
471 ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
472 if (ret != LDB_SUCCESS) {
473 return replmd_replicated_request_error(ar, ret);
476 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
477 if (ret != LDB_SUCCESS) {
478 return replmd_replicated_request_error(ar, ret);
481 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
482 if (ret != LDB_SUCCESS) {
483 return replmd_replicated_request_error(ar, ret);
486 md = ar->objs->objects[ar->index_current].meta_data;
487 for (i=0; i < md->ctr.ctr1.count; i++) {
488 md->ctr.ctr1.array[i].local_usn = seq_num;
490 nt_status = ndr_push_struct_blob(&md_value, msg, md,
491 (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
492 if (!NT_STATUS_IS_OK(nt_status)) {
493 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
495 ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
496 if (ret != LDB_SUCCESS) {
497 return replmd_replicated_request_error(ar, ret);
500 ret = ldb_build_add_req(&ar->sub.change_req,
506 replmd_replicated_apply_add_callback);
507 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
509 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
510 return ldb_next_request(ar->module, ar->sub.change_req);
512 ret = ldb_next_request(ar->module, ar->sub.change_req);
513 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
515 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
516 if (ar->sub.change_ret != LDB_SUCCESS) {
517 return replmd_replicated_request_error(ar, ar->sub.change_ret);
520 talloc_free(ar->sub.mem_ctx);
521 ZERO_STRUCT(ar->sub);
529 static int replmd_replPropertyMetaData1_attid_compare(struct replPropertyMetaData1 *m1,
530 struct replPropertyMetaData1 *m2)
532 return m1->attid - m2->attid;
535 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
536 struct replPropertyMetaData1 *m2)
540 if (m1->version != m2->version) {
541 return m1->version - m2->version;
544 if (m1->orginating_time != m2->orginating_time) {
545 return m1->orginating_time - m2->orginating_time;
548 ret = GUID_compare(&m1->orginating_invocation_id, &m2->orginating_invocation_id);
553 return m1->orginating_usn - m2->orginating_usn;
556 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
558 struct ldb_reply *ares)
560 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
561 struct replmd_replicated_request *ar = talloc_get_type(private_data,
562 struct replmd_replicated_request);
564 ret = ldb_next_request(ar->module, ar->sub.change_req);
565 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
567 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
568 if (ar->sub.change_ret != LDB_SUCCESS) {
569 return replmd_replicated_request_error(ar, ar->sub.change_ret);
572 talloc_free(ar->sub.mem_ctx);
573 ZERO_STRUCT(ar->sub);
583 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
586 struct ldb_message *msg;
587 struct replPropertyMetaDataBlob *rmd;
588 struct replPropertyMetaDataBlob omd;
589 const struct ldb_val *omd_value;
590 struct replPropertyMetaDataBlob nmd;
591 struct ldb_val nmd_value;
593 uint32_t removed_attrs = 0;
597 msg = ar->objs->objects[ar->index_current].msg;
598 rmd = ar->objs->objects[ar->index_current].meta_data;
603 * TODO: add rename conflict handling
605 if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
606 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
608 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
609 ldb_dn_get_linearized(ar->sub.search_msg->dn),
610 ldb_dn_get_linearized(msg->dn));
611 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
614 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
615 if (ret != LDB_SUCCESS) {
616 return replmd_replicated_request_error(ar, ret);
619 /* find existing meta data */
620 omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
622 nt_status = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, &omd,
623 (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
624 if (!NT_STATUS_IS_OK(nt_status)) {
625 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
628 if (omd.version != 1) {
629 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
635 nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
636 nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
637 struct replPropertyMetaData1,
639 if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
641 /* first copy the old meta data */
642 for (i=0; i < omd.ctr.ctr1.count; i++) {
643 nmd.ctr.ctr1.array[ni] = omd.ctr.ctr1.array[i];
647 /* now merge in the new meta data */
648 for (i=0; i < rmd->ctr.ctr1.count; i++) {
651 rmd->ctr.ctr1.array[i].local_usn = seq_num;
653 for (j=0; j < ni; j++) {
656 if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
660 cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
661 &nmd.ctr.ctr1.array[j]);
663 /* replace the entry */
664 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
669 /* we don't want to apply this change so remove the attribute */
670 ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
679 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
684 * finally correct the size of the meta_data array
686 nmd.ctr.ctr1.count = ni;
689 * the rdn attribute (the alias for the name attribute),
690 * 'cn' for most objects is the last entry in the meta data array
693 * as it should stay the last one in the new list, we move it to the end
696 struct replPropertyMetaData1 *rdn_p, rdn, *last_p;
697 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
698 uint32_t last_idx = ni - 1;
700 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
702 last_p = &nmd.ctr.ctr1.array[last_idx];
704 if (last_idx > rdn_idx) {
705 memmove(rdn_p, rdn_p+1, (last_idx - rdn_idx)*sizeof(rdn));
711 * sort the meta data entries by attid, but skip the last one containing
714 qsort(nmd.ctr.ctr1.array, nmd.ctr.ctr1.count - 1,
715 sizeof(struct replPropertyMetaData1),
716 (comparison_fn_t)replmd_replPropertyMetaData1_attid_compare);
718 /* create the meta data value */
719 nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
720 (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
721 if (!NT_STATUS_IS_OK(nt_status)) {
722 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
726 * check if some replicated attributes left, otherwise skip the ldb_modify() call
728 if (msg->num_elements == 0) {
729 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
734 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
735 ar->index_current, msg->num_elements);
738 * when we now that we'll modify the record, add the whenChanged, uSNChanged
739 * and replPopertyMetaData attributes
741 ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
742 if (ret != LDB_SUCCESS) {
743 return replmd_replicated_request_error(ar, ret);
745 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
746 if (ret != LDB_SUCCESS) {
747 return replmd_replicated_request_error(ar, ret);
749 ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
750 if (ret != LDB_SUCCESS) {
751 return replmd_replicated_request_error(ar, ret);
754 /* we want to replace the old values */
755 for (i=0; i < msg->num_elements; i++) {
756 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
759 ret = ldb_build_mod_req(&ar->sub.change_req,
765 replmd_replicated_apply_merge_callback);
766 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
768 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
769 return ldb_next_request(ar->module, ar->sub.change_req);
771 ret = ldb_next_request(ar->module, ar->sub.change_req);
772 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
774 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
775 if (ar->sub.change_ret != LDB_SUCCESS) {
776 return replmd_replicated_request_error(ar, ar->sub.change_ret);
780 talloc_free(ar->sub.mem_ctx);
781 ZERO_STRUCT(ar->sub);
789 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
791 struct ldb_reply *ares)
793 struct replmd_replicated_request *ar = talloc_get_type(private_data,
794 struct replmd_replicated_request);
795 bool is_done = false;
797 switch (ares->type) {
798 case LDB_REPLY_ENTRY:
799 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
801 case LDB_REPLY_REFERRAL:
802 /* we ignore referrals */
804 case LDB_REPLY_EXTENDED:
811 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
813 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
814 if (ar->sub.search_ret != LDB_SUCCESS) {
815 return replmd_replicated_request_error(ar, ar->sub.search_ret);
817 if (ar->sub.search_msg) {
818 return replmd_replicated_apply_merge(ar);
820 return replmd_replicated_apply_add(ar);
826 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
832 tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
833 if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
835 filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
836 if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
837 talloc_free(tmp_str);
839 ret = ldb_build_search_req(&ar->sub.search_req,
842 ar->objs->partition_dn,
848 replmd_replicated_apply_search_callback);
849 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
851 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
852 return ldb_next_request(ar->module, ar->sub.search_req);
854 ret = ldb_next_request(ar->module, ar->sub.search_req);
855 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
857 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
858 if (ar->sub.search_ret != LDB_SUCCESS) {
859 return replmd_replicated_request_error(ar, ar->sub.search_ret);
861 if (ar->sub.search_msg) {
862 return replmd_replicated_apply_merge(ar);
865 return replmd_replicated_apply_add(ar);
869 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
871 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
872 if (ar->index_current >= ar->objs->num_objects) {
873 return replmd_replicated_uptodate_vector(ar);
877 ar->sub.mem_ctx = talloc_new(ar);
878 if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
880 return replmd_replicated_apply_search(ar);
883 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
885 struct ldb_reply *ares)
887 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
888 struct replmd_replicated_request *ar = talloc_get_type(private_data,
889 struct replmd_replicated_request);
891 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
892 if (ar->sub.change_ret != LDB_SUCCESS) {
893 return replmd_replicated_request_error(ar, ar->sub.change_ret);
896 talloc_free(ar->sub.mem_ctx);
897 ZERO_STRUCT(ar->sub);
899 return replmd_replicated_request_done(ar);
905 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
906 const struct drsuapi_DsReplicaCursor2 *c2)
908 return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
911 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
914 struct ldb_message *msg;
915 struct replUpToDateVectorBlob ouv;
916 const struct ldb_val *ouv_value;
917 const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
918 struct replUpToDateVectorBlob nuv;
919 struct ldb_val nuv_value;
920 struct ldb_message_element *nuv_el = NULL;
921 const struct GUID *our_invocation_id;
922 struct ldb_message_element *orf_el = NULL;
923 struct repsFromToBlob nrf;
924 struct ldb_val *nrf_value = NULL;
925 struct ldb_message_element *nrf_el = NULL;
929 time_t t = time(NULL);
933 ruv = ar->objs->uptodateness_vector;
939 unix_to_nt_time(&now, t);
942 * we use the next sequence number for our own highest_usn
943 * because we will do a modify request and this will increment
946 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
947 if (ret != LDB_SUCCESS) {
948 return replmd_replicated_request_error(ar, ret);
952 * first create the new replUpToDateVector
954 ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
956 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
957 (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
958 if (!NT_STATUS_IS_OK(nt_status)) {
959 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
962 if (ouv.version != 2) {
963 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
968 * the new uptodateness vector will at least
969 * contain 2 entries, one for the source_dsa and one the local server
971 * plus optional values from our old vector and the one from the source_dsa
973 nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
974 if (ruv) nuv.ctr.ctr2.count += ruv->count;
975 nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
976 struct drsuapi_DsReplicaCursor2,
978 if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
980 /* first copy the old vector */
981 for (i=0; i < ouv.ctr.ctr2.count; i++) {
982 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
986 /* merge in the source_dsa vector is available */
987 for (i=0; (ruv && i < ruv->count); i++) {
990 for (j=0; j < ni; j++) {
991 if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
992 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
999 * we update only the highest_usn and not the latest_sync_success time,
1000 * because the last success stands for direct replication
1002 if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1003 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1008 if (found) continue;
1010 /* if it's not there yet, add it */
1011 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1016 * merge in the current highwatermark for the source_dsa
1019 for (j=0; j < ni; j++) {
1020 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1021 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1028 * here we update the highest_usn and last_sync_success time
1029 * because we're directly replicating from the source_dsa
1031 * and use the tmp_highest_usn because this is what we have just applied
1034 nuv.ctr.ctr2.cursors[j].highest_usn = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1035 nuv.ctr.ctr2.cursors[j].last_sync_success = now;
1040 * here we update the highest_usn and last_sync_success time
1041 * because we're directly replicating from the source_dsa
1043 * and use the tmp_highest_usn because this is what we have just applied
1046 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1047 nuv.ctr.ctr2.cursors[ni].highest_usn = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1048 nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
1053 * merge our own current values if we have a invocation_id already
1054 * attached to the ldb
1056 our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1057 if (our_invocation_id) {
1059 for (j=0; j < ni; j++) {
1060 if (!GUID_equal(our_invocation_id,
1061 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1068 * here we update the highest_usn and last_sync_success time
1069 * because it's our own entry
1071 nuv.ctr.ctr2.cursors[j].highest_usn = seq_num;
1072 nuv.ctr.ctr2.cursors[j].last_sync_success = now;
1077 * here we update the highest_usn and last_sync_success time
1078 * because it's our own entry
1080 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
1081 nuv.ctr.ctr2.cursors[ni].highest_usn = seq_num;
1082 nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
1088 * finally correct the size of the cursors array
1090 nuv.ctr.ctr2.count = ni;
1095 qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1096 sizeof(struct drsuapi_DsReplicaCursor2),
1097 (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1100 * create the change ldb_message
1102 msg = ldb_msg_new(ar->sub.mem_ctx);
1103 if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1104 msg->dn = ar->sub.search_msg->dn;
1106 nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
1107 (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1108 if (!NT_STATUS_IS_OK(nt_status)) {
1109 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1111 ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1112 if (ret != LDB_SUCCESS) {
1113 return replmd_replicated_request_error(ar, ret);
1115 nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1118 * now create the new repsFrom value from the given repsFromTo1 structure
1122 nrf.ctr.ctr1 = *ar->objs->source_dsa;
1123 /* and fix some values... */
1124 nrf.ctr.ctr1.consecutive_sync_failures = 0;
1125 nrf.ctr.ctr1.last_success = now;
1126 nrf.ctr.ctr1.last_attempt = now;
1127 nrf.ctr.ctr1.result_last_attempt = WERR_OK;
1128 nrf.ctr.ctr1.highwatermark.highest_usn = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1131 * first see if we already have a repsFrom value for the current source dsa
1132 * if so we'll later replace this value
1134 orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1136 for (i=0; i < orf_el->num_values; i++) {
1137 struct repsFromToBlob *trf;
1139 trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1140 if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1142 nt_status = ndr_pull_struct_blob(&orf_el->values[i], trf, trf,
1143 (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1144 if (!NT_STATUS_IS_OK(nt_status)) {
1145 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1148 if (trf->version != 1) {
1149 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1153 * we compare the source dsa objectGUID not the invocation_id
1154 * because we want only one repsFrom value per source dsa
1155 * and when the invocation_id of the source dsa has changed we don't need
1156 * the old repsFrom with the old invocation_id
1158 if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1159 &ar->objs->source_dsa->source_dsa_obj_guid)) {
1165 nrf_value = &orf_el->values[i];
1170 * copy over all old values to the new ldb_message
1172 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1173 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1178 * if we haven't found an old repsFrom value for the current source dsa
1179 * we'll add a new value
1182 struct ldb_val zero_value;
1183 ZERO_STRUCT(zero_value);
1184 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1185 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1187 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1190 /* we now fill the value which is already attached to ldb_message */
1191 nt_status = ndr_push_struct_blob(nrf_value, msg, &nrf,
1192 (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1193 if (!NT_STATUS_IS_OK(nt_status)) {
1194 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1198 * the ldb_message_element for the attribute, has all the old values and the new one
1199 * so we'll replace the whole attribute with all values
1201 nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1203 /* prepare the ldb_modify() request */
1204 ret = ldb_build_mod_req(&ar->sub.change_req,
1210 replmd_replicated_uptodate_modify_callback);
1211 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1213 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1214 return ldb_next_request(ar->module, ar->sub.change_req);
1216 ret = ldb_next_request(ar->module, ar->sub.change_req);
1217 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1219 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1220 if (ar->sub.change_ret != LDB_SUCCESS) {
1221 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1224 talloc_free(ar->sub.mem_ctx);
1225 ZERO_STRUCT(ar->sub);
1227 return replmd_replicated_request_done(ar);
1231 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1233 struct ldb_reply *ares)
1235 struct replmd_replicated_request *ar = talloc_get_type(private_data,
1236 struct replmd_replicated_request);
1237 bool is_done = false;
1239 switch (ares->type) {
1240 case LDB_REPLY_ENTRY:
1241 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1243 case LDB_REPLY_REFERRAL:
1244 /* we ignore referrals */
1246 case LDB_REPLY_EXTENDED:
1247 case LDB_REPLY_DONE:
1253 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1255 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1256 if (ar->sub.search_ret != LDB_SUCCESS) {
1257 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1259 if (!ar->sub.search_msg) {
1260 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1263 return replmd_replicated_uptodate_modify(ar);
1269 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1272 static const char *attrs[] = {
1273 "replUpToDateVector",
1278 ret = ldb_build_search_req(&ar->sub.search_req,
1281 ar->objs->partition_dn,
1287 replmd_replicated_uptodate_search_callback);
1288 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1290 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1291 return ldb_next_request(ar->module, ar->sub.search_req);
1293 ret = ldb_next_request(ar->module, ar->sub.search_req);
1294 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1296 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1297 if (ar->sub.search_ret != LDB_SUCCESS) {
1298 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1300 if (!ar->sub.search_msg) {
1301 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1304 return replmd_replicated_uptodate_modify(ar);
1308 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1310 ar->sub.mem_ctx = talloc_new(ar);
1311 if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1313 return replmd_replicated_uptodate_search(ar);
1316 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1318 struct dsdb_extended_replicated_objects *objs;
1319 struct replmd_replicated_request *ar;
1321 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1323 objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1325 return LDB_ERR_PROTOCOL_ERROR;
1328 ar = replmd_replicated_init_handle(module, req, objs);
1330 return LDB_ERR_OPERATIONS_ERROR;
1333 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1334 return replmd_replicated_apply_next(ar);
1336 while (ar->index_current < ar->objs->num_objects &&
1337 req->handle->state != LDB_ASYNC_DONE) {
1338 replmd_replicated_apply_next(ar);
1341 if (req->handle->state != LDB_ASYNC_DONE) {
1342 replmd_replicated_uptodate_vector(ar);
1349 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1351 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1352 return replmd_extended_replicated_objects(module, req);
1355 return ldb_next_request(module, req);
1358 static int replmd_wait_none(struct ldb_handle *handle) {
1359 struct replmd_replicated_request *ar;
1361 if (!handle || !handle->private_data) {
1362 return LDB_ERR_OPERATIONS_ERROR;
1365 ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1367 return LDB_ERR_OPERATIONS_ERROR;
1370 /* we do only sync calls */
1371 if (handle->state != LDB_ASYNC_DONE) {
1372 return LDB_ERR_OPERATIONS_ERROR;
1375 return handle->status;
1378 static int replmd_wait_all(struct ldb_handle *handle) {
1382 while (handle->state != LDB_ASYNC_DONE) {
1383 ret = replmd_wait_none(handle);
1384 if (ret != LDB_SUCCESS) {
1389 return handle->status;
1392 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1394 if (type == LDB_WAIT_ALL) {
1395 return replmd_wait_all(handle);
1397 return replmd_wait_none(handle);
1401 static const struct ldb_module_ops replmd_ops = {
1402 .name = "repl_meta_data",
1404 .modify = replmd_modify,
1405 .extended = replmd_extended,
1409 int repl_meta_data_module_init(void)
1411 return ldb_register_module(&replmd_ops);