add a function ctdb_writerecord() to write a record to the database.
[sahlberg/ctdb.git] / libctdb / libctdb.c
1 /* 
2    ctdb database library
3    Utility functions to connect to the ctdb daemon
4
5    Copyright (C) Andrew Tridgell  2006
6    Copyright (C) Ronnie sahlberg  2010
7
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include <poll.h>
23 #include "includes.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "lib/util/dlinklist.h"
27 #include "lib/events/events.h"
28 #include "lib/events/events_internal.h"
29 #include "lib/tdb/include/tdb.h"
30 #include "include/ctdb.h"
31 #include "include/ctdb_protocol.h"
32 #include "include/ctdb_private.h"
33 #include <sys/time.h>
34 #include "system/filesys.h"
35
36
37 /*
38   this is the dummy null procedure that all databases support
39 */
40 static int ctdb_null_func(struct ctdb_call_info *call)
41 {
42         return 0;
43 }
44
45 /*
46   this is a plain fetch procedure that all databases support
47 */
48 static int ctdb_fetch_func(struct ctdb_call_info *call)
49 {
50         call->reply_data = &call->record_data;
51         return 0;
52 }
53
54
55
56 struct ctdb_context *ctdb_connect(const char *addr)
57 {
58         struct event_context *ev;
59         struct ctdb_context *ctdb;
60         int ret;
61
62         ev = event_context_init(NULL);
63
64         /* initialise ctdb */
65         ctdb = ctdb_init(ev);
66         if (ctdb == NULL) {
67                 fprintf(stderr, "Failed to init ctdb\n");
68                 exit(1);
69         }
70
71         ret = ctdb_set_socketname(ctdb, addr);
72         if (ret == -1) {
73                 fprintf(stderr, __location__ " ctdb_set_socketname failed - %s\n", ctdb_errstr(ctdb));
74                 talloc_free(ctdb);
75                 exit(1);
76         }
77
78         ret = ctdb_socket_connect(ctdb);
79         if (ret != 0) {
80                 fprintf(stderr, __location__ " Failed to connect to daemon\n");
81                 talloc_free(ctdb);
82                 return NULL;
83         }
84
85         return ctdb;
86 }
87
88
89 int ctdb_get_fd(struct ctdb_context *ctdb)
90 {
91         return ctdb->daemon.sd;
92 }
93
94 int ctdb_which_events(struct ctdb_context *ctdb)
95 {
96         if (ctdb_queue_length(ctdb->daemon.queue) > 0) {
97                 return POLLIN|POLLOUT;
98         }
99
100         return POLLIN;
101 }
102
103
104
105 /*
106   initialise the ctdb daemon for client applications
107 */
108 struct ctdb_context *ctdb_init(struct event_context *ev)
109 {
110         int ret;
111         struct ctdb_context *ctdb;
112
113         ctdb = talloc_zero(ev, struct ctdb_context);
114         if (ctdb == NULL) {
115                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
116                 return NULL;
117         }
118         ctdb->ev  = ev;
119         ctdb->idr = idr_init(ctdb);
120         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
121
122         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
123         if (ret != 0) {
124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
125                 talloc_free(ctdb);
126                 return NULL;
127         }
128
129
130
131         return ctdb;
132 }
133
134
135
136 /* Ouch.
137  * This is a bit hairy. due to the way ctdbd uses events.
138  * ctdbd quite frequently uses 
139  *     event_add_timed(... timeval_zero() ...)
140  *
141  * for example once it has finished reading off a full pdu off the
142  * domain socket, before calling the actual recdeive function.
143  *
144  * we probably need a new event function to handle these timed events
145  * event_loop_all_queued() or similar
146  */
147 int ctdb_service(struct ctdb_context *ctdb)
148 {
149         int ret;
150         struct timeval t;
151
152         ret = event_loop_once(ctdb->ev);
153         do {
154                 t = common_event_loop_timer_delay(ctdb->ev);
155         } while(timeval_is_zero(&t));
156
157         return 0;
158 }
159
160
161
162 int ctdb_free(ctdb_handle *handle)
163 {
164         talloc_free(handle);
165         return 0;
166 }
167
168 struct ctdb_control_cb_data {
169         void *callback;
170         void *private_data;
171         uint32_t db_id;
172 };
173
174
175
176
177 /*************************
178  * GET PNN of local node *
179  *************************/
180 static void
181 ctdb_getpnn_recv_cb(struct ctdb_client_control_state *state)
182 {
183         struct ctdb_control_cb_data *cb_data = state->async.private_data;
184         ctdb_getpnn_cb callback = (ctdb_getpnn_cb)cb_data->callback;
185
186         if (state->state != CTDB_CONTROL_DONE) {
187                 DEBUG(DEBUG_ERR, (__location__ " ctdb_getpnn_recv_cb failed with state:%d\n", state->state));
188                 callback(-1, 0, cb_data->private_data);
189                 return;
190         }
191
192         callback(0, state->status, cb_data->private_data);
193 }
194
195 ctdb_handle *
196 ctdb_getpnn_send(struct ctdb_context *ctdb,
197                         uint32_t destnode,
198                         ctdb_getpnn_cb callback,
199                         void *private_data)
200 {
201         struct ctdb_client_control_state *state;
202         struct ctdb_control_cb_data *cb_data;
203
204         state = ctdb_control_send(ctdb, destnode, 0, 
205                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
206                            ctdb, NULL);
207
208         if (state == NULL) {
209                 DEBUG(DEBUG_ERR,(__location__ " Failed to send GET_PNN control\n"));
210                 return NULL;
211         }
212
213         if (callback != NULL) {
214                 cb_data = talloc(state, struct ctdb_control_cb_data);
215                 cb_data->callback     = callback;
216                 cb_data->private_data = private_data;
217
218                 state->async.fn           = ctdb_getpnn_recv_cb;
219                 state->async.private_data = cb_data;
220         }
221
222         return (ctdb_handle *)state;
223 }
224
225 int ctdb_getpnn_recv(struct ctdb_context *ctdb, ctdb_handle *handle, uint32_t *pnn)
226 {
227         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
228         int ret;
229         int32_t res;
230
231         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
232         if (ret != 0) {
233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getpnn_recv failed\n"));
234                 return -1;
235         }
236
237         if (pnn != NULL) {
238                 *pnn = (uint32_t)res;
239         }
240
241         return 0;
242 }
243
244 int ctdb_getpnn(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *pnn)
245 {
246         struct ctdb_client_control_state *state;
247         
248         state = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
249         if (state == NULL) {
250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getpnn_send() failed.\n"));
251                 return -1;
252         }
253
254         return ctdb_getpnn_recv(ctdb, state, pnn);
255 }
256
257
258
259 /***********************
260  * GET RECOVERY MASTER *
261  ***********************/
262 static void
263 ctdb_getrecmaster_recv_cb(struct ctdb_client_control_state *state)
264 {
265         struct ctdb_control_cb_data *cb_data = state->async.private_data;
266         ctdb_getrecmaster_cb callback = (ctdb_getrecmaster_cb)cb_data->callback;
267
268         if (state->state != CTDB_CONTROL_DONE) {
269                 DEBUG(DEBUG_ERR, (__location__ " ctdb_getrecmaster_recv_cb failed with state:%d\n", state->state));
270                 callback(-1, 0, cb_data->private_data);
271                 return;
272         }
273
274         callback(0, state->status, cb_data->private_data);
275 }
276
277 ctdb_handle *
278 ctdb_getrecmaster_send(struct ctdb_context *ctdb,
279                         uint32_t destnode,
280                         ctdb_getrecmaster_cb callback,
281                         void *private_data)
282 {
283         struct ctdb_client_control_state *state;
284         struct ctdb_control_cb_data *cb_data;
285
286         state = ctdb_control_send(ctdb, destnode, 0, 
287                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
288                            ctdb, NULL);
289
290         if (state == NULL) {
291                 DEBUG(DEBUG_ERR,(__location__ " Failed to send GET_RECMASTER control\n"));
292                 return NULL;
293         }
294
295         if (callback != NULL) {
296                 cb_data = talloc(state, struct ctdb_control_cb_data);
297                 cb_data->callback     = callback;
298                 cb_data->private_data = private_data;
299
300                 state->async.fn           = ctdb_getrecmaster_recv_cb;
301                 state->async.private_data = cb_data;
302         }
303
304         return (ctdb_handle *)state;
305 }
306
307 int ctdb_getrecmaster_recv(struct ctdb_context *ctdb, ctdb_handle *handle, uint32_t *recmaster)
308 {
309         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
310         int ret;
311         int32_t res;
312
313         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
314         if (ret != 0) {
315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getrecmaster_recv failed\n"));
316                 return -1;
317         }
318
319         if (recmaster != NULL) {
320                 *recmaster = (uint32_t)res;
321         }
322
323         return 0;
324 }
325
326 int ctdb_getrecmaster(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *recmaster)
327 {
328         struct ctdb_client_control_state *state;
329         
330         state = ctdb_getrecmaster_send(ctdb, destnode, NULL, NULL);
331         if (state == NULL) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getrecmaster_send() failed.\n"));
333                 return -1;
334         }
335
336         return ctdb_getrecmaster_recv(ctdb, state, recmaster);
337 }
338
339
340
341
342 static void
343 ctdb_set_message_handler_recv_cb(struct ctdb_client_control_state *state)
344 {
345         struct ctdb_control_cb_data *cb_data = state->async.private_data;
346         ctdb_set_message_handler_cb callback = (ctdb_set_message_handler_cb)cb_data->callback;
347
348         if (state->state != CTDB_CONTROL_DONE) {
349                 DEBUG(DEBUG_ERR, (__location__ " ctdb_getrecmaster_recv_cb failed with state:%d\n", state->state));
350                 callback(-1, cb_data->private_data);
351                 return;
352         }
353
354         callback(state->status, cb_data->private_data);
355 }
356
357
358 /*
359   tell the daemon what messaging srvid we will use, and register the message
360   handler function in the client
361 */
362 ctdb_handle *
363 ctdb_set_message_handler_send(struct ctdb_context *ctdb, uint64_t srvid, 
364                              ctdb_set_message_handler_cb callback,
365                              ctdb_message_fn_t handler,
366                              void *private_data)
367                                     
368 {
369         struct ctdb_client_control_state *state;
370         struct ctdb_control_cb_data *cb_data;
371
372         if (ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data) != 0) {
373                 return NULL;
374         }
375
376         state = ctdb_control_send(ctdb, CTDB_CURRENT_NODE, srvid, 
377                            CTDB_CONTROL_REGISTER_SRVID, 0, tdb_null, 
378                            ctdb, NULL);
379
380         if (state == NULL) {
381                 DEBUG(DEBUG_ERR,(__location__ " Failed to send REGISTER_SRVID control\n"));
382                 return NULL;
383         }
384
385         if (callback != NULL) {
386                 cb_data = talloc(state, struct ctdb_control_cb_data);
387                 cb_data->callback     = callback;
388                 cb_data->private_data = private_data;
389
390                 state->async.fn           = ctdb_set_message_handler_recv_cb;
391                 state->async.private_data = cb_data;
392         }
393
394         return (ctdb_handle *)state;
395 }
396
397 int ctdb_set_message_handler_recv(struct ctdb_context *ctdb, ctdb_handle *handle)
398 {
399         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
400         int ret;
401         int32_t res;
402
403         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
404         if (ret != 0 || res != 0) {
405                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_message_handler_recv failed\n"));
406                 return -1;
407         }
408
409         return state->status;
410 }
411
412 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, ctdb_message_fn_t handler, void *private_data)
413 {
414         struct ctdb_client_control_state *state;
415         
416         state = ctdb_set_message_handler_send(ctdb, srvid, NULL, handler, private_data);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_message_handler_send() failed.\n"));
419                 return -1;
420         }
421
422         return ctdb_set_message_handler_recv(ctdb, state);
423 }
424
425
426
427 static void
428 ctdb_remove_message_handler_recv_cb(struct ctdb_client_control_state *state)
429 {
430         struct ctdb_control_cb_data *cb_data = state->async.private_data;
431         ctdb_remove_message_handler_cb callback = (ctdb_remove_message_handler_cb)cb_data->callback;
432
433         if (state->state != CTDB_CONTROL_DONE) {
434                 DEBUG(DEBUG_ERR, (__location__ " ctdb_getrecmaster_recv_cb failed with state:%d\n", state->state));
435                 callback(-1, cb_data->private_data);
436                 return;
437         }
438
439         callback(state->status, cb_data->private_data);
440 }
441
442
443 ctdb_handle *
444 ctdb_remove_message_handler_send(struct ctdb_context *ctdb, uint64_t srvid, 
445                              ctdb_remove_message_handler_cb callback,
446                              void *private_data)
447                                     
448 {
449         struct ctdb_client_control_state *state;
450         struct ctdb_control_cb_data *cb_data;
451
452         if (ctdb_deregister_message_handler(ctdb, srvid, private_data)) {
453                 return NULL;
454         }
455
456         state = ctdb_control_send(ctdb, CTDB_CURRENT_NODE, srvid, 
457                            CTDB_CONTROL_DEREGISTER_SRVID, 0, tdb_null, 
458                            ctdb, NULL);
459
460         if (state == NULL) {
461                 DEBUG(DEBUG_ERR,(__location__ " Failed to send DEREGISTER_SRVID control\n"));
462                 return NULL;
463         }
464
465         if (callback != NULL) {
466                 cb_data = talloc(state, struct ctdb_control_cb_data);
467                 cb_data->callback     = callback;
468                 cb_data->private_data = private_data;
469
470                 state->async.fn           = ctdb_remove_message_handler_recv_cb;
471                 state->async.private_data = cb_data;
472         }
473
474         return (ctdb_handle *)state;
475 }
476
477
478 int ctdb_remove_message_handler_recv(struct ctdb_context *ctdb, ctdb_handle *handle)
479 {
480         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
481         int ret;
482         int32_t res;
483
484         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
485         if (ret != 0 || res != 0) {
486                 DEBUG(DEBUG_ERR,(__location__ " ctdb_remove_message_handler_recv failed\n"));
487                 return -1;
488         }
489
490         return state->status;
491 }
492
493 /*
494   tell the daemon we no longer want a srvid
495 */
496 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid)
497 {
498         struct ctdb_client_control_state *state;
499         
500         state = ctdb_remove_message_handler_send(ctdb, srvid, NULL, NULL);
501         if (state == NULL) {
502                 DEBUG(DEBUG_ERR,(__location__ " ctdb_remove_message_handler_send() failed.\n"));
503                 return -1;
504         }
505
506         return ctdb_remove_message_handler_recv(ctdb, state);
507 }
508
509
510
511
512
513
514
515 /*
516  * Create a database. If the database already exists this is a NOP.
517  */
518 static void
519 ctdb_createdb_recv_cb(struct ctdb_client_control_state *state)
520 {
521         struct ctdb_control_cb_data *cb_data = state->async.private_data;
522         ctdb_createdb_cb callback = (ctdb_createdb_cb)cb_data->callback;
523         uint32_t db_id;
524
525         if (state->state != CTDB_CONTROL_DONE) {
526                 DEBUG(DEBUG_ERR,(__location__ " control failed with state:%d and status:%d\n", state->state, state->status));
527                 callback(-1, 0, cb_data->private_data);
528                 return;
529         }
530
531         if (state->outdata.dsize != sizeof(uint32_t)) {
532                 DEBUG(DEBUG_ERR, (" Wrong size of data returned for CREATEDB control. Got %zd bytes but expected %zd\n", state->outdata.dsize, sizeof(uint32_t)));
533                 callback(-1, 0, cb_data->private_data);
534                 return;
535         }
536
537         db_id = *(uint32_t *)state->outdata.dptr;
538         callback(state->status, db_id, cb_data->private_data);
539 }
540
541
542 ctdb_handle *
543 ctdb_createdb_send(struct ctdb_context *ctdb, uint32_t destnode,
544                    const char *name, int persistent,
545                    uint32_t tdb_flags,
546                    ctdb_createdb_cb callback,
547                    void *private_data)
548                                     
549 {
550         struct ctdb_client_control_state *state;
551         struct ctdb_control_cb_data *cb_data;
552         TDB_DATA data;
553
554         data.dptr = discard_const(name);
555         data.dsize = strlen(name)+1;
556
557         state = ctdb_control_send(ctdb, destnode, tdb_flags,
558                         persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
559                         0, data, 
560                         ctdb, NULL);
561         if (state == NULL) {
562                 DEBUG(DEBUG_ERR,(__location__ " Failed to send CREATEDB control\n"));
563                 return NULL;
564         }
565
566         if (callback != NULL) {
567                 cb_data = talloc(state, struct ctdb_control_cb_data);
568                 cb_data->callback     = callback;
569                 cb_data->private_data = private_data;
570
571                 state->async.fn           = ctdb_createdb_recv_cb;
572                 state->async.private_data = cb_data;
573         }
574
575         return (ctdb_handle *)state;
576 }
577
578
579 int ctdb_createdb_recv(struct ctdb_context *ctdb, ctdb_handle *handle, uint32_t *db_id)
580 {
581         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
582         int ret;
583         int32_t res;
584         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
585         TDB_DATA data;
586
587         ret = ctdb_control_recv(ctdb, state, tmp_ctx, &data, &res, NULL);
588         if (ret != 0 || res != 0) {
589                 DEBUG(DEBUG_ERR,(__location__ " ctdb_createdb_recv failed\n"));
590                 talloc_free(tmp_ctx);
591                 return -1;
592         }
593
594         if (data.dsize != sizeof(uint32_t)) {
595                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of returned data. Got %zd bytes but expected %zd\n", data.dsize, sizeof(uint32_t)));
596                 talloc_free(tmp_ctx);
597                 return -1;
598         }
599
600         if (db_id != NULL) {
601                 *db_id = *(uint32_t *)data.dptr;
602         }
603
604         talloc_free(tmp_ctx);
605         return state->status;
606 }
607
608 int ctdb_createdb(struct ctdb_context *ctdb, uint32_t destnode, const char *name, int persistent, uint32_t tdb_flags, uint32_t *db_id)
609 {
610         struct ctdb_client_control_state *state;
611         
612         state = ctdb_createdb_send(ctdb, destnode, name, persistent, tdb_flags, NULL, NULL);
613         if (state == NULL) {
614                 DEBUG(DEBUG_ERR,(__location__ " ctdb_createdb_send() failed.\n"));
615                 return -1;
616         }
617
618         return ctdb_createdb_recv(ctdb, state, db_id);
619 }
620
621
622
623
624
625 /*
626  * find the path to the tdb file of a database
627  */
628 static void
629 ctdb_getdbpath_recv_cb(struct ctdb_client_control_state *state)
630 {
631         struct ctdb_control_cb_data *cb_data = state->async.private_data;
632         ctdb_getdbpath_cb callback = (ctdb_getdbpath_cb)cb_data->callback;
633
634         if (state->state != CTDB_CONTROL_DONE || state->status != 0) {
635                 DEBUG(DEBUG_ERR,(__location__ " control failed with state:%d and status:%d\n", state->state, state->status));
636                 callback(-1, NULL, cb_data->private_data);
637                 return;
638         }
639
640         callback(state->status, talloc_strdup(state->ctdb, (char *)state->outdata.dptr), cb_data->private_data);
641 }
642
643 ctdb_handle *
644 ctdb_getdbpath_send(struct ctdb_context *ctdb, uint32_t destnode,
645                     uint32_t db_id,
646                     ctdb_getdbpath_cb callback,
647                     void *private_data)
648 {
649         struct ctdb_client_control_state *state;
650         struct ctdb_control_cb_data *cb_data;
651         TDB_DATA data;
652
653         data.dptr = (uint8_t *)&db_id;
654         data.dsize = sizeof(uint32_t);
655
656         state = ctdb_control_send(ctdb, destnode, 0,
657                         CTDB_CONTROL_GETDBPATH, 0, data,
658                         ctdb, NULL);
659         if (state == NULL) {
660                 DEBUG(DEBUG_ERR,(__location__ " Failed to send GETDBPATH control\n"));
661                 return NULL;
662         }
663
664         if (callback != NULL) {
665                 cb_data = talloc(state, struct ctdb_control_cb_data);
666                 cb_data->callback     = callback;
667                 cb_data->private_data = private_data;
668
669                 state->async.fn           = ctdb_getdbpath_recv_cb;
670                 state->async.private_data = cb_data;
671         }
672
673         return (ctdb_handle *)state;
674 }
675
676 int ctdb_getdbpath_recv(struct ctdb_context *ctdb, ctdb_handle *handle, const char **path)
677 {
678         struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
679         int ret;
680         int32_t res;
681         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
682         TDB_DATA data;
683
684         if (path) {
685                 *path = NULL;
686         }
687
688         ret = ctdb_control_recv(ctdb, state, tmp_ctx, &data, &res, NULL);
689         if (ret != 0 || res != 0) {
690                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getdbpath_recv failed\n"));
691                 talloc_free(tmp_ctx);
692                 return -1;
693         }
694
695         if (state->status == 0 && path != NULL) {
696                 *path = talloc_strdup(ctdb, (char *)data.dptr);
697         }
698
699         talloc_free(tmp_ctx);
700         return state->status;
701 }
702
703 int ctdb_getdbpath(struct ctdb_context *ctdb, uint32_t destnode,
704                    uint32_t db_id,
705                    const char **path)
706 {
707         struct ctdb_client_control_state *state;
708         
709         state = ctdb_getdbpath_send(ctdb, destnode, db_id, NULL, NULL);
710         if (state == NULL) {
711                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getdbpath_send() failed.\n"));
712                 return -1;
713         }
714
715         return ctdb_getdbpath_recv(ctdb, state, path);
716 }
717
718
719
720 /*
721  * Attach to a database
722  */
723
724 struct ctdb_attachdb_state {
725         enum control_state state;
726         struct ctdb_context *ctdb;
727         struct ctdb_db_context *ctdb_db;
728         struct ctdb_client_control_state *cdb_state;
729         struct ctdb_client_control_state *gdp_state;
730         bool persistent;
731         uint32_t tdb_flags;
732
733         ctdb_attachdb_cb callback;
734         void *private_data;
735 };      
736
737
738 static void
739 ctdb_attachdb_recv2_cb(struct ctdb_client_control_state *state)
740 {
741         struct ctdb_attachdb_state *adb_state = 
742                         talloc_get_type(state->async.private_data,
743                                         struct ctdb_attachdb_state);
744         ctdb_attachdb_cb callback = (ctdb_attachdb_cb)adb_state->callback;
745
746         struct ctdb_db_context *ctdb_db =
747                         talloc_get_type(adb_state->ctdb_db,
748                                         struct ctdb_db_context);
749         struct ctdb_context *ctdb = 
750                         talloc_get_type(ctdb_db->ctdb,
751                         struct ctdb_context);
752
753         uint32_t tdb_flags = adb_state->tdb_flags;
754         bool persistent = adb_state->persistent;
755
756         if (state->state != CTDB_CONTROL_DONE || state->status != 0) {
757                 DEBUG(DEBUG_ERR,(__location__ " getdbpath control failed with state:%d and status:%d\n", state->state, state->status));
758                 adb_state->state = CTDB_CONTROL_ERROR;
759                 if (callback != NULL) {
760                         callback(-1, adb_state, NULL, adb_state->private_data);
761                 }
762                 return;
763         }
764         ctdb_db->db_path = talloc_strdup(ctdb_db, (char *)state->outdata.dptr);
765
766         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
767         if (ctdb->valgrinding) {
768                 tdb_flags |= TDB_NOMMAP;
769         }
770         tdb_flags |= TDB_DISALLOW_NESTING;
771
772         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
773         if (ctdb_db->ltdb == NULL) {
774                 DEBUG(DEBUG_ERR, (__location__ " Failed to open tdb '%s'\n", ctdb_db->db_path));
775                 adb_state->state = CTDB_CONTROL_ERROR;
776                 if (callback != NULL) {
777                         callback(-1, adb_state, NULL, adb_state->private_data);
778                 }
779                 return;
780         }
781
782         ctdb_db->persistent = persistent;
783
784         DLIST_ADD(ctdb->db_list, ctdb_db);
785
786         /* add well known functions */
787         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
788         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
789
790
791         talloc_steal(ctdb, ctdb_db);
792
793         adb_state->state = CTDB_CONTROL_DONE;
794         if (callback != NULL) {
795                 callback(0, adb_state, ctdb_db, adb_state->private_data);
796         }
797 }
798
799 static void
800 ctdb_attachdb_recv1_cb(struct ctdb_client_control_state *state)
801 {
802         struct ctdb_attachdb_state *adb_state =
803                         talloc_get_type(state->async.private_data,
804                         struct ctdb_attachdb_state);
805         ctdb_attachdb_cb callback = (ctdb_attachdb_cb)adb_state->callback;
806
807         if (state->state != CTDB_CONTROL_DONE) {
808                 DEBUG(DEBUG_ERR,(__location__ " createdb control failed with state:%d and status:%d\n", state->state, state->status));
809                 adb_state->state = CTDB_CONTROL_ERROR;
810                 if (callback != NULL) {
811                         callback(-1, adb_state, NULL, adb_state->private_data);
812                 }
813                 return;
814         }
815
816         if (state->outdata.dsize != sizeof(uint32_t)) {
817                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of data returned for CREATEDB control. Got %zd bytes but expected %zd\n", state->outdata.dsize, sizeof(uint32_t)));
818                 adb_state->state = CTDB_CONTROL_ERROR;
819                 if (callback != NULL) {
820                         callback(-1, adb_state, NULL, adb_state->private_data);
821                 }
822                 return;
823         }
824
825         adb_state->ctdb_db->db_id = *(uint32_t *)state->outdata.dptr;
826
827         adb_state->gdp_state = ctdb_getdbpath_send(adb_state->ctdb, CTDB_CURRENT_NODE, adb_state->ctdb_db->db_id, NULL, NULL);
828         if (state == NULL) {
829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_getdbpath_send() failed.\n"));
830                 adb_state->state = CTDB_CONTROL_ERROR;
831                 if (callback != NULL) {
832                         callback(-1, adb_state, NULL, adb_state->private_data);
833                 }
834                 return;
835         }
836         talloc_steal(adb_state, adb_state->gdp_state);
837
838         adb_state->gdp_state->async.fn           = ctdb_attachdb_recv2_cb;
839         adb_state->gdp_state->async.private_data = adb_state;
840 }
841
842 ctdb_handle *
843 ctdb_attachdb_send(struct ctdb_context *ctdb,
844                    const char *name, int persistent, uint32_t tdb_flags,
845                    ctdb_attachdb_cb callback,
846                    void *private_data)
847 {
848         struct ctdb_attachdb_state *state;
849         struct ctdb_db_context *ctdb_db;
850
851         ctdb_db = ctdb_db_handle(ctdb, name);
852         if (ctdb_db != NULL) {
853                 DEBUG(DEBUG_ERR, (__location__ " There is already a database with this name. Can not attach twice.\n"));
854                 return NULL;
855         }
856
857         state = talloc_zero(ctdb, struct ctdb_attachdb_state);
858         if (state == NULL) {
859                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate attachdb_state\n"));
860                 return NULL;
861         }
862
863         ctdb_db = talloc_zero(state, struct ctdb_db_context);
864         if (ctdb_db == NULL) {
865                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate ctdb db context\n"));
866                 talloc_free(state);
867                 return NULL;
868         }
869
870         state->state      = CTDB_CONTROL_WAIT;
871         state->ctdb       = ctdb;       
872         state->ctdb_db    = ctdb_db;
873         state->tdb_flags  = tdb_flags;
874         state->persistent = persistent?True:False;
875
876         ctdb_db->ctdb    = ctdb;
877         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
878         if (ctdb_db->db_name == NULL) {
879                 DEBUG(DEBUG_ERR, (__location__ " Failed to strdup db name\n"));
880                 talloc_free(state);
881                 return NULL;
882         }
883
884         state->cdb_state = ctdb_createdb_send(ctdb, CTDB_CURRENT_NODE,
885                                     name, persistent, tdb_flags,
886                                     NULL, NULL);
887         if (state->cdb_state == NULL) {
888                 DEBUG(DEBUG_ERR, (__location__ " Failed to send CREATEDB control\n"));
889                 talloc_free(ctdb_db);
890                 return NULL;
891         }
892         talloc_steal(state, state->cdb_state);
893
894         state->cdb_state->async.fn           = ctdb_attachdb_recv1_cb;
895         state->cdb_state->async.private_data = state;
896
897         if (callback != NULL) {
898                 state->callback     = callback;
899                 state->private_data = private_data;
900         }
901
902         return (ctdb_handle *)state;
903 }
904
905 int ctdb_attachdb_recv(struct ctdb_context *ctdb,
906                        ctdb_handle *handle, struct ctdb_db_context **ctdb_db)
907 {
908         struct ctdb_attachdb_state *state = talloc_get_type(handle, struct ctdb_attachdb_state);
909
910         while (state->state == CTDB_CONTROL_WAIT) {
911                 event_loop_once(ctdb->ev);
912         }
913
914         if (state->state != CTDB_CONTROL_DONE) {
915                 if (ctdb_db != NULL) {
916                         *ctdb_db = NULL;
917                 }
918                 return -1;
919         }
920
921         if (ctdb_db != NULL) {
922                 *ctdb_db = state->ctdb_db;
923         }
924
925         talloc_free(state);
926         return 0;
927 }
928
929 int ctdb_attachdb(struct ctdb_context *ctdb,
930                   const char *name, int persistent, uint32_t tdb_flags,
931                   struct ctdb_db_context **ctdb_db)
932 {
933         ctdb_handle *handle;
934         int ret;
935
936         handle = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, NULL, NULL);
937         if (handle == NULL) {
938                 DEBUG(DEBUG_ERR, (__location__ " Failed to send attachdb control\n"));
939                 return -1;
940         }
941
942         ret = ctdb_attachdb_recv(ctdb, handle, ctdb_db);
943         if (ret != 0) {
944                 DEBUG(DEBUG_ERR, (__location__ " receive of attachdb reply failed\n"));
945                 return -1;
946         }
947
948         return 0;
949 }
950
951
952
953
954 /*
955  * read a record from the database
956  *
957  * the returned data must be freed by using ctdb_free()
958  */
959 struct ctdb_call_cb_data {
960         struct ctdb_db_context *ctdb_db;
961         struct ctdb_record_handle *h;
962         TDB_DATA data;
963
964         void *callback;
965         void *private_data;
966 };
967
968 /* These are not proper async methods yet */
969 ctdb_handle *
970 ctdb_readrecordlock_send(struct ctdb_context *ctdb,
971                 struct ctdb_db_context *ctdb_db,
972                 TDB_DATA key,
973                 ctdb_readrecordlock_cb callback,
974                 void *private_data)
975 {
976         struct ctdb_call_cb_data *state;
977         TDB_DATA data;
978
979         state = talloc_zero(ctdb, struct ctdb_call_cb_data);
980         if (state == NULL) {
981                 DEBUG(DEBUG_ERR, (__location__ " Failed to send CALL\n"));
982                 callback(-1, NULL, tdb_null, private_data);
983                 return NULL;
984         }
985
986         state->h = ctdb_fetch_lock(ctdb_db, state, key, &data);
987
988         /* since this is fake async, invoke the callback immediately if set */
989         if (callback != NULL) {
990                 callback(0, state, data, private_data);
991                 return state;
992         }
993
994         return state;
995 }
996
997 int ctdb_readrecord_recv(struct ctdb_context *ctdb,
998                 ctdb_handle *handle,
999                 TDB_DATA **data);
1000 /*qqq*/
1001 int ctdb_readrecord(struct ctdb_context *ctdb,
1002                 struct ctdb_db_context *ctdb_db_context,
1003                 TDB_DATA key,
1004                 TDB_DATA **data);
1005 /*qqq*/
1006
1007
1008
1009 int ctdb_writerecord(ctdb_handle *handle,
1010                 TDB_DATA key,
1011                 TDB_DATA data)
1012 {
1013         struct ctdb_call_cb_data *state = talloc_get_type(handle,
1014                                 struct ctdb_call_cb_data);
1015
1016         struct ctdb_record_handle *h = talloc_get_type(state->h,
1017                                 struct ctdb_record_handle);
1018
1019         if (h->ctdb_db->persistent) {
1020                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
1021                 return -1;
1022         }
1023
1024         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
1025 }