RB_TREE: Add mechanism to abort a traverse
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Wed, 2 Nov 2011 02:33:28 +0000 (13:33 +1100)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Tue, 8 Nov 2011 02:40:28 +0000 (13:40 +1100)
This patch changes the callback signature for traversal
functions to allow a client to abort a traverse before it finishes.
Updates to all callers and examples as well as rb-test tool.

common/rb_tree.c
common/rb_tree.h
server/ctdb_serverids.c
server/ctdb_takeover.c
server/ctdb_vacuum.c
tests/src/rb_test.c
tools/ctdb.c
tools/ctdb_vacuum.c

index b2c2ee83d5036dd99d035882e76b1da553f6346b..8458c510bea606d8aadcd27209deea0573ace82d 100644 (file)
@@ -916,13 +916,17 @@ trbt_lookuparray32(trbt_tree_t *tree, uint32_t keylen, uint32_t *key)
 
 
 /* traverse a tree starting at node */
-static void 
+static int
 trbt_traversearray32_node(trbt_node_t *node, uint32_t keylen, 
-       void (*callback)(void *param, void *data), 
+       int (*callback)(void *param, void *data), 
        void *param)
 {
        if (node->left) {
-               trbt_traversearray32_node(node->left, keylen, callback, param);
+               int ret;
+               ret = trbt_traversearray32_node(node->left, keylen, callback, param);
+               if (ret != 0) {
+                       return ret;
+               }
        }
 
        /* this is the smallest node in this subtree
@@ -930,35 +934,52 @@ trbt_traversearray32_node(trbt_node_t *node, uint32_t keylen,
           otherwise we must pull the next subtree and traverse that one as well
        */
        if (keylen == 0) {
-               callback(param, node->data);
+               int ret;
+
+               ret = callback(param, node->data);
+               if (ret != 0) {
+                       return ret;
+               }
        } else {
-               trbt_traversearray32(node->data, keylen, callback, param);
+               int ret;
+
+               ret = trbt_traversearray32(node->data, keylen, callback, param);
+               if (ret != 0) {
+                       return ret;
+               }
        }
 
        if (node->right) {
-               trbt_traversearray32_node(node->right, keylen, callback, param);
+               int ret;
+
+               ret = trbt_traversearray32_node(node->right, keylen, callback, param);
+               if (ret != 0) {
+                       return ret;
+               }
        }
+
+       return 0;
 }
        
 
 /* traverse the tree using an array of uint32 as a key */
-void 
+int 
 trbt_traversearray32(trbt_tree_t *tree, uint32_t keylen, 
-       void (*callback)(void *param, void *data), 
+       int (*callback)(void *param, void *data), 
        void *param)
 {
        trbt_node_t *node;
 
        if (tree == NULL) {
-               return;
+               return 0;
        }
 
        node=tree->root;
        if (node == NULL) {
-               return;
+               return 0;
        }
 
-       trbt_traversearray32_node(node, keylen-1, callback, param);
+       return trbt_traversearray32_node(node, keylen-1, callback, param);
 }
 
 
@@ -999,7 +1020,7 @@ trbt_findfirstarray32(trbt_tree_t *tree, uint32_t keylen)
 }
 
 
-#if 0
+#if TEST_RB_TREE
 static void printtree(trbt_node_t *node, int levels)
 {
        int i;
@@ -1007,7 +1028,7 @@ static void printtree(trbt_node_t *node, int levels)
        printtree(node->left, levels+1);
 
        for(i=0;i<levels;i++)printf("    ");
-       printf("key:%d COLOR:%s (node:0x%08x parent:0x%08x left:0x%08x right:0x%08x)\n",node->key32,node->rb_color==TRBT_BLACK?"BLACK":"RED",(int)node,(int)node->parent, (int)node->left,(int)node->right);
+       printf("key:%d COLOR:%s (node:%p parent:%p left:%p right:%p)\n",node->key32,node->rb_color==TRBT_BLACK?"BLACK":"RED", node, node->parent, node->left, node->right);
 
        printtree(node->right, levels+1);
        printf("\n");
@@ -1021,13 +1042,11 @@ void print_tree(trbt_tree_t *tree)
        }
        printf("---\n");
        printtree(tree->root->left, 1);
-       printf("root node key:%d COLOR:%s (node:0x%08x left:0x%08x right:0x%08x)\n",tree->root->key32,tree->root->rb_color==TRBT_BLACK?"BLACK":"RED",(int)tree->root,(int)tree->root->left,(int)tree->root->right);
+       printf("root node key:%d COLOR:%s (node:%p left:%p right:%p)\n",tree->root->key32,tree->root->rb_color==TRBT_BLACK?"BLACK":"RED", tree->root, tree->root->left, tree->root->right);
        printtree(tree->root->right, 1);
        printf("===\n");
 }
-#endif
 
-# if 0
 void 
 test_tree(void)
 {
@@ -1037,7 +1056,7 @@ test_tree(void)
        int NUM=15;
        int cnt=0;
 
-       tree=trbt_create(talloc_new(NULL));
+       tree=trbt_create(talloc_new(NULL), 0);
 #if 0
        for(i=0;i<10;i++){
                printf("adding node %i\n",i);
index eef0bc5227a92dd8b126f8098ec7720523b59232..4f2ab22d36dd8b846f7b32130f980cac064d9ac5 100644 (file)
@@ -74,8 +74,13 @@ void trbt_insertarray32_callback(trbt_tree_t *tree, uint32_t keylen, uint32_t *k
    and return a pointer to data or NULL */
 void *trbt_lookuparray32(trbt_tree_t *tree, uint32_t keylen, uint32_t *key);
 
-/* Traverse a tree with a key based on an array of uint32 */
-void trbt_traversearray32(trbt_tree_t *tree, uint32_t keylen, void (*callback)(void *param, void *data), void *param);
+/* Traverse a tree with a key based on an array of uint32
+   returns 0 if traverse completed
+   !0 if the traverse was aborted
+
+   If the callback returns !0  the traverse will be aborted
+*/
+int trbt_traversearray32(trbt_tree_t *tree, uint32_t keylen, int (*callback)(void *param, void *data), void *param);
 
 /* Lookup the first node in the tree with a key based on an array of uint32 
    and return a pointer to data or NULL */
index 4ed3012a30fa6b81be0afd6be38db3b4d1d5d4b7..dba25edced95b8f7ccca95ae1d8388338d85b57c 100644 (file)
@@ -119,20 +119,21 @@ struct count_server_ids {
        struct ctdb_server_id_list *list;
 };
 
-static void server_id_count(void *param, void *data)
+static int server_id_count(void *param, void *data)
 {
        struct count_server_ids *svid = talloc_get_type(param, 
                                                struct count_server_ids);
 
        if (svid == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " Got null pointer for svid\n"));
-               return;
+               return -1;
        }
 
        svid->count++;
+       return 0;
 }
 
-static void server_id_store(void *param, void *data)
+static int server_id_store(void *param, void *data)
 {
        struct count_server_ids *svid = talloc_get_type(param, 
                                                struct count_server_ids);
@@ -141,16 +142,17 @@ static void server_id_store(void *param, void *data)
 
        if (svid == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " Got null pointer for svid\n"));
-               return;
+               return -1;
        }
 
        if (svid->count >= svid->list->num) {
                DEBUG(DEBUG_ERR, (__location__ " size of server id tree changed during traverse\n"));
-               return;
+               return -1;
        }
 
        memcpy(&svid->list->server_ids[svid->count], server_id, sizeof(struct ctdb_server_id));
        svid->count++;
+       return 0;
 }
 
 /* 
index a7f125041f3a1314fbd65117409d04d65c819a8d..31946e70ecf2144aa2d2fa5d74e45a7812e506e7 100644 (file)
@@ -1251,13 +1251,14 @@ static void *add_ip_callback(void *parm, void *data)
        return parm;
 }
 
-void getips_count_callback(void *param, void *data)
+static int getips_count_callback(void *param, void *data)
 {
        struct ctdb_public_ip_list **ip_list = (struct ctdb_public_ip_list **)param;
        struct ctdb_public_ip_list *new_ip = (struct ctdb_public_ip_list *)data;
 
        new_ip->next = *ip_list;
        *ip_list     = new_ip;
+       return 0;
 }
 
 static struct ctdb_public_ip_list *
@@ -2827,7 +2828,7 @@ static void capture_tcp_handler(struct event_context *ev, struct fd_event *fde,
     by a RST)
    this callback is called for each connection we are currently trying to kill
 */
-static void tickle_connection_traverse(void *param, void *data)
+static int tickle_connection_traverse(void *param, void *data)
 {
        struct ctdb_killtcp_con *con = talloc_get_type(data, struct ctdb_killtcp_con);
 
@@ -2835,7 +2836,7 @@ static void tickle_connection_traverse(void *param, void *data)
        if (con->count >= 5) {
                /* can't delete in traverse: reparent to delete_cons */
                talloc_steal(param, con);
-               return;
+               return 0;
        }
 
        /* othervise, try tickling it again */
@@ -2844,6 +2845,7 @@ static void tickle_connection_traverse(void *param, void *data)
                (ctdb_sock_addr *)&con->dst_addr,
                (ctdb_sock_addr *)&con->src_addr,
                0, 0, 0);
+       return 0;
 }
 
 
index 50ff45a01c343b5e293fea847aa515f890fdb226..2be3e4b876f98e441188bbec995787ecc3f9c890 100644 (file)
@@ -279,7 +279,7 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
  * traverse the tree of records to delete and marshall them into
  * a blob
  */
-static void delete_traverse(void *param, void *data)
+static int delete_traverse(void *param, void *data)
 {
        struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
        struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
@@ -289,17 +289,18 @@ static void delete_traverse(void *param, void *data)
        rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
        if (rec == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
-               return;
+               return 0;
        }
 
        old_size = talloc_get_size(recs->records);
        recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
        if (recs->records == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               return;
+               return 0;
        }
        recs->records->count++;
        memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
+       return 0;
 }
 
 /**
@@ -322,7 +323,7 @@ static void delete_traverse(void *param, void *data)
  *    add it to the list of records that are to be sent to
  *    the lmaster with the VACUUM_FETCH message.
  */
-static void delete_queue_traverse(void *param, void *data)
+static int delete_queue_traverse(void *param, void *data)
 {
        struct delete_record_data *dd =
                talloc_get_type(data, struct delete_record_data);
@@ -340,7 +341,7 @@ static void delete_queue_traverse(void *param, void *data)
        if (res != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Error getting chainlock.\n"));
                vdata->fast_error++;
-               return;
+               return 0;
        }
 
        tdb_data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
@@ -431,7 +432,7 @@ done:
        }
        tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
 
-       return;
+       return 0;
 }
 
 /* 
index 707f7d71dec966527f97a16805cc813a08676e67..5de6c103568477eb89387d103e8ef3f0999aa05d 100644 (file)
@@ -62,27 +62,44 @@ void *random_add(void *p, void *d)
        return p;
 }
 
-void traverse(void *p, void *d)
+int traverse(void *p, void *d)
 {
        uint32_t *data = (uint32_t *)d;
 
        printf("traverse data:%d\n",*data);
+       return 0;
 }
 
-void random_traverse(void *p, void *d)
+int random_traverse(void *p, void *d)
 {
        printf("%s   ",(char *)d);
+       return 0;
 }
 
 static uint32_t calc_checksum = 0;     
-void traverse_checksum(void *p, void *d)
+int traverse_checksum(void *p, void *d)
 {
        int i,j,k;
 
        sscanf(d, "%d.%d.%d", &i, &j, &k);
        calc_checksum += i*100+j*10+k;
+       return 0;
+}
+
+int count_traverse(void *p, void *d)
+{
+       int *count = p;
+       (*count)++;
+       return 0;
 }
-                               
+
+int count_traverse_abort(void *p, void *d)
+{
+       int *count = p;
+       (*count)++;
+       return -1;
+}
+
 /*
   main program
 */
@@ -94,7 +111,7 @@ int main(int argc, const char *argv[])
                { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
                POPT_TABLEEND
        };
-       int opt;
+       int opt, traverse_count;
        const char **extra_argv;
        int extra_argc = 0;
        poptContext pc;
@@ -303,7 +320,19 @@ int main(int argc, const char *argv[])
        printf("\n");
        printf("first node: %s\n", (char *)trbt_findfirstarray32(tree, 3));
 
+       traverse_count = 0;
+       trbt_traversearray32(tree, 3, count_traverse, &traverse_count);
+       printf("\n");
+       printf("number of entries in traverse %d\n", traverse_count);
 
+       traverse_count = 0;
+       trbt_traversearray32(tree, 3, count_traverse_abort, &traverse_count);
+       printf("\n");
+       printf("number of entries in aborted traverse %d\n", traverse_count);
+       if (traverse_count != 1) {
+               printf("Failed to abort the traverse. Should have been aborted after 1 element but did iterate over %d elements\n", traverse_count);
+               exit(10);
+       }
        printf("\ndeleting all entries\n");
        for(i=0;i<10;i++){
        for(j=0;j<10;j++){
index 876521a3d4f48c7be83c9845a669c4cc386ca7f6..5b5dd9c3dee4c67ef1c742aedd21d8bdd747e945 100644 (file)
@@ -1365,7 +1365,7 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
-void getips_store_callback(void *param, void *data)
+static int getips_store_callback(void *param, void *data)
 {
        struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
        struct ctdb_all_public_ips *ips = param;
@@ -1374,13 +1374,15 @@ void getips_store_callback(void *param, void *data)
        i = ips->num++;
        ips->ips[i].pnn  = node_ip->pnn;
        ips->ips[i].addr = node_ip->addr;
+       return 0;
 }
 
-void getips_count_callback(void *param, void *data)
+static int getips_count_callback(void *param, void *data)
 {
        uint32_t *count = param;
 
        (*count)++;
+       return 0;
 }
 
 #define IP_KEYLEN      4
index a9dc28a9a60edcab61c634aadcf50df9f6f2354a..419e660a7faf15f9b38dfea2100accbda9fa3c3d 100644 (file)
@@ -160,7 +160,7 @@ struct delete_records_list {
  traverse the tree of records to delete and marshall them into
  a blob
 */
-static void
+static int
 delete_traverse(void *param, void *data)
 {
        struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
@@ -171,21 +171,22 @@ delete_traverse(void *param, void *data)
        rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
        if (rec == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
-               return;
+               return 0;
        }
 
        old_size = talloc_get_size(recs->records);
        recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
        if (recs->records == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               return;
+               return 0;
        }
        recs->records->count++;
        memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
+       return 0;
 }
 
 
-static void delete_record(void *param, void *d)
+static int delete_record(void *param, void *d)
 {
        struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
        struct ctdb_context *ctdb = dd->ctdb;
@@ -197,18 +198,18 @@ static void delete_record(void *param, void *d)
        /* its deleted on all other nodes - refetch, check and delete */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
                /* the chain is busy - come back later */
-               return;
+               return 0;
        }
 
        data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
        if (data.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
-               return;
+               return 0;
        }
        if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
-               return;
+               return 0;
        }
 
        hdr = (struct ctdb_ltdb_header *)data.dptr;
@@ -219,7 +220,7 @@ static void delete_record(void *param, void *d)
            dd->hdr.rsn != hdr->rsn) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
                free(data.dptr);
-               return;
+               return 0;
        }
 
        ctdb_block_signal(SIGALRM);
@@ -229,6 +230,7 @@ static void delete_record(void *param, void *d)
        free(data.dptr);
 
        (*count)++;
+       return 0;
 }
 
 /* vacuum one database */