23faa4b5af86c56fd339dea7552009e54455cd8c
[sahlberg/ctdb.git] / libctdb / ctdb_client.c
1 /* 
2    ctdb client code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007-2010
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /* async version of receive control reply */
147 int ctdb_control_recv(struct ctdb_context *ctdb, 
148                 struct ctdb_client_control_state *state, 
149                 TALLOC_CTX *mem_ctx,
150                 TDB_DATA *outdata, int32_t *status, char **errormsg)
151 {
152         TALLOC_CTX *tmp_ctx;
153
154         if (status != NULL) {
155                 *status = -1;
156         }
157         if (errormsg != NULL) {
158                 *errormsg = NULL;
159         }
160
161         if (state == NULL) {
162                 return -1;
163         }
164
165         /* prevent double free of state */
166         tmp_ctx = talloc_new(ctdb);
167         talloc_steal(tmp_ctx, state);
168
169         /* loop one event at a time until we either timeout or the control
170            completes.
171         */
172         while (state->state == CTDB_CONTROL_WAIT) {
173                 event_loop_once(ctdb->ev);
174         }
175
176         if (state->state != CTDB_CONTROL_DONE) {
177                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
178                 if (state->async.fn) {
179                         state->async.fn(state);
180                 }
181                 talloc_free(tmp_ctx);
182                 return -1;
183         }
184
185         if (state->errormsg) {
186                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
187                 if (errormsg) {
188                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
189                 }
190                 if (state->async.fn) {
191                         state->async.fn(state);
192                 }
193                 talloc_free(tmp_ctx);
194                 return -1;
195         }
196
197         if (outdata) {
198                 *outdata = state->outdata;
199                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
200         }
201
202         if (status) {
203                 *status = state->status;
204         }
205
206         if (state->async.fn) {
207                 state->async.fn(state);
208         }
209
210         talloc_free(tmp_ctx);
211         return 0;
212 }
213
214
215 /*
216    called when a control completes or timesout to invoke the callback
217    function the user provided
218 */
219 void ctdb_invoke_control_callback(struct event_context *ev, struct timed_event *te, 
220         struct timeval t, void *private_data)
221 {
222         struct ctdb_client_control_state *state;
223         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
224         int ret;
225
226         state = talloc_get_type(private_data, struct ctdb_client_control_state);
227         talloc_steal(tmp_ctx, state);
228
229         ret = ctdb_control_recv(state->ctdb, state, state,
230                         NULL, 
231                         NULL, 
232                         NULL);
233
234         talloc_free(tmp_ctx);
235 }
236
237 /*
238   called when a CTDB_REPLY_CONTROL packet comes in in the client
239
240   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
241   contains any reply data from the control
242 */
243 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
244                                       struct ctdb_req_header *hdr)
245 {
246         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
247         struct ctdb_client_control_state *state;
248
249         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
250         if (state == NULL) {
251                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
252                 return;
253         }
254
255         if (hdr->reqid != state->reqid) {
256                 /* we found a record  but it was the wrong one */
257                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
258                 return;
259         }
260
261         state->outdata.dptr = c->data;
262         state->outdata.dsize = c->datalen;
263         state->status = c->status;
264         if (c->errorlen) {
265                 state->errormsg = talloc_strndup(state, 
266                                                  (char *)&c->data[c->datalen], 
267                                                  c->errorlen);
268         }
269
270         /* state->outdata now uses resources from c so we dont want c
271            to just dissappear from under us while state is still alive
272         */
273         talloc_steal(state, c);
274
275         state->state = CTDB_CONTROL_DONE;
276
277         /* if we had a callback registered for this control, pull the response
278            and call the callback.
279         */
280         if (state->async.fn) {
281                 event_add_timed(ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
282         }
283 }
284
285
286 /*
287   called when a CTDB_REPLY_CALL packet comes in in the client
288
289   This packet comes in response to a CTDB_REQ_CALL request packet. It
290   contains any reply data from the call
291 */
292 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
293 {
294         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
295         struct ctdb_client_call_state *state;
296
297         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
298         if (state == NULL) {
299                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
300                 return;
301         }
302
303         if (hdr->reqid != state->reqid) {
304                 /* we found a record  but it was the wrong one */
305                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
306                 return;
307         }
308
309         state->call->reply_data.dptr = c->data;
310         state->call->reply_data.dsize = c->datalen;
311         state->call->status = c->status;
312
313         talloc_steal(state, c);
314
315         state->state = CTDB_CALL_DONE;
316
317         if (state->async.fn) {
318                 state->async.fn(state);
319         }
320 }
321
322 /*
323   this is called in the client, when data comes in from the daemon
324  */
325 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
326 {
327         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
328         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
329         TALLOC_CTX *tmp_ctx;
330
331         /* place the packet as a child of a tmp_ctx. We then use
332            talloc_free() below to free it. If any of the calls want
333            to keep it, then they will steal it somewhere else, and the
334            talloc_free() will be a no-op */
335         tmp_ctx = talloc_new(ctdb);
336         talloc_steal(tmp_ctx, hdr);
337
338         if (cnt == 0) {
339                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
340                 exit(0);
341         }
342
343         if (cnt < sizeof(*hdr)) {
344                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
345                 goto done;
346         }
347         if (cnt != hdr->length) {
348                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
349                                (unsigned)hdr->length, (unsigned)cnt);
350                 goto done;
351         }
352
353         if (hdr->ctdb_magic != CTDB_MAGIC) {
354                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
355                 goto done;
356         }
357
358         if (hdr->ctdb_version != CTDB_VERSION) {
359                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
360                 goto done;
361         }
362
363         switch (hdr->operation) {
364         case CTDB_REPLY_CALL:
365                 ctdb_client_reply_call(ctdb, hdr);
366                 break;
367
368         case CTDB_REQ_MESSAGE:
369                 ctdb_request_message(ctdb, hdr);
370                 break;
371
372         case CTDB_REPLY_CONTROL:
373                 ctdb_client_reply_control(ctdb, hdr);
374                 break;
375
376         default:
377                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
378         }
379
380 done:
381         talloc_free(tmp_ctx);
382 }
383
384
385 /*
386   connect to a unix domain socket
387 */
388 int ctdb_socket_connect(struct ctdb_context *ctdb)
389 {
390         struct sockaddr_un addr;
391
392         memset(&addr, 0, sizeof(addr));
393         addr.sun_family = AF_UNIX;
394         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
395
396         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
397         if (ctdb->daemon.sd == -1) {
398                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
399                 return -1;
400         }
401
402         set_nonblocking(ctdb->daemon.sd);
403         set_close_on_exec(ctdb->daemon.sd);
404         
405         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
406                 close(ctdb->daemon.sd);
407                 ctdb->daemon.sd = -1;
408                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
409                 return -1;
410         }
411
412         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
413                                               CTDB_DS_ALIGNMENT, 
414                                               ctdb_client_read_cb, ctdb);
415         return 0;
416 }
417
418
419
420 /*
421   destroy a ctdb_control in client
422 */
423 int ctdb_control_destructor(struct ctdb_client_control_state *state)    
424 {
425         ctdb_reqid_remove(state->ctdb, state->reqid);
426         return 0;
427 }
428
429
430 /*
431   queue a packet for sending from client to daemon
432 */
433 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
434 {
435         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
436 }
437
438
439 /* async version of send control request */
440 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
441                 uint32_t destnode, uint64_t srvid, 
442                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
443                 TALLOC_CTX *mem_ctx,
444                 char **errormsg)
445 {
446         struct ctdb_client_control_state *state;
447         size_t len;
448         struct ctdb_req_control *c;
449         int ret;
450
451         if (errormsg) {
452                 *errormsg = NULL;
453         }
454
455         /* if the domain socket is not yet open, open it */
456         if (ctdb->daemon.sd==-1) {
457                 ctdb_socket_connect(ctdb);
458         }
459
460         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
461         CTDB_NO_MEMORY_NULL(ctdb, state);
462
463         state->ctdb       = ctdb;
464         state->reqid      = ctdb_reqid_new(ctdb, state);
465         state->state      = CTDB_CONTROL_WAIT;
466         state->errormsg   = NULL;
467
468         talloc_set_destructor(state, ctdb_control_destructor);
469
470         len = offsetof(struct ctdb_req_control, data) + data.dsize;
471         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
472                                len, struct ctdb_req_control);
473         state->c            = c;        
474         CTDB_NO_MEMORY_NULL(ctdb, c);
475         c->hdr.reqid        = state->reqid;
476         c->hdr.destnode     = destnode;
477         c->opcode           = opcode;
478         c->client_id        = 0;
479         c->flags            = flags;
480         c->srvid            = srvid;
481         c->datalen          = data.dsize;
482         if (data.dsize) {
483                 memcpy(&c->data[0], data.dptr, data.dsize);
484         }
485
486         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
487         if (ret != 0) {
488                 talloc_free(state);
489                 return NULL;
490         }
491
492         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
493                 talloc_free(state);
494                 return NULL;
495         }
496
497         return state;
498 }
499
500
501
502
503 /*
504   destroy a ctdb_call in client
505 */
506 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
507 {
508         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
509         return 0;
510 }
511
512 /*
513   construct an event driven local ctdb_call
514
515   this is used so that locally processed ctdb_call requests are processed
516   in an event driven manner
517 */
518 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
519                                                                   struct ctdb_call *call,
520                                                                   struct ctdb_ltdb_header *header,
521                                                                   TDB_DATA *data)
522 {
523         struct ctdb_client_call_state *state;
524         struct ctdb_context *ctdb = ctdb_db->ctdb;
525         int ret;
526
527         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
528         CTDB_NO_MEMORY_NULL(ctdb, state);
529         state->call = talloc_zero(state, struct ctdb_call);
530         CTDB_NO_MEMORY_NULL(ctdb, state->call);
531
532         talloc_steal(state, data->dptr);
533
534         state->state   = CTDB_CALL_DONE;
535         *(state->call) = *call;
536         state->ctdb_db = ctdb_db;
537
538         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
539
540         return state;
541 }
542
543 /*
544   make a ctdb call to the local daemon - async send. Called from client context.
545
546   This constructs a ctdb_call request and queues it for processing. 
547   This call never blocks.
548 */
549 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
550                                               struct ctdb_call *call)
551 {
552         struct ctdb_client_call_state *state;
553         struct ctdb_context *ctdb = ctdb_db->ctdb;
554         struct ctdb_ltdb_header header;
555         TDB_DATA data;
556         int ret;
557         size_t len;
558         struct ctdb_req_call *c;
559
560         /* if the domain socket is not yet open, open it */
561         if (ctdb->daemon.sd==-1) {
562                 ctdb_socket_connect(ctdb);
563         }
564
565         ret = ctdb_ltdb_lock(ctdb_db, call->key);
566         if (ret != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
568                 return NULL;
569         }
570
571         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
572
573         if (ret == 0 && header.dmaster == ctdb->pnn) {
574                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
575                 talloc_free(data.dptr);
576                 ctdb_ltdb_unlock(ctdb_db, call->key);
577                 return state;
578         }
579
580         ctdb_ltdb_unlock(ctdb_db, call->key);
581         talloc_free(data.dptr);
582
583         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
584         if (state == NULL) {
585                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
586                 return NULL;
587         }
588         state->call = talloc_zero(state, struct ctdb_call);
589         if (state->call == NULL) {
590                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
591                 return NULL;
592         }
593
594         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
595         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
596         if (c == NULL) {
597                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
598                 return NULL;
599         }
600
601         state->reqid     = ctdb_reqid_new(ctdb, state);
602         state->ctdb_db = ctdb_db;
603         talloc_set_destructor(state, ctdb_client_call_destructor);
604
605         c->hdr.reqid     = state->reqid;
606         c->flags         = call->flags;
607         c->db_id         = ctdb_db->db_id;
608         c->callid        = call->call_id;
609         c->hopcount      = 0;
610         c->keylen        = call->key.dsize;
611         c->calldatalen   = call->call_data.dsize;
612         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
613         memcpy(&c->data[call->key.dsize], 
614                call->call_data.dptr, call->call_data.dsize);
615         *(state->call)              = *call;
616         state->call->call_data.dptr = &c->data[call->key.dsize];
617         state->call->key.dptr       = &c->data[0];
618
619         state->state  = CTDB_CALL_WAIT;
620
621
622         ctdb_client_queue_pkt(ctdb, &c->hdr);
623
624         return state;
625 }
626
627 /*
628   send a message - from client context
629  */
630 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
631                       uint64_t srvid, TDB_DATA data)
632 {
633         struct ctdb_req_message *r;
634         int len, res;
635
636         len = offsetof(struct ctdb_req_message, data) + data.dsize;
637         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
638                                len, struct ctdb_req_message);
639         CTDB_NO_MEMORY(ctdb, r);
640
641         r->hdr.destnode  = pnn;
642         r->srvid         = srvid;
643         r->datalen       = data.dsize;
644         memcpy(&r->data[0], data.dptr, data.dsize);
645         
646         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
647         if (res != 0) {
648                 return res;
649         }
650
651         talloc_free(r);
652         return 0;
653 }
654
655
656
657