replmd: Cache recycle-bin state to avoid DB lookup
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 5aa3ed7badaa7eb4f42fd813e747f2c2b33a62c3..58f9df9c98cf03fe401f49dc61d63ccd2eefbc46 100644 (file)
@@ -65,7 +65,7 @@ static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000UL
 
 struct replmd_private {
        TALLOC_CTX *la_ctx;
-       struct la_entry *la_list;
+       struct la_group *la_list;
        struct nc_entry {
                struct nc_entry *prev, *next;
                struct ldb_dn *dn;
@@ -75,6 +75,24 @@ struct replmd_private {
        struct ldb_dn *schema_dn;
        bool originating_updates;
        bool sorted_links;
+       uint32_t total_links;
+       uint32_t num_processed;
+       bool recyclebin_enabled;
+       bool recyclebin_state_known;
+};
+
+/*
+ * groups link attributes together by source-object and attribute-ID,
+ * to improve processing efficiency (i.e. for 'member' attribute, which
+ * could have 100s or 1000s of links).
+ * Note this grouping is best effort - the same source object could still
+ * correspond to several la_groups (a lot depends on the order DRS sends
+ * the links in). The groups currently don't span replication chunks (which
+ * caps the size to ~1500 links by default).
+ */
+struct la_group {
+       struct la_group *next, *prev;
+       struct la_entry *la_entries;
 };
 
 struct la_entry {
@@ -112,6 +130,8 @@ struct replmd_replicated_request {
        bool is_urgent;
 
        bool isDeleted;
+
+       bool fix_link_sid;
 };
 
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
@@ -120,8 +140,16 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      struct parsed_dn *dns, uint32_t count,
                                      struct ldb_message_element *el,
                                      const char *ldap_oid);
-static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
-                                         struct la_entry *la);
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_dn *src_dn,
+                                    const struct dsdb_attribute *attr);
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg);
 static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                             struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                             uint64_t usn, uint64_t local_usn, NTTIME nttime,
@@ -148,12 +176,35 @@ enum deletion_state {
        OBJECT_REMOVED=5
 };
 
+static bool replmd_recyclebin_enabled(struct ldb_module *module)
+{
+       bool enabled = false;
+       struct replmd_private *replmd_private =
+               talloc_get_type_abort(ldb_module_get_private(module),
+                                     struct replmd_private);
+
+       /*
+        * only lookup the recycle-bin state once per replication, then cache
+        * the result. This can save us 1000s of DB searches
+        */
+       if (!replmd_private->recyclebin_state_known) {
+               int ret = dsdb_recyclebin_enabled(module, &enabled);
+               if (ret != LDB_SUCCESS) {
+                       return false;
+               }
+
+               replmd_private->recyclebin_enabled = enabled;
+               replmd_private->recyclebin_state_known = true;
+       }
+
+       return replmd_private->recyclebin_enabled;
+}
+
 static void replmd_deletion_state(struct ldb_module *module,
                                  const struct ldb_message *msg,
                                  enum deletion_state *current_state,
                                  enum deletion_state *next_state)
 {
-       int ret;
        bool enabled = false;
 
        if (msg == NULL) {
@@ -164,10 +215,7 @@ static void replmd_deletion_state(struct ldb_module *module,
                return;
        }
 
-       ret = dsdb_recyclebin_enabled(module, &enabled);
-       if (ret != LDB_SUCCESS) {
-               enabled = false;
-       }
+       enabled = replmd_recyclebin_enabled(module);
 
        if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
                if (!enabled) {
@@ -308,7 +356,7 @@ static void replmd_txn_cleanup(struct replmd_private *replmd_private)
        talloc_free(replmd_private->la_ctx);
        replmd_private->la_list = NULL;
        replmd_private->la_ctx = NULL;
-
+       replmd_private->recyclebin_state_known = false;
 }
 
 
@@ -2485,6 +2533,109 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        return err;
                }
 
+               if (ac->fix_link_sid) {
+                       char *fixed_dnstring = NULL;
+                       struct dom_sid tmp_sid = { 0, };
+                       DATA_BLOB sid_blob = data_blob_null;
+                       enum ndr_err_code ndr_err;
+                       NTSTATUS status;
+                       int num;
+
+                       if (exact == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       if (dns[i].dsdb_dn->dn_format != DSDB_NORMAL_DN) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       /*
+                        * Only "<GUID=...><SID=...>" is allowed.
+                        *
+                        * We get the GUID to just to find the old
+                        * value and the SID in order to add it
+                        * to the found value.
+                        */
+
+                       num = ldb_dn_get_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 0) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       num = ldb_dn_get_extended_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 2) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(exact->dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
+                               /* this is what we expect */
+                       } else if (NT_STATUS_IS_OK(status)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                                                      "i[%u] SID NOT MISSING... Attribute %s already "
+                                                      "exists for target GUID %s, SID %s, DN: %s",
+                                                      i, el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str),
+                                                      dom_sid_string(tmp_ctx, &tmp_sid),
+                                                      dsdb_dn_get_extended_linearized(tmp_ctx,
+                                                              exact->dsdb_dn, 1));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       } else {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(dns[i].dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (!NT_STATUS_IS_OK(status)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_asprintf_errstring(ldb,
+                                                      "NO SID PROVIDED... Attribute %s already "
+                                                      "exists for target GUID %s",
+                                                      el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+
+                       ndr_err = ndr_push_struct_blob(&sid_blob, tmp_ctx, &tmp_sid,
+                                                      (ndr_push_flags_fn_t)ndr_push_dom_sid);
+                       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       ret = ldb_dn_set_extended_component(exact->dsdb_dn->dn, "SID", &sid_blob);
+                       data_blob_free(&sid_blob);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+
+                       fixed_dnstring = dsdb_dn_get_extended_linearized(
+                                       new_values, exact->dsdb_dn, 1);
+                       if (fixed_dnstring == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       /*
+                        * We just replace the existing value...
+                        */
+                       *exact->v = data_blob_string_const(fixed_dnstring);
+
+                       continue;
+               }
+
                if (exact != NULL) {
                        /*
                         * We are trying to add one that exists, which is only
@@ -2615,12 +2766,11 @@ static int replmd_modify_la_add(struct ldb_module *module,
  */
 static int replmd_modify_la_delete(struct ldb_module *module,
                                   struct replmd_private *replmd_private,
-                                  const struct dsdb_schema *schema,
+                                  struct replmd_replicated_request *ac,
                                   struct ldb_message *msg,
                                   struct ldb_message_element *el,
                                   struct ldb_message_element *old_el,
                                   const struct dsdb_attribute *schema_attr,
-                                  uint64_t seq_num,
                                   time_t t,
                                   struct ldb_dn *msg_dn,
                                   struct ldb_request *parent)
@@ -2634,16 +2784,10 @@ static int replmd_modify_la_delete(struct ldb_module *module,
        bool vanish_links = false;
        unsigned int num_to_delete = el->num_values;
        uint32_t rmd_flags;
-       const struct GUID *invocation_id;
        NTTIME now;
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        if (old_el == NULL || old_el->num_values == 0) {
                /* there is nothing to delete... */
                if (num_to_delete == 0) {
@@ -2707,7 +2851,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                                }
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, msg_dn, &p->guid,
+                                                 ac->schema, msg_dn, &p->guid,
                                                  false, schema_attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
@@ -2725,8 +2869,9 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, true);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2788,7 +2933,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        /* remove the backlink */
                        ret = replmd_add_backlink(module,
                                                  replmd_private,
-                                                 schema, 
+                                                 ac->schema,
                                                  msg_dn,
                                                  &p->guid,
                                                  false, schema_attr,
@@ -2822,14 +2967,15 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
-                                          invocation_id, seq_num, seq_num,
+                                          &ac->our_invocation_id,
+                                          ac->seq_num, ac->seq_num,
                                           now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &p->guid,
                                          false, schema_attr,
                                          parent);
@@ -2879,12 +3025,11 @@ static int replmd_modify_la_delete(struct ldb_module *module,
  */
 static int replmd_modify_la_replace(struct ldb_module *module,
                                    struct replmd_private *replmd_private,
-                                   const struct dsdb_schema *schema,
+                                   struct replmd_replicated_request *ac,
                                    struct ldb_message *msg,
                                    struct ldb_message_element *el,
                                    struct ldb_message_element *old_el,
                                    const struct dsdb_attribute *schema_attr,
-                                   uint64_t seq_num,
                                    time_t t,
                                    struct ldb_dn *msg_dn,
                                    struct ldb_request *parent)
@@ -2893,7 +3038,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = talloc_new(msg);
        int ret;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_val *new_values = NULL;
        const char *ldap_oid = schema_attr->syntax->ldap_oid;
@@ -2904,11 +3048,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /*
         * The replace operation is unlike the replace and delete cases in that
         * we need to look at every existing link to see whether it is being
@@ -3008,8 +3147,8 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                ret = replmd_update_la_val(new_values, old_p->v,
                                                           old_p->dsdb_dn,
                                                           old_p->dsdb_dn,
-                                                          invocation_id,
-                                                          seq_num, seq_num,
+                                                          &ac->our_invocation_id,
+                                                          ac->seq_num, ac->seq_num,
                                                           now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
@@ -3017,7 +3156,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                }
 
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &old_p->guid, false,
                                                          schema_attr,
@@ -3042,8 +3181,8 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, old_p->v,
                                                   new_p->dsdb_dn,
                                                   old_p->dsdb_dn,
-                                                  invocation_id,
-                                                  seq_num, seq_num,
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
                                                   now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
@@ -3053,7 +3192,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
                        if ((rmd_flags & DSDB_RMD_FLAG_DELETED) != 0) {
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &new_p->guid, true,
                                                          schema_attr,
@@ -3075,14 +3214,14 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_build_la_val(new_values,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
-                                                 invocation_id,
-                                                 seq_num, now);
+                                                 &ac->our_invocation_id,
+                                                 ac->seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &new_p->guid, true,
                                                  schema_attr,
@@ -3201,15 +3340,15 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
-                                                      ac->schema, msg, el, old_el,
-                                                      schema_attr, ac->seq_num, t,
+                                                      ac, msg, el, old_el,
+                                                      schema_attr, t,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
-                                                     ac->schema, msg, el, old_el,
-                                                     schema_attr, ac->seq_num, t,
+                                                     ac, msg, el, old_el,
+                                                     schema_attr, t,
                                                      old_msg->dn,
                                                      parent);
                        break;
@@ -3322,6 +3461,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        struct ldb_control *sd_propagation_control;
        struct ldb_control *fix_links_control = NULL;
        struct ldb_control *fix_dn_name_control = NULL;
+       struct ldb_control *fix_dn_sid_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3467,6 +3607,44 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       fix_dn_sid_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_SID);
+       if (fix_dn_sid_control != NULL) {
+               const struct dsdb_attribute *sa = NULL;
+
+               if (msg->num_elements != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].flags != LDB_FLAG_MOD_ADD) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].num_values != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(ac->schema,
+                               msg->elements[0].name);
+               if (sa == NULL) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format != DSDB_NORMAL_DN) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               fix_dn_sid_control->critical = false;
+               ac->fix_link_sid = true;
+
+               goto handle_linked_attribs;
+       }
+
        ldb_msg_remove_attr(msg, "whenChanged");
        ldb_msg_remove_attr(msg, "uSNChanged");
 
@@ -3487,6 +3665,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return ret;
        }
 
+ handle_linked_attribs:
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
                                                  ac, msg, t, req);
        if (ret != LDB_SUCCESS) {
@@ -4245,6 +4424,10 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
             - preserved if in above list, or is rDN
          - remove all linked attribs from this object
          - remove all links from other objects to this object
+           (note we use the backlinks to do this, so we won't find one-way
+            links that still point to this object, or deactivated two-way
+            links, i.e. 'member' after the user has been removed from the
+            group)
          - add lastKnownParent
          - update replPropertyMetaData?
 
@@ -4382,12 +4565,12 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 
                        if (sa->linkID & 1) {
                                /*
-                                 we have a backlink in this object
-                                 that needs to be removed. We're not
-                                 allowed to remove it directly
-                                 however, so we instead setup a
-                                 modify to delete the corresponding
-                                 forward link
+                                * we have a backlink in this object
+                                * that needs to be removed. We're not
+                                * allowed to remove it directly
+                                * however, so we instead setup a
+                                * modify to delete the corresponding
+                                * forward link
                                 */
                                ret = replmd_delete_remove_link(module, schema,
                                                                replmd_private,
@@ -6391,6 +6574,58 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
        return LDB_SUCCESS;
 }
 
+/**
+ * Returns true if we can group together processing this link attribute,
+ * i.e. it has the same source-object and attribute ID as other links
+ * already in the group
+ */
+static bool la_entry_matches_group(struct la_entry *la_entry,
+                                  struct la_group *la_group)
+{
+       struct la_entry *prev = la_group->la_entries;
+
+       return (la_entry->la->attid == prev->la->attid &&
+               GUID_equal(&la_entry->la->identifier->guid,
+                          &prev->la->identifier->guid));
+}
+
+/**
+ * Creates a new la_entry to store replication info for a single
+ * linked attribute.
+ */
+static struct la_entry *
+create_la_entry(struct replmd_private *replmd_private,
+               struct drsuapi_DsReplicaLinkedAttribute *la,
+               uint32_t dsdb_repl_flags)
+{
+       struct la_entry *la_entry;
+
+       if (replmd_private->la_ctx == NULL) {
+               replmd_private->la_ctx = talloc_new(replmd_private);
+       }
+       la_entry = talloc(replmd_private->la_ctx, struct la_entry);
+       if (la_entry == NULL) {
+               return NULL;
+       }
+       la_entry->la = talloc(la_entry,
+                             struct drsuapi_DsReplicaLinkedAttribute);
+       if (la_entry->la == NULL) {
+               talloc_free(la_entry);
+               return NULL;
+       }
+       *la_entry->la = *la;
+       la_entry->dsdb_repl_flags = dsdb_repl_flags;
+
+       /*
+        * we need to steal the non-scalars so they stay
+        * around until the end of the transaction
+        */
+       talloc_steal(la_entry->la, la_entry->la->identifier);
+       talloc_steal(la_entry->la, la_entry->la->value.blob);
+
+       return la_entry;
+}
+
 /**
  * Stores the linked attributes received in the replication chunk - these get
  * applied at the end of the transaction. We also check that each linked
@@ -6403,7 +6638,11 @@ static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
        struct ldb_module *module = ar->module;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct la_group *la_group = NULL;
        struct ldb_context *ldb;
+       TALLOC_CTX *tmp_ctx = NULL;
+       struct ldb_message *src_msg = NULL;
+       const struct dsdb_attribute *attr = NULL;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -6412,38 +6651,76 @@ static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
        /* save away the linked attributes for the end of the transaction */
        for (i = 0; i < ar->objs->linked_attributes_count; i++) {
                struct la_entry *la_entry;
+               bool new_srcobj;
 
-               if (replmd_private->la_ctx == NULL) {
-                       replmd_private->la_ctx = talloc_new(replmd_private);
-               }
-               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
+               /* create an entry to store the received link attribute info */
+               la_entry = create_la_entry(replmd_private,
+                                          &ar->objs->linked_attributes[i],
+                                          ar->objs->dsdb_repl_flags);
                if (la_entry == NULL) {
                        ldb_oom(ldb);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
-               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
-               if (la_entry->la == NULL) {
-                       talloc_free(la_entry);
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *la_entry->la = ar->objs->linked_attributes[i];
-               la_entry->dsdb_repl_flags = ar->objs->dsdb_repl_flags;
 
-               /* we need to steal the non-scalars so they stay
-                  around until the end of the transaction */
-               talloc_steal(la_entry->la, la_entry->la->identifier);
-               talloc_steal(la_entry->la, la_entry->la->value.blob);
+               /*
+                * check if we're still dealing with the same source object
+                * as the last link
+                */
+               new_srcobj = (la_group == NULL ||
+                             !la_entry_matches_group(la_entry, la_group));
 
-               ret = replmd_verify_linked_attribute(ar, la_entry);
+               if (new_srcobj) {
 
+                       /* get a new mem_ctx to lookup the source object */
+                       TALLOC_FREE(tmp_ctx);
+                       tmp_ctx = talloc_new(ar);
+                       if (tmp_ctx == NULL) {
+                               ldb_oom(ldb);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+
+                       /* verify the link source exists */
+                       ret = replmd_get_la_entry_source(module, la_entry,
+                                                        tmp_ctx, &attr,
+                                                        &src_msg);
+
+                       /*
+                        * When we fail to find the source object, the error
+                        * code we pass back here is really important. It flags
+                        * back to the callers to retry this request with
+                        * DRSUAPI_DRS_GET_ANC. This case should never happen
+                        * if we're replicating from a Samba DC, but it is
+                        * needed to talk to a Windows DC
+                        */
+                       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                               WERROR err = WERR_DS_DRA_MISSING_PARENT;
+                               ret = replmd_replicated_request_werror(ar,
+                                                                      err);
+                               break;
+                       }
+               }
+
+               ret = replmd_verify_link_target(ar, tmp_ctx, la_entry,
+                                               src_msg->dn, attr);
                if (ret != LDB_SUCCESS) {
                        break;
                }
 
-               DLIST_ADD(replmd_private->la_list, la_entry);
+               /* group the links together by source-object for efficiency */
+               if (new_srcobj) {
+                       la_group = talloc_zero(replmd_private->la_ctx,
+                                              struct la_group);
+                       if (la_group == NULL) {
+                               ldb_oom(ldb);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       DLIST_ADD(replmd_private->la_list, la_group);
+               }
+               DLIST_ADD(la_group->la_entries, la_entry);
+               replmd_private->total_links++;
        }
 
+       TALLOC_FREE(tmp_ctx);
        return ret;
 }
 
@@ -7236,26 +7513,23 @@ static int replmd_check_target_exists(struct ldb_module *module,
 }
 
 /**
- * Extracts the key details about the source/target object for a
+ * Extracts the key details about the source object for a
  * linked-attribute entry.
  * This returns the following details:
  * @param ret_attr the schema details for the linked attribute
  * @param source_msg the search result for the source object
- * @param target_dsdb_dn the unpacked DN info for the target object
  */
-static int replmd_extract_la_entry_details(struct ldb_module *module,
-                                          struct la_entry *la_entry,
-                                          TALLOC_CTX *mem_ctx,
-                                          const struct dsdb_attribute **ret_attr,
-                                          struct ldb_message **source_msg,
-                                          struct dsdb_dn **target_dsdb_dn)
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
        int ret;
        const struct dsdb_attribute *attr;
-       WERROR status;
        struct ldb_result *res;
        const char *attrs[4];
 
@@ -7346,54 +7620,39 @@ linked_attributes[0]:
        }
 
        *source_msg = res->msgs[0];
-
-       /* the value blob for the attribute holds the target object DN */
-       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx, la->value.blob, target_dsdb_dn);
-       if (!W_ERROR_IS_OK(status)) {
-               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
-                                      attr->lDAPDisplayName,
-                                      ldb_dn_get_linearized(res->msgs[0]->dn),
-                                      win_errstr(status));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        *ret_attr = attr;
 
        return LDB_SUCCESS;
 }
 
 /**
- * Verifies the source and target objects are known for a linked attribute
+ * Verifies the target object is known for a linked attribute
  */
-static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
-                                         struct la_entry *la)
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_dn *src_dn,
+                                    const struct dsdb_attribute *attr)
 {
        int ret = LDB_SUCCESS;
-       TALLOC_CTX *tmp_ctx = talloc_new(la);
        struct ldb_module *module = ar->module;
-       struct ldb_message *src_msg;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *tgt_dsdb_dn;
+       struct dsdb_dn *tgt_dsdb_dn = NULL;
        struct GUID guid = GUID_zero();
        bool dummy;
+       WERROR status;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
 
-       ret = replmd_extract_la_entry_details(module, la, tmp_ctx, &attr,
-                                             &src_msg, &tgt_dsdb_dn);
-
-       /*
-        * When we fail to find the source object, the error code we pass
-        * back here is really important. It flags back to the callers to
-        * retry this request with DRSUAPI_DRS_GET_ANC. This case should
-        * never happen if we're replicating from a Samba DC, but it is
-        * needed to talk to a Windows DC
-        */
-       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
-               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_MISSING_PARENT);
-       }
-
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &tgt_dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(src_dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /*
@@ -7401,12 +7660,11 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
         * objects, or we know the target is up-to-date. If either case, we
         * still continue even if the target doesn't exist
         */
-       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
-                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+       if ((la_entry->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                         DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
 
-               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
-                                                src_msg->dn, false, &guid,
-                                                &dummy);
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la_entry,
+                                                src_dn, false, &guid, &dummy);
        }
 
        /*
@@ -7418,7 +7676,6 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_RECYCLED_TARGET);
        }
 
-       talloc_free(tmp_ctx);
        return ret;
 }
 
@@ -7537,6 +7794,14 @@ static int replmd_delete_link_value(struct ldb_module *module,
        /* if the existing link is active, remove its backlink */
        if (is_active) {
 
+               /*
+                * NOTE WELL: After this we will never (at runtime) be
+                * able to find this forward link (for instant
+                * removal) if/when the link target is deleted.
+                *
+                * We have dbcheck rules to cover this and cope otherwise
+                * by filtering at runtime (i.e. in the extended_dn module).
+                */
                ret = replmd_add_backlink(module, replmd_private, schema,
                                          src_obj_dn, target_guid, false,
                                          attr, NULL);
@@ -7656,18 +7921,18 @@ static int replmd_check_singleval_la_conflict(struct ldb_module *module,
   process one linked attribute structure
  */
 static int replmd_process_linked_attribute(struct ldb_module *module,
+                                          TALLOC_CTX *mem_ctx,
                                           struct replmd_private *replmd_private,
+                                          struct ldb_message *msg,
+                                          const struct dsdb_attribute *attr,
                                           struct la_entry *la_entry,
                                           struct ldb_request *parent)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
-       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
        int ret;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *dsdb_dn;
+       struct dsdb_dn *dsdb_dn = NULL;
        uint64_t seq_num = 0;
        struct ldb_message_element *old_el;
        time_t t = time(NULL);
@@ -7675,64 +7940,27 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        struct GUID guid = GUID_zero();
        bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
        bool ignore_link;
-       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
        struct dsdb_dn *old_dsdb_dn = NULL;
        struct ldb_val *val_to_update = NULL;
        bool add_as_inactive = false;
+       WERROR status;
 
-       /*
-        * get the attribute being modified, the search result for the source object,
-        * and the target object's DN details
-        */
-       ret = replmd_extract_la_entry_details(module, la_entry, tmp_ctx, &attr,
-                                             &msg, &dsdb_dn);
-
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.14
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
-        *
-        * This needs isDeleted and isRecycled to be included as
-        * attributes in the search and so in msg if set.
-        */
-       replmd_deletion_state(module, msg, &deletion_state, NULL);
-
-       if (deletion_state >= OBJECT_RECYCLED) {
-               talloc_free(tmp_ctx);
-               return LDB_SUCCESS;
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(msg->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /*
-        * Now that we know the deletion_state, remove the extra
-        * attributes added for that purpose.  We need to do this
-        * otherwise in the case of isDeleted: FALSE the modify will
-        * fail with:
-        *
-        * Failed to apply linked attribute change 'attribute 'isDeleted':
-        * invalid modify flags on
-        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
-        * 0x0'
-        *
-        * This is becaue isDeleted is a Boolean, so FALSE is a
-        * legitimate value (set by Samba's deletetest.py)
-        */
-
-       ldb_msg_remove_attr(msg, "isDeleted");
-       ldb_msg_remove_attr(msg, "isRecycled");
-
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
                if (ret != LDB_SUCCESS) {
                        ldb_module_oom(module);
-                       talloc_free(tmp_ctx);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
        } else {
@@ -7740,11 +7968,10 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        /* parse the existing links */
-       ret = get_parsed_dns_trusted(module, replmd_private, tmp_ctx, old_el, &pdn_list,
+       ret = get_parsed_dns_trusted(module, replmd_private, mem_ctx, old_el, &pdn_list,
                                     attr->syntax->ldap_oid, parent);
 
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7752,7 +7979,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                         true, &guid, &ignore_link);
 
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7761,7 +7987,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         * OK to ignore the linked attribute
         */
        if (ignore_link) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7774,22 +7999,19 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                             attr->syntax->ldap_oid,
                             true);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
        if (!replmd_link_update_is_newer(pdn, la)) {
                DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
                         old_el->name, ldb_dn_get_linearized(msg->dn),
-                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-               talloc_free(tmp_ctx);
+                        GUID_string(mem_ctx, &la->meta_data.originating_invocation_id)));
                return LDB_SUCCESS;
        }
 
        /* get a seq_num for this change */
        ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7799,13 +8021,12 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         */
        if (active) {
                ret = replmd_check_singleval_la_conflict(module, replmd_private,
-                                                        tmp_ctx, msg->dn, la,
+                                                        mem_ctx, msg->dn, la,
                                                         dsdb_dn, pdn, pdn_list,
                                                         old_el, schema, attr,
                                                         seq_num,
                                                         &add_as_inactive);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
        }
@@ -7821,7 +8042,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                                  &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
                                return ret;
                        }
                }
@@ -7840,7 +8060,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        offset = old_el->num_values;
                } else {
                        if (next->dsdb_dn == NULL) {
-                               ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
+                               ret = really_parse_trusted_dn(mem_ctx, ldb, next,
                                                              attr->syntax->ldap_oid);
                                if (ret != LDB_SUCCESS) {
                                        return ret;
@@ -7848,7 +8068,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        }
                        offset = next - pdn_list;
                        if (offset > old_el->num_values) {
-                               talloc_free(tmp_ctx);
                                return LDB_ERR_OPERATIONS_ERROR;
                        }
                }
@@ -7872,14 +8091,13 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        /* set the link attribute's value to the info that was received */
-       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+       ret = replmd_set_la_val(mem_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
                                &la->meta_data.originating_invocation_id,
                                la->meta_data.originating_usn, seq_num,
                                la->meta_data.originating_change_time,
                                la->meta_data.version,
                                !active);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7892,7 +8110,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                               val_to_update);
 
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
 
@@ -7905,55 +8122,38 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                          &guid, true, attr,
                                          parent);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
        }
 
        /* we only change whenChanged and uSNChanged if the seq_num
           has changed */
+       ldb_msg_remove_attr(msg, "whenChanged");
+       ldb_msg_remove_attr(msg, "uSNChanged");
        ret = add_time_element(msg, "whenChanged", t);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        ret = add_uint64_element(ldb, msg, "uSNChanged", seq_num);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
-               talloc_free(tmp_ctx);
                return ldb_operr(ldb);
        }
 
        ret = dsdb_check_single_valued_link(attr, old_el);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
        old_el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
 
-       ret = linked_attr_modify(module, msg, parent);
-       if (ret != LDB_SUCCESS) {
-               ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s'\n%s\n",
-                         ldb_errstring(ldb),
-                         ldb_ldif_message_redacted_string(ldb,
-                                                          tmp_ctx,
-                                                          LDB_CHANGETYPE_MODIFY,
-                                                          msg));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       talloc_free(tmp_ctx);
-
        return ret;
 }
 
@@ -7994,6 +8194,106 @@ static int replmd_start_transaction(struct ldb_module *module)
        return ldb_next_start_trans(module);
 }
 
+/**
+ * Processes a group of linked attributes that apply to the same source-object
+ * and attribute-ID
+ */
+static int replmd_process_la_group(struct ldb_module *module,
+                                  struct replmd_private *replmd_private,
+                                  struct la_group *la_group)
+{
+       struct la_entry *la = NULL;
+       struct la_entry *prev = NULL;
+       int ret;
+       TALLOC_CTX *tmp_ctx = talloc_new(la_group);
+       struct la_entry *first_la = DLIST_TAIL(la_group->la_entries);
+       struct ldb_message *msg = NULL;
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_attribute *attr = NULL;
+
+       /*
+        * get the attribute being modified and the search result for the
+        * source object
+        */
+       ret = replmd_get_la_entry_source(module, first_la, tmp_ctx, &attr,
+                                        &msg);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.14
+        * ProcessLinkValue, because link updates are not applied to
+        * recycled and tombstone objects.  We don't have to delete
+        * any existing link, that should have happened when the
+        * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
+        */
+       replmd_deletion_state(module, msg, &deletion_state, NULL);
+
+       if (deletion_state >= OBJECT_RECYCLED) {
+               TALLOC_FREE(tmp_ctx);
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
+       /* go through and process the link targets for this source object */
+       for (la = DLIST_TAIL(la_group->la_entries); la; la=prev) {
+               prev = DLIST_PREV(la);
+               DLIST_REMOVE(la_group->la_entries, la);
+               ret = replmd_process_linked_attribute(module, tmp_ctx,
+                                                     replmd_private,
+                                                     msg, attr, la, NULL);
+               if (ret != LDB_SUCCESS) {
+                       replmd_txn_cleanup(replmd_private);
+                       return ret;
+               }
+
+               if ((++replmd_private->num_processed % 8192) == 0) {
+                       DBG_NOTICE("Processed %u/%u linked attributes\n",
+                                  replmd_private->num_processed,
+                                  replmd_private->total_links);
+               }
+       }
+
+       /* apply the link changes to the source object */
+       ret = linked_attr_modify(module, msg, NULL);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         "Failed to apply linked attribute change '%s'\n%s\n",
+                         ldb_errstring(ldb),
+                         ldb_ldif_message_redacted_string(ldb,
+                                                          tmp_ctx,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg));
+               TALLOC_FREE(tmp_ctx);
+               return ret;
+       }
+
+       TALLOC_FREE(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
 /*
   on prepare commit we loop over our queued la_context structures and
   apply each of them
@@ -8002,21 +8302,31 @@ static int replmd_prepare_commit(struct ldb_module *module)
 {
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-       struct la_entry *la, *prev;
+       struct la_group *la_group, *prev;
        int ret;
 
+       if (replmd_private->la_list != NULL) {
+               DBG_NOTICE("Processing linked attributes\n");
+       }
+
        /*
         * Walk the list of linked attributes from DRS replication.
         *
         * We walk backwards, to do the first entry first, as we
         * added the entries with DLIST_ADD() which puts them at the
         * start of the list
+        *
+        * Links are grouped together so we process links for the same
+        * source object in one go.
         */
-       for (la = DLIST_TAIL(replmd_private->la_list); la; la=prev) {
-               prev = DLIST_PREV(la);
-               DLIST_REMOVE(replmd_private->la_list, la);
-               ret = replmd_process_linked_attribute(module, replmd_private,
-                                                     la, NULL);
+       for (la_group = DLIST_TAIL(replmd_private->la_list);
+            la_group != NULL;
+            la_group = prev) {
+
+               prev = DLIST_PREV(la_group);
+               DLIST_REMOVE(replmd_private->la_list, la_group);
+               ret = replmd_process_la_group(module, replmd_private,
+                                             la_group);
                if (ret != LDB_SUCCESS) {
                        replmd_txn_cleanup(replmd_private);
                        return ret;