tier/ctr: Correcting rename logic
authorJoseph Fernandes <josferna@redhat.com>
Fri, 20 Nov 2015 19:34:21 +0000 (01:04 +0530)
committerDan Lambright <dlambrig@redhat.com>
Wed, 25 Nov 2015 16:21:35 +0000 (08:21 -0800)
Problem: When a file with old_file_name and GFID_1 is renamed with a new_file_name
which already exists and with GFID_2, this is what happens in linux internaly.
   a. "new_file_name" is unlinked for GFID_2
   b. a hardlink "new_file_name" is created to GFID_1
   c. "old_file_name" hardlink is unlinked for GFID_2.
Well this is all internal to linux, and gluster just issues a rename system call
at POSIX layer. But CTR Xlator doesn't delete the entries corresponding to the
"new_file_name" and GFID_2. Thus leaving the stale entry in the DB.
The following are the implications.
a. Promotion are tried on these stale entries which will fail and show
false results in the status of migration,
b. GFID_2 Files with 2 hardlinks, which will have only one hardlink
after the rename will not be promoted or demoted as the DB shows 2 entries.

Solution: Delete the older database entry for the replaced hardlink

Change-Id: I4eafa0872253e29ff1f0bec4283bcfc579ecf0e2
BUG: 1284090
Signed-off-by: Joseph Fernandes <josferna@redhat.com>
Reviewed-on: http://review.gluster.org/12711
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Dan Lambright <dlambrig@redhat.com>
Tested-by: Dan Lambright <dlambrig@redhat.com>
libglusterfs/src/gfdb/gfdb_data_store_types.h
tests/basic/tier/ctr-rename-overwrite.t [new file with mode: 0755]
xlators/features/changetimerecorder/src/changetimerecorder.c
xlators/features/changetimerecorder/src/ctr-helper.c
xlators/features/changetimerecorder/src/ctr-helper.h

index ce09e7317460ef740cf6da7c37fdb58ded113f31..d79bf41a8c91b753c74ccd6d6bd31e5af5d1030b 100644 (file)
@@ -268,13 +268,20 @@ isdentrycreatefop(gfdb_fop_type_t fop_type)
 /*The structure that is used to send insert/update the databases
  * using insert_db api*/
 typedef struct gfdb_db_record {
+        /* GFID */
         uuid_t                          gfid;
+        /* Used during a rename refer ctr_rename() in changetimerecorder
+         * xlator*/
+        uuid_t                          old_gfid;
+        /* Parent GFID */
         uuid_t                          pargfid;
         uuid_t                          old_pargfid;
-        char                            file_name[PATH_MAX];
+        /* File names and paths */
+        char                            file_name[GF_NAME_MAX];
         char                            file_path[PATH_MAX];
-        char                            old_file_name[PATH_MAX];
+        char                            old_file_name[GF_NAME_MAX];
         char                            old_path[PATH_MAX];
+        /* FOP type and FOP path*/
         gfdb_fop_type_t                 gfdb_fop_type;
         gfdb_fop_path_t                 gfdb_fop_path;
         /*Time of change or access*/
diff --git a/tests/basic/tier/ctr-rename-overwrite.t b/tests/basic/tier/ctr-rename-overwrite.t
new file mode 100755 (executable)
index 0000000..a1d5af0
--- /dev/null
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+. $(dirname $0)/../../include.rc
+. $(dirname $0)/../../volume.rc
+. $(dirname $0)/../../tier.rc
+
+LAST_BRICK=1
+CACHE_BRICK_FIRST=4
+CACHE_BRICK_LAST=5
+
+DEMOTE_FREQ=5
+PROMOTE_FREQ=5
+
+cleanup
+
+# Start glusterd
+TEST glusterd
+TEST pidof glusterd
+
+# Set-up tier cluster
+TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0..$LAST_BRICK}
+TEST $CLI volume start $V0
+TEST $CLI volume attach-tier $V0 replica 2 $H0:$B0/${V0}$CACHE_BRICK_FIRST $H0:$B0/${V0}$CACHE_BRICK_LAST
+
+TEST $CLI volume set $V0 cluster.tier-demote-frequency $DEMOTE_FREQ
+TEST $CLI volume set $V0 cluster.tier-promote-frequency $PROMOTE_FREQ
+
+# Start and mount the volume after enabling CTR
+TEST $CLI volume set $V0 features.ctr-enabled on
+TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0;
+
+# create two files
+echo "hello world" > $M0/file1
+echo "hello world" > $M0/file2
+
+# db in hot brick shows 4 record. 2 for file1 and 2 for file2
+ENTRY_COUNT=$(echo "select * from gf_file_tb; select * from gf_flink_tb;" | \
+        sqlite3 $B0/${V0}5/.glusterfs/${V0}5.db | wc -l )
+TEST [ $ENTRY_COUNT -eq 4 ]
+
+#overwrite file2 with file1
+mv -f $M0/file1 $M0/file2
+
+# Now the db in hot tier should have only 2 records for file1.
+ENTRY_COUNT=$(echo "select * from gf_file_tb; select * from gf_flink_tb;" | \
+        sqlite3 $B0/${V0}5/.glusterfs/${V0}5.db | wc -l )
+TEST [ $ENTRY_COUNT -eq 2 ]
+
+cleanup
index 242fb2b51fc24a8033fbfedb6c666fc4f4b7c4b7..2c05d07dcb70639427ef7b37b319204fdd096d1f 100644 (file)
@@ -374,9 +374,10 @@ ctr_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 {
         int ret = -1;
 
-        CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IS_DISABLED_THEN_GOTO (this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
-        ret = ctr_insert_unwind(frame, this,
+        ret = ctr_insert_unwind (frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
         if (ret) {
                 gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -386,6 +387,8 @@ ctr_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf,
                         postbuf, xdata);
 
@@ -438,6 +441,7 @@ ctr_setattr_cbk (call_frame_t *frame,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
@@ -448,6 +452,8 @@ ctr_setattr_cbk (call_frame_t *frame,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, preop_stbuf,
                        postop_stbuf, xdata);
 
@@ -499,6 +505,7 @@ ctr_fsetattr_cbk (call_frame_t *frame,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
@@ -509,6 +516,8 @@ ctr_fsetattr_cbk (call_frame_t *frame,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno,
                                 preop_stbuf, postop_stbuf, xdata);
 
@@ -557,7 +566,7 @@ ctr_fremovexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
-
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
@@ -568,6 +577,8 @@ ctr_fremovexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (fremovexattr, frame, op_ret, op_errno, xdata);
 
         return 0;
@@ -614,6 +625,7 @@ ctr_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
         CTR_IF_INTERNAL_FOP_THEN_GOTO (frame, xdata, out);
 
 
@@ -626,6 +638,8 @@ ctr_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno, xdata);
 
         return 0;
@@ -673,7 +687,7 @@ ctr_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
-
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
@@ -685,6 +699,8 @@ ctr_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf,
                       postbuf, xdata);
 
@@ -731,6 +747,7 @@ ctr_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this,
                         GFDB_FOP_INODE_WRITE, GFDB_FOP_UNWIND);
@@ -741,6 +758,8 @@ ctr_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf,
                       postbuf, xdata);
 
@@ -779,7 +798,6 @@ out:
 }
 
 /****************************rename******************************************/
-
 int32_t
 ctr_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *buf,
@@ -787,19 +805,78 @@ ctr_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                     struct iatt *prenewparent, struct iatt *postnewparent,
                     dict_t *xdata)
 {
-        int ret = -1;
+        int ret                         = -1;
+        uint32_t remaining_links        = -1;
+        gf_ctr_local_t *ctr_local       = NULL;
+        gfdb_fop_type_t fop_type        = GFDB_FOP_INVALID_OP;
+        gfdb_fop_path_t fop_path        = GFDB_FOP_INVALID;
 
-        CTR_IS_DISABLED_THEN_GOTO(this, out);
+        GF_ASSERT(frame);
+        GF_ASSERT(this);
 
-        ret = ctr_insert_unwind(frame, this,
+        CTR_IS_DISABLED_THEN_GOTO (this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
+
+        ret = ctr_insert_unwind (frame, this,
                         GFDB_FOP_DENTRY_WRITE, GFDB_FOP_UNWIND);
         if (ret) {
                 gf_msg (this->name, GF_LOG_ERROR, 0,
                         CTR_MSG_INSERT_RENAME_UNWIND_FAILED,
                         "Failed to insert rename unwind");
+                goto out;
+        }
+
+        if (!xdata)
+                goto out;
+        /*
+         *
+         * Extracting GF_RESPONSE_LINK_COUNT_XDATA from POSIX Xlator
+         * This is only set when we are overwriting hardlinks.
+         *
+         * */
+        ret = dict_get_uint32 (xdata , GF_RESPONSE_LINK_COUNT_XDATA,
+                                &remaining_links);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        CTR_MSG_GET_CTR_RESPONSE_LINK_COUNT_XDATA_FAILED,
+                        "Failed to getting GF_RESPONSE_LINK_COUNT_XDATA");
+                remaining_links = -1;
+                goto out;
+        }
+
+        ctr_local = frame->local;
+
+        /* This is not the only link */
+        if (remaining_links > 1) {
+                fop_type = GFDB_FOP_DENTRY_WRITE;
+                fop_path = GFDB_FOP_UNDEL;
+        }
+        /* Last link that was deleted */
+        else if (remaining_links == 1) {
+                fop_type = GFDB_FOP_DENTRY_WRITE;
+                fop_path = GFDB_FOP_UNDEL_ALL;
+        } else {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        CTR_MSG_INSERT_RENAME_UNWIND_FAILED,
+                        "Invalid link count from posix");
+                goto out;
+        }
+
+        ret = ctr_delete_hard_link_from_db (this,
+                                    CTR_DB_REC(ctr_local).old_gfid,
+                                    CTR_DB_REC(ctr_local).pargfid,
+                                    CTR_DB_REC(ctr_local).file_name,
+                                    fop_type, fop_path);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        CTR_MSG_INSERT_UNLINK_UNWIND_FAILED,
+                        "Failed to delete records of %s",
+                        CTR_DB_REC(ctr_local).old_file_name);
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf,
                              preoldparent, postoldparent, prenewparent,
                              postnewparent,
@@ -812,12 +889,14 @@ int32_t
 ctr_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                 loc_t *newloc, dict_t *xdata)
 {
-        int ret = -1;
+        int ret                                         = -1;
         gf_ctr_inode_context_t ctr_inode_cx;
-        gf_ctr_inode_context_t *_inode_cx = &ctr_inode_cx;
+        gf_ctr_inode_context_t *_inode_cx               = &ctr_inode_cx;
         gf_ctr_link_context_t new_link_cx, old_link_cx;
-        gf_ctr_link_context_t *_nlink_cx = &new_link_cx;
-        gf_ctr_link_context_t *_olink_cx = &old_link_cx;
+        gf_ctr_link_context_t *_nlink_cx                = &new_link_cx;
+        gf_ctr_link_context_t *_olink_cx                = &old_link_cx;
+        int is_dict_created                             = 0;
+        ctr_xlator_ctx_t *ctr_xlator_ctx                = NULL;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
         CTR_IF_INTERNAL_FOP_THEN_GOTO (frame, xdata, out);
@@ -835,6 +914,20 @@ ctr_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                 oldloc->inode->gfid, _nlink_cx, _olink_cx,
                 GFDB_FOP_DENTRY_WRITE, GFDB_FOP_WIND);
 
+
+        /* If the rename is a overwrite of hardlink
+         * rename ("file1", "file2")
+         * file1 is hardlink for gfid say 00000000-0000-0000-0000-00000000000A
+         * file2 is hardlink for gfid say 00000000-0000-0000-0000-00000000000B
+         * so we are saving file2 gfid in old_gfid so that we delete entries
+         * from the db during rename callback if the fop is successful
+         * */
+        if (newloc->inode) {
+                /* This is the GFID from where the newloc hardlink will be
+                 * unlinked */
+                _inode_cx->old_gfid = &newloc->inode->gfid;
+        }
+
         /* Is a metatdata fop */
         _inode_cx->is_metadata_fop = _gf_true;
 
@@ -852,6 +945,45 @@ ctr_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                         gf_msg (this->name, GF_LOG_ERROR, 0,
                                 CTR_MSG_UPDATE_HARDLINK_FAILED, "Failed "
                                 "updating hard link in ctr inode context");
+                        goto out;
+                }
+
+                /* If the newloc has an inode. i.e aquiring hardlink of an
+                 * exisitng file i.e overwritting a file.
+                 * */
+                if (newloc->inode) {
+
+                        /* Getting the ctr inode context variable for
+                         * inode whose hardlink will be aquired during
+                         * the rename
+                         * */
+                        ctr_xlator_ctx = get_ctr_xlator_ctx (this,
+                                                                newloc->inode);
+                        if (!ctr_xlator_ctx) {
+                                /* Since there is no ctr inode context
+                                 * so nothing more to do */
+                                ret = 0;
+                                goto out;
+                        }
+
+                        /* Deleting hardlink from context variable */
+                        ret = ctr_delete_hard_link (this, ctr_xlator_ctx,
+                                                newloc->pargfid, newloc->name);
+                        if (ret) {
+                                gf_msg (this->name, GF_LOG_ERROR, 0,
+                                        CTR_MSG_DELETE_HARDLINK_FAILED,
+                                        "Failed to delete hard link");
+                                goto out;
+                        }
+
+                        /* Requesting for number of hardlinks on the newloc
+                         * inode from POSIX.
+                         * */
+                        is_dict_created = set_posix_link_request (this, &xdata);
+                        if (is_dict_created == -1) {
+                                ret = -1;
+                                goto out;
+                        }
                 }
         }
 
@@ -859,6 +991,11 @@ out:
         STACK_WIND (frame, ctr_rename_cbk, FIRST_CHILD (this),
                     FIRST_CHILD (this)->fops->rename,
                     oldloc, newloc, xdata);
+
+        if (is_dict_created == 1) {
+                dict_unref (xdata);
+        }
+
         return 0;
 }
 
@@ -872,6 +1009,7 @@ ctr_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         uint32_t remaining_links                = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         if (!xdata)
                 goto out;
@@ -914,6 +1052,8 @@ ctr_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, preparent,
                              postparent, xdata);
 
@@ -1022,6 +1162,7 @@ ctr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this, GFDB_FOP_INODE_WRITE,
                                 GFDB_FOP_UNWIND);
@@ -1032,6 +1173,8 @@ ctr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf, postbuf,
                       xdata);
 
@@ -1088,6 +1231,8 @@ ctr_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno, xdata);
 
         return 0;
@@ -1133,6 +1278,7 @@ ctr_fsetxattr_cbk (call_frame_t *frame,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this, GFDB_FOP_INODE_WRITE,
                                 GFDB_FOP_UNWIND);
@@ -1143,6 +1289,8 @@ ctr_fsetxattr_cbk (call_frame_t *frame,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, xdata);
 
         return 0;
@@ -1192,6 +1340,7 @@ ctr_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         ctr_heal_ret_val_t ret_val = CTR_CTX_ERROR;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         /* Add hard link to the list */
         ret_val = add_hard_link_ctx (frame, this, inode);
@@ -1208,6 +1357,8 @@ ctr_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, inode, buf,
                 preparent, postparent, xdata);
 
@@ -1276,7 +1427,7 @@ ctr_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
-
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = add_hard_link_ctx (frame, this, inode);
         if (ret) {
@@ -1294,6 +1445,8 @@ ctr_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode,
                              stbuf,
                         preparent, postparent, xdata);
@@ -1373,6 +1526,7 @@ ctr_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         /* Add hard link to the list */
         ret = add_hard_link_ctx (frame, this, inode);
@@ -1389,6 +1543,8 @@ ctr_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, inode, stbuf,
                        preparent, postparent, xdata);
         return 0;
@@ -1456,6 +1612,7 @@ int ctr_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         int ret = -1;
 
         CTR_IS_DISABLED_THEN_GOTO(this, out);
+        CTR_IF_FOP_FAILED_THEN_GOTO (this, op_ret, op_errno, out);
 
         ret = ctr_insert_unwind(frame, this, GFDB_FOP_INODE_READ,
                                 GFDB_FOP_UNWIND);
@@ -1466,6 +1623,8 @@ int ctr_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
 
 out:
+        ctr_free_frame_local (frame);
+
         STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count,
                                 stbuf, iobref, xdata);
         return 0;
index ab918eac825c9be6471a310e0dae8fd043af2f4b..ba48a70f58324e55c6422aea994289c318216eea 100644 (file)
@@ -120,9 +120,16 @@ fill_db_record_for_wind (xlator_t               *this,
                 memset(ctr_wtime, 0, sizeof(*ctr_wtime));
         }
 
-        /*Copy gfid into db record*/
+        /* Copy gfid into db record */
         gf_uuid_copy (CTR_DB_REC(ctr_local).gfid, *(ctr_inode_cx->gfid));
 
+        /* Copy older gfid if any */
+        if (ctr_inode_cx->old_gfid &&
+                (!gf_uuid_is_null (*(ctr_inode_cx->old_gfid)))) {
+                gf_uuid_copy (CTR_DB_REC(ctr_local).old_gfid,
+                                *(ctr_inode_cx->old_gfid));
+        }
+
         /*Hard Links*/
         if (isdentryfop(ctr_inode_cx->fop_type)) {
                 /*new link fop*/
index 6ddc5b8ca195a393278c4524fdb44c8185f135d9..025965898d268ccb95baeb9b0f203bca38fff846 100644 (file)
@@ -167,6 +167,7 @@ typedef struct gf_ctr_link_context {
 typedef struct gf_ctr_inode_context {
         ia_type_t               ia_type;
         uuid_t                  *gfid;
+        uuid_t                  *old_gfid;
         gf_ctr_link_context_t   *new_link_cx;
         gf_ctr_link_context_t   *old_link_cx;
         gfdb_fop_type_t         fop_type;
@@ -230,10 +231,10 @@ do {\
                                 _fop_type,\
                                 _fop_path)\
 do {\
-        GF_ASSERT(ctr_inode_cx);\
-        GF_ASSERT(_gfid);\
-        GF_ASSERT(_fop_type != GFDB_FOP_INVALID_OP);\
-        GF_ASSERT(_fop_path != GFDB_FOP_INVALID);\
+        GF_ASSERT (ctr_inode_cx);\
+        GF_ASSERT (_gfid);\
+        GF_ASSERT (_fop_type != GFDB_FOP_INVALID_OP);\
+        GF_ASSERT (_fop_path != GFDB_FOP_INVALID);\
         memset(ctr_inode_cx, 0, sizeof(*ctr_inode_cx));\
         ctr_inode_cx->ia_type = _ia_type;\
         ctr_inode_cx->gfid = &_gfid;\
@@ -247,12 +248,74 @@ do {\
         ctr_inode_cx->fop_path = _fop_path;\
 } while (0)
 
+
 /******************************************************************************
  *
  *                      Util functions or macros used by
  *                      insert wind and insert unwind
  *
  * ****************************************************************************/
+/* Free ctr frame local */
+static inline void
+ctr_free_frame_local (call_frame_t *frame) {
+        if (frame) {
+                free_ctr_local ((gf_ctr_local_t *) frame->local);
+                frame->local = NULL;
+        }
+}
+
+/* Setting GF_REQUEST_LINK_COUNT_XDATA in dict
+ * that has to be sent to POSIX Xlator to send
+ * link count in unwind path.
+ * return 0 for success with not creation of dict
+ * return 1 for success with creation of dict
+ * return -1 for failure.
+ * */
+static inline int
+set_posix_link_request (xlator_t        *this,
+                        dict_t          **xdata)
+{
+        int ret                         = -1;
+        gf_boolean_t is_created         = _gf_false;
+
+        GF_VALIDATE_OR_GOTO ("ctr", this, out);
+        GF_VALIDATE_OR_GOTO (this->name, xdata, out);
+
+        /*create xdata if NULL*/
+        if (!*xdata) {
+                *xdata = dict_new();
+                is_created = _gf_true;
+                ret = 1;
+        } else {
+                ret = 0;
+        }
+
+        if (!*xdata) {
+                gf_msg (this->name, GF_LOG_ERROR, 0, CTR_MSG_XDATA_NULL,
+                        "xdata is NULL :Cannot send "
+                        "GF_REQUEST_LINK_COUNT_XDATA to posix");
+                ret = -1;
+                goto out;
+        }
+
+        ret = dict_set_int32 (*xdata, GF_REQUEST_LINK_COUNT_XDATA, 1);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        CTR_MSG_SET_CTR_RESPONSE_LINK_COUNT_XDATA_FAILED,
+                        "Failed setting GF_REQUEST_LINK_COUNT_XDATA");
+                ret = -1;
+                goto out;
+        }
+        ret = 0;
+out:
+        if (ret == -1) {
+                if (*xdata && is_created) {
+                        dict_unref (*xdata);
+                }
+        }
+        return ret;
+}
+
 
 /*
  * If a bitrot fop
@@ -293,8 +356,8 @@ do {\
  * Internal fop
  *
  * */
-static inline
-gf_boolean_t is_internal_fop (call_frame_t *frame,
+static inline gf_boolean_t
+is_internal_fop (call_frame_t *frame,
                               dict_t       *xdata)
 {
         gf_boolean_t ret = _gf_false;
@@ -327,6 +390,15 @@ do {\
                         goto label; \
 } while (0)
 
+/* if fop has failed exit */
+#define CTR_IF_FOP_FAILED_THEN_GOTO(this, op_ret, op_errno, label)\
+do {\
+        if (op_ret == -1) {\
+                gf_msg_trace (this->name, 0, "Failed fop with %s",\
+                              strerror (op_errno));\
+                goto label;\
+        };\
+} while (0)
 
 /*
  * IS CTR Xlator is disabled then goto to label
@@ -372,6 +444,7 @@ fill_db_record_for_wind (xlator_t                *this,
  * This function creates ctr_local structure into the frame of the fop
  * call.
  * ****************************************************************************/
+
 static inline int
 ctr_insert_wind (call_frame_t                    *frame,
                 xlator_t                        *this,
@@ -538,8 +611,60 @@ ctr_insert_unwind (call_frame_t          *frame,
         }
         ret = 0;
 out:
-        free_ctr_local (ctr_local);
-        frame->local = NULL;
+        return ret;
+}
+
+/******************************************************************************
+ *                          Delete file/flink record/s from db
+ * ****************************************************************************/
+static inline int
+ctr_delete_hard_link_from_db (xlator_t               *this,
+                              uuid_t                 gfid,
+                              uuid_t                 pargfid,
+                              char                   *basename,
+                              gfdb_fop_type_t        fop_type,
+                              gfdb_fop_path_t        fop_path)
+{
+        int                     ret                = -1;
+        gfdb_db_record_t        gfdb_db_record;
+        gf_ctr_private_t        *_priv             = NULL;
+
+        _priv = this->private;
+        GF_VALIDATE_OR_GOTO (this->name, _priv, out);
+        GF_VALIDATE_OR_GOTO (this->name, (!gf_uuid_is_null (gfid)), out);
+        GF_VALIDATE_OR_GOTO (this->name, (!gf_uuid_is_null (pargfid)), out);
+        GF_VALIDATE_OR_GOTO (this->name, (fop_type == GFDB_FOP_DENTRY_WRITE),
+                             out);
+        GF_VALIDATE_OR_GOTO (this->name,
+                             (fop_path == GFDB_FOP_UNDEL || GFDB_FOP_UNDEL_ALL),
+                             out);
+
+        /* Set gfdb_db_record to 0 */
+        memset (&gfdb_db_record, 0, sizeof(gfdb_db_record));
+
+        /* Copy gfid into db record */
+        gf_uuid_copy (gfdb_db_record.gfid, gfid);
+
+        /* Copy pargid into db record */
+        gf_uuid_copy (gfdb_db_record.pargfid, pargfid);
+
+        /* Copy basename */
+        strncpy (gfdb_db_record.file_name, basename, GF_NAME_MAX - 1);
+
+        gfdb_db_record.gfdb_fop_path = fop_path;
+        gfdb_db_record.gfdb_fop_type = fop_type;
+
+        /*send delete request to db*/
+        ret = insert_record (_priv->_db_conn, &gfdb_db_record);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        CTR_MSG_INSERT_RECORD_WIND_FAILED,
+                        "Failed to delete record. %s", basename);
+                goto out;
+        }
+
+        ret = 0;
+out:
         return ret;
 }