make sure we ignore requeued ctdb_call packets of older generations except for packet...
authorAndrew Tridgell <tridge@samba.org>
Sat, 12 May 2007 08:08:50 +0000 (18:08 +1000)
committerAndrew Tridgell <tridge@samba.org>
Sat, 12 May 2007 08:08:50 +0000 (18:08 +1000)
(This used to be ctdb commit facab105fbd7fe50f96bdd763ae50ddc54fbdacc)

ctdb/common/ctdb_call.c
ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_ltdb.c
ctdb/common/ctdb_recover.c
ctdb/include/ctdb_private.h

index a8e470126de4d9aa16701630fc0ee453fbb82550..faf2065049dc3372f563191303d16d00f933e0df 100644 (file)
@@ -363,7 +363,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        
        /* fetch the current record */
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
-                                          ctdb_recv_raw_pkt, ctdb);
+                                          ctdb_recv_raw_pkt, ctdb, False);
        if (ret == -1) {
                ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
                return;
@@ -433,7 +433,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
           if the call will be answered locally */
 
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
-                                          ctdb_recv_raw_pkt, ctdb);
+                                          ctdb_recv_raw_pkt, ctdb, False);
        if (ret == -1) {
                ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
                return;
@@ -556,7 +556,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        data.dsize = c->datalen;
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
-                                    ctdb_recv_raw_pkt, ctdb);
+                                    ctdb_recv_raw_pkt, ctdb, False);
        if (ret == -2) {
                return;
        }
index b2fb14e0e0a5bbbda525df2d56551ab28ab445b0..6d4601d7595f2fd1f2429e016a6522e5ba17dc52 100644 (file)
@@ -413,7 +413,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
                                           (struct ctdb_req_header *)c, &data,
-                                          daemon_incoming_packet, client);
+                                          daemon_incoming_packet, client, True);
        if (ret == -2) {
                /* will retry later */
                ctdb->status.pending_calls--;
index 77ce0e80cb1438f68cdc8f26f246a88e5e36c9c4..c5a5fdec95796ae0fd2344b7a8d1662e3ea80453 100644 (file)
@@ -195,6 +195,8 @@ struct lock_fetch_state {
        void (*recv_pkt)(void *, uint8_t *, uint32_t);
        void *recv_context;
        struct ctdb_req_header *hdr;
+       uint32_t generation;
+       bool ignore_generation;
 };
 
 /*
@@ -203,6 +205,12 @@ struct lock_fetch_state {
 static void lock_fetch_callback(void *p)
 {
        struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+       if (!state->ignore_generation &&
+           state->generation != state->ctdb->vnn_map->generation) {
+               DEBUG(0,("Discarding previous generation lockwait packet\n"));
+               talloc_free(state->hdr);
+               return;
+       }
        state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
        DEBUG(2,(__location__ " PACKET REQUEUED\n"));
 }
@@ -235,7 +243,7 @@ static void lock_fetch_callback(void *p)
 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
                           TDB_DATA key, struct ctdb_req_header *hdr,
                           void (*recv_pkt)(void *, uint8_t *, uint32_t ),
-                          void *recv_context)
+                          void *recv_context, bool ignore_generation)
 {
        int ret;
        struct tdb_context *tdb = ctdb_db->ltdb->tdb;
@@ -267,6 +275,8 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
        state->hdr = hdr;
        state->recv_pkt = recv_pkt;
        state->recv_context = recv_context;
+       state->generation = ctdb_db->ctdb->vnn_map->generation;
+       state->ignore_generation = ignore_generation;
 
        /* now the contended path */
        h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
@@ -291,11 +301,12 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
                                 TDB_DATA key, struct ctdb_ltdb_header *header, 
                                 struct ctdb_req_header *hdr, TDB_DATA *data,
                                 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
-                                void *recv_context)
+                                void *recv_context, bool ignore_generation)
 {
        int ret;
 
-       ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
+       ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
+                                    recv_context, ignore_generation);
        if (ret == 0) {
                ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
                if (ret != 0) {
index 5a3188c755c6f09a81758c053bd25ebba7dee0f3..a3d373c2ef4038662fe3e622e4f0a46a2ac40990 100644 (file)
@@ -318,7 +318,9 @@ int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
                        DEBUG(0, (__location__ " Unable to fetch record\n"));
                        goto failed;
                }
-               if (header.rsn < hdr->rsn) {
+               /* the <= is to cope with just-created records, which
+                  have a rsn of zero */
+               if (header.rsn <= hdr->rsn) {
                        ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to store record\n"));
index 11231d680eda0ae283bcddfedcabd3898acc40a0..e1ed96c91e5ddcad649d892c0a2e97b016847fe4 100644 (file)
@@ -556,12 +556,12 @@ void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
                           TDB_DATA key, struct ctdb_req_header *hdr,
                           void (*recv_pkt)(void *, uint8_t *, uint32_t ),
-                          void *recv_context);
+                          void *recv_context, bool ignore_generation);
 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
                                 TDB_DATA key, struct ctdb_ltdb_header *header, 
                                 struct ctdb_req_header *hdr, TDB_DATA *data,
                                 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
-                                void *recv_context);
+                                void *recv_context, bool ignore_generation);
 void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
 
 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,