ctdb-client: Use ctdb_ltdb_header_extract()
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State carried across the asynchronous "set database flags" request.
 */
struct ctdb_set_db_flags_state {
        /* Values captured from the arguments of ctdb_set_db_flags_send() */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        uint32_t db_id;
        uint8_t db_flags;

        /* Completion markers; the request finishes once both are true */
        bool readonly_done;
        bool sticky_done;

        /* Connected nodes, and how many, that the controls are sent to */
        uint32_t *pnn_list;
        int count;
};
65
/*
 * Completion callbacks for the sub-requests issued on behalf of
 * ctdb_set_db_flags_send(): node map retrieval, then the readonly and
 * sticky controls.
 */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 tevent_req_error(req, ret);
125                 return;
126         }
127
128         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129         talloc_free(reply);
130         if (ret != 0) {
131                 tevent_req_error(req, ret);
132                 return;
133         }
134
135         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136                                                state, &state->pnn_list);
137         talloc_free(nodemap);
138         if (state->count <= 0) {
139                 tevent_req_error(req, ENOMEM);
140                 return;
141         }
142
143         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144                 ctdb_req_control_set_db_readonly(&request, state->db_id);
145                 subreq = ctdb_client_control_multi_send(
146                                         state, state->ev, state->client,
147                                         state->pnn_list, state->count,
148                                         state->timeout, &request);
149                 if (tevent_req_nomem(subreq, req)) {
150                         return;
151                 }
152                 tevent_req_set_callback(subreq,
153                                         ctdb_set_db_flags_readonly_done, req);
154         } else {
155                 state->readonly_done = true;
156         }
157
158         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159                 ctdb_req_control_set_db_sticky(&request, state->db_id);
160                 subreq = ctdb_client_control_multi_send(
161                                         state, state->ev, state->client,
162                                         state->pnn_list, state->count,
163                                         state->timeout, &request);
164                 if (tevent_req_nomem(subreq, req)) {
165                         return;
166                 }
167                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168                                         req);
169         } else {
170                 state->sticky_done = true;
171         }
172 }
173
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 {
176         struct tevent_req *req = tevent_req_callback_data(
177                 subreq, struct tevent_req);
178         struct ctdb_set_db_flags_state *state = tevent_req_data(
179                 req, struct ctdb_set_db_flags_state);
180         int ret;
181         bool status;
182
183         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184                                                 NULL);
185         TALLOC_FREE(subreq);
186         if (! status) {
187                 tevent_req_error(req, ret);
188                 return;
189         }
190
191         state->readonly_done = true;
192
193         if (state->readonly_done && state->sticky_done) {
194                 tevent_req_done(req);
195         }
196 }
197
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req = tevent_req_callback_data(
201                 subreq, struct tevent_req);
202         struct ctdb_set_db_flags_state *state = tevent_req_data(
203                 req, struct ctdb_set_db_flags_state);
204         int ret;
205         bool status;
206
207         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208                                                 NULL);
209         TALLOC_FREE(subreq);
210         if (! status) {
211                 tevent_req_error(req, ret);
212                 return;
213         }
214
215         state->sticky_done = true;
216
217         if (state->readonly_done && state->sticky_done) {
218                 tevent_req_done(req);
219         }
220 }
221
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
234
/*
 * State carried across the asynchronous database attach request.
 */
struct ctdb_attach_state {
        /* Values captured from the arguments of ctdb_attach_send() */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node all attach controls are sent to; set from ctdb_client_pnn() */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* requested by the caller */
        uint8_t db_flags;
        /* TDB open flags, computed in ctdb_attach_mutex_done() */
        uint32_t tdb_flags;
        /* Database context being attached, or the already attached one */
        struct ctdb_db_context *db;
};
244
/*
 * Completion callbacks for the attach request, listed in the order in
 * which the individual steps are chained.
 */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252                                     struct tevent_context *ev,
253                                     struct ctdb_client_context *client,
254                                     struct timeval timeout,
255                                     const char *db_name, uint8_t db_flags)
256 {
257         struct tevent_req *req, *subreq;
258         struct ctdb_attach_state *state;
259         struct ctdb_req_control request;
260
261         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262         if (req == NULL) {
263                 return NULL;
264         }
265
266         state->db = client_db_handle(client, db_name);
267         if (state->db != NULL) {
268                 tevent_req_done(req);
269                 return tevent_req_post(req, ev);
270         }
271
272         state->ev = ev;
273         state->client = client;
274         state->timeout = timeout;
275         state->destnode = ctdb_client_pnn(client);
276         state->db_flags = db_flags;
277
278         state->db = talloc_zero(client, struct ctdb_db_context);
279         if (tevent_req_nomem(state->db, req)) {
280                 return tevent_req_post(req, ev);
281         }
282
283         state->db->db_name = talloc_strdup(state->db, db_name);
284         if (tevent_req_nomem(state->db, req)) {
285                 return tevent_req_post(req, ev);
286         }
287
288         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289                 state->db->persistent = true;
290         }
291
292         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293         subreq = ctdb_client_control_send(state, ev, client,
294                                           ctdb_client_pnn(client), timeout,
295                                           &request);
296         if (tevent_req_nomem(subreq, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301         return req;
302 }
303
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         struct ctdb_attach_state *state = tevent_req_data(
309                 req, struct ctdb_attach_state);
310         struct ctdb_reply_control *reply;
311         struct ctdb_req_control request;
312         uint32_t mutex_enabled;
313         int ret;
314         bool status;
315
316         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317         TALLOC_FREE(subreq);
318         if (! status) {
319                 tevent_req_error(req, ret);
320                 return;
321         }
322
323         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324         if (ret != 0) {
325                 /* Treat error as mutex support not available */
326                 mutex_enabled = 0;
327         }
328
329         state->tdb_flags = TDB_DEFAULT;
330         if (! state->db->persistent) {
331                 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
332                                      TDB_CLEAR_IF_FIRST);
333         }
334         if (mutex_enabled == 1) {
335                 state->tdb_flags |= TDB_MUTEX_LOCKING;
336         }
337
338         if (state->db->persistent) {
339                 ctdb_req_control_db_attach_persistent(&request,
340                                                       state->db->db_name,
341                                                       state->tdb_flags);
342         } else {
343                 ctdb_req_control_db_attach(&request, state->db->db_name,
344                                            state->tdb_flags);
345         }
346
347         subreq = ctdb_client_control_send(state, state->ev, state->client,
348                                           state->destnode, state->timeout,
349                                           &request);
350         if (tevent_req_nomem(subreq, req)) {
351                 return;
352         }
353         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
354 }
355
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 {
358         struct tevent_req *req = tevent_req_callback_data(
359                 subreq, struct tevent_req);
360         struct ctdb_attach_state *state = tevent_req_data(
361                 req, struct ctdb_attach_state);
362         struct ctdb_req_control request;
363         struct ctdb_reply_control *reply;
364         bool status;
365         int ret;
366
367         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
368         TALLOC_FREE(subreq);
369         if (! status) {
370                 tevent_req_error(req, ret);
371                 return;
372         }
373
374         if (state->db->persistent) {
375                 ret = ctdb_reply_control_db_attach_persistent(
376                                 reply, &state->db->db_id);
377         } else {
378                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
379         }
380         talloc_free(reply);
381         if (ret != 0) {
382                 tevent_req_error(req, ret);
383                 return;
384         }
385
386         ctdb_req_control_getdbpath(&request, state->db->db_id);
387         subreq = ctdb_client_control_send(state, state->ev, state->client,
388                                           state->destnode, state->timeout,
389                                           &request);
390         if (tevent_req_nomem(subreq, req)) {
391                 return;
392         }
393         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
394 }
395
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397 {
398         struct tevent_req *req = tevent_req_callback_data(
399                 subreq, struct tevent_req);
400         struct ctdb_attach_state *state = tevent_req_data(
401                 req, struct ctdb_attach_state);
402         struct ctdb_reply_control *reply;
403         struct ctdb_req_control request;
404         bool status;
405         int ret;
406
407         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 tevent_req_error(req, ret);
411                 return;
412         }
413
414         ret = ctdb_reply_control_getdbpath(reply, state->db,
415                                            &state->db->db_path);
416         talloc_free(reply);
417         if (ret != 0) {
418                 tevent_req_error(req, ret);
419                 return;
420         }
421
422         ctdb_req_control_db_get_health(&request, state->db->db_id);
423         subreq = ctdb_client_control_send(state, state->ev, state->client,
424                                           state->destnode, state->timeout,
425                                           &request);
426         if (tevent_req_nomem(subreq, req)) {
427                 return;
428         }
429         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
430 }
431
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
433 {
434         struct tevent_req *req = tevent_req_callback_data(
435                 subreq, struct tevent_req);
436         struct ctdb_attach_state *state = tevent_req_data(
437                 req, struct ctdb_attach_state);
438         struct ctdb_reply_control *reply;
439         const char *reason;
440         bool status;
441         int ret;
442
443         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
444         TALLOC_FREE(subreq);
445         if (! status) {
446                 tevent_req_error(req, ret);
447                 return;
448         }
449
450         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451         if (ret != 0) {
452                 tevent_req_error(req, ret);
453                 return;
454         }
455
456         if (reason != NULL) {
457                 /* Database unhealthy, avoid attach */
458                 /* FIXME: Log here */
459                 tevent_req_error(req, EIO);
460                 return;
461         }
462
463         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464                                         state->destnode, state->timeout,
465                                         state->db->db_id, state->db_flags);
466         if (tevent_req_nomem(subreq, req)) {
467                 return;
468         }
469         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
470 }
471
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
473 {
474         struct tevent_req *req = tevent_req_callback_data(
475                 subreq, struct tevent_req);
476         struct ctdb_attach_state *state = tevent_req_data(
477                 req, struct ctdb_attach_state);
478         bool status;
479         int ret;
480
481         status = ctdb_set_db_flags_recv(subreq, &ret);
482         TALLOC_FREE(subreq);
483         if (! status) {
484                 tevent_req_error(req, ret);
485                 return;
486         }
487
488         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489                                         state->tdb_flags, O_RDWR, 0);
490         if (tevent_req_nomem(state->db->ltdb, req)) {
491                 return;
492         }
493         DLIST_ADD(state->client->db, state->db);
494
495         tevent_req_done(req);
496 }
497
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499                       struct ctdb_db_context **out)
500 {
501         struct ctdb_attach_state *state = tevent_req_data(
502                 req, struct ctdb_attach_state);
503         int err;
504
505         if (tevent_req_is_unix_error(req, &err)) {
506                 if (perr != NULL) {
507                         *perr = err;
508                 }
509                 return false;
510         }
511
512         if (out != NULL) {
513                 *out = state->db;
514         }
515         return true;
516 }
517
518 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
519                 struct ctdb_client_context *client,
520                 struct timeval timeout,
521                 const char *db_name, uint8_t db_flags,
522                 struct ctdb_db_context **out)
523 {
524         struct tevent_req *req;
525         bool status;
526         int ret;
527
528         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
529                                db_name, db_flags);
530         if (req == NULL) {
531                 return ENOMEM;
532         }
533
534         tevent_req_poll(req, ev);
535
536         status = ctdb_attach_recv(req, &ret, out);
537         if (! status) {
538                 return ret;
539         }
540
541         /*
542         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
545         */
546
547         return 0;
548 }
549
550 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
551                 struct ctdb_client_context *client,
552                 struct timeval timeout, uint32_t db_id)
553 {
554         struct ctdb_db_context *db;
555         int ret;
556
557         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
558                                   db_id);
559         if (ret != 0) {
560                 return ret;
561         }
562
563         for (db = client->db; db != NULL; db = db->next) {
564                 if (db->db_id == db_id) {
565                         DLIST_REMOVE(client->db, db);
566                         break;
567                 }
568         }
569
570         return 0;
571 }
572
573 uint32_t ctdb_db_id(struct ctdb_db_context *db)
574 {
575         return db->db_id;
576 }
577
/*
 * Context handed to ctdb_db_traverse_handler() for each record.
 */
struct ctdb_db_traverse_state {
        /* Caller supplied record callback and its opaque argument */
        ctdb_rec_parser_func_t parser;
        void *private_data;
        /* If true, the ctdb header is extracted before calling the parser */
        bool extract_header;
        /* First non-zero result seen by the handler, 0 if none */
        int error;
};
584
585 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
586                                     TDB_DATA data, void *private_data)
587 {
588         struct ctdb_db_traverse_state *state =
589                 (struct ctdb_db_traverse_state *)private_data;
590         int ret;
591
592         if (state->extract_header) {
593                 struct ctdb_ltdb_header header;
594
595                 ret = ctdb_ltdb_header_extract(&data, &header);
596                 if (ret != 0) {
597                         state->error = ret;
598                         return 1;
599                 }
600
601                 ret = state->parser(0, &header, key, data, state->private_data);
602         } else {
603                 ret = state->parser(0, NULL, key, data, state->private_data);
604         }
605
606         if (ret != 0) {
607                 state->error = ret;
608                 return 1;
609         }
610
611         return 0;
612 }
613
614 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
615                      bool extract_header,
616                      ctdb_rec_parser_func_t parser, void *private_data)
617 {
618         struct ctdb_db_traverse_state state;
619         int ret;
620
621         state.parser = parser;
622         state.private_data = private_data;
623         state.extract_header = extract_header;
624         state.error = 0;
625
626         if (readonly) {
627                 ret = tdb_traverse_read(db->ltdb->tdb,
628                                         ctdb_db_traverse_handler, &state);
629         } else {
630                 ret = tdb_traverse(db->ltdb->tdb,
631                                    ctdb_db_traverse_handler, &state);
632         }
633
634         if (ret == -1) {
635                 return EIO;
636         }
637
638         return state.error;
639 }
640
641 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
642                            struct ctdb_ltdb_header *header,
643                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
644 {
645         TDB_DATA rec;
646         int ret;
647
648         rec = tdb_fetch(db->ltdb->tdb, key);
649         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
650                 /* No record present */
651                 if (rec.dptr != NULL) {
652                         free(rec.dptr);
653                 }
654
655                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
656                         return EIO;
657                 }
658
659                 header->rsn = 0;
660                 header->dmaster = CTDB_UNKNOWN_PNN;
661                 header->flags = 0;
662
663                 if (data != NULL) {
664                         *data = tdb_null;
665                 }
666                 return 0;
667         }
668
669         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
670         if (ret != 0) {
671                 return ret;
672         }
673
674         ret = 0;
675         if (data != NULL) {
676                 size_t offset = ctdb_ltdb_header_len(header);
677
678                 data->dsize = rec.dsize - offset;
679                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
680                                            data->dsize);
681                 if (data->dptr == NULL) {
682                         ret = ENOMEM;
683                 }
684         }
685
686         free(rec.dptr);
687         return ret;
688 }
689
690 /*
691  * Fetch a record from volatile database
692  *
693  * Steps:
694  *  1. Get a lock on the hash chain
695  *  2. If the record does not exist, migrate the record
696  *  3. If readonly=true and delegations do not exist, migrate the record.
697  *  4. If readonly=false and delegations exist, migrate the record.
698  *  5. If the local node is not dmaster, migrate the record.
699  *  6. Return record
700  */
701
/*
 * State carried across the asynchronous fetch-lock request.
 */
struct ctdb_fetch_lock_state {
        /* Values captured from the arguments of ctdb_fetch_lock_send() */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Record handle being set up; allocated on the database context */
        struct ctdb_record_handle *h;
        /* Whether the caller asked for readonly access */
        bool readonly;
        /* PNN of the client's node, from ctdb_client_pnn() */
        uint32_t pnn;
};
709
/*
 * Helpers of the fetch-lock request: check whether the record can be used
 * locally, start a migration, and handle the completed migration.
 */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
713
714 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
715                                         struct tevent_context *ev,
716                                         struct ctdb_client_context *client,
717                                         struct ctdb_db_context *db,
718                                         TDB_DATA key, bool readonly)
719 {
720         struct ctdb_fetch_lock_state *state;
721         struct tevent_req *req;
722         int ret;
723
724         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
725         if (req == NULL) {
726                 return NULL;
727         }
728
729         state->ev = ev;
730         state->client = client;
731
732         state->h = talloc_zero(db, struct ctdb_record_handle);
733         if (tevent_req_nomem(state->h, req)) {
734                 return tevent_req_post(req, ev);
735         }
736         state->h->client = client;
737         state->h->db = db;
738         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
739         if (tevent_req_nomem(state->h->key.dptr, req)) {
740                 return tevent_req_post(req, ev);
741         }
742         state->h->key.dsize = key.dsize;
743         state->h->readonly = false;
744
745         state->readonly = readonly;
746         state->pnn = ctdb_client_pnn(client);
747
748         /* Check that database is not persistent */
749         if (db->persistent) {
750                 tevent_req_error(req, EINVAL);
751                 return tevent_req_post(req, ev);
752         }
753
754         ret = ctdb_fetch_lock_check(req);
755         if (ret == 0) {
756                 tevent_req_done(req);
757                 return tevent_req_post(req, ev);
758         }
759         if (ret != EAGAIN) {
760                 tevent_req_error(req, ret);
761                 return tevent_req_post(req, ev);
762         }
763         return req;
764 }
765
766 static int ctdb_fetch_lock_check(struct tevent_req *req)
767 {
768         struct ctdb_fetch_lock_state *state = tevent_req_data(
769                 req, struct ctdb_fetch_lock_state);
770         struct ctdb_record_handle *h = state->h;
771         struct ctdb_ltdb_header header;
772         TDB_DATA data = tdb_null;
773         int ret, err = 0;
774         bool do_migrate = false;
775
776         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
777         if (ret != 0) {
778                 err = EIO;
779                 goto failed;
780         }
781
782         data = tdb_fetch(h->db->ltdb->tdb, h->key);
783         if (data.dptr == NULL) {
784                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
785                         goto migrate;
786                 } else {
787                         err = EIO;
788                         goto failed;
789                 }
790         }
791
792         /* Got the record */
793         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
794         if (ret != 0) {
795                 err = ret;
796                 goto failed;
797         }
798
799         if (! state->readonly) {
800                 /* Read/write access */
801                 if (header.dmaster == state->pnn &&
802                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
803                         goto migrate;
804                 }
805
806                 if (header.dmaster != state->pnn) {
807                         goto migrate;
808                 }
809         } else {
810                 /* Readonly access */
811                 if (header.dmaster != state->pnn &&
812                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
813                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
814                         goto migrate;
815                 }
816         }
817
818         /* We are the dmaster or readonly delegation */
819         h->header = header;
820         h->data = data;
821         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
822                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
823                 h->readonly = true;
824         }
825         return 0;
826
827 migrate:
828         do_migrate = true;
829         err = EAGAIN;
830
831 failed:
832         if (data.dptr != NULL) {
833                 free(data.dptr);
834         }
835         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
836         if (ret != 0) {
837                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
838                                   h->db->db_name));
839                 return EIO;
840         }
841
842         if (do_migrate) {
843                 ctdb_fetch_lock_migrate(req);
844         }
845         return err;
846 }
847
848 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
849 {
850         struct ctdb_fetch_lock_state *state = tevent_req_data(
851                 req, struct ctdb_fetch_lock_state);
852         struct ctdb_req_call request;
853         struct tevent_req *subreq;
854
855         ZERO_STRUCT(request);
856         request.flags = CTDB_IMMEDIATE_MIGRATION;
857         if (state->readonly) {
858                 request.flags |= CTDB_WANT_READONLY;
859         }
860         request.db_id = state->h->db->db_id;
861         request.callid = CTDB_NULL_FUNC;
862         request.key = state->h->key;
863
864         subreq = ctdb_client_call_send(state, state->ev, state->client,
865                                        &request);
866         if (tevent_req_nomem(subreq, req)) {
867                 return;
868         }
869
870         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
871 }
872
873 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
874 {
875         struct tevent_req *req = tevent_req_callback_data(
876                 subreq, struct tevent_req);
877         struct ctdb_fetch_lock_state *state = tevent_req_data(
878                 req, struct ctdb_fetch_lock_state);
879         struct ctdb_reply_call *reply;
880         int ret;
881         bool status;
882
883         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
884         TALLOC_FREE(subreq);
885         if (! status) {
886                 tevent_req_error(req, ret);
887                 return;
888         }
889
890         if (reply->status != 0) {
891                 tevent_req_error(req, EIO);
892                 return;
893         }
894         talloc_free(reply);
895
896         ret = ctdb_fetch_lock_check(req);
897         if (ret != 0) {
898                 tevent_req_error(req, ret);
899                 return;
900         }
901
902         tevent_req_done(req);
903 }
904
905 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
906 {
907         tdb_chainunlock(h->db->ltdb->tdb, h->key);
908         free(h->data.dptr);
909         return 0;
910 }
911
912 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
913                                                 struct ctdb_ltdb_header *header,
914                                                 TALLOC_CTX *mem_ctx,
915                                                 TDB_DATA *data, int *perr)
916 {
917         struct ctdb_fetch_lock_state *state = tevent_req_data(
918                 req, struct ctdb_fetch_lock_state);
919         struct ctdb_record_handle *h = state->h;
920         int err;
921
922         if (tevent_req_is_unix_error(req, &err)) {
923                 if (perr != NULL) {
924                         *perr = err;
925                 }
926                 return NULL;
927         }
928
929         if (header != NULL) {
930                 *header = h->header;
931         }
932         if (data != NULL) {
933                 size_t offset;
934
935                 offset = ctdb_ltdb_header_len(&h->header);
936
937                 data->dsize = h->data.dsize - offset;
938                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
939                                            data->dsize);
940                 if (data->dptr == NULL) {
941                         TALLOC_FREE(state->h);
942                         if (perr != NULL) {
943                                 *perr = ENOMEM;
944                         }
945                         return NULL;
946                 }
947         }
948
949         talloc_set_destructor(h, ctdb_record_handle_destructor);
950         return h;
951 }
952
953 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
954                     struct ctdb_client_context *client,
955                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
956                     struct ctdb_record_handle **out,
957                     struct ctdb_ltdb_header *header, TDB_DATA *data)
958 {
959         struct tevent_req *req;
960         struct ctdb_record_handle *h;
961         int ret;
962
963         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
964         if (req == NULL) {
965                 return ENOMEM;
966         }
967
968         tevent_req_poll(req, ev);
969
970         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
971         if (h == NULL) {
972                 return ret;
973         }
974
975         *out = h;
976         return 0;
977 }
978
979 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
980 {
981         TDB_DATA rec;
982         size_t offset;
983         int ret;
984
985         /* Cannot modify the record if it was obtained as a readonly copy */
986         if (h->readonly) {
987                 return EINVAL;
988         }
989
990         /* Check if the new data is same */
991         if (h->data.dsize == data.dsize &&
992             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
993                 /* No need to do anything */
994                 return 0;
995         }
996
997         offset = ctdb_ltdb_header_len(&h->header);
998         rec.dsize = offset + data.dsize;
999         rec.dptr = talloc_size(h, rec.dsize);
1000         if (rec.dptr == NULL) {
1001                 return ENOMEM;
1002         }
1003
1004         ctdb_ltdb_header_push(&h->header, rec.dptr);
1005         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1006
1007         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1010                                   h->db->db_name));
1011                 return EIO;
1012         }
1013
1014         talloc_free(rec.dptr);
1015         return 0;
1016 }
1017
1018 int ctdb_delete_record(struct ctdb_record_handle *h)
1019 {
1020         TDB_DATA rec;
1021         struct ctdb_key_data key;
1022         int ret;
1023
1024         /* Cannot delete the record if it was obtained as a readonly copy */
1025         if (h->readonly) {
1026                 return EINVAL;
1027         }
1028
1029         rec.dsize = ctdb_ltdb_header_len(&h->header);
1030         rec.dptr = talloc_size(h, rec.dsize);
1031         if (rec.dptr == NULL) {
1032                 return ENOMEM;
1033         }
1034
1035         ctdb_ltdb_header_push(&h->header, rec.dptr);
1036
1037         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1038         talloc_free(rec.dptr);
1039         if (ret != 0) {
1040                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1041                                   h->db->db_name));
1042                 return EIO;
1043         }
1044
1045         key.db_id = h->db->db_id;
1046         key.header = h->header;
1047         key.key = h->key;
1048
1049         ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1050                                               h->client->pnn,
1051                                               tevent_timeval_zero(), &key);
1052         if (ret != 0) {
1053                 DEBUG(DEBUG_WARNING,
1054                       ("Failed to mark record to be deleted in DB %s\n",
1055                        h->db->db_name));
1056                 return ret;
1057         }
1058
1059         return 0;
1060 }
1061
1062 /*
1063  * Global lock functions
1064  */
1065
1066 struct ctdb_g_lock_lock_state {
1067         struct tevent_context *ev;
1068         struct ctdb_client_context *client;
1069         struct ctdb_db_context *db;
1070         TDB_DATA key;
1071         struct ctdb_server_id my_sid;
1072         enum ctdb_g_lock_type lock_type;
1073         struct ctdb_record_handle *h;
1074         /* state for verification of active locks */
1075         struct ctdb_g_lock_list *lock_list;
1076         unsigned int current;
1077 };
1078
1079 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1080 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1081 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1082 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1083 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1084
1085 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1086                                   enum ctdb_g_lock_type l2)
1087 {
1088         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1089                 return false;
1090         }
1091         return true;
1092 }
1093
1094 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1095                                          struct tevent_context *ev,
1096                                          struct ctdb_client_context *client,
1097                                          struct ctdb_db_context *db,
1098                                          const char *keyname,
1099                                          struct ctdb_server_id *sid,
1100                                          bool readonly)
1101 {
1102         struct tevent_req *req, *subreq;
1103         struct ctdb_g_lock_lock_state *state;
1104
1105         req = tevent_req_create(mem_ctx, &state,
1106                                 struct ctdb_g_lock_lock_state);
1107         if (req == NULL) {
1108                 return NULL;
1109         }
1110
1111         state->ev = ev;
1112         state->client = client;
1113         state->db = db;
1114         state->key.dptr = discard_const(keyname);
1115         state->key.dsize = strlen(keyname) + 1;
1116         state->my_sid = *sid;
1117         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1118
1119         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1120                                       false);
1121         if (tevent_req_nomem(subreq, req)) {
1122                 return tevent_req_post(req, ev);
1123         }
1124         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1125
1126         return req;
1127 }
1128
1129 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1130 {
1131         struct tevent_req *req = tevent_req_callback_data(
1132                 subreq, struct tevent_req);
1133         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1134                 req, struct ctdb_g_lock_lock_state);
1135         TDB_DATA data;
1136         int ret = 0;
1137
1138         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1139         TALLOC_FREE(subreq);
1140         if (state->h == NULL) {
1141                 tevent_req_error(req, ret);
1142                 return;
1143         }
1144
1145         if (state->lock_list != NULL) {
1146                 TALLOC_FREE(state->lock_list);
1147                 state->current = 0;
1148         }
1149
1150         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1151                                     &state->lock_list);
1152         talloc_free(data.dptr);
1153         if (ret != 0) {
1154                 tevent_req_error(req, ret);
1155                 return;
1156         }
1157
1158         ctdb_g_lock_lock_process_locks(req);
1159 }
1160
1161 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1162 {
1163         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1164                 req, struct ctdb_g_lock_lock_state);
1165         struct tevent_req *subreq;
1166         struct ctdb_g_lock *lock;
1167         bool check_server = false;
1168         int ret;
1169
1170         while (state->current < state->lock_list->num) {
1171                 lock = &state->lock_list->lock[state->current];
1172
1173                 /* We should not ask for the same lock more than once */
1174                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1175                         tevent_req_error(req, EDEADLK);
1176                         return;
1177                 }
1178
1179                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1180                         check_server = true;
1181                         break;
1182                 }
1183
1184                 state->current += 1;
1185         }
1186
1187         if (check_server) {
1188                 struct ctdb_req_control request;
1189                 struct ctdb_uint64_array u64_array;
1190
1191                 u64_array.num = 1;
1192                 u64_array.val = &lock->sid.unique_id;
1193
1194                 ctdb_req_control_check_srvids(&request, &u64_array);
1195                 subreq = ctdb_client_control_send(state, state->ev,
1196                                                   state->client,
1197                                                   state->client->pnn,
1198                                                   tevent_timeval_zero(),
1199                                                   &request);
1200                 if (tevent_req_nomem(subreq, req)) {
1201                         return;
1202                 }
1203                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1204                 return;
1205         }
1206
1207         /* There is no conflict, add ourself to the lock_list */
1208         state->lock_list->lock = talloc_realloc(state->lock_list,
1209                                                 state->lock_list->lock,
1210                                                 struct ctdb_g_lock,
1211                                                 state->lock_list->num + 1);
1212         if (state->lock_list->lock == NULL) {
1213                 tevent_req_error(req, ENOMEM);
1214                 return;
1215         }
1216
1217         lock = &state->lock_list->lock[state->lock_list->num];
1218         lock->type = state->lock_type;
1219         lock->sid = state->my_sid;
1220         state->lock_list->num += 1;
1221
1222         ret = ctdb_g_lock_lock_update(req);
1223         if (ret != 0) {
1224                 tevent_req_error(req, ret);
1225                 return;
1226         }
1227
1228         tevent_req_done(req);
1229 }
1230
1231 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1232 {
1233         struct tevent_req *req = tevent_req_callback_data(
1234                 subreq, struct tevent_req);
1235         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1236                 req, struct ctdb_g_lock_lock_state);
1237         struct ctdb_reply_control *reply;
1238         struct ctdb_uint8_array *u8_array;
1239         int ret;
1240         bool status;
1241         int8_t val;
1242
1243         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1244         TALLOC_FREE(subreq);
1245         if (! status) {
1246                 tevent_req_error(req, ret);
1247                 return;
1248         }
1249
1250         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1251         if (ret != 0) {
1252                 tevent_req_error(req, ENOMEM);
1253                 return;
1254         }
1255
1256         if (u8_array->num != 1) {
1257                 talloc_free(u8_array);
1258                 tevent_req_error(req, EIO);
1259                 return;
1260         }
1261
1262         val = u8_array->val[0];
1263         talloc_free(u8_array);
1264
1265         if (val == 1) {
1266                 /* server process exists, need to retry */
1267                 subreq = tevent_wakeup_send(state, state->ev,
1268                                             tevent_timeval_current_ofs(1,0));
1269                 if (tevent_req_nomem(subreq, req)) {
1270                         return;
1271                 }
1272                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1273                 return;
1274         }
1275
1276         /* server process does not exist, remove conflicting entry */
1277         state->lock_list->lock[state->current] =
1278                 state->lock_list->lock[state->lock_list->num-1];
1279         state->lock_list->num -= 1;
1280
1281         ret = ctdb_g_lock_lock_update(req);
1282         if (ret != 0) {
1283                 tevent_req_error(req, ret);
1284                 return;
1285         }
1286
1287         ctdb_g_lock_lock_process_locks(req);
1288 }
1289
1290 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1291 {
1292         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1293                 req, struct ctdb_g_lock_lock_state);
1294         TDB_DATA data;
1295         int ret;
1296
1297         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1298         data.dptr = talloc_size(state, data.dsize);
1299         if (data.dptr == NULL) {
1300                 return ENOMEM;
1301         }
1302
1303         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1304         ret = ctdb_store_record(state->h, data);
1305         talloc_free(data.dptr);
1306         return ret;
1307 }
1308
1309 #if 0
1310 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1311                                    struct ctdb_g_lock_list *lock_list,
1312                                    struct ctdb_record_handle *h)
1313 {
1314         struct ctdb_g_lock *lock;
1315         bool conflict = false;
1316         bool modified = false;
1317         int ret, i;
1318
1319         for (i=0; i<lock_list->num; i++) {
1320                 lock = &lock_list->lock[i];
1321
1322                 /* We should not ask for lock more than once */
1323                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1324                         return EDEADLK;
1325                 }
1326
1327                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1328                         bool exists;
1329
1330                         conflict = true;
1331                         ret = ctdb_server_id_exists(state->client, &lock->sid,
1332                                                     &exists);
1333                         if (ret != 0) {
1334                                 return ret;
1335                         }
1336
1337                         if (exists) {
1338                                 break;
1339                         }
1340
1341                         /* Server does not exist, delete conflicting entry */
1342                         lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1343                         lock_list->num -= 1;
1344                         modified = true;
1345                 }
1346         }
1347
1348         if (! conflict) {
1349                 lock = talloc_realloc(lock_list, lock_list->lock,
1350                                       struct ctdb_g_lock, lock_list->num+1);
1351                 if (lock == NULL) {
1352                         return ENOMEM;
1353                 }
1354
1355                 lock[lock_list->num].type = state->lock_type;
1356                 lock[lock_list->num].sid = state->my_sid;
1357                 lock_list->lock = lock;
1358                 lock_list->num += 1;
1359                 modified = true;
1360         }
1361
1362         if (modified) {
1363                 TDB_DATA data;
1364
1365                 data.dsize = ctdb_g_lock_list_len(lock_list);
1366                 data.dptr = talloc_size(state, data.dsize);
1367                 if (data.dptr == NULL) {
1368                         return ENOMEM;
1369                 }
1370
1371                 ctdb_g_lock_list_push(lock_list, data.dptr);
1372                 ret = ctdb_store_record(h, data);
1373                 talloc_free(data.dptr);
1374                 if (ret != 0) {
1375                         return ret;
1376                 }
1377         }
1378
1379         if (conflict) {
1380                 return EAGAIN;
1381         }
1382         return 0;
1383 }
1384 #endif
1385
1386 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1387 {
1388         struct tevent_req *req = tevent_req_callback_data(
1389                 subreq, struct tevent_req);
1390         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1391                 req, struct ctdb_g_lock_lock_state);
1392         bool success;
1393
1394         success = tevent_wakeup_recv(subreq);
1395         TALLOC_FREE(subreq);
1396         if (! success) {
1397                 tevent_req_error(req, ENOMEM);
1398                 return;
1399         }
1400
1401         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1402                                       state->db, state->key, false);
1403         if (tevent_req_nomem(subreq, req)) {
1404                 return;
1405         }
1406         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1407 }
1408
1409 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1410 {
1411         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1412                 req, struct ctdb_g_lock_lock_state);
1413         int err;
1414
1415         TALLOC_FREE(state->h);
1416
1417         if (tevent_req_is_unix_error(req, &err)) {
1418                 if (perr != NULL) {
1419                         *perr = err;
1420                 }
1421                 return false;
1422         }
1423
1424         return true;
1425 }
1426
1427 struct ctdb_g_lock_unlock_state {
1428         struct tevent_context *ev;
1429         struct ctdb_client_context *client;
1430         struct ctdb_db_context *db;
1431         TDB_DATA key;
1432         struct ctdb_server_id my_sid;
1433         struct ctdb_record_handle *h;
1434         struct ctdb_g_lock_list *lock_list;
1435 };
1436
1437 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1438 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1439
1440 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1441                                            struct tevent_context *ev,
1442                                            struct ctdb_client_context *client,
1443                                            struct ctdb_db_context *db,
1444                                            const char *keyname,
1445                                            struct ctdb_server_id sid)
1446 {
1447         struct tevent_req *req, *subreq;
1448         struct ctdb_g_lock_unlock_state *state;
1449
1450         req = tevent_req_create(mem_ctx, &state,
1451                                 struct ctdb_g_lock_unlock_state);
1452         if (req == NULL) {
1453                 return NULL;
1454         }
1455
1456         state->ev = ev;
1457         state->client = client;
1458         state->db = db;
1459         state->key.dptr = discard_const(keyname);
1460         state->key.dsize = strlen(keyname) + 1;
1461         state->my_sid = sid;
1462
1463         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1464                                       false);
1465         if (tevent_req_nomem(subreq, req)) {
1466                 return tevent_req_post(req, ev);
1467         }
1468         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1469
1470         return req;
1471 }
1472
1473 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1474 {
1475         struct tevent_req *req = tevent_req_callback_data(
1476                 subreq, struct tevent_req);
1477         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1478                 req, struct ctdb_g_lock_unlock_state);
1479         TDB_DATA data;
1480         int ret = 0;
1481
1482         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1483         TALLOC_FREE(subreq);
1484         if (state->h == NULL) {
1485                 tevent_req_error(req, ret);
1486                 return;
1487         }
1488
1489         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1490                                     &state->lock_list);
1491         if (ret != 0) {
1492                 tevent_req_error(req, ret);
1493                 return;
1494         }
1495
1496         ret = ctdb_g_lock_unlock_update(req);
1497         if (ret != 0) {
1498                 tevent_req_error(req, ret);
1499                 return;
1500         }
1501
1502         tevent_req_done(req);
1503 }
1504
1505 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1506 {
1507         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1508                 req, struct ctdb_g_lock_unlock_state);
1509         struct ctdb_g_lock *lock;
1510         int ret, i;
1511
1512         for (i=0; i<state->lock_list->num; i++) {
1513                 lock = &state->lock_list->lock[i];
1514
1515                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1516                         break;
1517                 }
1518         }
1519
1520         if (i < state->lock_list->num) {
1521                 state->lock_list->lock[i] =
1522                         state->lock_list->lock[state->lock_list->num-1];
1523                 state->lock_list->num -= 1;
1524         }
1525
1526         if (state->lock_list->num == 0) {
1527                 ctdb_delete_record(state->h);
1528         } else {
1529                 TDB_DATA data;
1530
1531                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1532                 data.dptr = talloc_size(state, data.dsize);
1533                 if (data.dptr == NULL) {
1534                         return ENOMEM;
1535                 }
1536
1537                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1538                 ret = ctdb_store_record(state->h, data);
1539                 talloc_free(data.dptr);
1540                 if (ret != 0) {
1541                         return ret;
1542                 }
1543         }
1544
1545         return 0;
1546 }
1547
1548 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1549 {
1550         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1551                 req, struct ctdb_g_lock_unlock_state);
1552         int err;
1553
1554         TALLOC_FREE(state->h);
1555
1556         if (tevent_req_is_unix_error(req, &err)) {
1557                 if (perr != NULL) {
1558                         *perr = err;
1559                 }
1560                 return false;
1561         }
1562
1563         return true;
1564 }
1565
1566 /*
1567  * Persistent database functions
1568  */
/* State of one ctdb_transaction_start_send() request */
struct ctdb_transaction_start_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;

        /* Timeout passed to the register-srvid control */
        struct timeval timeout;

        /* Transaction handle that is handed out on success */
        struct ctdb_transaction_handle *h;

        /* Node the register-srvid control is sent to (the client's pnn) */
        uint32_t destnode;
};

/* Steps of the request, in the order they run */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_register_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);

static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1581
1582 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1583                                                struct tevent_context *ev,
1584                                                struct ctdb_client_context *client,
1585                                                struct timeval timeout,
1586                                                struct ctdb_db_context *db,
1587                                                bool readonly)
1588 {
1589         struct ctdb_transaction_start_state *state;
1590         struct tevent_req *req, *subreq;
1591         struct ctdb_transaction_handle *h;
1592
1593         req = tevent_req_create(mem_ctx, &state,
1594                                 struct ctdb_transaction_start_state);
1595         if (req == NULL) {
1596                 return NULL;
1597         }
1598
1599         if (! db->persistent) {
1600                 tevent_req_error(req, EINVAL);
1601                 return tevent_req_post(req, ev);
1602         }
1603
1604         state->ev = ev;
1605         state->client = client;
1606         state->destnode = ctdb_client_pnn(client);
1607
1608         h = talloc_zero(db, struct ctdb_transaction_handle);
1609         if (tevent_req_nomem(h, req)) {
1610                 return tevent_req_post(req, ev);
1611         }
1612
1613         h->client = client;
1614         h->db = db;
1615         h->readonly = readonly;
1616         h->updated = false;
1617
1618         /* SRVID is unique for databases, so client can have transactions active
1619          * for multiple databases */
1620         h->sid.pid = getpid();
1621         h->sid.task_id = db->db_id;
1622         h->sid.vnn = state->destnode;
1623         h->sid.unique_id = h->sid.task_id;
1624         h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1625
1626         h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1627         if (tevent_req_nomem(h->recbuf, req)) {
1628                 return tevent_req_post(req, ev);
1629         }
1630
1631         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1632         if (tevent_req_nomem(h->lock_name, req)) {
1633                 return tevent_req_post(req, ev);
1634         }
1635
1636         state->h = h;
1637
1638         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1639         if (tevent_req_nomem(subreq, req)) {
1640                 return tevent_req_post(req, ev);
1641         }
1642         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1643
1644         return req;
1645 }
1646
1647 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1648 {
1649         struct tevent_req *req = tevent_req_callback_data(
1650                 subreq, struct tevent_req);
1651         struct ctdb_transaction_start_state *state = tevent_req_data(
1652                 req, struct ctdb_transaction_start_state);
1653         struct ctdb_req_control request;
1654         bool status;
1655         int ret;
1656
1657         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1658         TALLOC_FREE(subreq);
1659         if (! status) {
1660                 tevent_req_error(req, ret);
1661                 return;
1662         }
1663
1664         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1665         subreq = ctdb_client_control_send(state, state->ev, state->client,
1666                                           state->destnode, state->timeout,
1667                                           &request);
1668         if (tevent_req_nomem(subreq, req)) {
1669                 return;
1670         }
1671         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1672 }
1673
1674 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1675 {
1676         struct tevent_req *req = tevent_req_callback_data(
1677                 subreq, struct tevent_req);
1678         struct ctdb_transaction_start_state *state = tevent_req_data(
1679                 req, struct ctdb_transaction_start_state);
1680         struct ctdb_reply_control *reply;
1681         bool status;
1682         int ret;
1683
1684         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1685         TALLOC_FREE(subreq);
1686         if (! status) {
1687                 tevent_req_error(req, ret);
1688                 return;
1689         }
1690
1691         ret = ctdb_reply_control_register_srvid(reply);
1692         talloc_free(reply);
1693         if (ret != 0) {
1694                 tevent_req_error(req, ret);
1695                 return;
1696         }
1697
1698         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1699                                        state->h->db_g_lock, state->h->lock_name,
1700                                        &state->h->sid, state->h->readonly);
1701         if (tevent_req_nomem(subreq, req)) {
1702                 return;
1703         }
1704         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1705 }
1706
1707 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1708 {
1709         struct tevent_req *req = tevent_req_callback_data(
1710                 subreq, struct tevent_req);
1711         int ret;
1712         bool status;
1713
1714         status = ctdb_g_lock_lock_recv(subreq, &ret);
1715         TALLOC_FREE(subreq);
1716         if (! status) {
1717                 tevent_req_error(req, ret);
1718                 return;
1719         }
1720
1721         tevent_req_done(req);
1722 }
1723
1724 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1725                                         struct tevent_req *req,
1726                                         int *perr)
1727 {
1728         struct ctdb_transaction_start_state *state = tevent_req_data(
1729                 req, struct ctdb_transaction_start_state);
1730         struct ctdb_transaction_handle *h = state->h;
1731         int err;
1732
1733         if (tevent_req_is_unix_error(req, &err)) {
1734                 if (perr != NULL) {
1735                         *perr = err;
1736                 }
1737                 return NULL;
1738         }
1739
1740         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1741         return h;
1742 }
1743
1744 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1745 {
1746         int ret;
1747
1748         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1749                                          tevent_timeval_zero(),
1750                                          h->sid.unique_id);
1751         if (ret != 0) {
1752                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1753         }
1754
1755         return 0;
1756 }
1757
1758 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1759                            struct ctdb_client_context *client,
1760                            struct timeval timeout,
1761                            struct ctdb_db_context *db, bool readonly,
1762                            struct ctdb_transaction_handle **out)
1763 {
1764         struct tevent_req *req;
1765         struct ctdb_transaction_handle *h;
1766         int ret;
1767
1768         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1769                                           readonly);
1770         if (req == NULL) {
1771                 return ENOMEM;
1772         }
1773
1774         tevent_req_poll(req, ev);
1775
1776         h = ctdb_transaction_start_recv(req, &ret);
1777         if (h == NULL) {
1778                 return ret;
1779         }
1780
1781         *out = h;
1782         return 0;
1783 }
1784
1785 struct ctdb_transaction_record_fetch_state {
1786         TDB_DATA key, data;
1787         struct ctdb_ltdb_header header;
1788         bool found;
1789 };
1790
1791 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1792                                                   struct ctdb_ltdb_header *header,
1793                                                   TDB_DATA key,
1794                                                   TDB_DATA data,
1795                                                   void *private_data)
1796 {
1797         struct ctdb_transaction_record_fetch_state *state =
1798                 (struct ctdb_transaction_record_fetch_state *)private_data;
1799
1800         if (state->key.dsize == key.dsize &&
1801             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1802                 state->data = data;
1803                 state->header = *header;
1804                 state->found = true;
1805         }
1806
1807         return 0;
1808 }
1809
1810 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1811                                          TDB_DATA key,
1812                                          struct ctdb_ltdb_header *header,
1813                                          TDB_DATA *data)
1814 {
1815         struct ctdb_transaction_record_fetch_state state;
1816         int ret;
1817
1818         state.key = key;
1819         state.found = false;
1820
1821         ret = ctdb_rec_buffer_traverse(h->recbuf,
1822                                        ctdb_transaction_record_fetch_traverse,
1823                                        &state);
1824         if (ret != 0) {
1825                 return ret;
1826         }
1827
1828         if (state.found) {
1829                 if (header != NULL) {
1830                         *header = state.header;
1831                 }
1832                 if (data != NULL) {
1833                         *data = state.data;
1834                 }
1835                 return 0;
1836         }
1837
1838         return ENOENT;
1839 }
1840
1841 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1842                                   TDB_DATA key,
1843                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1844 {
1845         TDB_DATA tmp_data;
1846         struct ctdb_ltdb_header header;
1847         int ret;
1848
1849         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1850         if (ret == 0) {
1851                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1852                                            tmp_data.dsize);
1853                 if (data->dptr == NULL) {
1854                         return ENOMEM;
1855                 }
1856                 data->dsize = tmp_data.dsize;
1857                 return 0;
1858         }
1859
1860         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1861         if (ret != 0) {
1862                 return ret;
1863         }
1864
1865         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1866         if (ret != 0) {
1867                 return ret;
1868         }
1869
1870         return 0;
1871 }
1872
1873 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1874                                   TDB_DATA key, TDB_DATA data)
1875 {
1876         TALLOC_CTX *tmp_ctx;
1877         struct ctdb_ltdb_header header;
1878         TDB_DATA old_data;
1879         int ret;
1880
1881         if (h->readonly) {
1882                 return EINVAL;
1883         }
1884
1885         tmp_ctx = talloc_new(h);
1886         if (tmp_ctx == NULL) {
1887                 return ENOMEM;
1888         }
1889
1890         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1891         if (ret != 0) {
1892                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1893                 if (ret != 0) {
1894                         return ret;
1895                 }
1896         }
1897
1898         if (old_data.dsize == data.dsize &&
1899             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1900                 talloc_free(tmp_ctx);
1901                 return 0;
1902         }
1903
1904         header.dmaster = ctdb_client_pnn(h->client);
1905         header.rsn += 1;
1906
1907         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1908         talloc_free(tmp_ctx);
1909         if (ret != 0) {
1910                 return ret;
1911         }
1912         h->updated = true;
1913
1914         return 0;
1915 }
1916
1917 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1918                                    TDB_DATA key)
1919 {
1920         return ctdb_transaction_store_record(h, key, tdb_null);
1921 }
1922
1923 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1924                                             uint64_t seqnum)
1925 {
1926         const char *keyname = CTDB_DB_SEQNUM_KEY;
1927         TDB_DATA key, data;
1928
1929         key.dptr = discard_const(keyname);
1930         key.dsize = strlen(keyname) + 1;
1931
1932         data.dptr = (uint8_t *)&seqnum;
1933         data.dsize = sizeof(seqnum);
1934
1935         return ctdb_transaction_store_record(h, key, data);
1936 }
1937
/* State carried across the steps of an asynchronous transaction commit. */
struct ctdb_transaction_commit_state {
        /* Event context used for all sub-requests */
        struct tevent_context *ev;
        /* Transaction being committed */
        struct ctdb_transaction_handle *h;
        /*
         * Database sequence number read before the commit; the commit
         * stores seqnum+1 and later verifies that value.
         */
        uint64_t seqnum;
};

/* Callbacks of the commit request, defined after the send function. */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_try(struct tevent_req *subreq);
1946
1947 struct tevent_req *ctdb_transaction_commit_send(
1948                                         TALLOC_CTX *mem_ctx,
1949                                         struct tevent_context *ev,
1950                                         struct ctdb_transaction_handle *h)
1951 {
1952         struct tevent_req *req, *subreq;
1953         struct ctdb_transaction_commit_state *state;
1954         int ret;
1955
1956         req = tevent_req_create(mem_ctx, &state,
1957                                 struct ctdb_transaction_commit_state);
1958         if (req == NULL) {
1959                 return NULL;
1960         }
1961
1962         state->ev = ev;
1963         state->h = h;
1964
1965         ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1966                                       h->client->pnn, tevent_timeval_zero(),
1967                                       h->db->db_id, &state->seqnum);
1968         if (ret != 0) {
1969                 tevent_req_error(req, ret);
1970                 return tevent_req_post(req, ev);
1971         }
1972
1973         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1974         if (ret != 0) {
1975                 tevent_req_error(req, ret);
1976                 return tevent_req_post(req, ev);
1977         }
1978
1979         subreq = ctdb_recovery_wait_send(state, ev, h->client);
1980         if (tevent_req_nomem(subreq, req)) {
1981                 return tevent_req_post(req, ev);
1982         }
1983         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1984
1985         return req;
1986 }
1987
1988 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1989 {
1990         struct tevent_req *req = tevent_req_callback_data(
1991                 subreq, struct tevent_req);
1992         struct ctdb_transaction_commit_state *state = tevent_req_data(
1993                 req, struct ctdb_transaction_commit_state);
1994         struct ctdb_req_control request;
1995         int ret;
1996         bool status;
1997
1998         status = ctdb_recovery_wait_recv(subreq, &ret);
1999         TALLOC_FREE(subreq);
2000         if (! status) {
2001                 tevent_req_error(req, ret);
2002                 return;
2003         }
2004
2005         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2006         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2007                                           state->h->client->pnn,
2008                                           tevent_timeval_zero(), &request);
2009         if (tevent_req_nomem(subreq, req)) {
2010                 return;
2011         }
2012         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2013 }
2014
2015 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2016 {
2017         struct tevent_req *req = tevent_req_callback_data(
2018                 subreq, struct tevent_req);
2019         struct ctdb_transaction_commit_state *state = tevent_req_data(
2020                 req, struct ctdb_transaction_commit_state);
2021         struct ctdb_reply_control *reply;
2022         uint64_t seqnum;
2023         int ret;
2024         bool status;
2025
2026         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2027         TALLOC_FREE(subreq);
2028         if (! status) {
2029                 tevent_req_error(req, ret);
2030                 return;
2031         }
2032
2033         ret = ctdb_reply_control_trans3_commit(reply);
2034         if (ret < 0) {
2035                 /* Control failed due to recovery */
2036                 subreq = ctdb_recovery_wait_send(state, state->ev,
2037                                                  state->h->client);
2038                 if (tevent_req_nomem(subreq, req)) {
2039                         return;
2040                 }
2041                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2042                                         req);
2043                 return;
2044         }
2045
2046         ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2047                                       state->h->client->pnn,
2048                                       tevent_timeval_zero(),
2049                                       state->h->db->db_id, &seqnum);
2050         if (ret != 0) {
2051                 tevent_req_error(req, ret);
2052                 return;
2053         }
2054
2055         if (seqnum == state->seqnum) {
2056                 subreq = ctdb_recovery_wait_send(state, state->ev,
2057                                                  state->h->client);
2058                 if (tevent_req_nomem(subreq, req)) {
2059                         return;
2060                 }
2061                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2062                                         req);
2063                 return;
2064         }
2065
2066         if (seqnum != state->seqnum + 1) {
2067                 tevent_req_error(req, EIO);
2068                 return;
2069         }
2070
2071         tevent_req_done(req);
2072 }
2073
2074 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2075 {
2076         struct ctdb_transaction_commit_state *state = tevent_req_data(
2077                 req, struct ctdb_transaction_commit_state);
2078         int err;
2079
2080         if (tevent_req_is_unix_error(req, &err)) {
2081                 if (perr != NULL) {
2082                         *perr = err;
2083                 }
2084                 TALLOC_FREE(state->h);
2085                 return false;
2086         }
2087
2088         TALLOC_FREE(state->h);
2089         return true;
2090 }
2091
2092 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2093 {
2094         struct tevent_req *req;
2095         int ret;
2096         bool status;
2097
2098         if (h->readonly || ! h->updated) {
2099                 talloc_free(h);
2100                 return 0;
2101         }
2102
2103         req = ctdb_transaction_commit_send(h, h->ev, h);
2104         if (req == NULL) {
2105                 talloc_free(h);
2106                 return ENOMEM;
2107         }
2108
2109         tevent_req_poll(req, h->ev);
2110
2111         status = ctdb_transaction_commit_recv(req, &ret);
2112         if (! status) {
2113                 talloc_free(h);
2114                 return ret;
2115         }
2116
2117         talloc_free(h);
2118         return 0;
2119 }
2120
/*
 * Abandon transaction h without committing anything.
 *
 * The handle is freed, so it must not be used afterwards.  Always
 * returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
        TALLOC_FREE(h);

        return 0;
}
2126
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */