4c59f0fe0f5ae6984d51cc6bfdcc352b2b5a7df5
[obnox/glusterfs.git] / xlators / mgmt / glusterd / src / glusterd-replace-brick.c
1 /*
2    Copyright (c) 2011-2012 Red Hat, Inc. <http://www.redhat.com>
3    This file is part of GlusterFS.
4
5    This file is licensed to you under your choice of the GNU Lesser
6    General Public License, version 3 or any later version (LGPLv3 or
7    later), or the GNU General Public License, version 2 (GPLv2), in all
8    cases as published by the Free Software Foundation.
9 */
10 #include "common-utils.h"
11 #include "cli1-xdr.h"
12 #include "xdr-generic.h"
13 #include "glusterfs.h"
14 #include "glusterd.h"
15 #include "glusterd-op-sm.h"
16 #include "glusterd-geo-rep.h"
17 #include "glusterd-store.h"
18 #include "glusterd-utils.h"
19 #include "glusterd-svc-mgmt.h"
20 #include "glusterd-svc-helper.h"
21 #include "glusterd-nfs-svc.h"
22 #include "glusterd-volgen.h"
23 #include "glusterd-messages.h"
24 #include "glusterd-mgmt.h"
25 #include "run.h"
26 #include "syscall.h"
27
28 #include <signal.h>
29
30 #define GLUSTERD_GET_RB_MNTPT(path, len, volinfo)                           \
31         snprintf (path, len,                                                \
32                   DEFAULT_VAR_RUN_DIRECTORY"/%s-"RB_CLIENT_MOUNTPOINT,      \
33                   volinfo->volname);
34
35 extern uuid_t global_txn_id;
36
37 int
38 glusterd_mgmt_v3_initiate_replace_brick_cmd_phases (rpcsvc_request_t *req,
39                                                     glusterd_op_t op,
40                                                     dict_t *dict);
41 int
42 glusterd_handle_replicate_replace_brick (glusterd_volinfo_t *volinfo,
43                                          glusterd_brickinfo_t *brickinfo)
44 {
45         int32_t                    ret               = -1;
46         char                       tmpmount[]        = "/tmp/mntXXXXXX";
47         char                       logfile[PATH_MAX] = {0,};
48         int                        dirty[3]          = {0,};
49         runner_t                   runner            = {0};
50         glusterd_conf_t           *priv              = NULL;
51         char                      *pid               = NULL;
52         char                      *volfileserver     = NULL;
53
54         priv = THIS->private;
55
56         dirty[2] = hton32(1);
57
58         ret = sys_lsetxattr (brickinfo->path, GF_AFR_DIRTY, dirty,
59                              sizeof (dirty), 0);
60         if (ret == -1) {
61                 gf_msg (THIS->name, GF_LOG_ERROR, errno,
62                         GD_MSG_SETXATTR_FAIL, "Failed to set extended"
63                         " attribute %s : %s.", GF_AFR_DIRTY, strerror (errno));
64                 goto out;
65         }
66
67         if (mkdtemp (tmpmount) == NULL) {
68                 gf_msg (THIS->name, GF_LOG_ERROR, errno,
69                         GD_MSG_DIR_OP_FAILED,
70                         "failed to create a temporary mount directory.");
71                 ret = -1;
72                 goto out;
73         }
74         snprintf (logfile, sizeof (logfile),
75                   DEFAULT_LOG_FILE_DIRECTORY"/%s-replace-brick-mount.log",
76                   volinfo->volname);
77
78         ret = gf_asprintf (&pid, "%d", GF_CLIENT_PID_AFR_SELF_HEALD);
79         if (ret < 0)
80                 goto out;
81
82         if (dict_get_str (THIS->options, "transport.socket.bind-address",
83                           &volfileserver) != 0)
84                 volfileserver = "localhost";
85
86         runinit (&runner);
87         runner_add_args (&runner, SBIN_DIR"/glusterfs",
88                          "-s", volfileserver,
89                          "--volfile-id", volinfo->volname,
90                          "--client-pid", pid,
91                          "-l", logfile, tmpmount, NULL);
92         synclock_unlock (&priv->big_lock);
93         ret = runner_run (&runner);
94
95         if (ret) {
96                 runner_log (&runner, THIS->name, GF_LOG_ERROR, "mount command"
97                             "failed.");
98                 goto lock;
99         }
100         ret = sys_lsetxattr (tmpmount, GF_AFR_REPLACE_BRICK,
101                              brickinfo->brick_id, sizeof (brickinfo->brick_id),
102                              0);
103         if (ret == -1)
104                 gf_msg (THIS->name, GF_LOG_ERROR, errno,
105                         GD_MSG_SETXATTR_FAIL, "Failed to set extended"
106                         " attribute %s : %s", GF_AFR_REPLACE_BRICK,
107                         strerror (errno));
108         gf_umount_lazy (THIS->name, tmpmount, 1);
109 lock:
110         synclock_lock (&priv->big_lock);
111 out:
112         if (pid)
113                 GF_FREE (pid);
114         gf_msg_debug ("glusterd", 0, "Returning with ret");
115         return ret;
116 }
117
118 int
119 __glusterd_handle_replace_brick (rpcsvc_request_t *req)
120 {
121         int32_t                         ret = -1;
122         gf_cli_req                      cli_req = {{0,}};
123         dict_t                          *dict = NULL;
124         char                            *src_brick = NULL;
125         char                            *dst_brick = NULL;
126         int32_t                         op = 0;
127         glusterd_op_t                   cli_op = GD_OP_REPLACE_BRICK;
128         char                            *volname = NULL;
129         char                            msg[2048] = {0,};
130         xlator_t                        *this = NULL;
131
132         GF_ASSERT (req);
133         this = THIS;
134         GF_ASSERT (this);
135
136         ret = xdr_to_generic (req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
137         if (ret < 0) {
138                 //failed to decode msg;
139                 gf_msg (this->name, GF_LOG_ERROR, 0,
140                         GD_MSG_REQ_DECODE_FAIL, "Failed to decode "
141                         "request received from cli");
142                 req->rpc_err = GARBAGE_ARGS;
143                 goto out;
144         }
145
146         gf_msg (this->name, GF_LOG_INFO, 0,
147                 GD_MSG_REPLACE_BRK_REQ_RCVD,
148                 "Received replace brick req");
149
150         if (cli_req.dict.dict_len) {
151                 /* Unserialize the dictionary */
152                 dict  = dict_new ();
153
154                 ret = dict_unserialize (cli_req.dict.dict_val,
155                                         cli_req.dict.dict_len,
156                                         &dict);
157                 if (ret < 0) {
158                         gf_msg (this->name, GF_LOG_ERROR, 0,
159                                 GD_MSG_DICT_UNSERIALIZE_FAIL,
160                                 "failed to "
161                                 "unserialize req-buffer to dictionary");
162                         snprintf (msg, sizeof (msg), "Unable to decode the "
163                                   "command");
164                         goto out;
165                 }
166         }
167
168         ret = dict_get_str (dict, "volname", &volname);
169         if (ret) {
170                 snprintf (msg, sizeof (msg), "Could not get volume name");
171                 gf_msg (this->name, GF_LOG_ERROR, 0,
172                         GD_MSG_DICT_GET_FAILED, "%s", msg);
173                 goto out;
174         }
175
176         ret = dict_get_int32 (dict, "operation", &op);
177         if (ret) {
178                 gf_msg_debug (this->name, 0,
179                         "dict_get on operation failed");
180                 snprintf (msg, sizeof (msg), "Could not get operation");
181                 goto out;
182         }
183
184         ret = dict_get_str (dict, "src-brick", &src_brick);
185
186         if (ret) {
187                 snprintf (msg, sizeof (msg), "Failed to get src brick");
188                 gf_msg (this->name, GF_LOG_ERROR, 0,
189                         GD_MSG_DICT_GET_FAILED, "%s", msg);
190                 goto out;
191         }
192         gf_msg_debug (this->name, 0,
193                 "src brick=%s", src_brick);
194
195         ret = dict_get_str (dict, "dst-brick", &dst_brick);
196
197         if (ret) {
198                 snprintf (msg, sizeof (msg), "Failed to get dest brick");
199                 gf_msg (this->name, GF_LOG_ERROR, 0,
200                         GD_MSG_DICT_GET_FAILED, "%s", msg);
201                 goto out;
202         }
203
204         gf_msg_debug (this->name, 0, "dst brick=%s", dst_brick);
205         gf_msg (this->name, GF_LOG_INFO, 0,
206                 GD_MSG_REPLACE_BRK_COMMIT_FORCE_REQ_RCVD,
207                 "Received replace brick commit-force "
208                 "request operation");
209
210         ret = glusterd_mgmt_v3_initiate_replace_brick_cmd_phases (req,
211                                             GD_OP_REPLACE_BRICK, dict);
212
213 out:
214         free (cli_req.dict.dict_val);//malloced by xdr
215
216         return ret;
217 }
218
219 int
220 glusterd_handle_replace_brick (rpcsvc_request_t *req)
221 {
222         return glusterd_big_locked_handler (req,
223                                             __glusterd_handle_replace_brick);
224 }
225
226 static int
227 glusterd_get_rb_dst_brickinfo (glusterd_volinfo_t *volinfo,
228                                glusterd_brickinfo_t **brickinfo)
229 {
230         int32_t                 ret = -1;
231
232         if (!volinfo || !brickinfo)
233                 goto out;
234
235         *brickinfo = volinfo->rep_brick.dst_brick;
236
237         ret = 0;
238
239 out:
240         return ret;
241 }
242
243 int
244 glusterd_op_stage_replace_brick (dict_t *dict, char **op_errstr,
245                                  dict_t *rsp_dict)
246 {
247         int                                      ret                = 0;
248         int32_t                                  port               = 0;
249         char                                    *src_brick          = NULL;
250         char                                    *dst_brick          = NULL;
251         char                                    *volname            = NULL;
252         char                                    *replace_op         = NULL;
253         glusterd_volinfo_t                      *volinfo            = NULL;
254         glusterd_brickinfo_t                    *src_brickinfo      = NULL;
255         char                                    *host               = NULL;
256         char                                    *path               = NULL;
257         char                                     msg[2048]          = {0};
258         char                                    *dup_dstbrick       = NULL;
259         glusterd_peerinfo_t                     *peerinfo           = NULL;
260         glusterd_brickinfo_t                    *dst_brickinfo      = NULL;
261         gf_boolean_t                             enabled            = _gf_false;
262         glusterd_conf_t                         *priv               = NULL;
263         char                                    *savetok            = NULL;
264         char                                     pidfile[PATH_MAX]  = {0};
265         char                                    *task_id_str        = NULL;
266         xlator_t                                *this               = NULL;
267         gf_boolean_t                             is_force           = _gf_false;
268         gsync_status_param_t                     param              = {0,};
269
270         this = THIS;
271         GF_ASSERT (this);
272
273         priv = this->private;
274         GF_ASSERT (priv);
275
276         ret = dict_get_str (dict, "src-brick", &src_brick);
277
278         if (ret) {
279                 gf_msg (this->name, GF_LOG_ERROR, 0,
280                         GD_MSG_DICT_GET_FAILED, "Unable to get src brick");
281                 goto out;
282         }
283
284         gf_msg_debug (this->name, 0, "src brick=%s", src_brick);
285
286         ret = dict_get_str (dict, "dst-brick", &dst_brick);
287
288         if (ret) {
289                 gf_msg (this->name, GF_LOG_ERROR, 0,
290                         GD_MSG_DICT_GET_FAILED, "Unable to get dest brick");
291                 goto out;
292         }
293
294         gf_msg_debug (this->name, 0, "dst brick=%s", dst_brick);
295
296         ret = dict_get_str (dict, "volname", &volname);
297
298         if (ret) {
299                 gf_msg (this->name, GF_LOG_ERROR, 0,
300                         GD_MSG_DICT_GET_FAILED, "Unable to get volume name");
301                 goto out;
302         }
303
304         ret = dict_get_str (dict, "operation", &replace_op);
305         if (ret) {
306                 gf_msg_debug (this->name, 0,
307                         "dict get on replace-brick operation failed");
308                 goto out;
309         }
310
311         ret = glusterd_volinfo_find (volname, &volinfo);
312         if (ret) {
313                 snprintf (msg, sizeof (msg), "volume: %s does not exist",
314                           volname);
315                 *op_errstr = gf_strdup (msg);
316                 goto out;
317         }
318
319         if (GLUSTERD_STATUS_STARTED != volinfo->status) {
320                 ret = -1;
321                 snprintf (msg, sizeof (msg), "volume: %s is not started",
322                           volname);
323                 *op_errstr = gf_strdup (msg);
324                 goto out;
325         }
326
327         ret = glusterd_disallow_op_for_tier (volinfo, GD_OP_REPLACE_BRICK, -1);
328         if (ret) {
329                 snprintf (msg, sizeof (msg), "Replace brick commands are not "
330                           "supported on tiered volume %s", volname);
331                 *op_errstr = gf_strdup (msg);
332                 goto out;
333         }
334
335         if (!glusterd_store_is_valid_brickpath (volname, dst_brick) ||
336                 !glusterd_is_valid_volfpath (volname, dst_brick)) {
337                 snprintf (msg, sizeof (msg), "brick path %s is too "
338                           "long.", dst_brick);
339                 gf_msg (this->name, GF_LOG_ERROR, 0,
340                         GD_MSG_BRKPATH_TOO_LONG, "%s", msg);
341                 *op_errstr = gf_strdup (msg);
342
343                 ret = -1;
344                 goto out;
345         }
346
347         /* If geo-rep is configured, for this volume, it should be stopped. */
348         param.volinfo = volinfo;
349         ret = glusterd_check_geo_rep_running (&param, op_errstr);
350         if (ret || param.is_active) {
351                 ret = -1;
352                 goto out;
353         }
354
355         if (glusterd_is_defrag_on(volinfo)) {
356                 snprintf (msg, sizeof(msg), "Volume name %s rebalance is in "
357                           "progress. Please retry after completion", volname);
358                 gf_msg (this->name, GF_LOG_ERROR, 0,
359                         GD_MSG_OIP_RETRY_LATER, "%s", msg);
360                 *op_errstr = gf_strdup (msg);
361                 ret = -1;
362                 goto out;
363         }
364
365         if (!strcmp(replace_op, "GF_REPLACE_OP_COMMIT_FORCE")) {
366                 is_force = _gf_true;
367         } else {
368                 ret = -1;
369                 goto out;
370         }
371
372         ret = glusterd_volume_brickinfo_get_by_brick (src_brick, volinfo,
373                                                       &src_brickinfo);
374         if (ret) {
375                 snprintf (msg, sizeof (msg), "brick: %s does not exist in "
376                           "volume: %s", src_brick, volname);
377                 *op_errstr = gf_strdup (msg);
378                 goto out;
379         }
380
381         if (dict) {
382                 if (!glusterd_is_fuse_available ()) {
383                         gf_msg (this->name, GF_LOG_ERROR, 0,
384                                 GD_MSG_RB_CMD_FAIL, "Unable to open /dev/"
385                                 "fuse (%s), replace-brick command failed",
386                                 strerror (errno));
387                         snprintf (msg, sizeof(msg), "Fuse unavailable\n "
388                                 "Replace-brick failed");
389                         *op_errstr = gf_strdup (msg);
390                         ret = -1;
391                         goto out;
392                 }
393         }
394
395         if (gf_is_local_addr (src_brickinfo->hostname)) {
396                 gf_msg_debug (this->name, 0,
397                         "I AM THE SOURCE HOST");
398                 if (src_brickinfo->port && rsp_dict) {
399                         ret = dict_set_int32 (rsp_dict, "src-brick-port",
400                                               src_brickinfo->port);
401                         if (ret) {
402                                 gf_msg_debug (this->name, 0,
403                                         "Could not set src-brick-port=%d",
404                                         src_brickinfo->port);
405                         }
406                 }
407
408                 GLUSTERD_GET_BRICK_PIDFILE (pidfile, volinfo, src_brickinfo,
409                                             priv);
410
411         }
412
413         dup_dstbrick = gf_strdup (dst_brick);
414         if (!dup_dstbrick) {
415                 ret = -1;
416                 gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
417                         GD_MSG_NO_MEMORY, "Memory allocation failed");
418                 goto out;
419         }
420         host = strtok_r (dup_dstbrick, ":", &savetok);
421         path = strtok_r (NULL, ":", &savetok);
422
423         if (!host || !path) {
424                 gf_msg (this->name, GF_LOG_ERROR, 0,
425                         GD_MSG_BAD_FORMAT,
426                         "dst brick %s is not of form <HOSTNAME>:<export-dir>",
427                         dst_brick);
428                 ret = -1;
429                 goto out;
430         }
431
432         ret = glusterd_brickinfo_new_from_brick (dst_brick, &dst_brickinfo);
433         if (ret)
434                 goto out;
435
436         ret = glusterd_new_brick_validate (dst_brick, dst_brickinfo,
437                                            msg, sizeof (msg));
438         if (ret) {
439                 *op_errstr = gf_strdup (msg);
440                 ret = -1;
441                 gf_msg (this->name, GF_LOG_ERROR, 0,
442                         GD_MSG_BRICK_VALIDATE_FAIL, "%s", *op_errstr);
443                 goto out;
444         }
445
446         if (!strcmp(replace_op, "GF_REPLACE_OP_COMMIT_FORCE")) {
447
448                 volinfo->rep_brick.src_brick = src_brickinfo;
449                 volinfo->rep_brick.dst_brick = dst_brickinfo;
450         }
451
452         if (glusterd_rb_check_bricks (volinfo, src_brickinfo, dst_brickinfo)) {
453
454                 ret = -1;
455                 *op_errstr = gf_strdup ("Incorrect source or "
456                                         "destination brick");
457                 if (*op_errstr)
458                         gf_msg (this->name, GF_LOG_ERROR, EINVAL,
459                                 GD_MSG_BRICK_NOT_FOUND, "%s", *op_errstr);
460                 goto out;
461        }
462
463         if (gf_is_local_addr (host)) {
464                 ret = glusterd_validate_and_create_brickpath (dst_brickinfo,
465                                                   volinfo->volume_id,
466                                                   op_errstr, is_force);
467                 if (ret)
468                         goto out;
469         }
470
471         if (!gf_is_local_addr (host)) {
472                 rcu_read_lock ();
473
474                 peerinfo = glusterd_peerinfo_find (NULL, host);
475                 if (peerinfo == NULL) {
476                         ret = -1;
477                         snprintf (msg, sizeof (msg), "%s, is not a friend",
478                                   host);
479                         *op_errstr = gf_strdup (msg);
480
481                 } else if (!peerinfo->connected) {
482                         snprintf (msg, sizeof (msg), "%s, is not connected at "
483                                   "the moment", host);
484                         *op_errstr = gf_strdup (msg);
485                         ret = -1;
486
487                 } else if (GD_FRIEND_STATE_BEFRIENDED !=
488                                 peerinfo->state.state) {
489                         snprintf (msg, sizeof (msg), "%s, is not befriended "
490                                   "at the moment", host);
491                         *op_errstr = gf_strdup (msg);
492                         ret = -1;
493                 }
494                 rcu_read_unlock ();
495
496                 if (ret)
497                         goto out;
498
499         } else if (priv->op_version >= GD_OP_VERSION_3_6_0) {
500                 /* A bricks mount dir is required only by snapshots which were
501                  * introduced in gluster-3.6.0
502                  */
503                 ret = glusterd_get_brick_mount_dir (dst_brickinfo->path,
504                                                     dst_brickinfo->hostname,
505                                                     dst_brickinfo->mount_dir);
506                 if (ret) {
507                         gf_msg (this->name, GF_LOG_ERROR, 0,
508                                 GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
509                                 "Failed to get brick mount_dir");
510                         goto out;
511                 }
512
513                 ret = dict_set_dynstr_with_alloc (rsp_dict, "brick1.mount_dir",
514                                                   dst_brickinfo->mount_dir);
515                 if (ret) {
516                         gf_msg (this->name, GF_LOG_ERROR, 0,
517                                 GD_MSG_DICT_SET_FAILED,
518                                 "Failed to set brick1.mount_dir");
519                         goto out;
520                 }
521
522                 ret = dict_set_int32 (rsp_dict, "brick_count", 1);
523                 if (ret) {
524                         gf_msg (this->name, GF_LOG_ERROR, 0,
525                                 GD_MSG_DICT_SET_FAILED,
526                                 "Failed to set local_brick_count");
527                         goto out;
528                 }
529         }
530
531         ret = 0;
532
533 out:
534         GF_FREE (dup_dstbrick);
535         gf_msg_debug (this->name, 0, "Returning %d", ret);
536
537         return ret;
538 }
539
540 static int
541 rb_kill_destination_brick (glusterd_volinfo_t *volinfo,
542                            glusterd_brickinfo_t *dst_brickinfo)
543 {
544         glusterd_conf_t  *priv               = NULL;
545         char              pidfile[PATH_MAX]  = {0,};
546
547         priv = THIS->private;
548
549         snprintf (pidfile, PATH_MAX, "%s/vols/%s/%s",
550                   priv->workdir, volinfo->volname,
551                   RB_DSTBRICK_PIDFILE);
552
553         return glusterd_service_stop ("brick", pidfile, SIGTERM, _gf_true);
554 }
555
556 static int
557 rb_update_dstbrick_port (glusterd_brickinfo_t *dst_brickinfo, dict_t *rsp_dict,
558                          dict_t *req_dict, char *replace_op)
559 {
560         int     ret           = 0;
561         int     dict_ret      = 0;
562         int     dst_port      = 0;
563
564         dict_ret = dict_get_int32 (req_dict, "dst-brick-port", &dst_port);
565         if (!dict_ret)
566                 dst_brickinfo->port = dst_port;
567
568         if (gf_is_local_addr (dst_brickinfo->hostname)) {
569                 gf_msg ("glusterd", GF_LOG_INFO, 0,
570                         GD_MSG_BRK_PORT_NO_ADD_INDO,
571                         "adding dst-brick port no");
572
573                 if (rsp_dict) {
574                         ret = dict_set_int32 (rsp_dict, "dst-brick-port",
575                                               dst_brickinfo->port);
576                         if (ret) {
577                                 gf_msg_debug ("glusterd", 0,
578                                         "Could not set dst-brick port no in rsp dict");
579                                 goto out;
580                         }
581                 }
582
583                 if (req_dict) {
584                         ret = dict_set_int32 (req_dict, "dst-brick-port",
585                                               dst_brickinfo->port);
586                         if (ret) {
587                                 gf_msg_debug ("glusterd", 0,
588                                         "Could not set dst-brick port no");
589                                 goto out;
590                         }
591                 }
592         }
593 out:
594         return ret;
595 }
596
597 static int
598 glusterd_op_perform_replace_brick (glusterd_volinfo_t  *volinfo,
599                                    char *old_brick, char *new_brick,
600                                    dict_t *dict)
601 {
602         char                                    *brick_mount_dir = NULL;
603         glusterd_brickinfo_t                    *old_brickinfo = NULL;
604         glusterd_brickinfo_t                    *new_brickinfo = NULL;
605         int32_t                                 ret = -1;
606         xlator_t                                *this = NULL;
607         glusterd_conf_t                         *conf = NULL;
608
609         this = THIS;
610         GF_ASSERT (this);
611         GF_ASSERT (dict);
612         GF_ASSERT (volinfo);
613
614         conf = this->private;
615         GF_ASSERT (conf);
616
617         ret = glusterd_brickinfo_new_from_brick (new_brick,
618                                                  &new_brickinfo);
619         if (ret)
620                 goto out;
621
622         ret = glusterd_resolve_brick (new_brickinfo);
623
624         if (ret)
625                 goto out;
626
627         ret = glusterd_volume_brickinfo_get_by_brick (old_brick,
628                                                       volinfo, &old_brickinfo);
629         if (ret)
630                 goto out;
631
632         strncpy (new_brickinfo->brick_id, old_brickinfo->brick_id,
633                  sizeof (new_brickinfo->brick_id));
634
635         /* A bricks mount dir is required only by snapshots which were
636          * introduced in gluster-3.6.0
637          */
638         if (conf->op_version >= GD_OP_VERSION_3_6_0) {
639                 ret = dict_get_str (dict, "brick1.mount_dir", &brick_mount_dir);
640                 if (ret) {
641                         gf_msg (this->name, GF_LOG_ERROR, errno,
642                                 GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
643                                 "brick1.mount_dir not present");
644                         goto out;
645                 }
646                 strncpy (new_brickinfo->mount_dir, brick_mount_dir,
647                          sizeof(new_brickinfo->mount_dir));
648         }
649
650         cds_list_add_tail (&new_brickinfo->brick_list,
651                            &old_brickinfo->brick_list);
652
653         volinfo->brick_count++;
654
655         ret = glusterd_op_perform_remove_brick (volinfo, old_brick, 1, NULL);
656         if (ret)
657                 goto out;
658
659         /* if the volume is a replicate volume, do: */
660         if (glusterd_is_volume_replicate (volinfo)) {
661                 if (!gf_uuid_compare (new_brickinfo->uuid, MY_UUID)) {
662                         ret = glusterd_handle_replicate_replace_brick
663                                   (volinfo, new_brickinfo);
664                         if (ret < 0)
665                                 goto out;
666                 }
667         }
668
669         ret = glusterd_create_volfiles_and_notify_services (volinfo);
670         if (ret)
671                 goto out;
672
673         if (GLUSTERD_STATUS_STARTED == volinfo->status) {
674                 ret = glusterd_brick_start (volinfo, new_brickinfo, _gf_false);
675                 if (ret)
676                         goto out;
677         }
678
679 out:
680
681         gf_msg_debug ("glusterd", 0, "Returning %d", ret);
682         return ret;
683 }
684
685 int
686 glusterd_op_replace_brick (dict_t *dict, dict_t *rsp_dict)
687 {
688         int                                      ret           = 0;
689         dict_t                                  *ctx           = NULL;
690         char                                    *replace_op    = NULL;
691         glusterd_volinfo_t                      *volinfo       = NULL;
692         char                                    *volname       = NULL;
693         xlator_t                                *this          = NULL;
694         glusterd_conf_t                         *priv          = NULL;
695         char                                    *src_brick     = NULL;
696         char                                    *dst_brick     = NULL;
697         glusterd_brickinfo_t                    *src_brickinfo = NULL;
698         glusterd_brickinfo_t                    *dst_brickinfo = NULL;
699         char                                    *task_id_str   = NULL;
700
701         this = THIS;
702         GF_ASSERT (this);
703
704         priv = this->private;
705         GF_ASSERT (priv);
706
707         ret = dict_get_str (dict, "src-brick", &src_brick);
708         if (ret) {
709                 gf_msg (this->name, GF_LOG_ERROR, 0,
710                         GD_MSG_DICT_GET_FAILED, "Unable to get src brick");
711                 goto out;
712         }
713
714         gf_msg_debug (this->name, 0, "src brick=%s", src_brick);
715
716         ret = dict_get_str (dict, "dst-brick", &dst_brick);
717         if (ret) {
718                 gf_msg (this->name, GF_LOG_ERROR, 0,
719                         GD_MSG_DICT_GET_FAILED, "Unable to get dst brick");
720                 goto out;
721         }
722
723         gf_msg_debug (this->name, 0, "dst brick=%s", dst_brick);
724
725         ret = dict_get_str (dict, "volname", &volname);
726         if (ret) {
727                 gf_msg (this->name, GF_LOG_ERROR, 0,
728                         GD_MSG_DICT_GET_FAILED, "Unable to get volume name");
729                 goto out;
730         }
731
732         ret = dict_get_str (dict, "operation", &replace_op);
733         if (ret) {
734                 gf_msg_debug (this->name, 0,
735                         "dict_get on operation failed");
736                 goto out;
737         }
738
739         ret = glusterd_volinfo_find (volname, &volinfo);
740         if (ret) {
741                 gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
742                         GD_MSG_NO_MEMORY, "Unable to allocate memory");
743                 goto out;
744         }
745
746         ret = glusterd_volume_brickinfo_get_by_brick (src_brick, volinfo,
747                                                       &src_brickinfo);
748         if (ret) {
749                 gf_msg_debug (this->name, 0,
750                         "Unable to get src-brickinfo");
751                 goto out;
752         }
753
754
755         ret = glusterd_get_rb_dst_brickinfo (volinfo, &dst_brickinfo);
756         if (ret) {
757                 gf_msg (this->name, GF_LOG_ERROR, 0,
758                         GD_MSG_RB_BRICKINFO_GET_FAIL, "Unable to get "
759                          "replace brick destination brickinfo");
760                 goto out;
761         }
762
763         ret = glusterd_resolve_brick (dst_brickinfo);
764         if (ret) {
765                 gf_msg_debug (this->name, 0,
766                         "Unable to resolve dst-brickinfo");
767                 goto out;
768         }
769
770         ret = rb_update_dstbrick_port (dst_brickinfo, rsp_dict,
771                                        dict, replace_op);
772         if (ret)
773                 goto out;
774
775         if (strcmp(replace_op, "GF_REPLACE_OP_COMMIT_FORCE")) {
776                 ret = -1;
777                 goto out;
778         }
779
780         if (gf_is_local_addr (dst_brickinfo->hostname)) {
781                 gf_msg_debug (this->name, 0, "I AM THE DESTINATION HOST");
782                 ret = rb_kill_destination_brick (volinfo, dst_brickinfo);
783                 if (ret) {
784                         gf_msg (this->name, GF_LOG_CRITICAL, 0,
785                                 GD_MSG_BRK_CLEANUP_FAIL,
786                                 "Unable to cleanup dst brick");
787                         goto out;
788                 }
789         }
790
791         ret = glusterd_svcs_stop (volinfo);
792         if (ret) {
793                 gf_msg (this->name, GF_LOG_ERROR, 0,
794                         GD_MSG_NFS_SERVER_STOP_FAIL,
795                         "Unable to stop nfs server, ret: %d", ret);
796         }
797
798         ret = glusterd_op_perform_replace_brick (volinfo, src_brick,
799                                                  dst_brick, dict);
800         if (ret) {
801                 gf_msg (this->name, GF_LOG_CRITICAL, 0,
802                         GD_MSG_BRICK_ADD_FAIL, "Unable to add dst-brick: "
803                         "%s to volume: %s", dst_brick, volinfo->volname);
804                 (void) glusterd_svcs_manager (volinfo);
805                 goto out;
806         }
807
808         volinfo->rebal.defrag_status = 0;
809
810         ret = glusterd_svcs_manager (volinfo);
811         if (ret) {
812                 gf_msg (this->name, GF_LOG_CRITICAL, 0,
813                         GD_MSG_NFS_VOL_FILE_GEN_FAIL,
814                         "Failed to generate nfs volume file");
815         }
816
817
818         ret = glusterd_fetchspec_notify (THIS);
819         glusterd_brickinfo_delete (volinfo->rep_brick.dst_brick);
820         volinfo->rep_brick.src_brick = NULL;
821         volinfo->rep_brick.dst_brick = NULL;
822
823         if (!ret)
824                 ret = glusterd_store_volinfo (volinfo,
825                                               GLUSTERD_VOLINFO_VER_AC_INCREMENT);
826         if (ret)
827                 gf_msg (this->name, GF_LOG_ERROR, 0,
828                         GD_MSG_RBOP_STATE_STORE_FAIL, "Couldn't store"
829                         " replace brick operation's state");
830
831 out:
832         return ret;
833 }
834
835 int
836 glusterd_mgmt_v3_initiate_replace_brick_cmd_phases (rpcsvc_request_t *req,
837                                                     glusterd_op_t op,
838                                                     dict_t *dict)
839 {
840         int32_t                     ret              = -1;
841         int32_t                     op_ret           = -1;
842         uint32_t                    txn_generation   = 0;
843         uint32_t                    op_errno         = 0;
844         char                        *cli_errstr      = NULL;
845         char                        *op_errstr       = NULL;
846         dict_t                      *req_dict        = NULL;
847         dict_t                      *tmp_dict        = NULL;
848         uuid_t                      *originator_uuid = NULL;
849         xlator_t                    *this            = NULL;
850         glusterd_conf_t             *conf            = NULL;
851         gf_boolean_t                success          = _gf_false;
852         gf_boolean_t                is_acquired      = _gf_false;
853
854         this = THIS;
855         GF_ASSERT (this);
856         GF_ASSERT (req);
857         GF_ASSERT (dict);
858         conf = this->private;
859         GF_ASSERT (conf);
860
861         txn_generation = conf->generation;
862         originator_uuid = GF_CALLOC (1, sizeof(uuid_t),
863                                      gf_common_mt_uuid_t);
864         if (!originator_uuid) {
865                 ret = -1;
866                 goto out;
867         }
868
869         gf_uuid_copy (*originator_uuid, MY_UUID);
870         ret = dict_set_bin (dict, "originator_uuid",
871                             originator_uuid, sizeof (uuid_t));
872         if (ret) {
873                 gf_msg (this->name, GF_LOG_ERROR, 0,
874                         GD_MSG_DICT_SET_FAILED,
875                         "Failed to set originator_uuid.");
876                 GF_FREE (originator_uuid);
877                 goto out;
878         }
879
880         ret = dict_set_int32 (dict, "is_synctasked", _gf_true);
881         if (ret) {
882                 gf_msg (this->name, GF_LOG_ERROR, 0,
883                         GD_MSG_DICT_SET_FAILED,
884                         "Failed to set synctasked flag to true.");
885                 goto out;
886         }
887
888         tmp_dict = dict_new();
889         if (!tmp_dict) {
890                 gf_msg (this->name, GF_LOG_ERROR, 0,
891                         GD_MSG_DICT_CREATE_FAIL, "Unable to create dict");
892                 goto out;
893         }
894         dict_copy (dict, tmp_dict);
895
896         ret = glusterd_mgmt_v3_initiate_lockdown (op, dict, &op_errstr,
897                                                   &op_errno, &is_acquired,
898                                                   txn_generation);
899         if (ret) {
900                 gf_msg (this->name, GF_LOG_ERROR, 0,
901                         GD_MSG_MGMTV3_LOCKDOWN_FAIL,
902                         "mgmt_v3 lockdown failed.");
903                 goto out;
904         }
905
906         ret = glusterd_mgmt_v3_build_payload (&req_dict, &op_errstr, dict, op);
907         if (ret) {
908                 gf_msg (this->name, GF_LOG_ERROR, 0,
909                         GD_MSG_MGMTV3_PAYLOAD_BUILD_FAIL, LOGSTR_BUILD_PAYLOAD,
910                         gd_op_list[op]);
911                 if (op_errstr == NULL)
912                         gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD);
913                 goto out;
914         }
915
916         ret = glusterd_mgmt_v3_pre_validate (op, req_dict, &op_errstr,
917                                              &op_errno, txn_generation);
918         if (ret) {
919                 gf_msg (this->name, GF_LOG_ERROR, 0,
920                         GD_MSG_PRE_VALIDATION_FAIL, "Pre Validation Failed");
921                 goto out;
922         }
923
924         ret = glusterd_mgmt_v3_commit (op, dict, req_dict, &op_errstr,
925                                        &op_errno, txn_generation);
926         if (ret) {
927                 gf_msg (this->name, GF_LOG_ERROR, 0,
928                         GD_MSG_COMMIT_OP_FAIL, "Commit Op Failed");
929                 goto out;
930         }
931
932         ret = 0;
933
934 out:
935         op_ret = ret;
936
937         (void) glusterd_mgmt_v3_release_peer_locks (op, dict, op_ret,
938                                                     &op_errstr, is_acquired,
939                                                     txn_generation);
940
941         if (is_acquired) {
942                 ret = glusterd_multiple_mgmt_v3_unlock (tmp_dict, MY_UUID);
943                 if (ret) {
944                         gf_msg (this->name, GF_LOG_ERROR, 0,
945                                 GD_MSG_MGMTV3_UNLOCK_FAIL,
946                                 "Failed to release mgmt_v3 locks on "
947                                 "localhost.");
948                         op_ret = ret;
949                 }
950         }
951         /* SEND CLI RESPONSE */
952         glusterd_op_send_cli_response (op, op_ret, op_errno, req,
953                                        dict, op_errstr);
954
955         if (req_dict)
956                 dict_unref (req_dict);
957
958         if (tmp_dict)
959                 dict_unref (tmp_dict);
960
961         if (op_errstr) {
962                 GF_FREE (op_errstr);
963                 op_errstr = NULL;
964         }
965
966         return 0;
967 }