ctdb-client: Add async api for detaching a database
[metze/samba/wip.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State of one "set database flags" request.
 *
 * The request first fetches the node map, then sends SET_DB_READONLY
 * and/or SET_DB_STICKY to the connected nodes.
 */
struct ctdb_set_db_flags_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        uint32_t db_id;
        uint8_t db_flags;               /* flags requested by the caller */
        bool readonly_done;             /* READONLY step finished or skipped */
        bool sticky_done;               /* STICKY step finished or skipped */
        uint32_t *pnn_list;             /* set by list_of_connected_nodes() */
        int count;                      /* return of list_of_connected_nodes() */
};

/* Completion callbacks of the sub-requests issued by set_db_flags */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 DEBUG(DEBUG_ERR,
125                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
126                        state->db_id, ret));
127                 tevent_req_error(req, ret);
128                 return;
129         }
130
131         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
132         talloc_free(reply);
133         if (ret != 0) {
134                 DEBUG(DEBUG_ERR,
135                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
136                       state->db_id, ret));
137                 tevent_req_error(req, ret);
138                 return;
139         }
140
141         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142                                                state, &state->pnn_list);
143         talloc_free(nodemap);
144         if (state->count <= 0) {
145                 DEBUG(DEBUG_ERR,
146                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147                        state->db_id, state->count));
148                 tevent_req_error(req, ENOMEM);
149                 return;
150         }
151
152         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153                 ctdb_req_control_set_db_readonly(&request, state->db_id);
154                 subreq = ctdb_client_control_multi_send(
155                                         state, state->ev, state->client,
156                                         state->pnn_list, state->count,
157                                         state->timeout, &request);
158                 if (tevent_req_nomem(subreq, req)) {
159                         return;
160                 }
161                 tevent_req_set_callback(subreq,
162                                         ctdb_set_db_flags_readonly_done, req);
163         } else {
164                 state->readonly_done = true;
165         }
166
167         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168                 ctdb_req_control_set_db_sticky(&request, state->db_id);
169                 subreq = ctdb_client_control_multi_send(
170                                         state, state->ev, state->client,
171                                         state->pnn_list, state->count,
172                                         state->timeout, &request);
173                 if (tevent_req_nomem(subreq, req)) {
174                         return;
175                 }
176                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
177                                         req);
178         } else {
179                 state->sticky_done = true;
180         }
181 }
182
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct ctdb_set_db_flags_state *state = tevent_req_data(
188                 req, struct ctdb_set_db_flags_state);
189         int ret;
190         bool status;
191
192         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
193                                                 NULL);
194         TALLOC_FREE(subreq);
195         if (! status) {
196                 DEBUG(DEBUG_ERR,
197                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
198                        state->db_id, ret));
199                 tevent_req_error(req, ret);
200                 return;
201         }
202
203         state->readonly_done = true;
204
205         if (state->readonly_done && state->sticky_done) {
206                 tevent_req_done(req);
207         }
208 }
209
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
211 {
212         struct tevent_req *req = tevent_req_callback_data(
213                 subreq, struct tevent_req);
214         struct ctdb_set_db_flags_state *state = tevent_req_data(
215                 req, struct ctdb_set_db_flags_state);
216         int ret;
217         bool status;
218
219         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
220                                                 NULL);
221         TALLOC_FREE(subreq);
222         if (! status) {
223                 DEBUG(DEBUG_ERR,
224                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
225                        state->db_id, ret));
226                 tevent_req_error(req, ret);
227                 return;
228         }
229
230         state->sticky_done = true;
231
232         if (state->readonly_done && state->sticky_done) {
233                 tevent_req_done(req);
234         }
235 }
236
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
249
/*
 * State of one database attach request.
 */
struct ctdb_attach_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        uint32_t destnode;              /* node that receives the controls */
        uint8_t db_flags;               /* CTDB_DB_FLAGS_* from the caller */
        uint32_t tdb_flags;             /* TDB_* flags for attach and open */
        struct ctdb_db_context *db;     /* handle returned by attach_recv */
};

/* Completion callbacks, listed in the order the attach steps run */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
265
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267                                     struct tevent_context *ev,
268                                     struct ctdb_client_context *client,
269                                     struct timeval timeout,
270                                     const char *db_name, uint8_t db_flags)
271 {
272         struct tevent_req *req, *subreq;
273         struct ctdb_attach_state *state;
274         struct ctdb_req_control request;
275
276         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
277         if (req == NULL) {
278                 return NULL;
279         }
280
281         state->db = client_db_handle(client, db_name);
282         if (state->db != NULL) {
283                 tevent_req_done(req);
284                 return tevent_req_post(req, ev);
285         }
286
287         state->ev = ev;
288         state->client = client;
289         state->timeout = timeout;
290         state->destnode = ctdb_client_pnn(client);
291         state->db_flags = db_flags;
292
293         state->db = talloc_zero(client, struct ctdb_db_context);
294         if (tevent_req_nomem(state->db, req)) {
295                 return tevent_req_post(req, ev);
296         }
297
298         state->db->db_name = talloc_strdup(state->db, db_name);
299         if (tevent_req_nomem(state->db, req)) {
300                 return tevent_req_post(req, ev);
301         }
302
303         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304                 state->db->persistent = true;
305         }
306
307         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308         subreq = ctdb_client_control_send(state, ev, client,
309                                           ctdb_client_pnn(client), timeout,
310                                           &request);
311         if (tevent_req_nomem(subreq, req)) {
312                 return tevent_req_post(req, ev);
313         }
314         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
315
316         return req;
317 }
318
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
320 {
321         struct tevent_req *req = tevent_req_callback_data(
322                 subreq, struct tevent_req);
323         struct ctdb_attach_state *state = tevent_req_data(
324                 req, struct ctdb_attach_state);
325         struct ctdb_reply_control *reply;
326         struct ctdb_req_control request;
327         uint32_t mutex_enabled;
328         int ret;
329         bool status;
330
331         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
332         TALLOC_FREE(subreq);
333         if (! status) {
334                 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335                                   state->db->db_name, ret));
336                 tevent_req_error(req, ret);
337                 return;
338         }
339
340         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
341         if (ret != 0) {
342                 /* Treat error as mutex support not available */
343                 mutex_enabled = 0;
344         }
345
346         if (state->db->persistent) {
347                 state->tdb_flags = TDB_DEFAULT;
348         } else {
349                 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
350                                     TDB_CLEAR_IF_FIRST);
351                 if (mutex_enabled == 1) {
352                         state->tdb_flags |= TDB_MUTEX_LOCKING;
353                 }
354         }
355
356         if (state->db->persistent) {
357                 ctdb_req_control_db_attach_persistent(&request,
358                                                       state->db->db_name,
359                                                       state->tdb_flags);
360         } else {
361                 ctdb_req_control_db_attach(&request, state->db->db_name,
362                                            state->tdb_flags);
363         }
364
365         subreq = ctdb_client_control_send(state, state->ev, state->client,
366                                           state->destnode, state->timeout,
367                                           &request);
368         if (tevent_req_nomem(subreq, req)) {
369                 return;
370         }
371         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
372 }
373
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
375 {
376         struct tevent_req *req = tevent_req_callback_data(
377                 subreq, struct tevent_req);
378         struct ctdb_attach_state *state = tevent_req_data(
379                 req, struct ctdb_attach_state);
380         struct ctdb_req_control request;
381         struct ctdb_reply_control *reply;
382         bool status;
383         int ret;
384
385         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
386         TALLOC_FREE(subreq);
387         if (! status) {
388                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
389                                   state->db->db_name,
390                                   (state->db->persistent
391                                         ? "DB_ATTACH_PERSISTENT"
392                                         : "DB_ATTACH"),
393                                   ret));
394                 tevent_req_error(req, ret);
395                 return;
396         }
397
398         if (state->db->persistent) {
399                 ret = ctdb_reply_control_db_attach_persistent(
400                                 reply, &state->db->db_id);
401         } else {
402                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
403         }
404         talloc_free(reply);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407                                   state->db->db_name, ret));
408                 tevent_req_error(req, ret);
409                 return;
410         }
411
412         ctdb_req_control_getdbpath(&request, state->db->db_id);
413         subreq = ctdb_client_control_send(state, state->ev, state->client,
414                                           state->destnode, state->timeout,
415                                           &request);
416         if (tevent_req_nomem(subreq, req)) {
417                 return;
418         }
419         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
420 }
421
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
423 {
424         struct tevent_req *req = tevent_req_callback_data(
425                 subreq, struct tevent_req);
426         struct ctdb_attach_state *state = tevent_req_data(
427                 req, struct ctdb_attach_state);
428         struct ctdb_reply_control *reply;
429         struct ctdb_req_control request;
430         bool status;
431         int ret;
432
433         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
434         TALLOC_FREE(subreq);
435         if (! status) {
436                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437                                   state->db->db_name, ret));
438                 tevent_req_error(req, ret);
439                 return;
440         }
441
442         ret = ctdb_reply_control_getdbpath(reply, state->db,
443                                            &state->db->db_path);
444         talloc_free(reply);
445         if (ret != 0) {
446                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447                                   state->db->db_name, ret));
448                 tevent_req_error(req, ret);
449                 return;
450         }
451
452         ctdb_req_control_db_get_health(&request, state->db->db_id);
453         subreq = ctdb_client_control_send(state, state->ev, state->client,
454                                           state->destnode, state->timeout,
455                                           &request);
456         if (tevent_req_nomem(subreq, req)) {
457                 return;
458         }
459         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
460 }
461
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
463 {
464         struct tevent_req *req = tevent_req_callback_data(
465                 subreq, struct tevent_req);
466         struct ctdb_attach_state *state = tevent_req_data(
467                 req, struct ctdb_attach_state);
468         struct ctdb_reply_control *reply;
469         const char *reason;
470         bool status;
471         int ret;
472
473         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
474         TALLOC_FREE(subreq);
475         if (! status) {
476                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477                                   state->db->db_name, ret));
478                 tevent_req_error(req, ret);
479                 return;
480         }
481
482         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
483         if (ret != 0) {
484                 DEBUG(DEBUG_ERR,
485                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486                        state->db->db_name, ret));
487                 tevent_req_error(req, ret);
488                 return;
489         }
490
491         if (reason != NULL) {
492                 /* Database unhealthy, avoid attach */
493                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494                                   state->db->db_name, reason));
495                 tevent_req_error(req, EIO);
496                 return;
497         }
498
499         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500                                         state->destnode, state->timeout,
501                                         state->db->db_id, state->db_flags);
502         if (tevent_req_nomem(subreq, req)) {
503                 return;
504         }
505         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
506 }
507
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
509 {
510         struct tevent_req *req = tevent_req_callback_data(
511                 subreq, struct tevent_req);
512         struct ctdb_attach_state *state = tevent_req_data(
513                 req, struct ctdb_attach_state);
514         bool status;
515         int ret;
516
517         status = ctdb_set_db_flags_recv(subreq, &ret);
518         TALLOC_FREE(subreq);
519         if (! status) {
520                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521                                   state->db->db_name, state->db_flags));
522                 tevent_req_error(req, ret);
523                 return;
524         }
525
526         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527                                         state->tdb_flags, O_RDWR, 0);
528         if (tevent_req_nomem(state->db->ltdb, req)) {
529                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530                                   state->db->db_name));
531                 return;
532         }
533         DLIST_ADD(state->client->db, state->db);
534
535         tevent_req_done(req);
536 }
537
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539                       struct ctdb_db_context **out)
540 {
541         struct ctdb_attach_state *state = tevent_req_data(
542                 req, struct ctdb_attach_state);
543         int err;
544
545         if (tevent_req_is_unix_error(req, &err)) {
546                 if (perr != NULL) {
547                         *perr = err;
548                 }
549                 return false;
550         }
551
552         if (out != NULL) {
553                 *out = state->db;
554         }
555         return true;
556 }
557
558 int ctdb_attach(struct tevent_context *ev,
559                 struct ctdb_client_context *client,
560                 struct timeval timeout,
561                 const char *db_name, uint8_t db_flags,
562                 struct ctdb_db_context **out)
563 {
564         TALLOC_CTX *mem_ctx;
565         struct tevent_req *req;
566         bool status;
567         int ret;
568
569         mem_ctx = talloc_new(client);
570         if (mem_ctx == NULL) {
571                 return ENOMEM;
572         }
573
574         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
575                                db_name, db_flags);
576         if (req == NULL) {
577                 talloc_free(mem_ctx);
578                 return ENOMEM;
579         }
580
581         tevent_req_poll(req, ev);
582
583         status = ctdb_attach_recv(req, &ret, out);
584         if (! status) {
585                 talloc_free(mem_ctx);
586                 return ret;
587         }
588
589         /*
590         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
593         */
594
595         talloc_free(mem_ctx);
596         return 0;
597 }
598
/*
 * State of one database detach request.
 */
struct ctdb_detach_state {
        struct ctdb_client_context *client;
        struct tevent_context *ev;
        struct timeval timeout;
        uint32_t db_id;                 /* database to detach */
        const char *db_name;            /* filled from the GET_DBNAME reply */
};

/* Completion callbacks, listed in the order the detach steps run */
static void ctdb_detach_dbname_done(struct tevent_req *subreq);
static void ctdb_detach_done(struct tevent_req *subreq);
609
610 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
611                                     struct tevent_context *ev,
612                                     struct ctdb_client_context *client,
613                                     struct timeval timeout, uint32_t db_id)
614 {
615         struct tevent_req *req, *subreq;
616         struct ctdb_detach_state *state;
617         struct ctdb_req_control request;
618
619         req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
620         if (req == NULL) {
621                 return NULL;
622         }
623
624         state->client = client;
625         state->ev = ev;
626         state->timeout = timeout;
627         state->db_id = db_id;
628
629         ctdb_req_control_get_dbname(&request, db_id);
630         subreq = ctdb_client_control_send(state, ev, client,
631                                           ctdb_client_pnn(client), timeout,
632                                           &request);
633         if (tevent_req_nomem(subreq, req)) {
634                 return tevent_req_post(req, ev);
635         }
636         tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
637
638         return req;
639 }
640
641 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
642 {
643         struct tevent_req *req = tevent_req_callback_data(
644                 subreq, struct tevent_req);
645         struct ctdb_detach_state *state = tevent_req_data(
646                 req, struct ctdb_detach_state);
647         struct ctdb_reply_control *reply;
648         struct ctdb_req_control request;
649         int ret;
650         bool status;
651
652         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
653         TALLOC_FREE(subreq);
654         if (! status) {
655                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
656                                   state->db_id, ret));
657                 tevent_req_error(req, ret);
658                 return;
659         }
660
661         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
662         if (ret != 0) {
663                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
664                                   state->db_id, ret));
665                 tevent_req_error(req, ret);
666                 return;
667         }
668
669         ctdb_req_control_db_detach(&request, state->db_id);
670         subreq = ctdb_client_control_send(state, state->ev, state->client,
671                                           ctdb_client_pnn(state->client),
672                                           state->timeout, &request);
673         if (tevent_req_nomem(subreq, req)) {
674                 return;
675         }
676         tevent_req_set_callback(subreq, ctdb_detach_done, req);
677
678 }
679
680 static void ctdb_detach_done(struct tevent_req *subreq)
681 {
682         struct tevent_req *req = tevent_req_callback_data(
683                 subreq, struct tevent_req);
684         struct ctdb_detach_state *state = tevent_req_data(
685                 req, struct ctdb_detach_state);
686         struct ctdb_reply_control *reply;
687         struct ctdb_db_context *db;
688         int ret;
689         bool status;
690
691         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
692         TALLOC_FREE(subreq);
693         if (! status) {
694                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
695                                   state->db_name, ret));
696                 tevent_req_error(req, ret);
697                 return;
698         }
699
700         ret = ctdb_reply_control_db_detach(reply);
701         if (ret != 0) {
702                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
703                                   state->db_name, ret));
704                 tevent_req_error(req, ret);
705                 return;
706         }
707
708         db = client_db_handle(state->client, state->db_name);
709         if (db != NULL) {
710                 DLIST_REMOVE(state->client->db, db);
711                 TALLOC_FREE(db);
712         }
713
714         tevent_req_done(req);
715 }
716
/*
 * Collect the result of a detach request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code in *perr.
 */
bool ctdb_detach_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
730
731 int ctdb_detach(struct tevent_context *ev,
732                 struct ctdb_client_context *client,
733                 struct timeval timeout, uint32_t db_id)
734 {
735         TALLOC_CTX *mem_ctx;
736         struct tevent_req *req;
737         int ret;
738         bool status;
739
740         mem_ctx = talloc_new(client);
741         if (mem_ctx == NULL) {
742                 return ENOMEM;
743         }
744
745         req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
746         if (req == NULL) {
747                 talloc_free(mem_ctx);
748                 return ENOMEM;
749         }
750
751         tevent_req_poll(req, ev);
752
753         status = ctdb_detach_recv(req, &ret);
754         if (! status) {
755                 talloc_free(mem_ctx);
756                 return ret;
757         }
758
759         talloc_free(mem_ctx);
760         return 0;
761 }
762
763 uint32_t ctdb_db_id(struct ctdb_db_context *db)
764 {
765         return db->db_id;
766 }
767
768 struct ctdb_db_traverse_local_state {
769         ctdb_rec_parser_func_t parser;
770         void *private_data;
771         bool extract_header;
772         int error;
773 };
774
775 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
776                                           TDB_DATA key, TDB_DATA data,
777                                           void *private_data)
778 {
779         struct ctdb_db_traverse_local_state *state =
780                 (struct ctdb_db_traverse_local_state *)private_data;
781         int ret;
782
783         if (state->extract_header) {
784                 struct ctdb_ltdb_header header;
785
786                 ret = ctdb_ltdb_header_extract(&data, &header);
787                 if (ret != 0) {
788                         state->error = ret;
789                         return 1;
790                 }
791
792                 ret = state->parser(0, &header, key, data, state->private_data);
793         } else {
794                 ret = state->parser(0, NULL, key, data, state->private_data);
795         }
796
797         if (ret != 0) {
798                 state->error = ret;
799                 return 1;
800         }
801
802         return 0;
803 }
804
805 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
806                            bool extract_header,
807                            ctdb_rec_parser_func_t parser, void *private_data)
808 {
809         struct ctdb_db_traverse_local_state state;
810         int ret;
811
812         state.parser = parser;
813         state.private_data = private_data;
814         state.extract_header = extract_header;
815         state.error = 0;
816
817         if (readonly) {
818                 ret = tdb_traverse_read(db->ltdb->tdb,
819                                         ctdb_db_traverse_local_handler,
820                                         &state);
821         } else {
822                 ret = tdb_traverse(db->ltdb->tdb,
823                                    ctdb_db_traverse_local_handler, &state);
824         }
825
826         if (ret == -1) {
827                 return EIO;
828         }
829
830         return state.error;
831 }
832
/*
 * State of an asynchronous database traverse request.
 *
 *  db           - database being traversed
 *  destnode     - node the traverse start control is sent to
 *  srvid        - srvid the record message handler is registered for
 *  timeout      - timeout used for the traverse start control
 *  parser       - caller's per-record callback
 *  private_data - passed through to parser
 *  result       - error code reported once the message handler has been
 *                 removed; 0 means success
 */
833 struct ctdb_db_traverse_state {
834         struct tevent_context *ev;
835         struct ctdb_client_context *client;
836         struct ctdb_db_context *db;
837         uint32_t destnode;
838         uint64_t srvid;
839         struct timeval timeout;
840         ctdb_rec_parser_func_t parser;
841         void *private_data;
842         int result;
843 };
844
/*
 * Steps of the traverse request: register the message handler, send the
 * traverse start control, receive records in the handler, and finally
 * remove the handler before completing the request.
 */
845 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
846 static void ctdb_db_traverse_started(struct tevent_req *subreq);
847 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
848                                      void *private_data);
849 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
850 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
851
852 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
853                                          struct tevent_context *ev,
854                                          struct ctdb_client_context *client,
855                                          struct ctdb_db_context *db,
856                                          uint32_t destnode,
857                                          struct timeval timeout,
858                                          ctdb_rec_parser_func_t parser,
859                                          void *private_data)
860 {
861         struct tevent_req *req, *subreq;
862         struct ctdb_db_traverse_state *state;
863
864         req = tevent_req_create(mem_ctx, &state,
865                                 struct ctdb_db_traverse_state);
866         if (req == NULL) {
867                 return NULL;
868         }
869
870         state->ev = ev;
871         state->client = client;
872         state->db = db;
873         state->destnode = destnode;
874         state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
875         state->timeout = timeout;
876         state->parser = parser;
877         state->private_data = private_data;
878
879         subreq = ctdb_client_set_message_handler_send(state, ev, client,
880                                                       state->srvid,
881                                                       ctdb_db_traverse_handler,
882                                                       req);
883         if (tevent_req_nomem(subreq, req)) {
884                 return tevent_req_post(req, ev);
885         }
886         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
887
888         return req;
889 }
890
891 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
892 {
893         struct tevent_req *req = tevent_req_callback_data(
894                 subreq, struct tevent_req);
895         struct ctdb_db_traverse_state *state = tevent_req_data(
896                 req, struct ctdb_db_traverse_state);
897         struct ctdb_traverse_start_ext traverse;
898         struct ctdb_req_control request;
899         int ret = 0;
900         bool status;
901
902         status = ctdb_client_set_message_handler_recv(subreq, &ret);
903         TALLOC_FREE(subreq);
904         if (! status) {
905                 tevent_req_error(req, ret);
906                 return;
907         }
908
909         traverse = (struct ctdb_traverse_start_ext) {
910                 .db_id = ctdb_db_id(state->db),
911                 .reqid = 0,
912                 .srvid = state->srvid,
913                 .withemptyrecords = false,
914         };
915
916         ctdb_req_control_traverse_start_ext(&request, &traverse);
917         subreq = ctdb_client_control_send(state, state->ev, state->client,
918                                           state->destnode, state->timeout,
919                                           &request);
920         if (subreq == NULL) {
921                 state->result = ENOMEM;
922                 ctdb_db_traverse_remove_handler(req);
923                 return;
924         }
925         tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
926 }
927
928 static void ctdb_db_traverse_started(struct tevent_req *subreq)
929 {
930         struct tevent_req *req = tevent_req_callback_data(
931                 subreq, struct tevent_req);
932         struct ctdb_db_traverse_state *state = tevent_req_data(
933                 req, struct ctdb_db_traverse_state);
934         struct ctdb_reply_control *reply;
935         int ret = 0;
936         bool status;
937
938         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
939         TALLOC_FREE(subreq);
940         if (! status) {
941                 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
942                 state->result = ret;
943                 ctdb_db_traverse_remove_handler(req);
944                 return;
945         }
946
947         ret = ctdb_reply_control_traverse_start_ext(reply);
948         talloc_free(reply);
949         if (ret != 0) {
950                 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
951                                   ret));
952                 state->result = ret;
953                 ctdb_db_traverse_remove_handler(req);
954                 return;
955         }
956 }
957
958 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
959                                      void *private_data)
960 {
961         struct tevent_req *req = talloc_get_type_abort(
962                 private_data, struct tevent_req);
963         struct ctdb_db_traverse_state *state = tevent_req_data(
964                 req, struct ctdb_db_traverse_state);
965         struct ctdb_rec_data *rec;
966         struct ctdb_ltdb_header header;
967         int ret;
968
969         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
970         if (ret != 0) {
971                 return;
972         }
973
974         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
975                 talloc_free(rec);
976                 ctdb_db_traverse_remove_handler(req);
977                 return;
978         }
979
980         ret = ctdb_ltdb_header_extract(&rec->data, &header);
981         if (ret != 0) {
982                 talloc_free(rec);
983                 return;
984         }
985
986         if (rec->data.dsize == 0) {
987                 talloc_free(rec);
988                 return;
989         }
990
991         ret = state->parser(rec->reqid, &header, rec->key, rec->data,
992                             state->private_data);
993         talloc_free(rec);
994         if (ret != 0) {
995                 state->result = ret;
996                 ctdb_db_traverse_remove_handler(req);
997         }
998 }
999
1000 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1001 {
1002         struct ctdb_db_traverse_state *state = tevent_req_data(
1003                 req, struct ctdb_db_traverse_state);
1004         struct tevent_req *subreq;
1005
1006         subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1007                                                          state->client,
1008                                                          state->srvid, req);
1009         if (tevent_req_nomem(subreq, req)) {
1010                 return;
1011         }
1012         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1013 }
1014
1015 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1016 {
1017         struct tevent_req *req = tevent_req_callback_data(
1018                 subreq, struct tevent_req);
1019         struct ctdb_db_traverse_state *state = tevent_req_data(
1020                 req, struct ctdb_db_traverse_state);
1021         int ret;
1022         bool status;
1023
1024         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1025         TALLOC_FREE(subreq);
1026         if (! status) {
1027                 tevent_req_error(req, ret);
1028                 return;
1029         }
1030
1031         if (state->result != 0) {
1032                 tevent_req_error(req, state->result);
1033                 return;
1034         }
1035
1036         tevent_req_done(req);
1037 }
1038
/*
 * Collect the result of a traverse request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1052
1053 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1054                      struct ctdb_client_context *client,
1055                      struct ctdb_db_context *db,
1056                      uint32_t destnode, struct timeval timeout,
1057                      ctdb_rec_parser_func_t parser, void *private_data)
1058 {
1059         struct tevent_req *req;
1060         int ret = 0;
1061         bool status;
1062
1063         req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1064                                     timeout, parser, private_data);
1065         if (req == NULL) {
1066                 return ENOMEM;
1067         }
1068
1069         tevent_req_poll(req, ev);
1070
1071         status = ctdb_db_traverse_recv(req, &ret);
1072         if (! status) {
1073                 return ret;
1074         }
1075
1076         return 0;
1077 }
1078
1079 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1080                     struct ctdb_ltdb_header *header,
1081                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
1082 {
1083         TDB_DATA rec;
1084         int ret;
1085
1086         rec = tdb_fetch(db->ltdb->tdb, key);
1087         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1088                 /* No record present */
1089                 if (rec.dptr != NULL) {
1090                         free(rec.dptr);
1091                 }
1092
1093                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1094                         return EIO;
1095                 }
1096
1097                 header->rsn = 0;
1098                 header->dmaster = CTDB_UNKNOWN_PNN;
1099                 header->flags = 0;
1100
1101                 if (data != NULL) {
1102                         *data = tdb_null;
1103                 }
1104                 return 0;
1105         }
1106
1107         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
1108         if (ret != 0) {
1109                 return ret;
1110         }
1111
1112         ret = 0;
1113         if (data != NULL) {
1114                 size_t offset = ctdb_ltdb_header_len(header);
1115
1116                 data->dsize = rec.dsize - offset;
1117                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
1118                                            data->dsize);
1119                 if (data->dptr == NULL) {
1120                         ret = ENOMEM;
1121                 }
1122         }
1123
1124         free(rec.dptr);
1125         return ret;
1126 }
1127
1128 /*
1129  * Fetch a record from volatile database
1130  *
1131  * Steps:
1132  *  1. Get a lock on the hash chain
1133  *  2. If the record does not exist, migrate the record
1134  *  3. If readonly=true and delegations do not exist, migrate the record.
1135  *  4. If readonly=false and delegations exist, migrate the record.
1136  *  5. If the local node is not dmaster, migrate the record.
1137  *  6. Return record
1138  */
1139
/*
 * State of an asynchronous fetch-lock request.
 *
 *  h        - record handle being prepared; handed to the caller by
 *             ctdb_fetch_lock_recv
 *  readonly - whether the caller asked for readonly access
 *  pnn      - value of ctdb_client_pnn() for the client, compared against
 *             the dmaster of the record
 */
1140 struct ctdb_fetch_lock_state {
1141         struct tevent_context *ev;
1142         struct ctdb_client_context *client;
1143         struct ctdb_record_handle *h;
1144         bool readonly;
1145         uint32_t pnn;
1146 };
1147
/*
 * The local copy is checked under the chain lock; when it cannot be used
 * a migration is requested and the check is repeated once it completes.
 */
1148 static int ctdb_fetch_lock_check(struct tevent_req *req);
1149 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1150 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1151
1152 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1153                                         struct tevent_context *ev,
1154                                         struct ctdb_client_context *client,
1155                                         struct ctdb_db_context *db,
1156                                         TDB_DATA key, bool readonly)
1157 {
1158         struct ctdb_fetch_lock_state *state;
1159         struct tevent_req *req;
1160         int ret;
1161
1162         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1163         if (req == NULL) {
1164                 return NULL;
1165         }
1166
1167         state->ev = ev;
1168         state->client = client;
1169
1170         state->h = talloc_zero(db, struct ctdb_record_handle);
1171         if (tevent_req_nomem(state->h, req)) {
1172                 return tevent_req_post(req, ev);
1173         }
1174         state->h->client = client;
1175         state->h->db = db;
1176         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1177         if (tevent_req_nomem(state->h->key.dptr, req)) {
1178                 return tevent_req_post(req, ev);
1179         }
1180         state->h->key.dsize = key.dsize;
1181         state->h->readonly = false;
1182
1183         state->readonly = readonly;
1184         state->pnn = ctdb_client_pnn(client);
1185
1186         /* Check that database is not persistent */
1187         if (db->persistent) {
1188                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1189                                   db->db_name));
1190                 tevent_req_error(req, EINVAL);
1191                 return tevent_req_post(req, ev);
1192         }
1193
1194         ret = ctdb_fetch_lock_check(req);
1195         if (ret == 0) {
1196                 tevent_req_done(req);
1197                 return tevent_req_post(req, ev);
1198         }
1199         if (ret != EAGAIN) {
1200                 tevent_req_error(req, ret);
1201                 return tevent_req_post(req, ev);
1202         }
1203         return req;
1204 }
1205
1206 static int ctdb_fetch_lock_check(struct tevent_req *req)
1207 {
1208         struct ctdb_fetch_lock_state *state = tevent_req_data(
1209                 req, struct ctdb_fetch_lock_state);
1210         struct ctdb_record_handle *h = state->h;
1211         struct ctdb_ltdb_header header;
1212         TDB_DATA data = tdb_null;
1213         int ret, err = 0;
1214         bool do_migrate = false;
1215
1216         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1217         if (ret != 0) {
1218                 DEBUG(DEBUG_ERR,
1219                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
1220                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1221                 err = EIO;
1222                 goto failed;
1223         }
1224
1225         data = tdb_fetch(h->db->ltdb->tdb, h->key);
1226         if (data.dptr == NULL) {
1227                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1228                         goto migrate;
1229                 } else {
1230                         err = EIO;
1231                         goto failed;
1232                 }
1233         }
1234
1235         /* Got the record */
1236         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1237         if (ret != 0) {
1238                 err = ret;
1239                 goto failed;
1240         }
1241
1242         if (! state->readonly) {
1243                 /* Read/write access */
1244                 if (header.dmaster == state->pnn &&
1245                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1246                         goto migrate;
1247                 }
1248
1249                 if (header.dmaster != state->pnn) {
1250                         goto migrate;
1251                 }
1252         } else {
1253                 /* Readonly access */
1254                 if (header.dmaster != state->pnn &&
1255                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1256                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
1257                         goto migrate;
1258                 }
1259         }
1260
1261         /* We are the dmaster or readonly delegation */
1262         h->header = header;
1263         h->data = data;
1264         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1265                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
1266                 h->readonly = true;
1267         }
1268         return 0;
1269
1270 migrate:
1271         do_migrate = true;
1272         err = EAGAIN;
1273
1274 failed:
1275         if (data.dptr != NULL) {
1276                 free(data.dptr);
1277         }
1278         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1279         if (ret != 0) {
1280                 DEBUG(DEBUG_ERR,
1281                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1282                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1283                 return EIO;
1284         }
1285
1286         if (do_migrate) {
1287                 ctdb_fetch_lock_migrate(req);
1288         }
1289         return err;
1290 }
1291
1292 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1293 {
1294         struct ctdb_fetch_lock_state *state = tevent_req_data(
1295                 req, struct ctdb_fetch_lock_state);
1296         struct ctdb_req_call request;
1297         struct tevent_req *subreq;
1298
1299         ZERO_STRUCT(request);
1300         request.flags = CTDB_IMMEDIATE_MIGRATION;
1301         if (state->readonly) {
1302                 request.flags |= CTDB_WANT_READONLY;
1303         }
1304         request.db_id = state->h->db->db_id;
1305         request.callid = CTDB_NULL_FUNC;
1306         request.key = state->h->key;
1307         request.calldata = tdb_null;
1308
1309         subreq = ctdb_client_call_send(state, state->ev, state->client,
1310                                        &request);
1311         if (tevent_req_nomem(subreq, req)) {
1312                 return;
1313         }
1314
1315         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1316 }
1317
1318 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1319 {
1320         struct tevent_req *req = tevent_req_callback_data(
1321                 subreq, struct tevent_req);
1322         struct ctdb_fetch_lock_state *state = tevent_req_data(
1323                 req, struct ctdb_fetch_lock_state);
1324         struct ctdb_reply_call *reply;
1325         int ret;
1326         bool status;
1327
1328         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1329         TALLOC_FREE(subreq);
1330         if (! status) {
1331                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1332                                   state->h->db->db_name, ret));
1333                 tevent_req_error(req, ret);
1334                 return;
1335         }
1336
1337         if (reply->status != 0) {
1338                 tevent_req_error(req, EIO);
1339                 return;
1340         }
1341         talloc_free(reply);
1342
1343         ret = ctdb_fetch_lock_check(req);
1344         if (ret != 0) {
1345                 if (ret != EAGAIN) {
1346                         tevent_req_error(req, ret);
1347                 }
1348                 return;
1349         }
1350
1351         tevent_req_done(req);
1352 }
1353
1354 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1355 {
1356         int ret;
1357
1358         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1359         if (ret != 0) {
1360                 DEBUG(DEBUG_ERR,
1361                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1362                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1363         }
1364         free(h->data.dptr);
1365         return 0;
1366 }
1367
1368 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1369                                                 struct ctdb_ltdb_header *header,
1370                                                 TALLOC_CTX *mem_ctx,
1371                                                 TDB_DATA *data, int *perr)
1372 {
1373         struct ctdb_fetch_lock_state *state = tevent_req_data(
1374                 req, struct ctdb_fetch_lock_state);
1375         struct ctdb_record_handle *h = state->h;
1376         int err;
1377
1378         if (tevent_req_is_unix_error(req, &err)) {
1379                 if (perr != NULL) {
1380                         TALLOC_FREE(state->h);
1381                         *perr = err;
1382                 }
1383                 return NULL;
1384         }
1385
1386         if (header != NULL) {
1387                 *header = h->header;
1388         }
1389         if (data != NULL) {
1390                 size_t offset;
1391
1392                 offset = ctdb_ltdb_header_len(&h->header);
1393
1394                 data->dsize = h->data.dsize - offset;
1395                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1396                                            data->dsize);
1397                 if (data->dptr == NULL) {
1398                         TALLOC_FREE(state->h);
1399                         if (perr != NULL) {
1400                                 *perr = ENOMEM;
1401                         }
1402                         return NULL;
1403                 }
1404         }
1405
1406         talloc_set_destructor(h, ctdb_record_handle_destructor);
1407         return h;
1408 }
1409
1410 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1411                     struct ctdb_client_context *client,
1412                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1413                     struct ctdb_record_handle **out,
1414                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1415 {
1416         struct tevent_req *req;
1417         struct ctdb_record_handle *h;
1418         int ret;
1419
1420         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1421         if (req == NULL) {
1422                 return ENOMEM;
1423         }
1424
1425         tevent_req_poll(req, ev);
1426
1427         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1428         if (h == NULL) {
1429                 return ret;
1430         }
1431
1432         *out = h;
1433         return 0;
1434 }
1435
1436 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1437 {
1438         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1439         TDB_DATA rec[2];
1440         int ret;
1441
1442         /* Cannot modify the record if it was obtained as a readonly copy */
1443         if (h->readonly) {
1444                 return EINVAL;
1445         }
1446
1447         /* Check if the new data is same */
1448         if (h->data.dsize == data.dsize &&
1449             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1450                 /* No need to do anything */
1451                 return 0;
1452         }
1453
1454         ctdb_ltdb_header_push(&h->header, header);
1455
1456         rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1457         rec[0].dptr = header;
1458
1459         rec[1].dsize = data.dsize;
1460         rec[1].dptr = data.dptr;
1461
1462         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1463         if (ret != 0) {
1464                 DEBUG(DEBUG_ERR,
1465                       ("store_record: %s tdb_storev failed, %s\n",
1466                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1467                 return EIO;
1468         }
1469
1470         return 0;
1471 }
1472
/*
 * State of an asynchronous delete-record request: the handle of the
 * record being deleted.
 */
1473 struct ctdb_delete_record_state {
1474         struct ctdb_record_handle *h;
1475 };
1476
/* Completion callback for the schedule-for-deletion control */
1477 static void ctdb_delete_record_done(struct tevent_req *subreq);
1478
1479 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1480                                            struct tevent_context *ev,
1481                                            struct ctdb_record_handle *h)
1482 {
1483         struct tevent_req *req, *subreq;
1484         struct ctdb_delete_record_state *state;
1485         struct ctdb_key_data key;
1486         struct ctdb_req_control request;
1487         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1488         TDB_DATA rec;
1489         int ret;
1490
1491         req = tevent_req_create(mem_ctx, &state,
1492                                 struct ctdb_delete_record_state);
1493         if (req == NULL) {
1494                 return NULL;
1495         }
1496
1497         state->h = h;
1498
1499         /* Cannot delete the record if it was obtained as a readonly copy */
1500         if (h->readonly) {
1501                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1502                                   h->db->db_name));
1503                 tevent_req_error(req, EINVAL);
1504                 return tevent_req_post(req, ev);
1505         }
1506
1507         ctdb_ltdb_header_push(&h->header, header);
1508
1509         rec.dsize = ctdb_ltdb_header_len(&h->header);
1510         rec.dptr = header;
1511
1512         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1513         if (ret != 0) {
1514                 DEBUG(DEBUG_ERR,
1515                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1516                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1517                 tevent_req_error(req, EIO);
1518                 return tevent_req_post(req, ev);
1519         }
1520
1521         key.db_id = h->db->db_id;
1522         key.header = h->header;
1523         key.key = h->key;
1524
1525         ctdb_req_control_schedule_for_deletion(&request, &key);
1526         subreq = ctdb_client_control_send(state, ev, h->client,
1527                                           ctdb_client_pnn(h->client),
1528                                           tevent_timeval_zero(),
1529                                           &request);
1530         if (tevent_req_nomem(subreq, req)) {
1531                 return tevent_req_post(req, ev);
1532         }
1533         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1534
1535         return req;
1536 }
1537
1538 static void ctdb_delete_record_done(struct tevent_req *subreq)
1539 {
1540         struct tevent_req *req = tevent_req_callback_data(
1541                 subreq, struct tevent_req);
1542         struct ctdb_delete_record_state *state = tevent_req_data(
1543                 req, struct ctdb_delete_record_state);
1544         int ret;
1545         bool status;
1546
1547         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1548         TALLOC_FREE(subreq);
1549         if (! status) {
1550                 DEBUG(DEBUG_ERR,
1551                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1552                        "ret=%d\n", state->h->db->db_name, ret));
1553                 tevent_req_error(req, ret);
1554                 return;
1555         }
1556
1557         tevent_req_done(req);
1558 }
1559
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
	int code;

	if (! tevent_req_is_unix_error(req, &code)) {
		return true;
	}

	if (perr != NULL) {
		*perr = code;
	}
	return false;
}
1573
1574
1575 int ctdb_delete_record(struct ctdb_record_handle *h)
1576 {
1577         struct tevent_context *ev = h->ev;
1578         TALLOC_CTX *mem_ctx;
1579         struct tevent_req *req;
1580         int ret;
1581         bool status;
1582
1583         mem_ctx = talloc_new(NULL);
1584         if (mem_ctx == NULL) {
1585                 return ENOMEM;
1586         }
1587
1588         req = ctdb_delete_record_send(mem_ctx, ev, h);
1589         if (req == NULL) {
1590                 talloc_free(mem_ctx);
1591                 return ENOMEM;
1592         }
1593
1594         tevent_req_poll(req, ev);
1595
1596         status = ctdb_delete_record_recv(req, &ret);
1597         talloc_free(mem_ctx);
1598         if (! status) {
1599                 return ret;
1600         }
1601
1602         return 0;
1603 }
1604
1605 /*
1606  * Global lock functions
1607  */
1608
/*
 * State of an asynchronous g_lock lock request.
 *
 *  key       - lock name including its terminating NUL; points at the
 *              caller's string, it is not copied
 *  my_sid    - server id the lock is requested for
 *  lock_type - CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE
 *  h         - handle of the fetch-locked lock record
 *  lock_list - locks parsed from the lock record
 *  current   - index of the lock_list entry currently being examined
 */
1609 struct ctdb_g_lock_lock_state {
1610         struct tevent_context *ev;
1611         struct ctdb_client_context *client;
1612         struct ctdb_db_context *db;
1613         TDB_DATA key;
1614         struct ctdb_server_id my_sid;
1615         enum ctdb_g_lock_type lock_type;
1616         struct ctdb_record_handle *h;
1617         /* state for verification of active locks */
1618         struct ctdb_g_lock_list *lock_list;
1619         unsigned int current;
1620 };
1621
/* Steps of the g_lock lock request, defined below */
1622 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1623 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1624 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1625 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1626 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1627
1628 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1629                                   enum ctdb_g_lock_type l2)
1630 {
1631         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1632                 return false;
1633         }
1634         return true;
1635 }
1636
1637 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1638                                          struct tevent_context *ev,
1639                                          struct ctdb_client_context *client,
1640                                          struct ctdb_db_context *db,
1641                                          const char *keyname,
1642                                          struct ctdb_server_id *sid,
1643                                          bool readonly)
1644 {
1645         struct tevent_req *req, *subreq;
1646         struct ctdb_g_lock_lock_state *state;
1647
1648         req = tevent_req_create(mem_ctx, &state,
1649                                 struct ctdb_g_lock_lock_state);
1650         if (req == NULL) {
1651                 return NULL;
1652         }
1653
1654         state->ev = ev;
1655         state->client = client;
1656         state->db = db;
1657         state->key.dptr = discard_const(keyname);
1658         state->key.dsize = strlen(keyname) + 1;
1659         state->my_sid = *sid;
1660         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1661
1662         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1663                                       false);
1664         if (tevent_req_nomem(subreq, req)) {
1665                 return tevent_req_post(req, ev);
1666         }
1667         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1668
1669         return req;
1670 }
1671
1672 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1673 {
1674         struct tevent_req *req = tevent_req_callback_data(
1675                 subreq, struct tevent_req);
1676         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1677                 req, struct ctdb_g_lock_lock_state);
1678         TDB_DATA data;
1679         int ret = 0;
1680
1681         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1682         TALLOC_FREE(subreq);
1683         if (state->h == NULL) {
1684                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1685                                   (char *)state->key.dptr));
1686                 tevent_req_error(req, ret);
1687                 return;
1688         }
1689
1690         if (state->lock_list != NULL) {
1691                 TALLOC_FREE(state->lock_list);
1692                 state->current = 0;
1693         }
1694
1695         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1696                                     &state->lock_list);
1697         talloc_free(data.dptr);
1698         if (ret != 0) {
1699                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1700                                   (char *)state->key.dptr));
1701                 tevent_req_error(req, ret);
1702                 return;
1703         }
1704
1705         ctdb_g_lock_lock_process_locks(req);
1706 }
1707
1708 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1709 {
1710         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1711                 req, struct ctdb_g_lock_lock_state);
1712         struct tevent_req *subreq;
1713         struct ctdb_g_lock *lock;
1714         bool check_server = false;
1715         int ret;
1716
1717         while (state->current < state->lock_list->num) {
1718                 lock = &state->lock_list->lock[state->current];
1719
1720                 /* We should not ask for the same lock more than once */
1721                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1722                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1723                                           (char *)state->key.dptr));
1724                         tevent_req_error(req, EDEADLK);
1725                         return;
1726                 }
1727
1728                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1729                         check_server = true;
1730                         break;
1731                 }
1732
1733                 state->current += 1;
1734         }
1735
1736         if (check_server) {
1737                 struct ctdb_req_control request;
1738
1739                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1740                 subreq = ctdb_client_control_send(state, state->ev,
1741                                                   state->client,
1742                                                   lock->sid.vnn,
1743                                                   tevent_timeval_zero(),
1744                                                   &request);
1745                 if (tevent_req_nomem(subreq, req)) {
1746                         return;
1747                 }
1748                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1749                 return;
1750         }
1751
1752         /* There is no conflict, add ourself to the lock_list */
1753         state->lock_list->lock = talloc_realloc(state->lock_list,
1754                                                 state->lock_list->lock,
1755                                                 struct ctdb_g_lock,
1756                                                 state->lock_list->num + 1);
1757         if (state->lock_list->lock == NULL) {
1758                 tevent_req_error(req, ENOMEM);
1759                 return;
1760         }
1761
1762         lock = &state->lock_list->lock[state->lock_list->num];
1763         lock->type = state->lock_type;
1764         lock->sid = state->my_sid;
1765         state->lock_list->num += 1;
1766
1767         ret = ctdb_g_lock_lock_update(req);
1768         if (ret != 0) {
1769                 tevent_req_error(req, ret);
1770                 return;
1771         }
1772
1773         TALLOC_FREE(state->h);
1774         tevent_req_done(req);
1775 }
1776
1777 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1778 {
1779         struct tevent_req *req = tevent_req_callback_data(
1780                 subreq, struct tevent_req);
1781         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1782                 req, struct ctdb_g_lock_lock_state);
1783         struct ctdb_reply_control *reply;
1784         int ret, value;
1785         bool status;
1786
1787         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1788         TALLOC_FREE(subreq);
1789         if (! status) {
1790                 DEBUG(DEBUG_ERR,
1791                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1792                        (char *)state->key.dptr, ret));
1793                 tevent_req_error(req, ret);
1794                 return;
1795         }
1796
1797         ret = ctdb_reply_control_process_exists(reply, &value);
1798         if (ret != 0) {
1799                 tevent_req_error(req, ret);
1800                 return;
1801         }
1802         talloc_free(reply);
1803
1804         if (value == 0) {
1805                 /* server process exists, need to retry */
1806                 TALLOC_FREE(state->h);
1807                 subreq = tevent_wakeup_send(state, state->ev,
1808                                             tevent_timeval_current_ofs(0,1000));
1809                 if (tevent_req_nomem(subreq, req)) {
1810                         return;
1811                 }
1812                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1813                 return;
1814         }
1815
1816         /* server process does not exist, remove conflicting entry */
1817         state->lock_list->lock[state->current] =
1818                 state->lock_list->lock[state->lock_list->num-1];
1819         state->lock_list->num -= 1;
1820
1821         ret = ctdb_g_lock_lock_update(req);
1822         if (ret != 0) {
1823                 tevent_req_error(req, ret);
1824                 return;
1825         }
1826
1827         ctdb_g_lock_lock_process_locks(req);
1828 }
1829
1830 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1831 {
1832         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1833                 req, struct ctdb_g_lock_lock_state);
1834         TDB_DATA data;
1835         int ret;
1836
1837         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1838         data.dptr = talloc_size(state, data.dsize);
1839         if (data.dptr == NULL) {
1840                 return ENOMEM;
1841         }
1842
1843         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1844         ret = ctdb_store_record(state->h, data);
1845         talloc_free(data.dptr);
1846         return ret;
1847 }
1848
1849 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1850 {
1851         struct tevent_req *req = tevent_req_callback_data(
1852                 subreq, struct tevent_req);
1853         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1854                 req, struct ctdb_g_lock_lock_state);
1855         bool success;
1856
1857         success = tevent_wakeup_recv(subreq);
1858         TALLOC_FREE(subreq);
1859         if (! success) {
1860                 tevent_req_error(req, ENOMEM);
1861                 return;
1862         }
1863
1864         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1865                                       state->db, state->key, false);
1866         if (tevent_req_nomem(subreq, req)) {
1867                 return;
1868         }
1869         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1870 }
1871
1872 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1873 {
1874         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1875                 req, struct ctdb_g_lock_lock_state);
1876         int err;
1877
1878         TALLOC_FREE(state->h);
1879
1880         if (tevent_req_is_unix_error(req, &err)) {
1881                 if (perr != NULL) {
1882                         *perr = err;
1883                 }
1884                 return false;
1885         }
1886
1887         return true;
1888 }
1889
/*
 * Per-request state of the asynchronous g_lock unlock operation
 * (ctdb_g_lock_unlock_send / ctdb_g_lock_unlock_recv).
 */
1890 struct ctdb_g_lock_unlock_state {
        /* ev, client and db are copied from the arguments of the send call */
1891         struct tevent_context *ev;
1892         struct ctdb_client_context *client;
1893         struct ctdb_db_context *db;
        /* Record key: the lock name including its terminating NUL byte */
1894         TDB_DATA key;
        /* Server id whose entry is removed from the lock list */
1895         struct ctdb_server_id my_sid;
        /* Record handle returned by the fetch-lock; freed when finished */
1896         struct ctdb_record_handle *h;
        /* Lock list decoded from the fetched record */
1897         struct ctdb_g_lock_list *lock_list;
1898 };
1899
/* Callback for the fetch-lock issued by ctdb_g_lock_unlock_send() */
1900 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
/* Drops the my_sid entry from the list and stores a non-empty list */
1901 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
/* Callback for the record deletion issued when the list became empty */
1902 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1903
1904 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1905                                            struct tevent_context *ev,
1906                                            struct ctdb_client_context *client,
1907                                            struct ctdb_db_context *db,
1908                                            const char *keyname,
1909                                            struct ctdb_server_id sid)
1910 {
1911         struct tevent_req *req, *subreq;
1912         struct ctdb_g_lock_unlock_state *state;
1913
1914         req = tevent_req_create(mem_ctx, &state,
1915                                 struct ctdb_g_lock_unlock_state);
1916         if (req == NULL) {
1917                 return NULL;
1918         }
1919
1920         state->ev = ev;
1921         state->client = client;
1922         state->db = db;
1923         state->key.dptr = discard_const(keyname);
1924         state->key.dsize = strlen(keyname) + 1;
1925         state->my_sid = sid;
1926
1927         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1928                                       false);
1929         if (tevent_req_nomem(subreq, req)) {
1930                 return tevent_req_post(req, ev);
1931         }
1932         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1933
1934         return req;
1935 }
1936
1937 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1938 {
1939         struct tevent_req *req = tevent_req_callback_data(
1940                 subreq, struct tevent_req);
1941         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1942                 req, struct ctdb_g_lock_unlock_state);
1943         TDB_DATA data;
1944         int ret = 0;
1945
1946         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1947         TALLOC_FREE(subreq);
1948         if (state->h == NULL) {
1949                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1950                                   (char *)state->key.dptr));
1951                 tevent_req_error(req, ret);
1952                 return;
1953         }
1954
1955         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1956                                     &state->lock_list);
1957         if (ret != 0) {
1958                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1959                                   (char *)state->key.dptr));
1960                 tevent_req_error(req, ret);
1961                 return;
1962         }
1963
1964         ret = ctdb_g_lock_unlock_update(req);
1965         if (ret != 0) {
1966                 tevent_req_error(req, ret);
1967                 return;
1968         }
1969
1970         if (state->lock_list->num == 0) {
1971                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1972                 if (tevent_req_nomem(subreq, req)) {
1973                         return;
1974                 }
1975                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1976                                         req);
1977                 return;
1978         }
1979
1980         TALLOC_FREE(state->h);
1981         tevent_req_done(req);
1982 }
1983
1984 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1985 {
1986         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1987                 req, struct ctdb_g_lock_unlock_state);
1988         struct ctdb_g_lock *lock;
1989         int ret, i;
1990
1991         for (i=0; i<state->lock_list->num; i++) {
1992                 lock = &state->lock_list->lock[i];
1993
1994                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1995                         break;
1996                 }
1997         }
1998
1999         if (i < state->lock_list->num) {
2000                 state->lock_list->lock[i] =
2001                         state->lock_list->lock[state->lock_list->num-1];
2002                 state->lock_list->num -= 1;
2003         }
2004
2005         if (state->lock_list->num != 0) {
2006                 TDB_DATA data;
2007
2008                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2009                 data.dptr = talloc_size(state, data.dsize);
2010                 if (data.dptr == NULL) {
2011                         return ENOMEM;
2012                 }
2013
2014                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
2015                 ret = ctdb_store_record(state->h, data);
2016                 talloc_free(data.dptr);
2017                 if (ret != 0) {
2018                         return ret;
2019                 }
2020         }
2021
2022         return 0;
2023 }
2024
2025 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2026 {
2027         struct tevent_req *req = tevent_req_callback_data(
2028                 subreq, struct tevent_req);
2029         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2030                 req, struct ctdb_g_lock_unlock_state);
2031         int ret;
2032         bool status;
2033
2034         status = ctdb_delete_record_recv(subreq, &ret);
2035         if (! status) {
2036                 DEBUG(DEBUG_ERR,
2037                       ("g_lock_unlock %s delete record failed, ret=%d\n",
2038                        (char *)state->key.dptr, ret));
2039                 tevent_req_error(req, ret);
2040                 return;
2041         }
2042
2043         TALLOC_FREE(state->h);
2044         tevent_req_done(req);
2045 }
2046
2047 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2048 {
2049         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2050                 req, struct ctdb_g_lock_unlock_state);
2051         int err;
2052
2053         TALLOC_FREE(state->h);
2054
2055         if (tevent_req_is_unix_error(req, &err)) {
2056                 if (perr != NULL) {
2057                         *perr = err;
2058                 }
2059                 return false;
2060         }
2061
2062         return true;
2063 }
2064
2065 /*
2066  * Persistent database functions
2067  */
/*
 * Per-request state of ctdb_transaction_start_send() and its callbacks.
 */
2068 struct ctdb_transaction_start_state {
        /* Event context and client copied from the send call */
2069         struct tevent_context *ev;
2070         struct ctdb_client_context *client;
        /* NOTE(review): not referenced by the start callbacks that follow */
2071         struct timeval timeout;
        /* Transaction handle under construction; returned by the recv call */
2072         struct ctdb_transaction_handle *h;
        /* Set from ctdb_client_pnn(client) when the request is created */
2073         uint32_t destnode;
2074 };
2075
/* Callback for attaching to g_lock.tdb; goes on to request the g_lock */
2076 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
/* Callback for the g_lock request; completes the transaction start */
2077 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2078
2079 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2080                                                struct tevent_context *ev,
2081                                                struct ctdb_client_context *client,
2082                                                struct timeval timeout,
2083                                                struct ctdb_db_context *db,
2084                                                bool readonly)
2085 {
2086         struct ctdb_transaction_start_state *state;
2087         struct tevent_req *req, *subreq;
2088         struct ctdb_transaction_handle *h;
2089
2090         req = tevent_req_create(mem_ctx, &state,
2091                                 struct ctdb_transaction_start_state);
2092         if (req == NULL) {
2093                 return NULL;
2094         }
2095
2096         if (! db->persistent) {
2097                 tevent_req_error(req, EINVAL);
2098                 return tevent_req_post(req, ev);
2099         }
2100
2101         state->ev = ev;
2102         state->client = client;
2103         state->destnode = ctdb_client_pnn(client);
2104
2105         h = talloc_zero(db, struct ctdb_transaction_handle);
2106         if (tevent_req_nomem(h, req)) {
2107                 return tevent_req_post(req, ev);
2108         }
2109
2110         h->ev = ev;
2111         h->client = client;
2112         h->db = db;
2113         h->readonly = readonly;
2114         h->updated = false;
2115
2116         /* SRVID is unique for databases, so client can have transactions
2117          * active for multiple databases */
2118         h->sid = ctdb_client_get_server_id(client, db->db_id);
2119
2120         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2121         if (tevent_req_nomem(h->recbuf, req)) {
2122                 return tevent_req_post(req, ev);
2123         }
2124
2125         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2126         if (tevent_req_nomem(h->lock_name, req)) {
2127                 return tevent_req_post(req, ev);
2128         }
2129
2130         state->h = h;
2131
2132         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2133         if (tevent_req_nomem(subreq, req)) {
2134                 return tevent_req_post(req, ev);
2135         }
2136         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2137
2138         return req;
2139 }
2140
2141 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2142 {
2143         struct tevent_req *req = tevent_req_callback_data(
2144                 subreq, struct tevent_req);
2145         struct ctdb_transaction_start_state *state = tevent_req_data(
2146                 req, struct ctdb_transaction_start_state);
2147         bool status;
2148         int ret;
2149
2150         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2151         TALLOC_FREE(subreq);
2152         if (! status) {
2153                 DEBUG(DEBUG_ERR,
2154                       ("transaction_start: %s attach g_lock.tdb failed\n",
2155                        state->h->db->db_name));
2156                 tevent_req_error(req, ret);
2157                 return;
2158         }
2159
2160         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2161                                        state->h->db_g_lock,
2162                                        state->h->lock_name,
2163                                        &state->h->sid, state->h->readonly);
2164         if (tevent_req_nomem(subreq, req)) {
2165                 return;
2166         }
2167         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2168 }
2169
2170 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2171 {
2172         struct tevent_req *req = tevent_req_callback_data(
2173                 subreq, struct tevent_req);
2174         struct ctdb_transaction_start_state *state = tevent_req_data(
2175                 req, struct ctdb_transaction_start_state);
2176         int ret;
2177         bool status;
2178
2179         status = ctdb_g_lock_lock_recv(subreq, &ret);
2180         TALLOC_FREE(subreq);
2181         if (! status) {
2182                 DEBUG(DEBUG_ERR,
2183                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
2184                        state->h->db->db_name, ret));
2185                 tevent_req_error(req, ret);
2186                 return;
2187         }
2188
2189         tevent_req_done(req);
2190 }
2191
2192 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2193                                         struct tevent_req *req,
2194                                         int *perr)
2195 {
2196         struct ctdb_transaction_start_state *state = tevent_req_data(
2197                 req, struct ctdb_transaction_start_state);
2198         int err;
2199
2200         if (tevent_req_is_unix_error(req, &err)) {
2201                 if (perr != NULL) {
2202                         *perr = err;
2203                 }
2204                 return NULL;
2205         }
2206
2207         return state->h;
2208 }
2209
2210 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2211                            struct ctdb_client_context *client,
2212                            struct timeval timeout,
2213                            struct ctdb_db_context *db, bool readonly,
2214                            struct ctdb_transaction_handle **out)
2215 {
2216         struct tevent_req *req;
2217         struct ctdb_transaction_handle *h;
2218         int ret;
2219
2220         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2221                                           readonly);
2222         if (req == NULL) {
2223                 return ENOMEM;
2224         }
2225
2226         tevent_req_poll(req, ev);
2227
2228         h = ctdb_transaction_start_recv(req, &ret);
2229         if (h == NULL) {
2230                 return ret;
2231         }
2232
2233         *out = h;
2234         return 0;
2235 }
2236
/*
 * Search state handed to ctdb_transaction_record_fetch_traverse() while
 * looking up a key in the transaction's record buffer.
 */
2237 struct ctdb_transaction_record_fetch_state {
        /* key: the key searched for; data: value of the matching record */
2238         TDB_DATA key, data;
        /* Header extracted from the matching record */
2239         struct ctdb_ltdb_header header;
        /* Set to true once a record with a matching key has been seen */
2240         bool found;
2241 };
2242
2243 static int ctdb_transaction_record_fetch_traverse(
2244                                 uint32_t reqid,
2245                                 struct ctdb_ltdb_header *nullheader,
2246                                 TDB_DATA key, TDB_DATA data,
2247                                 void *private_data)
2248 {
2249         struct ctdb_transaction_record_fetch_state *state =
2250                 (struct ctdb_transaction_record_fetch_state *)private_data;
2251
2252         if (state->key.dsize == key.dsize &&
2253             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2254                 int ret;
2255
2256                 ret = ctdb_ltdb_header_extract(&data, &state->header);
2257                 if (ret != 0) {
2258                         DEBUG(DEBUG_ERR,
2259                               ("record_fetch: Failed to extract header, "
2260                                "ret=%d\n", ret));
2261                         return 1;
2262                 }
2263
2264                 state->data = data;
2265                 state->found = true;
2266         }
2267
2268         return 0;
2269 }
2270
2271 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2272                                          TDB_DATA key,
2273                                          struct ctdb_ltdb_header *header,
2274                                          TDB_DATA *data)
2275 {
2276         struct ctdb_transaction_record_fetch_state state;
2277         int ret;
2278
2279         state.key = key;
2280         state.found = false;
2281
2282         ret = ctdb_rec_buffer_traverse(h->recbuf,
2283                                        ctdb_transaction_record_fetch_traverse,
2284                                        &state);
2285         if (ret != 0) {
2286                 return ret;
2287         }
2288
2289         if (state.found) {
2290                 if (header != NULL) {
2291                         *header = state.header;
2292                 }
2293                 if (data != NULL) {
2294                         *data = state.data;
2295                 }
2296                 return 0;
2297         }
2298
2299         return ENOENT;
2300 }
2301
2302 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2303                                   TDB_DATA key,
2304                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
2305 {
2306         TDB_DATA tmp_data;
2307         struct ctdb_ltdb_header header;
2308         int ret;
2309
2310         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2311         if (ret == 0) {
2312                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2313                                            tmp_data.dsize);
2314                 if (data->dptr == NULL) {
2315                         return ENOMEM;
2316                 }
2317                 data->dsize = tmp_data.dsize;
2318                 return 0;
2319         }
2320
2321         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2322         if (ret != 0) {
2323                 return ret;
2324         }
2325
2326         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2327         if (ret != 0) {
2328                 return ret;
2329         }
2330
2331         return 0;
2332 }
2333
2334 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2335                                   TDB_DATA key, TDB_DATA data)
2336 {
2337         TALLOC_CTX *tmp_ctx;
2338         struct ctdb_ltdb_header header;
2339         TDB_DATA old_data;
2340         int ret;
2341
2342         if (h->readonly) {
2343                 return EINVAL;
2344         }
2345
2346         tmp_ctx = talloc_new(h);
2347         if (tmp_ctx == NULL) {
2348                 return ENOMEM;
2349         }
2350
2351         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2352         if (ret != 0) {
2353                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2354                 if (ret != 0) {
2355                         return ret;
2356                 }
2357         }
2358
2359         if (old_data.dsize == data.dsize &&
2360             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2361                 talloc_free(tmp_ctx);
2362                 return 0;
2363         }
2364
2365         header.dmaster = ctdb_client_pnn(h->client);
2366         header.rsn += 1;
2367
2368         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2369         talloc_free(tmp_ctx);
2370         if (ret != 0) {
2371                 return ret;
2372         }
2373         h->updated = true;
2374
2375         return 0;
2376 }
2377
2378 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2379                                    TDB_DATA key)
2380 {
2381         return ctdb_transaction_store_record(h, key, tdb_null);
2382 }
2383
2384 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2385                                             uint64_t *seqnum)
2386 {
2387         const char *keyname = CTDB_DB_SEQNUM_KEY;
2388         TDB_DATA key, data;
2389         struct ctdb_ltdb_header header;
2390         int ret;
2391
2392         key.dptr = discard_const(keyname);
2393         key.dsize = strlen(keyname) + 1;
2394
2395         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2396         if (ret != 0) {
2397                 DEBUG(DEBUG_ERR,
2398                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2399                        h->db->db_name, ret));
2400                 return ret;
2401         }
2402
2403         if (data.dsize == 0) {
2404                 /* initial data */
2405                 *seqnum = 0;
2406                 return 0;
2407         }
2408
2409         if (data.dsize != sizeof(uint64_t)) {
2410                 talloc_free(data.dptr);
2411                 return EINVAL;
2412         }
2413
2414         *seqnum = *(uint64_t *)data.dptr;
2415
2416         talloc_free(data.dptr);
2417         return 0;
2418 }
2419
2420 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2421                                             uint64_t seqnum)
2422 {
2423         const char *keyname = CTDB_DB_SEQNUM_KEY;
2424         TDB_DATA key, data;
2425
2426         key.dptr = discard_const(keyname);
2427         key.dsize = strlen(keyname) + 1;
2428
2429         data.dptr = (uint8_t *)&seqnum;
2430         data.dsize = sizeof(seqnum);
2431
2432         return ctdb_transaction_store_record(h, key, data);
2433 }
2434
/*
 * Per-request state of ctdb_transaction_commit_send() and its callbacks.
 */
2435 struct ctdb_transaction_commit_state {
        /* Event context and timeout copied from the send call */
2436         struct tevent_context *ev;
2437         struct timeval timeout;
        /* Transaction being committed; freed once the g_lock is released */
2438         struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit was attempted */
2439         uint64_t seqnum;
2440 };
2441
/* Callback for the TRANS3_COMMIT control; may resend the control */
2442 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
/* Callback for the g_lock unlock that follows a successful commit */
2443 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2444
2445 struct tevent_req *ctdb_transaction_commit_send(
2446                                         TALLOC_CTX *mem_ctx,
2447                                         struct tevent_context *ev,
2448                                         struct timeval timeout,
2449                                         struct ctdb_transaction_handle *h)
2450 {
2451         struct tevent_req *req, *subreq;
2452         struct ctdb_transaction_commit_state *state;
2453         struct ctdb_req_control request;
2454         int ret;
2455
2456         req = tevent_req_create(mem_ctx, &state,
2457                                 struct ctdb_transaction_commit_state);
2458         if (req == NULL) {
2459                 return NULL;
2460         }
2461
2462         state->ev = ev;
2463         state->timeout = timeout;
2464         state->h = h;
2465
2466         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2467         if (ret != 0) {
2468                 tevent_req_error(req, ret);
2469                 return tevent_req_post(req, ev);
2470         }
2471
2472         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2473         if (ret != 0) {
2474                 tevent_req_error(req, ret);
2475                 return tevent_req_post(req, ev);
2476         }
2477
2478         ctdb_req_control_trans3_commit(&request, h->recbuf);
2479         subreq = ctdb_client_control_send(state, ev, h->client,
2480                                           ctdb_client_pnn(h->client),
2481                                           timeout, &request);
2482         if (tevent_req_nomem(subreq, req)) {
2483                 return tevent_req_post(req, ev);
2484         }
2485         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2486
2487         return req;
2488 }
2489
2490 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2491 {
2492         struct tevent_req *req = tevent_req_callback_data(
2493                 subreq, struct tevent_req);
2494         struct ctdb_transaction_commit_state *state = tevent_req_data(
2495                 req, struct ctdb_transaction_commit_state);
2496         struct ctdb_transaction_handle *h = state->h;
2497         struct ctdb_reply_control *reply;
2498         uint64_t seqnum;
2499         int ret;
2500         bool status;
2501
2502         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2503         TALLOC_FREE(subreq);
2504         if (! status) {
2505                 DEBUG(DEBUG_ERR,
2506                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2507                        h->db->db_name, ret));
2508                 tevent_req_error(req, ret);
2509                 return;
2510         }
2511
2512         ret = ctdb_reply_control_trans3_commit(reply);
2513         talloc_free(reply);
2514
2515         if (ret != 0) {
2516                 /* Control failed due to recovery */
2517
2518                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2519                 if (ret != 0) {
2520                         tevent_req_error(req, ret);
2521                         return;
2522                 }
2523
2524                 if (seqnum == state->seqnum) {
2525                         struct ctdb_req_control request;
2526
2527                         /* try again */
2528                         ctdb_req_control_trans3_commit(&request,
2529                                                        state->h->recbuf);
2530                         subreq = ctdb_client_control_send(
2531                                         state, state->ev, state->h->client,
2532                                         ctdb_client_pnn(state->h->client),
2533                                         state->timeout, &request);
2534                         if (tevent_req_nomem(subreq, req)) {
2535                                 return;
2536                         }
2537                         tevent_req_set_callback(subreq,
2538                                                 ctdb_transaction_commit_done,
2539                                                 req);
2540                         return;
2541                 }
2542
2543                 if (seqnum != state->seqnum + 1) {
2544                         DEBUG(DEBUG_ERR,
2545                               ("transaction_commit: %s seqnum mismatch "
2546                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2547                                state->h->db->db_name, seqnum, state->seqnum));
2548                         tevent_req_error(req, EIO);
2549                         return;
2550                 }
2551         }
2552
2553         /* trans3_commit successful */
2554         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2555                                          h->db_g_lock, h->lock_name, h->sid);
2556         if (tevent_req_nomem(subreq, req)) {
2557                 return;
2558         }
2559         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2560                                 req);
2561 }
2562
2563 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2564 {
2565         struct tevent_req *req = tevent_req_callback_data(
2566                 subreq, struct tevent_req);
2567         struct ctdb_transaction_commit_state *state = tevent_req_data(
2568                 req, struct ctdb_transaction_commit_state);
2569         int ret;
2570         bool status;
2571
2572         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2573         TALLOC_FREE(subreq);
2574         if (! status) {
2575                 DEBUG(DEBUG_ERR,
2576                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2577                        state->h->db->db_name, ret));
2578                 tevent_req_error(req, ret);
2579                 return;
2580         }
2581
2582         talloc_free(state->h);
2583         tevent_req_done(req);
2584 }
2585
/*
 * Collect the result of an asynchronous transaction commit.
 *
 * Returns true when the request carries no error.  Otherwise returns false
 * and, if perr is not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }

        return false;
}
2599
2600 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2601 {
2602         struct tevent_context *ev = h->ev;
2603         TALLOC_CTX *mem_ctx;
2604         struct tevent_req *req;
2605         int ret;
2606         bool status;
2607
2608         if (h->readonly || ! h->updated) {
2609                 return ctdb_transaction_cancel(h);
2610         }
2611
2612         mem_ctx = talloc_new(NULL);
2613         if (mem_ctx == NULL) {
2614                 return ENOMEM;
2615         }
2616
2617         req = ctdb_transaction_commit_send(mem_ctx, ev,
2618                                            tevent_timeval_zero(), h);
2619         if (req == NULL) {
2620                 talloc_free(mem_ctx);
2621                 return ENOMEM;
2622         }
2623
2624         tevent_req_poll(req, ev);
2625
2626         status = ctdb_transaction_commit_recv(req, &ret);
2627         if (! status) {
2628                 talloc_free(mem_ctx);
2629                 return ret;
2630         }
2631
2632         talloc_free(mem_ctx);
2633         return 0;
2634 }
2635
/*
 * Per-request state for ctdb_transaction_cancel_send().
 */
struct ctdb_transaction_cancel_state {
        /* Transaction being cancelled; freed by the completion callback */
        struct ctdb_transaction_handle *h;
        /* Event context the unlock sub-request is issued on */
        struct tevent_context *ev;
        /* Timeout passed in by the caller, recorded at send time */
        struct timeval timeout;
};

/* Completion callback for the g_lock unlock sub-request */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2643
2644 struct tevent_req *ctdb_transaction_cancel_send(
2645                                         TALLOC_CTX *mem_ctx,
2646                                         struct tevent_context *ev,
2647                                         struct timeval timeout,
2648                                         struct ctdb_transaction_handle *h)
2649 {
2650         struct tevent_req *req, *subreq;
2651         struct ctdb_transaction_cancel_state *state;
2652
2653         req = tevent_req_create(mem_ctx, &state,
2654                                 struct ctdb_transaction_cancel_state);
2655         if (req == NULL) {
2656                 return NULL;
2657         }
2658
2659         state->ev = ev;
2660         state->h = h;
2661         state->timeout = timeout;
2662
2663         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2664                                          state->h->db_g_lock,
2665                                          state->h->lock_name, state->h->sid);
2666         if (tevent_req_nomem(subreq, req)) {
2667                 return tevent_req_post(req, ev);
2668         }
2669         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2670                                 req);
2671
2672         return req;
2673 }
2674
2675 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2676 {
2677         struct tevent_req *req = tevent_req_callback_data(
2678                 subreq, struct tevent_req);
2679         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2680                 req, struct ctdb_transaction_cancel_state);
2681         int ret;
2682         bool status;
2683
2684         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2685         TALLOC_FREE(subreq);
2686         if (! status) {
2687                 DEBUG(DEBUG_ERR,
2688                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2689                        state->h->db->db_name, ret));
2690                 talloc_free(state->h);
2691                 tevent_req_error(req, ret);
2692                 return;
2693         }
2694
2695         talloc_free(state->h);
2696         tevent_req_done(req);
2697 }
2698
/*
 * Collect the result of an asynchronous transaction cancel.
 *
 * Returns true when the request carries no error.  Otherwise returns false
 * and, if perr is not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return ! failed;
}
2712
2713 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2714 {
2715         struct tevent_context *ev = h->ev;
2716         struct tevent_req *req;
2717         TALLOC_CTX *mem_ctx;
2718         int ret;
2719         bool status;
2720
2721         mem_ctx = talloc_new(NULL);
2722         if (mem_ctx == NULL) {
2723                 talloc_free(h);
2724                 return ENOMEM;
2725         }
2726
2727         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2728                                            tevent_timeval_zero(), h);
2729         if (req == NULL) {
2730                 talloc_free(mem_ctx);
2731                 talloc_free(h);
2732                 return ENOMEM;
2733         }
2734
2735         tevent_req_poll(req, ev);
2736
2737         status = ctdb_transaction_cancel_recv(req, &ret);
2738         if (! status) {
2739                 talloc_free(mem_ctx);
2740                 return ret;
2741         }
2742
2743         talloc_free(mem_ctx);
2744         return 0;
2745 }
2746
2747 /*
2748  * TODO:
2749  *
2750  * In future Samba should register SERVER_ID.
2751  * Make that structure same as struct srvid {}.
2752  */