dsdb: Make less talloc() for parsed_dn.guid
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index f1d3958c91f80e6f49003203d54dd57bd0d040d2..347cf0f8d49986c539c1a42bdb32df063fa48512 100644 (file)
@@ -689,24 +689,6 @@ static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMeta
                return 0;
        }
 
-       /*
-        * the rdn attribute should be at the end!
-        * so we need to return a value greater than zero
-        * which means m1 is greater than m2
-        */
-       if (attid_1 == *rdn_attid) {
-               return 1;
-       }
-
-       /*
-        * the rdn attribute should be at the end!
-        * so we need to return a value less than zero
-        * which means m2 is greater than m1
-        */
-       if (attid_2 == *rdn_attid) {
-               return -1;
-       }
-
        /*
         * See above regarding this being an unsigned comparison.
         * Otherwise when the high bit is set on non-standard
@@ -718,7 +700,6 @@ static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMeta
 
 static int replmd_replPropertyMetaDataCtr1_verify(struct ldb_context *ldb,
                                                  struct replPropertyMetaDataCtr1 *ctr1,
-                                                 const struct dsdb_attribute *rdn_sa,
                                                  struct ldb_dn *dn)
 {
        if (ctr1->count == 0) {
@@ -741,34 +722,12 @@ static int replmd_replPropertyMetaDataCtr1_verify(struct ldb_context *ldb,
 
 static int replmd_replPropertyMetaDataCtr1_sort_and_verify(struct ldb_context *ldb,
                                                           struct replPropertyMetaDataCtr1 *ctr1,
-                                                          const struct dsdb_schema *schema,
                                                           struct ldb_dn *dn)
 {
-       const char *rdn_name;
-       const struct dsdb_attribute *rdn_sa;
-
-       rdn_name = ldb_dn_get_rdn_name(dn);
-       if (!rdn_name) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             __location__ ": No rDN for %s?\n",
-                             ldb_dn_get_linearized(dn));
-               return LDB_ERR_INVALID_DN_SYNTAX;
-       }
-
-       rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
-       if (rdn_sa == NULL) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             __location__ ": No sa found for rDN %s for %s\n",
-                             rdn_name, ldb_dn_get_linearized(dn));
-               return LDB_ERR_UNDEFINED_ATTRIBUTE_TYPE;
-       }
-
-       DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n",
-                rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
-
-       LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, &rdn_sa->attributeID_id,
+       /* Note this is O(n^2) for the almost-sorted case, which this is */
+       LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, NULL,
                           replmd_replPropertyMetaData1_attid_sort);
-       return replmd_replPropertyMetaDataCtr1_verify(ldb, ctr1, rdn_sa, dn);
+       return replmd_replPropertyMetaDataCtr1_verify(ldb, ctr1, dn);
 }
 
 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
@@ -1083,9 +1042,9 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        nmd.ctr.ctr1.count = ni;
 
        /*
-        * sort meta data array, and move the rdn attribute entry to the end
+        * sort meta data array
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, ac->schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during direct ADD: %s", __func__, ldb_errstring(ldb));
                talloc_free(ac);
@@ -1367,6 +1326,52 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        return LDB_SUCCESS;
 }
 
+/*
+ * Bump the replPropertyMetaData version on an attribute, and if it
+ * has changed (or forced by leaving rdn_old NULL), update the value
+ * in the entry.
+ *
+ * This is important, as calling a modify operation may not change the
+ * version number if the values appear unchanged, but a rename between
+ * parents bumps this value.
+ *
+ */
+static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
+                                      struct ldb_message *msg,
+                                      const struct ldb_val *rdn_new,
+                                      const struct ldb_val *rdn_old,
+                                      struct replPropertyMetaDataBlob *omd,
+                                      struct replmd_replicated_request *ar,
+                                      NTTIME now,
+                                      bool is_schema_nc)
+{
+       struct ldb_message_element new_el = {
+               .flags = LDB_FLAG_MOD_REPLACE,
+               .name = ldb_dn_get_rdn_name(msg->dn),
+               .num_values = 1,
+               .values = discard_const_p(struct ldb_val, rdn_new)
+       };
+       struct ldb_message_element old_el = {
+               .flags = LDB_FLAG_MOD_REPLACE,
+               .name = ldb_dn_get_rdn_name(msg->dn),
+               .num_values = rdn_old ? 1 : 0,
+               .values = discard_const_p(struct ldb_val, rdn_old)
+       };
+
+       if (ldb_msg_element_equal_ordered(&new_el, &old_el) == false) {
+               int ret = ldb_msg_add(msg, &new_el, LDB_FLAG_MOD_REPLACE);
+               if (ret != LDB_SUCCESS) {
+                       return ldb_oom(ldb);
+               }
+       }
+
+       return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
+                                         omd, ar->schema, &ar->seq_num,
+                                         &ar->our_invocation_id,
+                                         now, is_schema_nc, ar->req);
+
+}
+
 static uint64_t find_max_local_usn(struct replPropertyMetaDataBlob omd)
 {
        uint32_t count = omd.ctr.ctr1.count;
@@ -1402,7 +1407,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
        const struct GUID *our_invocation_id;
        int ret;
        const char * const *attrs = NULL;
-       const char * const attrs1[] = { "replPropertyMetaData", "*", NULL };
        const char * const attrs2[] = { "uSNChanged", "objectClass", "instanceType", NULL };
        struct ldb_result *res;
        struct ldb_context *ldb;
@@ -1410,11 +1414,19 @@ static int replmd_update_rpmd(struct ldb_module *module,
        enum urgent_situation situation;
        bool rmd_is_provided;
        bool rmd_is_just_resorted = false;
-
+       const char *not_rename_attrs[4 + msg->num_elements];
+       
        if (rename_attrs) {
                attrs = rename_attrs;
        } else {
-               attrs = attrs1;
+               for (i = 0; i < msg->num_elements; i++) {
+                       not_rename_attrs[i] = msg->elements[i].name;
+               }
+               not_rename_attrs[i] = "replPropertyMetaData";
+               not_rename_attrs[i+1] = "objectClass";
+               not_rename_attrs[i+2] = "instanceType";
+               not_rename_attrs[i+3] = NULL;
+               attrs = not_rename_attrs;
        }
 
        ldb = ldb_module_get_ctx(module);
@@ -1618,7 +1630,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &omd.ctr.ctr1, schema, msg->dn);
+               ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &omd.ctr.ctr1, msg->dn);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, "%s: %s", __func__, ldb_errstring(ldb));
                        return ret;
@@ -1648,13 +1660,18 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
 struct parsed_dn {
        struct dsdb_dn *dsdb_dn;
-       struct GUID *guid;
+       struct GUID guid;
        struct ldb_val *v;
 };
 
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
 {
-       return GUID_compare(pdn1->guid, pdn2->guid);
+       return GUID_compare(&pdn1->guid, &pdn2->guid);
+}
+
+static int GUID_compare_struct(struct GUID *g1, struct GUID g2)
+{
+       return GUID_compare(g1, &g2);
 }
 
 static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn,
@@ -1674,7 +1691,7 @@ static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn,
                }
                return NULL;
        }
-       BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare, ret);
+       BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare_struct, ret);
        return ret;
 }
 
@@ -1715,16 +1732,10 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
 
                dn = p->dsdb_dn->dn;
 
-               p->guid = talloc(*pdn, struct GUID);
-               if (p->guid == NULL) {
-                       ldb_module_oom(module);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               status = dsdb_get_extended_dn_guid(dn, p->guid, "GUID");
+               status = dsdb_get_extended_dn_guid(dn, &p->guid, "GUID");
                if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
                        /* we got a DN without a GUID - go find the GUID */
-                       int ret = dsdb_module_guid_by_dn(module, dn, p->guid, parent);
+                       int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
                                ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
                                                       ldb_dn_get_linearized(dn));
@@ -1735,7 +1746,7 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                                }
                                return ret;
                        }
-                       ret = dsdb_set_extended_dn_guid(dn, p->guid, "GUID");
+                       ret = dsdb_set_extended_dn_guid(dn, &p->guid, "GUID");
                        if (ret != LDB_SUCCESS) {
                                return ret;
                        }
@@ -2025,7 +2036,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
 
        /* for each new value, see if it exists already with the same GUID */
        for (i=0; i<el->num_values; i++) {
-               struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, dns[i].guid, NULL);
+               struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, &dns[i].guid, NULL);
                if (p == NULL) {
                        /* this is a new linked attribute value */
                        new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val, num_new_values+1);
@@ -2047,8 +2058,9 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        uint32_t rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
 
                        if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
+                               struct GUID_txt_buf guid_str;
                                ldb_asprintf_errstring(ldb, "Attribute %s already exists for target GUID %s",
-                                                      el->name, GUID_string(tmp_ctx, p->guid));
+                                                      el->name, GUID_buf_string(&p->guid, &guid_str));
                                talloc_free(tmp_ctx);
                                /* error codes for 'member' need to be
                                   special cased */
@@ -2066,7 +2078,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        }
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, true);
+               ret = replmd_add_backlink(module, schema, msg_guid, &dns[i].guid, true, schema_attr, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2164,10 +2176,11 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                struct parsed_dn *p2;
                uint32_t rmd_flags;
 
-               p2 = parsed_dn_find(old_dns, old_el->num_values, p->guid, NULL);
+               p2 = parsed_dn_find(old_dns, old_el->num_values, &p->guid, NULL);
                if (!p2) {
+                       struct GUID_txt_buf buf;
                        ldb_asprintf_errstring(ldb, "Attribute %s doesn't exist for target GUID %s",
-                                              el->name, GUID_string(tmp_ctx, p->guid));
+                                              el->name, GUID_buf_string(&p->guid, &buf));
                        if (ldb_attr_cmp(el->name, "member") == 0) {
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        } else {
@@ -2176,8 +2189,9 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                }
                rmd_flags = dsdb_dn_rmd_flags(p2->dsdb_dn->dn);
                if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
+                       struct GUID_txt_buf buf;
                        ldb_asprintf_errstring(ldb, "Attribute %s already deleted for target GUID %s",
-                                              el->name, GUID_string(tmp_ctx, p->guid));
+                                              el->name, GUID_buf_string(&p->guid, &buf));
                        if (ldb_attr_cmp(el->name, "member") == 0) {
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        } else {
@@ -2193,7 +2207,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                struct parsed_dn *p = &old_dns[i];
                uint32_t rmd_flags;
 
-               if (el->num_values && parsed_dn_find(dns, el->num_values, p->guid, NULL) == NULL) {
+               if (el->num_values && parsed_dn_find(dns, el->num_values, &p->guid, NULL) == NULL) {
                        continue;
                }
 
@@ -2207,7 +2221,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        return ret;
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, true);
+               ret = replmd_add_backlink(module, schema, msg_guid, &old_dns[i].guid, false, schema_attr, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2290,13 +2304,13 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
                if (rmd_flags & DSDB_RMD_FLAG_DELETED) continue;
 
-               ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, false);
+               ret = replmd_add_backlink(module, schema, msg_guid, &old_dns[i].guid, false, schema_attr, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               p = parsed_dn_find(dns, el->num_values, old_p->guid, NULL);
+               p = parsed_dn_find(dns, el->num_values, &old_p->guid, NULL);
                if (p) {
                        /* we don't delete it if we are re-adding it */
                        continue;
@@ -2318,7 +2332,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
                if (old_dns &&
                    (old_p = parsed_dn_find(old_dns,
-                                           old_num_values, p->guid, NULL)) != NULL) {
+                                           old_num_values, &p->guid, NULL)) != NULL) {
                        /* update in place */
                        ret = replmd_update_la_val(old_el->values, old_p->v, p->dsdb_dn,
                                                   old_p->dsdb_dn, invocation_id,
@@ -2345,7 +2359,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        num_new_values++;
                }
 
-               ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, false);
+               ret = replmd_add_backlink(module, schema, msg_guid, &dns[i].guid, true, schema_attr, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -3583,6 +3597,47 @@ static bool replmd_replPropertyMetaData1_is_newer(struct replPropertyMetaData1 *
                                      new_m->originating_change_time);
 }
 
+static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_flags,
+                                                            struct replPropertyMetaData1 *cur_m,
+                                                            struct replPropertyMetaData1 *new_m)
+{
+       bool cmp;
+
+       /*
+        * If the new replPropertyMetaData entry for this attribute is
+        * not provided (this happens in the case where we look for
+        * ATTID_name, but the name was not changed), then the local
+        * state is clearly still current, as the remote
+        * server didn't send it due to being older the high watermark
+        * USN we sent.
+        */
+       if (new_m == NULL) {
+               return false;
+       }
+
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING) {
+               /*
+                * if we compare equal then do an
+                * update. This is used when a client
+                * asks for a FULL_SYNC, and can be
+                * used to recover a corrupt
+                * replica.
+                *
+                * This call is a bit tricky, what we
+                * are doing it turning the 'is_newer'
+                * call into a 'not is older' by
+                * swapping cur_m and new_m, and negating the
+                * outcome.
+                */
+               cmp = !replmd_replPropertyMetaData1_is_newer(new_m,
+                                                            cur_m);
+       } else {
+               cmp = replmd_replPropertyMetaData1_is_newer(cur_m,
+                                                           new_m);
+       }
+       return cmp;
+}
+
 
 /*
   form a conflict DN
@@ -3857,18 +3912,31 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
 
        rmd = ar->objs->objects[ar->index_current].meta_data;
 
-       /* we decide which is newer based on the RPMD on the name
-          attribute.  See [MS-DRSR] ResolveNameConflict */
+       /*
+        * we decide which is newer based on the RPMD on the name
+        * attribute.  See [MS-DRSR] ResolveNameConflict.
+        *
+        * We expect omd_name to be present, as this is from a local
+        * search, but while rmd_name should have been given to us by
+        * the remote server, if it is missing we just prefer the
+        * local name in
+        * replmd_replPropertyMetaData1_new_should_be_taken()
+        */
        rmd_name = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
        omd_name = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
-       if (!rmd_name || !omd_name) {
-               DEBUG(0,(__location__ ": Failed to find name attribute in replPropertyMetaData for %s\n",
+       if (!omd_name) {
+               DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
                goto failed;
        }
 
-       rename_incoming_record = !(ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING) &&
-               !replmd_replPropertyMetaData1_is_newer(omd_name, rmd_name);
+       /*
+        * Should we preserve the current record, and so rename the
+        * incoming record to be a conflict?
+        */
+       rename_incoming_record
+               = !replmd_replPropertyMetaData1_new_should_be_taken(ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                                                                   omd_name, rmd_name);
 
        if (rename_incoming_record) {
                struct GUID guid;
@@ -4018,12 +4086,19 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        unsigned int i;
        int ret;
        bool remote_isDeleted = false;
-       const struct dsdb_attribute *rdn_sa;
-       const char *rdn_name;
+       bool is_schema_nc;
+       NTTIME now;
+       time_t t = time(NULL);
+       const struct ldb_val *rdn_val;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(ar->module),
+                               struct replmd_private);
+       unix_to_nt_time(&now, t);
 
        ldb = ldb_module_get_ctx(ar->module);
        msg = ar->objs->objects[ar->index_current].msg;
        md = ar->objs->objects[ar->index_current].meta_data;
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
 
        ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
@@ -4087,23 +4162,20 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                                                     "isDeleted", false);
 
        /*
-        * the meta data array is already sorted by the caller
+        * the meta data array is already sorted by the caller, except
+        * for the RDN, which needs to be added.
         */
 
-       rdn_name = ldb_dn_get_rdn_name(msg->dn);
-       if (rdn_name == NULL) {
-               ldb_asprintf_errstring(ldb, __location__ ": No rDN for %s?\n", ldb_dn_get_linearized(msg->dn));
-               return replmd_replicated_request_error(ar, LDB_ERR_INVALID_DN_SYNTAX);
-       }
 
-       rdn_sa = dsdb_attribute_by_lDAPDisplayName(ar->schema, rdn_name);
-       if (rdn_sa == NULL) {
-               ldb_asprintf_errstring(ldb, ": No schema attribute found for rDN %s for %s\n",
-                                      rdn_name, ldb_dn_get_linearized(msg->dn));
-               return replmd_replicated_request_error(ar, LDB_ERR_UNDEFINED_ATTRIBUTE_TYPE);
+       rdn_val = ldb_dn_get_rdn_val(msg->dn);
+       ret = replmd_update_rpmd_rdn_attr(ldb, msg, rdn_val, NULL,
+                                    md, ar, now, is_schema_nc);
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
+               return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = replmd_replPropertyMetaDataCtr1_verify(ldb, &md->ctr.ctr1, rdn_sa, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &md->ctr.ctr1, msg->dn);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
                return replmd_replicated_request_error(ar, ret);
@@ -4392,6 +4464,11 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        ret = dsdb_module_rename(ar->module, ar->search_msg->dn, msg->dn,
                                 DSDB_FLAG_NEXT_MODULE, ar->req);
+       if (ret == LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               *renamed = true;
+               return ret;
+       }
 
        if (ret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
                talloc_free(tmp_ctx);
@@ -4462,18 +4539,32 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        rmd = ar->objs->objects[ar->index_current].meta_data;
 
-       /* we decide which is newer based on the RPMD on the name
-          attribute.  See [MS-DRSR] ResolveNameConflict */
+       /*
+        * we decide which is newer based on the RPMD on the name
+        * attribute.  See [MS-DRSR] ResolveNameConflict.
+        *
+        * We expect omd_name to be present, as this is from a local
+        * search, but while rmd_name should have been given to us by
+        * the remote server, if it is missing we just prefer the
+        * local name in
+        * replmd_replPropertyMetaData1_new_should_be_taken()
+        */
        rmd_name = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
        omd_name = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
-       if (!rmd_name || !omd_name) {
-               DEBUG(0,(__location__ ": Failed to find name attribute in replPropertyMetaData for %s\n",
+       if (!omd_name) {
+               DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
                goto failed;
        }
 
-       rename_incoming_record = !(ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING) &&
-               !replmd_replPropertyMetaData1_is_newer(omd_name, rmd_name);
+       /*
+        * Should we preserve the current record, and so rename the
+        * incoming record to be a conflict?
+        */
+       rename_incoming_record =
+               !replmd_replPropertyMetaData1_new_should_be_taken(
+                       ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                       omd_name, rmd_name);
 
        if (rename_incoming_record) {
 
@@ -4594,11 +4685,21 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        bool take_remote_isDeleted = false;
        bool sd_updated = false;
        bool renamed = false;
+       bool is_schema_nc = false;
        NTSTATUS nt_status;
+       const struct ldb_val *old_rdn, *new_rdn;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(ar->module),
+                               struct replmd_private);
+       NTTIME now;
+       time_t t = time(NULL);
+       unix_to_nt_time(&now, t);
 
        ldb = ldb_module_get_ctx(ar->module);
        msg = ar->objs->objects[ar->index_current].msg;
 
+       is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
+
        rmd = ar->objs->objects[ar->index_current].meta_data;
        ZERO_STRUCT(omd);
        omd.version = 1;
@@ -4618,6 +4719,26 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
+       if (DEBUGLVL(5)) {
+               struct GUID_txt_buf guid_txt;
+
+               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
+               DEBUG(5, ("Initial DRS replication modify message of %s is:\n%s\n"
+                         "%s\n"
+                         "%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         s,
+                         ndr_print_struct_string(s,
+                                                 (ndr_print_fn_t)ndr_print_replPropertyMetaDataBlob,
+                                                 "existing replPropertyMetaData",
+                                                 &omd),
+                         ndr_print_struct_string(s,
+                                                 (ndr_print_fn_t)ndr_print_replPropertyMetaDataBlob,
+                                                 "incoming replPropertyMetaData",
+                                                 rmd)));
+               talloc_free(s);
+       }
+
        local_isDeleted = ldb_msg_find_attr_as_bool(ar->search_msg,
                                                    "isDeleted", false);
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
@@ -4656,7 +4777,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                 * the peer has an older name to what we have (see
                 * replmd_replicated_apply_search_callback())
                 */
-               renamed = true;
                ret = replmd_replicated_handle_rename(ar, msg, ar->req, &renamed);
        }
 
@@ -4703,26 +4823,10 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                continue;
                        }
 
-                       if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING) {
-                               /*
-                                * if we compare equal then do an
-                                * update. This is used when a client
-                                * asks for a FULL_SYNC, and can be
-                                * used to recover a corrupt
-                                * replica.
-                                *
-                                * This call is a bit tricky, what we
-                                * are doing it turning the 'is_newer'
-                                * call into a 'not is older' by
-                                * swapping i and j, and negating the
-                                * outcome.
-                               */
-                               cmp = !replmd_replPropertyMetaData1_is_newer(&rmd->ctr.ctr1.array[i],
-                                                                            &nmd.ctr.ctr1.array[j]);
-                       } else {
-                               cmp = replmd_replPropertyMetaData1_is_newer(&nmd.ctr.ctr1.array[j],
-                                                                           &rmd->ctr.ctr1.array[i]);
-                       }
+                       cmp = replmd_replPropertyMetaData1_new_should_be_taken(
+                               ar->objs->dsdb_repl_flags,
+                               &nmd.ctr.ctr1.array[j],
+                               &rmd->ctr.ctr1.array[i]);
                        if (cmp) {
                                /* replace the entry */
                                nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
@@ -4790,14 +4894,21 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
         */
        nmd.ctr.ctr1.count = ni;
 
+       new_rdn = ldb_dn_get_rdn_val(msg->dn);
+       old_rdn = ldb_dn_get_rdn_val(ar->search_msg->dn);
+
+       if (renamed) {
+               ret = replmd_update_rpmd_rdn_attr(ldb, msg, new_rdn, old_rdn,
+                                                 &nmd, ar, now, is_schema_nc);
+               if (ret != LDB_SUCCESS) {
+                       ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
+                       return replmd_replicated_request_error(ar, ret);
+               }
+       }
        /*
-        * the rdn attribute (the alias for the name attribute),
-        * 'cn' for most objects is the last entry in the meta data array
-        * we have stored
-        *
         * sort the new meta data array
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, ar->schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
                return ret;
@@ -4887,7 +4998,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                struct GUID_txt_buf guid_txt;
 
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(4, ("DRS replication modify message of %s:\n%s\n",
+               DEBUG(4, ("Final DRS replication modify message of %s:\n%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
                          s));
                talloc_free(s);
@@ -5015,17 +5126,25 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
                 * parent.
                 */
                md_remote = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
-               md_local  = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
-               /* if there is no name attribute then we have to assume the
-                  object we've received is in fact newer */
-               if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING ||
-                   !md_remote || !md_local ||
-                   replmd_replPropertyMetaData1_is_newer(md_local, md_remote)) {
+               md_local = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
+               if (!md_local) {
+                       DEBUG(0,(__location__ ": Failed to find name attribute in local LDB replPropertyMetaData for %s\n",
+                                ldb_dn_get_linearized(ar->search_msg->dn)));
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
+               }
+
+               /*
+                * if there is no name attribute given then we have to assume the
+                *  object we've received has the older name
+                */
+               if (replmd_replPropertyMetaData1_new_should_be_taken(
+                           ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING,
+                           md_local, md_remote)) {
                        struct GUID_txt_buf p_guid_local;
                        struct GUID_txt_buf p_guid_remote;
                        msg = ar->objs->objects[ar->index_current].msg;
 
-                       /* Otherwise, just merge on the existing object, force no rename */
+                       /* Merge on the existing object, with rename */
 
                        DEBUG(4,(__location__ ": Looking for new parent for object %s currently under %s "
                                 "as incoming object changing to %s under %s\n",
@@ -5040,7 +5159,11 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
                        struct GUID_txt_buf p_guid_remote;
                        msg = ar->objs->objects[ar->index_current].msg;
 
-                       /* Otherwise, just merge on the existing object, force no rename */
+                       /*
+                        * Merge on the existing object, force no
+                        * rename (code below just to explain why in
+                        * the DEBUG() logs)
+                        */
 
                        if (strcmp(ldb_dn_get_linearized(ar->search_msg->dn),
                                   ldb_dn_get_linearized(msg->dn)) == 0) {
@@ -5954,7 +6077,7 @@ linked_attributes[0]:
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
-                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, false, attr, false);
+                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, false, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -5974,7 +6097,7 @@ linked_attributes[0]:
 
                if (active) {
                        /* add the new backlink */
-                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, true, attr, false);
+                       ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid, true, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -6009,7 +6132,7 @@ linked_attributes[0]:
 
                if (active) {
                        ret = replmd_add_backlink(module, schema, &la->identifier->guid, &guid,
-                                                 true, attr, false);
+                                                 true, attr, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;