ctdb-protocol: Rename G_LOCK_READ/WRITE to CTDB_G_LOCK_READ/WRITE
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/tevent_unix.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "ctdb_logging.h"
33
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client_private.h"
37 #include "client/client.h"
38
39 static struct ctdb_db_context *client_db_handle(
40                                         struct ctdb_client_context *client,
41                                         const char *db_name)
42 {
43         struct ctdb_db_context *db;
44
45         for (db = client->db; db != NULL; db = db->next) {
46                 if (strcmp(db_name, db->db_name) == 0) {
47                         return db;
48                 }
49         }
50
51         return NULL;
52 }
53
/*
 * State carried across the "set database flags" request.
 */
struct ctdb_set_db_flags_state {
	/* Event context, client and timeout used for the control requests */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Database the readonly/sticky controls are issued for */
	uint32_t db_id;
	/* Requested CTDB_DB_FLAGS_* bits */
	uint8_t db_flags;
	/* Set once the readonly control finished or was not requested */
	bool readonly_done;
	/* Set once the sticky control finished or was not requested */
	bool sticky_done;
	/* Connected nodes as returned by list_of_connected_nodes() */
	uint32_t *pnn_list;
	int count;
};

/* Completion callbacks for the individual steps of the request */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
68
69 static struct tevent_req *ctdb_set_db_flags_send(
70                                 TALLOC_CTX *mem_ctx,
71                                 struct tevent_context *ev,
72                                 struct ctdb_client_context *client,
73                                 uint32_t destnode, struct timeval timeout,
74                                 uint32_t db_id, uint8_t db_flags)
75 {
76         struct tevent_req *req, *subreq;
77         struct ctdb_set_db_flags_state *state;
78         struct ctdb_req_control request;
79
80         req = tevent_req_create(mem_ctx, &state,
81                                 struct ctdb_set_db_flags_state);
82         if (req == NULL) {
83                 return NULL;
84         }
85
86         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
87                 tevent_req_done(req);
88                 return tevent_req_post(req, ev);
89         }
90
91         state->ev = ev;
92         state->client = client;
93         state->timeout = timeout;
94         state->db_id = db_id;
95         state->db_flags = db_flags;
96
97         ctdb_req_control_get_nodemap(&request);
98         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
99                                           &request);
100         if (tevent_req_nomem(subreq, req)) {
101                 return tevent_req_post(req, ev);
102         }
103         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
104
105         return req;
106 }
107
108 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
109 {
110         struct tevent_req *req = tevent_req_callback_data(
111                 subreq, struct tevent_req);
112         struct ctdb_set_db_flags_state *state = tevent_req_data(
113                 req, struct ctdb_set_db_flags_state);
114         struct ctdb_req_control request;
115         struct ctdb_reply_control *reply;
116         struct ctdb_node_map *nodemap;
117         int ret;
118         bool status;
119
120         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
121         TALLOC_FREE(subreq);
122         if (! status) {
123                 tevent_req_error(req, ret);
124                 return;
125         }
126
127         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
128         talloc_free(reply);
129         if (ret != 0) {
130                 tevent_req_error(req, ret);
131                 return;
132         }
133
134         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
135                                                state, &state->pnn_list);
136         talloc_free(nodemap);
137         if (state->count <= 0) {
138                 tevent_req_error(req, ENOMEM);
139                 return;
140         }
141
142         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
143                 ctdb_req_control_set_db_readonly(&request, state->db_id);
144                 subreq = ctdb_client_control_multi_send(
145                                         state, state->ev, state->client,
146                                         state->pnn_list, state->count,
147                                         state->timeout, &request);
148                 if (tevent_req_nomem(subreq, req)) {
149                         return;
150                 }
151                 tevent_req_set_callback(subreq,
152                                         ctdb_set_db_flags_readonly_done, req);
153         } else {
154                 state->readonly_done = true;
155         }
156
157         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
158                 ctdb_req_control_set_db_sticky(&request, state->db_id);
159                 subreq = ctdb_client_control_multi_send(
160                                         state, state->ev, state->client,
161                                         state->pnn_list, state->count,
162                                         state->timeout, &request);
163                 if (tevent_req_nomem(subreq, req)) {
164                         return;
165                 }
166                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
167                                         req);
168         } else {
169                 state->sticky_done = true;
170         }
171 }
172
173 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
174 {
175         struct tevent_req *req = tevent_req_callback_data(
176                 subreq, struct tevent_req);
177         struct ctdb_set_db_flags_state *state = tevent_req_data(
178                 req, struct ctdb_set_db_flags_state);
179         int ret;
180         bool status;
181
182         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
183                                                 NULL);
184         TALLOC_FREE(subreq);
185         if (! status) {
186                 tevent_req_error(req, ret);
187                 return;
188         }
189
190         state->readonly_done = true;
191
192         if (state->readonly_done && state->sticky_done) {
193                 tevent_req_done(req);
194         }
195 }
196
197 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
198 {
199         struct tevent_req *req = tevent_req_callback_data(
200                 subreq, struct tevent_req);
201         struct ctdb_set_db_flags_state *state = tevent_req_data(
202                 req, struct ctdb_set_db_flags_state);
203         int ret;
204         bool status;
205
206         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
207                                                 NULL);
208         TALLOC_FREE(subreq);
209         if (! status) {
210                 tevent_req_error(req, ret);
211                 return;
212         }
213
214         state->sticky_done = true;
215
216         if (state->readonly_done && state->sticky_done) {
217                 tevent_req_done(req);
218         }
219 }
220
/*
 * Collect the result of a "set database flags" request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code there.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
233
/*
 * State carried across the steps of a database attach request.
 */
struct ctdb_attach_state {
	/* Event context, client and timeout used for the control requests */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Node the attach related controls are sent to */
	uint32_t destnode;
	/* CTDB_DB_FLAGS_* bits requested by the caller */
	uint8_t db_flags;
	/* TDB flags used for the attach control and for opening the tdb */
	uint32_t tdb_flags;
	/* Database handle that is returned by ctdb_attach_recv() */
	struct ctdb_db_context *db;
};

/* Completion callbacks, listed in the order the steps are executed */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
249
250 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
251                                     struct tevent_context *ev,
252                                     struct ctdb_client_context *client,
253                                     struct timeval timeout,
254                                     const char *db_name, uint8_t db_flags)
255 {
256         struct tevent_req *req, *subreq;
257         struct ctdb_attach_state *state;
258         struct ctdb_req_control request;
259
260         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
261         if (req == NULL) {
262                 return NULL;
263         }
264
265         state->db = client_db_handle(client, db_name);
266         if (state->db != NULL) {
267                 tevent_req_done(req);
268                 return tevent_req_post(req, ev);
269         }
270
271         state->ev = ev;
272         state->client = client;
273         state->timeout = timeout;
274         state->destnode = ctdb_client_pnn(client);
275         state->db_flags = db_flags;
276
277         state->db = talloc_zero(client, struct ctdb_db_context);
278         if (tevent_req_nomem(state->db, req)) {
279                 return tevent_req_post(req, ev);
280         }
281
282         state->db->db_name = talloc_strdup(state->db, db_name);
283         if (tevent_req_nomem(state->db, req)) {
284                 return tevent_req_post(req, ev);
285         }
286
287         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
288                 state->db->persistent = true;
289         }
290
291         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
292         subreq = ctdb_client_control_send(state, ev, client,
293                                           ctdb_client_pnn(client), timeout,
294                                           &request);
295         if (tevent_req_nomem(subreq, req)) {
296                 return tevent_req_post(req, ev);
297         }
298         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
299
300         return req;
301 }
302
303 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
304 {
305         struct tevent_req *req = tevent_req_callback_data(
306                 subreq, struct tevent_req);
307         struct ctdb_attach_state *state = tevent_req_data(
308                 req, struct ctdb_attach_state);
309         struct ctdb_reply_control *reply;
310         struct ctdb_req_control request;
311         uint32_t mutex_enabled;
312         int ret;
313         bool status;
314
315         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
316         TALLOC_FREE(subreq);
317         if (! status) {
318                 tevent_req_error(req, ret);
319                 return;
320         }
321
322         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
323         if (ret != 0) {
324                 /* Treat error as mutex support not available */
325                 mutex_enabled = 0;
326         }
327
328         state->tdb_flags = TDB_DEFAULT;
329         if (! state->db->persistent) {
330                 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
331                                      TDB_CLEAR_IF_FIRST);
332         }
333         if (mutex_enabled == 1) {
334                 state->tdb_flags |= TDB_MUTEX_LOCKING;
335         }
336
337         if (state->db->persistent) {
338                 ctdb_req_control_db_attach_persistent(&request,
339                                                       state->db->db_name,
340                                                       state->tdb_flags);
341         } else {
342                 ctdb_req_control_db_attach(&request, state->db->db_name,
343                                            state->tdb_flags);
344         }
345
346         subreq = ctdb_client_control_send(state, state->ev, state->client,
347                                           state->destnode, state->timeout,
348                                           &request);
349         if (tevent_req_nomem(subreq, req)) {
350                 return;
351         }
352         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
353 }
354
355 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
356 {
357         struct tevent_req *req = tevent_req_callback_data(
358                 subreq, struct tevent_req);
359         struct ctdb_attach_state *state = tevent_req_data(
360                 req, struct ctdb_attach_state);
361         struct ctdb_req_control request;
362         struct ctdb_reply_control *reply;
363         bool status;
364         int ret;
365
366         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
367         TALLOC_FREE(subreq);
368         if (! status) {
369                 tevent_req_error(req, ret);
370                 return;
371         }
372
373         if (state->db->persistent) {
374                 ret = ctdb_reply_control_db_attach_persistent(
375                                 reply, &state->db->db_id);
376         } else {
377                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
378         }
379         talloc_free(reply);
380         if (ret != 0) {
381                 tevent_req_error(req, ret);
382                 return;
383         }
384
385         ctdb_req_control_getdbpath(&request, state->db->db_id);
386         subreq = ctdb_client_control_send(state, state->ev, state->client,
387                                           state->destnode, state->timeout,
388                                           &request);
389         if (tevent_req_nomem(subreq, req)) {
390                 return;
391         }
392         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
393 }
394
395 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
396 {
397         struct tevent_req *req = tevent_req_callback_data(
398                 subreq, struct tevent_req);
399         struct ctdb_attach_state *state = tevent_req_data(
400                 req, struct ctdb_attach_state);
401         struct ctdb_reply_control *reply;
402         struct ctdb_req_control request;
403         bool status;
404         int ret;
405
406         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
407         TALLOC_FREE(subreq);
408         if (! status) {
409                 tevent_req_error(req, ret);
410                 return;
411         }
412
413         ret = ctdb_reply_control_getdbpath(reply, state->db,
414                                            &state->db->db_path);
415         talloc_free(reply);
416         if (ret != 0) {
417                 tevent_req_error(req, ret);
418                 return;
419         }
420
421         ctdb_req_control_db_get_health(&request, state->db->db_id);
422         subreq = ctdb_client_control_send(state, state->ev, state->client,
423                                           state->destnode, state->timeout,
424                                           &request);
425         if (tevent_req_nomem(subreq, req)) {
426                 return;
427         }
428         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
429 }
430
431 static void ctdb_attach_health_done(struct tevent_req *subreq)
432 {
433         struct tevent_req *req = tevent_req_callback_data(
434                 subreq, struct tevent_req);
435         struct ctdb_attach_state *state = tevent_req_data(
436                 req, struct ctdb_attach_state);
437         struct ctdb_reply_control *reply;
438         const char *reason;
439         bool status;
440         int ret;
441
442         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
443         TALLOC_FREE(subreq);
444         if (! status) {
445                 tevent_req_error(req, ret);
446                 return;
447         }
448
449         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
450         if (ret != 0) {
451                 tevent_req_error(req, ret);
452                 return;
453         }
454
455         if (reason != NULL) {
456                 /* Database unhealthy, avoid attach */
457                 /* FIXME: Log here */
458                 tevent_req_error(req, EIO);
459                 return;
460         }
461
462         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
463                                         state->destnode, state->timeout,
464                                         state->db->db_id, state->db_flags);
465         if (tevent_req_nomem(subreq, req)) {
466                 return;
467         }
468         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
469 }
470
471 static void ctdb_attach_flags_done(struct tevent_req *subreq)
472 {
473         struct tevent_req *req = tevent_req_callback_data(
474                 subreq, struct tevent_req);
475         struct ctdb_attach_state *state = tevent_req_data(
476                 req, struct ctdb_attach_state);
477         bool status;
478         int ret;
479
480         status = ctdb_set_db_flags_recv(subreq, &ret);
481         TALLOC_FREE(subreq);
482         if (! status) {
483                 tevent_req_error(req, ret);
484                 return;
485         }
486
487         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
488                                         state->tdb_flags, O_RDWR, 0);
489         if (tevent_req_nomem(state->db->ltdb, req)) {
490                 return;
491         }
492         DLIST_ADD(state->client->db, state->db);
493
494         tevent_req_done(req);
495 }
496
497 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
498                       struct ctdb_db_context **out)
499 {
500         struct ctdb_attach_state *state = tevent_req_data(
501                 req, struct ctdb_attach_state);
502         int err;
503
504         if (tevent_req_is_unix_error(req, &err)) {
505                 if (perr != NULL) {
506                         *perr = err;
507                 }
508                 return false;
509         }
510
511         if (out != NULL) {
512                 *out = state->db;
513         }
514         return true;
515 }
516
517 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
518                 struct ctdb_client_context *client,
519                 struct timeval timeout,
520                 const char *db_name, uint8_t db_flags,
521                 struct ctdb_db_context **out)
522 {
523         struct tevent_req *req;
524         bool status;
525         int ret;
526
527         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
528                                db_name, db_flags);
529         if (req == NULL) {
530                 return ENOMEM;
531         }
532
533         tevent_req_poll(req, ev);
534
535         status = ctdb_attach_recv(req, &ret, out);
536         if (! status) {
537                 return ret;
538         }
539
540         /*
541         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
542         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
543         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
544         */
545
546         return 0;
547 }
548
549 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
550                 struct ctdb_client_context *client,
551                 struct timeval timeout, uint32_t db_id)
552 {
553         struct ctdb_db_context *db;
554         int ret;
555
556         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
557                                   db_id);
558         if (ret != 0) {
559                 return ret;
560         }
561
562         for (db = client->db; db != NULL; db = db->next) {
563                 if (db->db_id == db_id) {
564                         DLIST_REMOVE(client->db, db);
565                         break;
566                 }
567         }
568
569         return 0;
570 }
571
572 uint32_t ctdb_db_id(struct ctdb_db_context *db)
573 {
574         return db->db_id;
575 }
576
577 struct ctdb_db_traverse_state {
578         ctdb_rec_parser_func_t parser;
579         void *private_data;
580         bool extract_header;
581         int error;
582 };
583
584 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
585                                     TDB_DATA data, void *private_data)
586 {
587         struct ctdb_db_traverse_state *state =
588                 (struct ctdb_db_traverse_state *)private_data;
589         int ret;
590
591         if (state->extract_header) {
592                 struct ctdb_ltdb_header header;
593                 size_t len;
594
595                 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
596                 if (ret != 0) {
597                         state->error = ret;
598                         return 1;
599                 }
600
601                 len = ctdb_ltdb_header_len(&header);
602
603                 data.dptr += len;
604                 data.dsize -= len;
605
606                 ret = state->parser(0, &header, key, data, state->private_data);
607         } else {
608                 ret = state->parser(0, NULL, key, data, state->private_data);
609         }
610
611         if (ret != 0) {
612                 state->error = ret;
613                 return 1;
614         }
615
616         return 0;
617 }
618
619 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
620                      bool extract_header,
621                      ctdb_rec_parser_func_t parser, void *private_data)
622 {
623         struct ctdb_db_traverse_state state;
624         int ret;
625
626         state.parser = parser;
627         state.private_data = private_data;
628         state.extract_header = extract_header;
629         state.error = 0;
630
631         if (readonly) {
632                 ret = tdb_traverse_read(db->ltdb->tdb,
633                                         ctdb_db_traverse_handler, &state);
634         } else {
635                 ret = tdb_traverse(db->ltdb->tdb,
636                                    ctdb_db_traverse_handler, &state);
637         }
638
639         if (ret == -1) {
640                 return EIO;
641         }
642
643         return state.error;
644 }
645
646 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
647                            struct ctdb_ltdb_header *header,
648                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
649 {
650         TDB_DATA rec;
651         int ret;
652
653         rec = tdb_fetch(db->ltdb->tdb, key);
654         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
655                 /* No record present */
656                 if (rec.dptr != NULL) {
657                         free(rec.dptr);
658                 }
659
660                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
661                         return EIO;
662                 }
663
664                 header->rsn = 0;
665                 header->dmaster = CTDB_UNKNOWN_PNN;
666                 header->flags = 0;
667
668                 if (data != NULL) {
669                         *data = tdb_null;
670                 }
671                 return 0;
672         }
673
674         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
675         if (ret != 0) {
676                 return ret;
677         }
678
679         ret = 0;
680         if (data != NULL) {
681                 size_t offset = ctdb_ltdb_header_len(header);
682
683                 data->dsize = rec.dsize - offset;
684                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
685                                            data->dsize);
686                 if (data->dptr == NULL) {
687                         ret = ENOMEM;
688                 }
689         }
690
691         free(rec.dptr);
692         return ret;
693 }
694
695 /*
696  * Fetch a record from volatile database
697  *
698  * Steps:
699  *  1. Get a lock on the hash chain
700  *  2. If the record does not exist, migrate the record
701  *  3. If readonly=true and delegations do not exist, migrate the record.
702  *  4. If readonly=false and delegations exist, migrate the record.
703  *  5. If the local node is not dmaster, migrate the record.
704  *  6. Return record
705  */
706
/*
 * State carried across a fetch-lock request.
 */
struct ctdb_fetch_lock_state {
	/* Event context and client used for the migration call */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle being set up for the caller */
	struct ctdb_record_handle *h;
	/* Caller asked for readonly access */
	bool readonly;
	/* PNN of the local node, compared against the record's dmaster */
	uint32_t pnn;
};

/* Steps of the request: check the local record, migrate if necessary */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
718
719 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
720                                         struct tevent_context *ev,
721                                         struct ctdb_client_context *client,
722                                         struct ctdb_db_context *db,
723                                         TDB_DATA key, bool readonly)
724 {
725         struct ctdb_fetch_lock_state *state;
726         struct tevent_req *req;
727         int ret;
728
729         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
730         if (req == NULL) {
731                 return NULL;
732         }
733
734         state->ev = ev;
735         state->client = client;
736
737         state->h = talloc_zero(db, struct ctdb_record_handle);
738         if (tevent_req_nomem(state->h, req)) {
739                 return tevent_req_post(req, ev);
740         }
741         state->h->client = client;
742         state->h->db = db;
743         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
744         if (tevent_req_nomem(state->h->key.dptr, req)) {
745                 return tevent_req_post(req, ev);
746         }
747         state->h->key.dsize = key.dsize;
748         state->h->readonly = false;
749
750         state->readonly = readonly;
751         state->pnn = ctdb_client_pnn(client);
752
753         /* Check that database is not persistent */
754         if (db->persistent) {
755                 tevent_req_error(req, EINVAL);
756                 return tevent_req_post(req, ev);
757         }
758
759         ret = ctdb_fetch_lock_check(req);
760         if (ret == 0) {
761                 tevent_req_done(req);
762                 return tevent_req_post(req, ev);
763         }
764         if (ret != EAGAIN) {
765                 tevent_req_error(req, ret);
766                 return tevent_req_post(req, ev);
767         }
768         return req;
769 }
770
771 static int ctdb_fetch_lock_check(struct tevent_req *req)
772 {
773         struct ctdb_fetch_lock_state *state = tevent_req_data(
774                 req, struct ctdb_fetch_lock_state);
775         struct ctdb_record_handle *h = state->h;
776         struct ctdb_ltdb_header header;
777         TDB_DATA data = tdb_null;
778         int ret, err = 0;
779         bool do_migrate = false;
780
781         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
782         if (ret != 0) {
783                 err = EIO;
784                 goto failed;
785         }
786
787         data = tdb_fetch(h->db->ltdb->tdb, h->key);
788         if (data.dptr == NULL) {
789                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
790                         goto migrate;
791                 } else {
792                         err = EIO;
793                         goto failed;
794                 }
795         }
796
797         /* Got the record */
798         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
799         if (ret != 0) {
800                 err = ret;
801                 goto failed;
802         }
803
804         if (! state->readonly) {
805                 /* Read/write access */
806                 if (header.dmaster == state->pnn &&
807                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
808                         goto migrate;
809                 }
810
811                 if (header.dmaster != state->pnn) {
812                         goto migrate;
813                 }
814         } else {
815                 /* Readonly access */
816                 if (header.dmaster != state->pnn &&
817                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
818                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
819                         goto migrate;
820                 }
821         }
822
823         /* We are the dmaster or readonly delegation */
824         h->header = header;
825         h->data = data;
826         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
827                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
828                 h->readonly = true;
829         }
830         return 0;
831
832 migrate:
833         do_migrate = true;
834         err = EAGAIN;
835
836 failed:
837         if (data.dptr != NULL) {
838                 free(data.dptr);
839         }
840         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
841         if (ret != 0) {
842                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
843                                   h->db->db_name));
844                 return EIO;
845         }
846
847         if (do_migrate) {
848                 ctdb_fetch_lock_migrate(req);
849         }
850         return err;
851 }
852
853 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
854 {
855         struct ctdb_fetch_lock_state *state = tevent_req_data(
856                 req, struct ctdb_fetch_lock_state);
857         struct ctdb_req_call request;
858         struct tevent_req *subreq;
859
860         ZERO_STRUCT(request);
861         request.flags = CTDB_IMMEDIATE_MIGRATION;
862         if (state->readonly) {
863                 request.flags |= CTDB_WANT_READONLY;
864         }
865         request.db_id = state->h->db->db_id;
866         request.callid = CTDB_NULL_FUNC;
867         request.key = state->h->key;
868
869         subreq = ctdb_client_call_send(state, state->ev, state->client,
870                                        &request);
871         if (tevent_req_nomem(subreq, req)) {
872                 return;
873         }
874
875         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
876 }
877
878 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
879 {
880         struct tevent_req *req = tevent_req_callback_data(
881                 subreq, struct tevent_req);
882         struct ctdb_fetch_lock_state *state = tevent_req_data(
883                 req, struct ctdb_fetch_lock_state);
884         struct ctdb_reply_call *reply;
885         int ret;
886         bool status;
887
888         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
889         TALLOC_FREE(subreq);
890         if (! status) {
891                 tevent_req_error(req, ret);
892                 return;
893         }
894
895         if (reply->status != 0) {
896                 tevent_req_error(req, EIO);
897                 return;
898         }
899         talloc_free(reply);
900
901         ret = ctdb_fetch_lock_check(req);
902         if (ret != 0) {
903                 tevent_req_error(req, ret);
904                 return;
905         }
906
907         tevent_req_done(req);
908 }
909
910 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
911 {
912         tdb_chainunlock(h->db->ltdb->tdb, h->key);
913         free(h->data.dptr);
914         return 0;
915 }
916
917 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
918                                                 struct ctdb_ltdb_header *header,
919                                                 TALLOC_CTX *mem_ctx,
920                                                 TDB_DATA *data, int *perr)
921 {
922         struct ctdb_fetch_lock_state *state = tevent_req_data(
923                 req, struct ctdb_fetch_lock_state);
924         struct ctdb_record_handle *h = state->h;
925         int err;
926
927         if (tevent_req_is_unix_error(req, &err)) {
928                 if (perr != NULL) {
929                         *perr = err;
930                 }
931                 return NULL;
932         }
933
934         if (header != NULL) {
935                 *header = h->header;
936         }
937         if (data != NULL) {
938                 size_t offset;
939
940                 offset = ctdb_ltdb_header_len(&h->header);
941
942                 data->dsize = h->data.dsize - offset;
943                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
944                                            data->dsize);
945                 if (data->dptr == NULL) {
946                         TALLOC_FREE(state->h);
947                         if (perr != NULL) {
948                                 *perr = ENOMEM;
949                         }
950                         return NULL;
951                 }
952         }
953
954         talloc_set_destructor(h, ctdb_record_handle_destructor);
955         return h;
956 }
957
958 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
959                     struct ctdb_client_context *client,
960                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
961                     struct ctdb_record_handle **out,
962                     struct ctdb_ltdb_header *header, TDB_DATA *data)
963 {
964         struct tevent_req *req;
965         struct ctdb_record_handle *h;
966         int ret;
967
968         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
969         if (req == NULL) {
970                 return ENOMEM;
971         }
972
973         tevent_req_poll(req, ev);
974
975         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
976         if (h == NULL) {
977                 return ret;
978         }
979
980         *out = h;
981         return 0;
982 }
983
984 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
985 {
986         TDB_DATA rec;
987         size_t offset;
988         int ret;
989
990         /* Cannot modify the record if it was obtained as a readonly copy */
991         if (h->readonly) {
992                 return EINVAL;
993         }
994
995         /* Check if the new data is same */
996         if (h->data.dsize == data.dsize &&
997             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
998                 /* No need to do anything */
999                 return 0;
1000         }
1001
1002         offset = ctdb_ltdb_header_len(&h->header);
1003         rec.dsize = offset + data.dsize;
1004         rec.dptr = talloc_size(h, rec.dsize);
1005         if (rec.dptr == NULL) {
1006                 return ENOMEM;
1007         }
1008
1009         ctdb_ltdb_header_push(&h->header, rec.dptr);
1010         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1011
1012         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1013         if (ret != 0) {
1014                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1015                                   h->db->db_name));
1016                 return EIO;
1017         }
1018
1019         talloc_free(rec.dptr);
1020         return 0;
1021 }
1022
1023 int ctdb_delete_record(struct ctdb_record_handle *h)
1024 {
1025         TDB_DATA rec;
1026         struct ctdb_key_data key;
1027         int ret;
1028
1029         /* Cannot delete the record if it was obtained as a readonly copy */
1030         if (h->readonly) {
1031                 return EINVAL;
1032         }
1033
1034         rec.dsize = ctdb_ltdb_header_len(&h->header);
1035         rec.dptr = talloc_size(h, rec.dsize);
1036         if (rec.dptr == NULL) {
1037                 return ENOMEM;
1038         }
1039
1040         ctdb_ltdb_header_push(&h->header, rec.dptr);
1041
1042         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1043         talloc_free(rec.dptr);
1044         if (ret != 0) {
1045                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1046                                   h->db->db_name));
1047                 return EIO;
1048         }
1049
1050         key.db_id = h->db->db_id;
1051         key.header = h->header;
1052         key.key = h->key;
1053
1054         ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1055                                               h->client->pnn,
1056                                               tevent_timeval_zero(), &key);
1057         if (ret != 0) {
1058                 DEBUG(DEBUG_WARNING,
1059                       ("Failed to mark record to be deleted in DB %s\n",
1060                        h->db->db_name));
1061                 return ret;
1062         }
1063
1064         return 0;
1065 }
1066
1067 /*
1068  * Global lock functions
1069  */
1070
1071 struct ctdb_g_lock_lock_state {
1072         struct tevent_context *ev;
1073         struct ctdb_client_context *client;
1074         struct ctdb_db_context *db;
1075         TDB_DATA key;
1076         struct ctdb_server_id my_sid;
1077         enum ctdb_g_lock_type lock_type;
1078         struct ctdb_record_handle *h;
1079         /* state for verification of active locks */
1080         struct ctdb_g_lock_list *lock_list;
1081         unsigned int current;
1082 };
1083
1084 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1085 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1086 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1087 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1088 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1089
1090 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1091                                   enum ctdb_g_lock_type l2)
1092 {
1093         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1094                 return false;
1095         }
1096         return true;
1097 }
1098
1099 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1100                                          struct tevent_context *ev,
1101                                          struct ctdb_client_context *client,
1102                                          struct ctdb_db_context *db,
1103                                          const char *keyname,
1104                                          struct ctdb_server_id *sid,
1105                                          bool readonly)
1106 {
1107         struct tevent_req *req, *subreq;
1108         struct ctdb_g_lock_lock_state *state;
1109
1110         req = tevent_req_create(mem_ctx, &state,
1111                                 struct ctdb_g_lock_lock_state);
1112         if (req == NULL) {
1113                 return NULL;
1114         }
1115
1116         state->ev = ev;
1117         state->client = client;
1118         state->db = db;
1119         state->key.dptr = discard_const(keyname);
1120         state->key.dsize = strlen(keyname) + 1;
1121         state->my_sid = *sid;
1122         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1123
1124         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1125                                       false);
1126         if (tevent_req_nomem(subreq, req)) {
1127                 return tevent_req_post(req, ev);
1128         }
1129         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1130
1131         return req;
1132 }
1133
1134 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1135 {
1136         struct tevent_req *req = tevent_req_callback_data(
1137                 subreq, struct tevent_req);
1138         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1139                 req, struct ctdb_g_lock_lock_state);
1140         TDB_DATA data;
1141         int ret = 0;
1142
1143         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1144         TALLOC_FREE(subreq);
1145         if (state->h == NULL) {
1146                 tevent_req_error(req, ret);
1147                 return;
1148         }
1149
1150         if (state->lock_list != NULL) {
1151                 TALLOC_FREE(state->lock_list);
1152                 state->current = 0;
1153         }
1154
1155         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1156                                     &state->lock_list);
1157         talloc_free(data.dptr);
1158         if (ret != 0) {
1159                 tevent_req_error(req, ret);
1160                 return;
1161         }
1162
1163         ctdb_g_lock_lock_process_locks(req);
1164 }
1165
1166 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1167 {
1168         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1169                 req, struct ctdb_g_lock_lock_state);
1170         struct tevent_req *subreq;
1171         struct ctdb_g_lock *lock;
1172         bool check_server = false;
1173         int ret;
1174
1175         while (state->current < state->lock_list->num) {
1176                 lock = &state->lock_list->lock[state->current];
1177
1178                 /* We should not ask for the same lock more than once */
1179                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1180                         tevent_req_error(req, EDEADLK);
1181                         return;
1182                 }
1183
1184                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1185                         check_server = true;
1186                         break;
1187                 }
1188
1189                 state->current += 1;
1190         }
1191
1192         if (check_server) {
1193                 struct ctdb_req_control request;
1194                 struct ctdb_uint64_array u64_array;
1195
1196                 u64_array.num = 1;
1197                 u64_array.val = &lock->sid.unique_id;
1198
1199                 ctdb_req_control_check_srvids(&request, &u64_array);
1200                 subreq = ctdb_client_control_send(state, state->ev,
1201                                                   state->client,
1202                                                   state->client->pnn,
1203                                                   tevent_timeval_zero(),
1204                                                   &request);
1205                 if (tevent_req_nomem(subreq, req)) {
1206                         return;
1207                 }
1208                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1209                 return;
1210         }
1211
1212         /* There is no conflict, add ourself to the lock_list */
1213         state->lock_list->lock = talloc_realloc(state->lock_list,
1214                                                 state->lock_list->lock,
1215                                                 struct ctdb_g_lock,
1216                                                 state->lock_list->num + 1);
1217         if (state->lock_list->lock == NULL) {
1218                 tevent_req_error(req, ENOMEM);
1219                 return;
1220         }
1221
1222         lock = &state->lock_list->lock[state->lock_list->num];
1223         lock->type = state->lock_type;
1224         lock->sid = state->my_sid;
1225         state->lock_list->num += 1;
1226
1227         ret = ctdb_g_lock_lock_update(req);
1228         if (ret != 0) {
1229                 tevent_req_error(req, ret);
1230                 return;
1231         }
1232
1233         tevent_req_done(req);
1234 }
1235
1236 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1237 {
1238         struct tevent_req *req = tevent_req_callback_data(
1239                 subreq, struct tevent_req);
1240         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1241                 req, struct ctdb_g_lock_lock_state);
1242         struct ctdb_reply_control *reply;
1243         struct ctdb_uint8_array *u8_array;
1244         int ret;
1245         bool status;
1246         int8_t val;
1247
1248         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1249         TALLOC_FREE(subreq);
1250         if (! status) {
1251                 tevent_req_error(req, ret);
1252                 return;
1253         }
1254
1255         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1256         if (ret != 0) {
1257                 tevent_req_error(req, ENOMEM);
1258                 return;
1259         }
1260
1261         if (u8_array->num != 1) {
1262                 talloc_free(u8_array);
1263                 tevent_req_error(req, EIO);
1264                 return;
1265         }
1266
1267         val = u8_array->val[0];
1268         talloc_free(u8_array);
1269
1270         if (val == 1) {
1271                 /* server process exists, need to retry */
1272                 subreq = tevent_wakeup_send(state, state->ev,
1273                                             tevent_timeval_current_ofs(1,0));
1274                 if (tevent_req_nomem(subreq, req)) {
1275                         return;
1276                 }
1277                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1278                 return;
1279         }
1280
1281         /* server process does not exist, remove conflicting entry */
1282         state->lock_list->lock[state->current] =
1283                 state->lock_list->lock[state->lock_list->num-1];
1284         state->lock_list->num -= 1;
1285
1286         ret = ctdb_g_lock_lock_update(req);
1287         if (ret != 0) {
1288                 tevent_req_error(req, ret);
1289                 return;
1290         }
1291
1292         ctdb_g_lock_lock_process_locks(req);
1293 }
1294
1295 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1296 {
1297         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1298                 req, struct ctdb_g_lock_lock_state);
1299         TDB_DATA data;
1300         int ret;
1301
1302         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1303         data.dptr = talloc_size(state, data.dsize);
1304         if (data.dptr == NULL) {
1305                 return ENOMEM;
1306         }
1307
1308         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1309         ret = ctdb_store_record(state->h, data);
1310         talloc_free(data.dptr);
1311         return ret;
1312 }
1313
1314 #if 0
1315 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1316                                    struct ctdb_g_lock_list *lock_list,
1317                                    struct ctdb_record_handle *h)
1318 {
1319         struct ctdb_g_lock *lock;
1320         bool conflict = false;
1321         bool modified = false;
1322         int ret, i;
1323
1324         for (i=0; i<lock_list->num; i++) {
1325                 lock = &lock_list->lock[i];
1326
1327                 /* We should not ask for lock more than once */
1328                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1329                         return EDEADLK;
1330                 }
1331
1332                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1333                         bool exists;
1334
1335                         conflict = true;
1336                         ret = ctdb_server_id_exists(state->client, &lock->sid,
1337                                                     &exists);
1338                         if (ret != 0) {
1339                                 return ret;
1340                         }
1341
1342                         if (exists) {
1343                                 break;
1344                         }
1345
1346                         /* Server does not exist, delete conflicting entry */
1347                         lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1348                         lock_list->num -= 1;
1349                         modified = true;
1350                 }
1351         }
1352
1353         if (! conflict) {
1354                 lock = talloc_realloc(lock_list, lock_list->lock,
1355                                       struct ctdb_g_lock, lock_list->num+1);
1356                 if (lock == NULL) {
1357                         return ENOMEM;
1358                 }
1359
1360                 lock[lock_list->num].type = state->lock_type;
1361                 lock[lock_list->num].sid = state->my_sid;
1362                 lock_list->lock = lock;
1363                 lock_list->num += 1;
1364                 modified = true;
1365         }
1366
1367         if (modified) {
1368                 TDB_DATA data;
1369
1370                 data.dsize = ctdb_g_lock_list_len(lock_list);
1371                 data.dptr = talloc_size(state, data.dsize);
1372                 if (data.dptr == NULL) {
1373                         return ENOMEM;
1374                 }
1375
1376                 ctdb_g_lock_list_push(lock_list, data.dptr);
1377                 ret = ctdb_store_record(h, data);
1378                 talloc_free(data.dptr);
1379                 if (ret != 0) {
1380                         return ret;
1381                 }
1382         }
1383
1384         if (conflict) {
1385                 return EAGAIN;
1386         }
1387         return 0;
1388 }
1389 #endif
1390
1391 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1392 {
1393         struct tevent_req *req = tevent_req_callback_data(
1394                 subreq, struct tevent_req);
1395         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1396                 req, struct ctdb_g_lock_lock_state);
1397         bool success;
1398
1399         success = tevent_wakeup_recv(subreq);
1400         TALLOC_FREE(subreq);
1401         if (! success) {
1402                 tevent_req_error(req, ENOMEM);
1403                 return;
1404         }
1405
1406         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1407                                       state->db, state->key, false);
1408         if (tevent_req_nomem(subreq, req)) {
1409                 return;
1410         }
1411         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1412 }
1413
1414 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1415 {
1416         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1417                 req, struct ctdb_g_lock_lock_state);
1418         int err;
1419
1420         TALLOC_FREE(state->h);
1421
1422         if (tevent_req_is_unix_error(req, &err)) {
1423                 if (perr != NULL) {
1424                         *perr = err;
1425                 }
1426                 return false;
1427         }
1428
1429         return true;
1430 }
1431
1432 struct ctdb_g_lock_unlock_state {
1433         struct tevent_context *ev;
1434         struct ctdb_client_context *client;
1435         struct ctdb_db_context *db;
1436         TDB_DATA key;
1437         struct ctdb_server_id my_sid;
1438         struct ctdb_record_handle *h;
1439         struct ctdb_g_lock_list *lock_list;
1440 };
1441
1442 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1443 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1444
1445 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1446                                            struct tevent_context *ev,
1447                                            struct ctdb_client_context *client,
1448                                            struct ctdb_db_context *db,
1449                                            const char *keyname,
1450                                            struct ctdb_server_id sid)
1451 {
1452         struct tevent_req *req, *subreq;
1453         struct ctdb_g_lock_unlock_state *state;
1454
1455         req = tevent_req_create(mem_ctx, &state,
1456                                 struct ctdb_g_lock_unlock_state);
1457         if (req == NULL) {
1458                 return NULL;
1459         }
1460
1461         state->ev = ev;
1462         state->client = client;
1463         state->db = db;
1464         state->key.dptr = discard_const(keyname);
1465         state->key.dsize = strlen(keyname) + 1;
1466         state->my_sid = sid;
1467
1468         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1469                                       false);
1470         if (tevent_req_nomem(subreq, req)) {
1471                 return tevent_req_post(req, ev);
1472         }
1473         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1474
1475         return req;
1476 }
1477
1478 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1479 {
1480         struct tevent_req *req = tevent_req_callback_data(
1481                 subreq, struct tevent_req);
1482         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1483                 req, struct ctdb_g_lock_unlock_state);
1484         TDB_DATA data;
1485         int ret = 0;
1486
1487         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1488         TALLOC_FREE(subreq);
1489         if (state->h == NULL) {
1490                 tevent_req_error(req, ret);
1491                 return;
1492         }
1493
1494         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1495                                     &state->lock_list);
1496         if (ret != 0) {
1497                 tevent_req_error(req, ret);
1498                 return;
1499         }
1500
1501         ret = ctdb_g_lock_unlock_update(req);
1502         if (ret != 0) {
1503                 tevent_req_error(req, ret);
1504                 return;
1505         }
1506
1507         tevent_req_done(req);
1508 }
1509
1510 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1511 {
1512         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1513                 req, struct ctdb_g_lock_unlock_state);
1514         struct ctdb_g_lock *lock;
1515         int ret, i;
1516
1517         for (i=0; i<state->lock_list->num; i++) {
1518                 lock = &state->lock_list->lock[i];
1519
1520                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1521                         break;
1522                 }
1523         }
1524
1525         if (i < state->lock_list->num) {
1526                 state->lock_list->lock[i] =
1527                         state->lock_list->lock[state->lock_list->num-1];
1528                 state->lock_list->num -= 1;
1529         }
1530
1531         if (state->lock_list->num == 0) {
1532                 ctdb_delete_record(state->h);
1533         } else {
1534                 TDB_DATA data;
1535
1536                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1537                 data.dptr = talloc_size(state, data.dsize);
1538                 if (data.dptr == NULL) {
1539                         return ENOMEM;
1540                 }
1541
1542                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1543                 ret = ctdb_store_record(state->h, data);
1544                 talloc_free(data.dptr);
1545                 if (ret != 0) {
1546                         return ret;
1547                 }
1548         }
1549
1550         return 0;
1551 }
1552
1553 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1554 {
1555         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1556                 req, struct ctdb_g_lock_unlock_state);
1557         int err;
1558
1559         TALLOC_FREE(state->h);
1560
1561         if (tevent_req_is_unix_error(req, &err)) {
1562                 if (perr != NULL) {
1563                         *perr = err;
1564                 }
1565                 return false;
1566         }
1567
1568         return true;
1569 }
1570
1571 /*
1572  * Persistent database functions
1573  */
/*
 * State of an asynchronous "start transaction" request.
 */
struct ctdb_transaction_start_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* timeout used for the SRVID registration control */
        struct timeval timeout;
        /* transaction handle under construction */
        struct ctdb_transaction_handle *h;
        /* node the SRVID registration control is sent to */
        uint32_t destnode;
};

/* Steps of starting a transaction, plus the handle destructor */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_register_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1586
1587 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1588                                                struct tevent_context *ev,
1589                                                struct ctdb_client_context *client,
1590                                                struct timeval timeout,
1591                                                struct ctdb_db_context *db,
1592                                                bool readonly)
1593 {
1594         struct ctdb_transaction_start_state *state;
1595         struct tevent_req *req, *subreq;
1596         struct ctdb_transaction_handle *h;
1597
1598         req = tevent_req_create(mem_ctx, &state,
1599                                 struct ctdb_transaction_start_state);
1600         if (req == NULL) {
1601                 return NULL;
1602         }
1603
1604         if (! db->persistent) {
1605                 tevent_req_error(req, EINVAL);
1606                 return tevent_req_post(req, ev);
1607         }
1608
1609         state->ev = ev;
1610         state->client = client;
1611         state->destnode = ctdb_client_pnn(client);
1612
1613         h = talloc_zero(db, struct ctdb_transaction_handle);
1614         if (tevent_req_nomem(h, req)) {
1615                 return tevent_req_post(req, ev);
1616         }
1617
1618         h->client = client;
1619         h->db = db;
1620         h->readonly = readonly;
1621         h->updated = false;
1622
1623         /* SRVID is unique for databases, so client can have transactions active
1624          * for multiple databases */
1625         h->sid.pid = getpid();
1626         h->sid.task_id = db->db_id;
1627         h->sid.vnn = state->destnode;
1628         h->sid.unique_id = h->sid.task_id;
1629         h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1630
1631         h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1632         if (tevent_req_nomem(h->recbuf, req)) {
1633                 return tevent_req_post(req, ev);
1634         }
1635
1636         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1637         if (tevent_req_nomem(h->lock_name, req)) {
1638                 return tevent_req_post(req, ev);
1639         }
1640
1641         state->h = h;
1642
1643         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1644         if (tevent_req_nomem(subreq, req)) {
1645                 return tevent_req_post(req, ev);
1646         }
1647         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1648
1649         return req;
1650 }
1651
1652 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1653 {
1654         struct tevent_req *req = tevent_req_callback_data(
1655                 subreq, struct tevent_req);
1656         struct ctdb_transaction_start_state *state = tevent_req_data(
1657                 req, struct ctdb_transaction_start_state);
1658         struct ctdb_req_control request;
1659         bool status;
1660         int ret;
1661
1662         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1663         TALLOC_FREE(subreq);
1664         if (! status) {
1665                 tevent_req_error(req, ret);
1666                 return;
1667         }
1668
1669         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1670         subreq = ctdb_client_control_send(state, state->ev, state->client,
1671                                           state->destnode, state->timeout,
1672                                           &request);
1673         if (tevent_req_nomem(subreq, req)) {
1674                 return;
1675         }
1676         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1677 }
1678
1679 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1680 {
1681         struct tevent_req *req = tevent_req_callback_data(
1682                 subreq, struct tevent_req);
1683         struct ctdb_transaction_start_state *state = tevent_req_data(
1684                 req, struct ctdb_transaction_start_state);
1685         struct ctdb_reply_control *reply;
1686         bool status;
1687         int ret;
1688
1689         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1690         TALLOC_FREE(subreq);
1691         if (! status) {
1692                 tevent_req_error(req, ret);
1693                 return;
1694         }
1695
1696         ret = ctdb_reply_control_register_srvid(reply);
1697         talloc_free(reply);
1698         if (ret != 0) {
1699                 tevent_req_error(req, ret);
1700                 return;
1701         }
1702
1703         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1704                                        state->h->db_g_lock, state->h->lock_name,
1705                                        &state->h->sid, state->h->readonly);
1706         if (tevent_req_nomem(subreq, req)) {
1707                 return;
1708         }
1709         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1710 }
1711
1712 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1713 {
1714         struct tevent_req *req = tevent_req_callback_data(
1715                 subreq, struct tevent_req);
1716         int ret;
1717         bool status;
1718
1719         status = ctdb_g_lock_lock_recv(subreq, &ret);
1720         TALLOC_FREE(subreq);
1721         if (! status) {
1722                 tevent_req_error(req, ret);
1723                 return;
1724         }
1725
1726         tevent_req_done(req);
1727 }
1728
1729 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1730                                         struct tevent_req *req,
1731                                         int *perr)
1732 {
1733         struct ctdb_transaction_start_state *state = tevent_req_data(
1734                 req, struct ctdb_transaction_start_state);
1735         struct ctdb_transaction_handle *h = state->h;
1736         int err;
1737
1738         if (tevent_req_is_unix_error(req, &err)) {
1739                 if (perr != NULL) {
1740                         *perr = err;
1741                 }
1742                 return NULL;
1743         }
1744
1745         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1746         return h;
1747 }
1748
1749 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1750 {
1751         int ret;
1752
1753         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1754                                          tevent_timeval_zero(),
1755                                          h->sid.unique_id);
1756         if (ret != 0) {
1757                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1758         }
1759
1760         return 0;
1761 }
1762
1763 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1764                            struct ctdb_client_context *client,
1765                            struct timeval timeout,
1766                            struct ctdb_db_context *db, bool readonly,
1767                            struct ctdb_transaction_handle **out)
1768 {
1769         struct tevent_req *req;
1770         struct ctdb_transaction_handle *h;
1771         int ret;
1772
1773         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1774                                           readonly);
1775         if (req == NULL) {
1776                 return ENOMEM;
1777         }
1778
1779         tevent_req_poll(req, ev);
1780
1781         h = ctdb_transaction_start_recv(req, &ret);
1782         if (h == NULL) {
1783                 return ret;
1784         }
1785
1786         *out = h;
1787         return 0;
1788 }
1789
1790 struct ctdb_transaction_record_fetch_state {
1791         TDB_DATA key, data;
1792         struct ctdb_ltdb_header header;
1793         bool found;
1794 };
1795
1796 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1797                                                   struct ctdb_ltdb_header *header,
1798                                                   TDB_DATA key,
1799                                                   TDB_DATA data,
1800                                                   void *private_data)
1801 {
1802         struct ctdb_transaction_record_fetch_state *state =
1803                 (struct ctdb_transaction_record_fetch_state *)private_data;
1804
1805         if (state->key.dsize == key.dsize &&
1806             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1807                 state->data = data;
1808                 state->header = *header;
1809                 state->found = true;
1810         }
1811
1812         return 0;
1813 }
1814
1815 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1816                                          TDB_DATA key,
1817                                          struct ctdb_ltdb_header *header,
1818                                          TDB_DATA *data)
1819 {
1820         struct ctdb_transaction_record_fetch_state state;
1821         int ret;
1822
1823         state.key = key;
1824         state.found = false;
1825
1826         ret = ctdb_rec_buffer_traverse(h->recbuf,
1827                                        ctdb_transaction_record_fetch_traverse,
1828                                        &state);
1829         if (ret != 0) {
1830                 return ret;
1831         }
1832
1833         if (state.found) {
1834                 if (header != NULL) {
1835                         *header = state.header;
1836                 }
1837                 if (data != NULL) {
1838                         *data = state.data;
1839                 }
1840                 return 0;
1841         }
1842
1843         return ENOENT;
1844 }
1845
1846 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1847                                   TDB_DATA key,
1848                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1849 {
1850         TDB_DATA tmp_data;
1851         struct ctdb_ltdb_header header;
1852         int ret;
1853
1854         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1855         if (ret == 0) {
1856                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1857                                            tmp_data.dsize);
1858                 if (data->dptr == NULL) {
1859                         return ENOMEM;
1860                 }
1861                 data->dsize = tmp_data.dsize;
1862                 return 0;
1863         }
1864
1865         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1866         if (ret != 0) {
1867                 return ret;
1868         }
1869
1870         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1871         if (ret != 0) {
1872                 return ret;
1873         }
1874
1875         return 0;
1876 }
1877
1878 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1879                                   TDB_DATA key, TDB_DATA data)
1880 {
1881         TALLOC_CTX *tmp_ctx;
1882         struct ctdb_ltdb_header header;
1883         TDB_DATA old_data;
1884         int ret;
1885
1886         if (h->readonly) {
1887                 return EINVAL;
1888         }
1889
1890         tmp_ctx = talloc_new(h);
1891         if (tmp_ctx == NULL) {
1892                 return ENOMEM;
1893         }
1894
1895         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1896         if (ret != 0) {
1897                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1898                 if (ret != 0) {
1899                         return ret;
1900                 }
1901         }
1902
1903         if (old_data.dsize == data.dsize &&
1904             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1905                 talloc_free(tmp_ctx);
1906                 return 0;
1907         }
1908
1909         header.dmaster = ctdb_client_pnn(h->client);
1910         header.rsn += 1;
1911
1912         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1913         talloc_free(tmp_ctx);
1914         if (ret != 0) {
1915                 return ret;
1916         }
1917         h->updated = true;
1918
1919         return 0;
1920 }
1921
1922 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1923                                    TDB_DATA key)
1924 {
1925         return ctdb_transaction_store_record(h, key, tdb_null);
1926 }
1927
1928 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1929                                             uint64_t seqnum)
1930 {
1931         const char *keyname = CTDB_DB_SEQNUM_KEY;
1932         TDB_DATA key, data;
1933
1934         key.dptr = discard_const(keyname);
1935         key.dsize = strlen(keyname) + 1;
1936
1937         data.dptr = (uint8_t *)&seqnum;
1938         data.dsize = sizeof(seqnum);
1939
1940         return ctdb_transaction_store_record(h, key, data);
1941 }
1942
/* State of an asynchronous transaction commit request */
struct ctdb_transaction_commit_state {
        /* Event context used for the sub-requests */
        struct tevent_context *ev;
        /* Handle of the transaction being committed */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read when the commit was started */
        uint64_t seqnum;
};
1948
/* Callbacks of the commit request, in the order they are first invoked */
static void ctdb_transaction_commit_try(struct tevent_req *subreq);
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1951
1952 struct tevent_req *ctdb_transaction_commit_send(
1953                                         TALLOC_CTX *mem_ctx,
1954                                         struct tevent_context *ev,
1955                                         struct ctdb_transaction_handle *h)
1956 {
1957         struct tevent_req *req, *subreq;
1958         struct ctdb_transaction_commit_state *state;
1959         int ret;
1960
1961         req = tevent_req_create(mem_ctx, &state,
1962                                 struct ctdb_transaction_commit_state);
1963         if (req == NULL) {
1964                 return NULL;
1965         }
1966
1967         state->ev = ev;
1968         state->h = h;
1969
1970         ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1971                                       h->client->pnn, tevent_timeval_zero(),
1972                                       h->db->db_id, &state->seqnum);
1973         if (ret != 0) {
1974                 tevent_req_error(req, ret);
1975                 return tevent_req_post(req, ev);
1976         }
1977
1978         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1979         if (ret != 0) {
1980                 tevent_req_error(req, ret);
1981                 return tevent_req_post(req, ev);
1982         }
1983
1984         subreq = ctdb_recovery_wait_send(state, ev, h->client);
1985         if (tevent_req_nomem(subreq, req)) {
1986                 return tevent_req_post(req, ev);
1987         }
1988         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1989
1990         return req;
1991 }
1992
1993 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1994 {
1995         struct tevent_req *req = tevent_req_callback_data(
1996                 subreq, struct tevent_req);
1997         struct ctdb_transaction_commit_state *state = tevent_req_data(
1998                 req, struct ctdb_transaction_commit_state);
1999         struct ctdb_req_control request;
2000         int ret;
2001         bool status;
2002
2003         status = ctdb_recovery_wait_recv(subreq, &ret);
2004         TALLOC_FREE(subreq);
2005         if (! status) {
2006                 tevent_req_error(req, ret);
2007                 return;
2008         }
2009
2010         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2011         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2012                                           state->h->client->pnn,
2013                                           tevent_timeval_zero(), &request);
2014         if (tevent_req_nomem(subreq, req)) {
2015                 return;
2016         }
2017         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2018 }
2019
2020 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2021 {
2022         struct tevent_req *req = tevent_req_callback_data(
2023                 subreq, struct tevent_req);
2024         struct ctdb_transaction_commit_state *state = tevent_req_data(
2025                 req, struct ctdb_transaction_commit_state);
2026         struct ctdb_reply_control *reply;
2027         uint64_t seqnum;
2028         int ret;
2029         bool status;
2030
2031         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2032         TALLOC_FREE(subreq);
2033         if (! status) {
2034                 tevent_req_error(req, ret);
2035                 return;
2036         }
2037
2038         ret = ctdb_reply_control_trans3_commit(reply);
2039         if (ret < 0) {
2040                 /* Control failed due to recovery */
2041                 subreq = ctdb_recovery_wait_send(state, state->ev,
2042                                                  state->h->client);
2043                 if (tevent_req_nomem(subreq, req)) {
2044                         return;
2045                 }
2046                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2047                                         req);
2048                 return;
2049         }
2050
2051         ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2052                                       state->h->client->pnn,
2053                                       tevent_timeval_zero(),
2054                                       state->h->db->db_id, &seqnum);
2055         if (ret != 0) {
2056                 tevent_req_error(req, ret);
2057                 return;
2058         }
2059
2060         if (seqnum == state->seqnum) {
2061                 subreq = ctdb_recovery_wait_send(state, state->ev,
2062                                                  state->h->client);
2063                 if (tevent_req_nomem(subreq, req)) {
2064                         return;
2065                 }
2066                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2067                                         req);
2068                 return;
2069         }
2070
2071         if (seqnum != state->seqnum + 1) {
2072                 tevent_req_error(req, EIO);
2073                 return;
2074         }
2075
2076         tevent_req_done(req);
2077 }
2078
2079 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2080 {
2081         struct ctdb_transaction_commit_state *state = tevent_req_data(
2082                 req, struct ctdb_transaction_commit_state);
2083         int err;
2084
2085         if (tevent_req_is_unix_error(req, &err)) {
2086                 if (perr != NULL) {
2087                         *perr = err;
2088                 }
2089                 TALLOC_FREE(state->h);
2090                 return false;
2091         }
2092
2093         TALLOC_FREE(state->h);
2094         return true;
2095 }
2096
2097 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2098 {
2099         struct tevent_req *req;
2100         int ret;
2101         bool status;
2102
2103         if (h->readonly || ! h->updated) {
2104                 talloc_free(h);
2105                 return 0;
2106         }
2107
2108         req = ctdb_transaction_commit_send(h, h->ev, h);
2109         if (req == NULL) {
2110                 talloc_free(h);
2111                 return ENOMEM;
2112         }
2113
2114         tevent_req_poll(req, h->ev);
2115
2116         status = ctdb_transaction_commit_recv(req, &ret);
2117         if (! status) {
2118                 talloc_free(h);
2119                 return ret;
2120         }
2121
2122         talloc_free(h);
2123         return 0;
2124 }
2125
/*
 * Abandon a transaction by freeing its handle.  Nothing is sent for the
 * queued records here.  Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
        TALLOC_FREE(h);

        return 0;
}
2131
/*
 * TODO:
 *
 * In the future, Samba should register a SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */