add a function ctdb_writerecord() to write a record to the database.
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Mon, 17 May 2010 08:40:24 +0000 (18:40 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Mon, 17 May 2010 08:40:24 +0000 (18:40 +1000)
This function can only be called while hoilding a ctdb_readreacordlock*() handle.
Either from the callback provided or after ctdb_readrecordlock_recv() has been called but before ctdb_free() is used to release the handle.

client/ctdb_client.c
include/ctdb.h
include/ctdb_private.h
libctdb/ctdb_client.c
libctdb/libctdb.c
libctdb/tst.c

index fd34de6b83644c8b26aee6663da7b0b758ff239f..4d492939d90e1c2d3c128e83a16b945e75659344 100644 (file)
 
 
 
-struct ctdb_record_handle {
-       struct ctdb_db_context *ctdb_db;
-       TDB_DATA key;
-       TDB_DATA *data;
-       struct ctdb_ltdb_header header;
-};
-
-
-
-
-
-
-
-
-
-/*
-  full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
-*/
-int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
-       struct ctdb_client_call_state *state;
-
-       state = ctdb_call_send(ctdb_db, call);
-       return ctdb_call_recv(state, call);
-}
-
-
-
-
-
-/*
-  cancel a ctdb_fetch_lock operation, releasing the lock
- */
-static int fetch_lock_destructor(struct ctdb_record_handle *h)
-{
-       ctdb_ltdb_unlock(h->ctdb_db, h->key);
-       return 0;
-}
-
-/*
-  force the migration of a record to this node
- */
-static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
-{
-       struct ctdb_call call;
-       ZERO_STRUCT(call);
-       call.call_id = CTDB_NULL_FUNC;
-       call.key = key;
-       call.flags = CTDB_IMMEDIATE_MIGRATION;
-       return ctdb_call(ctdb_db, &call);
-}
-
-/*
-  get a lock on a record, and return the records data. Blocks until it gets the lock
- */
-struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
-                                          TDB_DATA key, TDB_DATA *data)
-{
-       int ret;
-       struct ctdb_record_handle *h;
-
-       /*
-         procedure is as follows:
-
-         1) get the chain lock. 
-         2) check if we are dmaster
-         3) if we are the dmaster then return handle 
-         4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
-            reply from ctdbd
-         5) when we get the reply, goto (1)
-        */
-
-       h = talloc_zero(mem_ctx, struct ctdb_record_handle);
-       if (h == NULL) {
-               return NULL;
-       }
-
-       h->ctdb_db = ctdb_db;
-       h->key     = key;
-       h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
-       if (h->key.dptr == NULL) {
-               talloc_free(h);
-               return NULL;
-       }
-       h->data    = data;
-
-       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
-                (const char *)key.dptr));
-
-again:
-       /* step 1 - get the chain lock */
-       ret = ctdb_ltdb_lock(ctdb_db, key);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
-               talloc_free(h);
-               return NULL;
-       }
-
-       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
-
-       talloc_set_destructor(h, fetch_lock_destructor);
-
-       ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
-
-       /* when torturing, ensure we test the remote path */
-       if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
-           random() % 5 == 0) {
-               h->header.dmaster = (uint32_t)-1;
-       }
-
-
-       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
-
-       if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
-               ctdb_ltdb_unlock(ctdb_db, key);
-               ret = ctdb_client_force_migration(ctdb_db, key);
-               if (ret != 0) {
-                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
-                       talloc_free(h);
-                       return NULL;
-               }
-               goto again;
-       }
-
-       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
-       return h;
-}
-
-/*
-  store some data to the record that was locked with ctdb_fetch_lock()
-*/
-int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
-{
-       if (h->ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
-               return -1;
-       }
-
-       return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
-}
 
 /*
   non-locking fetch of a record
index d44fca229d2da0dc6a8df865af9d84230b8c3ead..d0b9507711a63efdb6b8e681922162643c840c75 100644 (file)
@@ -68,11 +68,11 @@ typedef void ctdb_handle;
  * functions to attach to a database
  * if the database does not exist it will be created.
  *
- * Use ctdb_free() to release the returned ctdb_db_context when finished.
+ * You have to free the handle with ctdb_free() when finished with it.
  */
 struct ctdb_db_context;
 
-typedef void (*ctdb_attachdb_cb)(int32_t status, struct ctdb_db_context *ctdb_db, void *private_data);
+typedef void (*ctdb_attachdb_cb)(int32_t status, ctdb_handle *, struct ctdb_db_context *ctdb_db, void *private_data);
 
 ctdb_handle *
 ctdb_attachdb_send(struct ctdb_context *ctdb,
@@ -86,6 +86,47 @@ int ctdb_attachdb(struct ctdb_context *ctdb,
                  struct ctdb_db_context **);
 
 
+/*
+ * functions to read a record from the database
+ * when the callback is invoked, the client will hold an exclusive lock
+ * on the record, until the handle is ctdb_free()d.
+ * the client MUST NOT block during holding this lock and MUST
+ * release it quickly by performing ctdb_free(handle).
+ *
+ * When the handle is freed, data is freed too, so make sure to copy the data
+ * before freeing the handle.
+ */
+typedef void (*ctdb_readrecordlock_cb)(int32_t status, ctdb_handle *handle, TDB_DATA data, void *private_data);
+
+ctdb_handle *
+ctdb_readrecordlock_send(struct ctdb_context *ctdb,
+               struct ctdb_db_context *ctdb_db_context,
+               TDB_DATA key,
+               ctdb_readrecordlock_cb callback,
+               void *private_data);
+int ctdb_readrecordlock_recv(struct ctdb_context *ctdb,
+               ctdb_handle *handle,
+               TDB_DATA **data);
+int ctdb_readrecordlock(struct ctdb_context *ctdb,
+               struct ctdb_db_context *ctdb_db_context,
+               TDB_DATA key,
+               TDB_DATA **data);
+
+
+
+/*
+ * Function to write data to a record
+ * This function may ONLY be called while holding a lock to the record 
+ * created by ctdb_readrecordlock*
+ * Either from the callback provided to ctdb_readrecordlock_send()
+ * or after calling ctdb_readrecordlock_recv() but before calling
+ * ctdb_free() to release the handle.
+ */
+int ctdb_writerecord(ctdb_handle *handle,
+               TDB_DATA key,
+               TDB_DATA data);
+
+
 
 /*
  * messaging functions
index 90d4c13298c83b295f50b3a04badea54a344bb65..ab60a9d3fa18b1601806b477e83a33066fb84706 100644 (file)
@@ -1652,4 +1652,11 @@ void ctdb_control_timeout_func(struct event_context *ev,
 
 void ctdb_invoke_control_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
 
+struct ctdb_record_handle {
+       struct ctdb_db_context *ctdb_db;
+       TDB_DATA key;
+       TDB_DATA *data;
+       struct ctdb_ltdb_header header;
+};
+
 #endif
index a0841b5c0e567a82be652565367b3b2073b27933..a7b2112b8482c3f253d69f6ef32f372b3928cb8c 100644 (file)
@@ -660,6 +660,19 @@ int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
        return 0;
 }
 
+/*
+  full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
+*/
+int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+       struct ctdb_client_call_state *state;
+
+       state = ctdb_call_send(ctdb_db, call);
+       return ctdb_call_recv(state, call);
+}
+
+
+
 /*
   send a message - from client context
  */
@@ -727,3 +740,117 @@ int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
        DLIST_ADD(ctdb_db->calls, call);        
        return 0;
 }
+
+
+
+/*
+  store some data to the record that was locked with ctdb_fetch_lock()
+*/
+int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
+{
+       if (h->ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
+               return -1;
+       }
+
+       return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
+}
+
+/*
+  cancel a ctdb_fetch_lock operation, releasing the lock
+ */
+static int fetch_lock_destructor(struct ctdb_record_handle *h)
+{
+       ctdb_ltdb_unlock(h->ctdb_db, h->key);
+       return 0;
+}
+
+
+/*
+  force the migration of a record to this node
+ */
+static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
+{
+       struct ctdb_call call;
+       ZERO_STRUCT(call);
+       call.call_id = CTDB_NULL_FUNC;
+       call.key = key;
+       call.flags = CTDB_IMMEDIATE_MIGRATION;
+       return ctdb_call(ctdb_db, &call);
+}
+
+/*
+  get a lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
+                                          TDB_DATA key, TDB_DATA *data)
+{
+       int ret;
+       struct ctdb_record_handle *h;
+
+       /*
+         procedure is as follows:
+
+         1) get the chain lock. 
+         2) check if we are dmaster
+         3) if we are the dmaster then return handle 
+         4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
+            reply from ctdbd
+         5) when we get the reply, goto (1)
+        */
+
+       h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+       if (h == NULL) {
+               return NULL;
+       }
+
+       h->ctdb_db = ctdb_db;
+       h->key     = key;
+       h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+       if (h->key.dptr == NULL) {
+               talloc_free(h);
+               return NULL;
+       }
+       h->data    = data;
+
+       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
+                (const char *)key.dptr));
+
+again:
+       /* step 1 - get the chain lock */
+       ret = ctdb_ltdb_lock(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
+
+       talloc_set_destructor(h, fetch_lock_destructor);
+
+       ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
+
+       /* when torturing, ensure we test the remote path */
+       if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+           random() % 5 == 0) {
+               h->header.dmaster = (uint32_t)-1;
+       }
+
+
+       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
+
+       if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
+       DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
+       return h;
+}
index 7e95431d2c57c15111135115e1b102fbfcd3bb8c..02b40abaafdbb66e607088a0a51613ab3a2533b5 100644 (file)
@@ -26,6 +26,7 @@
 #include "lib/util/dlinklist.h"
 #include "lib/events/events.h"
 #include "lib/events/events_internal.h"
+#include "lib/tdb/include/tdb.h"
 #include "include/ctdb.h"
 #include "include/ctdb_protocol.h"
 #include "include/ctdb_private.h"
@@ -338,13 +339,6 @@ int ctdb_getrecmaster(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *re
 
 
 
-
-
-
-
-
-
-
 static void
 ctdb_set_message_handler_recv_cb(struct ctdb_client_control_state *state)
 {
@@ -763,8 +757,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state)
                DEBUG(DEBUG_ERR,(__location__ " getdbpath control failed with state:%d and status:%d\n", state->state, state->status));
                adb_state->state = CTDB_CONTROL_ERROR;
                if (callback != NULL) {
-                       callback(-1, NULL, adb_state->private_data);
-                       talloc_free(adb_state);
+                       callback(-1, adb_state, NULL, adb_state->private_data);
                }
                return;
        }
@@ -781,8 +774,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state)
                DEBUG(DEBUG_ERR, (__location__ " Failed to open tdb '%s'\n", ctdb_db->db_path));
                adb_state->state = CTDB_CONTROL_ERROR;
                if (callback != NULL) {
-                       callback(-1, NULL, adb_state->private_data);
-                       talloc_free(adb_state);
+                       callback(-1, adb_state, NULL, adb_state->private_data);
                }
                return;
        }
@@ -800,8 +792,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state)
 
        adb_state->state = CTDB_CONTROL_DONE;
        if (callback != NULL) {
-               callback(0, ctdb_db, adb_state->private_data);
-               talloc_free(adb_state);
+               callback(0, adb_state, ctdb_db, adb_state->private_data);
        }
 }
 
@@ -817,8 +808,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state)
                DEBUG(DEBUG_ERR,(__location__ " createdb control failed with state:%d and status:%d\n", state->state, state->status));
                adb_state->state = CTDB_CONTROL_ERROR;
                if (callback != NULL) {
-                       callback(-1, NULL, adb_state->private_data);
-                       talloc_free(adb_state);
+                       callback(-1, adb_state, NULL, adb_state->private_data);
                }
                return;
        }
@@ -827,8 +817,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state)
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of data returned for CREATEDB control. Got %zd bytes but expected %zd\n", state->outdata.dsize, sizeof(uint32_t)));
                adb_state->state = CTDB_CONTROL_ERROR;
                if (callback != NULL) {
-                       callback(-1, NULL, adb_state->private_data);
-                       talloc_free(adb_state);
+                       callback(-1, adb_state, NULL, adb_state->private_data);
                }
                return;
        }
@@ -840,8 +829,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state)
                DEBUG(DEBUG_ERR,(__location__ " ctdb_getdbpath_send() failed.\n"));
                adb_state->state = CTDB_CONTROL_ERROR;
                if (callback != NULL) {
-                       callback(-1, NULL, adb_state->private_data);
-                       talloc_free(adb_state);
+                       callback(-1, adb_state, NULL, adb_state->private_data);
                }
                return;
        }
@@ -960,3 +948,78 @@ int ctdb_attachdb(struct ctdb_context *ctdb,
        return 0;
 }
 
+
+
+
+/*
+ * read a record from the database
+ *
+ * the returned data must be freed by using ctdb_free()
+ */
+struct ctdb_call_cb_data {
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_record_handle *h;
+       TDB_DATA data;
+
+       void *callback;
+       void *private_data;
+};
+
+/* These are not proper async methods yet */
+ctdb_handle *
+ctdb_readrecordlock_send(struct ctdb_context *ctdb,
+               struct ctdb_db_context *ctdb_db,
+               TDB_DATA key,
+               ctdb_readrecordlock_cb callback,
+               void *private_data)
+{
+       struct ctdb_call_cb_data *state;
+       TDB_DATA data;
+
+       state = talloc_zero(ctdb, struct ctdb_call_cb_data);
+       if (state == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to send CALL\n"));
+               callback(-1, NULL, tdb_null, private_data);
+               return NULL;
+       }
+
+       state->h = ctdb_fetch_lock(ctdb_db, state, key, &data);
+
+       /* since this is fake async, invoke the callback immediately if set */
+       if (callback != NULL) {
+               callback(0, state, data, private_data);
+               return state;
+       }
+
+       return state;
+}
+
+int ctdb_readrecord_recv(struct ctdb_context *ctdb,
+               ctdb_handle *handle,
+               TDB_DATA **data);
+/*qqq*/
+int ctdb_readrecord(struct ctdb_context *ctdb,
+               struct ctdb_db_context *ctdb_db_context,
+               TDB_DATA key,
+               TDB_DATA **data);
+/*qqq*/
+
+
+
+int ctdb_writerecord(ctdb_handle *handle,
+               TDB_DATA key,
+               TDB_DATA data)
+{
+       struct ctdb_call_cb_data *state = talloc_get_type(handle,
+                               struct ctdb_call_cb_data);
+
+       struct ctdb_record_handle *h = talloc_get_type(state->h,
+                               struct ctdb_record_handle);
+
+       if (h->ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
+               return -1;
+       }
+
+       return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
+}
index dc030fe683e7a6015a976a016dc7a8ac89bf40f1..452ebe1a1c0c69b6c2cb7259cc2150ff50993bdc 100644 (file)
@@ -7,6 +7,8 @@
 #include "lib/tdb/include/tdb.h"
 #include "include/ctdb.h"
 
+TDB_DATA key;
+
 void msg_h(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *private_data)
 {
        printf("Message received on port %d : %s\n", (int)srvid, data.dptr);
@@ -23,12 +25,46 @@ void rm_cb(int32_t status, int32_t recmaster, void *private_data)
        printf("status:%d recmaster:%d\n", status, recmaster);
 }
 
-void adb_cb(int32_t status, struct ctdb_db_context *ctdb_db, void *private_data)
+/*
+ * example on how to first read(non-existing recortds are implicitely created
+ * on demand) a record and change it in the callback.
+ * This forms the atom for the read-modify-write cycle.
+ *
+ * Pure read, or pure write are just special cases of this cycle.
+ */
+void rrl_cb(int32_t status, ctdb_handle *handle, TDB_DATA outdata, void *private_data)
 {
-       printf("status:%d db:%p\n", status, ctdb_db);
+       TDB_DATA data;
+       char tmp[256];
+
+       if (status != 0) {
+               printf("rrl_cb returned error: status %d\n", status);
+               if (handle) {
+                       ctdb_free(handle);
+               }
+               return;
+       }
+
+       printf("rrl size:%d data:%s\n", outdata.dsize, outdata.dptr);
+       if (outdata.dsize == 0) {
+               tmp[0] = 0;
+       } else {
+               strcpy(tmp, outdata.dptr);
+       }
+       strcat(tmp, "*");
+
+       data.dptr  = tmp;
+       data.dsize = strlen(tmp) + 1;
+       ctdb_writerecord(handle, key, data);
+
+       printf("Wrote new record : %s\n", tmp);
+
+       ctdb_free(handle);
 }
 
 
+
+
 int main(int argc, char *argv[])
 {
        struct ctdb_context *ctdb_context;
@@ -38,6 +74,9 @@ int main(int argc, char *argv[])
        int ret;
        TDB_DATA msg;
 
+       key.dptr  = "Test Record";
+       key.dsize = strlen(key.dptr);
+
        ctdb_context = ctdb_connect("/tmp/ctdb.socket");
 
        handle = ctdb_set_message_handler_send(ctdb_context, 55, NULL, msg_h, NULL);
@@ -60,11 +99,17 @@ int main(int argc, char *argv[])
                exit(10);
        }
 
-       handle = ctdb_attachdb_send(ctdb_context, CTDB_CURRENT_NODE, "test_test.tdb", 0, 0, adb_cb, NULL);
+       handle = ctdb_attachdb_send(ctdb_context, "test_test.tdb", 0, 0, NULL, NULL);
        if (handle == NULL) {
                printf("Failed to send attachdb control\n");
                exit(10);
        }
+       ret = ctdb_attachdb_recv(ctdb_context, handle, &ctdb_db_context);
+       if (ret != 0 ) {
+               printf("Failed to attach to database\n");
+               exit(10);
+       }
+
 
        handle = ctdb_getpnn_send(ctdb_context, CTDB_CURRENT_NODE, pnn_cb, NULL);
        if (handle == NULL) {
@@ -72,9 +117,9 @@ int main(int argc, char *argv[])
                exit(10);
        }
 
-       handle = ctdb_getrecmaster_send(ctdb_context, CTDB_CURRENT_NODE, rm_cb, NULL);
+       handle = ctdb_readrecordlock_send(ctdb_context, ctdb_db_context, key, rrl_cb, NULL);
        if (handle == NULL) {
-               printf("Failed to send get_recmaster control\n");
+               printf("Failed to send READRECORDLOCK\n");
                exit(10);
        }