From: Ronnie Sahlberg Date: Mon, 17 May 2010 08:40:24 +0000 (+1000) Subject: add a function ctdb_writerecord() to write a record to the database. X-Git-Url: http://git.samba.org/?p=sahlberg%2Fctdb.git;a=commitdiff_plain;h=b28816fecc7c56a15f6027676c9557283ebc4338 add a function ctdb_writerecord() to write a record to the database. This function can only be called while hoilding a ctdb_readreacordlock*() handle. Either from the callback provided or after ctdb_readrecordlock_recv() has been called but before ctdb_free() is used to release the handle. --- diff --git a/client/ctdb_client.c b/client/ctdb_client.c index fd34de6b..4d492939 100644 --- a/client/ctdb_client.c +++ b/client/ctdb_client.c @@ -33,146 +33,6 @@ -struct ctdb_record_handle { - struct ctdb_db_context *ctdb_db; - TDB_DATA key; - TDB_DATA *data; - struct ctdb_ltdb_header header; -}; - - - - - - - - - -/* - full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() -*/ -int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - struct ctdb_client_call_state *state; - - state = ctdb_call_send(ctdb_db, call); - return ctdb_call_recv(state, call); -} - - - - - -/* - cancel a ctdb_fetch_lock operation, releasing the lock - */ -static int fetch_lock_destructor(struct ctdb_record_handle *h) -{ - ctdb_ltdb_unlock(h->ctdb_db, h->key); - return 0; -} - -/* - force the migration of a record to this node - */ -static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key) -{ - struct ctdb_call call; - ZERO_STRUCT(call); - call.call_id = CTDB_NULL_FUNC; - call.key = key; - call.flags = CTDB_IMMEDIATE_MIGRATION; - return ctdb_call(ctdb_db, &call); -} - -/* - get a lock on a record, and return the records data. Blocks until it gets the lock - */ -struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) -{ - int ret; - struct ctdb_record_handle *h; - - /* - procedure is as follows: - - 1) get the chain lock. - 2) check if we are dmaster - 3) if we are the dmaster then return handle - 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for - reply from ctdbd - 5) when we get the reply, goto (1) - */ - - h = talloc_zero(mem_ctx, struct ctdb_record_handle); - if (h == NULL) { - return NULL; - } - - h->ctdb_db = ctdb_db; - h->key = key; - h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); - if (h->key.dptr == NULL) { - talloc_free(h); - return NULL; - } - h->data = data; - - DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, - (const char *)key.dptr)); - -again: - /* step 1 - get the chain lock */ - ret = ctdb_ltdb_lock(ctdb_db, key); - if (ret != 0) { - DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); - talloc_free(h); - return NULL; - } - - DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n")); - - talloc_set_destructor(h, fetch_lock_destructor); - - ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data); - - /* when torturing, ensure we test the remote path */ - if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && - random() % 5 == 0) { - h->header.dmaster = (uint32_t)-1; - } - - - DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n")); - - if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) { - ctdb_ltdb_unlock(ctdb_db, key); - ret = ctdb_client_force_migration(ctdb_db, key); - if (ret != 0) { - DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n")); - talloc_free(h); - return NULL; - } - goto again; - } - - DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n")); - return h; -} - -/* - store some data to the record that was locked with ctdb_fetch_lock() -*/ -int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) -{ - if (h->ctdb_db->persistent) { - DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n")); - return -1; - } - - return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); -} /* non-locking fetch of a record diff --git a/include/ctdb.h b/include/ctdb.h index d44fca22..d0b95077 100644 --- a/include/ctdb.h +++ b/include/ctdb.h @@ -68,11 +68,11 @@ typedef void ctdb_handle; * functions to attach to a database * if the database does not exist it will be created. * - * Use ctdb_free() to release the returned ctdb_db_context when finished. + * You have to free the handle with ctdb_free() when finished with it. */ struct ctdb_db_context; -typedef void (*ctdb_attachdb_cb)(int32_t status, struct ctdb_db_context *ctdb_db, void *private_data); +typedef void (*ctdb_attachdb_cb)(int32_t status, ctdb_handle *, struct ctdb_db_context *ctdb_db, void *private_data); ctdb_handle * ctdb_attachdb_send(struct ctdb_context *ctdb, @@ -86,6 +86,47 @@ int ctdb_attachdb(struct ctdb_context *ctdb, struct ctdb_db_context **); +/* + * functions to read a record from the database + * when the callback is invoked, the client will hold an exclusive lock + * on the record, until the handle is ctdb_free()d. + * the client MUST NOT block during holding this lock and MUST + * release it quickly by performing ctdb_free(handle). + * + * When the handle is freed, data is freed too, so make sure to copy the data + * before freeing the handle. + */ +typedef void (*ctdb_readrecordlock_cb)(int32_t status, ctdb_handle *handle, TDB_DATA data, void *private_data); + +ctdb_handle * +ctdb_readrecordlock_send(struct ctdb_context *ctdb, + struct ctdb_db_context *ctdb_db_context, + TDB_DATA key, + ctdb_readrecordlock_cb callback, + void *private_data); +int ctdb_readrecordlock_recv(struct ctdb_context *ctdb, + ctdb_handle *handle, + TDB_DATA **data); +int ctdb_readrecordlock(struct ctdb_context *ctdb, + struct ctdb_db_context *ctdb_db_context, + TDB_DATA key, + TDB_DATA **data); + + + +/* + * Function to write data to a record + * This function may ONLY be called while holding a lock to the record + * created by ctdb_readrecordlock* + * Either from the callback provided to ctdb_readrecordlock_send() + * or after calling ctdb_readrecordlock_recv() but before calling + * ctdb_free() to release the handle. + */ +int ctdb_writerecord(ctdb_handle *handle, + TDB_DATA key, + TDB_DATA data); + + /* * messaging functions diff --git a/include/ctdb_private.h b/include/ctdb_private.h index 90d4c132..ab60a9d3 100644 --- a/include/ctdb_private.h +++ b/include/ctdb_private.h @@ -1652,4 +1652,11 @@ void ctdb_control_timeout_func(struct event_context *ev, void ctdb_invoke_control_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data); +struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; + struct ctdb_ltdb_header header; +}; + #endif diff --git a/libctdb/ctdb_client.c b/libctdb/ctdb_client.c index a0841b5c..a7b2112b 100644 --- a/libctdb/ctdb_client.c +++ b/libctdb/ctdb_client.c @@ -660,6 +660,19 @@ int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call) return 0; } +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + struct ctdb_client_call_state *state; + + state = ctdb_call_send(ctdb_db, call); + return ctdb_call_recv(state, call); +} + + + /* send a message - from client context */ @@ -727,3 +740,117 @@ int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id) DLIST_ADD(ctdb_db->calls, call); return 0; } + + + +/* + store some data to the record that was locked with ctdb_fetch_lock() +*/ +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) +{ + if (h->ctdb_db->persistent) { + DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n")); + return -1; + } + + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); +} + +/* + cancel a ctdb_fetch_lock operation, releasing the lock + */ +static int fetch_lock_destructor(struct ctdb_record_handle *h) +{ + ctdb_ltdb_unlock(h->ctdb_db, h->key); + return 0; +} + + +/* + force the migration of a record to this node + */ +static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + struct ctdb_call call; + ZERO_STRUCT(call); + call.call_id = CTDB_NULL_FUNC; + call.key = key; + call.flags = CTDB_IMMEDIATE_MIGRATION; + return ctdb_call(ctdb_db, &call); +} + +/* + get a lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + int ret; + struct ctdb_record_handle *h; + + /* + procedure is as follows: + + 1) get the chain lock. + 2) check if we are dmaster + 3) if we are the dmaster then return handle + 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for + reply from ctdbd + 5) when we get the reply, goto (1) + */ + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { + return NULL; + } + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); + return NULL; + } + h->data = data; + + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, + (const char *)key.dptr)); + +again: + /* step 1 - get the chain lock */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } + + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n")); + + talloc_set_destructor(h, fetch_lock_destructor); + + ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data); + + /* when torturing, ensure we test the remote path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + h->header.dmaster = (uint32_t)-1; + } + + + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n")); + + if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n")); + return h; +} diff --git a/libctdb/libctdb.c b/libctdb/libctdb.c index 7e95431d..02b40aba 100644 --- a/libctdb/libctdb.c +++ b/libctdb/libctdb.c @@ -26,6 +26,7 @@ #include "lib/util/dlinklist.h" #include "lib/events/events.h" #include "lib/events/events_internal.h" +#include "lib/tdb/include/tdb.h" #include "include/ctdb.h" #include "include/ctdb_protocol.h" #include "include/ctdb_private.h" @@ -338,13 +339,6 @@ int ctdb_getrecmaster(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *re - - - - - - - static void ctdb_set_message_handler_recv_cb(struct ctdb_client_control_state *state) { @@ -763,8 +757,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR,(__location__ " getdbpath control failed with state:%d and status:%d\n", state->state, state->status)); adb_state->state = CTDB_CONTROL_ERROR; if (callback != NULL) { - callback(-1, NULL, adb_state->private_data); - talloc_free(adb_state); + callback(-1, adb_state, NULL, adb_state->private_data); } return; } @@ -781,8 +774,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR, (__location__ " Failed to open tdb '%s'\n", ctdb_db->db_path)); adb_state->state = CTDB_CONTROL_ERROR; if (callback != NULL) { - callback(-1, NULL, adb_state->private_data); - talloc_free(adb_state); + callback(-1, adb_state, NULL, adb_state->private_data); } return; } @@ -800,8 +792,7 @@ ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state) adb_state->state = CTDB_CONTROL_DONE; if (callback != NULL) { - callback(0, ctdb_db, adb_state->private_data); - talloc_free(adb_state); + callback(0, adb_state, ctdb_db, adb_state->private_data); } } @@ -817,8 +808,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR,(__location__ " createdb control failed with state:%d and status:%d\n", state->state, state->status)); adb_state->state = CTDB_CONTROL_ERROR; if (callback != NULL) { - callback(-1, NULL, adb_state->private_data); - talloc_free(adb_state); + callback(-1, adb_state, NULL, adb_state->private_data); } return; } @@ -827,8 +817,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR, (__location__ " Wrong size of data returned for CREATEDB control. Got %zd bytes but expected %zd\n", state->outdata.dsize, sizeof(uint32_t))); adb_state->state = CTDB_CONTROL_ERROR; if (callback != NULL) { - callback(-1, NULL, adb_state->private_data); - talloc_free(adb_state); + callback(-1, adb_state, NULL, adb_state->private_data); } return; } @@ -840,8 +829,7 @@ ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR,(__location__ " ctdb_getdbpath_send() failed.\n")); adb_state->state = CTDB_CONTROL_ERROR; if (callback != NULL) { - callback(-1, NULL, adb_state->private_data); - talloc_free(adb_state); + callback(-1, adb_state, NULL, adb_state->private_data); } return; } @@ -960,3 +948,78 @@ int ctdb_attachdb(struct ctdb_context *ctdb, return 0; } + + + +/* + * read a record from the database + * + * the returned data must be freed by using ctdb_free() + */ +struct ctdb_call_cb_data { + struct ctdb_db_context *ctdb_db; + struct ctdb_record_handle *h; + TDB_DATA data; + + void *callback; + void *private_data; +}; + +/* These are not proper async methods yet */ +ctdb_handle * +ctdb_readrecordlock_send(struct ctdb_context *ctdb, + struct ctdb_db_context *ctdb_db, + TDB_DATA key, + ctdb_readrecordlock_cb callback, + void *private_data) +{ + struct ctdb_call_cb_data *state; + TDB_DATA data; + + state = talloc_zero(ctdb, struct ctdb_call_cb_data); + if (state == NULL) { + DEBUG(DEBUG_ERR, (__location__ " Failed to send CALL\n")); + callback(-1, NULL, tdb_null, private_data); + return NULL; + } + + state->h = ctdb_fetch_lock(ctdb_db, state, key, &data); + + /* since this is fake async, invoke the callback immediately if set */ + if (callback != NULL) { + callback(0, state, data, private_data); + return state; + } + + return state; +} + +int ctdb_readrecord_recv(struct ctdb_context *ctdb, + ctdb_handle *handle, + TDB_DATA **data); +/*qqq*/ +int ctdb_readrecord(struct ctdb_context *ctdb, + struct ctdb_db_context *ctdb_db_context, + TDB_DATA key, + TDB_DATA **data); +/*qqq*/ + + + +int ctdb_writerecord(ctdb_handle *handle, + TDB_DATA key, + TDB_DATA data) +{ + struct ctdb_call_cb_data *state = talloc_get_type(handle, + struct ctdb_call_cb_data); + + struct ctdb_record_handle *h = talloc_get_type(state->h, + struct ctdb_record_handle); + + if (h->ctdb_db->persistent) { + DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n")); + return -1; + } + + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); +} diff --git a/libctdb/tst.c b/libctdb/tst.c index dc030fe6..452ebe1a 100644 --- a/libctdb/tst.c +++ b/libctdb/tst.c @@ -7,6 +7,8 @@ #include "lib/tdb/include/tdb.h" #include "include/ctdb.h" +TDB_DATA key; + void msg_h(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *private_data) { printf("Message received on port %d : %s\n", (int)srvid, data.dptr); @@ -23,12 +25,46 @@ void rm_cb(int32_t status, int32_t recmaster, void *private_data) printf("status:%d recmaster:%d\n", status, recmaster); } -void adb_cb(int32_t status, struct ctdb_db_context *ctdb_db, void *private_data) +/* + * example on how to first read(non-existing recortds are implicitely created + * on demand) a record and change it in the callback. + * This forms the atom for the read-modify-write cycle. + * + * Pure read, or pure write are just special cases of this cycle. + */ +void rrl_cb(int32_t status, ctdb_handle *handle, TDB_DATA outdata, void *private_data) { - printf("status:%d db:%p\n", status, ctdb_db); + TDB_DATA data; + char tmp[256]; + + if (status != 0) { + printf("rrl_cb returned error: status %d\n", status); + if (handle) { + ctdb_free(handle); + } + return; + } + + printf("rrl size:%d data:%s\n", outdata.dsize, outdata.dptr); + if (outdata.dsize == 0) { + tmp[0] = 0; + } else { + strcpy(tmp, outdata.dptr); + } + strcat(tmp, "*"); + + data.dptr = tmp; + data.dsize = strlen(tmp) + 1; + ctdb_writerecord(handle, key, data); + + printf("Wrote new record : %s\n", tmp); + + ctdb_free(handle); } + + int main(int argc, char *argv[]) { struct ctdb_context *ctdb_context; @@ -38,6 +74,9 @@ int main(int argc, char *argv[]) int ret; TDB_DATA msg; + key.dptr = "Test Record"; + key.dsize = strlen(key.dptr); + ctdb_context = ctdb_connect("/tmp/ctdb.socket"); handle = ctdb_set_message_handler_send(ctdb_context, 55, NULL, msg_h, NULL); @@ -60,11 +99,17 @@ int main(int argc, char *argv[]) exit(10); } - handle = ctdb_attachdb_send(ctdb_context, CTDB_CURRENT_NODE, "test_test.tdb", 0, 0, adb_cb, NULL); + handle = ctdb_attachdb_send(ctdb_context, "test_test.tdb", 0, 0, NULL, NULL); if (handle == NULL) { printf("Failed to send attachdb control\n"); exit(10); } + ret = ctdb_attachdb_recv(ctdb_context, handle, &ctdb_db_context); + if (ret != 0 ) { + printf("Failed to attach to database\n"); + exit(10); + } + handle = ctdb_getpnn_send(ctdb_context, CTDB_CURRENT_NODE, pnn_cb, NULL); if (handle == NULL) { @@ -72,9 +117,9 @@ int main(int argc, char *argv[]) exit(10); } - handle = ctdb_getrecmaster_send(ctdb_context, CTDB_CURRENT_NODE, rm_cb, NULL); + handle = ctdb_readrecordlock_send(ctdb_context, ctdb_db_context, key, rrl_cb, NULL); if (handle == NULL) { - printf("Failed to send get_recmaster control\n"); + printf("Failed to send READRECORDLOCK\n"); exit(10); }