struct ctdb_registered_call *calls; /* list of registered calls */
uint32_t seqnum;
struct timed_event *te;
- uint32_t client_tdb_flags;
};
CTDB_CONTROL_GET_CAPABILITIES = 80,
CTDB_CONTROL_START_PERSISTENT_UPDATE = 81,
CTDB_CONTROL_CANCEL_PERSISTENT_UPDATE= 82,
+ CTDB_CONTROL_TRANS2_COMMIT = 83,
+ CTDB_CONTROL_TRANS2_FINISHED = 84,
+ CTDB_CONTROL_TRANS2_ERROR = 85,
};
/*
TALLOC_CTX *mem_ctx, TDB_DATA *data);
int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
-int ctdb_ltdb_persistent_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
- struct ctdb_ltdb_header *header, TDB_DATA data);
int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata);
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
TDB_DATA key, struct ctdb_ltdb_header *, TDB_DATA data);
+struct ctdb_rec_data *ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
+ uint32_t *reqid,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA *key, TDB_DATA *data);
+
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply);
+int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA recdata, bool *async_reply);
int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id);
int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id);
int32_t ctdb_dump_memory(struct ctdb_context *ctdb, TDB_DATA *outdata);
int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata);
+int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c);
+int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c);
+
#endif
status, errormsg));
state->status = status;
state->errormsg = errormsg;
+ DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+ ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
}
state->num_pending--;
if (state->num_pending == 0) {
struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
+ DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+ state->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
talloc_free(state);
}
/*
- store a persistent record - called from a ctdb client when it has updated
- a record in a persistent database. The client will have the record
+ store a set of persistent records - called from a ctdb client when it has updated
+ some records in a persistent database. The client will have the record
locked for the duration of this call. The client is the dmaster when
this call is made
*/
-int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
- struct ctdb_req_control *c,
- TDB_DATA recdata, bool *async_reply)
+int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA recdata, bool *async_reply)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
struct ctdb_persistent_state *state;
DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
return -1;
}
- if (client->num_persistent_updates > 0) {
- client->num_persistent_updates--;
+
+ /* handling num_persistent_updates is a bit strange -
+ there are 3 cases
+ 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
+ They don't expect num_persistent_updates to be used at all
+
+ 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
+ this commit to then decrement it
+
+ 3) new clients which use TRANS2 commit functions, and
+ expect this function to increment the counter, and
+ then have it decremented in ctdb_control_trans2_error
+ or ctdb_control_trans2_finished
+ */
+ if (c->opcode == CTDB_CONTROL_PERSISTENT_STORE) {
+ if (client->num_persistent_updates > 0) {
+ client->num_persistent_updates--;
+ }
+ } else {
+ client->num_persistent_updates++;
}
state = talloc_zero(ctdb, struct ctdb_persistent_state);
struct ctdb_persistent_write_state {
struct ctdb_db_context *ctdb_db;
- TDB_DATA key;
- TDB_DATA data;
- struct ctdb_ltdb_header *header;
- struct tdb_context *tdb;
+ struct ctdb_marshall_buffer *m;
struct ctdb_req_control *c;
};
*/
static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
{
- struct ctdb_ltdb_header oldheader;
- int ret;
-
- /* fetch the old header and ensure the rsn is less than the new rsn */
- ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
- if (ret != 0) {
- DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
- state->ctdb_db->db_id));
+ int ret, i;
+ struct ctdb_rec_data *rec = NULL;
+ struct ctdb_marshall_buffer *m = state->m;
+
+ ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
+ if (ret == -1) {
+ DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
+ state->ctdb_db->db_id));
return -1;
}
- if (oldheader.rsn >= state->header->rsn) {
- DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
- state->ctdb_db->db_id,
- (unsigned long long)oldheader.rsn, (unsigned long long)state->header->rsn));
- return -1;
+ for (i=0;i<m->count;i++) {
+ struct ctdb_ltdb_header oldheader;
+ struct ctdb_ltdb_header header;
+ TDB_DATA key, data;
+
+ rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
+
+ if (rec == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
+ i, state->ctdb_db->db_id));
+ goto failed;
+ }
+
+ /* fetch the old header and ensure the rsn is less than the new rsn */
+ ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, NULL, NULL);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
+ state->ctdb_db->db_id));
+ goto failed;
+ }
+
+ if (oldheader.rsn >= header.rsn) {
+ DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
+ state->ctdb_db->db_id,
+ (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
+ goto failed;
+ }
+
+ ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
+ if (ret != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
+ state->ctdb_db->db_id));
+ return -1;
+ }
}
- ret = ctdb_ltdb_persistent_store(state->ctdb_db, state->key, state->header, state->data);
- if (ret != 0) {
- DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
- state->ctdb_db->db_id));
+ ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
+ if (ret == -1) {
+ DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
+ state->ctdb_db->db_id));
return -1;
}
return 0;
+
+failed:
+ tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
+ return -1;
}
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply)
{
- struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
struct ctdb_db_context *ctdb_db;
- uint32_t db_id = rec->reqid;
struct ctdb_persistent_write_state *state;
struct childwrite_handle *handle;
+ struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_update_record when recovery active\n"));
return -1;
}
- ctdb_db = find_ctdb_db(ctdb, db_id);
+ ctdb_db = find_ctdb_db(ctdb, m->db_id);
if (ctdb_db == NULL) {
- DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
+ DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
return -1;
}
state->ctdb_db = ctdb_db;
state->c = c;
- state->tdb = ctdb_db->ltdb->tdb;
- state->key.dptr = &rec->data[0];
- state->key.dsize = rec->keylen;
- state->data.dptr = &rec->data[rec->keylen];
- state->data.dsize = rec->datalen;
-
- if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
- DEBUG(DEBUG_CRIT,("Invalid data size %u in ctdb_control_update_record\n",
- (unsigned)state->data.dsize));
- talloc_free(state);
- return -1;
- }
-
- state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
- state->data.dptr += sizeof(struct ctdb_ltdb_header);
- state->data.dsize -= sizeof(struct ctdb_ltdb_header);
-
+ state->m = m;
/* create a child process to take out a transaction and
write the data.
}
+/*
+ called when a client has finished a local commit in a transaction to
+ a persistent database
+ */
+int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c)
+{
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+
+ if (client->num_persistent_updates == 0) {
+ DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
+ return -1;
+ }
+ client->num_persistent_updates--;
+
+ return 0;
+}
+
+/*
+ called when a client gets an error committing its database
+ during a transaction commit
+ */
+int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c)
+{
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+
+ if (client->num_persistent_updates == 0) {
+ DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
+ return -1;
+ }
+ client->num_persistent_updates--;
+
+ DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
+ client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+ return 0;
+}
+
/*
+ backwards compatibility:
+
start a persistent store operation. passing both the key, header and
data to the daemon. If the client disconnects before it has issued
a persistent_update call to the daemon we trigger a full recovery
return 0;
}
+/*
+ backwards compatibility:
+
+ called to tell ctdbd that it is no longer doing a persistent update
+*/
int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
- struct ctdb_req_control *c,
- TDB_DATA recdata)
+ struct ctdb_req_control *c,
+ TDB_DATA recdata)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
return 0;
}
+
+
+/*
+ backwards compatibility:
+
+ single record varient of ctdb_control_trans2_commit for older clients
+ */
+int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA recdata, bool *async_reply)
+{
+ struct ctdb_marshall_buffer *m;
+ struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
+ TDB_DATA key, data;
+
+ if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
+ rec->keylen + rec->datalen) {
+ DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
+ return -1;
+ }
+
+ key.dptr = &rec->data[0];
+ key.dsize = rec->keylen;
+ data.dptr = &rec->data[rec->keylen];
+ data.dsize = rec->datalen;
+
+ m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
+ CTDB_NO_MEMORY(ctdb, m);
+
+ return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
+}
+
+