454124fadb175b4c51eaedd10f364b25cd4991a5
[obnox/glusterfs.git] / xlators / cluster / afr / src / afr-inode-write.c
1 /*
2   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3   This file is part of GlusterFS.
4
5   This file is licensed to you under your choice of the GNU Lesser
6   General Public License, version 3 or any later version (LGPLv3 or
7   later), or the GNU General Public License, version 2 (GPLv2), in all
8   cases as published by the Free Software Foundation.
9 */
10
11
12 #include <libgen.h>
13 #include <unistd.h>
14 #include <fnmatch.h>
15 #include <sys/time.h>
16 #include <stdlib.h>
17 #include <signal.h>
18
19 #include "glusterfs.h"
20 #include "afr.h"
21 #include "dict.h"
22 #include "xlator.h"
23 #include "hashfn.h"
24 #include "logging.h"
25 #include "stack.h"
26 #include "list.h"
27 #include "call-stub.h"
28 #include "defaults.h"
29 #include "common-utils.h"
30 #include "compat-errno.h"
31 #include "compat.h"
32 #include "protocol-common.h"
33 #include "byte-order.h"
34 #include "afr-transaction.h"
35 #include "afr-self-heal.h"
36 #include "afr-messages.h"
37
38 static void
39 __afr_inode_write_finalize (call_frame_t *frame, xlator_t *this)
40 {
41         afr_local_t *local = NULL;
42         afr_private_t *priv = NULL;
43         int read_subvol = 0;
44         int i = 0;
45
46         local = frame->local;
47         priv = this->private;
48
49         if (local->inode) {
50                 if (local->transaction.type == AFR_METADATA_TRANSACTION)
51                         read_subvol = afr_metadata_subvol_get (local->inode, this,
52                                                                NULL, NULL,
53                                                                NULL);
54                 else
55                         read_subvol = afr_data_subvol_get (local->inode, this,
56                                                            NULL, NULL, NULL);
57         }
58
59         local->op_ret = -1;
60         local->op_errno = afr_final_errno (local, priv);
61
62         for (i = 0; i < priv->child_count; i++) {
63                 if (!local->replies[i].valid)
64                         continue;
65                 if (local->replies[i].op_ret < 0) {
66                         afr_inode_read_subvol_reset (local->inode, this);
67                         continue;
68                 }
69
70                 /* Order of checks in the compound conditional
71                    below is important.
72
73                    - Highest precedence: largest op_ret
74                    - Next precendence: if all op_rets are equal, read subvol
75                    - Least precedence: any succeeded subvol
76                 */
77                 if ((local->op_ret < local->replies[i].op_ret) ||
78                     ((local->op_ret == local->replies[i].op_ret) &&
79                      (i == read_subvol))) {
80
81                         local->op_ret = local->replies[i].op_ret;
82                         local->op_errno = local->replies[i].op_errno;
83
84                         local->cont.inode_wfop.prebuf =
85                                 local->replies[i].prestat;
86                         local->cont.inode_wfop.postbuf =
87                                 local->replies[i].poststat;
88
89                         if (local->replies[i].xdata) {
90                                 if (local->xdata_rsp)
91                                         dict_unref (local->xdata_rsp);
92                                 local->xdata_rsp =
93                                         dict_ref (local->replies[i].xdata);
94                         }
95                         if (local->replies[i].xattr) {
96                                 if (local->xattr_rsp)
97                                         dict_unref (local->xattr_rsp);
98                                 local->xattr_rsp =
99                                         dict_ref (local->replies[i].xattr);
100                         }
101                 }
102         }
103
104         afr_txn_arbitrate_fop_cbk (frame, this);
105 }
106
107
108 static void
109 __afr_inode_write_fill (call_frame_t *frame, xlator_t *this, int child_index,
110                         int op_ret, int op_errno,
111                         struct iatt *prebuf, struct iatt *postbuf,
112                         dict_t *xattr, dict_t *xdata)
113 {
114         afr_local_t *local = NULL;
115         afr_private_t *priv = NULL;
116
117         local = frame->local;
118         priv = this->private;
119
120         local->replies[child_index].valid = 1;
121
122         if (AFR_IS_ARBITER_BRICK(priv, child_index) && op_ret == 1)
123                 op_ret = iov_length (local->cont.writev.vector,
124                                      local->cont.writev.count);
125
126         local->replies[child_index].op_ret = op_ret;
127         local->replies[child_index].op_errno = op_errno;
128
129         if (op_ret >= 0) {
130                 if (prebuf)
131                         local->replies[child_index].prestat = *prebuf;
132                 if (postbuf)
133                         local->replies[child_index].poststat = *postbuf;
134                 if (xattr)
135                         local->replies[child_index].xattr = dict_ref (xattr);
136                 if (xdata)
137                         local->replies[child_index].xdata = dict_ref (xdata);
138         } else {
139                 afr_transaction_fop_failed (frame, this, child_index);
140         }
141
142         return;
143 }
144
145
146 static int
147 __afr_inode_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
148                        int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
149                        struct iatt *postbuf, dict_t *xattr, dict_t *xdata)
150 {
151         afr_local_t *local = NULL;
152         int child_index = (long) cookie;
153         int call_count = -1;
154
155         local = frame->local;
156
157         LOCK (&frame->lock);
158         {
159                 __afr_inode_write_fill (frame, this, child_index, op_ret,
160                                         op_errno, prebuf, postbuf, xattr,
161                                         xdata);
162         }
163         UNLOCK (&frame->lock);
164
165         call_count = afr_frame_return (frame);
166
167         if (call_count == 0) {
168                 __afr_inode_write_finalize (frame, this);
169
170                 if (afr_txn_nothing_failed (frame, this))
171                         local->transaction.unwind (frame, this);
172
173                 local->transaction.resume (frame, this);
174         }
175
176         return 0;
177 }
178
179 /* {{{ writev */
180
181 void
182 afr_writev_copy_outvars (call_frame_t *src_frame, call_frame_t *dst_frame)
183 {
184         afr_local_t *src_local = NULL;
185         afr_local_t *dst_local = NULL;
186
187         src_local = src_frame->local;
188         dst_local = dst_frame->local;
189
190         dst_local->op_ret = src_local->op_ret;
191         dst_local->op_errno = src_local->op_errno;
192         dst_local->cont.inode_wfop.prebuf = src_local->cont.inode_wfop.prebuf;
193         dst_local->cont.inode_wfop.postbuf = src_local->cont.inode_wfop.postbuf;
194         if (src_local->xdata_rsp)
195                 dst_local->xdata_rsp = dict_ref (src_local->xdata_rsp);
196 }
197
198 void
199 afr_writev_unwind (call_frame_t *frame, xlator_t *this)
200 {
201         afr_local_t *   local = NULL;
202         local = frame->local;
203
204         AFR_STACK_UNWIND (writev, frame,
205                           local->op_ret, local->op_errno,
206                           &local->cont.inode_wfop.prebuf,
207                           &local->cont.inode_wfop.postbuf,
208                           local->xdata_rsp);
209 }
210
211
212 int
213 afr_transaction_writev_unwind (call_frame_t *frame, xlator_t *this)
214 {
215         call_frame_t *fop_frame = NULL;
216
217         fop_frame = afr_transaction_detach_fop_frame (frame);
218
219         if (fop_frame) {
220                 afr_writev_copy_outvars (frame, fop_frame);
221                 afr_writev_unwind (fop_frame, this);
222         }
223         return 0;
224 }
225
226 static void
227 afr_writev_handle_short_writes (call_frame_t *frame, xlator_t *this)
228 {
229         afr_local_t   *local = NULL;
230         afr_private_t *priv  = NULL;
231         int           i      = 0;
232
233         local = frame->local;
234         priv = this->private;
235         /*
236          * We already have the best case result of the writev calls staged
237          * as the return value. Any writev that returns some value less
238          * than the best case is now out of sync, so mark the fop as
239          * failed. Note that fops that have returned with errors have
240          * already been marked as failed.
241          */
242         for (i = 0; i < priv->child_count; i++) {
243                 if ((!local->replies[i].valid) ||
244                     (local->replies[i].op_ret == -1))
245                         continue;
246
247                 if (local->replies[i].op_ret < local->op_ret)
248                         afr_transaction_fop_failed (frame, this, i);
249         }
250 }
251
252 int
253 afr_writev_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
254                      int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
255                      struct iatt *postbuf, dict_t *xdata)
256 {
257         afr_local_t *   local = NULL;
258         call_frame_t    *fop_frame = NULL;
259         int child_index = (long) cookie;
260         int call_count  = -1;
261         int ret = 0;
262         uint32_t open_fd_count = 0;
263         uint32_t write_is_append = 0;
264
265         local = frame->local;
266
267         LOCK (&frame->lock);
268         {
269                 __afr_inode_write_fill (frame, this, child_index, op_ret,
270                                         op_errno, prebuf, postbuf, NULL, xdata);
271                 if (op_ret == -1 || !xdata)
272                         goto unlock;
273
274                 write_is_append = 0;
275                 ret = dict_get_uint32 (xdata, GLUSTERFS_WRITE_IS_APPEND,
276                                        &write_is_append);
277                 if (ret || !write_is_append)
278                         local->append_write = _gf_false;
279
280                 ret = dict_get_uint32 (xdata, GLUSTERFS_OPEN_FD_COUNT,
281                                        &open_fd_count);
282                 if (ret == -1)
283                         goto unlock;
284                 if ((open_fd_count > local->open_fd_count)) {
285                         local->open_fd_count = open_fd_count;
286                         local->update_open_fd_count = _gf_true;
287                 }
288         }
289 unlock:
290         UNLOCK (&frame->lock);
291
292         call_count = afr_frame_return (frame);
293
294         if (call_count == 0) {
295                 if (!local->stable_write && !local->append_write)
296                         /* An appended write removes the necessity to
297                            fsync() the file. This is because self-heal
298                            has the logic to check for larger file when
299                            the xattrs are not reliably pointing at
300                            a stale file.
301                         */
302                         afr_fd_report_unstable_write (this, local->fd);
303
304                 __afr_inode_write_finalize (frame, this);
305
306                 afr_writev_handle_short_writes (frame, this);
307
308                 if (local->update_open_fd_count)
309                         afr_handle_open_fd_count (frame, this);
310
311                 if (!afr_txn_nothing_failed (frame, this)) {
312                         //Don't unwind until post-op is complete
313                         local->transaction.resume (frame, this);
314                 } else {
315                 /*
316                  * Generally inode-write fops do transaction.unwind then
317                  * transaction.resume, but writev needs to make sure that
318                  * delayed post-op frame is placed in fdctx before unwind
319                  * happens. This prevents the race of flush doing the
320                  * changelog wakeup first in fuse thread and then this
321                  * writev placing its delayed post-op frame in fdctx.
322                  * This helps flush make sure all the delayed post-ops are
323                  * completed.
324                  */
325
326                         fop_frame = afr_transaction_detach_fop_frame (frame);
327                         afr_writev_copy_outvars (frame, fop_frame);
328                         local->transaction.resume (frame, this);
329                         afr_writev_unwind (fop_frame, this);
330                 }
331         }
332         return 0;
333 }
334
335 static int
336 afr_arbiter_writev_wind (call_frame_t *frame, xlator_t *this, int subvol)
337 {
338         afr_local_t *local = frame->local;
339         afr_private_t *priv = this->private;
340         static char byte = 0xFF;
341         static struct iovec vector = {&byte, 1};
342         int32_t count = 1;
343
344         STACK_WIND_COOKIE (frame, afr_writev_wind_cbk, (void *) (long) subvol,
345                            priv->children[subvol],
346                            priv->children[subvol]->fops->writev,
347                            local->fd, &vector, count, local->cont.writev.offset,
348                            local->cont.writev.flags, local->cont.writev.iobref,
349                            local->xdata_req);
350
351         return 0;
352 }
353
354 int
355 afr_writev_wind (call_frame_t *frame, xlator_t *this, int subvol)
356 {
357         afr_local_t *local = NULL;
358         afr_private_t *priv = NULL;
359
360         local = frame->local;
361         priv = this->private;
362
363         if (AFR_IS_ARBITER_BRICK(priv, subvol)) {
364                 afr_arbiter_writev_wind (frame, this, subvol);
365                 return 0;
366         }
367
368         STACK_WIND_COOKIE (frame, afr_writev_wind_cbk, (void *) (long) subvol,
369                            priv->children[subvol],
370                            priv->children[subvol]->fops->writev,
371                            local->fd, local->cont.writev.vector,
372                            local->cont.writev.count, local->cont.writev.offset,
373                            local->cont.writev.flags, local->cont.writev.iobref,
374                            local->xdata_req);
375         return 0;
376 }
377
378
379 int
380 afr_do_writev (call_frame_t *frame, xlator_t *this)
381 {
382         call_frame_t    *transaction_frame = NULL;
383         afr_local_t     *local             = NULL;
384         afr_private_t   *priv              = NULL;
385         int             ret   = -1;
386         int             op_errno = ENOMEM;
387
388         transaction_frame = copy_frame (frame);
389         if (!transaction_frame)
390                 goto out;
391
392         local = frame->local;
393         priv = this->private;
394         transaction_frame->local = local;
395         frame->local = NULL;
396
397         if (!AFR_FRAME_INIT (frame, op_errno))
398                 goto out;
399
400         local->op = GF_FOP_WRITE;
401
402         local->transaction.wind   = afr_writev_wind;
403         local->transaction.fop    = __afr_txn_write_fop;
404         local->transaction.done   = __afr_txn_write_done;
405         local->transaction.unwind = afr_transaction_writev_unwind;
406
407         local->transaction.main_frame = frame;
408
409         if (local->fd->flags & O_APPEND) {
410                /*
411                 * Backend vfs ignores the 'offset' for append mode fd so
412                 * locking just the region provided for the writev does not
413                 * give consistency guarantee. The actual write may happen at a
414                 * completely different range than the one provided by the
415                 * offset, len in the fop. So lock the entire file.
416                 */
417                 local->transaction.start   = 0;
418                 local->transaction.len     = 0;
419         } else {
420                 local->transaction.start   = local->cont.writev.offset;
421                 local->transaction.len     = iov_length (local->cont.writev.vector,
422                                                          local->cont.writev.count);
423
424                 /*Lock entire file to avoid network split brains.*/
425                 if (priv->arbiter_count == 1) {
426                         local->transaction.start   = 0;
427                         local->transaction.len     = 0;
428                 }
429         }
430
431         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
432         if (ret < 0) {
433             op_errno = -ret;
434             goto out;
435         }
436
437         return 0;
438 out:
439         if (transaction_frame)
440                 AFR_STACK_DESTROY (transaction_frame);
441
442         AFR_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
443         return 0;
444 }
445
446
447 int
448 afr_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
449             struct iovec *vector, int32_t count, off_t offset,
450             uint32_t flags, struct iobref *iobref, dict_t *xdata)
451 {
452         afr_local_t *local = NULL;
453         int op_errno = ENOMEM;
454
455         local = AFR_FRAME_INIT (frame, op_errno);
456         if (!local)
457                 goto out;
458
459         local->cont.writev.vector = iov_dup (vector, count);
460         if (!local->cont.writev.vector)
461                 goto out;
462         local->cont.writev.count      = count;
463         local->cont.writev.offset     = offset;
464         local->cont.writev.flags      = flags;
465         local->cont.writev.iobref     = iobref_ref (iobref);
466
467         if (xdata)
468                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
469         else
470                 local->xdata_req = dict_new ();
471
472         if (!local->xdata_req)
473                 goto out;
474
475         local->fd = fd_ref (fd);
476         local->inode = inode_ref (fd->inode);
477
478         if (dict_set_uint32 (local->xdata_req, GLUSTERFS_OPEN_FD_COUNT, 4)) {
479                 op_errno = ENOMEM;
480                 goto out;
481         }
482
483         if (dict_set_uint32 (local->xdata_req, GLUSTERFS_WRITE_IS_APPEND, 4)) {
484                 op_errno = ENOMEM;
485                 goto out;
486         }
487
488         /* Set append_write to be true speculatively. If on any
489            server it turns not be true, we unset it in the
490            callback.
491         */
492         local->append_write = _gf_true;
493
494         /* detect here, but set it in writev_wind_cbk *after* the unstable
495            write is performed
496         */
497         local->stable_write = !!((fd->flags|flags)&(O_SYNC|O_DSYNC));
498
499         afr_fix_open (fd, this);
500
501         afr_do_writev (frame, this);
502
503         return 0;
504 out:
505         AFR_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
506
507         return 0;
508 }
509
510
511 /* }}} */
512
513 /* {{{ truncate */
514
515 int
516 afr_truncate_unwind (call_frame_t *frame, xlator_t *this)
517 {
518         afr_local_t *   local = NULL;
519         call_frame_t   *main_frame = NULL;
520
521         local = frame->local;
522
523         main_frame = afr_transaction_detach_fop_frame (frame);
524         if (!main_frame)
525                 return 0;
526
527         AFR_STACK_UNWIND (truncate, main_frame, local->op_ret, local->op_errno,
528                           &local->cont.inode_wfop.prebuf,
529                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
530         return 0;
531 }
532
533
534 int
535 afr_truncate_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
536                        int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
537                        struct iatt *postbuf, dict_t *xdata)
538 {
539         afr_local_t *local = NULL;
540
541         local = frame->local;
542
543         if (op_ret == 0 && prebuf->ia_size != postbuf->ia_size)
544                 local->stable_write = _gf_false;
545
546         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
547                                       prebuf, postbuf, NULL, xdata);
548 }
549
550
551 int
552 afr_truncate_wind (call_frame_t *frame, xlator_t *this, int subvol)
553 {
554         afr_local_t *local = NULL;
555         afr_private_t *priv = NULL;
556
557         local = frame->local;
558         priv = this->private;
559
560         STACK_WIND_COOKIE (frame, afr_truncate_wind_cbk, (void *) (long) subvol,
561                            priv->children[subvol],
562                            priv->children[subvol]->fops->truncate,
563                            &local->loc, local->cont.truncate.offset,
564                            local->xdata_req);
565         return 0;
566 }
567
568
569 int
570 afr_truncate (call_frame_t *frame, xlator_t *this,
571               loc_t *loc, off_t offset, dict_t *xdata)
572 {
573         afr_local_t   * local = NULL;
574         call_frame_t   *transaction_frame = NULL;
575         int ret = -1;
576         int op_errno = ENOMEM;
577
578         transaction_frame = copy_frame (frame);
579         if (!transaction_frame)
580                 goto out;
581
582         local = AFR_FRAME_INIT (transaction_frame, op_errno);
583         if (!local)
584                 goto out;
585
586         local->cont.truncate.offset  = offset;
587         if (xdata)
588                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
589         else
590                 local->xdata_req = dict_new ();
591
592         if (!local->xdata_req)
593                 goto out;
594
595         local->transaction.wind   = afr_truncate_wind;
596         local->transaction.fop    = __afr_txn_write_fop;
597         local->transaction.done   = __afr_txn_write_done;
598         local->transaction.unwind = afr_truncate_unwind;
599
600         loc_copy (&local->loc, loc);
601         local->inode = inode_ref (loc->inode);
602
603         local->op = GF_FOP_TRUNCATE;
604
605         local->transaction.main_frame = frame;
606         local->transaction.start   = offset;
607         local->transaction.len     = 0;
608
609         /* Set it true speculatively, will get reset in afr_truncate_wind_cbk
610            if truncate was not a NOP */
611         local->stable_write = _gf_true;
612
613         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
614         if (ret < 0) {
615                 op_errno = -ret;
616                 goto out;
617         }
618
619         return 0;
620 out:
621         if (transaction_frame)
622                 AFR_STACK_DESTROY (transaction_frame);
623
624         AFR_STACK_UNWIND (truncate, frame, -1, op_errno, NULL, NULL, NULL);
625         return 0;
626 }
627
628
629 /* }}} */
630
631 /* {{{ ftruncate */
632
633
634 int
635 afr_ftruncate_unwind (call_frame_t *frame, xlator_t *this)
636 {
637         afr_local_t *   local = NULL;
638         call_frame_t   *main_frame = NULL;
639
640         local = frame->local;
641
642         main_frame = afr_transaction_detach_fop_frame (frame);
643         if (!main_frame)
644                 return 0;
645
646         AFR_STACK_UNWIND (ftruncate, main_frame, local->op_ret, local->op_errno,
647                           &local->cont.inode_wfop.prebuf,
648                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
649         return 0;
650 }
651
652
653 int
654 afr_ftruncate_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
655                         int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
656                         struct iatt *postbuf, dict_t *xdata)
657 {
658         afr_local_t *local = NULL;
659
660         local = frame->local;
661
662         if (op_ret == 0 && prebuf->ia_size != postbuf->ia_size)
663                 local->stable_write = _gf_false;
664
665         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
666                                       prebuf, postbuf, NULL, xdata);
667 }
668
669
670 int
671 afr_ftruncate_wind (call_frame_t *frame, xlator_t *this, int subvol)
672 {
673         afr_local_t *local = NULL;
674         afr_private_t *priv = NULL;
675
676         local = frame->local;
677         priv = this->private;
678
679         STACK_WIND_COOKIE (frame, afr_ftruncate_wind_cbk, (void *) (long) subvol,
680                            priv->children[subvol],
681                            priv->children[subvol]->fops->ftruncate,
682                            local->fd, local->cont.ftruncate.offset,
683                            local->xdata_req);
684         return 0;
685 }
686
687
688 int
689 afr_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
690                dict_t *xdata)
691 {
692         afr_local_t *local = NULL;
693         call_frame_t *transaction_frame = NULL;
694         int ret = -1;
695         int op_errno = ENOMEM;
696
697         transaction_frame = copy_frame (frame);
698         if (!transaction_frame)
699                 goto out;
700
701         local = AFR_FRAME_INIT (transaction_frame, op_errno);
702         if (!local)
703                 goto out;
704
705         local->cont.ftruncate.offset  = offset;
706         if (xdata)
707                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
708         else
709                 local->xdata_req = dict_new ();
710
711         if (!local->xdata_req)
712                 goto out;
713
714         local->fd = fd_ref (fd);
715         local->inode = inode_ref (fd->inode);
716
717         local->op = GF_FOP_FTRUNCATE;
718
719         local->transaction.wind   = afr_ftruncate_wind;
720         local->transaction.fop    = __afr_txn_write_fop;
721         local->transaction.done   = __afr_txn_write_done;
722         local->transaction.unwind = afr_ftruncate_unwind;
723
724         local->transaction.main_frame = frame;
725
726         local->transaction.start   = local->cont.ftruncate.offset;
727         local->transaction.len     = 0;
728
729         afr_fix_open (fd, this);
730
731         /* Set it true speculatively, will get reset in afr_ftruncate_wind_cbk
732            if truncate was not a NOP */
733         local->stable_write = _gf_true;
734
735         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
736         if (ret < 0) {
737                 op_errno = -ret;
738                 goto out;
739         }
740
741         return 0;
742 out:
743         AFR_STACK_UNWIND (ftruncate, frame, -1, op_errno, NULL, NULL, NULL);
744
745         return 0;
746 }
747
748 /* }}} */
749
750 /* {{{ setattr */
751
752 int
753 afr_setattr_unwind (call_frame_t *frame, xlator_t *this)
754 {
755         afr_local_t *local = NULL;
756         call_frame_t *main_frame = NULL;
757
758         local = frame->local;
759
760         main_frame = afr_transaction_detach_fop_frame (frame);
761         if (!main_frame)
762                 return 0;
763
764         AFR_STACK_UNWIND (setattr, main_frame, local->op_ret, local->op_errno,
765                           &local->cont.inode_wfop.prebuf,
766                           &local->cont.inode_wfop.postbuf,
767                           local->xdata_rsp);
768         return 0;
769 }
770
771
772 int
773 afr_setattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
774                       int op_ret, int op_errno,
775                       struct iatt *preop, struct iatt *postop, dict_t *xdata)
776 {
777         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
778                                       preop, postop, NULL, xdata);
779 }
780
781
782 int
783 afr_setattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
784 {
785         afr_local_t *local = NULL;
786         afr_private_t *priv = NULL;
787
788         local = frame->local;
789         priv = this->private;
790
791         STACK_WIND_COOKIE (frame, afr_setattr_wind_cbk, (void *) (long) subvol,
792                            priv->children[subvol],
793                            priv->children[subvol]->fops->setattr,
794                            &local->loc, &local->cont.setattr.in_buf,
795                            local->cont.setattr.valid, local->xdata_req);
796         return 0;
797 }
798
799
800 int
801 afr_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *buf,
802              int32_t valid, dict_t *xdata)
803 {
804         afr_local_t *local = NULL;
805         call_frame_t *transaction_frame = NULL;
806         int ret = -1;
807         int op_errno = ENOMEM;
808
809         transaction_frame = copy_frame (frame);
810         if (!transaction_frame)
811                 goto out;
812
813         local = AFR_FRAME_INIT (transaction_frame, op_errno);
814         if (!local)
815                 goto out;
816
817         local->cont.setattr.in_buf = *buf;
818         local->cont.setattr.valid  = valid;
819         if (xdata)
820                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
821         else
822                 local->xdata_req = dict_new ();
823
824         if (!local->xdata_req)
825                 goto out;
826
827         local->transaction.wind   = afr_setattr_wind;
828         local->transaction.fop    = __afr_txn_write_fop;
829         local->transaction.done   = __afr_txn_write_done;
830         local->transaction.unwind = afr_setattr_unwind;
831
832         loc_copy (&local->loc, loc);
833         local->inode = inode_ref (loc->inode);
834
835         local->op = GF_FOP_SETATTR;
836
837         local->transaction.main_frame = frame;
838         local->transaction.start   = LLONG_MAX - 1;
839         local->transaction.len     = 0;
840
841         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
842         if (ret < 0) {
843                 op_errno = -ret;
844                 goto out;
845         }
846
847         return 0;
848 out:
849         if (transaction_frame)
850                 AFR_STACK_DESTROY (transaction_frame);
851
852         AFR_STACK_UNWIND (setattr, frame, -1, op_errno, NULL, NULL, NULL);
853         return 0;
854 }
855
856 /* {{{ fsetattr */
857
858 int
859 afr_fsetattr_unwind (call_frame_t *frame, xlator_t *this)
860 {
861         afr_local_t *   local = NULL;
862         call_frame_t   *main_frame = NULL;
863
864         local = frame->local;
865
866         main_frame = afr_transaction_detach_fop_frame (frame);
867         if (!main_frame)
868                 return 0;
869
870         AFR_STACK_UNWIND (fsetattr, main_frame, local->op_ret, local->op_errno,
871                           &local->cont.inode_wfop.prebuf,
872                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
873         return 0;
874 }
875
876
877 int
878 afr_fsetattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
879                        int32_t op_ret, int32_t op_errno,
880                        struct iatt *preop, struct iatt *postop, dict_t *xdata)
881 {
882         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
883                                       preop, postop, NULL, xdata);
884 }
885
886
887 int
888 afr_fsetattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
889 {
890         afr_local_t *local = NULL;
891         afr_private_t *priv = NULL;
892
893         local = frame->local;
894         priv = this->private;
895
896         STACK_WIND_COOKIE (frame, afr_fsetattr_wind_cbk, (void *) (long) subvol,
897                            priv->children[subvol],
898                            priv->children[subvol]->fops->fsetattr,
899                            local->fd, &local->cont.fsetattr.in_buf,
900                            local->cont.fsetattr.valid, local->xdata_req);
901         return 0;
902 }
903
904
905 int
906 afr_fsetattr (call_frame_t *frame, xlator_t *this,
907               fd_t *fd, struct iatt *buf, int32_t valid, dict_t *xdata)
908 {
909         afr_local_t *local = NULL;
910         call_frame_t *transaction_frame = NULL;
911         int ret = -1;
912         int op_errno = ENOMEM;
913
914         transaction_frame = copy_frame (frame);
915         if (!transaction_frame)
916                 goto out;
917
918         local = AFR_FRAME_INIT (transaction_frame, op_errno);
919         if (!local)
920                 goto out;
921
922         local->cont.fsetattr.in_buf = *buf;
923         local->cont.fsetattr.valid  = valid;
924         if (xdata)
925                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
926         else
927                 local->xdata_req = dict_new ();
928
929         if (!local->xdata_req)
930                 goto out;
931
932         local->transaction.wind   = afr_fsetattr_wind;
933         local->transaction.fop    = __afr_txn_write_fop;
934         local->transaction.done   = __afr_txn_write_done;
935         local->transaction.unwind = afr_fsetattr_unwind;
936
937         local->fd                 = fd_ref (fd);
938         local->inode = inode_ref (fd->inode);
939
940         local->op = GF_FOP_FSETATTR;
941
942         afr_fix_open (fd, this);
943
944         local->transaction.main_frame = frame;
945         local->transaction.start   = LLONG_MAX - 1;
946         local->transaction.len     = 0;
947
948         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
949         if (ret < 0) {
950                 op_errno = -ret;
951                 goto out;
952         }
953
954         return 0;
955 out:
956         if (transaction_frame)
957                 AFR_STACK_DESTROY (transaction_frame);
958
959         AFR_STACK_UNWIND (fsetattr, frame, -1, op_errno, NULL, NULL, NULL);
960         return 0;
961 }
962
963
964 /* {{{ setxattr */
965
966
967 int
968 afr_setxattr_unwind (call_frame_t *frame, xlator_t *this)
969 {
970         afr_local_t *   local = NULL;
971         call_frame_t   *main_frame = NULL;
972
973         local = frame->local;
974
975         main_frame = afr_transaction_detach_fop_frame (frame);
976         if (!main_frame)
977                 return 0;
978
979         AFR_STACK_UNWIND (setxattr, main_frame, local->op_ret, local->op_errno,
980                           local->xdata_rsp);
981         return 0;
982 }
983
984
985 int
986 afr_setxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
987                        int32_t op_ret, int32_t op_errno, dict_t *xdata)
988 {
989         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
990                                       NULL, NULL, NULL, xdata);
991 }
992
993
994 int
995 afr_setxattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
996 {
997         afr_local_t      *local         = NULL;
998         afr_private_t    *priv          = NULL;
999
1000         local = frame->local;
1001         priv = this->private;
1002
1003         STACK_WIND_COOKIE (frame, afr_setxattr_wind_cbk, (void *) (long) subvol,
1004                            priv->children[subvol],
1005                            priv->children[subvol]->fops->setxattr,
1006                            &local->loc, local->cont.setxattr.dict,
1007                            local->cont.setxattr.flags, local->xdata_req);
1008         return 0;
1009 }
1010
1011 int
1012 afr_rb_set_pending_changelog_cbk (call_frame_t *frame, void *cookie,
1013                                   xlator_t *this, int op_ret, int op_errno,
1014                                   dict_t *xattr, dict_t *xdata)
1015
1016 {
1017         afr_local_t *local = NULL;
1018         afr_private_t *priv = NULL;
1019         int i = 0;
1020
1021         local = frame->local;
1022         priv = this->private;
1023         i = (long) cookie;
1024
1025         local->replies[i].valid = 1;
1026         local->replies[i].op_ret = op_ret;
1027         local->replies[i].op_errno = op_errno;
1028         gf_msg (this->name, op_ret ? GF_LOG_ERROR : GF_LOG_INFO,
1029                 op_ret ? op_errno : 0,
1030                 AFR_MSG_REPLACE_BRICK_STATUS, "Set of pending xattr %s on"
1031                 " %s.", op_ret ? "failed" : "succeeded",
1032                 priv->children[i]->name);
1033
1034         syncbarrier_wake (&local->barrier);
1035         return 0;
1036 }
1037
1038 int
1039 afr_rb_set_pending_changelog (call_frame_t *frame, xlator_t *this,
1040                               unsigned char *locked_nodes)
1041 {
1042         afr_local_t *local = NULL;
1043         afr_private_t *priv = NULL;
1044         int ret = 0, i = 0;
1045
1046         local = frame->local;
1047         priv = this->private;
1048
1049         AFR_ONLIST (locked_nodes, frame, afr_rb_set_pending_changelog_cbk,
1050                     xattrop, &local->loc, GF_XATTROP_ADD_ARRAY,
1051                     local->xdata_req, NULL);
1052
1053         /* It is sufficient if xattrop was successful on one child */
1054         for (i = 0; i < priv->child_count; i++) {
1055                 if (!local->replies[i].valid)
1056                         continue;
1057
1058                 if (local->replies[i].op_ret == 0) {
1059                         ret = 0;
1060                         goto out;
1061                 } else {
1062                         ret = afr_higher_errno (ret,
1063                                                 local->replies[i].op_errno);
1064                 }
1065         }
1066 out:
1067         return -ret;
1068 }
1069
1070 int
1071 _afr_handle_replace_brick_type (xlator_t *this, call_frame_t *frame,
1072                                 loc_t *loc, int rb_index,
1073                                 afr_transaction_type type)
1074 {
1075         afr_local_t     *local            = NULL;
1076         afr_private_t   *priv             = NULL;
1077         unsigned char   *locked_nodes     = NULL;
1078         int              count            = 0;
1079         int              ret              = -ENOMEM;
1080         int              idx              = -1;
1081
1082         priv = this->private;
1083         local = frame->local;
1084
1085         locked_nodes = alloca0 (priv->child_count);
1086
1087         idx = afr_index_for_transaction_type (type);
1088
1089         local->pending = afr_matrix_create (priv->child_count,
1090                                             AFR_NUM_CHANGE_LOGS);
1091         if (!local->pending)
1092                 goto out;
1093
1094         local->pending[rb_index][idx] = hton32 (1);
1095
1096         local->xdata_req = dict_new ();
1097         if (!local->xdata_req)
1098                 goto out;
1099
1100         ret = afr_set_pending_dict (priv, local->xdata_req, local->pending);
1101         if (ret < 0)
1102                 goto out;
1103
1104         if (AFR_ENTRY_TRANSACTION == type) {
1105                 count = afr_selfheal_entrylk (frame, this, loc->inode,
1106                                               this->name, NULL, locked_nodes);
1107         } else {
1108                 count = afr_selfheal_inodelk (frame, this, loc->inode,
1109                                               this->name, LLONG_MAX - 1, 0,
1110                                               locked_nodes);
1111         }
1112
1113         if (!count) {
1114                 gf_log (this->name, GF_LOG_ERROR, "Couldn't acquire lock on"
1115                         " any child.");
1116                 ret = -EAGAIN;
1117                 goto unlock;
1118         }
1119
1120         ret = afr_rb_set_pending_changelog (frame, this, locked_nodes);
1121         if (ret)
1122                 goto unlock;
1123         ret = 0;
1124 unlock:
1125         if (AFR_ENTRY_TRANSACTION == type) {
1126                 afr_selfheal_unentrylk (frame, this, loc->inode, this->name,
1127                                         NULL, locked_nodes);
1128         } else {
1129                 afr_selfheal_uninodelk (frame, this, loc->inode, this->name,
1130                                         LLONG_MAX - 1, 0, locked_nodes);
1131         }
1132 out:
1133         return ret;
1134 }
1135
1136 int
1137 _afr_handle_replace_brick_cbk (int ret, call_frame_t *frame, void *opaque)
1138 {
1139         afr_replace_brick_args_t *data = NULL;
1140
1141         data = opaque;
1142         loc_wipe (&data->loc);
1143         GF_FREE (data);
1144         return 0;
1145 }
1146
1147 int
1148 _afr_handle_replace_brick (void *opaque)
1149 {
1150
1151         afr_local_t     *local          = NULL;
1152         afr_private_t   *priv           = NULL;
1153         int              rb_index       = -1;
1154         int              ret            = -1;
1155         int              op_errno       = ENOMEM;
1156         call_frame_t    *frame          = NULL;
1157         xlator_t        *this           = NULL;
1158         afr_replace_brick_args_t *data  = NULL;
1159
1160         data = opaque;
1161         frame = data->frame;
1162         rb_index = data->rb_index;
1163         this = frame->this;
1164         priv = this->private;
1165
1166         local = AFR_FRAME_INIT (frame, op_errno);
1167         if (!local)
1168                 goto out;
1169
1170         loc_copy (&local->loc, &data->loc);
1171
1172         gf_log (this->name, GF_LOG_DEBUG, "Child being replaced is : %s",
1173                 priv->children[rb_index]->name);
1174
1175         ret = _afr_handle_replace_brick_type (this, frame, &local->loc, rb_index,
1176                                               AFR_METADATA_TRANSACTION);
1177         if (ret) {
1178                 op_errno = -ret;
1179                 ret = -1;
1180                 goto out;
1181         }
1182
1183         dict_unref (local->xdata_req);
1184         afr_matrix_cleanup (local->pending, priv->child_count);
1185         local->pending = NULL;
1186         local->xdata_req = NULL;
1187
1188         ret = _afr_handle_replace_brick_type (this, frame, &local->loc, rb_index,
1189                                               AFR_ENTRY_TRANSACTION);
1190         if (ret) {
1191                 op_errno = -ret;
1192                 ret = -1;
1193                 goto out;
1194         }
1195         ret = 0;
1196 out:
1197         AFR_STACK_UNWIND (setxattr, frame, ret, op_errno, NULL);
1198         return 0;
1199 }
1200
1201
1202 int
1203 afr_split_brain_resolve_do (call_frame_t *frame, xlator_t *this, loc_t *loc,
1204                             char *data)
1205 {
1206         afr_local_t    *local             = NULL;
1207         int     ret                       = -1;
1208         int     op_errno                  = EINVAL;
1209
1210         local = frame->local;
1211         local->xdata_req = dict_new ();
1212
1213         if (!local->xdata_req) {
1214                 op_errno = ENOMEM;
1215                 goto out;
1216         }
1217
1218         ret = dict_set_int32 (local->xdata_req, "heal-op",
1219                               GF_SHD_OP_SBRAIN_HEAL_FROM_BRICK);
1220         if (ret) {
1221                 op_errno = -ret;
1222                 ret = -1;
1223                 goto out;
1224         }
1225         ret = dict_set_str (local->xdata_req, "child-name", data);
1226         if (ret) {
1227                 op_errno = -ret;
1228                 ret = -1;
1229                 goto out;
1230         }
1231         /* set spb choice to -1 whether heal succeeds or not:
1232          * If heal succeeds : spb-choice should be set to -1 as
1233          *                    it is no longer valid; file is not
1234          *                    in split-brain anymore.
1235          * If heal doesn't succeed:
1236          *                    spb-choice should be set to -1
1237          *                    otherwise reads will be served
1238          *                    from spb-choice which is misleading.
1239          */
1240         ret = afr_inode_split_brain_choice_set (loc->inode, this, -1);
1241         if (ret)
1242                 gf_msg (this->name, GF_LOG_WARNING, 0,
1243                         AFR_MSG_SPLIT_BRAIN_CHOICE_ERROR, "Failed to set"
1244                         "split-brain choice to -1");
1245         afr_heal_splitbrain_file (frame, this, loc);
1246         ret = 0;
1247 out:
1248         if (ret < 0)
1249                 AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL);
1250         return 0;
1251 }
1252
1253 int
1254 afr_get_split_brain_child_index (xlator_t *this, void *value, size_t len)
1255 {
1256         int             spb_child_index   = -1;
1257         char           *spb_child_str     = NULL;
1258
1259         spb_child_str =  alloca0 (len + 1);
1260         memcpy (spb_child_str, value, len);
1261
1262         if (!strcmp (spb_child_str, "none"))
1263                 return -2;
1264
1265         spb_child_index = afr_get_child_index_from_name (this,
1266                                                          spb_child_str);
1267         if (spb_child_index < 0) {
1268                 gf_msg (this->name, GF_LOG_ERROR, 0,
1269                         AFR_MSG_INVALID_SUBVOL, "Invalid subvol: %s",
1270                         spb_child_str);
1271         }
1272         return spb_child_index;
1273 }
1274
1275 int
1276 afr_can_set_split_brain_choice (void *opaque)
1277 {
1278         afr_spbc_timeout_t        *data         = opaque;
1279         call_frame_t              *frame        = NULL;
1280         xlator_t                  *this         = NULL;
1281         loc_t                     *loc          = NULL;
1282         int                        ret          = -1;
1283
1284         frame = data->frame;
1285         loc = data->loc;
1286         this = frame->this;
1287
1288         ret = afr_is_split_brain (frame, this, loc->inode, loc->gfid,
1289                                   &data->d_spb, &data->m_spb);
1290
1291         if (ret)
1292                 gf_msg (this->name, GF_LOG_ERROR, 0,
1293                         AFR_MSG_SPLIT_BRAIN_CHOICE_ERROR,
1294                         "Failed to determine if %s"
1295                         " is in split-brain. "
1296                         "Aborting split-brain-choice set.",
1297                         uuid_utoa (loc->gfid));
1298         return ret;
1299 }
1300
1301 int
1302 afr_handle_split_brain_commands (xlator_t *this, call_frame_t *frame,
1303                                 loc_t *loc, dict_t *dict)
1304 {
1305         void           *value             = NULL;
1306         afr_private_t  *priv              = NULL;
1307         afr_local_t    *local             = NULL;
1308         afr_spbc_timeout_t *data          = NULL;
1309         int             len               = 0;
1310         int             spb_child_index   = -1;
1311         int             ret               = -1;
1312         int             op_errno          = EINVAL;
1313
1314         priv = this->private;
1315
1316         local = AFR_FRAME_INIT (frame, op_errno);
1317         if (!local) {
1318                 ret = 1;
1319                 goto out;
1320         }
1321
1322         local->op = GF_FOP_SETXATTR;
1323
1324         ret =  dict_get_ptr_and_len (dict, GF_AFR_SBRAIN_CHOICE, &value,
1325                                      &len);
1326         if (value) {
1327                 spb_child_index = afr_get_split_brain_child_index (this, value,
1328                                                                    len);
1329                 if (spb_child_index < 0) {
1330                         /* Case where value was "none" */
1331                         if (spb_child_index == -2)
1332                                 spb_child_index = -1;
1333                         else {
1334                                 ret = 1;
1335                                 op_errno = EINVAL;
1336                                 goto out;
1337                         }
1338                 }
1339
1340                 data = GF_CALLOC (1, sizeof (*data), gf_afr_mt_spbc_timeout_t);
1341                 if (!data) {
1342                         ret = 1;
1343                         goto out;
1344                 }
1345                 data->spb_child_index = spb_child_index;
1346                 data->frame = frame;
1347                 data->loc = loc;
1348                 ret = synctask_new (this->ctx->env,
1349                                     afr_can_set_split_brain_choice,
1350                                     afr_set_split_brain_choice, NULL, data);
1351                 if (ret) {
1352                         gf_msg (this->name, GF_LOG_ERROR, 0,
1353                                 AFR_MSG_SPLIT_BRAIN_CHOICE_ERROR,
1354                                 "Failed to create"
1355                                 " synctask. Aborting split-brain choice set"
1356                                 " for %s", loc->name);
1357                         ret = 1;
1358                         op_errno = ENOMEM;
1359                         goto out;
1360                 }
1361                 ret = 0;
1362                 goto out;
1363         }
1364
1365         ret = dict_get_ptr_and_len (dict, GF_AFR_SBRAIN_RESOLVE, &value, &len);
1366         if (value) {
1367                 spb_child_index = afr_get_split_brain_child_index (this, value,
1368                                                                    len);
1369                 if (spb_child_index < 0) {
1370                         ret = 1;
1371                         goto out;
1372                 }
1373
1374                 afr_split_brain_resolve_do (frame, this, loc,
1375                                             priv->children[spb_child_index]->name);
1376                 ret = 0;
1377         }
1378 out:
1379         /* key was correct but value was invalid when ret == 1 */
1380         if (ret == 1) {
1381                 AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL);
1382                 if (data)
1383                         GF_FREE (data);
1384                 ret = 0;
1385         }
1386         return ret;
1387 }
1388
1389 int
1390 afr_handle_spb_choice_timeout (xlator_t *this, call_frame_t *frame,
1391                                dict_t *dict)
1392 {
1393         int             ret               = -1;
1394         int             op_errno          = 0;
1395         uint64_t        timeout           = 0;
1396         afr_private_t  *priv              = NULL;
1397
1398         priv = this->private;
1399
1400         ret = dict_get_uint64 (dict, GF_AFR_SPB_CHOICE_TIMEOUT, &timeout);
1401         if (!ret) {
1402                 priv->spb_choice_timeout = timeout * 60;
1403                 AFR_STACK_UNWIND (setxattr, frame, ret, op_errno, NULL);
1404         }
1405
1406         return ret;
1407 }
1408
1409 int
1410 afr_handle_replace_brick (xlator_t *this, call_frame_t *frame, loc_t *loc,
1411                           dict_t *dict)
1412 {
1413         int             ret               = -1;
1414         int             rb_index          = -1;
1415         int             op_errno          = EPERM;
1416         char           *replace_brick     = NULL;
1417         afr_replace_brick_args_t *data    = NULL;
1418
1419         ret =  dict_get_str (dict, GF_AFR_REPLACE_BRICK, &replace_brick);
1420
1421         if (!ret) {
1422                 if (frame->root->pid != GF_CLIENT_PID_AFR_SELF_HEALD) {
1423                         gf_log (this->name, GF_LOG_ERROR, "'%s' is an internal"
1424                                 " extended attribute : %s.",
1425                                 GF_AFR_REPLACE_BRICK, strerror (EPERM));
1426                         ret = 1;
1427                         goto out;
1428                 }
1429                 rb_index = afr_get_child_index_from_name (this, replace_brick);
1430
1431                 if (rb_index < 0) {
1432                          /* Didn't belong to this replica pair
1433                           * Just do a no-op
1434                           */
1435                         AFR_STACK_UNWIND (setxattr, frame, 0, 0, NULL);
1436                         return 0;
1437                 } else {
1438                         data = GF_CALLOC (1, sizeof (*data),
1439                                           gf_afr_mt_replace_brick_t);
1440                         if (!data) {
1441                                 ret = 1;
1442                                 op_errno = ENOMEM;
1443                                 goto out;
1444                         }
1445                         data->frame = frame;
1446                         loc_copy (&data->loc, loc);
1447                         data->rb_index = rb_index;
1448                         ret = synctask_new (this->ctx->env,
1449                                             _afr_handle_replace_brick,
1450                                             _afr_handle_replace_brick_cbk,
1451                                             NULL, data);
1452                         if (ret) {
1453                                 gf_msg (this->name, GF_LOG_ERROR, 0,
1454                                         AFR_MSG_REPLACE_BRICK_FAILED,
1455                                         "Failed to create synctask. Unable to "
1456                                         "perform replace-brick.");
1457                                 ret = 1;
1458                                 op_errno = ENOMEM;
1459                                 loc_wipe (&data->loc);
1460                                 GF_FREE (data);
1461                                 goto out;
1462                         }
1463                 }
1464                 ret = 0;
1465         }
1466 out:
1467         if (ret == 1) {
1468                 AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL);
1469                 ret = 0;
1470         }
1471         return ret;
1472 }
1473
1474 static int
1475 afr_handle_special_xattr (xlator_t *this, call_frame_t *frame, loc_t *loc,
1476                           dict_t *dict)
1477 {
1478         int     ret     = -1;
1479
1480         ret = afr_handle_split_brain_commands (this, frame, loc, dict);
1481         if (ret == 0)
1482                 goto out;
1483
1484         ret = afr_handle_spb_choice_timeout (this, frame, dict);
1485         if (ret == 0)
1486                 goto out;
1487
1488         ret = afr_handle_replace_brick (this, frame, loc, dict);
1489 out:
1490         return ret;
1491 }
1492
1493 int
1494 afr_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
1495               int32_t flags, dict_t *xdata)
1496 {
1497         afr_local_t    *local             = NULL;
1498         call_frame_t   *transaction_frame = NULL;
1499         int             ret               = -1;
1500         int             op_errno          = EINVAL;
1501
1502         GF_IF_INTERNAL_XATTR_GOTO ("trusted.afr.*", dict,
1503                                    op_errno, out);
1504
1505         GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.afr.*", dict,
1506                                    op_errno, out);
1507
1508         ret = afr_handle_special_xattr (this, frame, loc, dict);
1509         if (ret == 0)
1510                 return 0;
1511
1512         transaction_frame = copy_frame (frame);
1513         if (!transaction_frame)
1514                 goto out;
1515
1516         local = AFR_FRAME_INIT (transaction_frame, op_errno);
1517         if (!local)
1518                 goto out;
1519
1520         local->cont.setxattr.dict  = dict_ref (dict);
1521         local->cont.setxattr.flags = flags;
1522         if (xdata)
1523                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
1524         else
1525                 local->xdata_req = dict_new ();
1526
1527         if (!local->xdata_req)
1528                 goto out;
1529
1530         local->transaction.wind   = afr_setxattr_wind;
1531         local->transaction.fop    = __afr_txn_write_fop;
1532         local->transaction.done   = __afr_txn_write_done;
1533         local->transaction.unwind = afr_setxattr_unwind;
1534
1535         loc_copy (&local->loc, loc);
1536         local->inode = inode_ref (loc->inode);
1537
1538         local->transaction.main_frame = frame;
1539         local->transaction.start   = LLONG_MAX - 1;
1540         local->transaction.len     = 0;
1541
1542         local->op = GF_FOP_SETXATTR;
1543
1544         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
1545         if (ret < 0) {
1546                 op_errno = -ret;
1547                 goto out;
1548         }
1549
1550         return 0;
1551 out:
1552         if (transaction_frame)
1553                 AFR_STACK_DESTROY (transaction_frame);
1554
1555         AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL);
1556
1557         return 0;
1558 }
1559
1560 /* {{{ fsetxattr */
1561
1562
1563 int
1564 afr_fsetxattr_unwind (call_frame_t *frame, xlator_t *this)
1565 {
1566         afr_local_t    *local         = NULL;
1567         call_frame_t   *main_frame    = NULL;
1568
1569         local = frame->local;
1570
1571         main_frame = afr_transaction_detach_fop_frame (frame);
1572         if (!main_frame)
1573                 return 0;
1574
1575         AFR_STACK_UNWIND (fsetxattr, main_frame, local->op_ret, local->op_errno,
1576                           local->xdata_rsp);
1577         return 0;
1578 }
1579
1580
1581 int
1582 afr_fsetxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
1583                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
1584 {
1585         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
1586                                       NULL, NULL, NULL, xdata);
1587 }
1588
1589
1590 int
1591 afr_fsetxattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
1592 {
1593         afr_local_t        *local       = NULL;
1594         afr_private_t      *priv        = NULL;
1595
1596         local = frame->local;
1597         priv = this->private;
1598
1599         STACK_WIND_COOKIE (frame, afr_fsetxattr_wind_cbk, (void *) (long) subvol,
1600                            priv->children[subvol],
1601                            priv->children[subvol]->fops->fsetxattr,
1602                            local->fd, local->cont.fsetxattr.dict,
1603                            local->cont.fsetxattr.flags, local->xdata_req);
1604         return 0;
1605 }
1606
1607
1608 int
1609 afr_fsetxattr (call_frame_t *frame, xlator_t *this,
1610                fd_t *fd, dict_t *dict, int32_t flags, dict_t *xdata)
1611 {
1612         afr_local_t      *local             = NULL;
1613         call_frame_t     *transaction_frame = NULL;
1614         int               ret               = -1;
1615         int               op_errno          = ENOMEM;
1616
1617         GF_IF_INTERNAL_XATTR_GOTO ("trusted.afr.*", dict,
1618                                    op_errno, out);
1619
1620         GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.afr.*", dict,
1621                                    op_errno, out);
1622
1623         transaction_frame = copy_frame (frame);
1624         if (!transaction_frame)
1625                 goto out;
1626
1627         local = AFR_FRAME_INIT (transaction_frame, op_errno);
1628         if (!local)
1629                 goto out;
1630
1631         local->cont.fsetxattr.dict  = dict_ref (dict);
1632         local->cont.fsetxattr.flags = flags;
1633
1634         if (xdata)
1635                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
1636         else
1637                 local->xdata_req = dict_new ();
1638
1639         if (!local->xdata_req)
1640                 goto out;
1641
1642         local->transaction.wind   = afr_fsetxattr_wind;
1643         local->transaction.fop    = __afr_txn_write_fop;
1644         local->transaction.done   = __afr_txn_write_done;
1645         local->transaction.unwind = afr_fsetxattr_unwind;
1646
1647         local->fd                 = fd_ref (fd);
1648         local->inode = inode_ref (fd->inode);
1649
1650         local->op = GF_FOP_FSETXATTR;
1651
1652         local->transaction.main_frame = frame;
1653         local->transaction.start  = LLONG_MAX - 1;
1654         local->transaction.len    = 0;
1655
1656         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
1657         if (ret < 0) {
1658                 op_errno = -ret;
1659                 goto out;
1660         }
1661
1662         return 0;
1663 out:
1664         if (transaction_frame)
1665                 AFR_STACK_DESTROY (transaction_frame);
1666
1667         AFR_STACK_UNWIND (fsetxattr, frame, -1, op_errno, NULL);
1668         return 0;
1669 }
1670
1671 /* }}} */
1672
1673
1674 /* {{{ removexattr */
1675
1676
1677 int
1678 afr_removexattr_unwind (call_frame_t *frame, xlator_t *this)
1679 {
1680         afr_local_t *   local = NULL;
1681         call_frame_t   *main_frame = NULL;
1682
1683         local = frame->local;
1684
1685         main_frame = afr_transaction_detach_fop_frame (frame);
1686         if (!main_frame)
1687                 return 0;
1688
1689         AFR_STACK_UNWIND (removexattr, main_frame, local->op_ret, local->op_errno,
1690                           local->xdata_rsp);
1691         return 0;
1692 }
1693
1694
1695 int
1696 afr_removexattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
1697                           int32_t op_ret, int32_t op_errno, dict_t *xdata)
1698 {
1699         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
1700                                       NULL, NULL, NULL, xdata);
1701 }
1702
1703
1704 int
1705 afr_removexattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
1706 {
1707         afr_local_t *local = NULL;
1708         afr_private_t *priv = NULL;
1709
1710         local = frame->local;
1711         priv = this->private;
1712
1713         STACK_WIND_COOKIE (frame, afr_removexattr_wind_cbk, (void *) (long) subvol,
1714                            priv->children[subvol],
1715                            priv->children[subvol]->fops->removexattr,
1716                            &local->loc, local->cont.removexattr.name,
1717                            local->xdata_req);
1718         return 0;
1719 }
1720
1721
1722 int
1723 afr_removexattr (call_frame_t *frame, xlator_t *this,
1724                  loc_t *loc, const char *name, dict_t *xdata)
1725 {
1726         afr_local_t     *local             = NULL;
1727         call_frame_t    *transaction_frame = NULL;
1728         int              ret               = -1;
1729         int              op_errno          = ENOMEM;
1730
1731         GF_IF_NATIVE_XATTR_GOTO ("trusted.afr.*",
1732                                  name, op_errno, out);
1733
1734         GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.afr.*",
1735                                  name, op_errno, out);
1736
1737         transaction_frame = copy_frame (frame);
1738         if (!transaction_frame)
1739                 goto out;
1740
1741         local = AFR_FRAME_INIT (transaction_frame, op_errno);
1742         if (!local)
1743                 goto out;
1744
1745         local->cont.removexattr.name = gf_strdup (name);
1746
1747         if (xdata)
1748                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
1749         else
1750                 local->xdata_req = dict_new ();
1751
1752         if (!local->xdata_req)
1753                 goto out;
1754
1755         local->transaction.wind   = afr_removexattr_wind;
1756         local->transaction.fop    = __afr_txn_write_fop;
1757         local->transaction.done   = __afr_txn_write_done;
1758         local->transaction.unwind = afr_removexattr_unwind;
1759
1760         loc_copy (&local->loc, loc);
1761         local->inode = inode_ref (loc->inode);
1762
1763         local->op = GF_FOP_REMOVEXATTR;
1764
1765         local->transaction.main_frame = frame;
1766         local->transaction.start   = LLONG_MAX - 1;
1767         local->transaction.len     = 0;
1768
1769         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
1770         if (ret < 0) {
1771                 op_errno = -ret;
1772                 goto out;
1773         }
1774
1775         return 0;
1776 out:
1777         if (transaction_frame)
1778                 AFR_STACK_DESTROY (transaction_frame);
1779
1780         AFR_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL);
1781         return 0;
1782 }
1783
1784 /* ffremovexattr */
1785 int
1786 afr_fremovexattr_unwind (call_frame_t *frame, xlator_t *this)
1787 {
1788         afr_local_t *   local = NULL;
1789         call_frame_t   *main_frame = NULL;
1790
1791         local = frame->local;
1792
1793         main_frame = afr_transaction_detach_fop_frame (frame);
1794         if (!main_frame)
1795                 return 0;
1796
1797         AFR_STACK_UNWIND (fremovexattr, main_frame, local->op_ret, local->op_errno,
1798                           local->xdata_rsp);
1799         return 0;
1800 }
1801
1802
1803 int
1804 afr_fremovexattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
1805                           int32_t op_ret, int32_t op_errno, dict_t *xdata)
1806 {
1807         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
1808                                       NULL, NULL, NULL, xdata);
1809 }
1810
1811
1812 int
1813 afr_fremovexattr_wind (call_frame_t *frame, xlator_t *this, int subvol)
1814 {
1815         afr_local_t *local = NULL;
1816         afr_private_t *priv = NULL;
1817
1818         local = frame->local;
1819         priv = this->private;
1820
1821         STACK_WIND_COOKIE (frame, afr_fremovexattr_wind_cbk, (void *) (long) subvol,
1822                            priv->children[subvol],
1823                            priv->children[subvol]->fops->fremovexattr,
1824                            local->fd, local->cont.removexattr.name,
1825                            local->xdata_req);
1826         return 0;
1827 }
1828
1829
1830 int
1831 afr_fremovexattr (call_frame_t *frame, xlator_t *this, fd_t *fd,
1832                   const char *name, dict_t *xdata)
1833 {
1834         afr_local_t *local = NULL;
1835         call_frame_t *transaction_frame = NULL;
1836         int ret = -1;
1837         int op_errno = ENOMEM;
1838
1839         GF_IF_NATIVE_XATTR_GOTO ("trusted.afr.*",
1840                                  name, op_errno, out);
1841
1842         GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.afr.*",
1843                                  name, op_errno, out);
1844
1845         transaction_frame = copy_frame (frame);
1846         if (!transaction_frame)
1847                 goto out;
1848
1849         local = AFR_FRAME_INIT (transaction_frame, op_errno);
1850         if (!local)
1851                 goto out;
1852
1853         local->cont.removexattr.name = gf_strdup (name);
1854         if (xdata)
1855                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
1856         else
1857                 local->xdata_req = dict_new ();
1858
1859         if (!local->xdata_req)
1860                 goto out;
1861
1862         local->transaction.wind   = afr_fremovexattr_wind;
1863         local->transaction.fop    = __afr_txn_write_fop;
1864         local->transaction.done   = __afr_txn_write_done;
1865         local->transaction.unwind = afr_fremovexattr_unwind;
1866
1867         local->fd = fd_ref (fd);
1868         local->inode = inode_ref (fd->inode);
1869
1870         local->op = GF_FOP_FREMOVEXATTR;
1871
1872         local->transaction.main_frame = frame;
1873         local->transaction.start   = LLONG_MAX - 1;
1874         local->transaction.len     = 0;
1875
1876         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
1877         if (ret < 0) {
1878                 op_errno = -ret;
1879                 goto out;
1880         }
1881
1882         return 0;
1883 out:
1884         if (transaction_frame)
1885                 AFR_STACK_DESTROY (transaction_frame);
1886
1887         AFR_STACK_UNWIND (fremovexattr, frame, -1, op_errno, NULL);
1888
1889         return 0;
1890 }
1891
1892
1893 int
1894 afr_fallocate_unwind (call_frame_t *frame, xlator_t *this)
1895 {
1896         afr_local_t *   local = NULL;
1897         call_frame_t   *main_frame = NULL;
1898
1899         local = frame->local;
1900
1901         main_frame = afr_transaction_detach_fop_frame (frame);
1902         if (!main_frame)
1903                 return 0;
1904
1905         AFR_STACK_UNWIND (fallocate, main_frame, local->op_ret, local->op_errno,
1906                           &local->cont.inode_wfop.prebuf,
1907                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
1908         return 0;
1909 }
1910
1911
1912 int
1913 afr_fallocate_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
1914                         int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
1915                         struct iatt *postbuf, dict_t *xdata)
1916 {
1917         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
1918                                       prebuf, postbuf, NULL, xdata);
1919 }
1920
1921
1922 int
1923 afr_fallocate_wind (call_frame_t *frame, xlator_t *this, int subvol)
1924 {
1925         afr_local_t *local = NULL;
1926         afr_private_t *priv = NULL;
1927
1928         local = frame->local;
1929         priv = this->private;
1930
1931         STACK_WIND_COOKIE (frame, afr_fallocate_wind_cbk, (void *) (long) subvol,
1932                            priv->children[subvol],
1933                            priv->children[subvol]->fops->fallocate,
1934                            local->fd, local->cont.fallocate.mode,
1935                            local->cont.fallocate.offset,
1936                            local->cont.fallocate.len, local->xdata_req);
1937         return 0;
1938 }
1939
1940
1941 int
1942 afr_fallocate (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
1943                off_t offset, size_t len, dict_t *xdata)
1944 {
1945         call_frame_t *transaction_frame = NULL;
1946         afr_local_t *local = NULL;
1947         int ret = -1;
1948         int op_errno = ENOMEM;
1949
1950         transaction_frame = copy_frame (frame);
1951         if (!transaction_frame)
1952                 goto out;
1953
1954         local = AFR_FRAME_INIT (transaction_frame, op_errno);
1955         if (!local)
1956                 goto out;
1957
1958         local->cont.fallocate.mode = mode;
1959         local->cont.fallocate.offset  = offset;
1960         local->cont.fallocate.len = len;
1961
1962         local->fd = fd_ref (fd);
1963         local->inode = inode_ref (fd->inode);
1964
1965         if (xdata)
1966                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
1967         else
1968                 local->xdata_req = dict_new ();
1969
1970         if (!local->xdata_req)
1971                 goto out;
1972
1973         local->op = GF_FOP_FALLOCATE;
1974
1975         local->transaction.wind   = afr_fallocate_wind;
1976         local->transaction.fop    = __afr_txn_write_fop;
1977         local->transaction.done   = __afr_txn_write_done;
1978         local->transaction.unwind = afr_fallocate_unwind;
1979
1980         local->transaction.main_frame = frame;
1981
1982         local->transaction.start   = local->cont.fallocate.offset;
1983         local->transaction.len     = 0;
1984
1985         afr_fix_open (fd, this);
1986
1987         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
1988         if (ret < 0) {
1989                 op_errno = -ret;
1990                 goto out;
1991         }
1992
1993         return 0;
1994 out:
1995         if (transaction_frame)
1996                 AFR_STACK_DESTROY (transaction_frame);
1997
1998         AFR_STACK_UNWIND (fallocate, frame, -1, op_errno, NULL, NULL, NULL);
1999         return 0;
2000 }
2001
2002
2003 /* }}} */
2004
2005 /* {{{ discard */
2006
2007 int
2008 afr_discard_unwind (call_frame_t *frame, xlator_t *this)
2009 {
2010         afr_local_t *   local = NULL;
2011         call_frame_t   *main_frame = NULL;
2012
2013         local = frame->local;
2014
2015         main_frame = afr_transaction_detach_fop_frame (frame);
2016         if (!main_frame)
2017                 return 0;
2018
2019         AFR_STACK_UNWIND (discard, main_frame, local->op_ret, local->op_errno,
2020                           &local->cont.inode_wfop.prebuf,
2021                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
2022         return 0;
2023 }
2024
2025
2026 int
2027 afr_discard_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
2028                       int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
2029                       struct iatt *postbuf, dict_t *xdata)
2030 {
2031         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
2032                                       prebuf, postbuf, NULL, xdata);
2033 }
2034
2035
2036 int
2037 afr_discard_wind (call_frame_t *frame, xlator_t *this, int subvol)
2038 {
2039         afr_local_t *local = NULL;
2040         afr_private_t *priv = NULL;
2041
2042         local = frame->local;
2043         priv = this->private;
2044
2045         STACK_WIND_COOKIE (frame, afr_discard_wind_cbk, (void *) (long) subvol,
2046                            priv->children[subvol],
2047                            priv->children[subvol]->fops->discard,
2048                            local->fd, local->cont.discard.offset,
2049                            local->cont.discard.len, local->xdata_req);
2050         return 0;
2051 }
2052
2053
2054 int
2055 afr_discard (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
2056              size_t len, dict_t *xdata)
2057 {
2058         afr_local_t *local = NULL;
2059         call_frame_t *transaction_frame = NULL;
2060         int ret = -1;
2061         int op_errno = ENOMEM;
2062
2063         transaction_frame = copy_frame (frame);
2064         if (!transaction_frame)
2065                 goto out;
2066
2067         local = AFR_FRAME_INIT (transaction_frame, op_errno);
2068         if (!local)
2069                 goto out;
2070
2071         local->cont.discard.offset  = offset;
2072         local->cont.discard.len = len;
2073
2074         local->fd = fd_ref (fd);
2075         local->inode = inode_ref (fd->inode);
2076
2077         if (xdata)
2078                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
2079         else
2080                 local->xdata_req = dict_new ();
2081
2082         if (!local->xdata_req)
2083                 goto out;
2084
2085         local->op = GF_FOP_DISCARD;
2086
2087         local->transaction.wind   = afr_discard_wind;
2088         local->transaction.fop    = __afr_txn_write_fop;
2089         local->transaction.done   = __afr_txn_write_done;
2090         local->transaction.unwind = afr_discard_unwind;
2091
2092         local->transaction.main_frame = frame;
2093
2094         local->transaction.start   = local->cont.discard.offset;
2095         local->transaction.len     = 0;
2096
2097         afr_fix_open (fd, this);
2098
2099         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
2100         if (ret < 0) {
2101                 op_errno = -ret;
2102                 goto out;
2103         }
2104
2105         return 0;
2106 out:
2107         if (transaction_frame)
2108                 AFR_STACK_DESTROY (transaction_frame);
2109
2110         AFR_STACK_UNWIND (discard, frame, -1, op_errno, NULL, NULL, NULL);
2111         return 0;
2112 }
2113
2114
2115 /* {{{ zerofill */
2116
2117 int
2118 afr_zerofill_unwind (call_frame_t *frame, xlator_t *this)
2119 {
2120         afr_local_t *   local = NULL;
2121         call_frame_t   *main_frame = NULL;
2122
2123         local = frame->local;
2124
2125         main_frame = afr_transaction_detach_fop_frame (frame);
2126         if (!main_frame)
2127                 return 0;
2128
2129         AFR_STACK_UNWIND (discard, main_frame, local->op_ret, local->op_errno,
2130                           &local->cont.inode_wfop.prebuf,
2131                           &local->cont.inode_wfop.postbuf, local->xdata_rsp);
2132         return 0;
2133 }
2134
2135
2136 int
2137 afr_zerofill_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
2138                       int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
2139                       struct iatt *postbuf, dict_t *xdata)
2140 {
2141         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
2142                                       prebuf, postbuf, NULL, xdata);
2143 }
2144
2145
2146 int
2147 afr_zerofill_wind (call_frame_t *frame, xlator_t *this, int subvol)
2148 {
2149         afr_local_t *local = NULL;
2150         afr_private_t *priv = NULL;
2151
2152         local = frame->local;
2153         priv = this->private;
2154
2155         STACK_WIND_COOKIE (frame, afr_zerofill_wind_cbk, (void *) (long) subvol,
2156                            priv->children[subvol],
2157                            priv->children[subvol]->fops->zerofill,
2158                            local->fd, local->cont.zerofill.offset,
2159                            local->cont.zerofill.len, local->xdata_req);
2160         return 0;
2161 }
2162
2163 int
2164 afr_zerofill (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
2165              size_t len, dict_t *xdata)
2166 {
2167         afr_local_t *local = NULL;
2168         call_frame_t *transaction_frame = NULL;
2169         int ret = -1;
2170         int op_errno = ENOMEM;
2171
2172         transaction_frame = copy_frame (frame);
2173         if (!transaction_frame)
2174                 goto out;
2175
2176         local = AFR_FRAME_INIT (transaction_frame, op_errno);
2177         if (!local)
2178                 goto out;
2179
2180         local->cont.zerofill.offset  = offset;
2181         local->cont.zerofill.len = len;
2182
2183         local->fd = fd_ref (fd);
2184         local->inode = inode_ref (fd->inode);
2185
2186         if (xdata)
2187                 local->xdata_req = dict_copy_with_ref (xdata, NULL);
2188         else
2189                 local->xdata_req = dict_new ();
2190
2191         if (!local->xdata_req)
2192                 goto out;
2193
2194         local->op = GF_FOP_ZEROFILL;
2195
2196         local->transaction.wind   = afr_zerofill_wind;
2197         local->transaction.fop    = __afr_txn_write_fop;
2198         local->transaction.done   = __afr_txn_write_done;
2199         local->transaction.unwind = afr_zerofill_unwind;
2200
2201         local->transaction.main_frame = frame;
2202
2203         local->transaction.start   = local->cont.discard.offset;
2204         local->transaction.len     = len;
2205
2206         afr_fix_open (fd, this);
2207
2208         ret = afr_transaction (transaction_frame, this, AFR_DATA_TRANSACTION);
2209         if (ret < 0) {
2210                 op_errno = -ret;
2211                 goto out;
2212         }
2213
2214         return 0;
2215 out:
2216         if (transaction_frame)
2217                 AFR_STACK_DESTROY (transaction_frame);
2218
2219         AFR_STACK_UNWIND (zerofill, frame, -1, op_errno, NULL, NULL, NULL);
2220         return 0;
2221 }
2222
2223 /* }}} */
2224
2225 int32_t
2226 afr_xattrop_wind_cbk (call_frame_t *frame, void *cookie,
2227                       xlator_t *this, int32_t op_ret, int32_t op_errno,
2228                       dict_t *xattr, dict_t *xdata)
2229 {
2230         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
2231                                       NULL, NULL, xattr, xdata);
2232 }
2233
2234 int
2235 afr_xattrop_wind (call_frame_t *frame, xlator_t *this, int subvol)
2236 {
2237         afr_local_t *local = NULL;
2238         afr_private_t *priv = NULL;
2239
2240         local = frame->local;
2241         priv = this->private;
2242
2243         STACK_WIND_COOKIE (frame, afr_xattrop_wind_cbk, (void *) (long) subvol,
2244                            priv->children[subvol],
2245                            priv->children[subvol]->fops->xattrop,
2246                            &local->loc, local->cont.xattrop.optype,
2247                            local->cont.xattrop.xattr, local->xdata_req);
2248         return 0;
2249 }
2250
2251 int
2252 afr_xattrop_unwind (call_frame_t *frame, xlator_t *this)
2253 {
2254         afr_local_t *local = NULL;
2255         call_frame_t *main_frame = NULL;
2256
2257         local = frame->local;
2258
2259         main_frame = afr_transaction_detach_fop_frame (frame);
2260         if (!main_frame)
2261                 return 0;
2262
2263         AFR_STACK_UNWIND (xattrop, main_frame, local->op_ret, local->op_errno,
2264                           local->xattr_rsp, local->xdata_rsp);
2265         return 0;
2266 }
2267
2268 int32_t
2269 afr_xattrop (call_frame_t *frame, xlator_t *this, loc_t *loc,
2270              gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
2271 {
2272         afr_local_t *local = NULL;
2273         call_frame_t *transaction_frame = NULL;
2274         int ret = -1;
2275         int op_errno = ENOMEM;
2276
2277         transaction_frame = copy_frame (frame);
2278         if (!transaction_frame)
2279                 goto out;
2280
2281         local = AFR_FRAME_INIT (transaction_frame, op_errno);
2282         if (!local)
2283                 goto out;
2284
2285         local->cont.xattrop.xattr = dict_ref (xattr);
2286         local->cont.xattrop.optype = optype;
2287         if (xdata)
2288                 local->xdata_req = dict_ref (xdata);
2289
2290         local->transaction.wind   = afr_xattrop_wind;
2291         local->transaction.fop    = __afr_txn_write_fop;
2292         local->transaction.done   = __afr_txn_write_done;
2293         local->transaction.unwind = afr_xattrop_unwind;
2294
2295         loc_copy (&local->loc, loc);
2296         local->inode = inode_ref (loc->inode);
2297
2298         local->op = GF_FOP_XATTROP;
2299
2300         local->transaction.main_frame = frame;
2301         local->transaction.start   = LLONG_MAX - 1;
2302         local->transaction.len     = 0;
2303
2304         ret = afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION);
2305         if (ret < 0) {
2306                 op_errno = -ret;
2307                 goto out;
2308         }
2309
2310         return 0;
2311 out:
2312         if (transaction_frame)
2313                 AFR_STACK_DESTROY (transaction_frame);
2314
2315         AFR_STACK_UNWIND (xattrop, frame, -1, op_errno, NULL, NULL);
2316         return 0;
2317 }
2318
2319 int32_t
2320 afr_fxattrop_wind_cbk (call_frame_t *frame, void *cookie,
2321                        xlator_t *this, int32_t op_ret, int32_t op_errno,
2322                        dict_t *xattr, dict_t *xdata)
2323 {
2324         return __afr_inode_write_cbk (frame, cookie, this, op_ret, op_errno,
2325                                       NULL, NULL, xattr, xdata);
2326 }
2327
2328 int
2329 afr_fxattrop_wind (call_frame_t *frame, xlator_t *this, int subvol)
2330 {
2331         afr_local_t *local = NULL;
2332         afr_private_t *priv = NULL;
2333
2334         local = frame->local;
2335         priv = this->private;
2336
2337         STACK_WIND_COOKIE (frame, afr_fxattrop_wind_cbk, (void *) (long) subvol,
2338                            priv->children[subvol],
2339                            priv->children[subvol]->fops->fxattrop,
2340                            local->fd, local->cont.xattrop.optype,
2341                            local->cont.xattrop.xattr, local->xdata_req);
2342         return 0;
2343 }
2344
2345 int
2346 afr_fxattrop_unwind (call_frame_t *frame, xlator_t *this)
2347 {
2348         afr_local_t *local = NULL;
2349         call_frame_t *main_frame = NULL;
2350
2351         local = frame->local;
2352
2353         main_frame = afr_transaction_detach_fop_frame (frame);
2354         if (!main_frame)
2355                 return 0;
2356
2357         AFR_STACK_UNWIND (fxattrop, main_frame, local->op_ret, local->op_errno,
2358                           local->xattr_rsp, local->xdata_rsp);
2359         return 0;
2360 }
2361
2362 int32_t
2363 afr_fxattrop (call_frame_t *frame, xlator_t *this, fd_t *fd,
2364               gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
2365 {
2366         afr_local_t *local = NULL;
2367         call_frame_t *transaction_frame = NULL;
2368         int ret = -1;
2369         int op_errno = ENOMEM;
2370
2371         transaction_frame = copy_frame (frame);
2372         if (!transaction_frame)
2373                 goto out;
2374
2375         local = AFR_FRAME_INIT (transaction_frame, op_errno);
2376         if (!local)
2377                 goto out;
2378
2379         local->cont.xattrop.xattr = dict_ref (xattr);
2380         local->cont.xattrop.optype = optype;
2381         if (xdata)
2382                 local->xdata_req = dict_ref (xdata);
2383
2384         local->transaction.wind   = afr_fxattrop_wind;
2385         local->transaction.fop    = __afr_txn_write_fop;
2386         local->transaction.done   = __afr_txn_write_done;
2387         local->transaction.unwind = afr_fxattrop_unwind;
2388
2389         local->fd = fd_ref (fd);
2390         local->inode = inode_ref (fd->inode);
2391
2392         local->op = GF_FOP_FXATTROP;
2393
2394         local->transaction.main_frame = frame;
2395         local->transaction.start   = LLONG_MAX - 1;
2396         local->transaction.len     = 0;
2397
2398         ret = afr_transaction (transaction_frame, this,
2399                                AFR_METADATA_TRANSACTION);
2400         if (ret < 0) {
2401                 op_errno = -ret;
2402                 goto out;
2403         }
2404
2405         return 0;
2406 out:
2407         if (transaction_frame)
2408                 AFR_STACK_DESTROY (transaction_frame);
2409
2410         AFR_STACK_UNWIND (fxattrop, frame, -1, op_errno, NULL, NULL);
2411         return 0;
2412 }