1086b0e14e64b0c08da10662d25f81e32f3249c1
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State of the async request that applies the readonly and/or sticky
 * flag to a database.
 */
struct ctdb_set_db_flags_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Database the flags are applied to */
	uint32_t db_id;
	/* Requested CTDB_DB_FLAGS_* bits */
	uint8_t db_flags;
	/* Each becomes true once the flag is set or was not requested */
	bool readonly_done;
	bool sticky_done;
	/* Node list built from the nodemap reply, with its length */
	uint32_t *pnn_list;
	int count;
};

/* Completion callbacks, in the order they are used */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 tevent_req_error(req, ret);
125                 return;
126         }
127
128         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129         talloc_free(reply);
130         if (ret != 0) {
131                 tevent_req_error(req, ret);
132                 return;
133         }
134
135         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136                                                state, &state->pnn_list);
137         talloc_free(nodemap);
138         if (state->count <= 0) {
139                 tevent_req_error(req, ENOMEM);
140                 return;
141         }
142
143         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144                 ctdb_req_control_set_db_readonly(&request, state->db_id);
145                 subreq = ctdb_client_control_multi_send(
146                                         state, state->ev, state->client,
147                                         state->pnn_list, state->count,
148                                         state->timeout, &request);
149                 if (tevent_req_nomem(subreq, req)) {
150                         return;
151                 }
152                 tevent_req_set_callback(subreq,
153                                         ctdb_set_db_flags_readonly_done, req);
154         } else {
155                 state->readonly_done = true;
156         }
157
158         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159                 ctdb_req_control_set_db_sticky(&request, state->db_id);
160                 subreq = ctdb_client_control_multi_send(
161                                         state, state->ev, state->client,
162                                         state->pnn_list, state->count,
163                                         state->timeout, &request);
164                 if (tevent_req_nomem(subreq, req)) {
165                         return;
166                 }
167                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168                                         req);
169         } else {
170                 state->sticky_done = true;
171         }
172 }
173
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 {
176         struct tevent_req *req = tevent_req_callback_data(
177                 subreq, struct tevent_req);
178         struct ctdb_set_db_flags_state *state = tevent_req_data(
179                 req, struct ctdb_set_db_flags_state);
180         int ret;
181         bool status;
182
183         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184                                                 NULL);
185         TALLOC_FREE(subreq);
186         if (! status) {
187                 tevent_req_error(req, ret);
188                 return;
189         }
190
191         state->readonly_done = true;
192
193         if (state->readonly_done && state->sticky_done) {
194                 tevent_req_done(req);
195         }
196 }
197
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req = tevent_req_callback_data(
201                 subreq, struct tevent_req);
202         struct ctdb_set_db_flags_state *state = tevent_req_data(
203                 req, struct ctdb_set_db_flags_state);
204         int ret;
205         bool status;
206
207         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208                                                 NULL);
209         TALLOC_FREE(subreq);
210         if (! status) {
211                 tevent_req_error(req, ret);
212                 return;
213         }
214
215         state->sticky_done = true;
216
217         if (state->readonly_done && state->sticky_done) {
218                 tevent_req_done(req);
219         }
220 }
221
222 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
223 {
224         int err;
225
226         if (tevent_req_is_unix_error(req, &err)) {
227                 if (perr != NULL) {
228                         *perr = err;
229                 }
230                 return false;
231         }
232         return true;
233 }
234
/* State of the async database attach request */
struct ctdb_attach_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Node all attach related controls are sent to */
	uint32_t destnode;
	/* CTDB_DB_FLAGS_* requested by the caller */
	uint8_t db_flags;
	/* TDB open flags derived while attaching */
	uint32_t tdb_flags;
	/* Database context being set up (or the one already attached) */
	struct ctdb_db_context *db;
};

/* Completion callbacks, in the order they are chained */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252                                     struct tevent_context *ev,
253                                     struct ctdb_client_context *client,
254                                     struct timeval timeout,
255                                     const char *db_name, uint8_t db_flags)
256 {
257         struct tevent_req *req, *subreq;
258         struct ctdb_attach_state *state;
259         struct ctdb_req_control request;
260
261         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262         if (req == NULL) {
263                 return NULL;
264         }
265
266         state->db = client_db_handle(client, db_name);
267         if (state->db != NULL) {
268                 tevent_req_done(req);
269                 return tevent_req_post(req, ev);
270         }
271
272         state->ev = ev;
273         state->client = client;
274         state->timeout = timeout;
275         state->destnode = ctdb_client_pnn(client);
276         state->db_flags = db_flags;
277
278         state->db = talloc_zero(client, struct ctdb_db_context);
279         if (tevent_req_nomem(state->db, req)) {
280                 return tevent_req_post(req, ev);
281         }
282
283         state->db->db_name = talloc_strdup(state->db, db_name);
284         if (tevent_req_nomem(state->db, req)) {
285                 return tevent_req_post(req, ev);
286         }
287
288         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289                 state->db->persistent = true;
290         }
291
292         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293         subreq = ctdb_client_control_send(state, ev, client,
294                                           ctdb_client_pnn(client), timeout,
295                                           &request);
296         if (tevent_req_nomem(subreq, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301         return req;
302 }
303
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         struct ctdb_attach_state *state = tevent_req_data(
309                 req, struct ctdb_attach_state);
310         struct ctdb_reply_control *reply;
311         struct ctdb_req_control request;
312         uint32_t mutex_enabled;
313         int ret;
314         bool status;
315
316         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317         TALLOC_FREE(subreq);
318         if (! status) {
319                 tevent_req_error(req, ret);
320                 return;
321         }
322
323         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324         if (ret != 0) {
325                 /* Treat error as mutex support not available */
326                 mutex_enabled = 0;
327         }
328
329         state->tdb_flags = TDB_DEFAULT;
330         if (! state->db->persistent) {
331                 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
332                                      TDB_CLEAR_IF_FIRST);
333         }
334         if (mutex_enabled == 1) {
335                 state->tdb_flags |= TDB_MUTEX_LOCKING;
336         }
337
338         if (state->db->persistent) {
339                 ctdb_req_control_db_attach_persistent(&request,
340                                                       state->db->db_name,
341                                                       state->tdb_flags);
342         } else {
343                 ctdb_req_control_db_attach(&request, state->db->db_name,
344                                            state->tdb_flags);
345         }
346
347         subreq = ctdb_client_control_send(state, state->ev, state->client,
348                                           state->destnode, state->timeout,
349                                           &request);
350         if (tevent_req_nomem(subreq, req)) {
351                 return;
352         }
353         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
354 }
355
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 {
358         struct tevent_req *req = tevent_req_callback_data(
359                 subreq, struct tevent_req);
360         struct ctdb_attach_state *state = tevent_req_data(
361                 req, struct ctdb_attach_state);
362         struct ctdb_req_control request;
363         struct ctdb_reply_control *reply;
364         bool status;
365         int ret;
366
367         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
368         TALLOC_FREE(subreq);
369         if (! status) {
370                 tevent_req_error(req, ret);
371                 return;
372         }
373
374         if (state->db->persistent) {
375                 ret = ctdb_reply_control_db_attach_persistent(
376                                 reply, &state->db->db_id);
377         } else {
378                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
379         }
380         talloc_free(reply);
381         if (ret != 0) {
382                 tevent_req_error(req, ret);
383                 return;
384         }
385
386         ctdb_req_control_getdbpath(&request, state->db->db_id);
387         subreq = ctdb_client_control_send(state, state->ev, state->client,
388                                           state->destnode, state->timeout,
389                                           &request);
390         if (tevent_req_nomem(subreq, req)) {
391                 return;
392         }
393         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
394 }
395
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397 {
398         struct tevent_req *req = tevent_req_callback_data(
399                 subreq, struct tevent_req);
400         struct ctdb_attach_state *state = tevent_req_data(
401                 req, struct ctdb_attach_state);
402         struct ctdb_reply_control *reply;
403         struct ctdb_req_control request;
404         bool status;
405         int ret;
406
407         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 tevent_req_error(req, ret);
411                 return;
412         }
413
414         ret = ctdb_reply_control_getdbpath(reply, state->db,
415                                            &state->db->db_path);
416         talloc_free(reply);
417         if (ret != 0) {
418                 tevent_req_error(req, ret);
419                 return;
420         }
421
422         ctdb_req_control_db_get_health(&request, state->db->db_id);
423         subreq = ctdb_client_control_send(state, state->ev, state->client,
424                                           state->destnode, state->timeout,
425                                           &request);
426         if (tevent_req_nomem(subreq, req)) {
427                 return;
428         }
429         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
430 }
431
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
433 {
434         struct tevent_req *req = tevent_req_callback_data(
435                 subreq, struct tevent_req);
436         struct ctdb_attach_state *state = tevent_req_data(
437                 req, struct ctdb_attach_state);
438         struct ctdb_reply_control *reply;
439         const char *reason;
440         bool status;
441         int ret;
442
443         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
444         TALLOC_FREE(subreq);
445         if (! status) {
446                 tevent_req_error(req, ret);
447                 return;
448         }
449
450         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451         if (ret != 0) {
452                 tevent_req_error(req, ret);
453                 return;
454         }
455
456         if (reason != NULL) {
457                 /* Database unhealthy, avoid attach */
458                 /* FIXME: Log here */
459                 tevent_req_error(req, EIO);
460                 return;
461         }
462
463         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464                                         state->destnode, state->timeout,
465                                         state->db->db_id, state->db_flags);
466         if (tevent_req_nomem(subreq, req)) {
467                 return;
468         }
469         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
470 }
471
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
473 {
474         struct tevent_req *req = tevent_req_callback_data(
475                 subreq, struct tevent_req);
476         struct ctdb_attach_state *state = tevent_req_data(
477                 req, struct ctdb_attach_state);
478         bool status;
479         int ret;
480
481         status = ctdb_set_db_flags_recv(subreq, &ret);
482         TALLOC_FREE(subreq);
483         if (! status) {
484                 tevent_req_error(req, ret);
485                 return;
486         }
487
488         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489                                         state->tdb_flags, O_RDWR, 0);
490         if (tevent_req_nomem(state->db->ltdb, req)) {
491                 return;
492         }
493         DLIST_ADD(state->client->db, state->db);
494
495         tevent_req_done(req);
496 }
497
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499                       struct ctdb_db_context **out)
500 {
501         struct ctdb_attach_state *state = tevent_req_data(
502                 req, struct ctdb_attach_state);
503         int err;
504
505         if (tevent_req_is_unix_error(req, &err)) {
506                 if (perr != NULL) {
507                         *perr = err;
508                 }
509                 return false;
510         }
511
512         if (out != NULL) {
513                 *out = state->db;
514         }
515         return true;
516 }
517
518 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
519                 struct ctdb_client_context *client,
520                 struct timeval timeout,
521                 const char *db_name, uint8_t db_flags,
522                 struct ctdb_db_context **out)
523 {
524         struct tevent_req *req;
525         bool status;
526         int ret;
527
528         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
529                                db_name, db_flags);
530         if (req == NULL) {
531                 return ENOMEM;
532         }
533
534         tevent_req_poll(req, ev);
535
536         status = ctdb_attach_recv(req, &ret, out);
537         if (! status) {
538                 return ret;
539         }
540
541         /*
542         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
545         */
546
547         return 0;
548 }
549
550 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
551                 struct ctdb_client_context *client,
552                 struct timeval timeout, uint32_t db_id)
553 {
554         struct ctdb_db_context *db;
555         int ret;
556
557         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
558                                   db_id);
559         if (ret != 0) {
560                 return ret;
561         }
562
563         for (db = client->db; db != NULL; db = db->next) {
564                 if (db->db_id == db_id) {
565                         DLIST_REMOVE(client->db, db);
566                         break;
567                 }
568         }
569
570         return 0;
571 }
572
573 uint32_t ctdb_db_id(struct ctdb_db_context *db)
574 {
575         return db->db_id;
576 }
577
578 struct ctdb_db_traverse_state {
579         ctdb_rec_parser_func_t parser;
580         void *private_data;
581         bool extract_header;
582         int error;
583 };
584
585 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
586                                     TDB_DATA data, void *private_data)
587 {
588         struct ctdb_db_traverse_state *state =
589                 (struct ctdb_db_traverse_state *)private_data;
590         int ret;
591
592         if (state->extract_header) {
593                 struct ctdb_ltdb_header header;
594
595                 ret = ctdb_ltdb_header_extract(&data, &header);
596                 if (ret != 0) {
597                         state->error = ret;
598                         return 1;
599                 }
600
601                 ret = state->parser(0, &header, key, data, state->private_data);
602         } else {
603                 ret = state->parser(0, NULL, key, data, state->private_data);
604         }
605
606         if (ret != 0) {
607                 state->error = ret;
608                 return 1;
609         }
610
611         return 0;
612 }
613
614 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
615                      bool extract_header,
616                      ctdb_rec_parser_func_t parser, void *private_data)
617 {
618         struct ctdb_db_traverse_state state;
619         int ret;
620
621         state.parser = parser;
622         state.private_data = private_data;
623         state.extract_header = extract_header;
624         state.error = 0;
625
626         if (readonly) {
627                 ret = tdb_traverse_read(db->ltdb->tdb,
628                                         ctdb_db_traverse_handler, &state);
629         } else {
630                 ret = tdb_traverse(db->ltdb->tdb,
631                                    ctdb_db_traverse_handler, &state);
632         }
633
634         if (ret == -1) {
635                 return EIO;
636         }
637
638         return state.error;
639 }
640
641 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
642                            struct ctdb_ltdb_header *header,
643                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
644 {
645         TDB_DATA rec;
646         int ret;
647
648         rec = tdb_fetch(db->ltdb->tdb, key);
649         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
650                 /* No record present */
651                 if (rec.dptr != NULL) {
652                         free(rec.dptr);
653                 }
654
655                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
656                         return EIO;
657                 }
658
659                 header->rsn = 0;
660                 header->dmaster = CTDB_UNKNOWN_PNN;
661                 header->flags = 0;
662
663                 if (data != NULL) {
664                         *data = tdb_null;
665                 }
666                 return 0;
667         }
668
669         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
670         if (ret != 0) {
671                 return ret;
672         }
673
674         ret = 0;
675         if (data != NULL) {
676                 size_t offset = ctdb_ltdb_header_len(header);
677
678                 data->dsize = rec.dsize - offset;
679                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
680                                            data->dsize);
681                 if (data->dptr == NULL) {
682                         ret = ENOMEM;
683                 }
684         }
685
686         free(rec.dptr);
687         return ret;
688 }
689
690 /*
691  * Fetch a record from volatile database
692  *
693  * Steps:
694  *  1. Get a lock on the hash chain
695  *  2. If the record does not exist, migrate the record
696  *  3. If readonly=true and delegations do not exist, migrate the record.
697  *  4. If readonly=false and delegations exist, migrate the record.
698  *  5. If the local node is not dmaster, migrate the record.
699  *  6. Return record
700  */
701
/* State of the async fetch-lock request */
struct ctdb_fetch_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle that is being set up for the caller */
	struct ctdb_record_handle *h;
	/* Whether the caller asked for readonly access */
	bool readonly;
	/* PNN of this client's node, compared against the record dmaster */
	uint32_t pnn;
};

/* Helpers implementing the check / migrate / re-check cycle */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
713
714 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
715                                         struct tevent_context *ev,
716                                         struct ctdb_client_context *client,
717                                         struct ctdb_db_context *db,
718                                         TDB_DATA key, bool readonly)
719 {
720         struct ctdb_fetch_lock_state *state;
721         struct tevent_req *req;
722         int ret;
723
724         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
725         if (req == NULL) {
726                 return NULL;
727         }
728
729         state->ev = ev;
730         state->client = client;
731
732         state->h = talloc_zero(db, struct ctdb_record_handle);
733         if (tevent_req_nomem(state->h, req)) {
734                 return tevent_req_post(req, ev);
735         }
736         state->h->client = client;
737         state->h->db = db;
738         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
739         if (tevent_req_nomem(state->h->key.dptr, req)) {
740                 return tevent_req_post(req, ev);
741         }
742         state->h->key.dsize = key.dsize;
743         state->h->readonly = false;
744
745         state->readonly = readonly;
746         state->pnn = ctdb_client_pnn(client);
747
748         /* Check that database is not persistent */
749         if (db->persistent) {
750                 tevent_req_error(req, EINVAL);
751                 return tevent_req_post(req, ev);
752         }
753
754         ret = ctdb_fetch_lock_check(req);
755         if (ret == 0) {
756                 tevent_req_done(req);
757                 return tevent_req_post(req, ev);
758         }
759         if (ret != EAGAIN) {
760                 tevent_req_error(req, ret);
761                 return tevent_req_post(req, ev);
762         }
763         return req;
764 }
765
766 static int ctdb_fetch_lock_check(struct tevent_req *req)
767 {
768         struct ctdb_fetch_lock_state *state = tevent_req_data(
769                 req, struct ctdb_fetch_lock_state);
770         struct ctdb_record_handle *h = state->h;
771         struct ctdb_ltdb_header header;
772         TDB_DATA data = tdb_null;
773         int ret, err = 0;
774         bool do_migrate = false;
775
776         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
777         if (ret != 0) {
778                 err = EIO;
779                 goto failed;
780         }
781
782         data = tdb_fetch(h->db->ltdb->tdb, h->key);
783         if (data.dptr == NULL) {
784                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
785                         goto migrate;
786                 } else {
787                         err = EIO;
788                         goto failed;
789                 }
790         }
791
792         /* Got the record */
793         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
794         if (ret != 0) {
795                 err = ret;
796                 goto failed;
797         }
798
799         if (! state->readonly) {
800                 /* Read/write access */
801                 if (header.dmaster == state->pnn &&
802                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
803                         goto migrate;
804                 }
805
806                 if (header.dmaster != state->pnn) {
807                         goto migrate;
808                 }
809         } else {
810                 /* Readonly access */
811                 if (header.dmaster != state->pnn &&
812                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
813                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
814                         goto migrate;
815                 }
816         }
817
818         /* We are the dmaster or readonly delegation */
819         h->header = header;
820         h->data = data;
821         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
822                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
823                 h->readonly = true;
824         }
825         return 0;
826
827 migrate:
828         do_migrate = true;
829         err = EAGAIN;
830
831 failed:
832         if (data.dptr != NULL) {
833                 free(data.dptr);
834         }
835         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
836         if (ret != 0) {
837                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
838                                   h->db->db_name));
839                 return EIO;
840         }
841
842         if (do_migrate) {
843                 ctdb_fetch_lock_migrate(req);
844         }
845         return err;
846 }
847
848 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
849 {
850         struct ctdb_fetch_lock_state *state = tevent_req_data(
851                 req, struct ctdb_fetch_lock_state);
852         struct ctdb_req_call request;
853         struct tevent_req *subreq;
854
855         ZERO_STRUCT(request);
856         request.flags = CTDB_IMMEDIATE_MIGRATION;
857         if (state->readonly) {
858                 request.flags |= CTDB_WANT_READONLY;
859         }
860         request.db_id = state->h->db->db_id;
861         request.callid = CTDB_NULL_FUNC;
862         request.key = state->h->key;
863
864         subreq = ctdb_client_call_send(state, state->ev, state->client,
865                                        &request);
866         if (tevent_req_nomem(subreq, req)) {
867                 return;
868         }
869
870         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
871 }
872
873 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
874 {
875         struct tevent_req *req = tevent_req_callback_data(
876                 subreq, struct tevent_req);
877         struct ctdb_fetch_lock_state *state = tevent_req_data(
878                 req, struct ctdb_fetch_lock_state);
879         struct ctdb_reply_call *reply;
880         int ret;
881         bool status;
882
883         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
884         TALLOC_FREE(subreq);
885         if (! status) {
886                 tevent_req_error(req, ret);
887                 return;
888         }
889
890         if (reply->status != 0) {
891                 tevent_req_error(req, EIO);
892                 return;
893         }
894         talloc_free(reply);
895
896         ret = ctdb_fetch_lock_check(req);
897         if (ret != 0) {
898                 tevent_req_error(req, ret);
899                 return;
900         }
901
902         tevent_req_done(req);
903 }
904
905 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
906 {
907         tdb_chainunlock(h->db->ltdb->tdb, h->key);
908         free(h->data.dptr);
909         return 0;
910 }
911
912 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
913                                                 struct ctdb_ltdb_header *header,
914                                                 TALLOC_CTX *mem_ctx,
915                                                 TDB_DATA *data, int *perr)
916 {
917         struct ctdb_fetch_lock_state *state = tevent_req_data(
918                 req, struct ctdb_fetch_lock_state);
919         struct ctdb_record_handle *h = state->h;
920         int err;
921
922         if (tevent_req_is_unix_error(req, &err)) {
923                 if (perr != NULL) {
924                         *perr = err;
925                 }
926                 return NULL;
927         }
928
929         if (header != NULL) {
930                 *header = h->header;
931         }
932         if (data != NULL) {
933                 size_t offset;
934
935                 offset = ctdb_ltdb_header_len(&h->header);
936
937                 data->dsize = h->data.dsize - offset;
938                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
939                                            data->dsize);
940                 if (data->dptr == NULL) {
941                         TALLOC_FREE(state->h);
942                         if (perr != NULL) {
943                                 *perr = ENOMEM;
944                         }
945                         return NULL;
946                 }
947         }
948
949         talloc_set_destructor(h, ctdb_record_handle_destructor);
950         return h;
951 }
952
953 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
954                     struct ctdb_client_context *client,
955                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
956                     struct ctdb_record_handle **out,
957                     struct ctdb_ltdb_header *header, TDB_DATA *data)
958 {
959         struct tevent_req *req;
960         struct ctdb_record_handle *h;
961         int ret;
962
963         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
964         if (req == NULL) {
965                 return ENOMEM;
966         }
967
968         tevent_req_poll(req, ev);
969
970         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
971         if (h == NULL) {
972                 return ret;
973         }
974
975         *out = h;
976         return 0;
977 }
978
979 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
980 {
981         TDB_DATA rec;
982         size_t offset;
983         int ret;
984
985         /* Cannot modify the record if it was obtained as a readonly copy */
986         if (h->readonly) {
987                 return EINVAL;
988         }
989
990         /* Check if the new data is same */
991         if (h->data.dsize == data.dsize &&
992             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
993                 /* No need to do anything */
994                 return 0;
995         }
996
997         offset = ctdb_ltdb_header_len(&h->header);
998         rec.dsize = offset + data.dsize;
999         rec.dptr = talloc_size(h, rec.dsize);
1000         if (rec.dptr == NULL) {
1001                 return ENOMEM;
1002         }
1003
1004         ctdb_ltdb_header_push(&h->header, rec.dptr);
1005         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1006
1007         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1010                                   h->db->db_name));
1011                 return EIO;
1012         }
1013
1014         talloc_free(rec.dptr);
1015         return 0;
1016 }
1017
1018 int ctdb_delete_record(struct ctdb_record_handle *h)
1019 {
1020         TDB_DATA rec;
1021         struct ctdb_key_data key;
1022         int ret;
1023
1024         /* Cannot delete the record if it was obtained as a readonly copy */
1025         if (h->readonly) {
1026                 return EINVAL;
1027         }
1028
1029         rec.dsize = ctdb_ltdb_header_len(&h->header);
1030         rec.dptr = talloc_size(h, rec.dsize);
1031         if (rec.dptr == NULL) {
1032                 return ENOMEM;
1033         }
1034
1035         ctdb_ltdb_header_push(&h->header, rec.dptr);
1036
1037         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1038         talloc_free(rec.dptr);
1039         if (ret != 0) {
1040                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1041                                   h->db->db_name));
1042                 return EIO;
1043         }
1044
1045         key.db_id = h->db->db_id;
1046         key.header = h->header;
1047         key.key = h->key;
1048
1049         ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1050                                               h->client->pnn,
1051                                               tevent_timeval_zero(), &key);
1052         if (ret != 0) {
1053                 DEBUG(DEBUG_WARNING,
1054                       ("Failed to mark record to be deleted in DB %s\n",
1055                        h->db->db_name));
1056                 return ret;
1057         }
1058
1059         return 0;
1060 }
1061
1062 /*
1063  * Global lock functions
1064  */
1065
/* Per-request state for ctdb_g_lock_lock_send() and its callbacks */
struct ctdb_g_lock_lock_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* database holding the lock records */
        struct ctdb_db_context *db;
        /* record key: the lock name including its terminating NUL */
        TDB_DATA key;
        /* server id of the requester, copied from the caller */
        struct ctdb_server_id my_sid;
        /* CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE */
        enum ctdb_g_lock_type lock_type;
        /* handle for the fetched lock record */
        struct ctdb_record_handle *h;
        /* state for verification of active locks */
        struct ctdb_g_lock_list *lock_list;
        /* index into lock_list of the entry currently being examined */
        unsigned int current;
};

static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
static int ctdb_g_lock_lock_update(struct tevent_req *req);
static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1084
1085 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1086                                   enum ctdb_g_lock_type l2)
1087 {
1088         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1089                 return false;
1090         }
1091         return true;
1092 }
1093
1094 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1095                                          struct tevent_context *ev,
1096                                          struct ctdb_client_context *client,
1097                                          struct ctdb_db_context *db,
1098                                          const char *keyname,
1099                                          struct ctdb_server_id *sid,
1100                                          bool readonly)
1101 {
1102         struct tevent_req *req, *subreq;
1103         struct ctdb_g_lock_lock_state *state;
1104
1105         req = tevent_req_create(mem_ctx, &state,
1106                                 struct ctdb_g_lock_lock_state);
1107         if (req == NULL) {
1108                 return NULL;
1109         }
1110
1111         state->ev = ev;
1112         state->client = client;
1113         state->db = db;
1114         state->key.dptr = discard_const(keyname);
1115         state->key.dsize = strlen(keyname) + 1;
1116         state->my_sid = *sid;
1117         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1118
1119         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1120                                       false);
1121         if (tevent_req_nomem(subreq, req)) {
1122                 return tevent_req_post(req, ev);
1123         }
1124         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1125
1126         return req;
1127 }
1128
1129 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1130 {
1131         struct tevent_req *req = tevent_req_callback_data(
1132                 subreq, struct tevent_req);
1133         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1134                 req, struct ctdb_g_lock_lock_state);
1135         TDB_DATA data;
1136         int ret = 0;
1137
1138         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1139         TALLOC_FREE(subreq);
1140         if (state->h == NULL) {
1141                 tevent_req_error(req, ret);
1142                 return;
1143         }
1144
1145         if (state->lock_list != NULL) {
1146                 TALLOC_FREE(state->lock_list);
1147                 state->current = 0;
1148         }
1149
1150         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1151                                     &state->lock_list);
1152         talloc_free(data.dptr);
1153         if (ret != 0) {
1154                 tevent_req_error(req, ret);
1155                 return;
1156         }
1157
1158         ctdb_g_lock_lock_process_locks(req);
1159 }
1160
1161 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1162 {
1163         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1164                 req, struct ctdb_g_lock_lock_state);
1165         struct tevent_req *subreq;
1166         struct ctdb_g_lock *lock;
1167         bool check_server = false;
1168         int ret;
1169
1170         while (state->current < state->lock_list->num) {
1171                 lock = &state->lock_list->lock[state->current];
1172
1173                 /* We should not ask for the same lock more than once */
1174                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1175                         tevent_req_error(req, EDEADLK);
1176                         return;
1177                 }
1178
1179                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1180                         check_server = true;
1181                         break;
1182                 }
1183
1184                 state->current += 1;
1185         }
1186
1187         if (check_server) {
1188                 struct ctdb_req_control request;
1189                 struct ctdb_uint64_array u64_array;
1190
1191                 u64_array.num = 1;
1192                 u64_array.val = &lock->sid.unique_id;
1193
1194                 ctdb_req_control_check_srvids(&request, &u64_array);
1195                 subreq = ctdb_client_control_send(state, state->ev,
1196                                                   state->client,
1197                                                   state->client->pnn,
1198                                                   tevent_timeval_zero(),
1199                                                   &request);
1200                 if (tevent_req_nomem(subreq, req)) {
1201                         return;
1202                 }
1203                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1204                 return;
1205         }
1206
1207         /* There is no conflict, add ourself to the lock_list */
1208         state->lock_list->lock = talloc_realloc(state->lock_list,
1209                                                 state->lock_list->lock,
1210                                                 struct ctdb_g_lock,
1211                                                 state->lock_list->num + 1);
1212         if (state->lock_list->lock == NULL) {
1213                 tevent_req_error(req, ENOMEM);
1214                 return;
1215         }
1216
1217         lock = &state->lock_list->lock[state->lock_list->num];
1218         lock->type = state->lock_type;
1219         lock->sid = state->my_sid;
1220         state->lock_list->num += 1;
1221
1222         ret = ctdb_g_lock_lock_update(req);
1223         if (ret != 0) {
1224                 tevent_req_error(req, ret);
1225                 return;
1226         }
1227
1228         tevent_req_done(req);
1229 }
1230
1231 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1232 {
1233         struct tevent_req *req = tevent_req_callback_data(
1234                 subreq, struct tevent_req);
1235         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1236                 req, struct ctdb_g_lock_lock_state);
1237         struct ctdb_reply_control *reply;
1238         struct ctdb_uint8_array *u8_array;
1239         int ret;
1240         bool status;
1241         int8_t val;
1242
1243         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1244         TALLOC_FREE(subreq);
1245         if (! status) {
1246                 tevent_req_error(req, ret);
1247                 return;
1248         }
1249
1250         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1251         if (ret != 0) {
1252                 tevent_req_error(req, ENOMEM);
1253                 return;
1254         }
1255
1256         if (u8_array->num != 1) {
1257                 talloc_free(u8_array);
1258                 tevent_req_error(req, EIO);
1259                 return;
1260         }
1261
1262         val = u8_array->val[0];
1263         talloc_free(u8_array);
1264
1265         if (val == 1) {
1266                 /* server process exists, need to retry */
1267                 subreq = tevent_wakeup_send(state, state->ev,
1268                                             tevent_timeval_current_ofs(1,0));
1269                 if (tevent_req_nomem(subreq, req)) {
1270                         return;
1271                 }
1272                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1273                 return;
1274         }
1275
1276         /* server process does not exist, remove conflicting entry */
1277         state->lock_list->lock[state->current] =
1278                 state->lock_list->lock[state->lock_list->num-1];
1279         state->lock_list->num -= 1;
1280
1281         ret = ctdb_g_lock_lock_update(req);
1282         if (ret != 0) {
1283                 tevent_req_error(req, ret);
1284                 return;
1285         }
1286
1287         ctdb_g_lock_lock_process_locks(req);
1288 }
1289
1290 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1291 {
1292         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1293                 req, struct ctdb_g_lock_lock_state);
1294         TDB_DATA data;
1295         int ret;
1296
1297         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1298         data.dptr = talloc_size(state, data.dsize);
1299         if (data.dptr == NULL) {
1300                 return ENOMEM;
1301         }
1302
1303         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1304         ret = ctdb_store_record(state->h, data);
1305         talloc_free(data.dptr);
1306         return ret;
1307 }
1308
/*
 * Disabled code: the section below is compiled out with #if 0.
 *
 * It is a variant of ctdb_g_lock_lock_update() with a different
 * signature that scans the whole lock list in a single call, checks the
 * owners of conflicting locks via ctdb_server_id_exists(), adds our own
 * entry when nothing conflicts, and returns EAGAIN when a conflicting
 * lock owner still exists.
 */
#if 0
static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
                                   struct ctdb_g_lock_list *lock_list,
                                   struct ctdb_record_handle *h)
{
        struct ctdb_g_lock *lock;
        bool conflict = false;
        bool modified = false;
        int ret, i;

        for (i=0; i<lock_list->num; i++) {
                lock = &lock_list->lock[i];

                /* We should not ask for lock more than once */
                if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
                        return EDEADLK;
                }

                if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
                        bool exists;

                        conflict = true;
                        ret = ctdb_server_id_exists(state->client, &lock->sid,
                                                    &exists);
                        if (ret != 0) {
                                return ret;
                        }

                        if (exists) {
                                break;
                        }

                        /* Server does not exist, delete conflicting entry */
                        lock_list->lock[i] = lock_list->lock[lock_list->num-1];
                        lock_list->num -= 1;
                        modified = true;
                }
        }

        if (! conflict) {
                lock = talloc_realloc(lock_list, lock_list->lock,
                                      struct ctdb_g_lock, lock_list->num+1);
                if (lock == NULL) {
                        return ENOMEM;
                }

                lock[lock_list->num].type = state->lock_type;
                lock[lock_list->num].sid = state->my_sid;
                lock_list->lock = lock;
                lock_list->num += 1;
                modified = true;
        }

        if (modified) {
                TDB_DATA data;

                data.dsize = ctdb_g_lock_list_len(lock_list);
                data.dptr = talloc_size(state, data.dsize);
                if (data.dptr == NULL) {
                        return ENOMEM;
                }

                ctdb_g_lock_list_push(lock_list, data.dptr);
                ret = ctdb_store_record(h, data);
                talloc_free(data.dptr);
                if (ret != 0) {
                        return ret;
                }
        }

        if (conflict) {
                return EAGAIN;
        }
        return 0;
}
#endif
1385
1386 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1387 {
1388         struct tevent_req *req = tevent_req_callback_data(
1389                 subreq, struct tevent_req);
1390         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1391                 req, struct ctdb_g_lock_lock_state);
1392         bool success;
1393
1394         success = tevent_wakeup_recv(subreq);
1395         TALLOC_FREE(subreq);
1396         if (! success) {
1397                 tevent_req_error(req, ENOMEM);
1398                 return;
1399         }
1400
1401         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1402                                       state->db, state->key, false);
1403         if (tevent_req_nomem(subreq, req)) {
1404                 return;
1405         }
1406         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1407 }
1408
1409 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1410 {
1411         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1412                 req, struct ctdb_g_lock_lock_state);
1413         int err;
1414
1415         TALLOC_FREE(state->h);
1416
1417         if (tevent_req_is_unix_error(req, &err)) {
1418                 if (perr != NULL) {
1419                         *perr = err;
1420                 }
1421                 return false;
1422         }
1423
1424         return true;
1425 }
1426
/* Per-request state for ctdb_g_lock_unlock_send() and its callbacks */
struct ctdb_g_lock_unlock_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* database holding the lock records */
        struct ctdb_db_context *db;
        /* record key: the lock name including its terminating NUL */
        TDB_DATA key;
        /* server id whose entry is to be removed from the lock list */
        struct ctdb_server_id my_sid;
        /* handle for the fetched lock record */
        struct ctdb_record_handle *h;
        /* lock list parsed from the record contents */
        struct ctdb_g_lock_list *lock_list;
};

static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1439
1440 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1441                                            struct tevent_context *ev,
1442                                            struct ctdb_client_context *client,
1443                                            struct ctdb_db_context *db,
1444                                            const char *keyname,
1445                                            struct ctdb_server_id sid)
1446 {
1447         struct tevent_req *req, *subreq;
1448         struct ctdb_g_lock_unlock_state *state;
1449
1450         req = tevent_req_create(mem_ctx, &state,
1451                                 struct ctdb_g_lock_unlock_state);
1452         if (req == NULL) {
1453                 return NULL;
1454         }
1455
1456         state->ev = ev;
1457         state->client = client;
1458         state->db = db;
1459         state->key.dptr = discard_const(keyname);
1460         state->key.dsize = strlen(keyname) + 1;
1461         state->my_sid = sid;
1462
1463         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1464                                       false);
1465         if (tevent_req_nomem(subreq, req)) {
1466                 return tevent_req_post(req, ev);
1467         }
1468         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1469
1470         return req;
1471 }
1472
1473 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1474 {
1475         struct tevent_req *req = tevent_req_callback_data(
1476                 subreq, struct tevent_req);
1477         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1478                 req, struct ctdb_g_lock_unlock_state);
1479         TDB_DATA data;
1480         int ret = 0;
1481
1482         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1483         TALLOC_FREE(subreq);
1484         if (state->h == NULL) {
1485                 tevent_req_error(req, ret);
1486                 return;
1487         }
1488
1489         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1490                                     &state->lock_list);
1491         if (ret != 0) {
1492                 tevent_req_error(req, ret);
1493                 return;
1494         }
1495
1496         ret = ctdb_g_lock_unlock_update(req);
1497         if (ret != 0) {
1498                 tevent_req_error(req, ret);
1499                 return;
1500         }
1501
1502         tevent_req_done(req);
1503 }
1504
1505 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1506 {
1507         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1508                 req, struct ctdb_g_lock_unlock_state);
1509         struct ctdb_g_lock *lock;
1510         int ret, i;
1511
1512         for (i=0; i<state->lock_list->num; i++) {
1513                 lock = &state->lock_list->lock[i];
1514
1515                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1516                         break;
1517                 }
1518         }
1519
1520         if (i < state->lock_list->num) {
1521                 state->lock_list->lock[i] =
1522                         state->lock_list->lock[state->lock_list->num-1];
1523                 state->lock_list->num -= 1;
1524         }
1525
1526         if (state->lock_list->num == 0) {
1527                 ctdb_delete_record(state->h);
1528         } else {
1529                 TDB_DATA data;
1530
1531                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1532                 data.dptr = talloc_size(state, data.dsize);
1533                 if (data.dptr == NULL) {
1534                         return ENOMEM;
1535                 }
1536
1537                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1538                 ret = ctdb_store_record(state->h, data);
1539                 talloc_free(data.dptr);
1540                 if (ret != 0) {
1541                         return ret;
1542                 }
1543         }
1544
1545         return 0;
1546 }
1547
1548 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1549 {
1550         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1551                 req, struct ctdb_g_lock_unlock_state);
1552         int err;
1553
1554         TALLOC_FREE(state->h);
1555
1556         if (tevent_req_is_unix_error(req, &err)) {
1557                 if (perr != NULL) {
1558                         *perr = err;
1559                 }
1560                 return false;
1561         }
1562
1563         return true;
1564 }
1565
1566 /*
1567  * Persistent database functions
1568  */
/* Per-request state for ctdb_transaction_start_send() and its callbacks */
struct ctdb_transaction_start_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* timeout passed to the SRVID registration control */
        struct timeval timeout;
        /* transaction handle being set up; returned by the recv function */
        struct ctdb_transaction_handle *h;
        /* node the SRVID registration is sent to (ctdb_client_pnn()) */
        uint32_t destnode;
};

static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_register_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1581
1582 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1583                                                struct tevent_context *ev,
1584                                                struct ctdb_client_context *client,
1585                                                struct timeval timeout,
1586                                                struct ctdb_db_context *db,
1587                                                bool readonly)
1588 {
1589         struct ctdb_transaction_start_state *state;
1590         struct tevent_req *req, *subreq;
1591         struct ctdb_transaction_handle *h;
1592
1593         req = tevent_req_create(mem_ctx, &state,
1594                                 struct ctdb_transaction_start_state);
1595         if (req == NULL) {
1596                 return NULL;
1597         }
1598
1599         if (! db->persistent) {
1600                 tevent_req_error(req, EINVAL);
1601                 return tevent_req_post(req, ev);
1602         }
1603
1604         state->ev = ev;
1605         state->client = client;
1606         state->destnode = ctdb_client_pnn(client);
1607
1608         h = talloc_zero(db, struct ctdb_transaction_handle);
1609         if (tevent_req_nomem(h, req)) {
1610                 return tevent_req_post(req, ev);
1611         }
1612
1613         h->ev = ev;
1614         h->client = client;
1615         h->db = db;
1616         h->readonly = readonly;
1617         h->updated = false;
1618
1619         /* SRVID is unique for databases, so client can have transactions active
1620          * for multiple databases */
1621         h->sid.pid = getpid();
1622         h->sid.task_id = db->db_id;
1623         h->sid.vnn = state->destnode;
1624         h->sid.unique_id = h->sid.task_id;
1625         h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1626
1627         h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1628         if (tevent_req_nomem(h->recbuf, req)) {
1629                 return tevent_req_post(req, ev);
1630         }
1631
1632         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1633         if (tevent_req_nomem(h->lock_name, req)) {
1634                 return tevent_req_post(req, ev);
1635         }
1636
1637         state->h = h;
1638
1639         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1640         if (tevent_req_nomem(subreq, req)) {
1641                 return tevent_req_post(req, ev);
1642         }
1643         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1644
1645         return req;
1646 }
1647
1648 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1649 {
1650         struct tevent_req *req = tevent_req_callback_data(
1651                 subreq, struct tevent_req);
1652         struct ctdb_transaction_start_state *state = tevent_req_data(
1653                 req, struct ctdb_transaction_start_state);
1654         struct ctdb_req_control request;
1655         bool status;
1656         int ret;
1657
1658         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1659         TALLOC_FREE(subreq);
1660         if (! status) {
1661                 tevent_req_error(req, ret);
1662                 return;
1663         }
1664
1665         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1666         subreq = ctdb_client_control_send(state, state->ev, state->client,
1667                                           state->destnode, state->timeout,
1668                                           &request);
1669         if (tevent_req_nomem(subreq, req)) {
1670                 return;
1671         }
1672         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1673 }
1674
1675 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1676 {
1677         struct tevent_req *req = tevent_req_callback_data(
1678                 subreq, struct tevent_req);
1679         struct ctdb_transaction_start_state *state = tevent_req_data(
1680                 req, struct ctdb_transaction_start_state);
1681         struct ctdb_reply_control *reply;
1682         bool status;
1683         int ret;
1684
1685         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1686         TALLOC_FREE(subreq);
1687         if (! status) {
1688                 tevent_req_error(req, ret);
1689                 return;
1690         }
1691
1692         ret = ctdb_reply_control_register_srvid(reply);
1693         talloc_free(reply);
1694         if (ret != 0) {
1695                 tevent_req_error(req, ret);
1696                 return;
1697         }
1698
1699         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1700                                        state->h->db_g_lock, state->h->lock_name,
1701                                        &state->h->sid, state->h->readonly);
1702         if (tevent_req_nomem(subreq, req)) {
1703                 return;
1704         }
1705         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1706 }
1707
1708 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1709 {
1710         struct tevent_req *req = tevent_req_callback_data(
1711                 subreq, struct tevent_req);
1712         int ret;
1713         bool status;
1714
1715         status = ctdb_g_lock_lock_recv(subreq, &ret);
1716         TALLOC_FREE(subreq);
1717         if (! status) {
1718                 tevent_req_error(req, ret);
1719                 return;
1720         }
1721
1722         tevent_req_done(req);
1723 }
1724
1725 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1726                                         struct tevent_req *req,
1727                                         int *perr)
1728 {
1729         struct ctdb_transaction_start_state *state = tevent_req_data(
1730                 req, struct ctdb_transaction_start_state);
1731         struct ctdb_transaction_handle *h = state->h;
1732         int err;
1733
1734         if (tevent_req_is_unix_error(req, &err)) {
1735                 if (perr != NULL) {
1736                         *perr = err;
1737                 }
1738                 return NULL;
1739         }
1740
1741         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1742         return h;
1743 }
1744
1745 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1746 {
1747         int ret;
1748
1749         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1750                                          tevent_timeval_zero(),
1751                                          h->sid.unique_id);
1752         if (ret != 0) {
1753                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1754         }
1755
1756         return 0;
1757 }
1758
1759 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1760                            struct ctdb_client_context *client,
1761                            struct timeval timeout,
1762                            struct ctdb_db_context *db, bool readonly,
1763                            struct ctdb_transaction_handle **out)
1764 {
1765         struct tevent_req *req;
1766         struct ctdb_transaction_handle *h;
1767         int ret;
1768
1769         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1770                                           readonly);
1771         if (req == NULL) {
1772                 return ENOMEM;
1773         }
1774
1775         tevent_req_poll(req, ev);
1776
1777         h = ctdb_transaction_start_recv(req, &ret);
1778         if (h == NULL) {
1779                 return ret;
1780         }
1781
1782         *out = h;
1783         return 0;
1784 }
1785
1786 struct ctdb_transaction_record_fetch_state {
1787         TDB_DATA key, data;
1788         struct ctdb_ltdb_header header;
1789         bool found;
1790 };
1791
1792 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1793                                                   struct ctdb_ltdb_header *header,
1794                                                   TDB_DATA key,
1795                                                   TDB_DATA data,
1796                                                   void *private_data)
1797 {
1798         struct ctdb_transaction_record_fetch_state *state =
1799                 (struct ctdb_transaction_record_fetch_state *)private_data;
1800
1801         if (state->key.dsize == key.dsize &&
1802             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1803                 state->data = data;
1804                 state->header = *header;
1805                 state->found = true;
1806         }
1807
1808         return 0;
1809 }
1810
1811 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1812                                          TDB_DATA key,
1813                                          struct ctdb_ltdb_header *header,
1814                                          TDB_DATA *data)
1815 {
1816         struct ctdb_transaction_record_fetch_state state;
1817         int ret;
1818
1819         state.key = key;
1820         state.found = false;
1821
1822         ret = ctdb_rec_buffer_traverse(h->recbuf,
1823                                        ctdb_transaction_record_fetch_traverse,
1824                                        &state);
1825         if (ret != 0) {
1826                 return ret;
1827         }
1828
1829         if (state.found) {
1830                 if (header != NULL) {
1831                         *header = state.header;
1832                 }
1833                 if (data != NULL) {
1834                         *data = state.data;
1835                 }
1836                 return 0;
1837         }
1838
1839         return ENOENT;
1840 }
1841
1842 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1843                                   TDB_DATA key,
1844                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1845 {
1846         TDB_DATA tmp_data;
1847         struct ctdb_ltdb_header header;
1848         int ret;
1849
1850         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1851         if (ret == 0) {
1852                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1853                                            tmp_data.dsize);
1854                 if (data->dptr == NULL) {
1855                         return ENOMEM;
1856                 }
1857                 data->dsize = tmp_data.dsize;
1858                 return 0;
1859         }
1860
1861         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1862         if (ret != 0) {
1863                 return ret;
1864         }
1865
1866         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1867         if (ret != 0) {
1868                 return ret;
1869         }
1870
1871         return 0;
1872 }
1873
1874 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1875                                   TDB_DATA key, TDB_DATA data)
1876 {
1877         TALLOC_CTX *tmp_ctx;
1878         struct ctdb_ltdb_header header;
1879         TDB_DATA old_data;
1880         int ret;
1881
1882         if (h->readonly) {
1883                 return EINVAL;
1884         }
1885
1886         tmp_ctx = talloc_new(h);
1887         if (tmp_ctx == NULL) {
1888                 return ENOMEM;
1889         }
1890
1891         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1892         if (ret != 0) {
1893                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1894                 if (ret != 0) {
1895                         return ret;
1896                 }
1897         }
1898
1899         if (old_data.dsize == data.dsize &&
1900             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1901                 talloc_free(tmp_ctx);
1902                 return 0;
1903         }
1904
1905         header.dmaster = ctdb_client_pnn(h->client);
1906         header.rsn += 1;
1907
1908         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1909         talloc_free(tmp_ctx);
1910         if (ret != 0) {
1911                 return ret;
1912         }
1913         h->updated = true;
1914
1915         return 0;
1916 }
1917
1918 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1919                                    TDB_DATA key)
1920 {
1921         return ctdb_transaction_store_record(h, key, tdb_null);
1922 }
1923
1924 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1925                                             uint64_t seqnum)
1926 {
1927         const char *keyname = CTDB_DB_SEQNUM_KEY;
1928         TDB_DATA key, data;
1929
1930         key.dptr = discard_const(keyname);
1931         key.dsize = strlen(keyname) + 1;
1932
1933         data.dptr = (uint8_t *)&seqnum;
1934         data.dsize = sizeof(seqnum);
1935
1936         return ctdb_transaction_store_record(h, key, data);
1937 }
1938
/*
 * State of an asynchronous transaction commit request.
 */
struct ctdb_transaction_commit_state {
	/* Event context used for all sub-requests */
	struct tevent_context *ev;
	/* Handle of the transaction being committed */
	struct ctdb_transaction_handle *h;
	/* Database sequence number read before the commit was sent */
	uint64_t seqnum;
};
1944
/*
 * Commit callbacks: _try sends the commit control once the recovery
 * wait has finished, _done evaluates the reply to that control.
 */
static void ctdb_transaction_commit_try(struct tevent_req *subreq);
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1947
1948 struct tevent_req *ctdb_transaction_commit_send(
1949                                         TALLOC_CTX *mem_ctx,
1950                                         struct tevent_context *ev,
1951                                         struct ctdb_transaction_handle *h)
1952 {
1953         struct tevent_req *req, *subreq;
1954         struct ctdb_transaction_commit_state *state;
1955         int ret;
1956
1957         req = tevent_req_create(mem_ctx, &state,
1958                                 struct ctdb_transaction_commit_state);
1959         if (req == NULL) {
1960                 return NULL;
1961         }
1962
1963         state->ev = ev;
1964         state->h = h;
1965
1966         ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1967                                       h->client->pnn, tevent_timeval_zero(),
1968                                       h->db->db_id, &state->seqnum);
1969         if (ret != 0) {
1970                 tevent_req_error(req, ret);
1971                 return tevent_req_post(req, ev);
1972         }
1973
1974         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1975         if (ret != 0) {
1976                 tevent_req_error(req, ret);
1977                 return tevent_req_post(req, ev);
1978         }
1979
1980         subreq = ctdb_recovery_wait_send(state, ev, h->client);
1981         if (tevent_req_nomem(subreq, req)) {
1982                 return tevent_req_post(req, ev);
1983         }
1984         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1985
1986         return req;
1987 }
1988
1989 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1990 {
1991         struct tevent_req *req = tevent_req_callback_data(
1992                 subreq, struct tevent_req);
1993         struct ctdb_transaction_commit_state *state = tevent_req_data(
1994                 req, struct ctdb_transaction_commit_state);
1995         struct ctdb_req_control request;
1996         int ret;
1997         bool status;
1998
1999         status = ctdb_recovery_wait_recv(subreq, &ret);
2000         TALLOC_FREE(subreq);
2001         if (! status) {
2002                 tevent_req_error(req, ret);
2003                 return;
2004         }
2005
2006         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2007         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2008                                           state->h->client->pnn,
2009                                           tevent_timeval_zero(), &request);
2010         if (tevent_req_nomem(subreq, req)) {
2011                 return;
2012         }
2013         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2014 }
2015
2016 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2017 {
2018         struct tevent_req *req = tevent_req_callback_data(
2019                 subreq, struct tevent_req);
2020         struct ctdb_transaction_commit_state *state = tevent_req_data(
2021                 req, struct ctdb_transaction_commit_state);
2022         struct ctdb_reply_control *reply;
2023         uint64_t seqnum;
2024         int ret;
2025         bool status;
2026
2027         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2028         TALLOC_FREE(subreq);
2029         if (! status) {
2030                 tevent_req_error(req, ret);
2031                 return;
2032         }
2033
2034         ret = ctdb_reply_control_trans3_commit(reply);
2035         if (ret < 0) {
2036                 /* Control failed due to recovery */
2037                 subreq = ctdb_recovery_wait_send(state, state->ev,
2038                                                  state->h->client);
2039                 if (tevent_req_nomem(subreq, req)) {
2040                         return;
2041                 }
2042                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2043                                         req);
2044                 return;
2045         }
2046
2047         ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2048                                       state->h->client->pnn,
2049                                       tevent_timeval_zero(),
2050                                       state->h->db->db_id, &seqnum);
2051         if (ret != 0) {
2052                 tevent_req_error(req, ret);
2053                 return;
2054         }
2055
2056         if (seqnum == state->seqnum) {
2057                 subreq = ctdb_recovery_wait_send(state, state->ev,
2058                                                  state->h->client);
2059                 if (tevent_req_nomem(subreq, req)) {
2060                         return;
2061                 }
2062                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2063                                         req);
2064                 return;
2065         }
2066
2067         if (seqnum != state->seqnum + 1) {
2068                 tevent_req_error(req, EIO);
2069                 return;
2070         }
2071
2072         tevent_req_done(req);
2073 }
2074
2075 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2076 {
2077         struct ctdb_transaction_commit_state *state = tevent_req_data(
2078                 req, struct ctdb_transaction_commit_state);
2079         int err;
2080
2081         if (tevent_req_is_unix_error(req, &err)) {
2082                 if (perr != NULL) {
2083                         *perr = err;
2084                 }
2085                 TALLOC_FREE(state->h);
2086                 return false;
2087         }
2088
2089         TALLOC_FREE(state->h);
2090         return true;
2091 }
2092
2093 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2094 {
2095         struct tevent_req *req;
2096         int ret;
2097         bool status;
2098
2099         if (h->readonly || ! h->updated) {
2100                 talloc_free(h);
2101                 return 0;
2102         }
2103
2104         req = ctdb_transaction_commit_send(h, h->ev, h);
2105         if (req == NULL) {
2106                 talloc_free(h);
2107                 return ENOMEM;
2108         }
2109
2110         tevent_req_poll(req, h->ev);
2111
2112         status = ctdb_transaction_commit_recv(req, &ret);
2113         if (! status) {
2114                 talloc_free(h);
2115                 return ret;
2116         }
2117
2118         talloc_free(h);
2119         return 0;
2120 }
2121
/*
 * Abandon a transaction by releasing its handle.
 *
 * Nothing else is done here; any cleanup happens through the handle's
 * talloc destructor, if one is installed.  Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
	TALLOC_FREE(h);

	return 0;
}
2127
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */