dsdb: Make less talloc() for parsed_dn.guid
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 1511b447eeff1715fc685fdedf2d2c5990abbe00..347cf0f8d49986c539c1a42bdb32df063fa48512 100644 (file)
@@ -2,10 +2,10 @@
    ldb database library
 
    Copyright (C) Simo Sorce  2004-2008
-   Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
-   Copyright (C) Andrew Tridgell 2005
+   Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005-2013
+   Copyright (C) Andrew Tridgell 2005-2009
    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
-   Copyright (C) Matthieu Patou <mat@samba.org> 2010
+   Copyright (C) Matthieu Patou <mat@samba.org> 2010-2011
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 #include "lib/util/binsearch.h"
 #include "lib/util/tsort.h"
 
+/*
+ * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
+ * Deleted Objects Container
+ */
+static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000ULL;
+
 struct replmd_private {
        TALLOC_CTX *la_ctx;
        struct la_entry *la_list;
@@ -61,6 +67,7 @@ struct replmd_private {
                uint64_t mod_usn;
                uint64_t mod_usn_urgent;
        } *ncs;
+       struct ldb_dn *schema_dn;
 };
 
 struct la_entry {
@@ -73,6 +80,7 @@ struct replmd_replicated_request {
        struct ldb_request *req;
 
        const struct dsdb_schema *schema;
+       struct GUID our_invocation_id;
 
        /* the controls we pass down */
        struct ldb_control **controls;
@@ -83,17 +91,87 @@ struct replmd_replicated_request {
        struct dsdb_extended_replicated_objects *objs;
 
        struct ldb_message *search_msg;
+       struct GUID local_parent_guid;
 
        uint64_t seq_num;
        bool is_urgent;
+
+       bool isDeleted;
 };
 
+static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
+static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete);
+
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
        REPL_URGENT_ON_UPDATE = 2,
        REPL_URGENT_ON_DELETE = 4
 };
 
+enum deletion_state {
+       OBJECT_NOT_DELETED=1,
+       OBJECT_DELETED=2,
+       OBJECT_RECYCLED=3,
+       OBJECT_TOMBSTONE=4,
+       OBJECT_REMOVED=5
+};
+
+static void replmd_deletion_state(struct ldb_module *module,
+                                 const struct ldb_message *msg,
+                                 enum deletion_state *current_state,
+                                 enum deletion_state *next_state)
+{
+       int ret;
+       bool enabled = false;
+
+       if (msg == NULL) {
+               *current_state = OBJECT_REMOVED;
+               if (next_state != NULL) {
+                       *next_state = OBJECT_REMOVED;
+               }
+               return;
+       }
+
+       ret = dsdb_recyclebin_enabled(module, &enabled);
+       if (ret != LDB_SUCCESS) {
+               enabled = false;
+       }
+
+       if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
+               if (!enabled) {
+                       *current_state = OBJECT_TOMBSTONE;
+                       if (next_state != NULL) {
+                               *next_state = OBJECT_REMOVED;
+                       }
+                       return;
+               }
+
+               if (ldb_msg_check_string_attribute(msg, "isRecycled", "TRUE")) {
+                       *current_state = OBJECT_RECYCLED;
+                       if (next_state != NULL) {
+                               *next_state = OBJECT_REMOVED;
+                       }
+                       return;
+               }
+
+               *current_state = OBJECT_DELETED;
+               if (next_state != NULL) {
+                       *next_state = OBJECT_RECYCLED;
+               }
+               return;
+       }
+
+       *current_state = OBJECT_NOT_DELETED;
+       if (next_state == NULL) {
+               return;
+       }
+
+       if (enabled) {
+               *next_state = OBJECT_DELETED;
+       } else {
+               *next_state = OBJECT_TOMBSTONE;
+       }
+}
 
 static const struct {
        const char *update_name;
@@ -146,7 +224,7 @@ static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
 }
 
 
-static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
+static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
 
 /*
   initialise the module
@@ -165,6 +243,8 @@ static int replmd_init(struct ldb_module *module)
        }
        ldb_module_set_private(module, replmd_private);
 
+       replmd_private->schema_dn = ldb_get_schema_basedn(ldb);
+
        return ldb_next_init(module);
 }
 
@@ -258,7 +338,7 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
                /* we allow LDB_ERR_NO_SUCH_ATTRIBUTE as success to
                   cope with possible corruption where the backlink has
                   already been removed */
-               DEBUG(0,("WARNING: backlink from %s already removed from %s - %s\n",
+               DEBUG(3,("WARNING: backlink from %s already removed from %s - %s\n",
                         ldb_dn_get_linearized(target_dn),
                         ldb_dn_get_linearized(source_dn),
                         ldb_errstring(ldb)));
@@ -395,7 +475,7 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(0,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
                return ldb_module_done(ac->req, controls,
                                        ares->response, ares->error);
        }
@@ -448,10 +528,7 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ac->apply_mode) {
-               talloc_free(ares);
-               ac->index_current++;
-
-               ret = replmd_replicated_apply_next(ac);
+               ret = replmd_replicated_apply_isDeleted(ac);
                if (ret != LDB_SUCCESS) {
                        return ldb_module_done(ac->req, NULL, NULL, ret);
                }
@@ -504,6 +581,7 @@ static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *modu
 {
        struct ldb_context *ldb;
        struct replmd_replicated_request *ac;
+       const struct GUID *our_invocation_id;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -521,8 +599,19 @@ static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *modu
                ldb_debug_set(ldb, LDB_DEBUG_FATAL,
                              "replmd_modify: no dsdb_schema loaded");
                DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
+               talloc_free(ac);
+               return NULL;
+       }
+
+       /* get our invocationId */
+       our_invocation_id = samdb_ntds_invocation_id(ldb);
+       if (!our_invocation_id) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "replmd_add: unable to find invocationId\n");
+               talloc_free(ac);
                return NULL;
        }
+       ac->our_invocation_id = *our_invocation_id;
 
        return ac;
 }
@@ -588,58 +677,59 @@ static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMeta
                                                   const struct replPropertyMetaData1 *m2,
                                                   const uint32_t *rdn_attid)
 {
-       if (m1->attid == m2->attid) {
-               return 0;
-       }
-
        /*
-        * the rdn attribute should be at the end!
-        * so we need to return a value greater than zero
-        * which means m1 is greater than m2
+        * This assignment seems inoccous, but it is critical for the
+        * system, as we need to do the comparisons as a unsigned
+        * quantity, not signed (enums are signed integers)
         */
-       if (m1->attid == *rdn_attid) {
-               return 1;
+       uint32_t attid_1 = m1->attid;
+       uint32_t attid_2 = m2->attid;
+
+       if (attid_1 == attid_2) {
+               return 0;
        }
 
        /*
-        * the rdn attribute should be at the end!
-        * so we need to return a value less than zero
-        * which means m2 is greater than m1
+        * See above regarding this being an unsigned comparison.
+        * Otherwise when the high bit is set on non-standard
+        * attributes, they would end up first, before objectClass
+        * (0).
         */
-       if (m2->attid == *rdn_attid) {
-               return -1;
-       }
-
-       return m1->attid > m2->attid ? 1 : -1;
+       return attid_1 > attid_2 ? 1 : -1;
 }
 
-static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
-                                               const struct dsdb_schema *schema,
-                                               struct ldb_dn *dn)
+static int replmd_replPropertyMetaDataCtr1_verify(struct ldb_context *ldb,
+                                                 struct replPropertyMetaDataCtr1 *ctr1,
+                                                 struct ldb_dn *dn)
 {
-       const char *rdn_name;
-       const struct dsdb_attribute *rdn_sa;
-
-       rdn_name = ldb_dn_get_rdn_name(dn);
-       if (!rdn_name) {
-               DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ctr1->count == 0) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "No elements found in replPropertyMetaData for %s!\n",
+                             ldb_dn_get_linearized(dn));
+               return LDB_ERR_CONSTRAINT_VIOLATION;
        }
 
-       rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
-       if (rdn_sa == NULL) {
-               DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
-               return LDB_ERR_OPERATIONS_ERROR;
+       /* the objectClass attribute is value 0x00000000, so must be first */
+       if (ctr1->array[0].attid != DRSUAPI_ATTID_objectClass) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "No objectClass found in replPropertyMetaData for %s!\n",
+                             ldb_dn_get_linearized(dn));
+               return LDB_ERR_OBJECT_CLASS_VIOLATION;
        }
 
-       DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n",
-                rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
-
-       LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, &rdn_sa->attributeID_id, replmd_replPropertyMetaData1_attid_sort);
-
        return LDB_SUCCESS;
 }
 
+static int replmd_replPropertyMetaDataCtr1_sort_and_verify(struct ldb_context *ldb,
+                                                          struct replPropertyMetaDataCtr1 *ctr1,
+                                                          struct ldb_dn *dn)
+{
+       /* Note this is O(n^2) for the almost-sorted case, which this is */
+       LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, NULL,
+                          replmd_replPropertyMetaData1_attid_sort);
+       return replmd_replPropertyMetaDataCtr1_verify(ldb, ctr1, dn);
+}
+
 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
                                                 const struct ldb_message_element *e2,
                                                 const struct dsdb_schema *schema)
@@ -745,6 +835,7 @@ static int replmd_add_fix_la(struct ldb_module *module, struct ldb_message_eleme
  */
 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 {
+       struct samldb_msds_intid_persistant *msds_intid_struct;
        struct ldb_context *ldb;
         struct ldb_control *control;
        struct replmd_replicated_request *ac;
@@ -755,7 +846,12 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        struct GUID guid;
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
-       const struct GUID *our_invocation_id;
+
+       /*
+        * The use of a time_t here seems odd, but as the NTTIME
+        * elements are actually declared as NTTIME_1sec in the IDL,
+        * getting a higher resolution timestamp is not required.
+        */
        time_t t = time(NULL);
        NTTIME now;
        char *time_str;
@@ -766,7 +862,10 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        bool allow_add_guid = false;
        bool remove_current_guid = false;
        bool is_urgent = false;
+       bool is_schema_nc = false;
        struct ldb_message_element *objectclass_el;
+       struct replmd_private *replmd_private =
+               talloc_get_type_abort(ldb_module_get_private(module), struct replmd_private);
 
         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
@@ -820,15 +919,6 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                return ret;
        }
 
-       /* get our invocationId */
-       our_invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!our_invocation_id) {
-               ldb_debug_set(ldb, LDB_DEBUG_ERROR,
-                             "replmd_add: unable to find invocationId\n");
-               talloc_free(ac);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.add.message);
        if (msg == NULL) {
@@ -881,6 +971,8 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
+
        for (i=0; i < msg->num_elements; i++) {
                struct ldb_message_element *e = &msg->elements[i];
                struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
@@ -905,7 +997,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                }
 
                if (sa->linkID != 0 && functional_level > DS_DOMAIN_FUNCTION_2000) {
-                       ret = replmd_add_fix_la(module, e, ac->seq_num, our_invocation_id, t, &guid, sa, req);
+                       ret = replmd_add_fix_la(module, e, ac->seq_num, &ac->our_invocation_id, t, &guid, sa, req);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(ac);
                                return ret;
@@ -915,10 +1007,32 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                        continue;
                }
 
-               m->attid                        = sa->attributeID_id;
-               m->version                      = 1;
-               m->originating_change_time      = now;
-               m->originating_invocation_id    = *our_invocation_id;
+               m->attid   = dsdb_attribute_get_attid(sa, is_schema_nc);
+               m->version = 1;
+               if (m->attid == DRSUAPI_ATTID_isDeleted) {
+                       const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
+                       const char* rdn;
+
+                       if (rdn_val == NULL) {
+                               ldb_oom(ldb);
+                               talloc_free(ac);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+
+                       rdn = (const char*)rdn_val->data;
+                       if (strcmp(rdn, "Deleted Objects") == 0) {
+                               /*
+                                * Set the originating_change_time to 29/12/9999 at 23:59:59
+                                * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
+                                */
+                               m->originating_change_time      = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
+                       } else {
+                               m->originating_change_time      = now;
+                       }
+               } else {
+                       m->originating_change_time      = now;
+               }
+               m->originating_invocation_id    = ac->our_invocation_id;
                m->originating_usn              = ac->seq_num;
                m->local_usn                    = ac->seq_num;
                ni++;
@@ -928,10 +1042,11 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        nmd.ctr.ctr1.count = ni;
 
        /*
-        * sort meta data array, and move the rdn attribute entry to the end
+        * sort meta data array
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
        if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "%s: error during direct ADD: %s", __func__, ldb_errstring(ldb));
                talloc_free(ac);
                return ret;
        }
@@ -985,7 +1100,17 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
         */
        replmd_ldb_message_sort(msg, ac->schema);
 
+       /*
+        * Assert that we do have an objectClass
+        */
        objectclass_el = ldb_msg_find_element(msg, "objectClass");
+       if (objectclass_el == NULL) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": objectClass missing on %s\n",
+                                      ldb_dn_get_linearized(msg->dn));
+               talloc_free(ac);
+               return LDB_ERR_OBJECT_CLASS_VIOLATION;
+       }
        is_urgent = replmd_check_urgent_objectclass(objectclass_el,
                                                        REPL_URGENT_ON_CREATE);
 
@@ -1025,7 +1150,14 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        if (control) {
                control->critical = 0;
        }
+       if (ldb_dn_compare_base(replmd_private->schema_dn, req->op.add.message->dn) != 0) {
 
+               /* Update the usn in the SAMLDB_MSDS_INTID_OPAQUE opaque */
+               msds_intid_struct = (struct samldb_msds_intid_persistant *) ldb_get_opaque(ldb, SAMLDB_MSDS_INTID_OPAQUE);
+               if (msds_intid_struct) {
+                       msds_intid_struct->usn = ac->seq_num;
+               }
+       }
        /* go on with the call chain */
        return ldb_next_request(module, down_req);
 }
@@ -1043,11 +1175,14 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      uint64_t *seq_num,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now,
+                                     bool is_schema_nc,
                                      struct ldb_request *req)
 {
        uint32_t i;
        const struct dsdb_attribute *a;
        struct replPropertyMetaData1 *md1;
+       bool may_skip = false;
+       uint32_t attid;
 
        a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
        if (a == NULL) {
@@ -1062,13 +1197,42 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       attid = dsdb_attribute_get_attid(a, is_schema_nc);
+
        if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
                return LDB_SUCCESS;
        }
 
-       /* if the attribute's value haven't changed then return LDB_SUCCESS     */
-       if (old_el != NULL && ldb_msg_element_compare(el, old_el) == 0) {
-               if (!ldb_request_get_control(req, LDB_CONTROL_PROVISION_OID)) {
+       /*
+        * if the attribute's value haven't changed, and this isn't
+        * just a delete of everything then return LDB_SUCCESS Unless
+        * we have the provision control or if the attribute is
+        * interSiteTopologyGenerator as this page explain:
+        * http://support.microsoft.com/kb/224815 this attribute is
+        * periodicaly written by the DC responsible for the intersite
+        * generation in a given site
+        *
+        * Unchanged could be deleting or replacing an already-gone
+        * thing with an unconstrained delete/empty replace or a
+        * replace with the same value, but not an add with the same
+        * value because that could be about adding a duplicate (which
+        * is for someone else to error out on).
+        */
+       if (old_el != NULL && ldb_msg_element_equal_ordered(el, old_el)) {
+               if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
+                       may_skip = true;
+               }
+       } else if (old_el == NULL && el->num_values == 0) {
+               if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
+                       may_skip = true;
+               } else if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE) {
+                       may_skip = true;
+               }
+       }
+
+       if (may_skip) {
+               if (strcmp(el->name, "interSiteTopologyGenerator") != 0 &&
+                   !ldb_request_get_control(req, LDB_CONTROL_PROVISION_OID)) {
                        /*
                         * allow this to make it possible for dbcheck
                         * to rebuild broken metadata
@@ -1078,7 +1242,22 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        }
 
        for (i=0; i<omd->ctr.ctr1.count; i++) {
-               if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
+               /*
+                * First check if we find it under the msDS-IntID,
+                * then check if we find it under the OID and
+                * prefixMap ID.
+                *
+                * This allows the administrator to simply re-write
+                * the attributes and so restore replication, which is
+                * likely what they will try to do.
+                */
+               if (attid == omd->ctr.ctr1.array[i].attid) {
+                       break;
+               }
+
+               if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) {
+                       break;
+               }
        }
 
        if (a->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
@@ -1117,8 +1296,29 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
 
        md1 = &omd->ctr.ctr1.array[i];
        md1->version++;
-       md1->attid                     = a->attributeID_id;
-       md1->originating_change_time   = now;
+       md1->attid = attid;
+       if (md1->attid == DRSUAPI_ATTID_isDeleted) {
+               const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
+               const char* rdn;
+
+               if (rdn_val == NULL) {
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               rdn = (const char*)rdn_val->data;
+               if (strcmp(rdn, "Deleted Objects") == 0) {
+                       /*
+                        * Set the originating_change_time to 29/12/9999 at 23:59:59
+                        * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
+                        */
+                       md1->originating_change_time    = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
+               } else {
+                       md1->originating_change_time    = now;
+               }
+       } else {
+               md1->originating_change_time    = now;
+       }
        md1->originating_invocation_id = *our_invocation_id;
        md1->originating_usn           = *seq_num;
        md1->local_usn                 = *seq_num;
@@ -1126,6 +1326,52 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        return LDB_SUCCESS;
 }
 
+/*
+ * Bump the replPropertyMetaData version on an attribute, and if it
+ * has changed (or forced by leaving rdn_old NULL), update the value
+ * in the entry.
+ *
+ * This is important, as calling a modify operation may not change the
+ * version number if the values appear unchanged, but a rename between
+ * parents bumps this value.
+ *
+ */
+static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
+                                      struct ldb_message *msg,
+                                      const struct ldb_val *rdn_new,
+                                      const struct ldb_val *rdn_old,
+                                      struct replPropertyMetaDataBlob *omd,
+                                      struct replmd_replicated_request *ar,
+                                      NTTIME now,
+                                      bool is_schema_nc)
+{
+       struct ldb_message_element new_el = {
+               .flags = LDB_FLAG_MOD_REPLACE,
+               .name = ldb_dn_get_rdn_name(msg->dn),
+               .num_values = 1,
+               .values = discard_const_p(struct ldb_val, rdn_new)
+       };
+       struct ldb_message_element old_el = {
+               .flags = LDB_FLAG_MOD_REPLACE,
+               .name = ldb_dn_get_rdn_name(msg->dn),
+               .num_values = rdn_old ? 1 : 0,
+               .values = discard_const_p(struct ldb_val, rdn_old)
+       };
+
+       if (ldb_msg_element_equal_ordered(&new_el, &old_el) == false) {
+               int ret = ldb_msg_add(msg, &new_el, LDB_FLAG_MOD_REPLACE);
+               if (ret != LDB_SUCCESS) {
+                       return ldb_oom(ldb);
+               }
+       }
+
+       return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
+                                         omd, ar->schema, &ar->seq_num,
+                                         &ar->our_invocation_id,
+                                         now, is_schema_nc, ar->req);
+
+}
+
 static uint64_t find_max_local_usn(struct replPropertyMetaDataBlob omd)
 {
        uint32_t count = omd.ctr.ctr1.count;
@@ -1150,8 +1396,8 @@ static int replmd_update_rpmd(struct ldb_module *module,
                              struct ldb_request *req,
                              const char * const *rename_attrs,
                              struct ldb_message *msg, uint64_t *seq_num,
-                             time_t t,
-                             bool *is_urgent)
+                             time_t t, bool is_schema_nc,
+                             bool *is_urgent, bool *rodc)
 {
        const struct ldb_val *omd_value;
        enum ndr_err_code ndr_err;
@@ -1161,18 +1407,26 @@ static int replmd_update_rpmd(struct ldb_module *module,
        const struct GUID *our_invocation_id;
        int ret;
        const char * const *attrs = NULL;
-       const char * const attrs1[] = { "replPropertyMetaData", "*", NULL };
-       const char * const attrs2[] = { "uSNChanged", "objectClass", NULL };
+       const char * const attrs2[] = { "uSNChanged", "objectClass", "instanceType", NULL };
        struct ldb_result *res;
        struct ldb_context *ldb;
        struct ldb_message_element *objectclass_el;
        enum urgent_situation situation;
-       bool rodc, rmd_is_provided;
-
+       bool rmd_is_provided;
+       bool rmd_is_just_resorted = false;
+       const char *not_rename_attrs[4 + msg->num_elements];
+       
        if (rename_attrs) {
                attrs = rename_attrs;
        } else {
-               attrs = attrs1;
+               for (i = 0; i < msg->num_elements; i++) {
+                       not_rename_attrs[i] = msg->elements[i].name;
+               }
+               not_rename_attrs[i] = "replPropertyMetaData";
+               not_rename_attrs[i+1] = "objectClass";
+               not_rename_attrs[i+2] = "instanceType";
+               not_rename_attrs[i+3] = NULL;
+               attrs = not_rename_attrs;
        }
 
        ldb = ldb_module_get_ctx(module);
@@ -1189,6 +1443,9 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
        if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_OID)) {
                rmd_is_provided = true;
+               if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_RESORT_OID)) {
+                       rmd_is_just_resorted = true;
+               }
        } else {
                rmd_is_provided = false;
        }
@@ -1207,7 +1464,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
                /* In this case the change_replmetadata control was supplied */
                /* We check that it's the only attribute that is provided
                 * (it's a rare case so it's better to keep the code simplier)
-                * We also check that the highest local_usn is bigger than
+                * We also check that the highest local_usn is bigger or the same as
                 * uSNChanged. */
                uint64_t db_seq;
                if( msg->num_elements != 1 ||
@@ -1234,7 +1491,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                 ldb_dn_get_linearized(msg->dn)));
                        return LDB_ERR_OPERATIONS_ERROR;
                }
-               *seq_num = find_max_local_usn(omd);
 
                ret = dsdb_module_search_dn(module, msg, &res, msg->dn, attrs2,
                                            DSDB_FLAG_NEXT_MODULE |
@@ -1243,24 +1499,26 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                            DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
                                            DSDB_SEARCH_REVEAL_INTERNALS, req);
 
-               if (ret != LDB_SUCCESS || res->count != 1) {
-                       DEBUG(0,(__location__ ": Object %s failed to find uSNChanged\n",
-                                ldb_dn_get_linearized(msg->dn)));
-                       return LDB_ERR_OPERATIONS_ERROR;
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
 
-               objectclass_el = ldb_msg_find_element(res->msgs[0], "objectClass");
-               if (is_urgent && replmd_check_urgent_objectclass(objectclass_el,
-                                                               situation)) {
-                       *is_urgent = true;
-               }
+               if (rmd_is_just_resorted == false) {
+                       *seq_num = find_max_local_usn(omd);
 
-               db_seq = ldb_msg_find_attr_as_uint64(res->msgs[0], "uSNChanged", 0);
-               if (*seq_num <= db_seq) {
-                       DEBUG(0,(__location__ ": changereplmetada control provided but max(local_usn)"\
-                                             " is less or equal to uSNChanged (max = %lld uSNChanged = %lld)\n",
-                                (long long)*seq_num, (long long)db_seq));
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       db_seq = ldb_msg_find_attr_as_uint64(res->msgs[0], "uSNChanged", 0);
+
+                       /*
+                        * The test here now allows for a new
+                        * replPropertyMetaData with no change, if was
+                        * just dbcheck re-sorting the values.
+                        */
+                       if (*seq_num <= db_seq) {
+                               DEBUG(0,(__location__ ": changereplmetada control provided but max(local_usn)" \
+                                        " is less than uSNChanged (max = %lld uSNChanged = %lld)\n",
+                                        (long long)*seq_num, (long long)db_seq));
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
                }
 
        } else {
@@ -1275,16 +1533,8 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                            DSDB_SEARCH_SHOW_EXTENDED_DN |
                                            DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
                                            DSDB_SEARCH_REVEAL_INTERNALS, req);
-               if (ret != LDB_SUCCESS || res->count != 1) {
-                       DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
-                                ldb_dn_get_linearized(msg->dn)));
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               objectclass_el = ldb_msg_find_element(res->msgs[0], "objectClass");
-               if (is_urgent && replmd_check_urgent_objectclass(objectclass_el,
-                                                               situation)) {
-                       *is_urgent = true;
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
 
                omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
@@ -1312,34 +1562,66 @@ static int replmd_update_rpmd(struct ldb_module *module,
                        struct ldb_message_element *old_el;
                        old_el = ldb_msg_find_element(res->msgs[0], msg->elements[i].name);
                        ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], old_el, &omd, schema, seq_num,
-                                                        our_invocation_id, now, req);
+                                                        our_invocation_id,
+                                                        now, is_schema_nc,
+                                                        req);
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
 
-                       if (is_urgent && !*is_urgent && (situation == REPL_URGENT_ON_UPDATE)) {
+                       if (!*is_urgent && (situation == REPL_URGENT_ON_UPDATE)) {
                                *is_urgent = replmd_check_urgent_attribute(&msg->elements[i]);
                        }
 
                }
        }
+
+       /*
+        * Assert that we have an objectClass attribute - this is major
+        * corruption if we don't have this!
+        */
+       objectclass_el = ldb_msg_find_element(res->msgs[0], "objectClass");
+       if (objectclass_el != NULL) {
+               /*
+                * Now check if this objectClass means we need to do urgent replication
+                */
+               if (!*is_urgent && replmd_check_urgent_objectclass(objectclass_el,
+                                                                  situation)) {
+                       *is_urgent = true;
+               }
+       } else if (!ldb_request_get_control(req, DSDB_CONTROL_DBCHECK)) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": objectClass missing on %s\n",
+                                      ldb_dn_get_linearized(msg->dn));
+               return LDB_ERR_OBJECT_CLASS_VIOLATION;
+       }
+
        /*
         * replmd_update_rpmd_element has done an update if the
         * seq_num is set
         */
-       if (*seq_num != 0) {
+       if (*seq_num != 0 || rmd_is_just_resorted == true) {
                struct ldb_val *md_value;
                struct ldb_message_element *el;
 
                /*if we are RODC and this is a DRSR update then its ok*/
-               if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)) {
-                       ret = samdb_rodc(ldb, &rodc);
+               if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
+                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)) {
+                       unsigned instanceType;
+
+                       ret = samdb_rodc(ldb, rodc);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
-                       } else if (rodc) {
-                               ldb_asprintf_errstring(ldb, "RODC modify is forbidden\n");
+                       } else if (*rodc) {
+                               ldb_set_errstring(ldb, "RODC modify is forbidden!");
                                return LDB_ERR_REFERRAL;
                        }
+
+                       instanceType = ldb_msg_find_attr_as_uint(res->msgs[0], "instanceType", INSTANCE_TYPE_WRITE);
+                       if (!(instanceType & INSTANCE_TYPE_WRITE)) {
+                               return ldb_error(ldb, LDB_ERR_UNWILLING_TO_PERFORM,
+                                                "cannot change replicated attribute on partial replica");
+                       }
                }
 
                md_value = talloc(msg, struct ldb_val);
@@ -1348,8 +1630,9 @@ static int replmd_update_rpmd(struct ldb_module *module,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
+               ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &omd.ctr.ctr1, msg->dn);
                if (ret != LDB_SUCCESS) {
+                       ldb_asprintf_errstring(ldb, "%s: %s", __func__, ldb_errstring(ldb));
                        return ret;
                }
 
@@ -1377,13 +1660,18 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
 struct parsed_dn {
        struct dsdb_dn *dsdb_dn;
-       struct GUID *guid;
+       struct GUID guid;
        struct ldb_val *v;
 };
 
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
 {
-       return GUID_compare(pdn1->guid, pdn2->guid);
+       return GUID_compare(&pdn1->guid, &pdn2->guid);
+}
+
+static int GUID_compare_struct(struct GUID *g1, struct GUID g2)
+{
+       return GUID_compare(g1, &g2);
 }
 
 static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn,
@@ -1403,7 +1691,7 @@ static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn,
                }
                return NULL;
        }
-       BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare, ret);
+       BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare_struct, ret);
        return ret;
 }
 
@@ -1444,22 +1732,21 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
 
                dn = p->dsdb_dn->dn;
 
-               p->guid = talloc(*pdn, struct GUID);
-               if (p->guid == NULL) {
-                       ldb_module_oom(module);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               status = dsdb_get_extended_dn_guid(dn, p->guid, "GUID");
+               status = dsdb_get_extended_dn_guid(dn, &p->guid, "GUID");
                if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
                        /* we got a DN without a GUID - go find the GUID */
-                       int ret = dsdb_module_guid_by_dn(module, dn, p->guid, parent);
+                       int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
                                ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
                                                       ldb_dn_get_linearized(dn));
+                               if (ret == LDB_ERR_NO_SUCH_OBJECT &&
+                                   LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
+                                   ldb_attr_cmp(el->name, "member") == 0) {
+                                       return LDB_ERR_UNWILLING_TO_PERFORM;
+                               }
                                return ret;
                        }
-                       ret = dsdb_set_extended_dn_guid(dn, p->guid, "GUID");
+                       ret = dsdb_set_extended_dn_guid(dn, &p->guid, "GUID");
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
@@ -1602,7 +1889,7 @@ static int replmd_check_upgrade_links(struct parsed_dn *dns, uint32_t count, str
  */
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
                                uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
@@ -1625,7 +1912,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        }
        tval = data_blob_string_const(tstring);
 
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
+       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)usn);
        if (!usn_string) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -1749,7 +2036,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
 
        /* for each new value, see if it exists already with the same GUID */
        for (i=0; i<el->num_values; i++) {
-               struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, dns[i].guid, NULL);
+               struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, &dns[i].guid, NULL);
                if (p == NULL) {
                        /* this is a new linked attribute value */
                        new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val, num_new_values+1);
@@ -1771,8 +2058,9 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        uint32_t rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
 
                        if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
+                               struct GUID_txt_buf guid_str;
                                ldb_asprintf_errstring(ldb, "Attribute %s already exists for target GUID %s",
-                                                      el->name, GUID_string(tmp_ctx, p->guid));
+                                                      el->name, GUID_buf_string(&p->guid, &guid_str));
                                talloc_free(tmp_ctx);
                                /* error codes for 'member' need to be
                                   special cased */
@@ -1790,7 +2078,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        }
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, true);
+               ret = replmd_add_backlink(module, schema, msg_guid, &dns[i].guid, true, schema_attr, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -1888,10 +2176,11 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                struct parsed_dn *p2;
                uint32_t rmd_flags;
 
-               p2 = parsed_dn_find(old_dns, old_el->num_values, p->guid, NULL);
+               p2 = parsed_dn_find(old_dns, old_el->num_values, &p->guid, NULL);
                if (!p2) {
+                       struct GUID_txt_buf buf;
                        ldb_asprintf_errstring(ldb, "Attribute %s doesn't exist for target GUID %s",
-                                              el->name, GUID_string(tmp_ctx, p->guid));
+                                              el->name, GUID_buf_string(&p->guid, &buf));
                        if (ldb_attr_cmp(el->name, "member") == 0) {
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        } else {
@@ -1900,8 +2189,9 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                }
                rmd_flags = dsdb_dn_rmd_flags(p2->dsdb_dn->dn);
                if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
+                       struct GUID_txt_buf buf;
                        ldb_asprintf_errstring(ldb, "Attribute %s already deleted for target GUID %s",
-                                              el->name, GUID_string(tmp_ctx, p->guid));
+                                              el->name, GUID_buf_string(&p->guid, &buf));
                        if (ldb_attr_cmp(el->name, "member") == 0) {
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        } else {
@@ -1917,7 +2207,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                struct parsed_dn *p = &old_dns[i];
                uint32_t rmd_flags;
 
-               if (el->num_values && parsed_dn_find(dns, el->num_values, p->guid, NULL) == NULL) {
+               if (el->num_values && parsed_dn_find(dns, el->num_values, &p->guid, NULL) == NULL) {
                        continue;
                }
 
@@ -1931,7 +2221,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        return ret;
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, true);
+               ret = replmd_add_backlink(module, schema, msg_guid, &old_dns[i].guid, false, schema_attr, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2014,13 +2304,13 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
                if (rmd_flags & DSDB_RMD_FLAG_DELETED) continue;
 
-               ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, false);
+               ret = replmd_add_backlink(module, schema, msg_guid, &old_dns[i].guid, false, schema_attr, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               p = parsed_dn_find(dns, el->num_values, old_p->guid, NULL);
+               p = parsed_dn_find(dns, el->num_values, &old_p->guid, NULL);
                if (p) {
                        /* we don't delete it if we are re-adding it */
                        continue;
@@ -2042,7 +2332,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
                if (old_dns &&
                    (old_p = parsed_dn_find(old_dns,
-                                           old_num_values, p->guid, NULL)) != NULL) {
+                                           old_num_values, &p->guid, NULL)) != NULL) {
                        /* update in place */
                        ret = replmd_update_la_val(old_el->values, old_p->v, p->dsdb_dn,
                                                   old_p->dsdb_dn, invocation_id,
@@ -2069,7 +2359,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        num_new_values++;
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, false);
+               ret = replmd_add_backlink(module, schema, msg_guid, &dns[i].guid, true, schema_attr, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2166,6 +2456,9 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
+                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
+                               continue;
+                       }
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
@@ -2227,29 +2520,47 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
 
 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
+       struct samldb_msds_intid_persistant *msds_intid_struct;
        struct ldb_context *ldb;
        struct replmd_replicated_request *ac;
        struct ldb_request *down_req;
        struct ldb_message *msg;
        time_t t = time(NULL);
        int ret;
-       bool is_urgent = false;
-       struct loadparm_context *lp_ctx;
-       char *referral;
+       bool is_urgent = false, rodc = false;
+       bool is_schema_nc = false;
        unsigned int functional_level;
-       const DATA_BLOB *guid_blob;
+       const struct ldb_message_element *guid_el = NULL;
+       struct ldb_control *sd_propagation_control;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.mod.message->dn)) {
                return ldb_next_request(module, req);
        }
 
+       sd_propagation_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_SEC_DESC_PROPAGATION_OID);
+       if (sd_propagation_control != NULL) {
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+               ret = strcmp(req->op.mod.message->elements[0].name,
+                            "nTSecurityDescriptor");
+               if (ret != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               return ldb_next_request(module, req);
+       }
+
        ldb = ldb_module_get_ctx(module);
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
-       guid_blob = ldb_msg_find_ldb_val(req->op.mod.message, "objectGUID");
-       if ( guid_blob != NULL ) {
+       guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
+       if (guid_el != NULL) {
                ldb_set_errstring(ldb,
                                  "replmd_modify: it's not allowed to change the objectGUID!");
                return LDB_ERR_CONSTRAINT_VIOLATION;
@@ -2262,9 +2573,6 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        functional_level = dsdb_functional_level(ldb);
 
-       lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                struct loadparm_context);
-
        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
        if (msg == NULL) {
@@ -2276,16 +2584,25 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        ldb_msg_remove_attr(msg, "whenChanged");
        ldb_msg_remove_attr(msg, "uSNChanged");
 
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
+
        ret = replmd_update_rpmd(module, ac->schema, req, NULL,
-                                msg, &ac->seq_num, t, &is_urgent);
-       if (ret == LDB_ERR_REFERRAL) {
+                                msg, &ac->seq_num, t, is_schema_nc,
+                                &is_urgent, &rodc);
+       if (rodc && (ret == LDB_ERR_REFERRAL)) {
+               struct loadparm_context *lp_ctx;
+               char *referral;
+
+               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
+                                        struct loadparm_context);
+
                referral = talloc_asprintf(req,
                                           "ldap://%s/%s",
                                           lpcfg_dnsdomain(lp_ctx),
                                           ldb_dn_get_linearized(msg->dn));
                ret = ldb_module_send_referral(req, referral);
                talloc_free(ac);
-               return ldb_module_done(req, NULL, NULL, ret);
+               return ret;
        }
 
        if (ret != LDB_SUCCESS) {
@@ -2346,16 +2663,26 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                ret = add_time_element(msg, "whenChanged", t);
                if (ret != LDB_SUCCESS) {
                        talloc_free(ac);
+                       ldb_operr(ldb);
                        return ret;
                }
 
                ret = add_uint64_element(ldb, msg, "uSNChanged", ac->seq_num);
                if (ret != LDB_SUCCESS) {
                        talloc_free(ac);
+                       ldb_operr(ldb);
                        return ret;
                }
        }
 
+       if (!ldb_dn_compare_base(replmd_private->schema_dn, msg->dn)) {
+               /* Update the usn in the SAMLDB_MSDS_INTID_OPAQUE opaque */
+               msds_intid_struct = (struct samldb_msds_intid_persistant *) ldb_get_opaque(ldb, SAMLDB_MSDS_INTID_OPAQUE);
+               if (msds_intid_struct) {
+                       msds_intid_struct->usn = ac->seq_num;
+               }
+       }
+
        /* go on with the call chain */
        return ldb_next_request(module, down_req);
 }
@@ -2409,18 +2736,22 @@ static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
 {
        struct ldb_context *ldb;
-       struct replmd_replicated_request *ac;
        struct ldb_request *down_req;
        struct ldb_message *msg;
        const struct dsdb_attribute *rdn_attr;
        const char *rdn_name;
        const struct ldb_val *rdn_val;
-       const char *attrs[4] = { NULL, };
+       const char *attrs[5] = { NULL, };
        time_t t = time(NULL);
        int ret;
-       bool is_urgent = false;
+       bool is_urgent = false, rodc = false;
+       bool is_schema_nc;
+       struct replmd_replicated_request *ac =
+               talloc_get_type(req->context, struct replmd_replicated_request);
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(ac->module),
+                               struct replmd_private);
 
-       ac = talloc_get_type(req->context, struct replmd_replicated_request);
        ldb = ldb_module_get_ctx(ac->module);
 
        if (ares->error != LDB_SUCCESS) {
@@ -2448,6 +2779,8 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
 
        msg->dn = ac->req->op.rename.newdn;
 
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
+
        rdn_name = ldb_dn_get_rdn_name(msg->dn);
        if (rdn_name == NULL) {
                talloc_free(ares);
@@ -2528,12 +2861,14 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
         */
        attrs[0] = "replPropertyMetaData";
        attrs[1] = "objectClass";
-       attrs[2] = rdn_name;
-       attrs[3] = NULL;
+       attrs[2] = "instanceType";
+       attrs[3] = rdn_name;
+       attrs[4] = NULL;
 
        ret = replmd_update_rpmd(ac->module, ac->schema, req, attrs,
-                                msg, &ac->seq_num, t, &is_urgent);
-       if (ret == LDB_ERR_REFERRAL) {
+                                msg, &ac->seq_num, t,
+                                is_schema_nc, &is_urgent, &rodc);
+       if (rodc && (ret == LDB_ERR_REFERRAL)) {
                struct ldb_dn *olddn = ac->req->op.rename.olddn;
                struct loadparm_context *lp_ctx;
                char *referral;
@@ -2546,15 +2881,13 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
                                           lpcfg_dnsdomain(lp_ctx),
                                           ldb_dn_get_linearized(olddn));
                ret = ldb_module_send_referral(req, referral);
-               talloc_free(ac);
+               talloc_free(ares);
                return ldb_module_done(req, NULL, NULL, ret);
        }
 
        if (ret != LDB_SUCCESS) {
                talloc_free(ares);
-               return ldb_module_done(ac->req, NULL, NULL,
-                                      ldb_error(ldb, ret,
-                                       "failed to call replmd_update_rpmd()"));
+               return ldb_module_done(ac->req, NULL, NULL, ret);
        }
 
        if (ac->seq_num == 0) {
@@ -2592,12 +2925,14 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
        ret = add_time_element(msg, "whenChanged", t);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
+               ldb_operr(ldb);
                return ret;
        }
 
        ret = add_uint64_element(ldb, msg, "uSNChanged", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
+               ldb_operr(ldb);
                return ret;
        }
 
@@ -2606,8 +2941,11 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
 }
 
 /*
-   remove links from objects that point at this object when an object
-   is deleted
+ * remove links from objects that point at this object when an object
+ * is deleted.  We remove it from the NEXT module per MS-DRSR 5.160
+ * RemoveObj which states that link removal due to the object being
+ * deleted is NOT an originating update - they just go away!
+ *
  */
 static int replmd_delete_remove_link(struct ldb_module *module,
                                     const struct dsdb_schema *schema,
@@ -2688,8 +3026,13 @@ static int replmd_delete_remove_link(struct ldb_module *module,
 
   This also handles the mapping of delete to a rename operation
   to allow deletes to be replicated.
+
+  It also handles the incoming deleted objects, to ensure they are
+  fully deleted here.  In that case re_delete is true, and we do not
+  use this as a signal to change the deleted state, just reinforce it.
+
  */
-static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
+static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete)
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
@@ -2715,15 +3058,26 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated",
                "whenChanged", NULL};
        unsigned int i, el_count = 0;
-       enum deletion_state { OBJECT_NOT_DELETED=1, OBJECT_DELETED=2, OBJECT_RECYCLED=3,
-                                               OBJECT_TOMBSTONE=4, OBJECT_REMOVED=5 };
        enum deletion_state deletion_state, next_deletion_state;
-       bool enabled;
 
        if (ldb_dn_is_special(req->op.del.dn)) {
                return ldb_next_request(module, req);
        }
 
+       /*
+        * We have to allow dbcheck to remove an object that
+        * is beyond repair, and to do so totally.  This could
+        * mean we we can get a partial object from the other
+        * DC, causing havoc, so dbcheck suggests
+        * re-replication first.  dbcheck sets both DBCHECK
+        * and RELAX in this situation.
+        */
+       if (ldb_request_get_control(req, LDB_CONTROL_RELAX_OID)
+           && ldb_request_get_control(req, DSDB_CONTROL_DBCHECK)) {
+               /* really, really remove it */
+               return ldb_next_request(module, req);
+       }
+
        tmp_ctx = talloc_new(ldb);
        if (!tmp_ctx) {
                ldb_oom(ldb);
@@ -2732,6 +3086,7 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
 
        schema = dsdb_get_schema(ldb, tmp_ctx);
        if (!schema) {
+               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -2745,50 +3100,46 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                                    DSDB_SEARCH_REVEAL_INTERNALS |
                                    DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT, req);
        if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                      "repmd_delete: Failed to %s %s, because we failed to find it: %s",
+                                      re_delete ? "re-delete" : "delete",
+                                      ldb_dn_get_linearized(old_dn),
+                                      ldb_errstring(ldb_module_get_ctx(module)));
                talloc_free(tmp_ctx);
                return ret;
        }
        old_msg = res->msgs[0];
 
+       replmd_deletion_state(module, old_msg,
+                             &deletion_state,
+                             &next_deletion_state);
 
-       ret = dsdb_recyclebin_enabled(module, &enabled);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       if (ldb_msg_check_string_attribute(old_msg, "isDeleted", "TRUE")) {
-               if (!enabled) {
-                       deletion_state = OBJECT_TOMBSTONE;
-                       next_deletion_state = OBJECT_REMOVED;
-               } else if (ldb_msg_check_string_attribute(old_msg, "isRecycled", "TRUE")) {
-                       deletion_state = OBJECT_RECYCLED;
-                       next_deletion_state = OBJECT_REMOVED;
-               } else {
-                       deletion_state = OBJECT_DELETED;
-                       next_deletion_state = OBJECT_RECYCLED;
-               }
-       } else {
-               deletion_state = OBJECT_NOT_DELETED;
-               if (enabled) {
-                       next_deletion_state = OBJECT_DELETED;
-               } else {
-                       next_deletion_state = OBJECT_TOMBSTONE;
-               }
+       /* This supports us noticing an incoming isDeleted and acting on it */
+       if (re_delete) {
+               SMB_ASSERT(deletion_state > OBJECT_NOT_DELETED);
+               next_deletion_state = deletion_state;
        }
 
        if (next_deletion_state == OBJECT_REMOVED) {
-               struct auth_session_info *session_info =
-                               (struct auth_session_info *)ldb_get_opaque(ldb, "sessionInfo");
-               if (security_session_user_level(session_info, NULL) != SECURITY_SYSTEM) {
-                       ldb_asprintf_errstring(ldb, "Refusing to delete deleted object %s",
-                                       ldb_dn_get_linearized(old_msg->dn));
-                       return LDB_ERR_UNWILLING_TO_PERFORM;
+               /*
+                * We have to prevent objects being deleted, even if
+                * the administrator really wants them gone, as
+                * without the tombstone, we can get a partial object
+                * from the other DC, causing havoc.
+                *
+                * The only other valid case is when the 180 day
+                * timeout has expired, when relax is specified.
+                */
+               if (ldb_request_get_control(req, LDB_CONTROL_RELAX_OID)) {
+                       /* it is already deleted - really remove it this time */
+                       talloc_free(tmp_ctx);
+                       return ldb_next_request(module, req);
                }
 
-               /* it is already deleted - really remove it this time */
-               talloc_free(tmp_ctx);
-               return ldb_next_request(module, req);
+               ldb_asprintf_errstring(ldb, "Refusing to delete tombstone object %s.  "
+                                      "This check is to prevent corruption of the replicated state.",
+                                      ldb_dn_get_linearized(old_msg->dn));
+               return LDB_ERR_UNWILLING_TO_PERFORM;
        }
 
        rdn_name = ldb_dn_get_rdn_name(old_dn);
@@ -2807,57 +3158,101 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
 
        msg->dn = old_dn;
 
-       if (deletion_state == OBJECT_NOT_DELETED){
-               /* consider the SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE flag */
-               disallow_move_on_delete =
-                       (ldb_msg_find_attr_as_int(old_msg, "systemFlags", 0)
-                               & SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE);
+       /* consider the SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE flag */
+       disallow_move_on_delete =
+               (ldb_msg_find_attr_as_int(old_msg, "systemFlags", 0)
+                & SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE);
 
-               /* work out where we will be renaming this object to */
-               if (!disallow_move_on_delete) {
-                       ret = dsdb_get_deleted_objects_dn(ldb, tmp_ctx, old_dn,
-                                                         &new_dn);
-                       if (ret != LDB_SUCCESS) {
-                               /* this is probably an attempted delete on a partition
-                                * that doesn't allow delete operations, such as the
-                                * schema partition */
-                               ldb_asprintf_errstring(ldb, "No Deleted Objects container for DN %s",
-                                                          ldb_dn_get_linearized(old_dn));
-                               talloc_free(tmp_ctx);
-                               return LDB_ERR_UNWILLING_TO_PERFORM;
-                       }
-               } else {
+       /* work out where we will be renaming this object to */
+       if (!disallow_move_on_delete) {
+               struct ldb_dn *deleted_objects_dn;
+               ret = dsdb_get_deleted_objects_dn(ldb, tmp_ctx, old_dn,
+                                                 &deleted_objects_dn);
+
+               /*
+                * We should not move objects if we can't find the
+                * deleted objects DN.  Not moving (or otherwise
+                * harming) the Deleted Objects DN itself is handled
+                * in the caller.
+                */
+               if (re_delete && (ret != LDB_SUCCESS)) {
                        new_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                        if (new_dn == NULL) {
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return LDB_ERR_OPERATIONS_ERROR;
                        }
+               } else if (ret != LDB_SUCCESS) {
+                       /* this is probably an attempted delete on a partition
+                        * that doesn't allow delete operations, such as the
+                        * schema partition */
+                       ldb_asprintf_errstring(ldb, "No Deleted Objects container for DN %s",
+                                              ldb_dn_get_linearized(old_dn));
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_UNWILLING_TO_PERFORM;
+               } else {
+                       new_dn = deleted_objects_dn;
                }
+       } else {
+               new_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
+               if (new_dn == NULL) {
+                       ldb_module_oom(module);
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+       }
 
+       if (deletion_state == OBJECT_NOT_DELETED) {
                /* get the objects GUID from the search we just did */
                guid = samdb_result_guid(old_msg, "objectGUID");
 
                /* Add a formatted child */
                retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                               rdn_name,
-                                               ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                               GUID_string(tmp_ctx, &guid));
+                                           rdn_name,
+                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
+                                           GUID_string(tmp_ctx, &guid));
                if (!retb) {
-                       DEBUG(0,(__location__ ": Unable to add a formatted child to dn: %s",
-                                       ldb_dn_get_linearized(new_dn)));
+                       ldb_asprintf_errstring(ldb, __location__
+                                              ": Unable to add a formatted child to dn: %s",
+                                              ldb_dn_get_linearized(new_dn));
                        talloc_free(tmp_ctx);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
                ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
                if (ret != LDB_SUCCESS) {
-                       DEBUG(0,(__location__ ": Failed to add isDeleted string to the msg\n"));
-                       ldb_module_oom(module);
+                       ldb_asprintf_errstring(ldb, __location__
+                                              ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
                msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+       } else {
+               /*
+                * No matter what has happened with other renames etc, try again to
+                * get this to be under the deleted DN. See MS-DRSR 5.160 RemoveObj
+                */
+
+               struct ldb_dn *rdn = ldb_dn_copy(tmp_ctx, old_dn);
+               retb = ldb_dn_remove_base_components(rdn, ldb_dn_get_comp_num(rdn) - 1);
+               if (!retb) {
+                       ldb_asprintf_errstring(ldb, __location__
+                                              ": Unable to add a prepare rdn of %s",
+                                              ldb_dn_get_linearized(rdn));
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               SMB_ASSERT(ldb_dn_get_comp_num(rdn) == 1);
+
+               retb = ldb_dn_add_child(new_dn, rdn);
+               if (!retb) {
+                       ldb_asprintf_errstring(ldb, __location__
+                                              ": Unable to add rdn %s to base dn: %s",
+                                              ldb_dn_get_linearized(rdn),
+                                              ldb_dn_get_linearized(new_dn));
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        }
 
        /*
@@ -2877,65 +3272,93 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
          see MS-ADTS "Tombstone Requirements" section 3.1.1.5.5.1.1
         */
 
-       /* we need the storage form of the parent GUID */
-       ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
-                                   ldb_dn_get_parent(tmp_ctx, old_dn), NULL,
-                                   DSDB_FLAG_NEXT_MODULE |
-                                   DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
-                                   DSDB_SEARCH_REVEAL_INTERNALS|
-                                   DSDB_SEARCH_SHOW_RECYCLED, req);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
-       }
+       if (deletion_state == OBJECT_NOT_DELETED) {
+               struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
+               char *parent_dn_str = NULL;
 
-       if (deletion_state == OBJECT_NOT_DELETED){
-               ret = ldb_msg_add_steal_string(msg, "lastKnownParent",
-                                                  ldb_dn_get_extended_linearized(tmp_ctx, parent_res->msgs[0]->dn, 1));
+               /* we need the storage form of the parent GUID */
+               ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
+                                           parent_dn, NULL,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
+                                           DSDB_SEARCH_REVEAL_INTERNALS|
+                                           DSDB_SEARCH_SHOW_RECYCLED, req);
                if (ret != LDB_SUCCESS) {
-                       DEBUG(0,(__location__ ": Failed to add lastKnownParent string to the msg\n"));
-                       ldb_module_oom(module);
+                       ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                              "repmd_delete: Failed to %s %s, "
+                                              "because we failed to find it's parent (%s): %s",
+                                              re_delete ? "re-delete" : "delete",
+                                              ldb_dn_get_linearized(old_dn),
+                                              ldb_dn_get_linearized(parent_dn),
+                                              ldb_errstring(ldb_module_get_ctx(module)));
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
-       }
-
-       switch (next_deletion_state){
 
-       case OBJECT_DELETED:
-
-               ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
-               if (ret != LDB_SUCCESS) {
-                       DEBUG(0,(__location__ ": Failed to add msDS-LastKnownRDN string to the msg\n"));
-                       ldb_module_oom(module);
+               /*
+                * Now we can use the DB version,
+                * it will have the extended DN info in it
+                */
+               parent_dn = parent_res->msgs[0]->dn;
+               parent_dn_str = ldb_dn_get_extended_linearized(tmp_ctx,
+                                                              parent_dn,
+                                                              1);
+               if (parent_dn_str == NULL) {
                        talloc_free(tmp_ctx);
-                       return ret;
+                       return ldb_module_oom(module);
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
 
-               ret = ldb_msg_add_empty(msg, "objectCategory", LDB_FLAG_MOD_DELETE, NULL);
+               ret = ldb_msg_add_steal_string(msg, "lastKnownParent",
+                                              parent_dn_str);
                if (ret != LDB_SUCCESS) {
+                       ldb_asprintf_errstring(ldb, __location__
+                                              ": Failed to add lastKnownParent "
+                                              "string when deleting %s",
+                                              ldb_dn_get_linearized(old_dn));
                        talloc_free(tmp_ctx);
-                       ldb_module_oom(module);
                        return ret;
                }
+               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
 
-               ret = ldb_msg_add_empty(msg, "sAMAccountType", LDB_FLAG_MOD_DELETE, NULL);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       ldb_module_oom(module);
-                       return ret;
+               if (next_deletion_state == OBJECT_DELETED) {
+                       ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
+                       if (ret != LDB_SUCCESS) {
+                               ldb_asprintf_errstring(ldb, __location__
+                                                      ": Failed to add msDS-LastKnownRDN "
+                                                      "string when deleting %s",
+                                                      ldb_dn_get_linearized(old_dn));
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
                }
+       }
 
-               break;
+       switch (next_deletion_state) {
 
        case OBJECT_RECYCLED:
        case OBJECT_TOMBSTONE:
 
-               /* we also mark it as recycled, meaning this object can't be
-                  recovered (we are stripping its attributes) */
-               if (dsdb_functional_level(ldb) >= DS_DOMAIN_FUNCTION_2008_R2) {
+               /*
+                * MS-ADTS 3.1.1.5.5.1.1 Tombstone Requirements
+                * describes what must be removed from a tombstone
+                * object
+                *
+                * MS-ADTS 3.1.1.5.5.1.3 Recycled-Object Requirements
+                * describes what must be removed from a recycled
+                * object
+                *
+                */
+
+               /*
+                * we also mark it as recycled, meaning this object can't be
+                * recovered (we are stripping its attributes).
+                * This is done only if we have this schema object of course ...
+                * This behavior is identical to the one of Windows 2008R2 which
+                * always set the isRecycled attribute, even if the recycle-bin is
+                * not activated and what ever the forest level is.
+                */
+               if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
                        ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
@@ -2943,7 +3366,7 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
                }
 
                /* work out which of the old attributes we will be removing */
@@ -2959,16 +3382,41 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                                /* don't remove the rDN */
                                continue;
                        }
-                       if (sa->linkID && sa->linkID & 1) {
+                       if (sa->linkID && (sa->linkID & 1)) {
+                               /*
+                                 we have a backlink in this object
+                                 that needs to be removed. We're not
+                                 allowed to remove it directly
+                                 however, so we instead setup a
+                                 modify to delete the corresponding
+                                 forward link
+                                */
                                ret = replmd_delete_remove_link(module, schema, old_dn, el, sa, req);
                                if (ret != LDB_SUCCESS) {
+                                       const char *old_dn_str
+                                               = ldb_dn_get_linearized(old_dn);
+                                       ldb_asprintf_errstring(ldb,
+                                                              __location__
+                                                              ": Failed to remove backlink of "
+                                                              "%s when deleting %s",
+                                                              el->name,
+                                                              old_dn_str);
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
+                               /* now we continue, which means we
+                                  won't remove this backlink
+                                  directly
+                               */
                                continue;
                        }
-                       if (!sa->linkID && ldb_attr_in_list(preserved_attrs, el->name)) {
-                               continue;
+                       if (!sa->linkID) {
+                               if (ldb_attr_in_list(preserved_attrs, el->name)) {
+                                       continue;
+                               }
+                               if (sa->searchFlags & SEARCH_FLAG_PRESERVEONDELETE) {
+                                       continue;
+                               }
                        }
                        ret = ldb_msg_add_empty(msg, el->name, LDB_FLAG_MOD_DELETE, &el);
                        if (ret != LDB_SUCCESS) {
@@ -2977,6 +3425,30 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                                return ret;
                        }
                }
+
+               break;
+
+       case OBJECT_DELETED:
+               /*
+                * MS-ADTS 3.1.1.5.5.1.2 Deleted-Object Requirements
+                * describes what must be removed from a deleted
+                * object
+                */
+
+               ret = ldb_msg_add_empty(msg, "objectCategory", LDB_FLAG_MOD_REPLACE, NULL);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       ldb_module_oom(module);
+                       return ret;
+               }
+
+               ret = ldb_msg_add_empty(msg, "sAMAccountType", LDB_FLAG_MOD_REPLACE, NULL);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       ldb_module_oom(module);
+                       return ret;
+               }
+
                break;
 
        default:
@@ -3019,15 +3491,16 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                }
        }
 
-       ret = dsdb_module_modify(module, msg, DSDB_FLAG_OWN_MODULE, req);
-       if (ret != LDB_SUCCESS) {
-               ldb_asprintf_errstring(ldb, "replmd_delete: Failed to modify object %s in delete - %s",
-                                      ldb_dn_get_linearized(old_dn), ldb_errstring(ldb));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
+       /*
+        * TODO: Per MS-DRSR 5.160 RemoveObj we should remove links directly, not as an originating update!
+        *
+        */
 
-       if (deletion_state == OBJECT_NOT_DELETED) {
+       /*
+        * No matter what has happned with other renames, try again to
+        * get this to be under the deleted DN.
+        */
+       if (strcmp(ldb_dn_get_linearized(old_dn), ldb_dn_get_linearized(new_dn)) != 0) {
                /* now rename onto the new DN */
                ret = dsdb_module_rename(module, old_dn, new_dn, DSDB_FLAG_NEXT_MODULE, req);
                if (ret != LDB_SUCCESS){
@@ -3038,6 +3511,15 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
                        talloc_free(tmp_ctx);
                        return ret;
                }
+               msg->dn = new_dn;
+       }
+
+       ret = dsdb_module_modify(module, msg, DSDB_FLAG_OWN_MODULE, req);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "replmd_delete: Failed to modify object %s in delete - %s",
+                                      ldb_dn_get_linearized(old_dn), ldb_errstring(ldb));
+               talloc_free(tmp_ctx);
+               return ret;
        }
 
        talloc_free(tmp_ctx);
@@ -3045,6 +3527,10 @@ static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
        return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
 }
 
+static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
+{
+       return replmd_delete_internals(module, req, false);
+}
 
 
 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
@@ -3056,6 +3542,10 @@ static int replmd_replicated_request_werror(struct replmd_replicated_request *ar
 {
        int ret = LDB_ERR_OTHER;
        /* TODO: do some error mapping */
+
+       /* Let the caller know the full WERROR */
+       ar->objs->error = status;
+
        return ret;
 }
 
@@ -3107,6 +3597,47 @@ static bool replmd_replPropertyMetaData1_is_newer(struct replPropertyMetaData1 *
                                      new_m->originating_change_time);
 }
 
+static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_flags,
+                                                            struct replPropertyMetaData1 *cur_m,
+                                                            struct replPropertyMetaData1 *new_m)
+{
+       bool cmp;
+
+       /*
+        * If the new replPropertyMetaData entry for this attribute is
+        * not provided (this happens in the case where we look for
+        * ATTID_name, but the name was not changed), then the local
+        * state is clearly still current, as the remote
+        * server didn't send it due to being older the high watermark
+        * USN we sent.
+        */
+       if (new_m == NULL) {
+               return false;
+       }
+
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING) {
+               /*
+                * if we compare equal then do an
+                * update. This is used when a client
+                * asks for a FULL_SYNC, and can be
+                * used to recover a corrupt
+                * replica.
+                *
+                * This call is a bit tricky, what we
+                * are doing it turning the 'is_newer'
+                * call into a 'not is older' by
+                * swapping cur_m and new_m, and negating the
+                * outcome.
+                */
+               cmp = !replmd_replPropertyMetaData1_is_newer(new_m,
+                                                            cur_m);
+       } else {
+               cmp = replmd_replPropertyMetaData1_is_newer(cur_m,
+                                                           new_m);
+       }
+       return cmp;
+}
+
 
 /*
   form a conflict DN
@@ -3219,11 +3750,14 @@ failed:
   callback for conflict DN handling where we have renamed the incoming
   record. After renaming it, we need to ensure the change of name and
   rDN for the incoming record is seen as an originating update by this DC.
+
+  This also handles updating lastKnownParent for entries sent to lostAndFound
  */
 static int replmd_op_name_modify_callback(struct ldb_request *req, struct ldb_reply *ares)
 {
        struct replmd_replicated_request *ar =
                talloc_get_type_abort(req->context, struct replmd_replicated_request);
+       struct ldb_dn *conflict_dn = NULL;
        int ret;
 
        if (ares->error != LDB_SUCCESS) {
@@ -3231,13 +3765,52 @@ static int replmd_op_name_modify_callback(struct ldb_request *req, struct ldb_re
                return replmd_op_callback(req, ares);
        }
 
+       switch (req->operation) {
+       case LDB_ADD:
+               conflict_dn = req->op.add.message->dn;
+               break;
+       case LDB_MODIFY:
+               conflict_dn = req->op.mod.message->dn;
+               break;
+       default:
+               smb_panic("replmd_op_name_modify_callback called in unknown circumstances");
+       }
+
        /* perform a modify of the rDN and name of the record */
-       ret = replmd_name_modify(ar, req, req->op.add.message->dn);
+       ret = replmd_name_modify(ar, req, conflict_dn);
        if (ret != LDB_SUCCESS) {
                ares->error = ret;
                return replmd_op_callback(req, ares);
        }
 
+       if (ar->objs->objects[ar->index_current].last_known_parent) {
+               struct ldb_message *msg = ldb_msg_new(req);
+               if (msg == NULL) {
+                       ldb_module_oom(ar->module);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               msg->dn = req->op.add.message->dn;
+
+               ret = ldb_msg_add_steal_string(msg, "lastKnownParent",
+                                              ldb_dn_get_extended_linearized(msg, ar->objs->objects[ar->index_current].last_known_parent, 1));
+               if (ret != LDB_SUCCESS) {
+                       DEBUG(0,(__location__ ": Failed to add lastKnownParent string to the msg\n"));
+                       ldb_module_oom(ar->module);
+                       return ret;
+               }
+               msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
+
+               ret = dsdb_module_modify(ar->module, msg, DSDB_FLAG_OWN_MODULE, req);
+               if (ret != LDB_SUCCESS) {
+                       DEBUG(0,(__location__ ": Failed to modify lastKnownParent of lostAndFound DN '%s' - %s",
+                                ldb_dn_get_linearized(msg->dn),
+                                ldb_errstring(ldb_module_get_ctx(ar->module))));
+                       return ret;
+               }
+               TALLOC_FREE(msg);
+       }
+
        return replmd_op_callback(req, ares);
 }
 
@@ -3246,7 +3819,7 @@ static int replmd_op_name_modify_callback(struct ldb_request *req, struct ldb_re
   This copes with the creation of conflict records in the case where
   the DN exists, but with a different objectGUID
  */
-static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *ares)
+static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct ldb_reply *ares, int (*callback)(struct ldb_request *req, struct ldb_reply *ares))
 {
        struct ldb_dn *conflict_dn;
        struct replmd_replicated_request *ar =
@@ -3254,27 +3827,62 @@ static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *are
        struct ldb_result *res;
        const char *attrs[] = { "replPropertyMetaData", "objectGUID", NULL };
        int ret;
-       const struct ldb_val *rmd_value, *omd_value;
-       struct replPropertyMetaDataBlob omd, rmd;
+       const struct ldb_val *omd_value;
+       struct replPropertyMetaDataBlob omd, *rmd;
        enum ndr_err_code ndr_err;
-       bool rename_incoming_record;
+       bool rename_incoming_record, rodc;
        struct replPropertyMetaData1 *rmd_name, *omd_name;
+       struct ldb_message *msg;
+       struct ldb_request *down_req = NULL;
 
-       if (ares->error != LDB_ERR_ENTRY_ALREADY_EXISTS) {
-               /* call the normal callback for everything except
-                  conflicts */
-               return replmd_op_callback(req, ares);
+       /* call the normal callback for success */
+       if (ares->error == LDB_SUCCESS) {
+               return callback(req, ares);
        }
 
        /*
         * we have a conflict, and need to decide if we will keep the
         * new record or the old record
         */
-       conflict_dn = req->op.add.message->dn;
+
+       msg = ar->objs->objects[ar->index_current].msg;
+       conflict_dn = msg->dn;
+
+       /* For failures other than conflicts, fail the whole operation here */
+       if (ares->error != LDB_ERR_ENTRY_ALREADY_EXISTS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module), "Failed to locally apply remote add of %s: %s",
+                                      ldb_dn_get_linearized(conflict_dn),
+                                      ldb_errstring(ldb_module_get_ctx(ar->module)));
+
+               return ldb_module_done(ar->req, NULL, NULL,
+                                      LDB_ERR_OPERATIONS_ERROR);
+       }
+
+       ret = samdb_rodc(ldb_module_get_ctx(ar->module), &rodc);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module), "Failed to determine if we are an RODC when attempting to form conflict DN: %s", ldb_errstring(ldb_module_get_ctx(ar->module)));
+               return ldb_module_done(ar->req, NULL, NULL,
+                                      LDB_ERR_OPERATIONS_ERROR);
+
+       }
+
+       if (rodc) {
+               /*
+                * We are on an RODC, or were a GC for this
+                * partition, so we have to fail this until
+                * someone who owns the partition sorts it
+                * out 
+                */
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module), 
+                                      "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
+                                      " - We must fail the operation until a master for this partition resolves the conflict",
+                                      ldb_dn_get_linearized(conflict_dn));
+               goto failed;
+       }
 
        /*
         * first we need the replPropertyMetaData attribute from the
-        * old record
+        * local, conflicting record
         */
        ret = dsdb_module_search_dn(ar->module, req, &res, conflict_dn,
                                    attrs,
@@ -3302,43 +3910,39 @@ static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *are
                goto failed;
        }
 
+       rmd = ar->objs->objects[ar->index_current].meta_data;
+
        /*
-        * and the replPropertyMetaData attribute from the
-        * new record
+        * we decide which is newer based on the RPMD on the name
+        * attribute.  See [MS-DRSR] ResolveNameConflict.
+        *
+        * We expect omd_name to be present, as this is from a local
+        * search, but while rmd_name should have been given to us by
+        * the remote server, if it is missing we just prefer the
+        * local name in
+        * replmd_replPropertyMetaData1_new_should_be_taken()
         */
-       rmd_value = ldb_msg_find_ldb_val(req->op.add.message, "replPropertyMetaData");
-       if (rmd_value == NULL) {
-               DEBUG(0,(__location__ ": Unable to find replPropertyMetaData for new record '%s'\n",
-                        ldb_dn_get_linearized(conflict_dn)));
-               goto failed;
-       }
-
-       ndr_err = ndr_pull_struct_blob(rmd_value, req, &rmd,
-                                      (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(0,(__location__ ": Failed to parse new replPropertyMetaData for %s\n",
-                        ldb_dn_get_linearized(conflict_dn)));
-               goto failed;
-       }
-
-       /* we decide which is newer based on the RPMD on the name
-          attribute.  See [MS-DRSR] ResolveNameConflict */
-       rmd_name = replmd_replPropertyMetaData1_find_attid(&rmd, DRSUAPI_ATTID_name);
+       rmd_name = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
        omd_name = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
-       if (!rmd_name || !omd_name) {
-               DEBUG(0,(__location__ ": Failed to find name attribute in replPropertyMetaData for %s\n",
+       if (!omd_name) {
+               DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
                goto failed;
        }
 
-       rename_incoming_record = !replmd_replPropertyMetaData1_is_newer(omd_name, rmd_name);
+       /*
+        * Should we preserve the current record, and so rename the
+        * incoming record to be a conflict?
+        */
+       rename_incoming_record
+               = !replmd_replPropertyMetaData1_new_should_be_taken(ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                                                                   omd_name, rmd_name);
 
        if (rename_incoming_record) {
                struct GUID guid;
                struct ldb_dn *new_dn;
-               struct ldb_message *new_msg;
 
-               guid = samdb_result_guid(req->op.add.message, "objectGUID");
+               guid = samdb_result_guid(msg, "objectGUID");
                if (GUID_all_zero(&guid)) {
                        DEBUG(0,(__location__ ": Failed to find objectGUID for conflicting incoming record %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -3351,22 +3955,12 @@ static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *are
                        goto failed;
                }
 
-               DEBUG(1,(__location__ ": Resolving conflict record via incoming rename '%s' -> '%s'\n",
+               DEBUG(2,(__location__ ": Resolving conflict record via incoming rename '%s' -> '%s'\n",
                         ldb_dn_get_linearized(conflict_dn), ldb_dn_get_linearized(new_dn)));
 
-               /* re-submit the request, but with a different
-                  callback, so we don't loop forever. */
-               new_msg = ldb_msg_copy_shallow(req, req->op.add.message);
-               if (!new_msg) {
-                       goto failed;
-                       DEBUG(0,(__location__ ": Failed to copy conflict DN message for %s\n",
-                                ldb_dn_get_linearized(conflict_dn)));
-               }
-               new_msg->dn = new_dn;
-               req->op.add.message = new_msg;
-               req->callback = replmd_op_name_modify_callback;
-
-               return ldb_next_request(ar->module, req);
+               /* re-submit the request, but with the new DN */
+               callback = replmd_op_name_modify_callback;
+               msg->dn = new_dn;
        } else {
                /* we are renaming the existing record */
                struct GUID guid;
@@ -3386,7 +3980,7 @@ static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *are
                        goto failed;
                }
 
-               DEBUG(1,(__location__ ": Resolving conflict record via existing rename '%s' -> '%s'\n",
+               DEBUG(2,(__location__ ": Resolving conflict record via existing-record rename '%s' -> '%s'\n",
                         ldb_dn_get_linearized(conflict_dn), ldb_dn_get_linearized(new_dn)));
 
                ret = dsdb_module_rename(ar->module, conflict_dn, new_dn,
@@ -3408,17 +4002,74 @@ static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *are
                        goto failed;
                }
 
-               req->callback = replmd_op_callback;
+               DEBUG(2,(__location__ ": With conflicting record renamed, re-apply replicated creation of '%s'\n",
+                        ldb_dn_get_linearized(req->op.add.message->dn)));
+       }
 
-               return ldb_next_request(ar->module, req);
+       ret = ldb_build_add_req(&down_req,
+                               ldb_module_get_ctx(ar->module),
+                               req,
+                               msg,
+                               ar->controls,
+                               ar,
+                               callback,
+                               req);
+       if (ret != LDB_SUCCESS) {
+               goto failed;
        }
+       LDB_REQ_SET_LOCATION(down_req);
 
+       /* current partition control needed by "repmd_op_callback" */
+       ret = ldb_request_add_control(down_req,
+                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
+                                     false, NULL);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PARTIAL_REPLICA) {
+               /* this tells the partition module to make it a
+                  partial replica if creating an NC */
+               ret = ldb_request_add_control(down_req,
+                                             DSDB_CONTROL_PARTIAL_REPLICA,
+                                             false, NULL);
+               if (ret != LDB_SUCCESS) {
+                       return replmd_replicated_request_error(ar, ret);
+               }
+       }
+
+       /*
+        * Finally we re-run the add, otherwise the new record won't
+        * exist, as we are here because of that exact failure!
+        */
+       return ldb_next_request(ar->module, down_req);
 failed:
-       /* on failure do the original callback. This means replication
-        * will stop with an error, but there is not much else we can
-        * do
+
+       /* on failure make the caller get the error. This means
+        * replication will stop with an error, but there is not much
+        * else we can do.
         */
-       return replmd_op_callback(req, ares);
+       return ldb_module_done(ar->req, NULL, NULL,
+                              ret);
+}
+
+/*
+  callback for replmd_replicated_apply_add()
+  This copes with the creation of conflict records in the case where
+  the DN exists, but with a different objectGUID
+ */
+static int replmd_op_add_callback(struct ldb_request *req, struct ldb_reply *ares)
+{
+       struct replmd_replicated_request *ar =
+               talloc_get_type_abort(req->context, struct replmd_replicated_request);
+
+       if (ar->objs->objects[ar->index_current].last_known_parent) {
+               /* This is like a conflict DN, where we put the object in LostAndFound
+                  see MS-DRSR 4.1.10.6.10 FindBestParentObject */
+               return replmd_op_possible_conflict_callback(req, ares, replmd_op_name_modify_callback);
+       }
+
+       return replmd_op_possible_conflict_callback(req, ares, replmd_op_callback);
 }
 
 /*
@@ -3434,26 +4085,29 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        struct ldb_val md_value;
        unsigned int i;
        int ret;
-
-       /*
-        * TODO: check if the parent object exist
-        */
-
-       /*
-        * TODO: handle the conflict case where an object with the
-        *       same name exist
-        */
+       bool remote_isDeleted = false;
+       bool is_schema_nc;
+       NTTIME now;
+       time_t t = time(NULL);
+       const struct ldb_val *rdn_val;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(ar->module),
+                               struct replmd_private);
+       unix_to_nt_time(&now, t);
 
        ldb = ldb_module_get_ctx(ar->module);
        msg = ar->objs->objects[ar->index_current].msg;
        md = ar->objs->objects[ar->index_current].meta_data;
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
 
        ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
+       ret = dsdb_msg_add_guid(msg,
+                               &ar->objs->objects[ar->index_current].object_guid,
+                               "objectGUID");
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -3478,6 +4132,13 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                struct ldb_message_element *el = &msg->elements[i];
 
                if (el->num_values == 0) {
+                       if (ldb_attr_cmp(msg->elements[i].name, "objectClass") == 0) {
+                               ldb_asprintf_errstring(ldb, __location__
+                                                      ": empty objectClass sent on %s, aborting replication\n",
+                                                      ldb_dn_get_linearized(msg->dn));
+                               return replmd_replicated_request_error(ar, LDB_ERR_OBJECT_CLASS_VIOLATION);
+                       }
+
                        DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
                                 el->name));
                        memmove(el, el+1, sizeof(*el)*(msg->num_elements - (i+1)));
@@ -3487,9 +4148,39 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                }
        }
 
+       if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+
+               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
+               DEBUG(4, ("DRS replication add message of %s:\n%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         s));
+               talloc_free(s);
+       }
+
+       remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
+                                                    "isDeleted", false);
+
        /*
-        * the meta data array is already sorted by the caller
+        * the meta data array is already sorted by the caller, except
+        * for the RDN, which needs to be added.
         */
+
+
+       rdn_val = ldb_dn_get_rdn_val(msg->dn);
+       ret = replmd_update_rpmd_rdn_attr(ldb, msg, rdn_val, NULL,
+                                    md, ar, now, is_schema_nc);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &md->ctr.ctr1, msg->dn);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
+               return replmd_replicated_request_error(ar, ret);
+       }
+
        for (i=0; i < md->ctr.ctr1.count; i++) {
                md->ctr.ctr1.array[i].local_usn = ar->seq_num;
        }
@@ -3506,12 +4197,17 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 
        replmd_ldb_message_sort(msg, ar->schema);
 
-       if (DEBUGLVL(4)) {
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
-               DEBUG(4, ("DRS replication add message:\n%s\n", s));
-               talloc_free(s);
+       if (!remote_isDeleted) {
+               ret = dsdb_module_schedule_sd_propagation(ar->module,
+                                                         ar->objs->partition_dn,
+                                                         msg->dn, true);
+               if (ret != LDB_SUCCESS) {
+                       return replmd_replicated_request_error(ar, ret);
+               }
        }
 
+       ar->isDeleted = remote_isDeleted;
+
        ret = ldb_build_add_req(&change_req,
                                ldb,
                                ar,
@@ -3529,52 +4225,440 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                                      false, NULL);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
+       if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PARTIAL_REPLICA) {
+               /* this tells the partition module to make it a
+                  partial replica if creating an NC */
+               ret = ldb_request_add_control(change_req,
+                                             DSDB_CONTROL_PARTIAL_REPLICA,
+                                             false, NULL);
+               if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+       }
+
        return ldb_next_request(ar->module, change_req);
 }
 
+static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request *req,
+                                                             struct ldb_reply *ares)
+{
+       struct replmd_replicated_request *ar = talloc_get_type(req->context,
+                                              struct replmd_replicated_request);
+       int ret;
+
+       if (!ares) {
+               return ldb_module_done(ar->req, NULL, NULL,
+                                       LDB_ERR_OPERATIONS_ERROR);
+       }
+
+       /*
+        * The error NO_SUCH_OBJECT is not expected, unless the search
+        * base is the partition DN, and that case doesn't happen here
+        * because then we wouldn't get a parent_guid_value in any
+        * case.
+        */
+       if (ares->error != LDB_SUCCESS) {
+               return ldb_module_done(ar->req, ares->controls,
+                                       ares->response, ares->error);
+       }
+
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+       {
+               struct ldb_message *parent_msg = ares->message;
+               struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
+               struct ldb_dn *parent_dn;
+               int comp_num;
+
+               if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
+                   && ldb_msg_check_string_attribute(parent_msg, "isDeleted", "TRUE")) {
+                       /* Per MS-DRSR 4.1.10.6.10
+                        * FindBestParentObject we need to move this
+                        * new object under a deleted object to
+                        * lost-and-found */
+                       struct ldb_dn *nc_root;
+
+                       ret = dsdb_find_nc_root(ldb_module_get_ctx(ar->module), msg, msg->dn, &nc_root);
+                       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                      "No suitable NC root found for %s.  "
+                                                      "We need to move this object because parent object %s "
+                                                      "is deleted, but this object is not.",
+                                                      ldb_dn_get_linearized(msg->dn),
+                                                      ldb_dn_get_linearized(parent_msg->dn));
+                               return ldb_module_done(ar->req, NULL, NULL, LDB_ERR_OPERATIONS_ERROR);
+                       } else if (ret != LDB_SUCCESS) {
+                               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                      "Unable to find NC root for %s: %s. "
+                                                      "We need to move this object because parent object %s "
+                                                      "is deleted, but this object is not.",
+                                                      ldb_dn_get_linearized(msg->dn),
+                                                      ldb_errstring(ldb_module_get_ctx(ar->module)),
+                                                      ldb_dn_get_linearized(parent_msg->dn));
+                               return ldb_module_done(ar->req, NULL, NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       
+                       ret = dsdb_wellknown_dn(ldb_module_get_ctx(ar->module), msg,
+                                               nc_root,
+                                               DS_GUID_LOSTANDFOUND_CONTAINER,
+                                               &parent_dn);
+                       if (ret != LDB_SUCCESS) {
+                               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                      "Unable to find LostAndFound Container for %s "
+                                                      "in partition %s: %s. "
+                                                      "We need to move this object because parent object %s "
+                                                      "is deleted, but this object is not.",
+                                                      ldb_dn_get_linearized(msg->dn), ldb_dn_get_linearized(nc_root),
+                                                      ldb_errstring(ldb_module_get_ctx(ar->module)),
+                                                      ldb_dn_get_linearized(parent_msg->dn));
+                               return ldb_module_done(ar->req, NULL, NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       ar->objs->objects[ar->index_current].last_known_parent
+                               = talloc_steal(ar->objs->objects[ar->index_current].msg, parent_msg->dn);
+
+               } else {
+                       parent_dn
+                               = talloc_steal(ar->objs->objects[ar->index_current].msg, parent_msg->dn);
+
+               }
+               ar->objs->objects[ar->index_current].local_parent_dn = parent_dn;
+
+               comp_num = ldb_dn_get_comp_num(msg->dn);
+               if (comp_num > 1) {
+                       if (!ldb_dn_remove_base_components(msg->dn, comp_num - 1)) {
+                               talloc_free(ares);
+                               return ldb_module_done(ar->req, NULL, NULL, ldb_module_operr(ar->module));
+                       }
+               }
+               if (!ldb_dn_add_base(msg->dn, parent_dn)) {
+                       talloc_free(ares);
+                       return ldb_module_done(ar->req, NULL, NULL, ldb_module_operr(ar->module));
+               }
+               break;
+       }
+       case LDB_REPLY_REFERRAL:
+               /* we ignore referrals */
+               break;
+
+       case LDB_REPLY_DONE:
+
+               if (ar->objs->objects[ar->index_current].local_parent_dn == NULL) {
+                       struct GUID_txt_buf str_buf;
+                       if (ar->search_msg != NULL) {
+                               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                      "No parent with GUID %s found for object locally known as %s",
+                                                      GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid, &str_buf),
+                                                      ldb_dn_get_linearized(ar->search_msg->dn));
+                       } else {
+                               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                      "No parent with GUID %s found for object remotely known as %s",
+                                                      GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid, &str_buf),
+                                                      ldb_dn_get_linearized(ar->objs->objects[ar->index_current].msg->dn));
+                       }
+
+                       /*
+                        * This error code is really important, as it
+                        * is the flag back to the callers to retry
+                        * this with DRSUAPI_DRS_GET_ANC, and so get
+                        * the parent objects before the child
+                        * objects
+                        */
+                       return ldb_module_done(ar->req, NULL, NULL,
+                                              replmd_replicated_request_werror(ar, WERR_DS_DRA_MISSING_PARENT));
+               }
+
+               if (ar->search_msg != NULL) {
+                       ret = replmd_replicated_apply_merge(ar);
+               } else {
+                       ret = replmd_replicated_apply_add(ar);
+               }
+               if (ret != LDB_SUCCESS) {
+                       return ldb_module_done(ar->req, NULL, NULL, ret);
+               }
+       }
+
+       talloc_free(ares);
+       return LDB_SUCCESS;
+}
+
+/*
+ * Look for the parent object, so we put the new object in the right
+ * place This is akin to NameObject in MS-DRSR - this routine and the
+ * callbacks find the right parent name, and correct name for this
+ * object
+ */
+
+static int replmd_replicated_apply_search_for_parent(struct replmd_replicated_request *ar)
+{
+       struct ldb_context *ldb;
+       int ret;
+       char *tmp_str;
+       char *filter;
+       struct ldb_request *search_req;
+       static const char *attrs[] = {"isDeleted", NULL};
+       struct GUID_txt_buf guid_str_buf;
+
+       ldb = ldb_module_get_ctx(ar->module);
+
+       if (ar->objs->objects[ar->index_current].parent_guid == NULL) {
+               if (ar->search_msg != NULL) {
+                       return replmd_replicated_apply_merge(ar);
+               } else {
+                       return replmd_replicated_apply_add(ar);
+               }
+       }
+
+       tmp_str = GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid,
+                                 &guid_str_buf);
+
+       filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
+       if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+       ret = ldb_build_search_req(&search_req,
+                                  ldb,
+                                  ar,
+                                  ar->objs->partition_dn,
+                                  LDB_SCOPE_SUBTREE,
+                                  filter,
+                                  attrs,
+                                  NULL,
+                                  ar,
+                                  replmd_replicated_apply_search_for_parent_callback,
+                                  ar->req);
+       LDB_REQ_SET_LOCATION(search_req);
+
+       ret = dsdb_request_add_controls(search_req, 
+                                       DSDB_SEARCH_SHOW_RECYCLED|
+                                       DSDB_SEARCH_SHOW_DELETED|
+                                       DSDB_SEARCH_SHOW_EXTENDED_DN);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       return ldb_next_request(ar->module, search_req);
+}
+
 /*
   handle renames that come in over DRS replication
  */
 static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                           struct ldb_message *msg,
-                                          struct replPropertyMetaDataBlob *rmd,
-                                          struct replPropertyMetaDataBlob *omd,
-                                          struct ldb_request *parent)
+                                          struct ldb_request *parent,
+                                          bool *renamed)
 {
-       struct replPropertyMetaData1 *md_remote;
-       struct replPropertyMetaData1 *md_local;
+       int ret;
+       TALLOC_CTX *tmp_ctx = talloc_new(msg);
+       struct ldb_result *res;
+       struct ldb_dn *conflict_dn;
+       const char *attrs[] = { "replPropertyMetaData", "objectGUID", NULL };
+       const struct ldb_val *omd_value;
+       struct replPropertyMetaDataBlob omd, *rmd;
+       enum ndr_err_code ndr_err;
+       bool rename_incoming_record, rodc;
+       struct replPropertyMetaData1 *rmd_name, *omd_name;
+       struct ldb_dn *new_dn;
+       struct GUID guid;
+
+       DEBUG(4,("replmd_replicated_request rename %s => %s\n",
+                ldb_dn_get_linearized(ar->search_msg->dn),
+                ldb_dn_get_linearized(msg->dn)));
+
+
+       ret = dsdb_module_rename(ar->module, ar->search_msg->dn, msg->dn,
+                                DSDB_FLAG_NEXT_MODULE, ar->req);
+       if (ret == LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               *renamed = true;
+               return ret;
+       }
+
+       if (ret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
+               talloc_free(tmp_ctx);
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module), "Failed to locally apply remote rename from %s to %s: %s",
+                                      ldb_dn_get_linearized(ar->search_msg->dn),
+                                      ldb_dn_get_linearized(msg->dn),
+                                      ldb_errstring(ldb_module_get_ctx(ar->module)));
+               return ret;
+       }
+
+       ret = samdb_rodc(ldb_module_get_ctx(ar->module), &rodc);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                      "Failed to determine if we are an RODC when attempting to form conflict DN: %s",
+                                      ldb_errstring(ldb_module_get_ctx(ar->module)));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       /*
+        * we have a conflict, and need to decide if we will keep the
+        * new record or the old record
+        */
+
+       conflict_dn = msg->dn;
+
+       if (rodc) {
+               /*
+                * We are on an RODC, or were a GC for this
+                * partition, so we have to fail this until
+                * someone who owns the partition sorts it
+                * out
+                */
+               ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                      "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
+                                      " - We must fail the operation until a master for this partition resolves the conflict",
+                                      ldb_dn_get_linearized(conflict_dn));
+               goto failed;
+       }
+
+       /*
+        * first we need the replPropertyMetaData attribute from the
+        * old record
+        */
+       ret = dsdb_module_search_dn(ar->module, tmp_ctx, &res, conflict_dn,
+                                   attrs,
+                                   DSDB_FLAG_NEXT_MODULE |
+                                   DSDB_SEARCH_SHOW_DELETED |
+                                   DSDB_SEARCH_SHOW_RECYCLED, ar->req);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(0,(__location__ ": Unable to find object for conflicting record '%s'\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
+
+       omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
+       if (omd_value == NULL) {
+               DEBUG(0,(__location__ ": Unable to find replPropertyMetaData for conflicting record '%s'\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
+
+       ndr_err = ndr_pull_struct_blob(omd_value, res->msgs[0], &omd,
+                                      (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               DEBUG(0,(__location__ ": Failed to parse old replPropertyMetaData for %s\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
+
+       rmd = ar->objs->objects[ar->index_current].meta_data;
+
+       /*
+        * we decide which is newer based on the RPMD on the name
+        * attribute.  See [MS-DRSR] ResolveNameConflict.
+        *
+        * We expect omd_name to be present, as this is from a local
+        * search, but while rmd_name should have been given to us by
+        * the remote server, if it is missing we just prefer the
+        * local name in
+        * replmd_replPropertyMetaData1_new_should_be_taken()
+        */
+       rmd_name = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
+       omd_name = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
+       if (!omd_name) {
+               DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
+
+       /*
+        * Should we preserve the current record, and so rename the
+        * incoming record to be a conflict?
+        */
+       rename_incoming_record =
+               !replmd_replPropertyMetaData1_new_should_be_taken(
+                       ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                       omd_name, rmd_name);
+
+       if (rename_incoming_record) {
+
+               new_dn = replmd_conflict_dn(msg, msg->dn,
+                                           &ar->objs->objects[ar->index_current].object_guid);
+               if (new_dn == NULL) {
+                       ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                                                 "Failed to form conflict DN for %s\n",
+                                                                 ldb_dn_get_linearized(msg->dn));
+
+                       return replmd_replicated_request_werror(ar, WERR_NOMEM);
+               }
+
+               ret = dsdb_module_rename(ar->module, ar->search_msg->dn, new_dn,
+                                        DSDB_FLAG_NEXT_MODULE, ar->req);
+               if (ret != LDB_SUCCESS) {
+                       ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
+                                              "Failed to rename incoming conflicting dn '%s' (was '%s') to '%s' - %s\n",
+                                              ldb_dn_get_linearized(conflict_dn),
+                                              ldb_dn_get_linearized(ar->search_msg->dn),
+                                              ldb_dn_get_linearized(new_dn),
+                                              ldb_errstring(ldb_module_get_ctx(ar->module)));
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
+               }
+
+               msg->dn = new_dn;
+               *renamed = true;
+               return LDB_SUCCESS;
+       }
+
+       /* we are renaming the existing record */
+
+       guid = samdb_result_guid(res->msgs[0], "objectGUID");
+       if (GUID_all_zero(&guid)) {
+               DEBUG(0,(__location__ ": Failed to find objectGUID for existing conflict record %s\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
+
+       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       if (new_dn == NULL) {
+               DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
+                        ldb_dn_get_linearized(conflict_dn)));
+               goto failed;
+       }
 
-       if (ldb_dn_compare(msg->dn, ar->search_msg->dn) == 0) {
-               /* no rename */
-               return LDB_SUCCESS;
+       DEBUG(2,(__location__ ": Resolving conflict record via existing-record rename '%s' -> '%s'\n",
+                ldb_dn_get_linearized(conflict_dn), ldb_dn_get_linearized(new_dn)));
+
+       ret = dsdb_module_rename(ar->module, conflict_dn, new_dn,
+                                DSDB_FLAG_OWN_MODULE, ar->req);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(0,(__location__ ": Failed to rename conflict dn '%s' to '%s' - %s\n",
+                        ldb_dn_get_linearized(conflict_dn),
+                        ldb_dn_get_linearized(new_dn),
+                        ldb_errstring(ldb_module_get_ctx(ar->module))));
+               goto failed;
        }
 
-       /* now we need to check for double renames. We could have a
-        * local rename pending which our replication partner hasn't
-        * received yet. We choose which one wins by looking at the
-        * attribute stamps on the two objects, the newer one wins
+       /*
+        * now we need to ensure that the rename is seen as an
+        * originating update. We do that with a modify.
         */
-       md_remote = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
-       md_local  = replmd_replPropertyMetaData1_find_attid(omd, DRSUAPI_ATTID_name);
-       /* if there is no name attribute then we have to assume the
-          object we've received is in fact newer */
-       if (!md_remote || !md_local ||
-           replmd_replPropertyMetaData1_is_newer(md_local, md_remote)) {
-               DEBUG(4,("replmd_replicated_request rename %s => %s\n",
-                        ldb_dn_get_linearized(ar->search_msg->dn),
-                        ldb_dn_get_linearized(msg->dn)));
-               /* pass rename to the next module
-                * so it doesn't appear as an originating update */
-               return dsdb_module_rename(ar->module,
-                                         ar->search_msg->dn, msg->dn,
-                                         DSDB_FLAG_NEXT_MODULE | DSDB_MODIFY_RELAX, parent);
+       ret = replmd_name_modify(ar, ar->req, new_dn);
+       if (ret != LDB_SUCCESS) {
+               goto failed;
        }
 
-       /* we're going to keep our old object */
-       DEBUG(4,(__location__ ": Keeping object %s and rejecting older rename to %s\n",
+       DEBUG(2,(__location__ ": With conflicting record renamed, re-apply replicated rename '%s' -> '%s'\n",
                 ldb_dn_get_linearized(ar->search_msg->dn),
                 ldb_dn_get_linearized(msg->dn)));
-       return LDB_SUCCESS;
+
+
+       ret = dsdb_module_rename(ar->module, ar->search_msg->dn, msg->dn,
+                                DSDB_FLAG_NEXT_MODULE, ar->req);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(0,(__location__ ": After conflict resolution, failed to rename dn '%s' to '%s' - %s\n",
+                        ldb_dn_get_linearized(ar->search_msg->dn),
+                        ldb_dn_get_linearized(msg->dn),
+                        ldb_errstring(ldb_module_get_ctx(ar->module))));
+                       goto failed;
+       }
+failed:
+
+       /*
+        * On failure make the caller get the error
+        * This means replication will stop with an error,
+        * but there is not much else we can do.  In the
+        * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
+        * needed.
+        */
+
+       talloc_free(tmp_ctx);
+       return ret;
 }
 
 
@@ -3589,13 +4673,33 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        const struct ldb_val *omd_value;
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
+       struct GUID remote_parent_guid;
        unsigned int i;
        uint32_t j,ni=0;
        unsigned int removed_attrs = 0;
        int ret;
+       int (*callback)(struct ldb_request *req, struct ldb_reply *ares) = replmd_op_callback;
+       bool isDeleted = false;
+       bool local_isDeleted = false;
+       bool remote_isDeleted = false;
+       bool take_remote_isDeleted = false;
+       bool sd_updated = false;
+       bool renamed = false;
+       bool is_schema_nc = false;
+       NTSTATUS nt_status;
+       const struct ldb_val *old_rdn, *new_rdn;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(ar->module),
+                               struct replmd_private);
+       NTTIME now;
+       time_t t = time(NULL);
+       unix_to_nt_time(&now, t);
 
        ldb = ldb_module_get_ctx(ar->module);
        msg = ar->objs->objects[ar->index_current].msg;
+
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
+
        rmd = ar->objs->objects[ar->index_current].meta_data;
        ZERO_STRUCT(omd);
        omd.version = 1;
@@ -3606,7 +4710,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                ndr_err = ndr_pull_struct_blob(omd_value, ar, &omd,
                                               (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
                if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-                       NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
+                       nt_status = ndr_map_error2ntstatus(ndr_err);
                        return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
                }
 
@@ -3615,8 +4719,67 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       /* handle renames that come in over DRS */
-       ret = replmd_replicated_handle_rename(ar, msg, rmd, &omd, ar->req);
+       if (DEBUGLVL(5)) {
+               struct GUID_txt_buf guid_txt;
+
+               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
+               DEBUG(5, ("Initial DRS replication modify message of %s is:\n%s\n"
+                         "%s\n"
+                         "%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         s,
+                         ndr_print_struct_string(s,
+                                                 (ndr_print_fn_t)ndr_print_replPropertyMetaDataBlob,
+                                                 "existing replPropertyMetaData",
+                                                 &omd),
+                         ndr_print_struct_string(s,
+                                                 (ndr_print_fn_t)ndr_print_replPropertyMetaDataBlob,
+                                                 "incoming replPropertyMetaData",
+                                                 rmd)));
+               talloc_free(s);
+       }
+
+       local_isDeleted = ldb_msg_find_attr_as_bool(ar->search_msg,
+                                                   "isDeleted", false);
+       remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
+                                                    "isDeleted", false);
+
+       /*
+        * Fill in the remote_parent_guid with the GUID or an all-zero
+        * GUID.
+        */
+       if (ar->objs->objects[ar->index_current].parent_guid != NULL) {
+               remote_parent_guid = *ar->objs->objects[ar->index_current].parent_guid;
+       } else {
+               remote_parent_guid = GUID_zero();
+       }
+
+       /*
+        * To ensure we follow a complex rename chain around, we have
+        * to confirm that the DN is the same (mostly to confirm the
+        * RDN) and the parentGUID is the same.
+        *
+        * This ensures we keep things under the correct parent, which
+        * replmd_replicated_handle_rename() will do.
+        */
+
+       if (strcmp(ldb_dn_get_linearized(msg->dn), ldb_dn_get_linearized(ar->search_msg->dn)) == 0
+           && GUID_equal(&remote_parent_guid, &ar->local_parent_guid)) {
+               ret = LDB_SUCCESS;
+       } else {
+               /*
+                * handle renames, even just by case that come in over
+                * DRS.  Changes in the parent DN don't hit us here,
+                * because the search for a parent will clean up those
+                * components.
+                *
+                * We also have already filtered out the case where
+                * the peer has an older name to what we have (see
+                * replmd_replicated_apply_search_callback())
+                */
+               ret = replmd_replicated_handle_rename(ar, msg, ar->req, &renamed);
+       }
+
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_FATAL,
                          "replmd_replicated_request rename %s => %s failed - %s\n",
@@ -3626,6 +4789,14 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
        }
 
+       if (renamed == true) {
+               /*
+                * Set the callback to one that will fix up the name
+                * metadata on the new conflict DN
+                */
+               callback = replmd_op_name_modify_callback;
+       }
+
        ZERO_STRUCT(nmd);
        nmd.version = 1;
        nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
@@ -3652,8 +4823,10 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                continue;
                        }
 
-                       cmp = replmd_replPropertyMetaData1_is_newer(&nmd.ctr.ctr1.array[j],
-                                                                   &rmd->ctr.ctr1.array[i]);
+                       cmp = replmd_replPropertyMetaData1_new_should_be_taken(
+                               ar->objs->dsdb_repl_flags,
+                               &nmd.ctr.ctr1.array[j],
+                               &rmd->ctr.ctr1.array[i]);
                        if (cmp) {
                                /* replace the entry */
                                nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
@@ -3664,6 +4837,16 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                        }
                                }
                                nmd.ctr.ctr1.array[j].local_usn = ar->seq_num;
+                               switch (nmd.ctr.ctr1.array[j].attid) {
+                               case DRSUAPI_ATTID_ntSecurityDescriptor:
+                                       sd_updated = true;
+                                       break;
+                               case DRSUAPI_ATTID_isDeleted:
+                                       take_remote_isDeleted = true;
+                                       break;
+                               default:
+                                       break;
+                               }
                                found = true;
                                break;
                        }
@@ -3693,6 +4876,16 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                        }
                }
                nmd.ctr.ctr1.array[ni].local_usn = ar->seq_num;
+               switch (nmd.ctr.ctr1.array[ni].attid) {
+               case DRSUAPI_ATTID_ntSecurityDescriptor:
+                       sd_updated = true;
+                       break;
+               case DRSUAPI_ATTID_isDeleted:
+                       take_remote_isDeleted = true;
+                       break;
+               default:
+                       break;
+               }
                ni++;
        }
 
@@ -3701,18 +4894,40 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
         */
        nmd.ctr.ctr1.count = ni;
 
+       new_rdn = ldb_dn_get_rdn_val(msg->dn);
+       old_rdn = ldb_dn_get_rdn_val(ar->search_msg->dn);
+
+       if (renamed) {
+               ret = replmd_update_rpmd_rdn_attr(ldb, msg, new_rdn, old_rdn,
+                                                 &nmd, ar, now, is_schema_nc);
+               if (ret != LDB_SUCCESS) {
+                       ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
+                       return replmd_replicated_request_error(ar, ret);
+               }
+       }
        /*
-        * the rdn attribute (the alias for the name attribute),
-        * 'cn' for most objects is the last entry in the meta data array
-        * we have stored
-        *
         * sort the new meta data array
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
        if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
                return ret;
        }
 
+       /*
+        * Work out if this object is deleted, so we can prune any extra attributes.  See MS-DRSR 4.1.10.6.9
+        * UpdateObject.
+        *
+        * This also controls SD propagation below
+        */
+       if (take_remote_isDeleted) {
+               isDeleted = remote_isDeleted;
+       } else {
+               isDeleted = local_isDeleted;
+       }
+
+       ar->isDeleted = isDeleted;
+
        /*
         * check if some replicated attributes left, otherwise skip the ldb_modify() call
         */
@@ -3720,18 +4935,30 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
                          ar->index_current);
 
-               ar->index_current++;
-               return replmd_replicated_apply_next(ar);
+               return replmd_replicated_apply_isDeleted(ar);
        }
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
                  ar->index_current, msg->num_elements);
 
+       if (renamed) {
+               sd_updated = true;
+       }
+
+       if (sd_updated && !isDeleted) {
+               ret = dsdb_module_schedule_sd_propagation(ar->module,
+                                                         ar->objs->partition_dn,
+                                                         msg->dn, true);
+               if (ret != LDB_SUCCESS) {
+                       return ldb_operr(ldb);
+               }
+       }
+
        /* create the meta data value */
        ndr_err = ndr_push_struct_blob(&nmd_value, msg, &nmd,
                                       (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
+               nt_status = ndr_map_error2ntstatus(ndr_err);
                return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
        }
 
@@ -3757,11 +4984,23 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        /* we want to replace the old values */
        for (i=0; i < msg->num_elements; i++) {
                msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
+               if (ldb_attr_cmp(msg->elements[i].name, "objectClass") == 0) {
+                       if (msg->elements[i].num_values == 0) {
+                               ldb_asprintf_errstring(ldb, __location__
+                                                      ": objectClass removed on %s, aborting replication\n",
+                                                      ldb_dn_get_linearized(msg->dn));
+                               return replmd_replicated_request_error(ar, LDB_ERR_OBJECT_CLASS_VIOLATION);
+                       }
+               }
        }
 
        if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(4, ("DRS replication modify message:\n%s\n", s));
+               DEBUG(4, ("Final DRS replication modify message of %s:\n%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         s));
                talloc_free(s);
        }
 
@@ -3771,7 +5010,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                msg,
                                ar->controls,
                                ar,
-                               replmd_op_callback,
+                               callback,
                                ar->req);
        LDB_REQ_SET_LOCATION(change_req);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
@@ -3812,15 +5051,159 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
                break;
 
        case LDB_REPLY_DONE:
-               if (ar->search_msg != NULL) {
-                       ret = replmd_replicated_apply_merge(ar);
+       {
+               struct replPropertyMetaData1 *md_remote;
+               struct replPropertyMetaData1 *md_local;
+
+               struct replPropertyMetaDataBlob omd;
+               const struct ldb_val *omd_value;
+               struct replPropertyMetaDataBlob *rmd;
+               struct ldb_message *msg;
+               int instanceType;
+               ar->objs->objects[ar->index_current].local_parent_dn = NULL;
+               ar->objs->objects[ar->index_current].last_known_parent = NULL;
+
+               /*
+                * This is the ADD case, find the appropriate parent,
+                * as this object doesn't exist locally:
+                */
+               if (ar->search_msg == NULL) {
+                       ret = replmd_replicated_apply_search_for_parent(ar);
+                       if (ret != LDB_SUCCESS) {
+                               return ldb_module_done(ar->req, NULL, NULL, ret);
+                       }
+                       talloc_free(ares);
+                       return LDB_SUCCESS;
+               }
+
+               /*
+                * Otherwise, in the MERGE case, work out if we are
+                * attempting a rename, and if so find the parent the
+                * newly renamed object wants to belong under (which
+                * may not be the parent in it's attached string DN
+                */
+               rmd = ar->objs->objects[ar->index_current].meta_data;
+               ZERO_STRUCT(omd);
+               omd.version = 1;
+
+               /* find existing meta data */
+               omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
+               if (omd_value) {
+                       enum ndr_err_code ndr_err;
+                       ndr_err = ndr_pull_struct_blob(omd_value, ar, &omd,
+                                                      (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
+                       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+                               NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
+                               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+                       }
+
+                       if (omd.version != 1) {
+                               return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+                       }
+               }
+
+               ar->local_parent_guid = samdb_result_guid(ar->search_msg, "parentGUID");
+
+               instanceType = ldb_msg_find_attr_as_int(ar->search_msg, "instanceType", 0);
+               if (((instanceType & INSTANCE_TYPE_IS_NC_HEAD) == 0)
+                   && GUID_all_zero(&ar->local_parent_guid)) {
+                       DEBUG(0, ("Refusing to replicate new version of %s "
+                                 "as local object has an all-zero parentGUID attribute, "
+                                 "despite not being an NC root\n",
+                                 ldb_dn_get_linearized(ar->search_msg->dn)));
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+               }
+
+               /*
+                * now we need to check for double renames. We could have a
+                * local rename pending which our replication partner hasn't
+                * received yet. We choose which one wins by looking at the
+                * attribute stamps on the two objects, the newer one wins.
+                *
+                * This also simply applies the correct algorithms for
+                * determining if a change was made to name at all, or
+                * if the object has just been renamed under the same
+                * parent.
+                */
+               md_remote = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
+               md_local = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
+               if (!md_local) {
+                       DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
+                                ldb_dn_get_linearized(ar->search_msg->dn)));
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
+               }
+
+               /*
+                * if there is no name attribute given then we have to assume the
+                *  object we've received has the older name
+                */
+               if (replmd_replPropertyMetaData1_new_should_be_taken(
+                           ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                           md_local, md_remote)) {
+                       struct GUID_txt_buf p_guid_local;
+                       struct GUID_txt_buf p_guid_remote;
+                       msg = ar->objs->objects[ar->index_current].msg;
+
+                       /* Merge on the existing object, with rename */
+
+                       DEBUG(4,(__location__ ": Looking for new parent for object %s currently under %s "
+                                "as incoming object changing to %s under %s\n",
+                                ldb_dn_get_linearized(ar->search_msg->dn),
+                                GUID_buf_string(&ar->local_parent_guid, &p_guid_local),
+                                ldb_dn_get_linearized(msg->dn),
+                                GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid,
+                                                &p_guid_remote)));
+                       ret = replmd_replicated_apply_search_for_parent(ar);
                } else {
-                       ret = replmd_replicated_apply_add(ar);
+                       struct GUID_txt_buf p_guid_local;
+                       struct GUID_txt_buf p_guid_remote;
+                       msg = ar->objs->objects[ar->index_current].msg;
+
+                       /*
+                        * Merge on the existing object, force no
+                        * rename (code below just to explain why in
+                        * the DEBUG() logs)
+                        */
+
+                       if (strcmp(ldb_dn_get_linearized(ar->search_msg->dn),
+                                  ldb_dn_get_linearized(msg->dn)) == 0) {
+                               if (ar->objs->objects[ar->index_current].parent_guid != NULL &&
+                                   GUID_equal(&ar->local_parent_guid,
+                                              ar->objs->objects[ar->index_current].parent_guid)
+                                   == false) {
+                                       DEBUG(4,(__location__ ": Keeping object %s at under %s "
+                                                "despite incoming object changing parent to %s\n",
+                                                ldb_dn_get_linearized(ar->search_msg->dn),
+                                                GUID_buf_string(&ar->local_parent_guid, &p_guid_local),
+                                                GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid,
+                                                                &p_guid_remote)));
+                               }
+                       } else {
+                               DEBUG(4,(__location__ ": Keeping object %s at under %s "
+                                        " and rejecting older rename to %s under %s\n",
+                                        ldb_dn_get_linearized(ar->search_msg->dn),
+                                        GUID_buf_string(&ar->local_parent_guid, &p_guid_local),
+                                        ldb_dn_get_linearized(msg->dn),
+                                        GUID_buf_string(ar->objs->objects[ar->index_current].parent_guid,
+                                                        &p_guid_remote)));
+                       }
+                       /*
+                        * This assignment ensures that the strcmp()
+                        * and GUID_equal() calls in
+                        * replmd_replicated_apply_merge() avoids the
+                        * rename call
+                        */
+                       ar->objs->objects[ar->index_current].parent_guid =
+                               &ar->local_parent_guid;
+
+                       msg->dn = ar->search_msg->dn;
+                       ret = replmd_replicated_apply_merge(ar);
                }
                if (ret != LDB_SUCCESS) {
                        return ldb_module_done(ar->req, NULL, NULL, ret);
                }
        }
+       }
 
        talloc_free(ares);
        return LDB_SUCCESS;
@@ -3835,7 +5218,10 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        char *tmp_str;
        char *filter;
        struct ldb_request *search_req;
-       struct ldb_search_options_control *options;
+       static const char *attrs[] = { "*", "parentGUID", "instanceType",
+                                      "replPropertyMetaData", "nTSecurityDescriptor",
+                                      NULL };
+       struct GUID_txt_buf guid_str_buf;
 
        if (ar->index_current >= ar->objs->num_objects) {
                /* done with it, go to next stage */
@@ -3844,50 +5230,111 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
 
        ldb = ldb_module_get_ctx(ar->module);
        ar->search_msg = NULL;
+       ar->isDeleted = false;
 
-       tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
-       if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+       tmp_str = GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                 &guid_str_buf);
 
        filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
        if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
-       talloc_free(tmp_str);
 
        ret = ldb_build_search_req(&search_req,
                                   ldb,
                                   ar,
-                                  NULL,
+                                  ar->objs->partition_dn,
                                   LDB_SCOPE_SUBTREE,
                                   filter,
-                                  NULL,
+                                  attrs,
                                   NULL,
                                   ar,
                                   replmd_replicated_apply_search_callback,
                                   ar->req);
        LDB_REQ_SET_LOCATION(search_req);
 
-       ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_RECYCLED_OID,
-                                     true, NULL);
+       ret = dsdb_request_add_controls(search_req, DSDB_SEARCH_SHOW_RECYCLED);
+
        if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       /* we need to cope with cross-partition links, so search for
-          the GUID over all partitions */
-       options = talloc(search_req, struct ldb_search_options_control);
-       if (options == NULL) {
-               DEBUG(0, (__location__ ": out of memory\n"));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
+       return ldb_next_request(ar->module, search_req);
+}
 
-       ret = ldb_request_add_control(search_req,
-                                     LDB_CONTROL_SEARCH_OPTIONS_OID,
-                                     true, options);
-       if (ret != LDB_SUCCESS) {
-               return ret;
+/*
+ * This is essentially a wrapper for replmd_replicated_apply_next()
+ *
+ * This is needed to ensure that both codepaths call this handler.
+ */
+static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar)
+{
+       struct ldb_dn *deleted_objects_dn;
+       struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
+       int ret = dsdb_get_deleted_objects_dn(ldb_module_get_ctx(ar->module), msg, msg->dn,
+                                             &deleted_objects_dn);
+       if (ar->isDeleted && (ret != LDB_SUCCESS || ldb_dn_compare(msg->dn, deleted_objects_dn) != 0)) {
+               /*
+                * Do a delete here again, so that if there is
+                * anything local that conflicts with this
+                * object being deleted, it is removed.  This
+                * includes links.  See MS-DRSR 4.1.10.6.9
+                * UpdateObject.
+                *
+                * If the object is already deleted, and there
+                * is no more work required, it doesn't do
+                * anything.
+                */
+
+               /* This has been updated to point to the DN we eventually did the modify on */
+
+               struct ldb_request *del_req;
+               struct ldb_result *res;
+
+               TALLOC_CTX *tmp_ctx = talloc_new(ar);
+               if (!tmp_ctx) {
+                       ret = ldb_oom(ldb_module_get_ctx(ar->module));
+                       return ret;
+               }
+
+               res = talloc_zero(tmp_ctx, struct ldb_result);
+               if (!res) {
+                       ret = ldb_oom(ldb_module_get_ctx(ar->module));
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+
+               /* Build a delete request, which hopefully will artually turn into nothing */
+               ret = ldb_build_del_req(&del_req, ldb_module_get_ctx(ar->module), tmp_ctx,
+                                       msg->dn,
+                                       NULL,
+                                       res,
+                                       ldb_modify_default_callback,
+                                       ar->req);
+               LDB_REQ_SET_LOCATION(del_req);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+
+               /*
+                * This is the guts of the call, call back
+                * into our delete code, but setting the
+                * re_delete flag so we delete anything that
+                * shouldn't be there on a deleted or recycled
+                * object
+                */
+               ret = replmd_delete_internals(ar->module, del_req, true);
+               if (ret == LDB_SUCCESS) {
+                       ret = ldb_wait(del_req->handle, LDB_WAIT_ALL);
+               }
+
+               talloc_free(tmp_ctx);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
        }
 
-       return ldb_next_request(ar->module, search_req);
+       ar->index_current++;
+       return replmd_replicated_apply_next(ar);
 }
 
 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
@@ -3908,7 +5355,7 @@ static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
        }
 
        if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb, "Invalid reply type\n!");
+               ldb_asprintf_errstring(ldb, "Invalid LDB reply type %d", ares->type);
                return ldb_module_done(ar->req, NULL, NULL,
                                        LDB_ERR_OPERATIONS_ERROR);
        }
@@ -3930,7 +5377,6 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
        struct replUpToDateVectorBlob nuv;
        struct ldb_val nuv_value;
        struct ldb_message_element *nuv_el = NULL;
-       const struct GUID *our_invocation_id;
        struct ldb_message_element *orf_el = NULL;
        struct repsFromToBlob nrf;
        struct ldb_val *nrf_value = NULL;
@@ -3952,6 +5398,15 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
 
        unix_to_nt_time(&now, t);
 
+       if (ar->search_msg == NULL) {
+               /* this happens for a REPL_OBJ call where we are
+                  creating the target object by replicating it. The
+                  subdomain join code does this for the partition DN
+               */
+               DEBUG(4,(__location__ ": Skipping UDV and repsFrom update as no target DN\n"));
+               return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
+       }
+
        instanceType = ldb_msg_find_attr_as_uint(ar->search_msg, "instanceType", 0);
        if (! (instanceType & INSTANCE_TYPE_IS_NC_HEAD)) {
                DEBUG(4,(__location__ ": Skipping UDV and repsFrom update as not NC root: %s\n",
@@ -3982,7 +5437,7 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
         *
         * plus optional values from our old vector and the one from the source_dsa
         */
-       nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
+       nuv.ctr.ctr2.count = ouv.ctr.ctr2.count;
        if (ruv) nuv.ctr.ctr2.count += ruv->count;
        nuv.ctr.ctr2.cursors = talloc_array(ar,
                                            struct drsuapi_DsReplicaCursor2,
@@ -3995,16 +5450,12 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
                ni++;
        }
 
-       /* get our invocation_id if we have one already attached to the ldb */
-       our_invocation_id = samdb_ntds_invocation_id(ldb);
-
        /* merge in the source_dsa vector is available */
        for (i=0; (ruv && i < ruv->count); i++) {
                found = false;
 
-               if (our_invocation_id &&
-                   GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
-                              our_invocation_id)) {
+               if (GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
+                              &ar->our_invocation_id)) {
                        continue;
                }
 
@@ -4016,12 +5467,8 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
 
                        found = true;
 
-                       /*
-                        * we update only the highest_usn and not the latest_sync_success time,
-                        * because the last success stands for direct replication
-                        */
                        if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
-                               nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
+                               nuv.ctr.ctr2.cursors[j] = ruv->cursors[i];
                        }
                        break;
                }
@@ -4033,43 +5480,6 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
                ni++;
        }
 
-       /*
-        * merge in the current highwatermark for the source_dsa
-        */
-       found = false;
-       for (j=0; j < ni; j++) {
-               if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
-                               &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
-                       continue;
-               }
-
-               found = true;
-
-               /*
-                * here we update the highest_usn and last_sync_success time
-                * because we're directly replicating from the source_dsa
-                *
-                * and use the tmp_highest_usn because this is what we have just applied
-                * to our ldb
-                */
-               nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
-               nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
-               break;
-       }
-       if (!found) {
-               /*
-                * here we update the highest_usn and last_sync_success time
-                * because we're directly replicating from the source_dsa
-                *
-                * and use the tmp_highest_usn because this is what we have just applied
-                * to our ldb
-                */
-               nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
-               nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
-               nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
-               ni++;
-       }
-
        /*
         * finally correct the size of the cursors array
         */
@@ -4105,7 +5515,9 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
        ZERO_STRUCT(nrf);
        nrf.version                                     = 1;
        nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
-       nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
+       nrf.ctr.ctr1.last_attempt                       = now;
+       nrf.ctr.ctr1.last_success                       = now;
+       nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
 
        /*
         * first see if we already have a repsFrom value for the current source dsa
@@ -4183,7 +5595,7 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
         */
        nrf_el->flags = LDB_FLAG_MOD_REPLACE;
 
-       if (DEBUGLVL(4)) {
+       if (CHECK_DEBUGLVL(4)) {
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
                DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
                talloc_free(s);
@@ -4231,11 +5643,7 @@ static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
                break;
 
        case LDB_REPLY_DONE:
-               if (ar->search_msg == NULL) {
-                       ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
-               } else {
-                       ret = replmd_replicated_uptodate_modify(ar);
-               }
+               ret = replmd_replicated_uptodate_modify(ar);
                if (ret != LDB_SUCCESS) {
                        return ldb_module_done(ar->req, NULL, NULL, ret);
                }
@@ -4290,6 +5698,7 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        uint32_t i;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct dsdb_control_replicated_update *rep_update;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -4330,8 +5739,15 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
                if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
        }
 
-       /* This allows layers further down to know if a change came in over replication */
-       ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
+       /* This allows layers further down to know if a change came in
+          over replication and what the replication flags were */
+       rep_update = talloc_zero(ar, struct dsdb_control_replicated_update);
+       if (rep_update == NULL) {
+               return ldb_module_oom(module);
+       }
+       rep_update->dsdb_repl_flags = objs->dsdb_repl_flags;
+
+       ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, rep_update);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
@@ -4391,6 +5807,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *msg;
+       struct ldb_message *target_msg = NULL;
        TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
        int ret;
@@ -4401,13 +5818,18 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        WERROR status;
        time_t t = time(NULL);
        struct ldb_result *res;
-       const char *attrs[2];
+       struct ldb_result *target_res;
+       const char *attrs[4];
+       const char *attrs2[] = { "isDeleted", "isRecycled", NULL };
        struct parsed_dn *pdn_list, *pdn;
        struct GUID guid = GUID_zero();
        NTSTATUS ntstatus;
        bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
        const struct GUID *our_invocation_id;
 
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       enum deletion_state target_deletion_state = OBJECT_NOT_DELETED;
+
 /*
 linked_attributes[0]:
      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute
@@ -4452,7 +5874,9 @@ linked_attributes[0]:
        }
 
        attrs[0] = attr->lDAPDisplayName;
-       attrs[1] = NULL;
+       attrs[1] = "isDeleted";
+       attrs[2] = "isRecycled";
+       attrs[3] = NULL;
 
        /* get the existing message from the db for the object with
           this GUID, returning attribute being modified. We will then
@@ -4477,7 +5901,23 @@ linked_attributes[0]:
        }
        msg = res->msgs[0];
 
-       if (msg->num_elements == 0) {
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.13
+        * ProcessLinkValue, because link updates are not applied to
+        * recycled and tombstone objects.  We don't have to delete
+        * any existing link, that should have happened when the
+        * object deletion was replicated or initiated.
+        */
+
+       replmd_deletion_state(module, msg, &deletion_state, NULL);
+
+       if (deletion_state >= OBJECT_RECYCLED) {
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
+       }
+
+       old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
+       if (old_el == NULL) {
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
                if (ret != LDB_SUCCESS) {
                        ldb_module_oom(module);
@@ -4485,7 +5925,6 @@ linked_attributes[0]:
                        return LDB_ERR_OPERATIONS_ERROR;
                }
        } else {
-               old_el = &msg->elements[0];
                old_el->flags = LDB_FLAG_MOD_REPLACE;
        }
 
@@ -4514,25 +5953,91 @@ linked_attributes[0]:
        if (!W_ERROR_IS_OK(status)) {
                ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
                                       old_el->name, ldb_dn_get_linearized(msg->dn), win_errstr(status));
+               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
        ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid, "GUID");
-       if (!NT_STATUS_IS_OK(ntstatus) && active) {
+       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+               /*
+                * This strange behaviour (allowing a NULL/missing
+                * GUID) originally comes from:
+                *
+                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
+                * Author: Andrew Tridgell <tridge@samba.org>
+                * Date:   Mon Dec 21 21:21:55 2009 +1100
+                *
+                *  s4-drs: cope better with NULL GUIDS from DRS
+                *
+                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
+                *  need to match by DN if possible when seeing if we should update an
+                *  existing link.
+                *
+                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                */
+
+               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
+                                           dsdb_dn->dn, attrs2,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_RECYCLED |
+                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                           parent);
+       } else if (!NT_STATUS_IS_OK(ntstatus)) {
                ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute blob for %s on %s from %s",
                                       old_el->name,
                                       ldb_dn_get_linearized(dsdb_dn->dn),
                                       ldb_dn_get_linearized(msg->dn));
+               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               ret = dsdb_module_search(module, tmp_ctx, &target_res,
+                                        NULL, LDB_SCOPE_SUBTREE,
+                                        attrs2,
+                                        DSDB_FLAG_NEXT_MODULE |
+                                        DSDB_SEARCH_SHOW_RECYCLED |
+                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                        parent,
+                                        "objectGUID=%s",
+                                        GUID_string(tmp_ctx, &guid));
        }
 
-       /* re-resolve the DN by GUID, as the DRS server may give us an
-          old DN value */
-       ret = dsdb_module_dn_by_guid(module, dsdb_dn, &guid, &dsdb_dn->dn, parent);
        if (ret != LDB_SUCCESS) {
-               DEBUG(2,(__location__ ": WARNING: Failed to re-resolve GUID %s - using %s",
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), "Failed to re-resolve GUID %s: %s\n",
+                                      GUID_string(tmp_ctx, &guid),
+                                      ldb_errstring(ldb_module_get_ctx(module)));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (target_res->count == 0) {
+               DEBUG(2,(__location__ ": WARNING: Failed to re-resolve GUID %s - using %s\n",
                         GUID_string(tmp_ctx, &guid),
                         ldb_dn_get_linearized(dsdb_dn->dn)));
+       } else if (target_res->count != 1) {
+               ldb_asprintf_errstring(ldb_module_get_ctx(module), "More than one object found matching objectGUID %s\n",
+                                      GUID_string(tmp_ctx, &guid));
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               target_msg = target_res->msgs[0];
+               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
+       }
+
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.13
+        * ProcessLinkValue, because link updates are not applied to
+        * recycled and tombstone objects.  We don't have to delete
+        * any existing link, that should have happened when the
+        * object deletion was replicated or initiated.
+        */
+       replmd_deletion_state(module, target_msg,
+                             &target_deletion_state, NULL);
+
+       if (target_deletion_state >= OBJECT_RECYCLED) {
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
        }
 
        /* see if this link already exists */
@@ -4572,7 +6077,7 @@ linked_attributes[0]:
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
-                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, false, attr, false);
+                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, false, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -4592,7 +6097,7 @@ linked_attributes[0]:
 
                if (active) {
                        /* add the new backlink */
-                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, true, attr, false);
+                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, true, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -4627,7 +6132,7 @@ linked_attributes[0]:
 
                if (active) {
                        ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid,
-                                                 true, attr, false);
+                                                 true, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -4637,15 +6142,18 @@ linked_attributes[0]:
 
        /* we only change whenChanged and uSNChanged if the seq_num
           has changed */
-       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+       ret = add_time_element(msg, "whenChanged", t);
+       if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
-               return ldb_operr(ldb);
+               ldb_operr(ldb);
+               return ret;
        }
 
-       if (add_uint64_element(ldb, msg, "uSNChanged",
-                              seq_num) != LDB_SUCCESS) {
+       ret = add_uint64_element(ldb, msg, "uSNChanged", seq_num);
+       if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
-               return ldb_operr(ldb);
+               ldb_operr(ldb);
+               return ret;
        }
 
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);