added new multi-record transaction commit code
authorAndrew Tridgell <tridge@samba.org>
Wed, 30 Jul 2008 09:57:00 +0000 (19:57 +1000)
committerAndrew Tridgell <tridge@samba.org>
Wed, 30 Jul 2008 09:57:00 +0000 (19:57 +1000)
include/ctdb_private.h
server/ctdb_control.c
server/ctdb_persistent.c

index 4d547782c2a27e490a67486a843e6d70f5a1098a..79046aa7eed65f50e85996c18453e56b259780e5 100644 (file)
@@ -430,7 +430,6 @@ struct ctdb_db_context {
        struct ctdb_registered_call *calls; /* list of registered calls */
        uint32_t seqnum;
        struct timed_event *te;
-       uint32_t client_tdb_flags;
 };
 
 
@@ -547,6 +546,9 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS          = 0,
                    CTDB_CONTROL_GET_CAPABILITIES        = 80,
                    CTDB_CONTROL_START_PERSISTENT_UPDATE = 81,
                    CTDB_CONTROL_CANCEL_PERSISTENT_UPDATE= 82,
+                   CTDB_CONTROL_TRANS2_COMMIT           = 83,
+                   CTDB_CONTROL_TRANS2_FINISHED         = 84,
+                   CTDB_CONTROL_TRANS2_ERROR            = 85,
 };     
 
 /*
@@ -813,8 +815,6 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
                    TALLOC_CTX *mem_ctx, TDB_DATA *data);
 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
                    struct ctdb_ltdb_header *header, TDB_DATA data);
-int ctdb_ltdb_persistent_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
-                   struct ctdb_ltdb_header *header, TDB_DATA data);
 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
                        struct ctdb_req_control *c,
                        TDB_DATA recdata);
@@ -1121,6 +1121,11 @@ int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequ
 struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,        
                                           TDB_DATA key, struct ctdb_ltdb_header *, TDB_DATA data);
 
+struct ctdb_rec_data *ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
+                                             uint32_t *reqid,
+                                             struct ctdb_ltdb_header *header,
+                                             TDB_DATA *key, TDB_DATA *data);
+
 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata);
@@ -1308,6 +1313,9 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
                                   struct ctdb_req_control *c, TDB_DATA recdata, 
                                   bool *async_reply);
+int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
+                                  struct ctdb_req_control *c, 
+                                  TDB_DATA recdata, bool *async_reply);
 
 int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id);
 int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id);
@@ -1353,4 +1361,9 @@ int ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode);
 int32_t ctdb_dump_memory(struct ctdb_context *ctdb, TDB_DATA *outdata);
 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata);
 
+int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
+                                    struct ctdb_req_control *c);
+int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
+                                 struct ctdb_req_control *c);
+
 #endif
index 1ae8e85cacf18fe0a923f44febcefd4df1d02310..3d223299914233ab0e7143a42cc6079e91910095 100644 (file)
@@ -400,6 +400,15 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        case CTDB_CONTROL_CANCEL_PERSISTENT_UPDATE:
                return ctdb_control_cancel_persistent_update(ctdb, c, indata);
 
+       case CTDB_CONTROL_TRANS2_COMMIT:
+               return ctdb_control_trans2_commit(ctdb, c, indata, async_reply);
+
+       case CTDB_CONTROL_TRANS2_ERROR:
+               return ctdb_control_trans2_error(ctdb, c);
+
+       case CTDB_CONTROL_TRANS2_FINISHED:
+               return ctdb_control_trans2_finished(ctdb, c);
+
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 455ccba4b05c660ca64c95c8c753982cc17ff422..81c623999a8a70e6a902ae9320cf27e73b3a5eef 100644 (file)
@@ -50,6 +50,8 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
                         status, errormsg));
                state->status = status;
                state->errormsg = errormsg;
+               DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
        }
        state->num_pending--;
        if (state->num_pending == 0) {
@@ -67,19 +69,21 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
        struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
        
        ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
+       DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+       state->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
        talloc_free(state);
 }
 
 /*
-  store a persistent record - called from a ctdb client when it has updated
-  a record in a persistent database. The client will have the record
+  store a set of persistent records - called from a ctdb client when it has updated
+  some records in a persistent database. The client will have the record
   locked for the duration of this call. The client is the dmaster when 
   this call is made
  */
-int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
-                                     struct ctdb_req_control *c, 
-                                     TDB_DATA recdata, bool *async_reply)
+int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
+                                  struct ctdb_req_control *c, 
+                                  TDB_DATA recdata, bool *async_reply)
 {
        struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
        struct ctdb_persistent_state *state;
@@ -89,8 +93,26 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
                DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
                return -1;
        }
-       if (client->num_persistent_updates > 0) {
-               client->num_persistent_updates--;
+
+       /* handling num_persistent_updates is a bit strange - 
+          there are 3 cases
+            1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
+               They don't expect num_persistent_updates to be used at all
+
+            2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
+               this commit to then decrement it
+
+             3) new clients which use TRANS2 commit functions, and
+               expect this function to increment the counter, and
+               then have it decremented in ctdb_control_trans2_error
+               or ctdb_control_trans2_finished
+       */
+       if (c->opcode == CTDB_CONTROL_PERSISTENT_STORE) {
+               if (client->num_persistent_updates > 0) {
+                       client->num_persistent_updates--;
+               }               
+       } else {
+               client->num_persistent_updates++;
        }
 
        state = talloc_zero(ctdb, struct ctdb_persistent_state);
@@ -147,10 +169,7 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
 
 struct ctdb_persistent_write_state {
        struct ctdb_db_context *ctdb_db;
-       TDB_DATA key;
-       TDB_DATA data;
-       struct ctdb_ltdb_header *header;
-       struct tdb_context *tdb;
+       struct ctdb_marshall_buffer *m;
        struct ctdb_req_control *c;
 };
 
@@ -160,32 +179,65 @@ struct ctdb_persistent_write_state {
  */
 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
 {
-       struct ctdb_ltdb_header oldheader;
-       int ret;
-
-       /* fetch the old header and ensure the rsn is less than the new rsn */
-       ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
-                        state->ctdb_db->db_id));
+       int ret, i;
+       struct ctdb_rec_data *rec = NULL;
+       struct ctdb_marshall_buffer *m = state->m;
+
+       ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
+       if (ret == -1) {
+               DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
+                                state->ctdb_db->db_id));
                return -1;
        }
 
-       if (oldheader.rsn >= state->header->rsn) {
-               DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
-                        state->ctdb_db->db_id, 
-                        (unsigned long long)oldheader.rsn, (unsigned long long)state->header->rsn));
-               return -1;
+       for (i=0;i<m->count;i++) {
+               struct ctdb_ltdb_header oldheader;
+               struct ctdb_ltdb_header header;
+               TDB_DATA key, data;
+
+               rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
+               
+               if (rec == NULL) {
+                       DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
+                                        i, state->ctdb_db->db_id));
+                       goto failed;                    
+               }
+
+               /* fetch the old header and ensure the rsn is less than the new rsn */
+               ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, NULL, NULL);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
+                                        state->ctdb_db->db_id));
+                       goto failed;
+               }
+
+               if (oldheader.rsn >= header.rsn) {
+                       DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
+                                         state->ctdb_db->db_id, 
+                                         (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
+                       goto failed;
+               }
+
+               ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
+                                         state->ctdb_db->db_id));
+                       return -1;
+               }
        }
 
-       ret = ctdb_ltdb_persistent_store(state->ctdb_db, state->key, state->header, state->data);
-       if (ret != 0) {
-               DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
-                        state->ctdb_db->db_id));
+       ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
+       if (ret == -1) {
+               DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
+                                state->ctdb_db->db_id));
                return -1;
        }
 
        return 0;
+       
+failed:
+       tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
+       return -1;
 }
 
 
@@ -357,20 +409,19 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
                                   struct ctdb_req_control *c, TDB_DATA recdata, 
                                   bool *async_reply)
 {
-       struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
        struct ctdb_db_context *ctdb_db;
-       uint32_t db_id = rec->reqid;
        struct ctdb_persistent_write_state *state;
        struct childwrite_handle *handle;
+       struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
 
        if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
                DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_update_record when recovery active\n"));
                return -1;
        }
 
-       ctdb_db = find_ctdb_db(ctdb, db_id);
+       ctdb_db = find_ctdb_db(ctdb, m->db_id);
        if (ctdb_db == NULL) {
-               DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
+               DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
                return -1;
        }
 
@@ -379,23 +430,7 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
 
        state->ctdb_db = ctdb_db;
        state->c       = c;
-       state->tdb     = ctdb_db->ltdb->tdb;
-       state->key.dptr   = &rec->data[0];
-       state->key.dsize  = rec->keylen;
-       state->data.dptr  = &rec->data[rec->keylen];
-       state->data.dsize = rec->datalen;
-
-       if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_CRIT,("Invalid data size %u in ctdb_control_update_record\n", 
-                        (unsigned)state->data.dsize));
-               talloc_free(state);
-               return -1;
-       }
-
-       state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
-       state->data.dptr  += sizeof(struct ctdb_ltdb_header);
-       state->data.dsize -= sizeof(struct ctdb_ltdb_header);
-
+       state->m       = m;
 
        /* create a child process to take out a transaction and 
           write the data.
@@ -421,8 +456,49 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
 }
 
 
+/*
+  called when a client has finished a local commit in a transaction to 
+  a persistent database
+ */
+int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
+                                    struct ctdb_req_control *c)
+{
+       struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+
+       if (client->num_persistent_updates == 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
+               return -1;
+       }
+       client->num_persistent_updates--;
+
+       return 0;
+}
+
+/*
+  called when a client gets an error committing its database
+  during a transaction commit
+ */
+int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
+                                 struct ctdb_req_control *c)
+{
+       struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+       
+       if (client->num_persistent_updates == 0) {
+               DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
+               return -1;
+       }
+       client->num_persistent_updates--;
+
+       DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+       client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+       return 0;
+}
+
 
 /*
+  backwards compatibility:
+
   start a persistent store operation. passing both the key, header and
   data to the daemon. If the client disconnects before it has issued
   a persistent_update call to the daemon we trigger a full recovery
@@ -445,9 +521,14 @@ int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
        return 0;
 }
 
+/* 
+  backwards compatibility:
+
+  called to tell ctdbd that it is no longer doing a persistent update 
+*/
 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
-                                     struct ctdb_req_control *c,
-                                     TDB_DATA recdata)
+                                             struct ctdb_req_control *c,
+                                             TDB_DATA recdata)
 {
        struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
 
@@ -462,3 +543,36 @@ int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
 
        return 0;
 }
+
+
+/*
+  backwards compatibility:
+
+  single record varient of ctdb_control_trans2_commit for older clients
+ */
+int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
+                                     struct ctdb_req_control *c, 
+                                     TDB_DATA recdata, bool *async_reply)
+{
+       struct ctdb_marshall_buffer *m;
+       struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
+       TDB_DATA key, data;
+
+       if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
+           rec->keylen + rec->datalen) {
+               DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
+               return -1;
+       }
+
+       key.dptr = &rec->data[0];
+       key.dsize = rec->keylen;
+       data.dptr = &rec->data[rec->keylen];
+       data.dsize = rec->datalen;
+
+       m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
+       CTDB_NO_MEMORY(ctdb, m);
+
+       return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
+}
+
+