/*
- called by the transport layer when a packet comes in
+ called when we need to process a packet. This can be a requeued packet
+ after a lockwait, or a real packet from another node
*/
-void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
TALLOC_CTX *tmp_ctx;
- ctdb->status.node_packets_recv++;
-
/* place the packet as a child of the tmp_ctx. We then use
talloc_free() below to free it. If any of the calls want
to keep it, then they will steal it somewhere else, and the
tmp_ctx = talloc_new(ctdb);
talloc_steal(tmp_ctx, hdr);
- if (length < sizeof(*hdr)) {
- ctdb_set_error(ctdb, "Bad packet length %d\n", length);
- goto done;
- }
- if (length != hdr->length) {
- ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
- hdr->length, length);
- goto done;
- }
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
- goto done;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
- goto done;
- }
-
DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
"node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
hdr->srcnode, hdr->destnode));
- /* up the counter for this source node, so we know its alive */
- if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
- ctdb->nodes[hdr->srcnode]->rx_cnt++;
- }
-
switch (hdr->operation) {
case CTDB_REQ_CALL:
case CTDB_REPLY_CALL:
talloc_free(tmp_ctx);
}
+
/*
called by the transport layer when a packet comes in
*/
-void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
- struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
- ctdb_recv_pkt(ctdb, data, length);
+ struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+
+ ctdb->status.node_packets_recv++;
+
+ if (length < sizeof(*hdr)) {
+ ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+ return;
+ }
+ if (length != hdr->length) {
+ ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
+ hdr->length, length);
+ return;
+ }
+
+ if (hdr->ctdb_magic != CTDB_MAGIC) {
+ ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
+ return;
+ }
+
+ if (hdr->ctdb_version != CTDB_VERSION) {
+ ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
+ return;
+ }
+
+ /* up the counter for this source node, so we know its alive */
+ if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
+ /* as a special case, redirected calls don't increment the rx_cnt */
+ if (hdr->operation != CTDB_REQ_CALL ||
+ ((struct ctdb_req_call *)hdr)->hopcount == 0) {
+ ctdb->nodes[hdr->srcnode]->rx_cnt++;
+ }
+ }
+
+ ctdb_input_pkt(ctdb, hdr);
}
+
/*
called by the transport layer when a node is dead
*/
struct timeval t, void *private_data)
{
struct queue_next *q = talloc_get_type(private_data, struct queue_next);
- ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
- talloc_free(q);
+ ctdb_input_pkt(q->ctdb, q->hdr);
}
/*
}
#if 0
/* use this to put packets directly into our recv function */
- ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
- talloc_free(q);
+ ctdb_input_pkt(q->ctdb, q->hdr);
#else
event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
#endif
}
+/*
+ a varient of input packet that can be used in lock requeue
+*/
+void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
+{
+ struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+ ctdb_input_pkt(ctdb, hdr);
+}
+
+
/*
local version of ctdb_call
*/
/* fetch the current record */
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
- ctdb_recv_raw_pkt, ctdb, False);
+ ctdb_call_input_pkt, ctdb, False);
if (ret == -1) {
ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
return;
if the call will be answered locally */
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
- ctdb_recv_raw_pkt, ctdb, False);
+ ctdb_call_input_pkt, ctdb, False);
if (ret == -1) {
ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
return;
data.dsize = c->datalen;
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
- ctdb_recv_raw_pkt, ctdb, False);
+ ctdb_call_input_pkt, ctdb, False);
if (ret == -2) {
return;
}
};
-static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
+static void daemon_incoming_packet(void *, struct ctdb_req_header *);
static void ctdb_main_loop(struct ctdb_context *ctdb)
{
struct ctdb_req_control *c);
/* data contains a packet from the client */
-static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
+static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
{
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
TALLOC_CTX *tmp_ctx;
struct ctdb_context *ctdb = client->ctdb;
hdr->srcnode, hdr->destnode));
/* it is the responsibility of the incoming packet function to free 'data' */
- daemon_incoming_packet(client, data, cnt);
+ daemon_incoming_packet(client, hdr);
}
static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
struct lock_fetch_state {
struct ctdb_context *ctdb;
- void (*recv_pkt)(void *, uint8_t *, uint32_t);
+ void (*recv_pkt)(void *, struct ctdb_req_header *);
void *recv_context;
struct ctdb_req_header *hdr;
uint32_t generation;
talloc_free(state->hdr);
return;
}
- state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
+ state->recv_pkt(state->recv_context, state->hdr);
DEBUG(2,(__location__ " PACKET REQUEUED\n"));
}
*/
int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_req_header *hdr,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
void *recv_context, bool ignore_generation)
{
int ret;
return -1;
}
- /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
+ /* we need to move the packet off the temporary context in ctdb_input_pkt(),
so it won't be freed yet */
talloc_steal(state, hdr);
talloc_steal(state, h);
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_ltdb_header *header,
struct ctdb_req_header *hdr, TDB_DATA *data,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
void *recv_context, bool ignore_generation)
{
int ret;
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_req_header *hdr,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
void *recv_context, bool ignore_generation);
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_ltdb_header *header,
struct ctdb_req_header *hdr, TDB_DATA *data,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
void *recv_context, bool ignore_generation);
-void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
+void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *);
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
struct ctdb_call *call,