smbd: Restructure brl_get_locks_readonly
authorVolker Lendecke <vl@samba.org>
Wed, 11 Sep 2013 11:36:54 +0000 (11:36 +0000)
committerStefan Metzmacher <metze@samba.org>
Sun, 6 Oct 2013 18:22:09 +0000 (20:22 +0200)
This is step 1 to get rid of brl_get_locks_internal with its complex readonly
business. It also optimizes 2 things: First, it uses dbwrap_parse_record to
avoid a talloc and memcpy, and second it uses talloc_pooled_object.

Hopefully this also makes the caching logic easier to understand: the
cached record lives in fsp->brlock_rec, with an explicit escape for the
clustering case where seqnum-based invalidation is unreliable.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
source3/locking/brlock.c

index a0b94cdadfe83f076c9f12948f327e88adb1ead9..e0335dcf30f36285c1574f0226c0b3748d6c320a 100644 (file)
@@ -2058,30 +2058,123 @@ struct byte_range_lock *brl_get_locks(TALLOC_CTX *mem_ctx,
        return brl_get_locks_internal(mem_ctx, fsp, False);
 }
 
-struct byte_range_lock *brl_get_locks_readonly(files_struct *fsp)
+struct brl_get_locks_readonly_state {
+       TALLOC_CTX *mem_ctx;
+       struct byte_range_lock **br_lock;
+};
+
+static void brl_get_locks_readonly_parser(TDB_DATA key, TDB_DATA data,
+                                         void *private_data)
 {
+       struct brl_get_locks_readonly_state *state =
+               (struct brl_get_locks_readonly_state *)private_data;
        struct byte_range_lock *br_lock;
 
+       br_lock = talloc_pooled_object(
+               state->mem_ctx, struct byte_range_lock, 1, data.dsize);
+       if (br_lock == NULL) {
+               *state->br_lock = NULL;
+               return;
+       }
+       br_lock->lock_data = (struct lock_struct *)talloc_memdup(
+               br_lock, data.dptr, data.dsize);
+       br_lock->num_locks = data.dsize / sizeof(struct lock_struct);
+
+       *state->br_lock = br_lock;
+}
+
+struct byte_range_lock *brl_get_locks_readonly(files_struct *fsp)
+{
+       struct byte_range_lock *br_lock = NULL;
+       struct byte_range_lock *rw = NULL;
+
        if ((fsp->brlock_rec != NULL)
            && (dbwrap_get_seqnum(brlock_db) == fsp->brlock_seqnum)) {
+               /*
+                * We have cached the brlock_rec and the database did not
+                * change.
+                */
                return fsp->brlock_rec;
        }
 
-       if (lp_clustering()) {
-               return brl_get_locks_internal(talloc_tos(), fsp, true);
+       if (!fsp->lockdb_clean) {
+               /*
+                * Fetch the record in R/W mode to give validate_lock_entries
+                * a chance to kick in once.
+                */
+               rw = brl_get_locks_internal(talloc_tos(), fsp, false);
+               if (rw == NULL) {
+                       return NULL;
+               }
+               fsp->lockdb_clean = true;
        }
 
-       TALLOC_FREE(fsp->brlock_rec);
+       if (rw != NULL) {
+               size_t lock_data_size;
 
-       br_lock = brl_get_locks_internal(talloc_tos(), fsp, true);
-       if (br_lock == NULL) {
-               return NULL;
+               /*
+                * Make a copy of the already retrieved and sanitized rw record
+                */
+               lock_data_size = rw->num_locks * sizeof(struct lock_struct);
+               br_lock = talloc_pooled_object(
+                       fsp, struct byte_range_lock, 1, lock_data_size);
+               if (br_lock == NULL) {
+                       goto fail;
+               }
+               br_lock->num_locks = rw->num_locks;
+               br_lock->lock_data = (struct lock_struct *)talloc_memdup(
+                       br_lock, rw->lock_data, lock_data_size);
+       } else {
+               struct brl_get_locks_readonly_state state;
+               NTSTATUS status;
+
+               /*
+                * Parse the record fresh from the database
+                */
+
+               state.mem_ctx = fsp;
+               state.br_lock = &br_lock;
+
+               status = dbwrap_parse_record(
+                       brlock_db,
+                       make_tdb_data((uint8_t *)&fsp->file_id,
+                                     sizeof(fsp->file_id)),
+                       brl_get_locks_readonly_parser, &state);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(3, ("Could not parse byte range lock record: "
+                                 "%s\n", nt_errstr(status)));
+                       goto fail;
+               }
+               if (br_lock == NULL) {
+                       goto fail;
+               }
        }
-       fsp->brlock_seqnum = dbwrap_get_seqnum(brlock_db);
 
-       fsp->brlock_rec = talloc_move(fsp, &br_lock);
+       br_lock->fsp = fsp;
+       br_lock->modified = false;
+       br_lock->read_only = true;
+       br_lock->record = NULL;
+
+       if (lp_clustering()) {
+               /*
+                * In the cluster case we can't cache the brlock struct
+                * because dbwrap_get_seqnum does not work reliably over
+                * ctdb. Thus we have to throw away the brlock struct soon.
+                */
+               talloc_steal(talloc_tos(), br_lock);
+       } else {
+               /*
+                * Cache the brlock struct, invalidated when the dbwrap_seqnum
+                * changes. See beginning of this routine.
+                */
+               TALLOC_FREE(fsp->brlock_rec);
+               fsp->brlock_rec = br_lock;
+               fsp->brlock_seqnum = dbwrap_get_seqnum(brlock_db);
+       }
 
-       return fsp->brlock_rec;
+fail:
+       TALLOC_FREE(rw);
+       return br_lock;
 }
 
 struct brl_revalidate_state {