ctdb-client: If g_lock lock conflicts, try again sooner
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State of the request that applies the read-only and/or sticky
 * database flags.
 */
struct ctdb_set_db_flags_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        uint32_t db_id;
        uint8_t db_flags;
        /* One completion marker per control; both must be set to finish */
        bool readonly_done;
        bool sticky_done;
        /* Node numbers the controls are sent to, and how many there are */
        uint32_t *pnn_list;
        int count;
};

/* Completion callbacks of the individual steps */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 tevent_req_error(req, ret);
125                 return;
126         }
127
128         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129         talloc_free(reply);
130         if (ret != 0) {
131                 tevent_req_error(req, ret);
132                 return;
133         }
134
135         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136                                                state, &state->pnn_list);
137         talloc_free(nodemap);
138         if (state->count <= 0) {
139                 tevent_req_error(req, ENOMEM);
140                 return;
141         }
142
143         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144                 ctdb_req_control_set_db_readonly(&request, state->db_id);
145                 subreq = ctdb_client_control_multi_send(
146                                         state, state->ev, state->client,
147                                         state->pnn_list, state->count,
148                                         state->timeout, &request);
149                 if (tevent_req_nomem(subreq, req)) {
150                         return;
151                 }
152                 tevent_req_set_callback(subreq,
153                                         ctdb_set_db_flags_readonly_done, req);
154         } else {
155                 state->readonly_done = true;
156         }
157
158         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159                 ctdb_req_control_set_db_sticky(&request, state->db_id);
160                 subreq = ctdb_client_control_multi_send(
161                                         state, state->ev, state->client,
162                                         state->pnn_list, state->count,
163                                         state->timeout, &request);
164                 if (tevent_req_nomem(subreq, req)) {
165                         return;
166                 }
167                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168                                         req);
169         } else {
170                 state->sticky_done = true;
171         }
172 }
173
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 {
176         struct tevent_req *req = tevent_req_callback_data(
177                 subreq, struct tevent_req);
178         struct ctdb_set_db_flags_state *state = tevent_req_data(
179                 req, struct ctdb_set_db_flags_state);
180         int ret;
181         bool status;
182
183         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184                                                 NULL);
185         TALLOC_FREE(subreq);
186         if (! status) {
187                 tevent_req_error(req, ret);
188                 return;
189         }
190
191         state->readonly_done = true;
192
193         if (state->readonly_done && state->sticky_done) {
194                 tevent_req_done(req);
195         }
196 }
197
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req = tevent_req_callback_data(
201                 subreq, struct tevent_req);
202         struct ctdb_set_db_flags_state *state = tevent_req_data(
203                 req, struct ctdb_set_db_flags_state);
204         int ret;
205         bool status;
206
207         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208                                                 NULL);
209         TALLOC_FREE(subreq);
210         if (! status) {
211                 tevent_req_error(req, ret);
212                 return;
213         }
214
215         state->sticky_done = true;
216
217         if (state->readonly_done && state->sticky_done) {
218                 tevent_req_done(req);
219         }
220 }
221
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
234
/* State of an asynchronous database attach request */
struct ctdb_attach_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node all attach related controls are sent to */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* requested by the caller */
        uint8_t db_flags;
        /* TDB_* flags used for the attach control and the local open */
        uint32_t tdb_flags;
        /* Handle being built, or the existing handle if already attached */
        struct ctdb_db_context *db;
};

/* Completion callbacks of the individual attach steps */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252                                     struct tevent_context *ev,
253                                     struct ctdb_client_context *client,
254                                     struct timeval timeout,
255                                     const char *db_name, uint8_t db_flags)
256 {
257         struct tevent_req *req, *subreq;
258         struct ctdb_attach_state *state;
259         struct ctdb_req_control request;
260
261         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262         if (req == NULL) {
263                 return NULL;
264         }
265
266         state->db = client_db_handle(client, db_name);
267         if (state->db != NULL) {
268                 tevent_req_done(req);
269                 return tevent_req_post(req, ev);
270         }
271
272         state->ev = ev;
273         state->client = client;
274         state->timeout = timeout;
275         state->destnode = ctdb_client_pnn(client);
276         state->db_flags = db_flags;
277
278         state->db = talloc_zero(client, struct ctdb_db_context);
279         if (tevent_req_nomem(state->db, req)) {
280                 return tevent_req_post(req, ev);
281         }
282
283         state->db->db_name = talloc_strdup(state->db, db_name);
284         if (tevent_req_nomem(state->db, req)) {
285                 return tevent_req_post(req, ev);
286         }
287
288         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289                 state->db->persistent = true;
290         }
291
292         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293         subreq = ctdb_client_control_send(state, ev, client,
294                                           ctdb_client_pnn(client), timeout,
295                                           &request);
296         if (tevent_req_nomem(subreq, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301         return req;
302 }
303
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         struct ctdb_attach_state *state = tevent_req_data(
309                 req, struct ctdb_attach_state);
310         struct ctdb_reply_control *reply;
311         struct ctdb_req_control request;
312         uint32_t mutex_enabled;
313         int ret;
314         bool status;
315
316         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317         TALLOC_FREE(subreq);
318         if (! status) {
319                 tevent_req_error(req, ret);
320                 return;
321         }
322
323         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324         if (ret != 0) {
325                 /* Treat error as mutex support not available */
326                 mutex_enabled = 0;
327         }
328
329         if (state->db->persistent) {
330                 state->tdb_flags = TDB_DEFAULT;
331         } else {
332                 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
333                                     TDB_CLEAR_IF_FIRST);
334                 if (mutex_enabled == 1) {
335                         state->tdb_flags |= TDB_MUTEX_LOCKING;
336                 }
337         }
338
339         if (state->db->persistent) {
340                 ctdb_req_control_db_attach_persistent(&request,
341                                                       state->db->db_name,
342                                                       state->tdb_flags);
343         } else {
344                 ctdb_req_control_db_attach(&request, state->db->db_name,
345                                            state->tdb_flags);
346         }
347
348         subreq = ctdb_client_control_send(state, state->ev, state->client,
349                                           state->destnode, state->timeout,
350                                           &request);
351         if (tevent_req_nomem(subreq, req)) {
352                 return;
353         }
354         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
355 }
356
357 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
358 {
359         struct tevent_req *req = tevent_req_callback_data(
360                 subreq, struct tevent_req);
361         struct ctdb_attach_state *state = tevent_req_data(
362                 req, struct ctdb_attach_state);
363         struct ctdb_req_control request;
364         struct ctdb_reply_control *reply;
365         bool status;
366         int ret;
367
368         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
369         TALLOC_FREE(subreq);
370         if (! status) {
371                 tevent_req_error(req, ret);
372                 return;
373         }
374
375         if (state->db->persistent) {
376                 ret = ctdb_reply_control_db_attach_persistent(
377                                 reply, &state->db->db_id);
378         } else {
379                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
380         }
381         talloc_free(reply);
382         if (ret != 0) {
383                 tevent_req_error(req, ret);
384                 return;
385         }
386
387         ctdb_req_control_getdbpath(&request, state->db->db_id);
388         subreq = ctdb_client_control_send(state, state->ev, state->client,
389                                           state->destnode, state->timeout,
390                                           &request);
391         if (tevent_req_nomem(subreq, req)) {
392                 return;
393         }
394         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
395 }
396
397 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
398 {
399         struct tevent_req *req = tevent_req_callback_data(
400                 subreq, struct tevent_req);
401         struct ctdb_attach_state *state = tevent_req_data(
402                 req, struct ctdb_attach_state);
403         struct ctdb_reply_control *reply;
404         struct ctdb_req_control request;
405         bool status;
406         int ret;
407
408         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
409         TALLOC_FREE(subreq);
410         if (! status) {
411                 tevent_req_error(req, ret);
412                 return;
413         }
414
415         ret = ctdb_reply_control_getdbpath(reply, state->db,
416                                            &state->db->db_path);
417         talloc_free(reply);
418         if (ret != 0) {
419                 tevent_req_error(req, ret);
420                 return;
421         }
422
423         ctdb_req_control_db_get_health(&request, state->db->db_id);
424         subreq = ctdb_client_control_send(state, state->ev, state->client,
425                                           state->destnode, state->timeout,
426                                           &request);
427         if (tevent_req_nomem(subreq, req)) {
428                 return;
429         }
430         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
431 }
432
433 static void ctdb_attach_health_done(struct tevent_req *subreq)
434 {
435         struct tevent_req *req = tevent_req_callback_data(
436                 subreq, struct tevent_req);
437         struct ctdb_attach_state *state = tevent_req_data(
438                 req, struct ctdb_attach_state);
439         struct ctdb_reply_control *reply;
440         const char *reason;
441         bool status;
442         int ret;
443
444         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
445         TALLOC_FREE(subreq);
446         if (! status) {
447                 tevent_req_error(req, ret);
448                 return;
449         }
450
451         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
452         if (ret != 0) {
453                 tevent_req_error(req, ret);
454                 return;
455         }
456
457         if (reason != NULL) {
458                 /* Database unhealthy, avoid attach */
459                 /* FIXME: Log here */
460                 tevent_req_error(req, EIO);
461                 return;
462         }
463
464         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
465                                         state->destnode, state->timeout,
466                                         state->db->db_id, state->db_flags);
467         if (tevent_req_nomem(subreq, req)) {
468                 return;
469         }
470         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
471 }
472
473 static void ctdb_attach_flags_done(struct tevent_req *subreq)
474 {
475         struct tevent_req *req = tevent_req_callback_data(
476                 subreq, struct tevent_req);
477         struct ctdb_attach_state *state = tevent_req_data(
478                 req, struct ctdb_attach_state);
479         bool status;
480         int ret;
481
482         status = ctdb_set_db_flags_recv(subreq, &ret);
483         TALLOC_FREE(subreq);
484         if (! status) {
485                 tevent_req_error(req, ret);
486                 return;
487         }
488
489         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
490                                         state->tdb_flags, O_RDWR, 0);
491         if (tevent_req_nomem(state->db->ltdb, req)) {
492                 return;
493         }
494         DLIST_ADD(state->client->db, state->db);
495
496         tevent_req_done(req);
497 }
498
499 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
500                       struct ctdb_db_context **out)
501 {
502         struct ctdb_attach_state *state = tevent_req_data(
503                 req, struct ctdb_attach_state);
504         int err;
505
506         if (tevent_req_is_unix_error(req, &err)) {
507                 if (perr != NULL) {
508                         *perr = err;
509                 }
510                 return false;
511         }
512
513         if (out != NULL) {
514                 *out = state->db;
515         }
516         return true;
517 }
518
519 int ctdb_attach(struct tevent_context *ev,
520                 struct ctdb_client_context *client,
521                 struct timeval timeout,
522                 const char *db_name, uint8_t db_flags,
523                 struct ctdb_db_context **out)
524 {
525         TALLOC_CTX *mem_ctx;
526         struct tevent_req *req;
527         bool status;
528         int ret;
529
530         mem_ctx = talloc_new(client);
531         if (mem_ctx == NULL) {
532                 return ENOMEM;
533         }
534
535         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
536                                db_name, db_flags);
537         if (req == NULL) {
538                 talloc_free(mem_ctx);
539                 return ENOMEM;
540         }
541
542         tevent_req_poll(req, ev);
543
544         status = ctdb_attach_recv(req, &ret, out);
545         if (! status) {
546                 talloc_free(mem_ctx);
547                 return ret;
548         }
549
550         /*
551         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
552         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
553         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
554         */
555
556         talloc_free(mem_ctx);
557         return 0;
558 }
559
560 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
561                 struct ctdb_client_context *client,
562                 struct timeval timeout, uint32_t db_id)
563 {
564         struct ctdb_db_context *db;
565         int ret;
566
567         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
568                                   db_id);
569         if (ret != 0) {
570                 return ret;
571         }
572
573         for (db = client->db; db != NULL; db = db->next) {
574                 if (db->db_id == db_id) {
575                         DLIST_REMOVE(client->db, db);
576                         break;
577                 }
578         }
579
580         return 0;
581 }
582
583 uint32_t ctdb_db_id(struct ctdb_db_context *db)
584 {
585         return db->db_id;
586 }
587
588 struct ctdb_db_traverse_state {
589         ctdb_rec_parser_func_t parser;
590         void *private_data;
591         bool extract_header;
592         int error;
593 };
594
595 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
596                                     TDB_DATA data, void *private_data)
597 {
598         struct ctdb_db_traverse_state *state =
599                 (struct ctdb_db_traverse_state *)private_data;
600         int ret;
601
602         if (state->extract_header) {
603                 struct ctdb_ltdb_header header;
604
605                 ret = ctdb_ltdb_header_extract(&data, &header);
606                 if (ret != 0) {
607                         state->error = ret;
608                         return 1;
609                 }
610
611                 ret = state->parser(0, &header, key, data, state->private_data);
612         } else {
613                 ret = state->parser(0, NULL, key, data, state->private_data);
614         }
615
616         if (ret != 0) {
617                 state->error = ret;
618                 return 1;
619         }
620
621         return 0;
622 }
623
624 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
625                      bool extract_header,
626                      ctdb_rec_parser_func_t parser, void *private_data)
627 {
628         struct ctdb_db_traverse_state state;
629         int ret;
630
631         state.parser = parser;
632         state.private_data = private_data;
633         state.extract_header = extract_header;
634         state.error = 0;
635
636         if (readonly) {
637                 ret = tdb_traverse_read(db->ltdb->tdb,
638                                         ctdb_db_traverse_handler, &state);
639         } else {
640                 ret = tdb_traverse(db->ltdb->tdb,
641                                    ctdb_db_traverse_handler, &state);
642         }
643
644         if (ret == -1) {
645                 return EIO;
646         }
647
648         return state.error;
649 }
650
651 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
652                            struct ctdb_ltdb_header *header,
653                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
654 {
655         TDB_DATA rec;
656         int ret;
657
658         rec = tdb_fetch(db->ltdb->tdb, key);
659         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
660                 /* No record present */
661                 if (rec.dptr != NULL) {
662                         free(rec.dptr);
663                 }
664
665                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
666                         return EIO;
667                 }
668
669                 header->rsn = 0;
670                 header->dmaster = CTDB_UNKNOWN_PNN;
671                 header->flags = 0;
672
673                 if (data != NULL) {
674                         *data = tdb_null;
675                 }
676                 return 0;
677         }
678
679         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
680         if (ret != 0) {
681                 return ret;
682         }
683
684         ret = 0;
685         if (data != NULL) {
686                 size_t offset = ctdb_ltdb_header_len(header);
687
688                 data->dsize = rec.dsize - offset;
689                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
690                                            data->dsize);
691                 if (data->dptr == NULL) {
692                         ret = ENOMEM;
693                 }
694         }
695
696         free(rec.dptr);
697         return ret;
698 }
699
700 /*
701  * Fetch a record from volatile database
702  *
703  * Steps:
704  *  1. Get a lock on the hash chain
705  *  2. If the record does not exist, migrate the record
706  *  3. If readonly=true and delegations do not exist, migrate the record.
707  *  4. If readonly=false and delegations exist, migrate the record.
708  *  5. If the local node is not dmaster, migrate the record.
709  *  6. Return record
710  */
711
/* State of an asynchronous fetch-lock request */
struct ctdb_fetch_lock_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Record handle being prepared for the caller */
        struct ctdb_record_handle *h;
        /* Whether the caller asked for read-only access */
        bool readonly;
        /* Node number of this client, compared against the record dmaster */
        uint32_t pnn;
};

/* Helpers of the check / migrate / re-check cycle */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
723
724 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
725                                         struct tevent_context *ev,
726                                         struct ctdb_client_context *client,
727                                         struct ctdb_db_context *db,
728                                         TDB_DATA key, bool readonly)
729 {
730         struct ctdb_fetch_lock_state *state;
731         struct tevent_req *req;
732         int ret;
733
734         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
735         if (req == NULL) {
736                 return NULL;
737         }
738
739         state->ev = ev;
740         state->client = client;
741
742         state->h = talloc_zero(db, struct ctdb_record_handle);
743         if (tevent_req_nomem(state->h, req)) {
744                 return tevent_req_post(req, ev);
745         }
746         state->h->client = client;
747         state->h->db = db;
748         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
749         if (tevent_req_nomem(state->h->key.dptr, req)) {
750                 return tevent_req_post(req, ev);
751         }
752         state->h->key.dsize = key.dsize;
753         state->h->readonly = false;
754
755         state->readonly = readonly;
756         state->pnn = ctdb_client_pnn(client);
757
758         /* Check that database is not persistent */
759         if (db->persistent) {
760                 tevent_req_error(req, EINVAL);
761                 return tevent_req_post(req, ev);
762         }
763
764         ret = ctdb_fetch_lock_check(req);
765         if (ret == 0) {
766                 tevent_req_done(req);
767                 return tevent_req_post(req, ev);
768         }
769         if (ret != EAGAIN) {
770                 tevent_req_error(req, ret);
771                 return tevent_req_post(req, ev);
772         }
773         return req;
774 }
775
776 static int ctdb_fetch_lock_check(struct tevent_req *req)
777 {
778         struct ctdb_fetch_lock_state *state = tevent_req_data(
779                 req, struct ctdb_fetch_lock_state);
780         struct ctdb_record_handle *h = state->h;
781         struct ctdb_ltdb_header header;
782         TDB_DATA data = tdb_null;
783         int ret, err = 0;
784         bool do_migrate = false;
785
786         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
787         if (ret != 0) {
788                 err = EIO;
789                 goto failed;
790         }
791
792         data = tdb_fetch(h->db->ltdb->tdb, h->key);
793         if (data.dptr == NULL) {
794                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
795                         goto migrate;
796                 } else {
797                         err = EIO;
798                         goto failed;
799                 }
800         }
801
802         /* Got the record */
803         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
804         if (ret != 0) {
805                 err = ret;
806                 goto failed;
807         }
808
809         if (! state->readonly) {
810                 /* Read/write access */
811                 if (header.dmaster == state->pnn &&
812                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
813                         goto migrate;
814                 }
815
816                 if (header.dmaster != state->pnn) {
817                         goto migrate;
818                 }
819         } else {
820                 /* Readonly access */
821                 if (header.dmaster != state->pnn &&
822                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
823                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
824                         goto migrate;
825                 }
826         }
827
828         /* We are the dmaster or readonly delegation */
829         h->header = header;
830         h->data = data;
831         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
832                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
833                 h->readonly = true;
834         }
835         return 0;
836
837 migrate:
838         do_migrate = true;
839         err = EAGAIN;
840
841 failed:
842         if (data.dptr != NULL) {
843                 free(data.dptr);
844         }
845         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
846         if (ret != 0) {
847                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
848                                   h->db->db_name));
849                 return EIO;
850         }
851
852         if (do_migrate) {
853                 ctdb_fetch_lock_migrate(req);
854         }
855         return err;
856 }
857
858 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
859 {
860         struct ctdb_fetch_lock_state *state = tevent_req_data(
861                 req, struct ctdb_fetch_lock_state);
862         struct ctdb_req_call request;
863         struct tevent_req *subreq;
864
865         ZERO_STRUCT(request);
866         request.flags = CTDB_IMMEDIATE_MIGRATION;
867         if (state->readonly) {
868                 request.flags |= CTDB_WANT_READONLY;
869         }
870         request.db_id = state->h->db->db_id;
871         request.callid = CTDB_NULL_FUNC;
872         request.key = state->h->key;
873         request.calldata = tdb_null;
874
875         subreq = ctdb_client_call_send(state, state->ev, state->client,
876                                        &request);
877         if (tevent_req_nomem(subreq, req)) {
878                 return;
879         }
880
881         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
882 }
883
884 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
885 {
886         struct tevent_req *req = tevent_req_callback_data(
887                 subreq, struct tevent_req);
888         struct ctdb_fetch_lock_state *state = tevent_req_data(
889                 req, struct ctdb_fetch_lock_state);
890         struct ctdb_reply_call *reply;
891         int ret;
892         bool status;
893
894         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
895         TALLOC_FREE(subreq);
896         if (! status) {
897                 tevent_req_error(req, ret);
898                 return;
899         }
900
901         if (reply->status != 0) {
902                 tevent_req_error(req, EIO);
903                 return;
904         }
905         talloc_free(reply);
906
907         ret = ctdb_fetch_lock_check(req);
908         if (ret != 0) {
909                 if (ret != EAGAIN) {
910                         tevent_req_error(req, ret);
911                 }
912                 return;
913         }
914
915         tevent_req_done(req);
916 }
917
918 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
919 {
920         tdb_chainunlock(h->db->ltdb->tdb, h->key);
921         free(h->data.dptr);
922         return 0;
923 }
924
925 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
926                                                 struct ctdb_ltdb_header *header,
927                                                 TALLOC_CTX *mem_ctx,
928                                                 TDB_DATA *data, int *perr)
929 {
930         struct ctdb_fetch_lock_state *state = tevent_req_data(
931                 req, struct ctdb_fetch_lock_state);
932         struct ctdb_record_handle *h = state->h;
933         int err;
934
935         if (tevent_req_is_unix_error(req, &err)) {
936                 if (perr != NULL) {
937                         *perr = err;
938                 }
939                 return NULL;
940         }
941
942         if (header != NULL) {
943                 *header = h->header;
944         }
945         if (data != NULL) {
946                 size_t offset;
947
948                 offset = ctdb_ltdb_header_len(&h->header);
949
950                 data->dsize = h->data.dsize - offset;
951                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
952                                            data->dsize);
953                 if (data->dptr == NULL) {
954                         TALLOC_FREE(state->h);
955                         if (perr != NULL) {
956                                 *perr = ENOMEM;
957                         }
958                         return NULL;
959                 }
960         }
961
962         talloc_set_destructor(h, ctdb_record_handle_destructor);
963         return h;
964 }
965
966 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
967                     struct ctdb_client_context *client,
968                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
969                     struct ctdb_record_handle **out,
970                     struct ctdb_ltdb_header *header, TDB_DATA *data)
971 {
972         struct tevent_req *req;
973         struct ctdb_record_handle *h;
974         int ret;
975
976         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
977         if (req == NULL) {
978                 return ENOMEM;
979         }
980
981         tevent_req_poll(req, ev);
982
983         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
984         if (h == NULL) {
985                 return ret;
986         }
987
988         *out = h;
989         return 0;
990 }
991
992 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
993 {
994         TDB_DATA rec;
995         size_t offset;
996         int ret;
997
998         /* Cannot modify the record if it was obtained as a readonly copy */
999         if (h->readonly) {
1000                 return EINVAL;
1001         }
1002
1003         /* Check if the new data is same */
1004         if (h->data.dsize == data.dsize &&
1005             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1006                 /* No need to do anything */
1007                 return 0;
1008         }
1009
1010         offset = ctdb_ltdb_header_len(&h->header);
1011         rec.dsize = offset + data.dsize;
1012         rec.dptr = talloc_size(h, rec.dsize);
1013         if (rec.dptr == NULL) {
1014                 return ENOMEM;
1015         }
1016
1017         ctdb_ltdb_header_push(&h->header, rec.dptr);
1018         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1019
1020         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1021         if (ret != 0) {
1022                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1023                                   h->db->db_name));
1024                 return EIO;
1025         }
1026
1027         talloc_free(rec.dptr);
1028         return 0;
1029 }
1030
/*
 * Request state for ctdb_delete_record_send().
 *
 * h - record handle passed in by the caller; used by the completion
 *     callback to name the database in its error message.
 */
1031 struct ctdb_delete_record_state {
1032         struct ctdb_record_handle *h;
1033 };
1034
/* Completion callback for the control sent by ctdb_delete_record_send() */
1035 static void ctdb_delete_record_done(struct tevent_req *subreq);
1036
1037 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1038                                            struct tevent_context *ev,
1039                                            struct ctdb_record_handle *h)
1040 {
1041         struct tevent_req *req, *subreq;
1042         struct ctdb_delete_record_state *state;
1043         struct ctdb_key_data key;
1044         struct ctdb_req_control request;
1045         TDB_DATA rec;
1046         int ret;
1047
1048         req = tevent_req_create(mem_ctx, &state,
1049                                 struct ctdb_delete_record_state);
1050         if (req == NULL) {
1051                 return NULL;
1052         }
1053
1054         state->h = h;
1055
1056         /* Cannot delete the record if it was obtained as a readonly copy */
1057         if (h->readonly) {
1058                 tevent_req_error(req, EINVAL);
1059                 return tevent_req_post(req, ev);
1060         }
1061
1062         rec.dsize = ctdb_ltdb_header_len(&h->header);
1063         rec.dptr = talloc_size(h, rec.dsize);
1064         if (tevent_req_nomem(rec.dptr, req)) {
1065                 return tevent_req_post(req, ev);
1066         }
1067
1068         ctdb_ltdb_header_push(&h->header, rec.dptr);
1069
1070         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1071         talloc_free(rec.dptr);
1072         if (ret != 0) {
1073                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1074                                   h->db->db_name));
1075                 tevent_req_error(req, EIO);
1076                 return tevent_req_post(req, ev);
1077         }
1078
1079         key.db_id = h->db->db_id;
1080         key.header = h->header;
1081         key.key = h->key;
1082
1083         ctdb_req_control_schedule_for_deletion(&request, &key);
1084         subreq = ctdb_client_control_send(state, ev, h->client,
1085                                           ctdb_client_pnn(h->client),
1086                                           tevent_timeval_zero(),
1087                                           &request);
1088         if (tevent_req_nomem(subreq, req)) {
1089                 return tevent_req_post(req, ev);
1090         }
1091         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1092
1093         return req;
1094 }
1095
1096 static void ctdb_delete_record_done(struct tevent_req *subreq)
1097 {
1098         struct tevent_req *req = tevent_req_callback_data(
1099                 subreq, struct tevent_req);
1100         struct ctdb_delete_record_state *state = tevent_req_data(
1101                 req, struct ctdb_delete_record_state);
1102         int ret;
1103         bool status;
1104
1105         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1106         TALLOC_FREE(subreq);
1107         if (! status) {
1108                 DEBUG(DEBUG_ERR,
1109                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1110                        "ret=%d\n", state->h->db->db_name, ret));
1111                 tevent_req_error(req, ret);
1112                 return;
1113         }
1114
1115         tevent_req_done(req);
1116 }
1117
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
1131
1132
1133 int ctdb_delete_record(struct ctdb_record_handle *h)
1134 {
1135         struct tevent_context *ev = h->ev;
1136         TALLOC_CTX *mem_ctx;
1137         struct tevent_req *req;
1138         int ret;
1139         bool status;
1140
1141         mem_ctx = talloc_new(NULL);
1142         if (mem_ctx == NULL) {
1143                 return ENOMEM;
1144         }
1145
1146         req = ctdb_delete_record_send(mem_ctx, ev, h);
1147         if (req == NULL) {
1148                 talloc_free(mem_ctx);
1149                 return ENOMEM;
1150         }
1151
1152         tevent_req_poll(req, ev);
1153
1154         status = ctdb_delete_record_recv(req, &ret);
1155         talloc_free(mem_ctx);
1156         if (! status) {
1157                 return ret;
1158         }
1159
1160         return 0;
1161 }
1162
1163 /*
1164  * Global lock functions
1165  */
1166
/*
 * Request state for ctdb_g_lock_lock_send().
 *
 * ev, client, db - as passed to ctdb_g_lock_lock_send()
 * key            - lock name including its terminating NUL; points at the
 *                  caller's keyname string
 * my_sid         - copy of the caller's server id
 * lock_type      - CTDB_G_LOCK_READ for a readonly request, otherwise
 *                  CTDB_G_LOCK_WRITE
 * h              - handle of the lock record, set from
 *                  ctdb_fetch_lock_recv()
 * lock_list      - lock entries parsed from the record
 * current        - index into lock_list of the entry being examined
 */
1167 struct ctdb_g_lock_lock_state {
1168         struct tevent_context *ev;
1169         struct ctdb_client_context *client;
1170         struct ctdb_db_context *db;
1171         TDB_DATA key;
1172         struct ctdb_server_id my_sid;
1173         enum ctdb_g_lock_type lock_type;
1174         struct ctdb_record_handle *h;
1175         /* state for verification of active locks */
1176         struct ctdb_g_lock_list *lock_list;
1177         unsigned int current;
1178 };
1179
/*
 * Steps of the g_lock lock request: record fetched -> scan lock list ->
 * (on conflict) holder checked -> retry after a delay, or remove the
 * stale entry and continue the scan; update writes the list back.
 */
1180 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1181 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1182 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1183 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1184 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1185
1186 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1187                                   enum ctdb_g_lock_type l2)
1188 {
1189         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1190                 return false;
1191         }
1192         return true;
1193 }
1194
1195 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1196                                          struct tevent_context *ev,
1197                                          struct ctdb_client_context *client,
1198                                          struct ctdb_db_context *db,
1199                                          const char *keyname,
1200                                          struct ctdb_server_id *sid,
1201                                          bool readonly)
1202 {
1203         struct tevent_req *req, *subreq;
1204         struct ctdb_g_lock_lock_state *state;
1205
1206         req = tevent_req_create(mem_ctx, &state,
1207                                 struct ctdb_g_lock_lock_state);
1208         if (req == NULL) {
1209                 return NULL;
1210         }
1211
1212         state->ev = ev;
1213         state->client = client;
1214         state->db = db;
1215         state->key.dptr = discard_const(keyname);
1216         state->key.dsize = strlen(keyname) + 1;
1217         state->my_sid = *sid;
1218         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1219
1220         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1221                                       false);
1222         if (tevent_req_nomem(subreq, req)) {
1223                 return tevent_req_post(req, ev);
1224         }
1225         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1226
1227         return req;
1228 }
1229
1230 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1231 {
1232         struct tevent_req *req = tevent_req_callback_data(
1233                 subreq, struct tevent_req);
1234         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1235                 req, struct ctdb_g_lock_lock_state);
1236         TDB_DATA data;
1237         int ret = 0;
1238
1239         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1240         TALLOC_FREE(subreq);
1241         if (state->h == NULL) {
1242                 tevent_req_error(req, ret);
1243                 return;
1244         }
1245
1246         if (state->lock_list != NULL) {
1247                 TALLOC_FREE(state->lock_list);
1248                 state->current = 0;
1249         }
1250
1251         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1252                                     &state->lock_list);
1253         talloc_free(data.dptr);
1254         if (ret != 0) {
1255                 tevent_req_error(req, ret);
1256                 return;
1257         }
1258
1259         ctdb_g_lock_lock_process_locks(req);
1260 }
1261
1262 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1263 {
1264         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1265                 req, struct ctdb_g_lock_lock_state);
1266         struct tevent_req *subreq;
1267         struct ctdb_g_lock *lock;
1268         bool check_server = false;
1269         int ret;
1270
1271         while (state->current < state->lock_list->num) {
1272                 lock = &state->lock_list->lock[state->current];
1273
1274                 /* We should not ask for the same lock more than once */
1275                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1276                         tevent_req_error(req, EDEADLK);
1277                         return;
1278                 }
1279
1280                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1281                         check_server = true;
1282                         break;
1283                 }
1284
1285                 state->current += 1;
1286         }
1287
1288         if (check_server) {
1289                 struct ctdb_req_control request;
1290                 struct ctdb_uint64_array u64_array;
1291
1292                 u64_array.num = 1;
1293                 u64_array.val = &lock->sid.unique_id;
1294
1295                 ctdb_req_control_check_srvids(&request, &u64_array);
1296                 subreq = ctdb_client_control_send(state, state->ev,
1297                                                   state->client,
1298                                                   state->client->pnn,
1299                                                   tevent_timeval_zero(),
1300                                                   &request);
1301                 if (tevent_req_nomem(subreq, req)) {
1302                         return;
1303                 }
1304                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1305                 return;
1306         }
1307
1308         /* There is no conflict, add ourself to the lock_list */
1309         state->lock_list->lock = talloc_realloc(state->lock_list,
1310                                                 state->lock_list->lock,
1311                                                 struct ctdb_g_lock,
1312                                                 state->lock_list->num + 1);
1313         if (state->lock_list->lock == NULL) {
1314                 tevent_req_error(req, ENOMEM);
1315                 return;
1316         }
1317
1318         lock = &state->lock_list->lock[state->lock_list->num];
1319         lock->type = state->lock_type;
1320         lock->sid = state->my_sid;
1321         state->lock_list->num += 1;
1322
1323         ret = ctdb_g_lock_lock_update(req);
1324         if (ret != 0) {
1325                 tevent_req_error(req, ret);
1326                 return;
1327         }
1328
1329         tevent_req_done(req);
1330 }
1331
1332 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1333 {
1334         struct tevent_req *req = tevent_req_callback_data(
1335                 subreq, struct tevent_req);
1336         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1337                 req, struct ctdb_g_lock_lock_state);
1338         struct ctdb_reply_control *reply;
1339         struct ctdb_uint8_array *u8_array;
1340         int ret;
1341         bool status;
1342         int8_t val;
1343
1344         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1345         TALLOC_FREE(subreq);
1346         if (! status) {
1347                 tevent_req_error(req, ret);
1348                 return;
1349         }
1350
1351         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1352         if (ret != 0) {
1353                 tevent_req_error(req, ENOMEM);
1354                 return;
1355         }
1356
1357         if (u8_array->num != 1) {
1358                 talloc_free(u8_array);
1359                 tevent_req_error(req, EIO);
1360                 return;
1361         }
1362
1363         val = u8_array->val[0];
1364         talloc_free(u8_array);
1365
1366         if (val == 1) {
1367                 /* server process exists, need to retry */
1368                 subreq = tevent_wakeup_send(state, state->ev,
1369                                             tevent_timeval_current_ofs(0,1000));
1370                 if (tevent_req_nomem(subreq, req)) {
1371                         return;
1372                 }
1373                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1374                 return;
1375         }
1376
1377         /* server process does not exist, remove conflicting entry */
1378         state->lock_list->lock[state->current] =
1379                 state->lock_list->lock[state->lock_list->num-1];
1380         state->lock_list->num -= 1;
1381
1382         ret = ctdb_g_lock_lock_update(req);
1383         if (ret != 0) {
1384                 tevent_req_error(req, ret);
1385                 return;
1386         }
1387
1388         ctdb_g_lock_lock_process_locks(req);
1389 }
1390
1391 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1392 {
1393         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1394                 req, struct ctdb_g_lock_lock_state);
1395         TDB_DATA data;
1396         int ret;
1397
1398         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1399         data.dptr = talloc_size(state, data.dsize);
1400         if (data.dptr == NULL) {
1401                 return ENOMEM;
1402         }
1403
1404         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1405         ret = ctdb_store_record(state->h, data);
1406         talloc_free(data.dptr);
1407         return ret;
1408 }
1409
/*
 * Compiled out.  An alternative ctdb_g_lock_lock_update() that takes the
 * state, lock list and record handle explicitly and does all the work in
 * one pass: it scans for conflicting entries, asks
 * ctdb_server_id_exists() whether the holder of a conflicting entry
 * exists, removes entries whose holder does not, appends our own entry
 * when no conflict was seen, stores the list if it changed, and returns
 * EAGAIN whenever a conflict was seen.  It has the same name as the live
 * ctdb_g_lock_lock_update() in this file, so both cannot be enabled.
 */
1410 #if 0
1411 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1412                                    struct ctdb_g_lock_list *lock_list,
1413                                    struct ctdb_record_handle *h)
1414 {
1415         struct ctdb_g_lock *lock;
1416         bool conflict = false;
1417         bool modified = false;
1418         int ret, i;
1419
1420         for (i=0; i<lock_list->num; i++) {
1421                 lock = &lock_list->lock[i];
1422
1423                 /* We should not ask for lock more than once */
1424                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1425                         return EDEADLK;
1426                 }
1427
1428                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1429                         bool exists;
1430
1431                         conflict = true;
1432                         ret = ctdb_server_id_exists(state->client, &lock->sid,
1433                                                     &exists);
1434                         if (ret != 0) {
1435                                 return ret;
1436                         }
1437
1438                         if (exists) {
1439                                 break;
1440                         }
1441
1442                         /* Server does not exist, delete conflicting entry */
1443                         lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1444                         lock_list->num -= 1;
1445                         modified = true;
1446                 }
1447         }
1448
1449         if (! conflict) {
1450                 lock = talloc_realloc(lock_list, lock_list->lock,
1451                                       struct ctdb_g_lock, lock_list->num+1);
1452                 if (lock == NULL) {
1453                         return ENOMEM;
1454                 }
1455
1456                 lock[lock_list->num].type = state->lock_type;
1457                 lock[lock_list->num].sid = state->my_sid;
1458                 lock_list->lock = lock;
1459                 lock_list->num += 1;
1460                 modified = true;
1461         }
1462
1463         if (modified) {
1464                 TDB_DATA data;
1465
1466                 data.dsize = ctdb_g_lock_list_len(lock_list);
1467                 data.dptr = talloc_size(state, data.dsize);
1468                 if (data.dptr == NULL) {
1469                         return ENOMEM;
1470                 }
1471
1472                 ctdb_g_lock_list_push(lock_list, data.dptr);
1473                 ret = ctdb_store_record(h, data);
1474                 talloc_free(data.dptr);
1475                 if (ret != 0) {
1476                         return ret;
1477                 }
1478         }
1479
1480         if (conflict) {
1481                 return EAGAIN;
1482         }
1483         return 0;
1484 }
1485 #endif
1486
1487 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1488 {
1489         struct tevent_req *req = tevent_req_callback_data(
1490                 subreq, struct tevent_req);
1491         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1492                 req, struct ctdb_g_lock_lock_state);
1493         bool success;
1494
1495         success = tevent_wakeup_recv(subreq);
1496         TALLOC_FREE(subreq);
1497         if (! success) {
1498                 tevent_req_error(req, ENOMEM);
1499                 return;
1500         }
1501
1502         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1503                                       state->db, state->key, false);
1504         if (tevent_req_nomem(subreq, req)) {
1505                 return;
1506         }
1507         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1508 }
1509
1510 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1511 {
1512         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1513                 req, struct ctdb_g_lock_lock_state);
1514         int err;
1515
1516         TALLOC_FREE(state->h);
1517
1518         if (tevent_req_is_unix_error(req, &err)) {
1519                 if (perr != NULL) {
1520                         *perr = err;
1521                 }
1522                 return false;
1523         }
1524
1525         return true;
1526 }
1527
/*
 * Request state for ctdb_g_lock_unlock_send().
 *
 * ev, client, db - as passed to ctdb_g_lock_unlock_send()
 * key            - lock name including its terminating NUL; points at the
 *                  caller's keyname string
 * my_sid         - server id whose entry is to be removed
 * h              - handle of the lock record, set from
 *                  ctdb_fetch_lock_recv()
 * lock_list      - lock entries parsed from the record
 */
1528 struct ctdb_g_lock_unlock_state {
1529         struct tevent_context *ev;
1530         struct ctdb_client_context *client;
1531         struct ctdb_db_context *db;
1532         TDB_DATA key;
1533         struct ctdb_server_id my_sid;
1534         struct ctdb_record_handle *h;
1535         struct ctdb_g_lock_list *lock_list;
1536 };
1537
/*
 * Steps of the g_lock unlock request: record fetched -> own entry removed
 * and list written back -> record deleted when the list became empty.
 */
1538 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1539 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1540 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1541
1542 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1543                                            struct tevent_context *ev,
1544                                            struct ctdb_client_context *client,
1545                                            struct ctdb_db_context *db,
1546                                            const char *keyname,
1547                                            struct ctdb_server_id sid)
1548 {
1549         struct tevent_req *req, *subreq;
1550         struct ctdb_g_lock_unlock_state *state;
1551
1552         req = tevent_req_create(mem_ctx, &state,
1553                                 struct ctdb_g_lock_unlock_state);
1554         if (req == NULL) {
1555                 return NULL;
1556         }
1557
1558         state->ev = ev;
1559         state->client = client;
1560         state->db = db;
1561         state->key.dptr = discard_const(keyname);
1562         state->key.dsize = strlen(keyname) + 1;
1563         state->my_sid = sid;
1564
1565         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1566                                       false);
1567         if (tevent_req_nomem(subreq, req)) {
1568                 return tevent_req_post(req, ev);
1569         }
1570         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1571
1572         return req;
1573 }
1574
1575 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1576 {
1577         struct tevent_req *req = tevent_req_callback_data(
1578                 subreq, struct tevent_req);
1579         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1580                 req, struct ctdb_g_lock_unlock_state);
1581         TDB_DATA data;
1582         int ret = 0;
1583
1584         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1585         TALLOC_FREE(subreq);
1586         if (state->h == NULL) {
1587                 tevent_req_error(req, ret);
1588                 return;
1589         }
1590
1591         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1592                                     &state->lock_list);
1593         if (ret != 0) {
1594                 tevent_req_error(req, ret);
1595                 return;
1596         }
1597
1598         ret = ctdb_g_lock_unlock_update(req);
1599         if (ret != 0) {
1600                 tevent_req_error(req, ret);
1601                 return;
1602         }
1603
1604         if (state->lock_list->num == 0) {
1605                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1606                 if (tevent_req_nomem(subreq, req)) {
1607                         return;
1608                 }
1609                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1610                                         req);
1611                 return;
1612         }
1613
1614         tevent_req_done(req);
1615 }
1616
1617 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1618 {
1619         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1620                 req, struct ctdb_g_lock_unlock_state);
1621         struct ctdb_g_lock *lock;
1622         int ret, i;
1623
1624         for (i=0; i<state->lock_list->num; i++) {
1625                 lock = &state->lock_list->lock[i];
1626
1627                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1628                         break;
1629                 }
1630         }
1631
1632         if (i < state->lock_list->num) {
1633                 state->lock_list->lock[i] =
1634                         state->lock_list->lock[state->lock_list->num-1];
1635                 state->lock_list->num -= 1;
1636         }
1637
1638         if (state->lock_list->num != 0) {
1639                 TDB_DATA data;
1640
1641                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1642                 data.dptr = talloc_size(state, data.dsize);
1643                 if (data.dptr == NULL) {
1644                         return ENOMEM;
1645                 }
1646
1647                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1648                 ret = ctdb_store_record(state->h, data);
1649                 talloc_free(data.dptr);
1650                 if (ret != 0) {
1651                         return ret;
1652                 }
1653         }
1654
1655         return 0;
1656 }
1657
1658 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1659 {
1660         struct tevent_req *req = tevent_req_callback_data(
1661                 subreq, struct tevent_req);
1662         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1663                 req, struct ctdb_g_lock_unlock_state);
1664         int ret;
1665         bool status;
1666
1667         status = ctdb_delete_record_recv(subreq, &ret);
1668         if (! status) {
1669                 DEBUG(DEBUG_ERR,
1670                       ("g_lock_unlock %s delete record failed, ret=%d\n",
1671                        (char *)state->key.dptr, ret));
1672                 tevent_req_error(req, ret);
1673                 return;
1674         }
1675
1676         tevent_req_done(req);
1677 }
1678
1679 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1680 {
1681         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1682                 req, struct ctdb_g_lock_unlock_state);
1683         int err;
1684
1685         TALLOC_FREE(state->h);
1686
1687         if (tevent_req_is_unix_error(req, &err)) {
1688                 if (perr != NULL) {
1689                         *perr = err;
1690                 }
1691                 return false;
1692         }
1693
1694         return true;
1695 }
1696
1697 /*
1698  * Persistent database functions
1699  */
/*
 * Request state for ctdb_transaction_start_send().
 *
 * ev, client - as passed to ctdb_transaction_start_send()
 * timeout    - timeout handed to the register-srvid control sent from
 *              ctdb_transaction_g_lock_attached()
 * h          - transaction handle being set up
 * destnode   - node the controls are sent to; set from ctdb_client_pnn()
 */
1700 struct ctdb_transaction_start_state {
1701         struct tevent_context *ev;
1702         struct ctdb_client_context *client;
1703         struct timeval timeout;
1704         struct ctdb_transaction_handle *h;
1705         uint32_t destnode;
1706 };
1707
/*
 * Callbacks of the transaction start request (g_lock.tdb attached, srvid
 * registered, g_lock taken) and the destructor for transaction handles.
 */
1708 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1709 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1710 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1711 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1712
1713 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1714                                                struct tevent_context *ev,
1715                                                struct ctdb_client_context *client,
1716                                                struct timeval timeout,
1717                                                struct ctdb_db_context *db,
1718                                                bool readonly)
1719 {
1720         struct ctdb_transaction_start_state *state;
1721         struct tevent_req *req, *subreq;
1722         struct ctdb_transaction_handle *h;
1723
1724         req = tevent_req_create(mem_ctx, &state,
1725                                 struct ctdb_transaction_start_state);
1726         if (req == NULL) {
1727                 return NULL;
1728         }
1729
1730         if (! db->persistent) {
1731                 tevent_req_error(req, EINVAL);
1732                 return tevent_req_post(req, ev);
1733         }
1734
1735         state->ev = ev;
1736         state->client = client;
1737         state->destnode = ctdb_client_pnn(client);
1738
1739         h = talloc_zero(db, struct ctdb_transaction_handle);
1740         if (tevent_req_nomem(h, req)) {
1741                 return tevent_req_post(req, ev);
1742         }
1743
1744         h->ev = ev;
1745         h->client = client;
1746         h->db = db;
1747         h->readonly = readonly;
1748         h->updated = false;
1749
1750         /* SRVID is unique for databases, so client can have transactions
1751          * active for multiple databases */
1752         h->sid = ctdb_client_get_server_id(client, db->db_id);
1753
1754         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1755         if (tevent_req_nomem(h->recbuf, req)) {
1756                 return tevent_req_post(req, ev);
1757         }
1758
1759         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1760         if (tevent_req_nomem(h->lock_name, req)) {
1761                 return tevent_req_post(req, ev);
1762         }
1763
1764         state->h = h;
1765
1766         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1767         if (tevent_req_nomem(subreq, req)) {
1768                 return tevent_req_post(req, ev);
1769         }
1770         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1771
1772         return req;
1773 }
1774
1775 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1776 {
1777         struct tevent_req *req = tevent_req_callback_data(
1778                 subreq, struct tevent_req);
1779         struct ctdb_transaction_start_state *state = tevent_req_data(
1780                 req, struct ctdb_transaction_start_state);
1781         struct ctdb_req_control request;
1782         bool status;
1783         int ret;
1784
1785         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1786         TALLOC_FREE(subreq);
1787         if (! status) {
1788                 tevent_req_error(req, ret);
1789                 return;
1790         }
1791
1792         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1793         subreq = ctdb_client_control_send(state, state->ev, state->client,
1794                                           state->destnode, state->timeout,
1795                                           &request);
1796         if (tevent_req_nomem(subreq, req)) {
1797                 return;
1798         }
1799         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1800 }
1801
1802 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1803 {
1804         struct tevent_req *req = tevent_req_callback_data(
1805                 subreq, struct tevent_req);
1806         struct ctdb_transaction_start_state *state = tevent_req_data(
1807                 req, struct ctdb_transaction_start_state);
1808         struct ctdb_reply_control *reply;
1809         bool status;
1810         int ret;
1811
1812         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1813         TALLOC_FREE(subreq);
1814         if (! status) {
1815                 tevent_req_error(req, ret);
1816                 return;
1817         }
1818
1819         ret = ctdb_reply_control_register_srvid(reply);
1820         talloc_free(reply);
1821         if (ret != 0) {
1822                 tevent_req_error(req, ret);
1823                 return;
1824         }
1825
1826         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1827                                        state->h->db_g_lock, state->h->lock_name,
1828                                        &state->h->sid, state->h->readonly);
1829         if (tevent_req_nomem(subreq, req)) {
1830                 return;
1831         }
1832         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1833 }
1834
1835 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1836 {
1837         struct tevent_req *req = tevent_req_callback_data(
1838                 subreq, struct tevent_req);
1839         int ret;
1840         bool status;
1841
1842         status = ctdb_g_lock_lock_recv(subreq, &ret);
1843         TALLOC_FREE(subreq);
1844         if (! status) {
1845                 tevent_req_error(req, ret);
1846                 return;
1847         }
1848
1849         tevent_req_done(req);
1850 }
1851
1852 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1853                                         struct tevent_req *req,
1854                                         int *perr)
1855 {
1856         struct ctdb_transaction_start_state *state = tevent_req_data(
1857                 req, struct ctdb_transaction_start_state);
1858         struct ctdb_transaction_handle *h = state->h;
1859         int err;
1860
1861         if (tevent_req_is_unix_error(req, &err)) {
1862                 if (perr != NULL) {
1863                         *perr = err;
1864                 }
1865                 return NULL;
1866         }
1867
1868         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1869         return h;
1870 }
1871
1872 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1873 {
1874         int ret;
1875
1876         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1877                                          tevent_timeval_zero(),
1878                                          h->sid.unique_id);
1879         if (ret != 0) {
1880                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1881         }
1882
1883         return 0;
1884 }
1885
1886 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1887                            struct ctdb_client_context *client,
1888                            struct timeval timeout,
1889                            struct ctdb_db_context *db, bool readonly,
1890                            struct ctdb_transaction_handle **out)
1891 {
1892         struct tevent_req *req;
1893         struct ctdb_transaction_handle *h;
1894         int ret;
1895
1896         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1897                                           readonly);
1898         if (req == NULL) {
1899                 return ENOMEM;
1900         }
1901
1902         tevent_req_poll(req, ev);
1903
1904         h = ctdb_transaction_start_recv(req, &ret);
1905         if (h == NULL) {
1906                 return ret;
1907         }
1908
1909         *out = h;
1910         return 0;
1911 }
1912
1913 struct ctdb_transaction_record_fetch_state {
1914         TDB_DATA key, data;
1915         struct ctdb_ltdb_header header;
1916         bool found;
1917 };
1918
1919 static int ctdb_transaction_record_fetch_traverse(
1920                                 uint32_t reqid,
1921                                 struct ctdb_ltdb_header *nullheader,
1922                                 TDB_DATA key, TDB_DATA data,
1923                                 void *private_data)
1924 {
1925         struct ctdb_transaction_record_fetch_state *state =
1926                 (struct ctdb_transaction_record_fetch_state *)private_data;
1927
1928         if (state->key.dsize == key.dsize &&
1929             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1930                 int ret;
1931
1932                 ret = ctdb_ltdb_header_extract(&data, &state->header);
1933                 if (ret != 0) {
1934                         DEBUG(DEBUG_ERR, ("Failed to extract header\n"));
1935                         return 1;
1936                 }
1937
1938                 state->data = data;
1939                 state->found = true;
1940         }
1941
1942         return 0;
1943 }
1944
1945 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1946                                          TDB_DATA key,
1947                                          struct ctdb_ltdb_header *header,
1948                                          TDB_DATA *data)
1949 {
1950         struct ctdb_transaction_record_fetch_state state;
1951         int ret;
1952
1953         state.key = key;
1954         state.found = false;
1955
1956         ret = ctdb_rec_buffer_traverse(h->recbuf,
1957                                        ctdb_transaction_record_fetch_traverse,
1958                                        &state);
1959         if (ret != 0) {
1960                 return ret;
1961         }
1962
1963         if (state.found) {
1964                 if (header != NULL) {
1965                         *header = state.header;
1966                 }
1967                 if (data != NULL) {
1968                         *data = state.data;
1969                 }
1970                 return 0;
1971         }
1972
1973         return ENOENT;
1974 }
1975
1976 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1977                                   TDB_DATA key,
1978                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1979 {
1980         TDB_DATA tmp_data;
1981         struct ctdb_ltdb_header header;
1982         int ret;
1983
1984         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1985         if (ret == 0) {
1986                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1987                                            tmp_data.dsize);
1988                 if (data->dptr == NULL) {
1989                         return ENOMEM;
1990                 }
1991                 data->dsize = tmp_data.dsize;
1992                 return 0;
1993         }
1994
1995         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1996         if (ret != 0) {
1997                 return ret;
1998         }
1999
2000         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2001         if (ret != 0) {
2002                 return ret;
2003         }
2004
2005         return 0;
2006 }
2007
2008 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2009                                   TDB_DATA key, TDB_DATA data)
2010 {
2011         TALLOC_CTX *tmp_ctx;
2012         struct ctdb_ltdb_header header;
2013         TDB_DATA old_data;
2014         int ret;
2015
2016         if (h->readonly) {
2017                 return EINVAL;
2018         }
2019
2020         tmp_ctx = talloc_new(h);
2021         if (tmp_ctx == NULL) {
2022                 return ENOMEM;
2023         }
2024
2025         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2026         if (ret != 0) {
2027                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2028                 if (ret != 0) {
2029                         return ret;
2030                 }
2031         }
2032
2033         if (old_data.dsize == data.dsize &&
2034             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2035                 talloc_free(tmp_ctx);
2036                 return 0;
2037         }
2038
2039         header.dmaster = ctdb_client_pnn(h->client);
2040         header.rsn += 1;
2041
2042         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2043         talloc_free(tmp_ctx);
2044         if (ret != 0) {
2045                 return ret;
2046         }
2047         h->updated = true;
2048
2049         return 0;
2050 }
2051
2052 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2053                                    TDB_DATA key)
2054 {
2055         return ctdb_transaction_store_record(h, key, tdb_null);
2056 }
2057
2058 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2059                                             uint64_t seqnum)
2060 {
2061         const char *keyname = CTDB_DB_SEQNUM_KEY;
2062         TDB_DATA key, data;
2063
2064         key.dptr = discard_const(keyname);
2065         key.dsize = strlen(keyname) + 1;
2066
2067         data.dptr = (uint8_t *)&seqnum;
2068         data.dsize = sizeof(seqnum);
2069
2070         return ctdb_transaction_store_record(h, key, data);
2071 }
2072
/*
 * Request state for ctdb_transaction_commit_send().
 */
struct ctdb_transaction_commit_state {
        /* Event context used for all sub-requests of the commit. */
        struct tevent_context *ev;
        /* Transaction being committed. */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit; the commit
         * stores seqnum + 1 and later compares against this value. */
        uint64_t seqnum;
};
2078
2079 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq);
2080 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
2081 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2082 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq);
2083
2084 struct tevent_req *ctdb_transaction_commit_send(
2085                                         TALLOC_CTX *mem_ctx,
2086                                         struct tevent_context *ev,
2087                                         struct ctdb_transaction_handle *h)
2088 {
2089         struct tevent_req *req, *subreq;
2090         struct ctdb_transaction_commit_state *state;
2091         struct ctdb_req_control request;
2092
2093         req = tevent_req_create(mem_ctx, &state,
2094                                 struct ctdb_transaction_commit_state);
2095         if (req == NULL) {
2096                 return NULL;
2097         }
2098
2099         state->ev = ev;
2100         state->h = h;
2101
2102         ctdb_req_control_get_db_seqnum(&request, h->db->db_id);
2103         subreq = ctdb_client_control_send(state, ev, h->client,
2104                                           h->client->pnn,
2105                                           tevent_timeval_zero(), &request);
2106         if (tevent_req_nomem(subreq, req)) {
2107                 return tevent_req_post(req, ev);
2108         }
2109         tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum_done,
2110                                 req);
2111
2112         return req;
2113 }
2114
2115 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq)
2116 {
2117         struct tevent_req *req = tevent_req_callback_data(
2118                 subreq, struct tevent_req);
2119         struct ctdb_transaction_commit_state *state = tevent_req_data(
2120                 req, struct ctdb_transaction_commit_state);
2121         struct ctdb_reply_control *reply;
2122         int ret;
2123         bool status;
2124
2125         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2126         TALLOC_FREE(subreq);
2127         if (! status) {
2128                 tevent_req_error(req, ret);
2129                 return;
2130         }
2131
2132         ret = ctdb_reply_control_get_db_seqnum(reply, &state->seqnum);
2133         if (ret != 0) {
2134                 tevent_req_error(req, ret);
2135                 return;
2136         }
2137
2138         ret = ctdb_transaction_store_db_seqnum(state->h, state->seqnum+1);
2139         if (ret != 0) {
2140                 tevent_req_error(req, ret);
2141                 return;
2142         }
2143
2144         subreq = ctdb_recovery_wait_send(state, state->ev, state->h->client);
2145         if (tevent_req_nomem(subreq, req)) {
2146                 return;
2147         }
2148         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
2149 }
2150
2151 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
2152 {
2153         struct tevent_req *req = tevent_req_callback_data(
2154                 subreq, struct tevent_req);
2155         struct ctdb_transaction_commit_state *state = tevent_req_data(
2156                 req, struct ctdb_transaction_commit_state);
2157         struct ctdb_req_control request;
2158         int ret;
2159         bool status;
2160
2161         status = ctdb_recovery_wait_recv(subreq, &ret);
2162         TALLOC_FREE(subreq);
2163         if (! status) {
2164                 tevent_req_error(req, ret);
2165                 return;
2166         }
2167
2168         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2169         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2170                                           state->h->client->pnn,
2171                                           tevent_timeval_zero(), &request);
2172         if (tevent_req_nomem(subreq, req)) {
2173                 return;
2174         }
2175         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2176 }
2177
2178 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2179 {
2180         struct tevent_req *req = tevent_req_callback_data(
2181                 subreq, struct tevent_req);
2182         struct ctdb_transaction_commit_state *state = tevent_req_data(
2183                 req, struct ctdb_transaction_commit_state);
2184         struct ctdb_reply_control *reply;
2185         struct ctdb_req_control request;
2186         int ret;
2187         bool status;
2188
2189         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2190         TALLOC_FREE(subreq);
2191         if (! status) {
2192                 tevent_req_error(req, ret);
2193                 return;
2194         }
2195
2196         ret = ctdb_reply_control_trans3_commit(reply);
2197         if (ret < 0) {
2198                 /* Control failed due to recovery */
2199                 subreq = ctdb_recovery_wait_send(state, state->ev,
2200                                                  state->h->client);
2201                 if (tevent_req_nomem(subreq, req)) {
2202                         return;
2203                 }
2204                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2205                                         req);
2206                 return;
2207         }
2208
2209         ctdb_req_control_get_db_seqnum(&request, state->h->db->db_id);
2210         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2211                                           state->h->client->pnn,
2212                                           tevent_timeval_zero(), &request);
2213         if (tevent_req_nomem(subreq, req)) {
2214                 return;
2215         }
2216         tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum2_done,
2217                                 req);
2218 }
2219
2220 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq)
2221 {
2222         struct tevent_req *req = tevent_req_callback_data(
2223                 subreq, struct tevent_req);
2224         struct ctdb_transaction_commit_state *state = tevent_req_data(
2225                 req, struct ctdb_transaction_commit_state);
2226         struct ctdb_reply_control *reply;
2227         uint64_t seqnum;
2228         int ret;
2229         bool status;
2230
2231         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2232         TALLOC_FREE(subreq);
2233         if (! status) {
2234                 tevent_req_error(req, ret);
2235                 return;
2236         }
2237
2238         ret = ctdb_reply_control_get_db_seqnum(reply, &seqnum);
2239         if (ret != 0) {
2240                 tevent_req_error(req, ret);
2241                 return;
2242         }
2243
2244         if (seqnum == state->seqnum) {
2245                 subreq = ctdb_recovery_wait_send(state, state->ev,
2246                                                  state->h->client);
2247                 if (tevent_req_nomem(subreq, req)) {
2248                         return;
2249                 }
2250                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2251                                         req);
2252                 return;
2253         }
2254
2255         if (seqnum != state->seqnum + 1) {
2256                 tevent_req_error(req, EIO);
2257                 return;
2258         }
2259
2260         tevent_req_done(req);
2261 }
2262
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * Returns true if the commit request completed successfully.  Otherwise
 * returns false and, if perr is not NULL, stores the error code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (!tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }

        return false;
}
2276
2277 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2278 {
2279         struct tevent_req *req;
2280         int ret;
2281         bool status;
2282
2283         if (h->readonly || ! h->updated) {
2284                 talloc_free(h);
2285                 return 0;
2286         }
2287
2288         req = ctdb_transaction_commit_send(h, h->ev, h);
2289         if (req == NULL) {
2290                 talloc_free(h);
2291                 return ENOMEM;
2292         }
2293
2294         tevent_req_poll(req, h->ev);
2295
2296         status = ctdb_transaction_commit_recv(req, &ret);
2297         if (! status) {
2298                 talloc_free(h);
2299                 return ret;
2300         }
2301
2302         talloc_free(h);
2303         return 0;
2304 }
2305
/*
 * Cancel a transaction by freeing its handle.
 *
 * Nothing is sent for the buffered records; they are dropped with the
 * handle.  Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
        (void) talloc_free(h);

        return 0;
}
2311
2312 /*
2313  * TODO:
2314  *
2315  * In future Samba should register SERVER_ID.
2316  * Make that structure same as struct srvid {}.
2317  */