/*
* This order must match that in prepare_commit(), start with
- * the metadata partition (sam.ldb) lock
+ * the top level DB (sam.ldb) lock
*/
ret = ldb_next_read_lock(module);
if (ret != LDB_SUCCESS) {
}
/*
- * The metadata partition (sam.ldb) lock is not
+ * The top level DB (sam.ldb) lock is not
* enough to block another process in prepare_commit(),
* because prepare_commit() is a no-op, if nothing
* was changed in the specific backend.
ldb_dn_get_linearized(
data->partitions[i]->ctrl->dn));
- /* Back it out, if it fails on one */
- for (i--; i >= 0; i--) {
- ret2 = ldb_next_read_unlock(data->partitions[i]->module);
- if (ret2 != LDB_SUCCESS) {
- ldb_debug(ldb,
- LDB_DEBUG_FATAL,
- "Failed to unlock db: %s / %s",
- ldb_errstring(ldb),
- ldb_strerror(ret2));
- }
- }
- ret2 = ldb_next_read_unlock(module);
+ goto failed;
+ }
+
+ /*
+ * Because in prepare_commit this must come last, to ensure
+ * lock ordering we have to do this last here also
+ */
+ ret = partition_metadata_read_lock(module);
+ if (ret != LDB_SUCCESS) {
+ goto failed;
+ }
+
+ return LDB_SUCCESS;
+
+failed:
+ /* Back it out, if it fails on one */
+ for (i--; i >= 0; i--) {
+ ret2 = ldb_next_read_unlock(data->partitions[i]->module);
if (ret2 != LDB_SUCCESS) {
ldb_debug(ldb,
LDB_DEBUG_FATAL,
ldb_errstring(ldb),
ldb_strerror(ret2));
}
- return ret;
}
-
- return LDB_SUCCESS;
+ ret2 = ldb_next_read_unlock(module);
+ if (ret2 != LDB_SUCCESS) {
+ ldb_debug(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to unlock db: %s / %s",
+ ldb_errstring(ldb),
+ ldb_strerror(ret2));
+ }
+ return ret;
}
/* unlock all the backends */
}
}
+ /*
+ * Because in prepare_commit this must come last, to ensure
+ * lock ordering we have to do this last here also
+ */
+ ret = partition_metadata_read_unlock(module);
+
+ /*
+ * Don't overwrite the original failure code
+ * if there was one
+ */
+ if (ret == LDB_SUCCESS) {
+ ret = ret2;
+ }
+
return ret;
}
ret = partition_metadata_set_uint64(module, LDB_METADATA_SEQ_NUM, *value, false);
return ret;
}
+/*
+ lock the database for read - use by partition_lock_read
+*/
+int partition_metadata_read_lock(struct ldb_module *module)
+{
+ struct partition_private_data *data
+ = talloc_get_type_abort(ldb_module_get_private(module),
+ struct partition_private_data);
+ struct tdb_context *tdb = NULL;
+ int tdb_ret = 0;
+ int ret;
+
+ if (!data || !data->metadata || !data->metadata->db) {
+ return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
+ "partition_metadata: metadata not initialized");
+ }
+ tdb = data->metadata->db->tdb;
+
+ if (tdb_transaction_active(tdb) == false &&
+ data->metadata->read_lock_count == 0) {
+ tdb_ret = tdb_lockall_read(tdb);
+ }
+ if (tdb_ret == 0) {
+ data->metadata->read_lock_count++;
+ return LDB_SUCCESS;
+ } else {
+ /* Sadly we can't call ltdb_err_map(tdb_error(tdb)); */
+ ret = LDB_ERR_BUSY;
+ }
+ ldb_debug_set(ldb_module_get_ctx(module),
+ LDB_DEBUG_FATAL,
+ "Failure during partition_metadata_read_lock(): %s",
+ tdb_errorstr(tdb));
+ return ret;
+}
+
+/*
+ unlock the database after a partition_metadata_lock_read()
+*/
+int partition_metadata_read_unlock(struct ldb_module *module)
+{
+ struct partition_private_data *data
+ = talloc_get_type_abort(ldb_module_get_private(module),
+ struct partition_private_data);
+ struct tdb_context *tdb = NULL;
+
+ data = talloc_get_type_abort(ldb_module_get_private(module),
+ struct partition_private_data);
+ if (!data || !data->metadata || !data->metadata->db) {
+ return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
+ "partition_metadata: metadata not initialized");
+ }
+ tdb = data->metadata->db->tdb;
+
+ if (!tdb_transaction_active(tdb) &&
+ data->metadata->read_lock_count == 1) {
+ tdb_unlockall_read(tdb);
+ data->metadata->read_lock_count--;
+ return 0;
+ }
+ data->metadata->read_lock_count--;
+ return 0;
+}
/*