}
}
+static struct ctdb_req_header header_reply_call(
+ struct ctdb_req_header *header,
+ struct ctdbd_context *ctdb)
+{
+ struct ctdb_req_header reply_header;
+
+ reply_header = (struct ctdb_req_header) {
+ .ctdb_magic = CTDB_MAGIC,
+ .ctdb_version = CTDB_PROTOCOL,
+ .generation = ctdb->vnn_map->generation,
+ .operation = CTDB_REPLY_CALL,
+ .destnode = header->srcnode,
+ .srcnode = header->destnode,
+ .reqid = header->reqid,
+ };
+
+ return reply_header;
+}
+
static struct ctdb_req_header header_reply_control(
struct ctdb_req_header *header,
struct ctdbd_context *ctdb)
};
/*
- * Send replies to controls and messages
+ * Send replies to call, controls and messages
*/
static void client_reply_done(struct tevent_req *subreq);
+static void client_send_call(struct tevent_req *req,
+ struct ctdb_req_header *header,
+ struct ctdb_reply_call *reply)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ struct tevent_req *subreq;
+ struct ctdb_req_header reply_header;
+ uint8_t *buf;
+ size_t datalen, buflen;
+ int ret;
+
+ reply_header = header_reply_call(header, ctdb);
+
+ datalen = ctdb_reply_call_len(&reply_header, reply);
+ ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, client_reply_done, req);
+
+ talloc_steal(subreq, buf);
+}
+
static void client_send_message(struct tevent_req *req,
struct ctdb_req_header *header,
struct ctdb_req_message_data *message)
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
uint8_t *buf, size_t buflen);
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
}
switch (header.operation) {
+ case CTDB_REQ_CALL:
+ client_process_call(req, buf, buflen);
+ break;
+
case CTDB_REQ_MESSAGE:
client_process_message(req, buf, buflen);
break;
}
}
+static void client_process_call(struct tevent_req *req,
+ uint8_t *buf, size_t buflen)
+{
+ struct client_state *state = tevent_req_data(
+ req, struct client_state);
+ struct ctdbd_context *ctdb = state->ctdb;
+ TALLOC_CTX *mem_ctx;
+ struct ctdb_req_header header;
+ struct ctdb_req_call request;
+ struct ctdb_reply_call reply;
+ struct database *db;
+ struct ctdb_ltdb_header hdr;
+ TDB_DATA data;
+ int ret;
+
+ mem_ctx = talloc_new(state);
+ if (tevent_req_nomem(mem_ctx, req)) {
+ return;
+ }
+
+ ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
+ if (ret != 0) {
+ talloc_free(mem_ctx);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ header_fix_pnn(&header, ctdb);
+
+ if (header.destnode >= ctdb->node_map->num_nodes) {
+ goto fail;
+ }
+
+ DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
+
+ db = database_find(ctdb->db_map, request.db_id);
+ if (db == NULL) {
+ goto fail;
+ }
+
+ ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
+ if (ret != 0) {
+ goto fail;
+ }
+
+ /* Fake migration */
+ if (hdr.dmaster != ctdb->node_map->pnn) {
+ hdr.dmaster = ctdb->node_map->pnn;
+
+ ret = ltdb_store(db, request.key, &hdr, data);
+ if (ret != 0) {
+ goto fail;
+ }
+ }
+
+ talloc_free(mem_ctx);
+
+ reply.status = 0;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+ return;
+
+fail:
+ talloc_free(mem_ctx);
+ reply.status = -1;
+ reply.data = tdb_null;
+
+ client_send_call(req, &header, &reply);
+}
+
static void client_process_message(struct tevent_req *req,
uint8_t *buf, size_t buflen)
{