make sure we don't increment rx_cnt for redirected packets, or for packets that have...
authorAndrew Tridgell <tridge@samba.org>
Sat, 19 May 2007 03:45:24 +0000 (13:45 +1000)
committerAndrew Tridgell <tridge@samba.org>
Sat, 19 May 2007 03:45:24 +0000 (13:45 +1000)
(This used to be ctdb commit 92e5569407dba173a27e9645b4339ce3e2c00520)

ctdb/common/ctdb.c
ctdb/common/ctdb_call.c
ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_ltdb.c
ctdb/include/ctdb_private.h

index 922f5c46dbf094b81c35d054d7f45e0325e0b50c..71308c86f26f5b67652eae8908e380b06b9163ff 100644 (file)
@@ -234,15 +234,13 @@ uint32_t ctdb_get_num_connected_nodes(struct ctdb_context *ctdb)
 
 
 /*
-  called by the transport layer when a packet comes in
+  called when we need to process a packet. This can be a requeued packet
+  after a lockwait, or a real packet from another node
 */
-void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
        TALLOC_CTX *tmp_ctx;
 
-       ctdb->status.node_packets_recv++;
-
        /* place the packet as a child of the tmp_ctx. We then use
           talloc_free() below to free it. If any of the calls want
           to keep it, then they will steal it somewhere else, and the
@@ -250,35 +248,10 @@ void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
        tmp_ctx = talloc_new(ctdb);
        talloc_steal(tmp_ctx, hdr);
 
-       if (length < sizeof(*hdr)) {
-               ctdb_set_error(ctdb, "Bad packet length %d\n", length);
-               goto done;
-       }
-       if (length != hdr->length) {
-               ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
-                              hdr->length, length);
-               goto done;
-       }
-
-       if (hdr->ctdb_magic != CTDB_MAGIC) {
-               ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
-               goto done;
-       }
-
-       if (hdr->ctdb_version != CTDB_VERSION) {
-               ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
-               goto done;
-       }
-
        DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
                 "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
                 hdr->srcnode, hdr->destnode));
 
-       /* up the counter for this source node, so we know its alive */
-       if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
-               ctdb->nodes[hdr->srcnode]->rx_cnt++;
-       }
-
        switch (hdr->operation) {
        case CTDB_REQ_CALL:
        case CTDB_REPLY_CALL:
@@ -361,15 +334,49 @@ done:
        talloc_free(tmp_ctx);
 }
 
+
 /*
   called by the transport layer when a packet comes in
 */
-void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
 {
-       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
-       ctdb_recv_pkt(ctdb, data, length);
+       struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+
+       ctdb->status.node_packets_recv++;
+
+       if (length < sizeof(*hdr)) {
+               ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+               return;
+       }
+       if (length != hdr->length) {
+               ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
+                              hdr->length, length);
+               return;
+       }
+
+       if (hdr->ctdb_magic != CTDB_MAGIC) {
+               ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
+               return;
+       }
+
+       if (hdr->ctdb_version != CTDB_VERSION) {
+               ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
+               return;
+       }
+
+       /* up the counter for this source node, so we know its alive */
+       if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
+               /* as a special case, redirected calls don't increment the rx_cnt */
+               if (hdr->operation != CTDB_REQ_CALL ||
+                   ((struct ctdb_req_call *)hdr)->hopcount == 0) {
+                       ctdb->nodes[hdr->srcnode]->rx_cnt++;
+               }
+       }
+
+       ctdb_input_pkt(ctdb, hdr);
 }
 
+
 /*
   called by the transport layer when a node is dead
 */
@@ -423,8 +430,7 @@ static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
                               struct timeval t, void *private_data)
 {
        struct queue_next *q = talloc_get_type(private_data, struct queue_next);
-       ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
-       talloc_free(q);
+       ctdb_input_pkt(q->ctdb, q->hdr);
 }      
 
 /*
@@ -447,8 +453,7 @@ static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header
        }
 #if 0
        /* use this to put packets directly into our recv function */
-       ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
-       talloc_free(q);
+       ctdb_input_pkt(q->ctdb, q->hdr);
 #else
        event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
 #endif
index c19d88f660effd79e35b68f6414ef61af8686035..b65f30907532eddc33fa29922106e3c5bafbe659 100644 (file)
 }
 
 
+/*
+  a varient of input packet that can be used in lock requeue
+*/
+void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
+{
+       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+       ctdb_input_pkt(ctdb, hdr);
+}
+
+
 /*
   local version of ctdb_call
 */
@@ -363,7 +373,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        
        /* fetch the current record */
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
-                                          ctdb_recv_raw_pkt, ctdb, False);
+                                          ctdb_call_input_pkt, ctdb, False);
        if (ret == -1) {
                ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
                return;
@@ -436,7 +446,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
           if the call will be answered locally */
 
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
-                                          ctdb_recv_raw_pkt, ctdb, False);
+                                          ctdb_call_input_pkt, ctdb, False);
        if (ret == -1) {
                ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
                return;
@@ -558,7 +568,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        data.dsize = c->datalen;
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
-                                    ctdb_recv_raw_pkt, ctdb, False);
+                                    ctdb_call_input_pkt, ctdb, False);
        if (ret == -2) {
                return;
        }
index c0f8d422e8eee5d94bc45040ac5dcf3230e505ef..4bf16cd8a698ba364552fdca3c730b48fe7e24dc 100644 (file)
@@ -40,7 +40,7 @@ struct ctdb_client {
 };
 
 
-static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
+static void daemon_incoming_packet(void *, struct ctdb_req_header *);
 
 static void ctdb_main_loop(struct ctdb_context *ctdb)
 {
@@ -438,9 +438,8 @@ static void daemon_request_control_from_client(struct ctdb_client *client,
                                               struct ctdb_req_control *c);
 
 /* data contains a packet from the client */
-static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
+static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
        struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
        TALLOC_CTX *tmp_ctx;
        struct ctdb_context *ctdb = client->ctdb;
@@ -539,7 +538,7 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
                 hdr->srcnode, hdr->destnode));
 
        /* it is the responsibility of the incoming packet function to free 'data' */
-       daemon_incoming_packet(client, data, cnt);
+       daemon_incoming_packet(client, hdr);
 }
 
 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
index c5a5fdec95796ae0fd2344b7a8d1662e3ea80453..caa538ffc1c97a5e39497959606573d7d4ea56c6 100644 (file)
@@ -192,7 +192,7 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
 
 struct lock_fetch_state {
        struct ctdb_context *ctdb;
-       void (*recv_pkt)(void *, uint8_t *, uint32_t);
+       void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;
        struct ctdb_req_header *hdr;
        uint32_t generation;
@@ -211,7 +211,7 @@ static void lock_fetch_callback(void *p)
                talloc_free(state->hdr);
                return;
        }
-       state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
+       state->recv_pkt(state->recv_context, state->hdr);
        DEBUG(2,(__location__ " PACKET REQUEUED\n"));
 }
 
@@ -242,7 +242,7 @@ static void lock_fetch_callback(void *p)
  */
 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
                           TDB_DATA key, struct ctdb_req_header *hdr,
-                          void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+                          void (*recv_pkt)(void *, struct ctdb_req_header *),
                           void *recv_context, bool ignore_generation)
 {
        int ret;
@@ -285,7 +285,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
                return -1;
        }
 
-       /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
+       /* we need to move the packet off the temporary context in ctdb_input_pkt(),
           so it won't be freed yet */
        talloc_steal(state, hdr);
        talloc_steal(state, h);
@@ -300,7 +300,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
                                 TDB_DATA key, struct ctdb_ltdb_header *header, 
                                 struct ctdb_req_header *hdr, TDB_DATA *data,
-                                void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+                                void (*recv_pkt)(void *, struct ctdb_req_header *),
                                 void *recv_context, bool ignore_generation)
 {
        int ret;
index a2545ac7a7f6a97f994f4c4575c331eef6e94a45..b4470b00cb122abdf5f9ed4c861a93d7618f8555 100644 (file)
@@ -576,14 +576,14 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
                           TDB_DATA key, struct ctdb_req_header *hdr,
-                          void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+                          void (*recv_pkt)(void *, struct ctdb_req_header *),
                           void *recv_context, bool ignore_generation);
 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
                                 TDB_DATA key, struct ctdb_ltdb_header *header, 
                                 struct ctdb_req_header *hdr, TDB_DATA *data,
-                                void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+                                void (*recv_pkt)(void *, struct ctdb_req_header *),
                                 void *recv_context, bool ignore_generation);
-void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
+void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *);
 
 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
                                             struct ctdb_call *call,