s4:dsdb Reload partition metadata if the main db updates
authorAndrew Bartlett <abartlet@samba.org>
Wed, 14 Oct 2009 01:57:03 +0000 (12:57 +1100)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 21 Oct 2009 11:43:52 +0000 (22:43 +1100)
This uses the fact that the primary DB does not change often.  Before
each operation, we see if the sequence number has changed.

Andrew Bartlett

source4/dsdb/samdb/ldb_modules/partition.c
source4/dsdb/samdb/ldb_modules/partition.h
source4/dsdb/samdb/ldb_modules/partition_init.c

index f6031fb944e58962a1318256d9f84f182aa3e61a..4626eefa7c3a82033fe054e118fffa846765fd27 100644 (file)
@@ -412,6 +412,11 @@ static int partition_replicate(struct ldb_module *module, struct ldb_request *re
        if (!data || !data->partitions) {
                return ldb_next_request(module, req);
        }
+
+       ret = partition_reload_if_required(module, data);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
        
        if (req->operation != LDB_SEARCH) {
                /* Is this a special DN, we need to replicate to every backend? */
@@ -463,6 +468,7 @@ static int partition_replicate(struct ldb_module *module, struct ldb_request *re
 /* search */
 static int partition_search(struct ldb_module *module, struct ldb_request *req)
 {
+       int ret;
        struct ldb_control **saved_controls;
        /* Find backend */
        struct partition_private_data *data = talloc_get_type(module->private_data, 
@@ -479,6 +485,11 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
        struct ldb_search_options_control *search_options = NULL;
        struct dsdb_partition *p;
        
+       ret = partition_reload_if_required(module, data);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
        p = find_partition(data, NULL, req);
        if (p != NULL) {
                /* the caller specified what partition they want the
@@ -511,7 +522,7 @@ static int partition_search(struct ldb_module *module, struct ldb_request *req)
        */
        
        if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
-               int ret, i;
+               int i;
                struct partition_context *ac;
                if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
                        /* We have processed this flag, so we are done with this control now */
@@ -615,6 +626,7 @@ static int partition_delete(struct ldb_module *module, struct ldb_request *req)
 /* rename */
 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
 {
+       int ret;
        /* Find backend */
        struct dsdb_partition *backend, *backend2;
        
@@ -626,6 +638,11 @@ static int partition_rename(struct ldb_module *module, struct ldb_request *req)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       ret = partition_reload_if_required(module, data);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
        backend = find_partition(data, req->op.rename.olddn, req);
        backend2 = find_partition(data, req->op.rename.newdn, req);
 
@@ -751,6 +768,61 @@ static int partition_del_trans(struct ldb_module *module)
        return final_ret;
 }
 
+int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
+                                    enum ldb_sequence_type type, uint64_t *seq_number) 
+{
+       int ret;
+       struct ldb_result *res;
+       struct ldb_seqnum_request *tseq;
+       struct ldb_request *treq;
+       struct ldb_seqnum_result *seqr;
+       res = talloc_zero(mem_ctx, struct ldb_result);
+       if (res == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       tseq = talloc_zero(res, struct ldb_seqnum_request);
+       if (tseq == NULL) {
+               talloc_free(res);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       tseq->type = type;
+       
+       ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
+                                    LDB_EXTENDED_SEQUENCE_NUMBER,
+                                    tseq,
+                                    NULL,
+                                    res,
+                                    ldb_extended_default_callback,
+                                    NULL);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(res);
+               return ret;
+       }
+       
+       ret = ldb_next_request(module, treq);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(res);
+               return ret;
+       }
+       ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(res);
+               return ret;
+       }
+       
+       seqr = talloc_get_type(res->extended->data,
+                              struct ldb_seqnum_result);
+       if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+               ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
+               talloc_free(res);
+               return ret;
+       } else {
+               *seq_number = seqr->seq_num;
+       }
+       talloc_free(res);
+       return LDB_SUCCESS;
+}
 
 /* FIXME: This function is still semi-async */
 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
@@ -783,49 +855,12 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
        switch (seq->type) {
        case LDB_SEQ_NEXT:
        case LDB_SEQ_HIGHEST_SEQ:
-               res = talloc_zero(req, struct ldb_result);
-               if (res == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               tseq = talloc_zero(res, struct ldb_seqnum_request);
-               if (tseq == NULL) {
-                       talloc_free(res);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               tseq->type = seq->type;
 
-               ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
-                                            LDB_EXTENDED_SEQUENCE_NUMBER,
-                                            tseq,
-                                            NULL,
-                                            res,
-                                            ldb_extended_default_callback,
-                                            NULL);
+               ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(res);
                        return ret;
                }
 
-               ret = ldb_next_request(module, treq);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(res);
-                       return ret;
-               }
-               ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(res);
-                       return ret;
-               }
-
-               seqr = talloc_get_type(res->extended->data,
-                                       struct ldb_seqnum_result);
-               if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
-                       timestamp_sequence = seqr->seq_num;
-               } else {
-                       seq_number += seqr->seq_num;
-               }
-               talloc_free(res);
-
                /* Skip the lot if 'data' isn't here yet (initialisation) */
                for (i=0; data && data->partitions && data->partitions[i]; i++) {
 
@@ -1078,12 +1113,18 @@ static int partition_extended(struct ldb_module *module, struct ldb_request *req
 {
        struct partition_private_data *data;
        struct partition_context *ac;
+       int ret;
 
        data = talloc_get_type(module->private_data, struct partition_private_data);
        if (!data) {
                return ldb_next_request(module, req);
        }
 
+       ret = partition_reload_if_required(module, data);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       
        if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
                return partition_sequence_number(module, req);
        }
index 2fbcada0a110e041dffb6403f8176f0958d1ca65..9bab2dbcbcd06283f0436be4a72f2d75baa6ce6f 100644 (file)
@@ -42,6 +42,8 @@ struct partition_private_data {
        
        struct partition_module **modules;
        const char *ldapBackend;
+
+       uint64_t metadata_seq;
 };
 
 #define PARTITION_FIND_OP_NOERROR(module, op) do { \
index cfec1aa58511ed0a14439600b6e569d80f0d21f4..2da938d20edb77da3881630916d43d6470643807 100644 (file)
@@ -159,6 +159,38 @@ static int partition_reload_metadata(struct ldb_module *module, struct partition
        return LDB_SUCCESS;
 }
 
+int partition_reload_if_required(struct ldb_module *module, 
+                                struct partition_private_data *data)
+       
+{
+       uint64_t seq;
+       int ret;
+       TALLOC_CTX *mem_ctx = talloc_new(data);
+       if (!data) {
+               /* Not initilised yet */
+               return LDB_SUCCESS;
+       }
+       if (!mem_ctx) {
+               ldb_oom(ldb_module_get_ctx(module));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ret = partition_primary_sequence_number(module, mem_ctx, LDB_SEQ_HIGHEST_SEQ, &seq);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(mem_ctx);
+               return ret;
+       }
+       if (seq != data->metadata_seq) {
+               ret = partition_reload_metadata(module, data, mem_ctx, NULL);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(mem_ctx);
+                       return ret;
+               }
+               data->metadata_seq = seq;
+       }
+       talloc_free(mem_ctx);
+       return LDB_SUCCESS;
+}
+
 static const char **find_modules_for_dn(struct partition_private_data *data, struct ldb_dn *dn) 
 {
        int i;
@@ -577,6 +609,12 @@ int partition_init(struct ldb_module *module)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       ret = partition_primary_sequence_number(module, mem_ctx, LDB_SEQ_HIGHEST_SEQ, &data->metadata_seq);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(mem_ctx);
+               return ret;
+       }
+
        ret = partition_reload_metadata(module, data, mem_ctx, &msg);
        if (ret != LDB_SUCCESS) {
                return ret;