6af607af09b696acba62bfd84cc132b5e673048d
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State for the "set database flags" request.
 *
 * The request first fetches the node map, then sends the readonly and/or
 * sticky controls to the connected nodes.
 */
struct ctdb_set_db_flags_state {
        /* Event context and client used for all sub-requests */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Timeout handed to every control that is sent */
        struct timeval timeout;
        /* Database the flags are applied to */
        uint32_t db_id;
        /* Requested CTDB_DB_FLAGS_* bits */
        uint8_t db_flags;
        /* Set once the respective control finished or was not requested */
        bool readonly_done;
        bool sticky_done;
        /* Result of list_of_connected_nodes(): node numbers and their count */
        uint32_t *pnn_list;
        int count;
};

/* Completion callbacks, in the order in which they can be triggered */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 tevent_req_error(req, ret);
125                 return;
126         }
127
128         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129         talloc_free(reply);
130         if (ret != 0) {
131                 tevent_req_error(req, ret);
132                 return;
133         }
134
135         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136                                                state, &state->pnn_list);
137         talloc_free(nodemap);
138         if (state->count <= 0) {
139                 tevent_req_error(req, ENOMEM);
140                 return;
141         }
142
143         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144                 ctdb_req_control_set_db_readonly(&request, state->db_id);
145                 subreq = ctdb_client_control_multi_send(
146                                         state, state->ev, state->client,
147                                         state->pnn_list, state->count,
148                                         state->timeout, &request);
149                 if (tevent_req_nomem(subreq, req)) {
150                         return;
151                 }
152                 tevent_req_set_callback(subreq,
153                                         ctdb_set_db_flags_readonly_done, req);
154         } else {
155                 state->readonly_done = true;
156         }
157
158         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159                 ctdb_req_control_set_db_sticky(&request, state->db_id);
160                 subreq = ctdb_client_control_multi_send(
161                                         state, state->ev, state->client,
162                                         state->pnn_list, state->count,
163                                         state->timeout, &request);
164                 if (tevent_req_nomem(subreq, req)) {
165                         return;
166                 }
167                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168                                         req);
169         } else {
170                 state->sticky_done = true;
171         }
172 }
173
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 {
176         struct tevent_req *req = tevent_req_callback_data(
177                 subreq, struct tevent_req);
178         struct ctdb_set_db_flags_state *state = tevent_req_data(
179                 req, struct ctdb_set_db_flags_state);
180         int ret;
181         bool status;
182
183         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184                                                 NULL);
185         TALLOC_FREE(subreq);
186         if (! status) {
187                 tevent_req_error(req, ret);
188                 return;
189         }
190
191         state->readonly_done = true;
192
193         if (state->readonly_done && state->sticky_done) {
194                 tevent_req_done(req);
195         }
196 }
197
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req = tevent_req_callback_data(
201                 subreq, struct tevent_req);
202         struct ctdb_set_db_flags_state *state = tevent_req_data(
203                 req, struct ctdb_set_db_flags_state);
204         int ret;
205         bool status;
206
207         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208                                                 NULL);
209         TALLOC_FREE(subreq);
210         if (! status) {
211                 tevent_req_error(req, ret);
212                 return;
213         }
214
215         state->sticky_done = true;
216
217         if (state->readonly_done && state->sticky_done) {
218                 tevent_req_done(req);
219         }
220 }
221
/*
 * Collect the result of a "set database flags" request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
234
/*
 * State for the database attach request.
 */
struct ctdb_attach_state {
        /* Event context and client used for all sub-requests */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Timeout handed to every control that is sent */
        struct timeval timeout;
        /* Node the attach controls are sent to (the client's own pnn) */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* requested by the caller */
        uint8_t db_flags;
        /* TDB_* flags computed once the mutex tunable is known */
        uint32_t tdb_flags;
        /* Database context being set up (or the existing one, if found) */
        struct ctdb_db_context *db;
};

/* Completion callbacks, listed in the order the attach steps run */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252                                     struct tevent_context *ev,
253                                     struct ctdb_client_context *client,
254                                     struct timeval timeout,
255                                     const char *db_name, uint8_t db_flags)
256 {
257         struct tevent_req *req, *subreq;
258         struct ctdb_attach_state *state;
259         struct ctdb_req_control request;
260
261         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262         if (req == NULL) {
263                 return NULL;
264         }
265
266         state->db = client_db_handle(client, db_name);
267         if (state->db != NULL) {
268                 tevent_req_done(req);
269                 return tevent_req_post(req, ev);
270         }
271
272         state->ev = ev;
273         state->client = client;
274         state->timeout = timeout;
275         state->destnode = ctdb_client_pnn(client);
276         state->db_flags = db_flags;
277
278         state->db = talloc_zero(client, struct ctdb_db_context);
279         if (tevent_req_nomem(state->db, req)) {
280                 return tevent_req_post(req, ev);
281         }
282
283         state->db->db_name = talloc_strdup(state->db, db_name);
284         if (tevent_req_nomem(state->db, req)) {
285                 return tevent_req_post(req, ev);
286         }
287
288         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289                 state->db->persistent = true;
290         }
291
292         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293         subreq = ctdb_client_control_send(state, ev, client,
294                                           ctdb_client_pnn(client), timeout,
295                                           &request);
296         if (tevent_req_nomem(subreq, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301         return req;
302 }
303
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         struct ctdb_attach_state *state = tevent_req_data(
309                 req, struct ctdb_attach_state);
310         struct ctdb_reply_control *reply;
311         struct ctdb_req_control request;
312         uint32_t mutex_enabled;
313         int ret;
314         bool status;
315
316         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317         TALLOC_FREE(subreq);
318         if (! status) {
319                 tevent_req_error(req, ret);
320                 return;
321         }
322
323         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324         if (ret != 0) {
325                 /* Treat error as mutex support not available */
326                 mutex_enabled = 0;
327         }
328
329         state->tdb_flags = TDB_DEFAULT;
330         if (! state->db->persistent) {
331                 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
332                                      TDB_CLEAR_IF_FIRST);
333         }
334         if (mutex_enabled == 1) {
335                 state->tdb_flags |= TDB_MUTEX_LOCKING;
336         }
337
338         if (state->db->persistent) {
339                 ctdb_req_control_db_attach_persistent(&request,
340                                                       state->db->db_name,
341                                                       state->tdb_flags);
342         } else {
343                 ctdb_req_control_db_attach(&request, state->db->db_name,
344                                            state->tdb_flags);
345         }
346
347         subreq = ctdb_client_control_send(state, state->ev, state->client,
348                                           state->destnode, state->timeout,
349                                           &request);
350         if (tevent_req_nomem(subreq, req)) {
351                 return;
352         }
353         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
354 }
355
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 {
358         struct tevent_req *req = tevent_req_callback_data(
359                 subreq, struct tevent_req);
360         struct ctdb_attach_state *state = tevent_req_data(
361                 req, struct ctdb_attach_state);
362         struct ctdb_req_control request;
363         struct ctdb_reply_control *reply;
364         bool status;
365         int ret;
366
367         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
368         TALLOC_FREE(subreq);
369         if (! status) {
370                 tevent_req_error(req, ret);
371                 return;
372         }
373
374         if (state->db->persistent) {
375                 ret = ctdb_reply_control_db_attach_persistent(
376                                 reply, &state->db->db_id);
377         } else {
378                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
379         }
380         talloc_free(reply);
381         if (ret != 0) {
382                 tevent_req_error(req, ret);
383                 return;
384         }
385
386         ctdb_req_control_getdbpath(&request, state->db->db_id);
387         subreq = ctdb_client_control_send(state, state->ev, state->client,
388                                           state->destnode, state->timeout,
389                                           &request);
390         if (tevent_req_nomem(subreq, req)) {
391                 return;
392         }
393         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
394 }
395
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397 {
398         struct tevent_req *req = tevent_req_callback_data(
399                 subreq, struct tevent_req);
400         struct ctdb_attach_state *state = tevent_req_data(
401                 req, struct ctdb_attach_state);
402         struct ctdb_reply_control *reply;
403         struct ctdb_req_control request;
404         bool status;
405         int ret;
406
407         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 tevent_req_error(req, ret);
411                 return;
412         }
413
414         ret = ctdb_reply_control_getdbpath(reply, state->db,
415                                            &state->db->db_path);
416         talloc_free(reply);
417         if (ret != 0) {
418                 tevent_req_error(req, ret);
419                 return;
420         }
421
422         ctdb_req_control_db_get_health(&request, state->db->db_id);
423         subreq = ctdb_client_control_send(state, state->ev, state->client,
424                                           state->destnode, state->timeout,
425                                           &request);
426         if (tevent_req_nomem(subreq, req)) {
427                 return;
428         }
429         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
430 }
431
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
433 {
434         struct tevent_req *req = tevent_req_callback_data(
435                 subreq, struct tevent_req);
436         struct ctdb_attach_state *state = tevent_req_data(
437                 req, struct ctdb_attach_state);
438         struct ctdb_reply_control *reply;
439         const char *reason;
440         bool status;
441         int ret;
442
443         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
444         TALLOC_FREE(subreq);
445         if (! status) {
446                 tevent_req_error(req, ret);
447                 return;
448         }
449
450         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451         if (ret != 0) {
452                 tevent_req_error(req, ret);
453                 return;
454         }
455
456         if (reason != NULL) {
457                 /* Database unhealthy, avoid attach */
458                 /* FIXME: Log here */
459                 tevent_req_error(req, EIO);
460                 return;
461         }
462
463         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464                                         state->destnode, state->timeout,
465                                         state->db->db_id, state->db_flags);
466         if (tevent_req_nomem(subreq, req)) {
467                 return;
468         }
469         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
470 }
471
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
473 {
474         struct tevent_req *req = tevent_req_callback_data(
475                 subreq, struct tevent_req);
476         struct ctdb_attach_state *state = tevent_req_data(
477                 req, struct ctdb_attach_state);
478         bool status;
479         int ret;
480
481         status = ctdb_set_db_flags_recv(subreq, &ret);
482         TALLOC_FREE(subreq);
483         if (! status) {
484                 tevent_req_error(req, ret);
485                 return;
486         }
487
488         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489                                         state->tdb_flags, O_RDWR, 0);
490         if (tevent_req_nomem(state->db->ltdb, req)) {
491                 return;
492         }
493         DLIST_ADD(state->client->db, state->db);
494
495         tevent_req_done(req);
496 }
497
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499                       struct ctdb_db_context **out)
500 {
501         struct ctdb_attach_state *state = tevent_req_data(
502                 req, struct ctdb_attach_state);
503         int err;
504
505         if (tevent_req_is_unix_error(req, &err)) {
506                 if (perr != NULL) {
507                         *perr = err;
508                 }
509                 return false;
510         }
511
512         if (out != NULL) {
513                 *out = state->db;
514         }
515         return true;
516 }
517
518 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
519                 struct ctdb_client_context *client,
520                 struct timeval timeout,
521                 const char *db_name, uint8_t db_flags,
522                 struct ctdb_db_context **out)
523 {
524         struct tevent_req *req;
525         bool status;
526         int ret;
527
528         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
529                                db_name, db_flags);
530         if (req == NULL) {
531                 return ENOMEM;
532         }
533
534         tevent_req_poll(req, ev);
535
536         status = ctdb_attach_recv(req, &ret, out);
537         if (! status) {
538                 return ret;
539         }
540
541         /*
542         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
545         */
546
547         return 0;
548 }
549
550 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
551                 struct ctdb_client_context *client,
552                 struct timeval timeout, uint32_t db_id)
553 {
554         struct ctdb_db_context *db;
555         int ret;
556
557         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
558                                   db_id);
559         if (ret != 0) {
560                 return ret;
561         }
562
563         for (db = client->db; db != NULL; db = db->next) {
564                 if (db->db_id == db_id) {
565                         DLIST_REMOVE(client->db, db);
566                         break;
567                 }
568         }
569
570         return 0;
571 }
572
573 uint32_t ctdb_db_id(struct ctdb_db_context *db)
574 {
575         return db->db_id;
576 }
577
578 struct ctdb_db_traverse_state {
579         ctdb_rec_parser_func_t parser;
580         void *private_data;
581         bool extract_header;
582         int error;
583 };
584
585 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
586                                     TDB_DATA data, void *private_data)
587 {
588         struct ctdb_db_traverse_state *state =
589                 (struct ctdb_db_traverse_state *)private_data;
590         int ret;
591
592         if (state->extract_header) {
593                 struct ctdb_ltdb_header header;
594                 size_t len;
595
596                 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
597                 if (ret != 0) {
598                         state->error = ret;
599                         return 1;
600                 }
601
602                 len = ctdb_ltdb_header_len(&header);
603
604                 data.dptr += len;
605                 data.dsize -= len;
606
607                 ret = state->parser(0, &header, key, data, state->private_data);
608         } else {
609                 ret = state->parser(0, NULL, key, data, state->private_data);
610         }
611
612         if (ret != 0) {
613                 state->error = ret;
614                 return 1;
615         }
616
617         return 0;
618 }
619
620 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
621                      bool extract_header,
622                      ctdb_rec_parser_func_t parser, void *private_data)
623 {
624         struct ctdb_db_traverse_state state;
625         int ret;
626
627         state.parser = parser;
628         state.private_data = private_data;
629         state.extract_header = extract_header;
630         state.error = 0;
631
632         if (readonly) {
633                 ret = tdb_traverse_read(db->ltdb->tdb,
634                                         ctdb_db_traverse_handler, &state);
635         } else {
636                 ret = tdb_traverse(db->ltdb->tdb,
637                                    ctdb_db_traverse_handler, &state);
638         }
639
640         if (ret == -1) {
641                 return EIO;
642         }
643
644         return state.error;
645 }
646
647 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
648                            struct ctdb_ltdb_header *header,
649                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
650 {
651         TDB_DATA rec;
652         int ret;
653
654         rec = tdb_fetch(db->ltdb->tdb, key);
655         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
656                 /* No record present */
657                 if (rec.dptr != NULL) {
658                         free(rec.dptr);
659                 }
660
661                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
662                         return EIO;
663                 }
664
665                 header->rsn = 0;
666                 header->dmaster = CTDB_UNKNOWN_PNN;
667                 header->flags = 0;
668
669                 if (data != NULL) {
670                         *data = tdb_null;
671                 }
672                 return 0;
673         }
674
675         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
676         if (ret != 0) {
677                 return ret;
678         }
679
680         ret = 0;
681         if (data != NULL) {
682                 size_t offset = ctdb_ltdb_header_len(header);
683
684                 data->dsize = rec.dsize - offset;
685                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
686                                            data->dsize);
687                 if (data->dptr == NULL) {
688                         ret = ENOMEM;
689                 }
690         }
691
692         free(rec.dptr);
693         return ret;
694 }
695
696 /*
697  * Fetch a record from volatile database
698  *
699  * Steps:
700  *  1. Get a lock on the hash chain
701  *  2. If the record does not exist, migrate the record
702  *  3. If readonly=true and delegations do not exist, migrate the record.
703  *  4. If readonly=false and delegations exist, migrate the record.
704  *  5. If the local node is not dmaster, migrate the record.
705  *  6. Return record
706  */
707
/*
 * State for the fetch-lock request.
 */
struct ctdb_fetch_lock_state {
        /* Event context and client used for the migration call */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Record handle that is being filled in */
        struct ctdb_record_handle *h;
        /* Caller asked for readonly access */
        bool readonly;
        /* pnn of the client, compared against the record's dmaster */
        uint32_t pnn;
};

/* Helpers of the fetch-lock state machine */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
719
720 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
721                                         struct tevent_context *ev,
722                                         struct ctdb_client_context *client,
723                                         struct ctdb_db_context *db,
724                                         TDB_DATA key, bool readonly)
725 {
726         struct ctdb_fetch_lock_state *state;
727         struct tevent_req *req;
728         int ret;
729
730         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
731         if (req == NULL) {
732                 return NULL;
733         }
734
735         state->ev = ev;
736         state->client = client;
737
738         state->h = talloc_zero(db, struct ctdb_record_handle);
739         if (tevent_req_nomem(state->h, req)) {
740                 return tevent_req_post(req, ev);
741         }
742         state->h->client = client;
743         state->h->db = db;
744         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
745         if (tevent_req_nomem(state->h->key.dptr, req)) {
746                 return tevent_req_post(req, ev);
747         }
748         state->h->key.dsize = key.dsize;
749         state->h->readonly = false;
750
751         state->readonly = readonly;
752         state->pnn = ctdb_client_pnn(client);
753
754         /* Check that database is not persistent */
755         if (db->persistent) {
756                 tevent_req_error(req, EINVAL);
757                 return tevent_req_post(req, ev);
758         }
759
760         ret = ctdb_fetch_lock_check(req);
761         if (ret == 0) {
762                 tevent_req_done(req);
763                 return tevent_req_post(req, ev);
764         }
765         if (ret != EAGAIN) {
766                 tevent_req_error(req, ret);
767                 return tevent_req_post(req, ev);
768         }
769         return req;
770 }
771
772 static int ctdb_fetch_lock_check(struct tevent_req *req)
773 {
774         struct ctdb_fetch_lock_state *state = tevent_req_data(
775                 req, struct ctdb_fetch_lock_state);
776         struct ctdb_record_handle *h = state->h;
777         struct ctdb_ltdb_header header;
778         TDB_DATA data = tdb_null;
779         int ret, err = 0;
780         bool do_migrate = false;
781
782         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
783         if (ret != 0) {
784                 err = EIO;
785                 goto failed;
786         }
787
788         data = tdb_fetch(h->db->ltdb->tdb, h->key);
789         if (data.dptr == NULL) {
790                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
791                         goto migrate;
792                 } else {
793                         err = EIO;
794                         goto failed;
795                 }
796         }
797
798         /* Got the record */
799         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
800         if (ret != 0) {
801                 err = ret;
802                 goto failed;
803         }
804
805         if (! state->readonly) {
806                 /* Read/write access */
807                 if (header.dmaster == state->pnn &&
808                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
809                         goto migrate;
810                 }
811
812                 if (header.dmaster != state->pnn) {
813                         goto migrate;
814                 }
815         } else {
816                 /* Readonly access */
817                 if (header.dmaster != state->pnn &&
818                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
819                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
820                         goto migrate;
821                 }
822         }
823
824         /* We are the dmaster or readonly delegation */
825         h->header = header;
826         h->data = data;
827         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
828                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
829                 h->readonly = true;
830         }
831         return 0;
832
833 migrate:
834         do_migrate = true;
835         err = EAGAIN;
836
837 failed:
838         if (data.dptr != NULL) {
839                 free(data.dptr);
840         }
841         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
842         if (ret != 0) {
843                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
844                                   h->db->db_name));
845                 return EIO;
846         }
847
848         if (do_migrate) {
849                 ctdb_fetch_lock_migrate(req);
850         }
851         return err;
852 }
853
854 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
855 {
856         struct ctdb_fetch_lock_state *state = tevent_req_data(
857                 req, struct ctdb_fetch_lock_state);
858         struct ctdb_req_call request;
859         struct tevent_req *subreq;
860
861         ZERO_STRUCT(request);
862         request.flags = CTDB_IMMEDIATE_MIGRATION;
863         if (state->readonly) {
864                 request.flags |= CTDB_WANT_READONLY;
865         }
866         request.db_id = state->h->db->db_id;
867         request.callid = CTDB_NULL_FUNC;
868         request.key = state->h->key;
869
870         subreq = ctdb_client_call_send(state, state->ev, state->client,
871                                        &request);
872         if (tevent_req_nomem(subreq, req)) {
873                 return;
874         }
875
876         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
877 }
878
879 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
880 {
881         struct tevent_req *req = tevent_req_callback_data(
882                 subreq, struct tevent_req);
883         struct ctdb_fetch_lock_state *state = tevent_req_data(
884                 req, struct ctdb_fetch_lock_state);
885         struct ctdb_reply_call *reply;
886         int ret;
887         bool status;
888
889         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
890         TALLOC_FREE(subreq);
891         if (! status) {
892                 tevent_req_error(req, ret);
893                 return;
894         }
895
896         if (reply->status != 0) {
897                 tevent_req_error(req, EIO);
898                 return;
899         }
900         talloc_free(reply);
901
902         ret = ctdb_fetch_lock_check(req);
903         if (ret != 0) {
904                 tevent_req_error(req, ret);
905                 return;
906         }
907
908         tevent_req_done(req);
909 }
910
911 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
912 {
913         tdb_chainunlock(h->db->ltdb->tdb, h->key);
914         free(h->data.dptr);
915         return 0;
916 }
917
918 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
919                                                 struct ctdb_ltdb_header *header,
920                                                 TALLOC_CTX *mem_ctx,
921                                                 TDB_DATA *data, int *perr)
922 {
923         struct ctdb_fetch_lock_state *state = tevent_req_data(
924                 req, struct ctdb_fetch_lock_state);
925         struct ctdb_record_handle *h = state->h;
926         int err;
927
928         if (tevent_req_is_unix_error(req, &err)) {
929                 if (perr != NULL) {
930                         *perr = err;
931                 }
932                 return NULL;
933         }
934
935         if (header != NULL) {
936                 *header = h->header;
937         }
938         if (data != NULL) {
939                 size_t offset;
940
941                 offset = ctdb_ltdb_header_len(&h->header);
942
943                 data->dsize = h->data.dsize - offset;
944                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
945                                            data->dsize);
946                 if (data->dptr == NULL) {
947                         TALLOC_FREE(state->h);
948                         if (perr != NULL) {
949                                 *perr = ENOMEM;
950                         }
951                         return NULL;
952                 }
953         }
954
955         talloc_set_destructor(h, ctdb_record_handle_destructor);
956         return h;
957 }
958
959 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
960                     struct ctdb_client_context *client,
961                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
962                     struct ctdb_record_handle **out,
963                     struct ctdb_ltdb_header *header, TDB_DATA *data)
964 {
965         struct tevent_req *req;
966         struct ctdb_record_handle *h;
967         int ret;
968
969         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
970         if (req == NULL) {
971                 return ENOMEM;
972         }
973
974         tevent_req_poll(req, ev);
975
976         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
977         if (h == NULL) {
978                 return ret;
979         }
980
981         *out = h;
982         return 0;
983 }
984
985 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
986 {
987         TDB_DATA rec;
988         size_t offset;
989         int ret;
990
991         /* Cannot modify the record if it was obtained as a readonly copy */
992         if (h->readonly) {
993                 return EINVAL;
994         }
995
996         /* Check if the new data is same */
997         if (h->data.dsize == data.dsize &&
998             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
999                 /* No need to do anything */
1000                 return 0;
1001         }
1002
1003         offset = ctdb_ltdb_header_len(&h->header);
1004         rec.dsize = offset + data.dsize;
1005         rec.dptr = talloc_size(h, rec.dsize);
1006         if (rec.dptr == NULL) {
1007                 return ENOMEM;
1008         }
1009
1010         ctdb_ltdb_header_push(&h->header, rec.dptr);
1011         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1012
1013         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1014         if (ret != 0) {
1015                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1016                                   h->db->db_name));
1017                 return EIO;
1018         }
1019
1020         talloc_free(rec.dptr);
1021         return 0;
1022 }
1023
1024 int ctdb_delete_record(struct ctdb_record_handle *h)
1025 {
1026         TDB_DATA rec;
1027         struct ctdb_key_data key;
1028         int ret;
1029
1030         /* Cannot delete the record if it was obtained as a readonly copy */
1031         if (h->readonly) {
1032                 return EINVAL;
1033         }
1034
1035         rec.dsize = ctdb_ltdb_header_len(&h->header);
1036         rec.dptr = talloc_size(h, rec.dsize);
1037         if (rec.dptr == NULL) {
1038                 return ENOMEM;
1039         }
1040
1041         ctdb_ltdb_header_push(&h->header, rec.dptr);
1042
1043         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1044         talloc_free(rec.dptr);
1045         if (ret != 0) {
1046                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1047                                   h->db->db_name));
1048                 return EIO;
1049         }
1050
1051         key.db_id = h->db->db_id;
1052         key.header = h->header;
1053         key.key = h->key;
1054
1055         ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1056                                               h->client->pnn,
1057                                               tevent_timeval_zero(), &key);
1058         if (ret != 0) {
1059                 DEBUG(DEBUG_WARNING,
1060                       ("Failed to mark record to be deleted in DB %s\n",
1061                        h->db->db_name));
1062                 return ret;
1063         }
1064
1065         return 0;
1066 }
1067
1068 /*
1069  * Global lock functions
1070  */
1071
1072 struct ctdb_g_lock_lock_state {
1073         struct tevent_context *ev;
1074         struct ctdb_client_context *client;
1075         struct ctdb_db_context *db;
1076         TDB_DATA key;
1077         struct ctdb_server_id my_sid;
1078         enum ctdb_g_lock_type lock_type;
1079         struct ctdb_record_handle *h;
1080         /* state for verification of active locks */
1081         struct ctdb_g_lock_list *lock_list;
1082         unsigned int current;
1083 };
1084
1085 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1086 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1087 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1088 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1089 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1090
1091 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1092                                   enum ctdb_g_lock_type l2)
1093 {
1094         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1095                 return false;
1096         }
1097         return true;
1098 }
1099
1100 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1101                                          struct tevent_context *ev,
1102                                          struct ctdb_client_context *client,
1103                                          struct ctdb_db_context *db,
1104                                          const char *keyname,
1105                                          struct ctdb_server_id *sid,
1106                                          bool readonly)
1107 {
1108         struct tevent_req *req, *subreq;
1109         struct ctdb_g_lock_lock_state *state;
1110
1111         req = tevent_req_create(mem_ctx, &state,
1112                                 struct ctdb_g_lock_lock_state);
1113         if (req == NULL) {
1114                 return NULL;
1115         }
1116
1117         state->ev = ev;
1118         state->client = client;
1119         state->db = db;
1120         state->key.dptr = discard_const(keyname);
1121         state->key.dsize = strlen(keyname) + 1;
1122         state->my_sid = *sid;
1123         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1124
1125         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1126                                       false);
1127         if (tevent_req_nomem(subreq, req)) {
1128                 return tevent_req_post(req, ev);
1129         }
1130         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1131
1132         return req;
1133 }
1134
1135 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1136 {
1137         struct tevent_req *req = tevent_req_callback_data(
1138                 subreq, struct tevent_req);
1139         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1140                 req, struct ctdb_g_lock_lock_state);
1141         TDB_DATA data;
1142         int ret = 0;
1143
1144         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1145         TALLOC_FREE(subreq);
1146         if (state->h == NULL) {
1147                 tevent_req_error(req, ret);
1148                 return;
1149         }
1150
1151         if (state->lock_list != NULL) {
1152                 TALLOC_FREE(state->lock_list);
1153                 state->current = 0;
1154         }
1155
1156         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1157                                     &state->lock_list);
1158         talloc_free(data.dptr);
1159         if (ret != 0) {
1160                 tevent_req_error(req, ret);
1161                 return;
1162         }
1163
1164         ctdb_g_lock_lock_process_locks(req);
1165 }
1166
1167 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1168 {
1169         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1170                 req, struct ctdb_g_lock_lock_state);
1171         struct tevent_req *subreq;
1172         struct ctdb_g_lock *lock;
1173         bool check_server = false;
1174         int ret;
1175
1176         while (state->current < state->lock_list->num) {
1177                 lock = &state->lock_list->lock[state->current];
1178
1179                 /* We should not ask for the same lock more than once */
1180                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1181                         tevent_req_error(req, EDEADLK);
1182                         return;
1183                 }
1184
1185                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1186                         check_server = true;
1187                         break;
1188                 }
1189
1190                 state->current += 1;
1191         }
1192
1193         if (check_server) {
1194                 struct ctdb_req_control request;
1195                 struct ctdb_uint64_array u64_array;
1196
1197                 u64_array.num = 1;
1198                 u64_array.val = &lock->sid.unique_id;
1199
1200                 ctdb_req_control_check_srvids(&request, &u64_array);
1201                 subreq = ctdb_client_control_send(state, state->ev,
1202                                                   state->client,
1203                                                   state->client->pnn,
1204                                                   tevent_timeval_zero(),
1205                                                   &request);
1206                 if (tevent_req_nomem(subreq, req)) {
1207                         return;
1208                 }
1209                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1210                 return;
1211         }
1212
1213         /* There is no conflict, add ourself to the lock_list */
1214         state->lock_list->lock = talloc_realloc(state->lock_list,
1215                                                 state->lock_list->lock,
1216                                                 struct ctdb_g_lock,
1217                                                 state->lock_list->num + 1);
1218         if (state->lock_list->lock == NULL) {
1219                 tevent_req_error(req, ENOMEM);
1220                 return;
1221         }
1222
1223         lock = &state->lock_list->lock[state->lock_list->num];
1224         lock->type = state->lock_type;
1225         lock->sid = state->my_sid;
1226         state->lock_list->num += 1;
1227
1228         ret = ctdb_g_lock_lock_update(req);
1229         if (ret != 0) {
1230                 tevent_req_error(req, ret);
1231                 return;
1232         }
1233
1234         tevent_req_done(req);
1235 }
1236
1237 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1238 {
1239         struct tevent_req *req = tevent_req_callback_data(
1240                 subreq, struct tevent_req);
1241         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1242                 req, struct ctdb_g_lock_lock_state);
1243         struct ctdb_reply_control *reply;
1244         struct ctdb_uint8_array *u8_array;
1245         int ret;
1246         bool status;
1247         int8_t val;
1248
1249         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1250         TALLOC_FREE(subreq);
1251         if (! status) {
1252                 tevent_req_error(req, ret);
1253                 return;
1254         }
1255
1256         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1257         if (ret != 0) {
1258                 tevent_req_error(req, ENOMEM);
1259                 return;
1260         }
1261
1262         if (u8_array->num != 1) {
1263                 talloc_free(u8_array);
1264                 tevent_req_error(req, EIO);
1265                 return;
1266         }
1267
1268         val = u8_array->val[0];
1269         talloc_free(u8_array);
1270
1271         if (val == 1) {
1272                 /* server process exists, need to retry */
1273                 subreq = tevent_wakeup_send(state, state->ev,
1274                                             tevent_timeval_current_ofs(1,0));
1275                 if (tevent_req_nomem(subreq, req)) {
1276                         return;
1277                 }
1278                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1279                 return;
1280         }
1281
1282         /* server process does not exist, remove conflicting entry */
1283         state->lock_list->lock[state->current] =
1284                 state->lock_list->lock[state->lock_list->num-1];
1285         state->lock_list->num -= 1;
1286
1287         ret = ctdb_g_lock_lock_update(req);
1288         if (ret != 0) {
1289                 tevent_req_error(req, ret);
1290                 return;
1291         }
1292
1293         ctdb_g_lock_lock_process_locks(req);
1294 }
1295
1296 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1297 {
1298         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1299                 req, struct ctdb_g_lock_lock_state);
1300         TDB_DATA data;
1301         int ret;
1302
1303         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1304         data.dptr = talloc_size(state, data.dsize);
1305         if (data.dptr == NULL) {
1306                 return ENOMEM;
1307         }
1308
1309         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1310         ret = ctdb_store_record(state->h, data);
1311         talloc_free(data.dptr);
1312         return ret;
1313 }
1314
/*
 * A disabled (#if 0) earlier variant of ctdb_g_lock_lock_update() has been
 * removed from this place.  It was never compiled, shared its name with
 * the live function above while taking different arguments, and its
 * conflict handling is covered by ctdb_g_lock_lock_process_locks() and
 * ctdb_g_lock_lock_checked().
 */
1391
1392 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1393 {
1394         struct tevent_req *req = tevent_req_callback_data(
1395                 subreq, struct tevent_req);
1396         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1397                 req, struct ctdb_g_lock_lock_state);
1398         bool success;
1399
1400         success = tevent_wakeup_recv(subreq);
1401         TALLOC_FREE(subreq);
1402         if (! success) {
1403                 tevent_req_error(req, ENOMEM);
1404                 return;
1405         }
1406
1407         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1408                                       state->db, state->key, false);
1409         if (tevent_req_nomem(subreq, req)) {
1410                 return;
1411         }
1412         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1413 }
1414
1415 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1416 {
1417         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1418                 req, struct ctdb_g_lock_lock_state);
1419         int err;
1420
1421         TALLOC_FREE(state->h);
1422
1423         if (tevent_req_is_unix_error(req, &err)) {
1424                 if (perr != NULL) {
1425                         *perr = err;
1426                 }
1427                 return false;
1428         }
1429
1430         return true;
1431 }
1432
1433 struct ctdb_g_lock_unlock_state {
1434         struct tevent_context *ev;
1435         struct ctdb_client_context *client;
1436         struct ctdb_db_context *db;
1437         TDB_DATA key;
1438         struct ctdb_server_id my_sid;
1439         struct ctdb_record_handle *h;
1440         struct ctdb_g_lock_list *lock_list;
1441 };
1442
1443 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1444 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1445
1446 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1447                                            struct tevent_context *ev,
1448                                            struct ctdb_client_context *client,
1449                                            struct ctdb_db_context *db,
1450                                            const char *keyname,
1451                                            struct ctdb_server_id sid)
1452 {
1453         struct tevent_req *req, *subreq;
1454         struct ctdb_g_lock_unlock_state *state;
1455
1456         req = tevent_req_create(mem_ctx, &state,
1457                                 struct ctdb_g_lock_unlock_state);
1458         if (req == NULL) {
1459                 return NULL;
1460         }
1461
1462         state->ev = ev;
1463         state->client = client;
1464         state->db = db;
1465         state->key.dptr = discard_const(keyname);
1466         state->key.dsize = strlen(keyname) + 1;
1467         state->my_sid = sid;
1468
1469         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1470                                       false);
1471         if (tevent_req_nomem(subreq, req)) {
1472                 return tevent_req_post(req, ev);
1473         }
1474         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1475
1476         return req;
1477 }
1478
1479 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1480 {
1481         struct tevent_req *req = tevent_req_callback_data(
1482                 subreq, struct tevent_req);
1483         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1484                 req, struct ctdb_g_lock_unlock_state);
1485         TDB_DATA data;
1486         int ret = 0;
1487
1488         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1489         TALLOC_FREE(subreq);
1490         if (state->h == NULL) {
1491                 tevent_req_error(req, ret);
1492                 return;
1493         }
1494
1495         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1496                                     &state->lock_list);
1497         if (ret != 0) {
1498                 tevent_req_error(req, ret);
1499                 return;
1500         }
1501
1502         ret = ctdb_g_lock_unlock_update(req);
1503         if (ret != 0) {
1504                 tevent_req_error(req, ret);
1505                 return;
1506         }
1507
1508         tevent_req_done(req);
1509 }
1510
1511 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1512 {
1513         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1514                 req, struct ctdb_g_lock_unlock_state);
1515         struct ctdb_g_lock *lock;
1516         int ret, i;
1517
1518         for (i=0; i<state->lock_list->num; i++) {
1519                 lock = &state->lock_list->lock[i];
1520
1521                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1522                         break;
1523                 }
1524         }
1525
1526         if (i < state->lock_list->num) {
1527                 state->lock_list->lock[i] =
1528                         state->lock_list->lock[state->lock_list->num-1];
1529                 state->lock_list->num -= 1;
1530         }
1531
1532         if (state->lock_list->num == 0) {
1533                 ctdb_delete_record(state->h);
1534         } else {
1535                 TDB_DATA data;
1536
1537                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1538                 data.dptr = talloc_size(state, data.dsize);
1539                 if (data.dptr == NULL) {
1540                         return ENOMEM;
1541                 }
1542
1543                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1544                 ret = ctdb_store_record(state->h, data);
1545                 talloc_free(data.dptr);
1546                 if (ret != 0) {
1547                         return ret;
1548                 }
1549         }
1550
1551         return 0;
1552 }
1553
1554 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1555 {
1556         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1557                 req, struct ctdb_g_lock_unlock_state);
1558         int err;
1559
1560         TALLOC_FREE(state->h);
1561
1562         if (tevent_req_is_unix_error(req, &err)) {
1563                 if (perr != NULL) {
1564                         *perr = err;
1565                 }
1566                 return false;
1567         }
1568
1569         return true;
1570 }
1571
1572 /*
1573  * Persistent database functions
1574  */
/* Per-request state of ctdb_transaction_start_send() */
struct ctdb_transaction_start_state {
        /* Event context and client used for every sub-request */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Timeout passed to the REGISTER_SRVID control */
        struct timeval timeout;
        /* Transaction handle being set up; returned by the recv function */
        struct ctdb_transaction_handle *h;
        /* PNN of the node the client is connected to */
        uint32_t destnode;
};

/* Steps of starting a transaction, in the order in which they run */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_register_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
/* Destructor installed on the handle by ctdb_transaction_start_recv() */
static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1587
1588 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1589                                                struct tevent_context *ev,
1590                                                struct ctdb_client_context *client,
1591                                                struct timeval timeout,
1592                                                struct ctdb_db_context *db,
1593                                                bool readonly)
1594 {
1595         struct ctdb_transaction_start_state *state;
1596         struct tevent_req *req, *subreq;
1597         struct ctdb_transaction_handle *h;
1598
1599         req = tevent_req_create(mem_ctx, &state,
1600                                 struct ctdb_transaction_start_state);
1601         if (req == NULL) {
1602                 return NULL;
1603         }
1604
1605         if (! db->persistent) {
1606                 tevent_req_error(req, EINVAL);
1607                 return tevent_req_post(req, ev);
1608         }
1609
1610         state->ev = ev;
1611         state->client = client;
1612         state->destnode = ctdb_client_pnn(client);
1613
1614         h = talloc_zero(db, struct ctdb_transaction_handle);
1615         if (tevent_req_nomem(h, req)) {
1616                 return tevent_req_post(req, ev);
1617         }
1618
1619         h->client = client;
1620         h->db = db;
1621         h->readonly = readonly;
1622         h->updated = false;
1623
1624         /* SRVID is unique for databases, so client can have transactions active
1625          * for multiple databases */
1626         h->sid.pid = getpid();
1627         h->sid.task_id = db->db_id;
1628         h->sid.vnn = state->destnode;
1629         h->sid.unique_id = h->sid.task_id;
1630         h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1631
1632         h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1633         if (tevent_req_nomem(h->recbuf, req)) {
1634                 return tevent_req_post(req, ev);
1635         }
1636
1637         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1638         if (tevent_req_nomem(h->lock_name, req)) {
1639                 return tevent_req_post(req, ev);
1640         }
1641
1642         state->h = h;
1643
1644         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1645         if (tevent_req_nomem(subreq, req)) {
1646                 return tevent_req_post(req, ev);
1647         }
1648         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1649
1650         return req;
1651 }
1652
1653 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1654 {
1655         struct tevent_req *req = tevent_req_callback_data(
1656                 subreq, struct tevent_req);
1657         struct ctdb_transaction_start_state *state = tevent_req_data(
1658                 req, struct ctdb_transaction_start_state);
1659         struct ctdb_req_control request;
1660         bool status;
1661         int ret;
1662
1663         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1664         TALLOC_FREE(subreq);
1665         if (! status) {
1666                 tevent_req_error(req, ret);
1667                 return;
1668         }
1669
1670         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1671         subreq = ctdb_client_control_send(state, state->ev, state->client,
1672                                           state->destnode, state->timeout,
1673                                           &request);
1674         if (tevent_req_nomem(subreq, req)) {
1675                 return;
1676         }
1677         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1678 }
1679
1680 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1681 {
1682         struct tevent_req *req = tevent_req_callback_data(
1683                 subreq, struct tevent_req);
1684         struct ctdb_transaction_start_state *state = tevent_req_data(
1685                 req, struct ctdb_transaction_start_state);
1686         struct ctdb_reply_control *reply;
1687         bool status;
1688         int ret;
1689
1690         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1691         TALLOC_FREE(subreq);
1692         if (! status) {
1693                 tevent_req_error(req, ret);
1694                 return;
1695         }
1696
1697         ret = ctdb_reply_control_register_srvid(reply);
1698         talloc_free(reply);
1699         if (ret != 0) {
1700                 tevent_req_error(req, ret);
1701                 return;
1702         }
1703
1704         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1705                                        state->h->db_g_lock, state->h->lock_name,
1706                                        &state->h->sid, state->h->readonly);
1707         if (tevent_req_nomem(subreq, req)) {
1708                 return;
1709         }
1710         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1711 }
1712
1713 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1714 {
1715         struct tevent_req *req = tevent_req_callback_data(
1716                 subreq, struct tevent_req);
1717         int ret;
1718         bool status;
1719
1720         status = ctdb_g_lock_lock_recv(subreq, &ret);
1721         TALLOC_FREE(subreq);
1722         if (! status) {
1723                 tevent_req_error(req, ret);
1724                 return;
1725         }
1726
1727         tevent_req_done(req);
1728 }
1729
1730 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1731                                         struct tevent_req *req,
1732                                         int *perr)
1733 {
1734         struct ctdb_transaction_start_state *state = tevent_req_data(
1735                 req, struct ctdb_transaction_start_state);
1736         struct ctdb_transaction_handle *h = state->h;
1737         int err;
1738
1739         if (tevent_req_is_unix_error(req, &err)) {
1740                 if (perr != NULL) {
1741                         *perr = err;
1742                 }
1743                 return NULL;
1744         }
1745
1746         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1747         return h;
1748 }
1749
1750 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1751 {
1752         int ret;
1753
1754         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1755                                          tevent_timeval_zero(),
1756                                          h->sid.unique_id);
1757         if (ret != 0) {
1758                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1759         }
1760
1761         return 0;
1762 }
1763
1764 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1765                            struct ctdb_client_context *client,
1766                            struct timeval timeout,
1767                            struct ctdb_db_context *db, bool readonly,
1768                            struct ctdb_transaction_handle **out)
1769 {
1770         struct tevent_req *req;
1771         struct ctdb_transaction_handle *h;
1772         int ret;
1773
1774         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1775                                           readonly);
1776         if (req == NULL) {
1777                 return ENOMEM;
1778         }
1779
1780         tevent_req_poll(req, ev);
1781
1782         h = ctdb_transaction_start_recv(req, &ret);
1783         if (h == NULL) {
1784                 return ret;
1785         }
1786
1787         *out = h;
1788         return 0;
1789 }
1790
1791 struct ctdb_transaction_record_fetch_state {
1792         TDB_DATA key, data;
1793         struct ctdb_ltdb_header header;
1794         bool found;
1795 };
1796
1797 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1798                                                   struct ctdb_ltdb_header *header,
1799                                                   TDB_DATA key,
1800                                                   TDB_DATA data,
1801                                                   void *private_data)
1802 {
1803         struct ctdb_transaction_record_fetch_state *state =
1804                 (struct ctdb_transaction_record_fetch_state *)private_data;
1805
1806         if (state->key.dsize == key.dsize &&
1807             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1808                 state->data = data;
1809                 state->header = *header;
1810                 state->found = true;
1811         }
1812
1813         return 0;
1814 }
1815
1816 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1817                                          TDB_DATA key,
1818                                          struct ctdb_ltdb_header *header,
1819                                          TDB_DATA *data)
1820 {
1821         struct ctdb_transaction_record_fetch_state state;
1822         int ret;
1823
1824         state.key = key;
1825         state.found = false;
1826
1827         ret = ctdb_rec_buffer_traverse(h->recbuf,
1828                                        ctdb_transaction_record_fetch_traverse,
1829                                        &state);
1830         if (ret != 0) {
1831                 return ret;
1832         }
1833
1834         if (state.found) {
1835                 if (header != NULL) {
1836                         *header = state.header;
1837                 }
1838                 if (data != NULL) {
1839                         *data = state.data;
1840                 }
1841                 return 0;
1842         }
1843
1844         return ENOENT;
1845 }
1846
1847 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1848                                   TDB_DATA key,
1849                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1850 {
1851         TDB_DATA tmp_data;
1852         struct ctdb_ltdb_header header;
1853         int ret;
1854
1855         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1856         if (ret == 0) {
1857                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1858                                            tmp_data.dsize);
1859                 if (data->dptr == NULL) {
1860                         return ENOMEM;
1861                 }
1862                 data->dsize = tmp_data.dsize;
1863                 return 0;
1864         }
1865
1866         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1867         if (ret != 0) {
1868                 return ret;
1869         }
1870
1871         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1872         if (ret != 0) {
1873                 return ret;
1874         }
1875
1876         return 0;
1877 }
1878
1879 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1880                                   TDB_DATA key, TDB_DATA data)
1881 {
1882         TALLOC_CTX *tmp_ctx;
1883         struct ctdb_ltdb_header header;
1884         TDB_DATA old_data;
1885         int ret;
1886
1887         if (h->readonly) {
1888                 return EINVAL;
1889         }
1890
1891         tmp_ctx = talloc_new(h);
1892         if (tmp_ctx == NULL) {
1893                 return ENOMEM;
1894         }
1895
1896         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1897         if (ret != 0) {
1898                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1899                 if (ret != 0) {
1900                         return ret;
1901                 }
1902         }
1903
1904         if (old_data.dsize == data.dsize &&
1905             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1906                 talloc_free(tmp_ctx);
1907                 return 0;
1908         }
1909
1910         header.dmaster = ctdb_client_pnn(h->client);
1911         header.rsn += 1;
1912
1913         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1914         talloc_free(tmp_ctx);
1915         if (ret != 0) {
1916                 return ret;
1917         }
1918         h->updated = true;
1919
1920         return 0;
1921 }
1922
1923 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1924                                    TDB_DATA key)
1925 {
1926         return ctdb_transaction_store_record(h, key, tdb_null);
1927 }
1928
1929 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1930                                             uint64_t seqnum)
1931 {
1932         const char *keyname = CTDB_DB_SEQNUM_KEY;
1933         TDB_DATA key, data;
1934
1935         key.dptr = discard_const(keyname);
1936         key.dsize = strlen(keyname) + 1;
1937
1938         data.dptr = (uint8_t *)&seqnum;
1939         data.dsize = sizeof(seqnum);
1940
1941         return ctdb_transaction_store_record(h, key, data);
1942 }
1943
/* State carried by an asynchronous transaction commit request */
struct ctdb_transaction_commit_state {
        /* Event context used for all sub-requests */
        struct tevent_context *ev;
        /* Transaction being committed */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit is attempted */
        uint64_t seqnum;
};

/* Commit steps, in the order they run */
static void ctdb_transaction_commit_try(struct tevent_req *subreq);
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1952
1953 struct tevent_req *ctdb_transaction_commit_send(
1954                                         TALLOC_CTX *mem_ctx,
1955                                         struct tevent_context *ev,
1956                                         struct ctdb_transaction_handle *h)
1957 {
1958         struct tevent_req *req, *subreq;
1959         struct ctdb_transaction_commit_state *state;
1960         int ret;
1961
1962         req = tevent_req_create(mem_ctx, &state,
1963                                 struct ctdb_transaction_commit_state);
1964         if (req == NULL) {
1965                 return NULL;
1966         }
1967
1968         state->ev = ev;
1969         state->h = h;
1970
1971         ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1972                                       h->client->pnn, tevent_timeval_zero(),
1973                                       h->db->db_id, &state->seqnum);
1974         if (ret != 0) {
1975                 tevent_req_error(req, ret);
1976                 return tevent_req_post(req, ev);
1977         }
1978
1979         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1980         if (ret != 0) {
1981                 tevent_req_error(req, ret);
1982                 return tevent_req_post(req, ev);
1983         }
1984
1985         subreq = ctdb_recovery_wait_send(state, ev, h->client);
1986         if (tevent_req_nomem(subreq, req)) {
1987                 return tevent_req_post(req, ev);
1988         }
1989         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1990
1991         return req;
1992 }
1993
1994 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1995 {
1996         struct tevent_req *req = tevent_req_callback_data(
1997                 subreq, struct tevent_req);
1998         struct ctdb_transaction_commit_state *state = tevent_req_data(
1999                 req, struct ctdb_transaction_commit_state);
2000         struct ctdb_req_control request;
2001         int ret;
2002         bool status;
2003
2004         status = ctdb_recovery_wait_recv(subreq, &ret);
2005         TALLOC_FREE(subreq);
2006         if (! status) {
2007                 tevent_req_error(req, ret);
2008                 return;
2009         }
2010
2011         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2012         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2013                                           state->h->client->pnn,
2014                                           tevent_timeval_zero(), &request);
2015         if (tevent_req_nomem(subreq, req)) {
2016                 return;
2017         }
2018         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2019 }
2020
2021 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2022 {
2023         struct tevent_req *req = tevent_req_callback_data(
2024                 subreq, struct tevent_req);
2025         struct ctdb_transaction_commit_state *state = tevent_req_data(
2026                 req, struct ctdb_transaction_commit_state);
2027         struct ctdb_reply_control *reply;
2028         uint64_t seqnum;
2029         int ret;
2030         bool status;
2031
2032         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2033         TALLOC_FREE(subreq);
2034         if (! status) {
2035                 tevent_req_error(req, ret);
2036                 return;
2037         }
2038
2039         ret = ctdb_reply_control_trans3_commit(reply);
2040         if (ret < 0) {
2041                 /* Control failed due to recovery */
2042                 subreq = ctdb_recovery_wait_send(state, state->ev,
2043                                                  state->h->client);
2044                 if (tevent_req_nomem(subreq, req)) {
2045                         return;
2046                 }
2047                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2048                                         req);
2049                 return;
2050         }
2051
2052         ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2053                                       state->h->client->pnn,
2054                                       tevent_timeval_zero(),
2055                                       state->h->db->db_id, &seqnum);
2056         if (ret != 0) {
2057                 tevent_req_error(req, ret);
2058                 return;
2059         }
2060
2061         if (seqnum == state->seqnum) {
2062                 subreq = ctdb_recovery_wait_send(state, state->ev,
2063                                                  state->h->client);
2064                 if (tevent_req_nomem(subreq, req)) {
2065                         return;
2066                 }
2067                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2068                                         req);
2069                 return;
2070         }
2071
2072         if (seqnum != state->seqnum + 1) {
2073                 tevent_req_error(req, EIO);
2074                 return;
2075         }
2076
2077         tevent_req_done(req);
2078 }
2079
2080 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2081 {
2082         struct ctdb_transaction_commit_state *state = tevent_req_data(
2083                 req, struct ctdb_transaction_commit_state);
2084         int err;
2085
2086         if (tevent_req_is_unix_error(req, &err)) {
2087                 if (perr != NULL) {
2088                         *perr = err;
2089                 }
2090                 TALLOC_FREE(state->h);
2091                 return false;
2092         }
2093
2094         TALLOC_FREE(state->h);
2095         return true;
2096 }
2097
2098 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2099 {
2100         struct tevent_req *req;
2101         int ret;
2102         bool status;
2103
2104         if (h->readonly || ! h->updated) {
2105                 talloc_free(h);
2106                 return 0;
2107         }
2108
2109         req = ctdb_transaction_commit_send(h, h->ev, h);
2110         if (req == NULL) {
2111                 talloc_free(h);
2112                 return ENOMEM;
2113         }
2114
2115         tevent_req_poll(req, h->ev);
2116
2117         status = ctdb_transaction_commit_recv(req, &ret);
2118         if (! status) {
2119                 talloc_free(h);
2120                 return ret;
2121         }
2122
2123         talloc_free(h);
2124         return 0;
2125 }
2126
/*
 * Abandon a transaction by freeing its handle.
 *
 * Nothing is sent for the records collected so far; any talloc destructor
 * attached to the handle runs as part of the free.  Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
        (void)talloc_free(h);

        return 0;
}
2132
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */