traverse: Check if local traverse failed or succeeded
authorAmitay Isaacs <amitay@gmail.com>
Fri, 6 Sep 2013 08:11:40 +0000 (18:11 +1000)
committerAmitay Isaacs <amitay@gmail.com>
Wed, 25 Sep 2013 04:59:45 +0000 (14:59 +1000)
By passing the result of tdb_traverse_read() allows ctdbd to determine
if the local traverse succeeded or not.  In case of a problem with local
traverse, ctdbd can log an error.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
server/ctdb_traverse.c

index e37ba9c2daeaf58e76c09c4ed4e1903e4a78a5ed..83fd5dc1dfcb0c58f300f24129c2658ca7888a7c 100644 (file)
@@ -44,6 +44,8 @@ struct ctdb_traverse_local_handle {
        ctdb_traverse_fn_t callback;
        bool withemptyrecords;
        struct tevent_fd *fde;
+       int records_failed;
+       int records_sent;
 };
 
 /*
@@ -56,12 +58,25 @@ static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent
                                                        struct ctdb_traverse_local_handle);
        ctdb_traverse_fn_t callback = h->callback;
        void *p = h->private_data;
-       char res;
+       int res;
+       ssize_t n;
+
+       /* Read the number of records sent by traverse child */
+       n = read(h->fd[0], &res, sizeof(res));
+       if (n < 0 || n != sizeof(res)) {
+               /* Traverse child failed */
+               DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
+                                 h->ctdb_db->db_name, h->reqid));
+       } else if (res < 0) {
+               /* Traverse failed */
+               res = -res;
+               DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
+                                 h->ctdb_db->db_name, h->reqid, res));
+       } else {
+               DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
+                                  h->ctdb_db->db_name, h->reqid, res));
+       }
 
-       /* FIXME: There is no way to distinguish between failed traverse and
-        * successful traverse.  The only way to signal the end is by sending
-        * tdb_null for key and data. */
-       read(h->fd[0], &res, 1);
        callback(p, tdb_null, tdb_null);
 }
 
@@ -106,6 +121,7 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
        d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
        if (d == NULL) {
                /* error handling is tricky in this child code .... */
+               h->records_failed++;
                return -1;
        }
 
@@ -115,9 +131,11 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
        res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
                           CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
        if (res != 0 || status != 0) {
+               h->records_failed++;
                return -1;
        }
 
+       h->records_sent++;
        return 0;
 }
 
@@ -177,7 +195,7 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
 
        if (h->child == 0) {
                /* start the traverse in the child */
-               char res = 0;
+               int res;
                pid_t parent = getpid();
 
                close(h->fd[0]);
@@ -187,13 +205,17 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
                                                 "traverse_local-%s:",
                                                 ctdb_db->db_name) != 0) {
                        DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
-                       res = -1;
+                       _exit(0);
                }
 
-               if (tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h) != 0) {
-                       res = -1;
+               res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
+               if (res == -1 || h->records_failed > 0) {
+                       /* traverse failed */
+                       res = -(h->records_sent);
+               } else {
+                       res = h->records_sent;
                }
-               write(h->fd[1], &res, 1);
+               write(h->fd[1], &res, sizeof(res));
 
                while (ctdb_kill(ctdb_db->ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);