r20705: store the "replUpToDateVector" attribute in DSDB_EXTENDED_REPLICATED_OBJECTS
authorStefan Metzmacher <metze@samba.org>
Fri, 12 Jan 2007 13:17:25 +0000 (13:17 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:40:28 +0000 (14:40 -0500)
metze
(This used to be commit c9e7a58f6a16dfa28323fd0fd01ad6ee516c51b0)

source4/dsdb/repl/replicated_objects.c
source4/dsdb/samdb/ldb_modules/repl_meta_data.c
source4/dsdb/samdb/samdb.h
source4/torture/libnet/libnet_BecomeDC.c

index 0ccb46b9c8988cdef53383b0452c23c43fbfa84e..da3f6d0461ee485bfe8a2d3e2078c00121ab56a2 100644 (file)
@@ -183,6 +183,7 @@ WERROR dsdb_extended_replicated_objects_commit(struct ldb_context *ldb,
                                               const struct drsuapi_DsReplicaObjectListItemEx *first_object,
                                               uint32_t linked_attributes_count,
                                               const struct drsuapi_DsReplicaLinkedAttribute *linked_attributes,
+                                              const struct GUID *source_dsa_invocation_id,
                                               const struct drsuapi_DsReplicaHighWaterMark *new_highwatermark,
                                               const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector,
                                               TALLOC_CTX *mem_ctx,
@@ -204,6 +205,10 @@ WERROR dsdb_extended_replicated_objects_commit(struct ldb_context *ldb,
        out->partition_dn = ldb_dn_new(out, ldb, partition_dn);
        W_ERROR_HAVE_NO_MEMORY(out->partition_dn);
 
+       out->source_dsa_invocation_id   = source_dsa_invocation_id;
+       out->new_highwatermark          = new_highwatermark;
+       out->uptodateness_vector        = uptodateness_vector;
+
        out->num_objects        = object_count;
        out->objects            = talloc_array(out,
                                               struct dsdb_extended_replicated_object,
index 487a6146af3f745d9ddd7a74f484ebf32fa9c128..7998d5466c4cac15ea4d9bb5d9bbe0349bb81363 100644 (file)
@@ -45,6 +45,7 @@
 #include "lib/ldb/include/ldb_private.h"
 #include "dsdb/samdb/samdb.h"
 #include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 
 struct replmd_replicated_request {
@@ -606,9 +607,11 @@ static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
 
 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
 {
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
        if (ar->index_current >= ar->objs->num_objects) {
-               return replmd_replicated_request_done(ar);
+               return replmd_replicated_uptodate_vector(ar);
        }
+#endif
 
        ar->sub.mem_ctx = talloc_new(ar);
        if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
@@ -616,6 +619,331 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        return replmd_replicated_apply_search(ar);
 }
 
+static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
+                                                     void *private_data,
+                                                     struct ldb_reply *ares)
+{
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       struct replmd_replicated_request *ar = talloc_get_type(private_data,
+                                              struct replmd_replicated_request);
+
+       ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.change_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.change_ret);
+       }
+
+       talloc_free(ar->sub.mem_ctx);
+       ZERO_STRUCT(ar->sub);
+
+       return replmd_replicated_request_done(ar);
+#else
+       return LDB_SUCCESS;
+#endif
+}
+
+static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
+{
+       NTSTATUS nt_status;
+       struct ldb_message *msg;
+       struct replUpToDateVectorBlob ouv;
+       const struct ldb_val *ouv_value;
+       const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
+       struct replUpToDateVectorBlob nuv;
+       struct ldb_val nuv_value;
+       struct ldb_message_element *nuv_el = NULL;
+       struct GUID *our_invocation_id;
+       uint32_t i,j,ni=0;
+       uint64_t seq_num;
+       bool found = false;
+       time_t t = time(NULL);
+       NTTIME now;
+       int ret;
+
+       ruv = ar->objs->uptodateness_vector;
+       ZERO_STRUCT(ouv);
+       ouv.version = 2;
+       ZERO_STRUCT(nuv);
+       nuv.version = 2;
+
+       unix_to_nt_time(&now, t);
+
+       /* 
+        * we use the next sequence number for our own highest_usn
+        * because we will do a modify request and this will increment
+        * our highest_usn
+        */
+       ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
+       if (ouv_value) {
+               nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
+                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
+               if (!NT_STATUS_IS_OK(nt_status)) {
+                       return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+               }
+
+               if (ouv.version != 2) {
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+               }
+       }
+
+       /*
+        * the new uptodateness vector will at least
+        * contain 2 entries, one for the source_dsa and one the local server
+        *
+        * plus optional values from our old vector and the one from the source_dsa
+        */
+       nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
+       if (ruv) nuv.ctr.ctr2.count += ruv->count;
+       nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
+                                           struct drsuapi_DsReplicaCursor2,
+                                           nuv.ctr.ctr2.count);
+       if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+       /* first copy the old vector */
+       for (i=0; i < ouv.ctr.ctr2.count; i++) {
+               nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
+               ni++;
+       }
+
+       /* merge in the source_dsa vector is available */
+       for (i=0; (ruv && i < ruv->count); i++) {
+               found = false;
+
+               for (j=0; j < ni; j++) {
+                       if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
+                                       &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+                               continue;
+                       }
+
+                       found = true;
+
+                       /*
+                        * we update only the highest_usn and not the latest_sync_success time,
+                        * because the last success stands for direct replication
+                        */
+                       if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
+                               nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
+                       }
+                       break;                  
+               }
+
+               if (found) continue;
+
+               /* if it's not there yet, add it */
+               nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
+               ni++;
+       }
+
+       /*
+        * merge in the current highwatermark for the source_dsa
+        */
+       found = false;
+       for (j=0; j < ni; j++) {
+               if (!GUID_equal(ar->objs->source_dsa_invocation_id,
+                               &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+                       continue;
+               }
+
+               found = true;
+
+               /*
+                * here we update the highest_usn and last_sync_success time
+                * because we're directly replicating from the source_dsa
+                *
+                * and use the tmp_highest_usn because this is what we have just applied
+                * to our ldb
+                */
+               nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->new_highwatermark->tmp_highest_usn;
+               nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
+               break;
+       }
+       if (!found) {
+               /*
+                * here we update the highest_usn and last_sync_success time
+                * because we're directly replicating from the source_dsa
+                *
+                * and use the tmp_highest_usn because this is what we have just applied
+                * to our ldb
+                */
+               nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *ar->objs->source_dsa_invocation_id;
+               nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->new_highwatermark->tmp_highest_usn;
+               nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
+               ni++;
+       }
+
+       /*
+        * merge our own current values if we have a invocation_id already
+        * attached to the ldb
+        */
+       our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
+       if (our_invocation_id) {
+               found = false;
+               for (j=0; j < ni; j++) {
+                       if (!GUID_equal(our_invocation_id,
+                                       &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
+                               continue;
+                       }
+
+                       found = true;
+
+                       /*
+                        * here we update the highest_usn and last_sync_success time
+                        * because it's our own entry
+                        */
+                       nuv.ctr.ctr2.cursors[j].highest_usn             = seq_num;
+                       nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
+                       break;
+               }
+               if (!found) {
+                       /*
+                        * here we update the highest_usn and last_sync_success time
+                        * because it's our own entry
+                        */
+                       nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
+                       nuv.ctr.ctr2.cursors[ni].highest_usn            = seq_num;
+                       nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
+                       ni++;
+               }
+       }
+
+       /*
+        * finally correct the size of the cursors array
+        */
+       nuv.ctr.ctr2.count = ni;
+
+       /*
+        * create the change ldb_message
+        */
+       msg = ldb_msg_new(ar->sub.mem_ctx);
+       if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+       msg->dn = ar->sub.search_msg->dn;
+
+       nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
+                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
+       if (!NT_STATUS_IS_OK(nt_status)) {
+               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+       }
+       ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+       nuv_el->flags = LDB_FLAG_MOD_REPLACE;
+
+       ret = ldb_build_mod_req(&ar->sub.change_req,
+                               ar->module->ldb,
+                               ar->sub.mem_ctx,
+                               msg,
+                               NULL,
+                               ar,
+                               replmd_replicated_uptodate_modify_callback);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       return ldb_next_request(ar->module, ar->sub.change_req);
+#else
+       ret = ldb_next_request(ar->module, ar->sub.change_req);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+       ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.change_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.change_ret);
+       }
+
+       talloc_free(ar->sub.mem_ctx);
+       ZERO_STRUCT(ar->sub);
+
+       return replmd_replicated_request_done(ar);
+#endif
+}
+
+static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
+                                                     void *private_data,
+                                                     struct ldb_reply *ares)
+{
+       struct replmd_replicated_request *ar = talloc_get_type(private_data,
+                                              struct replmd_replicated_request);
+       bool is_done = false;
+
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+               ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
+               break;
+       case LDB_REPLY_REFERRAL:
+               /* we ignore referrals */
+               break;
+       case LDB_REPLY_EXTENDED:
+       case LDB_REPLY_DONE:
+               is_done = true;
+       }
+
+       talloc_free(ares);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       if (is_done) {
+               ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+               if (ar->sub.search_ret != LDB_SUCCESS) {
+                       return replmd_replicated_request_error(ar, ar->sub.search_ret);
+               }
+               if (!ar->sub.search_msg) {
+                       return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+               }
+
+               return replmd_replicated_uptodate_modify(ar);
+       }
+#endif
+       return LDB_SUCCESS;
+}
+
+static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
+{
+       int ret;
+       static const char *attrs[] = {
+               "replUpToDateVector",
+               NULL
+       };
+
+       ret = ldb_build_search_req(&ar->sub.search_req,
+                                  ar->module->ldb,
+                                  ar->sub.mem_ctx,
+                                  ar->objs->partition_dn,
+                                  LDB_SCOPE_BASE,
+                                  "(objectClass=*)",
+                                  attrs,
+                                  NULL,
+                                  ar,
+                                  replmd_replicated_uptodate_search_callback);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       return ldb_next_request(ar->module, ar->sub.search_req);
+#else
+       ret = ldb_next_request(ar->module, ar->sub.search_req);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+       ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.search_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.search_ret);
+       }
+       if (!ar->sub.search_msg) {
+               return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+       }
+
+       return replmd_replicated_uptodate_modify(ar);
+#endif
+}
+
+static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
+{
+       ar->sub.mem_ctx = talloc_new(ar);
+       if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+       return replmd_replicated_uptodate_search(ar);
+}
+
 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
 {
        struct dsdb_extended_replicated_objects *objs;
@@ -636,10 +964,15 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
        return replmd_replicated_apply_next(ar);
 #else
-       while (req->handle->state != LDB_ASYNC_DONE) {
+       while (ar->index_current < ar->objs->num_objects &&
+              req->handle->state != LDB_ASYNC_DONE) { 
                replmd_replicated_apply_next(ar);
        }
 
+       if (req->handle->state != LDB_ASYNC_DONE) {
+               replmd_replicated_uptodate_vector(ar);
+       }
+
        return LDB_SUCCESS;
 #endif
 }
index 14577400ba9f8b8cdf63d47132c90b2d006cbdb1..73e40183aab03175a3456c78a8f1e7699b6167fe 100644 (file)
@@ -50,6 +50,10 @@ struct dsdb_extended_replicated_object {
 struct dsdb_extended_replicated_objects {
        struct ldb_dn *partition_dn;
 
+       const struct GUID *source_dsa_invocation_id;
+       const struct drsuapi_DsReplicaHighWaterMark *new_highwatermark;
+       const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
+
        uint32_t num_objects;
        struct dsdb_extended_replicated_object *objects;
 };
index 9510572b57d069d22ed20f20f1482939baf5f272..d6fd89b7f367eb550bfb1bf874304996480a5a23 100644 (file)
@@ -139,6 +139,7 @@ static NTSTATUS test_become_dc_prepare_db(void *private_data,
        struct test_become_dc_state *s = talloc_get_type(private_data, struct test_become_dc_state);
        char *ejs;
        int ret;
+       bool ok;
 
        DEBUG(0,("New Server[%s] in Site[%s]\n",
                p->dest_dsa->dns_name, p->dest_dsa->site_name));
@@ -233,6 +234,17 @@ static NTSTATUS test_become_dc_prepare_db(void *private_data,
                return NT_STATUS_INTERNAL_DB_ERROR;
        }
 
+       ok = samdb_set_ntds_invocation_id(s->ldb, &p->dest_dsa->invocation_id);
+       if (!ok) {
+               DEBUG(0,("Failed to set cached ntds invocationId\n"));
+               return NT_STATUS_FOOBAR;
+       }
+       ok = samdb_set_ntds_objectGUID(s->ldb, &p->dest_dsa->ntds_guid);
+       if (!ok) {
+               DEBUG(0,("Failed to set cached ntds objectGUID\n"));
+               return NT_STATUS_FOOBAR;
+       }
+
        return NT_STATUS_OK;
 }
 
@@ -247,6 +259,7 @@ static NTSTATUS test_apply_schema(struct test_become_dc_state *s,
        struct drsuapi_DsReplicaObjectListItemEx *cur;
        uint32_t linked_attributes_count;
        struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
+       const struct GUID *source_dsa_invocation_id;
        const struct drsuapi_DsReplicaHighWaterMark *new_highwatermark;
        const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
        struct dsdb_extended_replicated_objects *objs;
@@ -260,6 +273,7 @@ static NTSTATUS test_apply_schema(struct test_become_dc_state *s,
                first_object            = s->schema_part.first_object;
                linked_attributes_count = 0;
                linked_attributes       = NULL;
+               source_dsa_invocation_id= &c->ctr1->source_dsa_invocation_id;
                new_highwatermark       = &c->ctr1->new_highwatermark;
                uptodateness_vector     = NULL; /* TODO: map it */
                break;
@@ -270,6 +284,7 @@ static NTSTATUS test_apply_schema(struct test_become_dc_state *s,
                first_object            = s->schema_part.first_object;
                linked_attributes_count = 0; /* TODO: ! */
                linked_attributes       = NULL; /* TODO: ! */;
+               source_dsa_invocation_id= &c->ctr6->source_dsa_invocation_id;
                new_highwatermark       = &c->ctr6->new_highwatermark;
                uptodateness_vector     = c->ctr6->uptodateness_vector;
                break;
@@ -353,6 +368,7 @@ static NTSTATUS test_apply_schema(struct test_become_dc_state *s,
                                                         first_object,
                                                         linked_attributes_count,
                                                         linked_attributes,
+                                                        source_dsa_invocation_id,
                                                         new_highwatermark,
                                                         uptodateness_vector,
                                                         s, &objs);
@@ -456,6 +472,7 @@ static NTSTATUS test_become_dc_store_chunk(void *private_data,
        struct drsuapi_DsReplicaObjectListItemEx *first_object;
        uint32_t linked_attributes_count;
        struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
+       const struct GUID *source_dsa_invocation_id;
        const struct drsuapi_DsReplicaHighWaterMark *new_highwatermark;
        const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
        struct dsdb_extended_replicated_objects *objs;
@@ -469,6 +486,7 @@ static NTSTATUS test_become_dc_store_chunk(void *private_data,
                first_object            = c->ctr1->first_object;
                linked_attributes_count = 0;
                linked_attributes       = NULL;
+               source_dsa_invocation_id= &c->ctr1->source_dsa_invocation_id;
                new_highwatermark       = &c->ctr1->new_highwatermark;
                uptodateness_vector     = NULL; /* TODO: map it */
                break;
@@ -479,6 +497,7 @@ static NTSTATUS test_become_dc_store_chunk(void *private_data,
                first_object            = c->ctr6->first_object;
                linked_attributes_count = c->ctr6->linked_attributes_count;
                linked_attributes       = c->ctr6->linked_attributes;
+               source_dsa_invocation_id= &c->ctr6->source_dsa_invocation_id;
                new_highwatermark       = &c->ctr6->new_highwatermark;
                uptodateness_vector     = c->ctr6->uptodateness_vector;
                break;
@@ -502,6 +521,7 @@ static NTSTATUS test_become_dc_store_chunk(void *private_data,
                                                         first_object,
                                                         linked_attributes_count,
                                                         linked_attributes,
+                                                        source_dsa_invocation_id,
                                                         new_highwatermark,
                                                         uptodateness_vector,
                                                         s, &objs);