ctdb-client: Refactor cluster-wide database traverse api
[metze/samba/wip.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State of a set-db-flags request: the nodemap is fetched first, then the
 * readonly and/or sticky controls are sent to the nodes in pnn_list.
 */
struct ctdb_set_db_flags_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	uint32_t db_id;
	uint8_t db_flags;	/* CTDB_DB_FLAGS_* bits requested */
	bool readonly_done;	/* readonly control finished or not needed */
	bool sticky_done;	/* sticky control finished or not needed */
	uint32_t *pnn_list;	/* filled in by list_of_connected_nodes() */
	int count;		/* return value of list_of_connected_nodes() */
};

/* Completion callbacks of the set-db-flags request */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 DEBUG(DEBUG_ERR,
125                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
126                        state->db_id, ret));
127                 tevent_req_error(req, ret);
128                 return;
129         }
130
131         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
132         talloc_free(reply);
133         if (ret != 0) {
134                 DEBUG(DEBUG_ERR,
135                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
136                       state->db_id, ret));
137                 tevent_req_error(req, ret);
138                 return;
139         }
140
141         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142                                                state, &state->pnn_list);
143         talloc_free(nodemap);
144         if (state->count <= 0) {
145                 DEBUG(DEBUG_ERR,
146                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147                        state->db_id, state->count));
148                 tevent_req_error(req, ENOMEM);
149                 return;
150         }
151
152         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153                 ctdb_req_control_set_db_readonly(&request, state->db_id);
154                 subreq = ctdb_client_control_multi_send(
155                                         state, state->ev, state->client,
156                                         state->pnn_list, state->count,
157                                         state->timeout, &request);
158                 if (tevent_req_nomem(subreq, req)) {
159                         return;
160                 }
161                 tevent_req_set_callback(subreq,
162                                         ctdb_set_db_flags_readonly_done, req);
163         } else {
164                 state->readonly_done = true;
165         }
166
167         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168                 ctdb_req_control_set_db_sticky(&request, state->db_id);
169                 subreq = ctdb_client_control_multi_send(
170                                         state, state->ev, state->client,
171                                         state->pnn_list, state->count,
172                                         state->timeout, &request);
173                 if (tevent_req_nomem(subreq, req)) {
174                         return;
175                 }
176                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
177                                         req);
178         } else {
179                 state->sticky_done = true;
180         }
181 }
182
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct ctdb_set_db_flags_state *state = tevent_req_data(
188                 req, struct ctdb_set_db_flags_state);
189         int ret;
190         bool status;
191
192         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
193                                                 NULL);
194         TALLOC_FREE(subreq);
195         if (! status) {
196                 DEBUG(DEBUG_ERR,
197                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
198                        state->db_id, ret));
199                 tevent_req_error(req, ret);
200                 return;
201         }
202
203         state->readonly_done = true;
204
205         if (state->readonly_done && state->sticky_done) {
206                 tevent_req_done(req);
207         }
208 }
209
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
211 {
212         struct tevent_req *req = tevent_req_callback_data(
213                 subreq, struct tevent_req);
214         struct ctdb_set_db_flags_state *state = tevent_req_data(
215                 req, struct ctdb_set_db_flags_state);
216         int ret;
217         bool status;
218
219         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
220                                                 NULL);
221         TALLOC_FREE(subreq);
222         if (! status) {
223                 DEBUG(DEBUG_ERR,
224                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
225                        state->db_id, ret));
226                 tevent_req_error(req, ret);
227                 return;
228         }
229
230         state->sticky_done = true;
231
232         if (state->readonly_done && state->sticky_done) {
233                 tevent_req_done(req);
234         }
235 }
236
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
	int error;

	if (! tevent_req_is_unix_error(req, &error)) {
		return true;
	}

	if (perr != NULL) {
		*perr = error;
	}
	return false;
}
249
/*
 * State of an attach request.  The callbacks declared below run one after
 * another: tunable lookup, attach control, db path, health check, flags.
 */
struct ctdb_attach_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	uint32_t destnode;		/* node the controls are sent to */
	uint8_t db_flags;		/* CTDB_DB_FLAGS_* from the caller */
	uint32_t tdb_flags;		/* TDB_* flags used for the attach */
	struct ctdb_db_context *db;	/* handle being set up or reused */
};

/* Completion callbacks of the attach request */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
265
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267                                     struct tevent_context *ev,
268                                     struct ctdb_client_context *client,
269                                     struct timeval timeout,
270                                     const char *db_name, uint8_t db_flags)
271 {
272         struct tevent_req *req, *subreq;
273         struct ctdb_attach_state *state;
274         struct ctdb_req_control request;
275
276         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
277         if (req == NULL) {
278                 return NULL;
279         }
280
281         state->db = client_db_handle(client, db_name);
282         if (state->db != NULL) {
283                 tevent_req_done(req);
284                 return tevent_req_post(req, ev);
285         }
286
287         state->ev = ev;
288         state->client = client;
289         state->timeout = timeout;
290         state->destnode = ctdb_client_pnn(client);
291         state->db_flags = db_flags;
292
293         state->db = talloc_zero(client, struct ctdb_db_context);
294         if (tevent_req_nomem(state->db, req)) {
295                 return tevent_req_post(req, ev);
296         }
297
298         state->db->db_name = talloc_strdup(state->db, db_name);
299         if (tevent_req_nomem(state->db, req)) {
300                 return tevent_req_post(req, ev);
301         }
302
303         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304                 state->db->persistent = true;
305         }
306
307         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308         subreq = ctdb_client_control_send(state, ev, client,
309                                           ctdb_client_pnn(client), timeout,
310                                           &request);
311         if (tevent_req_nomem(subreq, req)) {
312                 return tevent_req_post(req, ev);
313         }
314         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
315
316         return req;
317 }
318
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
320 {
321         struct tevent_req *req = tevent_req_callback_data(
322                 subreq, struct tevent_req);
323         struct ctdb_attach_state *state = tevent_req_data(
324                 req, struct ctdb_attach_state);
325         struct ctdb_reply_control *reply;
326         struct ctdb_req_control request;
327         uint32_t mutex_enabled;
328         int ret;
329         bool status;
330
331         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
332         TALLOC_FREE(subreq);
333         if (! status) {
334                 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335                                   state->db->db_name, ret));
336                 tevent_req_error(req, ret);
337                 return;
338         }
339
340         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
341         if (ret != 0) {
342                 /* Treat error as mutex support not available */
343                 mutex_enabled = 0;
344         }
345
346         if (state->db->persistent) {
347                 state->tdb_flags = TDB_DEFAULT;
348         } else {
349                 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
350                                     TDB_CLEAR_IF_FIRST);
351                 if (mutex_enabled == 1) {
352                         state->tdb_flags |= TDB_MUTEX_LOCKING;
353                 }
354         }
355
356         if (state->db->persistent) {
357                 ctdb_req_control_db_attach_persistent(&request,
358                                                       state->db->db_name,
359                                                       state->tdb_flags);
360         } else {
361                 ctdb_req_control_db_attach(&request, state->db->db_name,
362                                            state->tdb_flags);
363         }
364
365         subreq = ctdb_client_control_send(state, state->ev, state->client,
366                                           state->destnode, state->timeout,
367                                           &request);
368         if (tevent_req_nomem(subreq, req)) {
369                 return;
370         }
371         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
372 }
373
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
375 {
376         struct tevent_req *req = tevent_req_callback_data(
377                 subreq, struct tevent_req);
378         struct ctdb_attach_state *state = tevent_req_data(
379                 req, struct ctdb_attach_state);
380         struct ctdb_req_control request;
381         struct ctdb_reply_control *reply;
382         bool status;
383         int ret;
384
385         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
386         TALLOC_FREE(subreq);
387         if (! status) {
388                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
389                                   state->db->db_name,
390                                   (state->db->persistent
391                                         ? "DB_ATTACH_PERSISTENT"
392                                         : "DB_ATTACH"),
393                                   ret));
394                 tevent_req_error(req, ret);
395                 return;
396         }
397
398         if (state->db->persistent) {
399                 ret = ctdb_reply_control_db_attach_persistent(
400                                 reply, &state->db->db_id);
401         } else {
402                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
403         }
404         talloc_free(reply);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407                                   state->db->db_name, ret));
408                 tevent_req_error(req, ret);
409                 return;
410         }
411
412         ctdb_req_control_getdbpath(&request, state->db->db_id);
413         subreq = ctdb_client_control_send(state, state->ev, state->client,
414                                           state->destnode, state->timeout,
415                                           &request);
416         if (tevent_req_nomem(subreq, req)) {
417                 return;
418         }
419         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
420 }
421
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
423 {
424         struct tevent_req *req = tevent_req_callback_data(
425                 subreq, struct tevent_req);
426         struct ctdb_attach_state *state = tevent_req_data(
427                 req, struct ctdb_attach_state);
428         struct ctdb_reply_control *reply;
429         struct ctdb_req_control request;
430         bool status;
431         int ret;
432
433         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
434         TALLOC_FREE(subreq);
435         if (! status) {
436                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437                                   state->db->db_name, ret));
438                 tevent_req_error(req, ret);
439                 return;
440         }
441
442         ret = ctdb_reply_control_getdbpath(reply, state->db,
443                                            &state->db->db_path);
444         talloc_free(reply);
445         if (ret != 0) {
446                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447                                   state->db->db_name, ret));
448                 tevent_req_error(req, ret);
449                 return;
450         }
451
452         ctdb_req_control_db_get_health(&request, state->db->db_id);
453         subreq = ctdb_client_control_send(state, state->ev, state->client,
454                                           state->destnode, state->timeout,
455                                           &request);
456         if (tevent_req_nomem(subreq, req)) {
457                 return;
458         }
459         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
460 }
461
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
463 {
464         struct tevent_req *req = tevent_req_callback_data(
465                 subreq, struct tevent_req);
466         struct ctdb_attach_state *state = tevent_req_data(
467                 req, struct ctdb_attach_state);
468         struct ctdb_reply_control *reply;
469         const char *reason;
470         bool status;
471         int ret;
472
473         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
474         TALLOC_FREE(subreq);
475         if (! status) {
476                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477                                   state->db->db_name, ret));
478                 tevent_req_error(req, ret);
479                 return;
480         }
481
482         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
483         if (ret != 0) {
484                 DEBUG(DEBUG_ERR,
485                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486                        state->db->db_name, ret));
487                 tevent_req_error(req, ret);
488                 return;
489         }
490
491         if (reason != NULL) {
492                 /* Database unhealthy, avoid attach */
493                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494                                   state->db->db_name, reason));
495                 tevent_req_error(req, EIO);
496                 return;
497         }
498
499         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500                                         state->destnode, state->timeout,
501                                         state->db->db_id, state->db_flags);
502         if (tevent_req_nomem(subreq, req)) {
503                 return;
504         }
505         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
506 }
507
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
509 {
510         struct tevent_req *req = tevent_req_callback_data(
511                 subreq, struct tevent_req);
512         struct ctdb_attach_state *state = tevent_req_data(
513                 req, struct ctdb_attach_state);
514         bool status;
515         int ret;
516
517         status = ctdb_set_db_flags_recv(subreq, &ret);
518         TALLOC_FREE(subreq);
519         if (! status) {
520                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521                                   state->db->db_name, state->db_flags));
522                 tevent_req_error(req, ret);
523                 return;
524         }
525
526         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527                                         state->tdb_flags, O_RDWR, 0);
528         if (tevent_req_nomem(state->db->ltdb, req)) {
529                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530                                   state->db->db_name));
531                 return;
532         }
533         DLIST_ADD(state->client->db, state->db);
534
535         tevent_req_done(req);
536 }
537
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539                       struct ctdb_db_context **out)
540 {
541         struct ctdb_attach_state *state = tevent_req_data(
542                 req, struct ctdb_attach_state);
543         int err;
544
545         if (tevent_req_is_unix_error(req, &err)) {
546                 if (perr != NULL) {
547                         *perr = err;
548                 }
549                 return false;
550         }
551
552         if (out != NULL) {
553                 *out = state->db;
554         }
555         return true;
556 }
557
558 int ctdb_attach(struct tevent_context *ev,
559                 struct ctdb_client_context *client,
560                 struct timeval timeout,
561                 const char *db_name, uint8_t db_flags,
562                 struct ctdb_db_context **out)
563 {
564         TALLOC_CTX *mem_ctx;
565         struct tevent_req *req;
566         bool status;
567         int ret;
568
569         mem_ctx = talloc_new(client);
570         if (mem_ctx == NULL) {
571                 return ENOMEM;
572         }
573
574         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
575                                db_name, db_flags);
576         if (req == NULL) {
577                 talloc_free(mem_ctx);
578                 return ENOMEM;
579         }
580
581         tevent_req_poll(req, ev);
582
583         status = ctdb_attach_recv(req, &ret, out);
584         if (! status) {
585                 talloc_free(mem_ctx);
586                 return ret;
587         }
588
589         /*
590         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
593         */
594
595         talloc_free(mem_ctx);
596         return 0;
597 }
598
599 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
600                 struct ctdb_client_context *client,
601                 struct timeval timeout, uint32_t db_id)
602 {
603         struct ctdb_db_context *db;
604         int ret;
605
606         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
607                                   db_id);
608         if (ret != 0) {
609                 return ret;
610         }
611
612         for (db = client->db; db != NULL; db = db->next) {
613                 if (db->db_id == db_id) {
614                         DLIST_REMOVE(client->db, db);
615                         break;
616                 }
617         }
618
619         return 0;
620 }
621
622 uint32_t ctdb_db_id(struct ctdb_db_context *db)
623 {
624         return db->db_id;
625 }
626
627 struct ctdb_db_traverse_local_state {
628         ctdb_rec_parser_func_t parser;
629         void *private_data;
630         bool extract_header;
631         int error;
632 };
633
634 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
635                                           TDB_DATA key, TDB_DATA data,
636                                           void *private_data)
637 {
638         struct ctdb_db_traverse_local_state *state =
639                 (struct ctdb_db_traverse_local_state *)private_data;
640         int ret;
641
642         if (state->extract_header) {
643                 struct ctdb_ltdb_header header;
644
645                 ret = ctdb_ltdb_header_extract(&data, &header);
646                 if (ret != 0) {
647                         state->error = ret;
648                         return 1;
649                 }
650
651                 ret = state->parser(0, &header, key, data, state->private_data);
652         } else {
653                 ret = state->parser(0, NULL, key, data, state->private_data);
654         }
655
656         if (ret != 0) {
657                 state->error = ret;
658                 return 1;
659         }
660
661         return 0;
662 }
663
664 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
665                            bool extract_header,
666                            ctdb_rec_parser_func_t parser, void *private_data)
667 {
668         struct ctdb_db_traverse_local_state state;
669         int ret;
670
671         state.parser = parser;
672         state.private_data = private_data;
673         state.extract_header = extract_header;
674         state.error = 0;
675
676         if (readonly) {
677                 ret = tdb_traverse_read(db->ltdb->tdb,
678                                         ctdb_db_traverse_local_handler,
679                                         &state);
680         } else {
681                 ret = tdb_traverse(db->ltdb->tdb,
682                                    ctdb_db_traverse_local_handler, &state);
683         }
684
685         if (ret == -1) {
686                 return EIO;
687         }
688
689         return state.error;
690 }
691
692 struct ctdb_db_traverse_state {
693         struct tevent_context *ev;
694         struct ctdb_client_context *client;
695         struct ctdb_db_context *db;
696         uint32_t destnode;
697         uint64_t srvid;
698         struct timeval timeout;
699         ctdb_rec_parser_func_t parser;
700         void *private_data;
701         int result;
702 };
703
704 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
705 static void ctdb_db_traverse_started(struct tevent_req *subreq);
706 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
707                                      void *private_data);
708 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
709 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
710
711 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
712                                          struct tevent_context *ev,
713                                          struct ctdb_client_context *client,
714                                          struct ctdb_db_context *db,
715                                          uint32_t destnode,
716                                          struct timeval timeout,
717                                          ctdb_rec_parser_func_t parser,
718                                          void *private_data)
719 {
720         struct tevent_req *req, *subreq;
721         struct ctdb_db_traverse_state *state;
722
723         req = tevent_req_create(mem_ctx, &state,
724                                 struct ctdb_db_traverse_state);
725         if (req == NULL) {
726                 return NULL;
727         }
728
729         state->ev = ev;
730         state->client = client;
731         state->db = db;
732         state->destnode = destnode;
733         state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
734         state->timeout = timeout;
735         state->parser = parser;
736         state->private_data = private_data;
737
738         subreq = ctdb_client_set_message_handler_send(state, ev, client,
739                                                       state->srvid,
740                                                       ctdb_db_traverse_handler,
741                                                       req);
742         if (tevent_req_nomem(subreq, req)) {
743                 return tevent_req_post(req, ev);
744         }
745         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
746
747         return req;
748 }
749
750 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
751 {
752         struct tevent_req *req = tevent_req_callback_data(
753                 subreq, struct tevent_req);
754         struct ctdb_db_traverse_state *state = tevent_req_data(
755                 req, struct ctdb_db_traverse_state);
756         struct ctdb_traverse_start_ext traverse;
757         struct ctdb_req_control request;
758         int ret = 0;
759         bool status;
760
761         status = ctdb_client_set_message_handler_recv(subreq, &ret);
762         TALLOC_FREE(subreq);
763         if (! status) {
764                 tevent_req_error(req, ret);
765                 return;
766         }
767
768         traverse = (struct ctdb_traverse_start_ext) {
769                 .db_id = ctdb_db_id(state->db),
770                 .reqid = 0,
771                 .srvid = state->srvid,
772                 .withemptyrecords = false,
773         };
774
775         ctdb_req_control_traverse_start_ext(&request, &traverse);
776         subreq = ctdb_client_control_send(state, state->ev, state->client,
777                                           state->destnode, state->timeout,
778                                           &request);
779         if (subreq == NULL) {
780                 state->result = ENOMEM;
781                 ctdb_db_traverse_remove_handler(req);
782                 return;
783         }
784         tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
785 }
786
787 static void ctdb_db_traverse_started(struct tevent_req *subreq)
788 {
789         struct tevent_req *req = tevent_req_callback_data(
790                 subreq, struct tevent_req);
791         struct ctdb_db_traverse_state *state = tevent_req_data(
792                 req, struct ctdb_db_traverse_state);
793         struct ctdb_reply_control *reply;
794         int ret = 0;
795         bool status;
796
797         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
798         TALLOC_FREE(subreq);
799         if (! status) {
800                 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
801                 state->result = ret;
802                 ctdb_db_traverse_remove_handler(req);
803                 return;
804         }
805
806         ret = ctdb_reply_control_traverse_start_ext(reply);
807         talloc_free(reply);
808         if (ret != 0) {
809                 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
810                                   ret));
811                 state->result = ret;
812                 ctdb_db_traverse_remove_handler(req);
813                 return;
814         }
815 }
816
817 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
818                                      void *private_data)
819 {
820         struct tevent_req *req = talloc_get_type_abort(
821                 private_data, struct tevent_req);
822         struct ctdb_db_traverse_state *state = tevent_req_data(
823                 req, struct ctdb_db_traverse_state);
824         struct ctdb_rec_data *rec;
825         struct ctdb_ltdb_header header;
826         int ret;
827
828         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
829         if (ret != 0) {
830                 return;
831         }
832
833         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
834                 talloc_free(rec);
835                 ctdb_db_traverse_remove_handler(req);
836                 return;
837         }
838
839         ret = ctdb_ltdb_header_extract(&rec->data, &header);
840         if (ret != 0) {
841                 talloc_free(rec);
842                 return;
843         }
844
845         if (rec->data.dsize == 0) {
846                 talloc_free(rec);
847                 return;
848         }
849
850         ret = state->parser(rec->reqid, &header, rec->key, rec->data,
851                             state->private_data);
852         talloc_free(rec);
853         if (ret != 0) {
854                 state->result = ret;
855                 ctdb_db_traverse_remove_handler(req);
856         }
857 }
858
859 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
860 {
861         struct ctdb_db_traverse_state *state = tevent_req_data(
862                 req, struct ctdb_db_traverse_state);
863         struct tevent_req *subreq;
864
865         subreq = ctdb_client_remove_message_handler_send(state, state->ev,
866                                                          state->client,
867                                                          state->srvid, req);
868         if (tevent_req_nomem(subreq, req)) {
869                 return;
870         }
871         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
872 }
873
874 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
875 {
876         struct tevent_req *req = tevent_req_callback_data(
877                 subreq, struct tevent_req);
878         struct ctdb_db_traverse_state *state = tevent_req_data(
879                 req, struct ctdb_db_traverse_state);
880         int ret;
881         bool status;
882
883         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
884         TALLOC_FREE(subreq);
885         if (! status) {
886                 tevent_req_error(req, ret);
887                 return;
888         }
889
890         if (state->result != 0) {
891                 tevent_req_error(req, state->result);
892                 return;
893         }
894
895         tevent_req_done(req);
896 }
897
898 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
899 {
900         int ret;
901
902         if (tevent_req_is_unix_error(req, &ret)) {
903                 if (perr != NULL) {
904                         *perr = ret;
905                 }
906                 return false;
907         }
908
909         return true;
910 }
911
912 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
913                      struct ctdb_client_context *client,
914                      struct ctdb_db_context *db,
915                      uint32_t destnode, struct timeval timeout,
916                      ctdb_rec_parser_func_t parser, void *private_data)
917 {
918         struct tevent_req *req;
919         int ret = 0;
920         bool status;
921
922         req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
923                                     timeout, parser, private_data);
924         if (req == NULL) {
925                 return ENOMEM;
926         }
927
928         tevent_req_poll(req, ev);
929
930         status = ctdb_db_traverse_recv(req, &ret);
931         if (! status) {
932                 return ret;
933         }
934
935         return 0;
936 }
937
938 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
939                     struct ctdb_ltdb_header *header,
940                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
941 {
942         TDB_DATA rec;
943         int ret;
944
945         rec = tdb_fetch(db->ltdb->tdb, key);
946         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
947                 /* No record present */
948                 if (rec.dptr != NULL) {
949                         free(rec.dptr);
950                 }
951
952                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
953                         return EIO;
954                 }
955
956                 header->rsn = 0;
957                 header->dmaster = CTDB_UNKNOWN_PNN;
958                 header->flags = 0;
959
960                 if (data != NULL) {
961                         *data = tdb_null;
962                 }
963                 return 0;
964         }
965
966         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
967         if (ret != 0) {
968                 return ret;
969         }
970
971         ret = 0;
972         if (data != NULL) {
973                 size_t offset = ctdb_ltdb_header_len(header);
974
975                 data->dsize = rec.dsize - offset;
976                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
977                                            data->dsize);
978                 if (data->dptr == NULL) {
979                         ret = ENOMEM;
980                 }
981         }
982
983         free(rec.dptr);
984         return ret;
985 }
986
987 /*
988  * Fetch a record from volatile database
989  *
990  * Steps:
991  *  1. Get a lock on the hash chain
992  *  2. If the record does not exist, migrate the record
993  *  3. If readonly=true and delegations do not exist, migrate the record.
994  *  4. If readonly=false and delegations exist, migrate the record.
995  *  5. If the local node is not dmaster, migrate the record.
996  *  6. Return record
997  */
998
/* Per-request state for ctdb_fetch_lock_send()/_recv() */
struct ctdb_fetch_lock_state {
        /* event context and client used to send the migration call */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* record handle being set up; returned to the caller by _recv() */
        struct ctdb_record_handle *h;
        /* true if the caller asked for readonly access */
        bool readonly;
        /* pnn of the local node, compared against the record's dmaster */
        uint32_t pnn;
};

/* Lock the chain and decide whether the local copy is usable */
static int ctdb_fetch_lock_check(struct tevent_req *req);
/* Send the call that requests migration of the record */
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
/* Migration call completed: re-run the check */
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1010
1011 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1012                                         struct tevent_context *ev,
1013                                         struct ctdb_client_context *client,
1014                                         struct ctdb_db_context *db,
1015                                         TDB_DATA key, bool readonly)
1016 {
1017         struct ctdb_fetch_lock_state *state;
1018         struct tevent_req *req;
1019         int ret;
1020
1021         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1022         if (req == NULL) {
1023                 return NULL;
1024         }
1025
1026         state->ev = ev;
1027         state->client = client;
1028
1029         state->h = talloc_zero(db, struct ctdb_record_handle);
1030         if (tevent_req_nomem(state->h, req)) {
1031                 return tevent_req_post(req, ev);
1032         }
1033         state->h->client = client;
1034         state->h->db = db;
1035         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1036         if (tevent_req_nomem(state->h->key.dptr, req)) {
1037                 return tevent_req_post(req, ev);
1038         }
1039         state->h->key.dsize = key.dsize;
1040         state->h->readonly = false;
1041
1042         state->readonly = readonly;
1043         state->pnn = ctdb_client_pnn(client);
1044
1045         /* Check that database is not persistent */
1046         if (db->persistent) {
1047                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1048                                   db->db_name));
1049                 tevent_req_error(req, EINVAL);
1050                 return tevent_req_post(req, ev);
1051         }
1052
1053         ret = ctdb_fetch_lock_check(req);
1054         if (ret == 0) {
1055                 tevent_req_done(req);
1056                 return tevent_req_post(req, ev);
1057         }
1058         if (ret != EAGAIN) {
1059                 tevent_req_error(req, ret);
1060                 return tevent_req_post(req, ev);
1061         }
1062         return req;
1063 }
1064
1065 static int ctdb_fetch_lock_check(struct tevent_req *req)
1066 {
1067         struct ctdb_fetch_lock_state *state = tevent_req_data(
1068                 req, struct ctdb_fetch_lock_state);
1069         struct ctdb_record_handle *h = state->h;
1070         struct ctdb_ltdb_header header;
1071         TDB_DATA data = tdb_null;
1072         int ret, err = 0;
1073         bool do_migrate = false;
1074
1075         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1076         if (ret != 0) {
1077                 DEBUG(DEBUG_ERR,
1078                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
1079                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1080                 err = EIO;
1081                 goto failed;
1082         }
1083
1084         data = tdb_fetch(h->db->ltdb->tdb, h->key);
1085         if (data.dptr == NULL) {
1086                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1087                         goto migrate;
1088                 } else {
1089                         err = EIO;
1090                         goto failed;
1091                 }
1092         }
1093
1094         /* Got the record */
1095         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1096         if (ret != 0) {
1097                 err = ret;
1098                 goto failed;
1099         }
1100
1101         if (! state->readonly) {
1102                 /* Read/write access */
1103                 if (header.dmaster == state->pnn &&
1104                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1105                         goto migrate;
1106                 }
1107
1108                 if (header.dmaster != state->pnn) {
1109                         goto migrate;
1110                 }
1111         } else {
1112                 /* Readonly access */
1113                 if (header.dmaster != state->pnn &&
1114                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1115                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
1116                         goto migrate;
1117                 }
1118         }
1119
1120         /* We are the dmaster or readonly delegation */
1121         h->header = header;
1122         h->data = data;
1123         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1124                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
1125                 h->readonly = true;
1126         }
1127         return 0;
1128
1129 migrate:
1130         do_migrate = true;
1131         err = EAGAIN;
1132
1133 failed:
1134         if (data.dptr != NULL) {
1135                 free(data.dptr);
1136         }
1137         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1138         if (ret != 0) {
1139                 DEBUG(DEBUG_ERR,
1140                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1141                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1142                 return EIO;
1143         }
1144
1145         if (do_migrate) {
1146                 ctdb_fetch_lock_migrate(req);
1147         }
1148         return err;
1149 }
1150
1151 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1152 {
1153         struct ctdb_fetch_lock_state *state = tevent_req_data(
1154                 req, struct ctdb_fetch_lock_state);
1155         struct ctdb_req_call request;
1156         struct tevent_req *subreq;
1157
1158         ZERO_STRUCT(request);
1159         request.flags = CTDB_IMMEDIATE_MIGRATION;
1160         if (state->readonly) {
1161                 request.flags |= CTDB_WANT_READONLY;
1162         }
1163         request.db_id = state->h->db->db_id;
1164         request.callid = CTDB_NULL_FUNC;
1165         request.key = state->h->key;
1166         request.calldata = tdb_null;
1167
1168         subreq = ctdb_client_call_send(state, state->ev, state->client,
1169                                        &request);
1170         if (tevent_req_nomem(subreq, req)) {
1171                 return;
1172         }
1173
1174         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1175 }
1176
1177 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1178 {
1179         struct tevent_req *req = tevent_req_callback_data(
1180                 subreq, struct tevent_req);
1181         struct ctdb_fetch_lock_state *state = tevent_req_data(
1182                 req, struct ctdb_fetch_lock_state);
1183         struct ctdb_reply_call *reply;
1184         int ret;
1185         bool status;
1186
1187         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1188         TALLOC_FREE(subreq);
1189         if (! status) {
1190                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1191                                   state->h->db->db_name, ret));
1192                 tevent_req_error(req, ret);
1193                 return;
1194         }
1195
1196         if (reply->status != 0) {
1197                 tevent_req_error(req, EIO);
1198                 return;
1199         }
1200         talloc_free(reply);
1201
1202         ret = ctdb_fetch_lock_check(req);
1203         if (ret != 0) {
1204                 if (ret != EAGAIN) {
1205                         tevent_req_error(req, ret);
1206                 }
1207                 return;
1208         }
1209
1210         tevent_req_done(req);
1211 }
1212
1213 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1214 {
1215         int ret;
1216
1217         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1218         if (ret != 0) {
1219                 DEBUG(DEBUG_ERR,
1220                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1221                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1222         }
1223         free(h->data.dptr);
1224         return 0;
1225 }
1226
1227 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1228                                                 struct ctdb_ltdb_header *header,
1229                                                 TALLOC_CTX *mem_ctx,
1230                                                 TDB_DATA *data, int *perr)
1231 {
1232         struct ctdb_fetch_lock_state *state = tevent_req_data(
1233                 req, struct ctdb_fetch_lock_state);
1234         struct ctdb_record_handle *h = state->h;
1235         int err;
1236
1237         if (tevent_req_is_unix_error(req, &err)) {
1238                 if (perr != NULL) {
1239                         TALLOC_FREE(state->h);
1240                         *perr = err;
1241                 }
1242                 return NULL;
1243         }
1244
1245         if (header != NULL) {
1246                 *header = h->header;
1247         }
1248         if (data != NULL) {
1249                 size_t offset;
1250
1251                 offset = ctdb_ltdb_header_len(&h->header);
1252
1253                 data->dsize = h->data.dsize - offset;
1254                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1255                                            data->dsize);
1256                 if (data->dptr == NULL) {
1257                         TALLOC_FREE(state->h);
1258                         if (perr != NULL) {
1259                                 *perr = ENOMEM;
1260                         }
1261                         return NULL;
1262                 }
1263         }
1264
1265         talloc_set_destructor(h, ctdb_record_handle_destructor);
1266         return h;
1267 }
1268
1269 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1270                     struct ctdb_client_context *client,
1271                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1272                     struct ctdb_record_handle **out,
1273                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1274 {
1275         struct tevent_req *req;
1276         struct ctdb_record_handle *h;
1277         int ret;
1278
1279         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1280         if (req == NULL) {
1281                 return ENOMEM;
1282         }
1283
1284         tevent_req_poll(req, ev);
1285
1286         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1287         if (h == NULL) {
1288                 return ret;
1289         }
1290
1291         *out = h;
1292         return 0;
1293 }
1294
1295 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1296 {
1297         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1298         TDB_DATA rec[2];
1299         int ret;
1300
1301         /* Cannot modify the record if it was obtained as a readonly copy */
1302         if (h->readonly) {
1303                 return EINVAL;
1304         }
1305
1306         /* Check if the new data is same */
1307         if (h->data.dsize == data.dsize &&
1308             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1309                 /* No need to do anything */
1310                 return 0;
1311         }
1312
1313         ctdb_ltdb_header_push(&h->header, header);
1314
1315         rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1316         rec[0].dptr = header;
1317
1318         rec[1].dsize = data.dsize;
1319         rec[1].dptr = data.dptr;
1320
1321         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1322         if (ret != 0) {
1323                 DEBUG(DEBUG_ERR,
1324                       ("store_record: %s tdb_storev failed, %s\n",
1325                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1326                 return EIO;
1327         }
1328
1329         return 0;
1330 }
1331
/* Per-request state for ctdb_delete_record_send()/_recv() */
struct ctdb_delete_record_state {
        /* handle of the record being deleted (owned by the caller) */
        struct ctdb_record_handle *h;
};

/* Completion callback for the schedule-for-deletion control */
static void ctdb_delete_record_done(struct tevent_req *subreq);
1337
1338 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1339                                            struct tevent_context *ev,
1340                                            struct ctdb_record_handle *h)
1341 {
1342         struct tevent_req *req, *subreq;
1343         struct ctdb_delete_record_state *state;
1344         struct ctdb_key_data key;
1345         struct ctdb_req_control request;
1346         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1347         TDB_DATA rec;
1348         int ret;
1349
1350         req = tevent_req_create(mem_ctx, &state,
1351                                 struct ctdb_delete_record_state);
1352         if (req == NULL) {
1353                 return NULL;
1354         }
1355
1356         state->h = h;
1357
1358         /* Cannot delete the record if it was obtained as a readonly copy */
1359         if (h->readonly) {
1360                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1361                                   h->db->db_name));
1362                 tevent_req_error(req, EINVAL);
1363                 return tevent_req_post(req, ev);
1364         }
1365
1366         ctdb_ltdb_header_push(&h->header, header);
1367
1368         rec.dsize = ctdb_ltdb_header_len(&h->header);
1369         rec.dptr = header;
1370
1371         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1372         if (ret != 0) {
1373                 DEBUG(DEBUG_ERR,
1374                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1375                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1376                 tevent_req_error(req, EIO);
1377                 return tevent_req_post(req, ev);
1378         }
1379
1380         key.db_id = h->db->db_id;
1381         key.header = h->header;
1382         key.key = h->key;
1383
1384         ctdb_req_control_schedule_for_deletion(&request, &key);
1385         subreq = ctdb_client_control_send(state, ev, h->client,
1386                                           ctdb_client_pnn(h->client),
1387                                           tevent_timeval_zero(),
1388                                           &request);
1389         if (tevent_req_nomem(subreq, req)) {
1390                 return tevent_req_post(req, ev);
1391         }
1392         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1393
1394         return req;
1395 }
1396
1397 static void ctdb_delete_record_done(struct tevent_req *subreq)
1398 {
1399         struct tevent_req *req = tevent_req_callback_data(
1400                 subreq, struct tevent_req);
1401         struct ctdb_delete_record_state *state = tevent_req_data(
1402                 req, struct ctdb_delete_record_state);
1403         int ret;
1404         bool status;
1405
1406         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1407         TALLOC_FREE(subreq);
1408         if (! status) {
1409                 DEBUG(DEBUG_ERR,
1410                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1411                        "ret=%d\n", state->h->db->db_name, ret));
1412                 tevent_req_error(req, ret);
1413                 return;
1414         }
1415
1416         tevent_req_done(req);
1417 }
1418
1419 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1420 {
1421         int err;
1422
1423         if (tevent_req_is_unix_error(req, &err)) {
1424                 if (perr != NULL) {
1425                         *perr = err;
1426                 }
1427                 return false;
1428         }
1429
1430         return true;
1431 }
1432
1433
1434 int ctdb_delete_record(struct ctdb_record_handle *h)
1435 {
1436         struct tevent_context *ev = h->ev;
1437         TALLOC_CTX *mem_ctx;
1438         struct tevent_req *req;
1439         int ret;
1440         bool status;
1441
1442         mem_ctx = talloc_new(NULL);
1443         if (mem_ctx == NULL) {
1444                 return ENOMEM;
1445         }
1446
1447         req = ctdb_delete_record_send(mem_ctx, ev, h);
1448         if (req == NULL) {
1449                 talloc_free(mem_ctx);
1450                 return ENOMEM;
1451         }
1452
1453         tevent_req_poll(req, ev);
1454
1455         status = ctdb_delete_record_recv(req, &ret);
1456         talloc_free(mem_ctx);
1457         if (! status) {
1458                 return ret;
1459         }
1460
1461         return 0;
1462 }
1463
1464 /*
1465  * Global lock functions
1466  */
1467
1468 struct ctdb_g_lock_lock_state {
1469         struct tevent_context *ev;
1470         struct ctdb_client_context *client;
1471         struct ctdb_db_context *db;
1472         TDB_DATA key;
1473         struct ctdb_server_id my_sid;
1474         enum ctdb_g_lock_type lock_type;
1475         struct ctdb_record_handle *h;
1476         /* state for verification of active locks */
1477         struct ctdb_g_lock_list *lock_list;
1478         unsigned int current;
1479 };
1480
1481 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1482 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1483 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1484 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1485 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1486
1487 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1488                                   enum ctdb_g_lock_type l2)
1489 {
1490         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1491                 return false;
1492         }
1493         return true;
1494 }
1495
1496 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1497                                          struct tevent_context *ev,
1498                                          struct ctdb_client_context *client,
1499                                          struct ctdb_db_context *db,
1500                                          const char *keyname,
1501                                          struct ctdb_server_id *sid,
1502                                          bool readonly)
1503 {
1504         struct tevent_req *req, *subreq;
1505         struct ctdb_g_lock_lock_state *state;
1506
1507         req = tevent_req_create(mem_ctx, &state,
1508                                 struct ctdb_g_lock_lock_state);
1509         if (req == NULL) {
1510                 return NULL;
1511         }
1512
1513         state->ev = ev;
1514         state->client = client;
1515         state->db = db;
1516         state->key.dptr = discard_const(keyname);
1517         state->key.dsize = strlen(keyname) + 1;
1518         state->my_sid = *sid;
1519         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1520
1521         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1522                                       false);
1523         if (tevent_req_nomem(subreq, req)) {
1524                 return tevent_req_post(req, ev);
1525         }
1526         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1527
1528         return req;
1529 }
1530
1531 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1532 {
1533         struct tevent_req *req = tevent_req_callback_data(
1534                 subreq, struct tevent_req);
1535         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1536                 req, struct ctdb_g_lock_lock_state);
1537         TDB_DATA data;
1538         int ret = 0;
1539
1540         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1541         TALLOC_FREE(subreq);
1542         if (state->h == NULL) {
1543                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1544                                   (char *)state->key.dptr));
1545                 tevent_req_error(req, ret);
1546                 return;
1547         }
1548
1549         if (state->lock_list != NULL) {
1550                 TALLOC_FREE(state->lock_list);
1551                 state->current = 0;
1552         }
1553
1554         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1555                                     &state->lock_list);
1556         talloc_free(data.dptr);
1557         if (ret != 0) {
1558                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1559                                   (char *)state->key.dptr));
1560                 tevent_req_error(req, ret);
1561                 return;
1562         }
1563
1564         ctdb_g_lock_lock_process_locks(req);
1565 }
1566
1567 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1568 {
1569         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1570                 req, struct ctdb_g_lock_lock_state);
1571         struct tevent_req *subreq;
1572         struct ctdb_g_lock *lock;
1573         bool check_server = false;
1574         int ret;
1575
1576         while (state->current < state->lock_list->num) {
1577                 lock = &state->lock_list->lock[state->current];
1578
1579                 /* We should not ask for the same lock more than once */
1580                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1581                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1582                                           (char *)state->key.dptr));
1583                         tevent_req_error(req, EDEADLK);
1584                         return;
1585                 }
1586
1587                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1588                         check_server = true;
1589                         break;
1590                 }
1591
1592                 state->current += 1;
1593         }
1594
1595         if (check_server) {
1596                 struct ctdb_req_control request;
1597
1598                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1599                 subreq = ctdb_client_control_send(state, state->ev,
1600                                                   state->client,
1601                                                   lock->sid.vnn,
1602                                                   tevent_timeval_zero(),
1603                                                   &request);
1604                 if (tevent_req_nomem(subreq, req)) {
1605                         return;
1606                 }
1607                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1608                 return;
1609         }
1610
1611         /* There is no conflict, add ourself to the lock_list */
1612         state->lock_list->lock = talloc_realloc(state->lock_list,
1613                                                 state->lock_list->lock,
1614                                                 struct ctdb_g_lock,
1615                                                 state->lock_list->num + 1);
1616         if (state->lock_list->lock == NULL) {
1617                 tevent_req_error(req, ENOMEM);
1618                 return;
1619         }
1620
1621         lock = &state->lock_list->lock[state->lock_list->num];
1622         lock->type = state->lock_type;
1623         lock->sid = state->my_sid;
1624         state->lock_list->num += 1;
1625
1626         ret = ctdb_g_lock_lock_update(req);
1627         if (ret != 0) {
1628                 tevent_req_error(req, ret);
1629                 return;
1630         }
1631
1632         TALLOC_FREE(state->h);
1633         tevent_req_done(req);
1634 }
1635
1636 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1637 {
1638         struct tevent_req *req = tevent_req_callback_data(
1639                 subreq, struct tevent_req);
1640         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1641                 req, struct ctdb_g_lock_lock_state);
1642         struct ctdb_reply_control *reply;
1643         int ret, value;
1644         bool status;
1645
1646         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1647         TALLOC_FREE(subreq);
1648         if (! status) {
1649                 DEBUG(DEBUG_ERR,
1650                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1651                        (char *)state->key.dptr, ret));
1652                 tevent_req_error(req, ret);
1653                 return;
1654         }
1655
1656         ret = ctdb_reply_control_process_exists(reply, &value);
1657         if (ret != 0) {
1658                 tevent_req_error(req, ret);
1659                 return;
1660         }
1661         talloc_free(reply);
1662
1663         if (value == 0) {
1664                 /* server process exists, need to retry */
1665                 TALLOC_FREE(state->h);
1666                 subreq = tevent_wakeup_send(state, state->ev,
1667                                             tevent_timeval_current_ofs(0,1000));
1668                 if (tevent_req_nomem(subreq, req)) {
1669                         return;
1670                 }
1671                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1672                 return;
1673         }
1674
1675         /* server process does not exist, remove conflicting entry */
1676         state->lock_list->lock[state->current] =
1677                 state->lock_list->lock[state->lock_list->num-1];
1678         state->lock_list->num -= 1;
1679
1680         ret = ctdb_g_lock_lock_update(req);
1681         if (ret != 0) {
1682                 tevent_req_error(req, ret);
1683                 return;
1684         }
1685
1686         ctdb_g_lock_lock_process_locks(req);
1687 }
1688
1689 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1690 {
1691         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1692                 req, struct ctdb_g_lock_lock_state);
1693         TDB_DATA data;
1694         int ret;
1695
1696         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1697         data.dptr = talloc_size(state, data.dsize);
1698         if (data.dptr == NULL) {
1699                 return ENOMEM;
1700         }
1701
1702         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1703         ret = ctdb_store_record(state->h, data);
1704         talloc_free(data.dptr);
1705         return ret;
1706 }
1707
1708 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1709 {
1710         struct tevent_req *req = tevent_req_callback_data(
1711                 subreq, struct tevent_req);
1712         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1713                 req, struct ctdb_g_lock_lock_state);
1714         bool success;
1715
1716         success = tevent_wakeup_recv(subreq);
1717         TALLOC_FREE(subreq);
1718         if (! success) {
1719                 tevent_req_error(req, ENOMEM);
1720                 return;
1721         }
1722
1723         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1724                                       state->db, state->key, false);
1725         if (tevent_req_nomem(subreq, req)) {
1726                 return;
1727         }
1728         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1729 }
1730
1731 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1732 {
1733         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1734                 req, struct ctdb_g_lock_lock_state);
1735         int err;
1736
1737         TALLOC_FREE(state->h);
1738
1739         if (tevent_req_is_unix_error(req, &err)) {
1740                 if (perr != NULL) {
1741                         *perr = err;
1742                 }
1743                 return false;
1744         }
1745
1746         return true;
1747 }
1748
1749 struct ctdb_g_lock_unlock_state {
1750         struct tevent_context *ev;
1751         struct ctdb_client_context *client;
1752         struct ctdb_db_context *db;
1753         TDB_DATA key;
1754         struct ctdb_server_id my_sid;
1755         struct ctdb_record_handle *h;
1756         struct ctdb_g_lock_list *lock_list;
1757 };
1758
1759 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1760 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1761 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1762
1763 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1764                                            struct tevent_context *ev,
1765                                            struct ctdb_client_context *client,
1766                                            struct ctdb_db_context *db,
1767                                            const char *keyname,
1768                                            struct ctdb_server_id sid)
1769 {
1770         struct tevent_req *req, *subreq;
1771         struct ctdb_g_lock_unlock_state *state;
1772
1773         req = tevent_req_create(mem_ctx, &state,
1774                                 struct ctdb_g_lock_unlock_state);
1775         if (req == NULL) {
1776                 return NULL;
1777         }
1778
1779         state->ev = ev;
1780         state->client = client;
1781         state->db = db;
1782         state->key.dptr = discard_const(keyname);
1783         state->key.dsize = strlen(keyname) + 1;
1784         state->my_sid = sid;
1785
1786         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1787                                       false);
1788         if (tevent_req_nomem(subreq, req)) {
1789                 return tevent_req_post(req, ev);
1790         }
1791         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1792
1793         return req;
1794 }
1795
1796 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1797 {
1798         struct tevent_req *req = tevent_req_callback_data(
1799                 subreq, struct tevent_req);
1800         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1801                 req, struct ctdb_g_lock_unlock_state);
1802         TDB_DATA data;
1803         int ret = 0;
1804
1805         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1806         TALLOC_FREE(subreq);
1807         if (state->h == NULL) {
1808                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1809                                   (char *)state->key.dptr));
1810                 tevent_req_error(req, ret);
1811                 return;
1812         }
1813
1814         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1815                                     &state->lock_list);
1816         if (ret != 0) {
1817                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1818                                   (char *)state->key.dptr));
1819                 tevent_req_error(req, ret);
1820                 return;
1821         }
1822
1823         ret = ctdb_g_lock_unlock_update(req);
1824         if (ret != 0) {
1825                 tevent_req_error(req, ret);
1826                 return;
1827         }
1828
1829         if (state->lock_list->num == 0) {
1830                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1831                 if (tevent_req_nomem(subreq, req)) {
1832                         return;
1833                 }
1834                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1835                                         req);
1836                 return;
1837         }
1838
1839         TALLOC_FREE(state->h);
1840         tevent_req_done(req);
1841 }
1842
1843 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1844 {
1845         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1846                 req, struct ctdb_g_lock_unlock_state);
1847         struct ctdb_g_lock *lock;
1848         int ret, i;
1849
1850         for (i=0; i<state->lock_list->num; i++) {
1851                 lock = &state->lock_list->lock[i];
1852
1853                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1854                         break;
1855                 }
1856         }
1857
1858         if (i < state->lock_list->num) {
1859                 state->lock_list->lock[i] =
1860                         state->lock_list->lock[state->lock_list->num-1];
1861                 state->lock_list->num -= 1;
1862         }
1863
1864         if (state->lock_list->num != 0) {
1865                 TDB_DATA data;
1866
1867                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1868                 data.dptr = talloc_size(state, data.dsize);
1869                 if (data.dptr == NULL) {
1870                         return ENOMEM;
1871                 }
1872
1873                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1874                 ret = ctdb_store_record(state->h, data);
1875                 talloc_free(data.dptr);
1876                 if (ret != 0) {
1877                         return ret;
1878                 }
1879         }
1880
1881         return 0;
1882 }
1883
1884 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1885 {
1886         struct tevent_req *req = tevent_req_callback_data(
1887                 subreq, struct tevent_req);
1888         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1889                 req, struct ctdb_g_lock_unlock_state);
1890         int ret;
1891         bool status;
1892
1893         status = ctdb_delete_record_recv(subreq, &ret);
1894         if (! status) {
1895                 DEBUG(DEBUG_ERR,
1896                       ("g_lock_unlock %s delete record failed, ret=%d\n",
1897                        (char *)state->key.dptr, ret));
1898                 tevent_req_error(req, ret);
1899                 return;
1900         }
1901
1902         TALLOC_FREE(state->h);
1903         tevent_req_done(req);
1904 }
1905
1906 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1907 {
1908         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1909                 req, struct ctdb_g_lock_unlock_state);
1910         int err;
1911
1912         TALLOC_FREE(state->h);
1913
1914         if (tevent_req_is_unix_error(req, &err)) {
1915                 if (perr != NULL) {
1916                         *perr = err;
1917                 }
1918                 return false;
1919         }
1920
1921         return true;
1922 }
1923
1924 /*
1925  * Persistent database functions
1926  */
/*
 * Per-request state for ctdb_transaction_start_send()
 */
struct ctdb_transaction_start_state {
	/* Event context and client used for all sub-requests */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* NOTE(review): not referenced by the callbacks in this file section */
	struct timeval timeout;
	/* Transaction handle being set up, returned by the recv function */
	struct ctdb_transaction_handle *h;
	/* PNN of the node this client is connected to */
	uint32_t destnode;
};

/* Callbacks of the transaction start request, defined below */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1937
1938 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1939                                                struct tevent_context *ev,
1940                                                struct ctdb_client_context *client,
1941                                                struct timeval timeout,
1942                                                struct ctdb_db_context *db,
1943                                                bool readonly)
1944 {
1945         struct ctdb_transaction_start_state *state;
1946         struct tevent_req *req, *subreq;
1947         struct ctdb_transaction_handle *h;
1948
1949         req = tevent_req_create(mem_ctx, &state,
1950                                 struct ctdb_transaction_start_state);
1951         if (req == NULL) {
1952                 return NULL;
1953         }
1954
1955         if (! db->persistent) {
1956                 tevent_req_error(req, EINVAL);
1957                 return tevent_req_post(req, ev);
1958         }
1959
1960         state->ev = ev;
1961         state->client = client;
1962         state->destnode = ctdb_client_pnn(client);
1963
1964         h = talloc_zero(db, struct ctdb_transaction_handle);
1965         if (tevent_req_nomem(h, req)) {
1966                 return tevent_req_post(req, ev);
1967         }
1968
1969         h->ev = ev;
1970         h->client = client;
1971         h->db = db;
1972         h->readonly = readonly;
1973         h->updated = false;
1974
1975         /* SRVID is unique for databases, so client can have transactions
1976          * active for multiple databases */
1977         h->sid = ctdb_client_get_server_id(client, db->db_id);
1978
1979         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1980         if (tevent_req_nomem(h->recbuf, req)) {
1981                 return tevent_req_post(req, ev);
1982         }
1983
1984         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1985         if (tevent_req_nomem(h->lock_name, req)) {
1986                 return tevent_req_post(req, ev);
1987         }
1988
1989         state->h = h;
1990
1991         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1992         if (tevent_req_nomem(subreq, req)) {
1993                 return tevent_req_post(req, ev);
1994         }
1995         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1996
1997         return req;
1998 }
1999
2000 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2001 {
2002         struct tevent_req *req = tevent_req_callback_data(
2003                 subreq, struct tevent_req);
2004         struct ctdb_transaction_start_state *state = tevent_req_data(
2005                 req, struct ctdb_transaction_start_state);
2006         bool status;
2007         int ret;
2008
2009         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2010         TALLOC_FREE(subreq);
2011         if (! status) {
2012                 DEBUG(DEBUG_ERR,
2013                       ("transaction_start: %s attach g_lock.tdb failed\n",
2014                        state->h->db->db_name));
2015                 tevent_req_error(req, ret);
2016                 return;
2017         }
2018
2019         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2020                                        state->h->db_g_lock,
2021                                        state->h->lock_name,
2022                                        &state->h->sid, state->h->readonly);
2023         if (tevent_req_nomem(subreq, req)) {
2024                 return;
2025         }
2026         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2027 }
2028
2029 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2030 {
2031         struct tevent_req *req = tevent_req_callback_data(
2032                 subreq, struct tevent_req);
2033         struct ctdb_transaction_start_state *state = tevent_req_data(
2034                 req, struct ctdb_transaction_start_state);
2035         int ret;
2036         bool status;
2037
2038         status = ctdb_g_lock_lock_recv(subreq, &ret);
2039         TALLOC_FREE(subreq);
2040         if (! status) {
2041                 DEBUG(DEBUG_ERR,
2042                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
2043                        state->h->db->db_name, ret));
2044                 tevent_req_error(req, ret);
2045                 return;
2046         }
2047
2048         tevent_req_done(req);
2049 }
2050
2051 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2052                                         struct tevent_req *req,
2053                                         int *perr)
2054 {
2055         struct ctdb_transaction_start_state *state = tevent_req_data(
2056                 req, struct ctdb_transaction_start_state);
2057         int err;
2058
2059         if (tevent_req_is_unix_error(req, &err)) {
2060                 if (perr != NULL) {
2061                         *perr = err;
2062                 }
2063                 return NULL;
2064         }
2065
2066         return state->h;
2067 }
2068
2069 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2070                            struct ctdb_client_context *client,
2071                            struct timeval timeout,
2072                            struct ctdb_db_context *db, bool readonly,
2073                            struct ctdb_transaction_handle **out)
2074 {
2075         struct tevent_req *req;
2076         struct ctdb_transaction_handle *h;
2077         int ret;
2078
2079         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2080                                           readonly);
2081         if (req == NULL) {
2082                 return ENOMEM;
2083         }
2084
2085         tevent_req_poll(req, ev);
2086
2087         h = ctdb_transaction_start_recv(req, &ret);
2088         if (h == NULL) {
2089                 return ret;
2090         }
2091
2092         *out = h;
2093         return 0;
2094 }
2095
2096 struct ctdb_transaction_record_fetch_state {
2097         TDB_DATA key, data;
2098         struct ctdb_ltdb_header header;
2099         bool found;
2100 };
2101
2102 static int ctdb_transaction_record_fetch_traverse(
2103                                 uint32_t reqid,
2104                                 struct ctdb_ltdb_header *nullheader,
2105                                 TDB_DATA key, TDB_DATA data,
2106                                 void *private_data)
2107 {
2108         struct ctdb_transaction_record_fetch_state *state =
2109                 (struct ctdb_transaction_record_fetch_state *)private_data;
2110
2111         if (state->key.dsize == key.dsize &&
2112             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2113                 int ret;
2114
2115                 ret = ctdb_ltdb_header_extract(&data, &state->header);
2116                 if (ret != 0) {
2117                         DEBUG(DEBUG_ERR,
2118                               ("record_fetch: Failed to extract header, "
2119                                "ret=%d\n", ret));
2120                         return 1;
2121                 }
2122
2123                 state->data = data;
2124                 state->found = true;
2125         }
2126
2127         return 0;
2128 }
2129
2130 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2131                                          TDB_DATA key,
2132                                          struct ctdb_ltdb_header *header,
2133                                          TDB_DATA *data)
2134 {
2135         struct ctdb_transaction_record_fetch_state state;
2136         int ret;
2137
2138         state.key = key;
2139         state.found = false;
2140
2141         ret = ctdb_rec_buffer_traverse(h->recbuf,
2142                                        ctdb_transaction_record_fetch_traverse,
2143                                        &state);
2144         if (ret != 0) {
2145                 return ret;
2146         }
2147
2148         if (state.found) {
2149                 if (header != NULL) {
2150                         *header = state.header;
2151                 }
2152                 if (data != NULL) {
2153                         *data = state.data;
2154                 }
2155                 return 0;
2156         }
2157
2158         return ENOENT;
2159 }
2160
2161 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2162                                   TDB_DATA key,
2163                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
2164 {
2165         TDB_DATA tmp_data;
2166         struct ctdb_ltdb_header header;
2167         int ret;
2168
2169         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2170         if (ret == 0) {
2171                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2172                                            tmp_data.dsize);
2173                 if (data->dptr == NULL) {
2174                         return ENOMEM;
2175                 }
2176                 data->dsize = tmp_data.dsize;
2177                 return 0;
2178         }
2179
2180         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2181         if (ret != 0) {
2182                 return ret;
2183         }
2184
2185         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2186         if (ret != 0) {
2187                 return ret;
2188         }
2189
2190         return 0;
2191 }
2192
2193 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2194                                   TDB_DATA key, TDB_DATA data)
2195 {
2196         TALLOC_CTX *tmp_ctx;
2197         struct ctdb_ltdb_header header;
2198         TDB_DATA old_data;
2199         int ret;
2200
2201         if (h->readonly) {
2202                 return EINVAL;
2203         }
2204
2205         tmp_ctx = talloc_new(h);
2206         if (tmp_ctx == NULL) {
2207                 return ENOMEM;
2208         }
2209
2210         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2211         if (ret != 0) {
2212                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2213                 if (ret != 0) {
2214                         return ret;
2215                 }
2216         }
2217
2218         if (old_data.dsize == data.dsize &&
2219             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2220                 talloc_free(tmp_ctx);
2221                 return 0;
2222         }
2223
2224         header.dmaster = ctdb_client_pnn(h->client);
2225         header.rsn += 1;
2226
2227         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2228         talloc_free(tmp_ctx);
2229         if (ret != 0) {
2230                 return ret;
2231         }
2232         h->updated = true;
2233
2234         return 0;
2235 }
2236
2237 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2238                                    TDB_DATA key)
2239 {
2240         return ctdb_transaction_store_record(h, key, tdb_null);
2241 }
2242
2243 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2244                                             uint64_t *seqnum)
2245 {
2246         const char *keyname = CTDB_DB_SEQNUM_KEY;
2247         TDB_DATA key, data;
2248         struct ctdb_ltdb_header header;
2249         int ret;
2250
2251         key.dptr = discard_const(keyname);
2252         key.dsize = strlen(keyname) + 1;
2253
2254         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2255         if (ret != 0) {
2256                 DEBUG(DEBUG_ERR,
2257                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2258                        h->db->db_name, ret));
2259                 return ret;
2260         }
2261
2262         if (data.dsize == 0) {
2263                 /* initial data */
2264                 *seqnum = 0;
2265                 return 0;
2266         }
2267
2268         if (data.dsize != sizeof(uint64_t)) {
2269                 talloc_free(data.dptr);
2270                 return EINVAL;
2271         }
2272
2273         *seqnum = *(uint64_t *)data.dptr;
2274
2275         talloc_free(data.dptr);
2276         return 0;
2277 }
2278
2279 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2280                                             uint64_t seqnum)
2281 {
2282         const char *keyname = CTDB_DB_SEQNUM_KEY;
2283         TDB_DATA key, data;
2284
2285         key.dptr = discard_const(keyname);
2286         key.dsize = strlen(keyname) + 1;
2287
2288         data.dptr = (uint8_t *)&seqnum;
2289         data.dsize = sizeof(seqnum);
2290
2291         return ctdb_transaction_store_record(h, key, data);
2292 }
2293
/*
 * Per-request state for ctdb_transaction_commit_send()
 */
struct ctdb_transaction_commit_state {
	struct tevent_context *ev;
	/* Timeout passed to each TRANS3_COMMIT control */
	struct timeval timeout;
	/* Transaction being committed */
	struct ctdb_transaction_handle *h;
	/*
	 * Database sequence number read before the commit; compared
	 * with the current value when the commit control fails
	 */
	uint64_t seqnum;
};

/* Callbacks of the transaction commit request, defined below */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2303
2304 struct tevent_req *ctdb_transaction_commit_send(
2305                                         TALLOC_CTX *mem_ctx,
2306                                         struct tevent_context *ev,
2307                                         struct timeval timeout,
2308                                         struct ctdb_transaction_handle *h)
2309 {
2310         struct tevent_req *req, *subreq;
2311         struct ctdb_transaction_commit_state *state;
2312         struct ctdb_req_control request;
2313         int ret;
2314
2315         req = tevent_req_create(mem_ctx, &state,
2316                                 struct ctdb_transaction_commit_state);
2317         if (req == NULL) {
2318                 return NULL;
2319         }
2320
2321         state->ev = ev;
2322         state->timeout = timeout;
2323         state->h = h;
2324
2325         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2326         if (ret != 0) {
2327                 tevent_req_error(req, ret);
2328                 return tevent_req_post(req, ev);
2329         }
2330
2331         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2332         if (ret != 0) {
2333                 tevent_req_error(req, ret);
2334                 return tevent_req_post(req, ev);
2335         }
2336
2337         ctdb_req_control_trans3_commit(&request, h->recbuf);
2338         subreq = ctdb_client_control_send(state, ev, h->client,
2339                                           ctdb_client_pnn(h->client),
2340                                           timeout, &request);
2341         if (tevent_req_nomem(subreq, req)) {
2342                 return tevent_req_post(req, ev);
2343         }
2344         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2345
2346         return req;
2347 }
2348
2349 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2350 {
2351         struct tevent_req *req = tevent_req_callback_data(
2352                 subreq, struct tevent_req);
2353         struct ctdb_transaction_commit_state *state = tevent_req_data(
2354                 req, struct ctdb_transaction_commit_state);
2355         struct ctdb_transaction_handle *h = state->h;
2356         struct ctdb_reply_control *reply;
2357         uint64_t seqnum;
2358         int ret;
2359         bool status;
2360
2361         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2362         TALLOC_FREE(subreq);
2363         if (! status) {
2364                 DEBUG(DEBUG_ERR,
2365                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2366                        h->db->db_name, ret));
2367                 tevent_req_error(req, ret);
2368                 return;
2369         }
2370
2371         ret = ctdb_reply_control_trans3_commit(reply);
2372         talloc_free(reply);
2373
2374         if (ret != 0) {
2375                 /* Control failed due to recovery */
2376
2377                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2378                 if (ret != 0) {
2379                         tevent_req_error(req, ret);
2380                         return;
2381                 }
2382
2383                 if (seqnum == state->seqnum) {
2384                         struct ctdb_req_control request;
2385
2386                         /* try again */
2387                         ctdb_req_control_trans3_commit(&request,
2388                                                        state->h->recbuf);
2389                         subreq = ctdb_client_control_send(
2390                                         state, state->ev, state->h->client,
2391                                         ctdb_client_pnn(state->h->client),
2392                                         state->timeout, &request);
2393                         if (tevent_req_nomem(subreq, req)) {
2394                                 return;
2395                         }
2396                         tevent_req_set_callback(subreq,
2397                                                 ctdb_transaction_commit_done,
2398                                                 req);
2399                         return;
2400                 }
2401
2402                 if (seqnum != state->seqnum + 1) {
2403                         DEBUG(DEBUG_ERR,
2404                               ("transaction_commit: %s seqnum mismatch "
2405                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2406                                state->h->db->db_name, seqnum, state->seqnum));
2407                         tevent_req_error(req, EIO);
2408                         return;
2409                 }
2410         }
2411
2412         /* trans3_commit successful */
2413         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2414                                          h->db_g_lock, h->lock_name, h->sid);
2415         if (tevent_req_nomem(subreq, req)) {
2416                 return;
2417         }
2418         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2419                                 req);
2420 }
2421
2422 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2423 {
2424         struct tevent_req *req = tevent_req_callback_data(
2425                 subreq, struct tevent_req);
2426         struct ctdb_transaction_commit_state *state = tevent_req_data(
2427                 req, struct ctdb_transaction_commit_state);
2428         int ret;
2429         bool status;
2430
2431         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2432         TALLOC_FREE(subreq);
2433         if (! status) {
2434                 DEBUG(DEBUG_ERR,
2435                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2436                        state->h->db->db_name, ret));
2437                 tevent_req_error(req, ret);
2438                 return;
2439         }
2440
2441         talloc_free(state->h);
2442         tevent_req_done(req);
2443 }
2444
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
2458
2459 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2460 {
2461         struct tevent_context *ev = h->ev;
2462         TALLOC_CTX *mem_ctx;
2463         struct tevent_req *req;
2464         int ret;
2465         bool status;
2466
2467         if (h->readonly || ! h->updated) {
2468                 return ctdb_transaction_cancel(h);
2469         }
2470
2471         mem_ctx = talloc_new(NULL);
2472         if (mem_ctx == NULL) {
2473                 return ENOMEM;
2474         }
2475
2476         req = ctdb_transaction_commit_send(mem_ctx, ev,
2477                                            tevent_timeval_zero(), h);
2478         if (req == NULL) {
2479                 talloc_free(mem_ctx);
2480                 return ENOMEM;
2481         }
2482
2483         tevent_req_poll(req, ev);
2484
2485         status = ctdb_transaction_commit_recv(req, &ret);
2486         if (! status) {
2487                 talloc_free(mem_ctx);
2488                 return ret;
2489         }
2490
2491         talloc_free(mem_ctx);
2492         return 0;
2493 }
2494
/*
 * Per-request state for ctdb_transaction_cancel_send()
 */
struct ctdb_transaction_cancel_state {
	struct tevent_context *ev;
	/* Transaction being cancelled; freed when the unlock completes */
	struct ctdb_transaction_handle *h;
	/* NOTE(review): stored by the send function, not used afterwards */
	struct timeval timeout;
};

/* Callback of the transaction cancel request, defined below */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2502
2503 struct tevent_req *ctdb_transaction_cancel_send(
2504                                         TALLOC_CTX *mem_ctx,
2505                                         struct tevent_context *ev,
2506                                         struct timeval timeout,
2507                                         struct ctdb_transaction_handle *h)
2508 {
2509         struct tevent_req *req, *subreq;
2510         struct ctdb_transaction_cancel_state *state;
2511
2512         req = tevent_req_create(mem_ctx, &state,
2513                                 struct ctdb_transaction_cancel_state);
2514         if (req == NULL) {
2515                 return NULL;
2516         }
2517
2518         state->ev = ev;
2519         state->h = h;
2520         state->timeout = timeout;
2521
2522         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2523                                          state->h->db_g_lock,
2524                                          state->h->lock_name, state->h->sid);
2525         if (tevent_req_nomem(subreq, req)) {
2526                 return tevent_req_post(req, ev);
2527         }
2528         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2529                                 req);
2530
2531         return req;
2532 }
2533
2534 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2535 {
2536         struct tevent_req *req = tevent_req_callback_data(
2537                 subreq, struct tevent_req);
2538         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2539                 req, struct ctdb_transaction_cancel_state);
2540         int ret;
2541         bool status;
2542
2543         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2544         TALLOC_FREE(subreq);
2545         if (! status) {
2546                 DEBUG(DEBUG_ERR,
2547                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2548                        state->h->db->db_name, ret));
2549                 talloc_free(state->h);
2550                 tevent_req_error(req, ret);
2551                 return;
2552         }
2553
2554         talloc_free(state->h);
2555         tevent_req_done(req);
2556 }
2557
/*
 * Collect the result of ctdb_transaction_cancel_send().
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
2571
2572 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2573 {
2574         struct tevent_context *ev = h->ev;
2575         struct tevent_req *req;
2576         TALLOC_CTX *mem_ctx;
2577         int ret;
2578         bool status;
2579
2580         mem_ctx = talloc_new(NULL);
2581         if (mem_ctx == NULL) {
2582                 talloc_free(h);
2583                 return ENOMEM;
2584         }
2585
2586         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2587                                            tevent_timeval_zero(), h);
2588         if (req == NULL) {
2589                 talloc_free(mem_ctx);
2590                 talloc_free(h);
2591                 return ENOMEM;
2592         }
2593
2594         tevent_req_poll(req, ev);
2595
2596         status = ctdb_transaction_cancel_recv(req, &ret);
2597         if (! status) {
2598                 talloc_free(mem_ctx);
2599                 return ret;
2600         }
2601
2602         talloc_free(mem_ctx);
2603         return 0;
2604 }
2605
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */