smbd: Add debugs to brlock.c
[mat/samba.git] / source3 / locking / brlock.c
index a0b94cdadfe83f076c9f12948f327e88adb1ead9..b5eebc8e0401984d1d618c99e20585b08690ad2f 100644
@@ -47,7 +47,7 @@ struct byte_range_lock {
        struct files_struct *fsp;
        unsigned int num_locks;
        bool modified;
-       bool read_only;
+       bool have_read_oplocks;
        struct lock_struct *lock_data;
        struct db_record *record;
 };
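
The read_only field goes away: a byte_range_lock is now implicitly read-only whenever it holds no locked record (note the SMB_ASSERT comment in the next hunk). Its replacement, have_read_oplocks, has to be persisted in brlock.tdb, and the later hunks encode it as a single optional byte behind the lock array. Illustrative decode helpers for that layout, not part of the patch, with lock_size standing in for sizeof(struct lock_struct):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    /* Illustrative helpers (not part of the patch) for the new
     * brlock.tdb value layout: an array of lock_struct entries,
     * optionally followed by exactly one flag byte.  Old records lack
     * the byte, so their size stays a multiple of lock_size and they
     * parse as "no read oplocks". */

    static size_t brl_value_num_locks(size_t dsize, size_t lock_size)
    {
            return dsize / lock_size; /* a remainder of 1 is the flag byte */
    }

    static bool brl_value_have_read_oplocks(const uint8_t *dptr,
                                            size_t dsize, size_t lock_size)
    {
            if ((dsize % lock_size) == 1) {
                    return (dptr[dsize - 1] == 1);
            }
            return false; /* flag byte absent */
    }
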
@@ -82,6 +82,21 @@ struct files_struct *brl_fsp(struct byte_range_lock *brl)
        return brl->fsp;
 }
 
+bool brl_have_read_oplocks(const struct byte_range_lock *brl)
+{
+       return brl->have_read_oplocks;
+}
+
+void brl_set_have_read_oplocks(struct byte_range_lock *brl,
+                              bool have_read_oplocks)
+{
+       DEBUG(10, ("Setting have_read_oplocks to %s\n",
+                  have_read_oplocks ? "true" : "false"));
+       SMB_ASSERT(brl->record != NULL); /* otherwise we're readonly */
+       brl->have_read_oplocks = have_read_oplocks;
+       brl->modified = true;
+}
+
 /****************************************************************************
  See if two locking contexts are equal.
 ****************************************************************************/
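
The setter marks the struct as modified so the flush path below writes the bit back to brlock.tdb, and the SMB_ASSERT restricts it to structs holding the record lock. A hypothetical write-side caller, assuming the brl_get_locks() defined further down:

    /* Hypothetical caller (not in this patch): record that read
     * oplocks exist on the file.  brl_get_locks() below always returns
     * a struct holding the record lock, which satisfies the SMB_ASSERT
     * in the setter. */
    {
            struct byte_range_lock *br_lck;

            br_lck = brl_get_locks(talloc_tos(), fsp);
            if (br_lck != NULL) {
                    brl_set_have_read_oplocks(br_lck, true);
                    TALLOC_FREE(br_lck); /* destructor flushes and unlocks */
            }
    }
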
@@ -1879,15 +1894,21 @@ int brl_forall(void (*fn)(struct file_id id, struct server_id pid,
 
 static void byte_range_lock_flush(struct byte_range_lock *br_lck)
 {
-       if (br_lck->read_only) {
-               SMB_ASSERT(!br_lck->modified);
-       }
-
+       size_t data_len;
        if (!br_lck->modified) {
+               DEBUG(10, ("br_lck not modified\n"));
                goto done;
        }
 
-       if (br_lck->num_locks == 0) {
+       data_len = br_lck->num_locks * sizeof(struct lock_struct);
+
+       if (br_lck->have_read_oplocks) {
+               data_len += 1;
+       }
+
+       DEBUG(10, ("data_len=%d\n", (int)data_len));
+
+       if (data_len == 0) {
                /* No locks - delete this entry. */
                NTSTATUS status = dbwrap_record_delete(br_lck->record);
                if (!NT_STATUS_IS_OK(status)) {
@@ -1899,21 +1920,29 @@ static void byte_range_lock_flush(struct byte_range_lock *br_lck)
                TDB_DATA data;
                NTSTATUS status;
 
-               data.dptr = (uint8 *)br_lck->lock_data;
-               data.dsize = br_lck->num_locks * sizeof(struct lock_struct);
+               data.dsize = data_len;
+               data.dptr = talloc_array(talloc_tos(), uint8_t, data_len);
+               SMB_ASSERT(data.dptr != NULL);
+
+               memcpy(data.dptr, br_lck->lock_data,
+                      br_lck->num_locks * sizeof(struct lock_struct));
+
+               if (br_lck->have_read_oplocks) {
+                       data.dptr[data_len-1] = 1;
+               }
 
                status = dbwrap_record_store(br_lck->record, data, TDB_REPLACE);
+               TALLOC_FREE(data.dptr);
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(0, ("store returned %s\n", nt_errstr(status)));
                        smb_panic("Could not store byte range mode entry");
                }
        }
 
- done:
+       DEBUG(10, ("seqnum=%d\n", dbwrap_get_seqnum(brlock_db)));
 
-       br_lck->read_only = true;
+ done:
        br_lck->modified = false;
-
        TALLOC_FREE(br_lck->record);
 }
 
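
byte_range_lock_flush() now serializes the flag as a trailing byte, and a record is deleted only when data_len is zero, i.e. when no locks and no read oplocks remain. A standalone encoder sketch mirroring that logic, again with lock_size standing in for sizeof(struct lock_struct):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <string.h>

    /* Sketch of the serializer (not part of the patch).  buf must
     * provide num_locks * lock_size + 1 bytes; a return value of 0
     * tells the caller to delete the record. */
    static size_t brl_value_encode(uint8_t *buf, const void *locks,
                                   size_t num_locks, size_t lock_size,
                                   bool have_read_oplocks)
    {
            size_t len = num_locks * lock_size;

            memcpy(buf, locks, len);
            if (have_read_oplocks) {
                    buf[len] = 1; /* trailing flag byte */
                    len += 1;
            }
            return len;
    }
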
@@ -1929,12 +1958,10 @@ static int byte_range_lock_destructor(struct byte_range_lock *br_lck)
  TALLOC_FREE(brl) will release the lock in the destructor.
 ********************************************************************/
 
-static struct byte_range_lock *brl_get_locks_internal(TALLOC_CTX *mem_ctx,
-                                       files_struct *fsp, bool read_only)
+struct byte_range_lock *brl_get_locks(TALLOC_CTX *mem_ctx, files_struct *fsp)
 {
        TDB_DATA key, data;
        struct byte_range_lock *br_lck = talloc(mem_ctx, struct byte_range_lock);
-       bool do_read_only = read_only;
 
        if (br_lck == NULL) {
                return NULL;
@@ -1942,45 +1969,22 @@ static struct byte_range_lock *brl_get_locks_internal(TALLOC_CTX *mem_ctx,
 
        br_lck->fsp = fsp;
        br_lck->num_locks = 0;
+       br_lck->have_read_oplocks = false;
        br_lck->modified = False;
 
        key.dptr = (uint8 *)&fsp->file_id;
        key.dsize = sizeof(struct file_id);
 
-       if (!fsp->lockdb_clean) {
-               /* We must be read/write to clean
-                  the dead entries. */
-               do_read_only = false;
-       }
+       br_lck->record = dbwrap_fetch_locked(brlock_db, br_lck, key);
 
-       if (do_read_only) {
-               NTSTATUS status;
-               status = dbwrap_fetch(brlock_db, br_lck, key, &data);
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(3, ("Could not fetch byte range lock record\n"));
-                       TALLOC_FREE(br_lck);
-                       return NULL;
-               }
-               br_lck->record = NULL;
-       } else {
-               br_lck->record = dbwrap_fetch_locked(brlock_db, br_lck, key);
-
-               if (br_lck->record == NULL) {
-                       DEBUG(3, ("Could not lock byte range lock entry\n"));
-                       TALLOC_FREE(br_lck);
-                       return NULL;
-               }
-
-               data = dbwrap_record_get_value(br_lck->record);
-       }
-
-       if ((data.dsize % sizeof(struct lock_struct)) != 0) {
-               DEBUG(3, ("Got invalid brlock data\n"));
+       if (br_lck->record == NULL) {
+               DEBUG(3, ("Could not lock byte range lock entry\n"));
                TALLOC_FREE(br_lck);
                return NULL;
        }
 
-       br_lck->read_only = do_read_only;
+       data = dbwrap_record_get_value(br_lck->record);
+
        br_lck->lock_data = NULL;
 
        talloc_set_destructor(br_lck, byte_range_lock_destructor);
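
brl_get_locks() therefore always takes the record lock via dbwrap_fetch_locked(); the lockless read path moves into brl_get_locks_readonly() below. A minimal sketch of the read-modify-write cycle the function now follows, error handling trimmed and using only dbwrap calls visible in this file:

    /* Minimal dbwrap fetch-locked cycle (sketch). */
    {
            TDB_DATA key = make_tdb_data((uint8_t *)&fsp->file_id,
                                         sizeof(struct file_id));
            struct db_record *rec;

            rec = dbwrap_fetch_locked(brlock_db, talloc_tos(), key);
            if (rec != NULL) {
                    TDB_DATA value = dbwrap_record_get_value(rec);

                    DEBUG(10, ("got %d bytes\n", (int)value.dsize));
                    /* ... modify, then dbwrap_record_store() ... */
                    TALLOC_FREE(rec); /* releases the record lock */
            }
    }
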
@@ -1996,7 +2000,14 @@ static struct byte_range_lock *brl_get_locks_internal(TALLOC_CTX *mem_ctx,
                        return NULL;
                }
 
-               memcpy(br_lck->lock_data, data.dptr, data.dsize);
+               memcpy(br_lck->lock_data, data.dptr,
+                      talloc_get_size(br_lck->lock_data));
+       }
+
+       DEBUG(10, ("data.dsize=%d\n", (int)data.dsize));
+
+       if ((data.dsize % sizeof(struct lock_struct)) == 1) {
+               br_lck->have_read_oplocks = (data.dptr[data.dsize-1] == 1);
        }
 
        if (!fsp->lockdb_clean) {
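
The old hard failure on (data.dsize % sizeof(struct lock_struct)) != 0 is gone; a remainder of exactly 1 now identifies the flag byte. A worked example of the arithmetic, assuming an illustrative sizeof(struct lock_struct) of 24:

    #include <assert.h>
    #include <stddef.h>

    int main(void)
    {
            size_t lock_size = 24; /* stand-in for sizeof(struct lock_struct) */

            assert(48 % lock_size == 0); /* 2 locks, no flag byte */
            assert(49 % lock_size == 1); /* 2 locks plus the flag byte */
            assert(49 / lock_size == 2); /* num_locks ignores the odd byte */
            return 0;
    }
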
@@ -2041,47 +2052,137 @@ static struct byte_range_lock *brl_get_locks_internal(TALLOC_CTX *mem_ctx,
                }
        }
 
-       if (do_read_only != read_only) {
-               /*
-                * this stores the record and gets rid of
-                * the write lock that is needed for a cleanup
-                */
-               byte_range_lock_flush(br_lck);
-       }
-
        return br_lck;
 }
 
-struct byte_range_lock *brl_get_locks(TALLOC_CTX *mem_ctx,
-                                       files_struct *fsp)
+struct brl_get_locks_readonly_state {
+       TALLOC_CTX *mem_ctx;
+       struct byte_range_lock **br_lock;
+};
+
+static void brl_get_locks_readonly_parser(TDB_DATA key, TDB_DATA data,
+                                         void *private_data)
 {
-       return brl_get_locks_internal(mem_ctx, fsp, False);
+       struct brl_get_locks_readonly_state *state =
+               (struct brl_get_locks_readonly_state *)private_data;
+       struct byte_range_lock *br_lock;
+
+       br_lock = talloc_pooled_object(
+               state->mem_ctx, struct byte_range_lock, 1, data.dsize);
+       if (br_lock == NULL) {
+               *state->br_lock = NULL;
+               return;
+       }
+       br_lock->have_read_oplocks = false; /* unless the flag byte says so */
+       br_lock->lock_data = (struct lock_struct *)talloc_memdup(
+               br_lock, data.dptr, data.dsize);
+       br_lock->num_locks = data.dsize / sizeof(struct lock_struct);
+
+       if ((data.dsize % sizeof(struct lock_struct)) == 1) {
+               br_lock->have_read_oplocks = (data.dptr[data.dsize-1] == 1);
+       }
+
+       DEBUG(10, ("Got %d bytes, have_read_oplocks: %s\n", (int)data.dsize,
+                  br_lock->have_read_oplocks ? "true" : "false"));
+
+       *state->br_lock = br_lock;
 }
 
 struct byte_range_lock *brl_get_locks_readonly(files_struct *fsp)
 {
-       struct byte_range_lock *br_lock;
+       struct byte_range_lock *br_lock = NULL;
+       struct byte_range_lock *rw = NULL;
+
+       DEBUG(10, ("seqnum=%d, fsp->brlock_seqnum=%d\n",
+                  dbwrap_get_seqnum(brlock_db), fsp->brlock_seqnum));
 
        if ((fsp->brlock_rec != NULL)
            && (dbwrap_get_seqnum(brlock_db) == fsp->brlock_seqnum)) {
+               /*
+                * We have cached the brlock_rec and the database did not
+                * change.
+                */
                return fsp->brlock_rec;
        }
 
-       if (lp_clustering()) {
-               return brl_get_locks_internal(talloc_tos(), fsp, true);
+       if (!fsp->lockdb_clean) {
+               /*
+                * Fetch the record in R/W mode to give validate_lock_entries
+                * a chance to kick in once.
+                */
+               rw = brl_get_locks(talloc_tos(), fsp);
+               if (rw == NULL) {
+                       return NULL;
+               }
+               fsp->lockdb_clean = true;
        }
 
-       TALLOC_FREE(fsp->brlock_rec);
+       if (rw != NULL) {
+               size_t lock_data_size;
 
-       br_lock = brl_get_locks_internal(talloc_tos(), fsp, true);
-       if (br_lock == NULL) {
-               return NULL;
+               /*
+                * Make a copy of the already retrieved and sanitized rw record
+                */
+               lock_data_size = rw->num_locks * sizeof(struct lock_struct);
+               br_lock = talloc_pooled_object(
+                       fsp, struct byte_range_lock, 1, lock_data_size);
+               if (br_lock == NULL) {
+                       goto fail;
+               }
+               br_lock->have_read_oplocks = rw->have_read_oplocks;
+               br_lock->num_locks = rw->num_locks;
+               br_lock->lock_data = (struct lock_struct *)talloc_memdup(
+                       br_lock, rw->lock_data, lock_data_size);
+       } else {
+               struct brl_get_locks_readonly_state state;
+               NTSTATUS status;
+
+               /*
+                * Parse the record fresh from the database
+                */
+
+               state.mem_ctx = fsp;
+               state.br_lock = &br_lock;
+
+               status = dbwrap_parse_record(
+                       brlock_db,
+                       make_tdb_data((uint8_t *)&fsp->file_id,
+                                     sizeof(fsp->file_id)),
+                       brl_get_locks_readonly_parser, &state);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(3, ("Could not parse byte range lock record: "
+                                 "%s\n", nt_errstr(status)));
+                       goto fail;
+               }
+               if (br_lock == NULL) {
+                       goto fail;
+               }
        }
-       fsp->brlock_seqnum = dbwrap_get_seqnum(brlock_db);
 
-       fsp->brlock_rec = talloc_move(fsp, &br_lock);
+       br_lock->fsp = fsp;
+       br_lock->modified = false;
+       br_lock->record = NULL;
+
+       if (lp_clustering()) {
+               /*
+                * In the cluster case we can't cache the brlock struct
+                * because dbwrap_get_seqnum does not work reliably over
+                * ctdb. Thus we have to throw away the brlock struct soon.
+                */
+               talloc_steal(talloc_tos(), br_lock);
+       } else {
+               /*
+                * Cache the brlock struct, invalidated when the dbwrap_seqnum
+                * changes. See beginning of this routine.
+                */
+               TALLOC_FREE(fsp->brlock_rec);
+               fsp->brlock_rec = br_lock;
+               fsp->brlock_seqnum = dbwrap_get_seqnum(brlock_db);
+       }
 
-       return fsp->brlock_rec;
+fail:
+       TALLOC_FREE(rw);
+       return br_lock;
 }
 
 struct brl_revalidate_state {
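
brl_get_locks_readonly() now either copies a freshly validated R/W record or parses the record in place via dbwrap_parse_record(), caching the result on the fsp keyed by the database sequence number; under clustering the cache is bypassed because dbwrap_get_seqnum() is not reliable over ctdb. A hypothetical read-side caller, not in this patch:

    /* Hypothetical read-side caller (not in this patch).  The result
     * is owned by the fsp cache or by talloc_tos(), so do not free it
     * and do not keep it across operations that may modify
     * brlock.tdb. */
    {
            struct byte_range_lock *br_lck = brl_get_locks_readonly(fsp);

            if ((br_lck != NULL) && brl_have_read_oplocks(br_lck)) {
                    /* e.g. break the read oplocks before granting a
                     * conflicting byte-range lock */
            }
    }
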