ldb key_value: Add batch_mode option
[samba.git] / lib / ldb / ldb_key_value / ldb_kv.c
index 56316eb89757e46321301185c6e01b0933d7d2c6..f768fb5e1e44d60c2f1faf5d695b58c265e4fb45 100644 (file)
@@ -471,11 +471,16 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
 
 /*
  * Starts a sub transaction if they are supported by the backend
+ * and the ldb connection has not been opened in batch mode.
  */
 static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
 {
        int ret = LDB_SUCCESS;
 
+       if (ldb_kv->batch_mode) {
+               return ret;
+       }
+
        ret = ldb_kv->kv_ops->begin_nested_write(ldb_kv);
        if (ret == LDB_SUCCESS) {
                ret = ldb_kv_index_sub_transaction_start(ldb_kv);
@@ -485,11 +490,16 @@ static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
 
 /*
  * Commits a sub transaction if they are supported by the backend
+ * and the ldb connection has not been opened in batch mode.
  */
 static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
 {
        int ret = LDB_SUCCESS;
 
+       if (ldb_kv->batch_mode) {
+               return ret;
+       }
+
        ret = ldb_kv_index_sub_transaction_commit(ldb_kv);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -500,11 +510,16 @@ static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
 
 /*
  * Cancels a sub transaction if they are supported by the backend
+ * and the ldb connection has not been opened in batch mode.
  */
 static int ldb_kv_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
 {
        int ret = LDB_SUCCESS;
 
+       if (ldb_kv->batch_mode) {
+               return ret;
+       }
+
        ret = ldb_kv_index_sub_transaction_cancel(ldb_kv);
        if (ret != LDB_SUCCESS) {
                struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
@@ -691,6 +706,7 @@ static int ldb_kv_add(struct ldb_kv_context *ctx)
                                __location__
                                ": Unable to roll back sub transaction");
                }
+               ldb_kv->operation_failed = true;
                return ret;
        }
        ret = ldb_kv_sub_transaction_commit(ldb_kv);
@@ -816,6 +832,9 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
                                __location__
                                ": Unable to roll back sub transaction");
                }
+               if (ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ldb_kv->operation_failed = true;
+               }
                return ret;
        }
        ret = ldb_kv_sub_transaction_commit(ldb_kv);
@@ -1380,6 +1399,9 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
                                __location__
                                ": Unable to roll back sub transaction");
                }
+               if (ret != LDB_ERR_NO_SUCH_OBJECT) {
+                       ldb_kv->operation_failed = true;
+               }
                return ret;
        }
        ret = ldb_kv_sub_transaction_commit(ldb_kv);
@@ -1545,6 +1567,7 @@ static int ldb_kv_rename(struct ldb_kv_context *ctx)
                                ": Unable to roll back sub transaction");
                }
                talloc_free(msg);
+               ldb_kv->operation_failed = true;
                return ret;
        }
        ret = ldb_kv_sub_transaction_commit(ldb_kv);
@@ -1585,6 +1608,7 @@ static int ldb_kv_start_trans(struct ldb_module *module)
                ldb_kv->index_transaction_cache_size);
 
        ldb_kv->reindex_failed = false;
+       ldb_kv->operation_failed = false;
 
        return LDB_SUCCESS;
 }
@@ -1681,6 +1705,32 @@ static int ldb_kv_end_trans(struct ldb_module *module)
        struct ldb_kv_private *ldb_kv =
            talloc_get_type(data, struct ldb_kv_private);
 
+       /*
+        * If in batch mode and there has been an operation failure
+        * rollback the transaction rather than committing it to avoid
+        * any possible corruption
+        */
+       if (ldb_kv->batch_mode && ldb_kv->operation_failed) {
+               ret = ldb_kv_del_trans( module);
+               if (ret != LDB_SUCCESS) {
+                       ldb_debug_set(ldb_module_get_ctx(module),
+                                     LDB_DEBUG_FATAL,
+                                     "An operation failed during a batch mode "
+                                     "transaction. The transaction could not"
+                                     "be rolled back, ldb_kv_del_trans "
+                                     "returned (%s, %s)",
+                                     ldb_kv->kv_ops->errorstr(ldb_kv),
+                                     ldb_strerror(ret));
+               } else {
+                       ldb_debug_set(ldb_module_get_ctx(module),
+                                     LDB_DEBUG_FATAL,
+                                     "An operation failed during a batch mode "
+                                     "transaction, the transaction was "
+                                     "rolled back");
+               }
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        if (!ldb_kv->prepared_commit) {
                ret = ldb_kv_prepare_commit(module);
                if (ret != LDB_SUCCESS) {
@@ -2226,6 +2276,19 @@ int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
                        }
                }
        }
+       /*
+        * Set batch mode operation.
+        * This disables the nested sub transactions, and increases the
+        * chance of index corruption.  If using this mode the transaction
+        * commit will be aborted if any operation fails.
+        */
+       {
+               const char *batch_mode = ldb_options_find(
+                       ldb, options, "batch_mode");
+               if (batch_mode != NULL) {
+                       ldb_kv->batch_mode = true;
+               }
+       }
 
        return LDB_SUCCESS;
 }