ctdb-client: Add missing initialisation of calldata
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
55 struct ctdb_set_db_flags_state {
56         struct tevent_context *ev;
57         struct ctdb_client_context *client;
58         struct timeval timeout;
59         uint32_t db_id;
60         uint8_t db_flags;
61         bool readonly_done, sticky_done;
62         uint32_t *pnn_list;
63         int count;
64 };
65
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 tevent_req_error(req, ret);
125                 return;
126         }
127
128         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129         talloc_free(reply);
130         if (ret != 0) {
131                 tevent_req_error(req, ret);
132                 return;
133         }
134
135         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136                                                state, &state->pnn_list);
137         talloc_free(nodemap);
138         if (state->count <= 0) {
139                 tevent_req_error(req, ENOMEM);
140                 return;
141         }
142
143         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144                 ctdb_req_control_set_db_readonly(&request, state->db_id);
145                 subreq = ctdb_client_control_multi_send(
146                                         state, state->ev, state->client,
147                                         state->pnn_list, state->count,
148                                         state->timeout, &request);
149                 if (tevent_req_nomem(subreq, req)) {
150                         return;
151                 }
152                 tevent_req_set_callback(subreq,
153                                         ctdb_set_db_flags_readonly_done, req);
154         } else {
155                 state->readonly_done = true;
156         }
157
158         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159                 ctdb_req_control_set_db_sticky(&request, state->db_id);
160                 subreq = ctdb_client_control_multi_send(
161                                         state, state->ev, state->client,
162                                         state->pnn_list, state->count,
163                                         state->timeout, &request);
164                 if (tevent_req_nomem(subreq, req)) {
165                         return;
166                 }
167                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168                                         req);
169         } else {
170                 state->sticky_done = true;
171         }
172 }
173
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 {
176         struct tevent_req *req = tevent_req_callback_data(
177                 subreq, struct tevent_req);
178         struct ctdb_set_db_flags_state *state = tevent_req_data(
179                 req, struct ctdb_set_db_flags_state);
180         int ret;
181         bool status;
182
183         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184                                                 NULL);
185         TALLOC_FREE(subreq);
186         if (! status) {
187                 tevent_req_error(req, ret);
188                 return;
189         }
190
191         state->readonly_done = true;
192
193         if (state->readonly_done && state->sticky_done) {
194                 tevent_req_done(req);
195         }
196 }
197
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req = tevent_req_callback_data(
201                 subreq, struct tevent_req);
202         struct ctdb_set_db_flags_state *state = tevent_req_data(
203                 req, struct ctdb_set_db_flags_state);
204         int ret;
205         bool status;
206
207         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208                                                 NULL);
209         TALLOC_FREE(subreq);
210         if (! status) {
211                 tevent_req_error(req, ret);
212                 return;
213         }
214
215         state->sticky_done = true;
216
217         if (state->readonly_done && state->sticky_done) {
218                 tevent_req_done(req);
219         }
220 }
221
222 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
223 {
224         int err;
225
226         if (tevent_req_is_unix_error(req, &err)) {
227                 if (perr != NULL) {
228                         *perr = err;
229                 }
230                 return false;
231         }
232         return true;
233 }
234
235 struct ctdb_attach_state {
236         struct tevent_context *ev;
237         struct ctdb_client_context *client;
238         struct timeval timeout;
239         uint32_t destnode;
240         uint8_t db_flags;
241         uint32_t tdb_flags;
242         struct ctdb_db_context *db;
243 };
244
245 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
246 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
247 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
248 static void ctdb_attach_health_done(struct tevent_req *subreq);
249 static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252                                     struct tevent_context *ev,
253                                     struct ctdb_client_context *client,
254                                     struct timeval timeout,
255                                     const char *db_name, uint8_t db_flags)
256 {
257         struct tevent_req *req, *subreq;
258         struct ctdb_attach_state *state;
259         struct ctdb_req_control request;
260
261         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262         if (req == NULL) {
263                 return NULL;
264         }
265
266         state->db = client_db_handle(client, db_name);
267         if (state->db != NULL) {
268                 tevent_req_done(req);
269                 return tevent_req_post(req, ev);
270         }
271
272         state->ev = ev;
273         state->client = client;
274         state->timeout = timeout;
275         state->destnode = ctdb_client_pnn(client);
276         state->db_flags = db_flags;
277
278         state->db = talloc_zero(client, struct ctdb_db_context);
279         if (tevent_req_nomem(state->db, req)) {
280                 return tevent_req_post(req, ev);
281         }
282
283         state->db->db_name = talloc_strdup(state->db, db_name);
284         if (tevent_req_nomem(state->db, req)) {
285                 return tevent_req_post(req, ev);
286         }
287
288         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289                 state->db->persistent = true;
290         }
291
292         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293         subreq = ctdb_client_control_send(state, ev, client,
294                                           ctdb_client_pnn(client), timeout,
295                                           &request);
296         if (tevent_req_nomem(subreq, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301         return req;
302 }
303
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         struct ctdb_attach_state *state = tevent_req_data(
309                 req, struct ctdb_attach_state);
310         struct ctdb_reply_control *reply;
311         struct ctdb_req_control request;
312         uint32_t mutex_enabled;
313         int ret;
314         bool status;
315
316         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317         TALLOC_FREE(subreq);
318         if (! status) {
319                 tevent_req_error(req, ret);
320                 return;
321         }
322
323         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324         if (ret != 0) {
325                 /* Treat error as mutex support not available */
326                 mutex_enabled = 0;
327         }
328
329         state->tdb_flags = TDB_DEFAULT;
330         if (! state->db->persistent) {
331                 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
332                                      TDB_CLEAR_IF_FIRST);
333         }
334         if (mutex_enabled == 1) {
335                 state->tdb_flags |= TDB_MUTEX_LOCKING;
336         }
337
338         if (state->db->persistent) {
339                 ctdb_req_control_db_attach_persistent(&request,
340                                                       state->db->db_name,
341                                                       state->tdb_flags);
342         } else {
343                 ctdb_req_control_db_attach(&request, state->db->db_name,
344                                            state->tdb_flags);
345         }
346
347         subreq = ctdb_client_control_send(state, state->ev, state->client,
348                                           state->destnode, state->timeout,
349                                           &request);
350         if (tevent_req_nomem(subreq, req)) {
351                 return;
352         }
353         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
354 }
355
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 {
358         struct tevent_req *req = tevent_req_callback_data(
359                 subreq, struct tevent_req);
360         struct ctdb_attach_state *state = tevent_req_data(
361                 req, struct ctdb_attach_state);
362         struct ctdb_req_control request;
363         struct ctdb_reply_control *reply;
364         bool status;
365         int ret;
366
367         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
368         TALLOC_FREE(subreq);
369         if (! status) {
370                 tevent_req_error(req, ret);
371                 return;
372         }
373
374         if (state->db->persistent) {
375                 ret = ctdb_reply_control_db_attach_persistent(
376                                 reply, &state->db->db_id);
377         } else {
378                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
379         }
380         talloc_free(reply);
381         if (ret != 0) {
382                 tevent_req_error(req, ret);
383                 return;
384         }
385
386         ctdb_req_control_getdbpath(&request, state->db->db_id);
387         subreq = ctdb_client_control_send(state, state->ev, state->client,
388                                           state->destnode, state->timeout,
389                                           &request);
390         if (tevent_req_nomem(subreq, req)) {
391                 return;
392         }
393         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
394 }
395
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397 {
398         struct tevent_req *req = tevent_req_callback_data(
399                 subreq, struct tevent_req);
400         struct ctdb_attach_state *state = tevent_req_data(
401                 req, struct ctdb_attach_state);
402         struct ctdb_reply_control *reply;
403         struct ctdb_req_control request;
404         bool status;
405         int ret;
406
407         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
408         TALLOC_FREE(subreq);
409         if (! status) {
410                 tevent_req_error(req, ret);
411                 return;
412         }
413
414         ret = ctdb_reply_control_getdbpath(reply, state->db,
415                                            &state->db->db_path);
416         talloc_free(reply);
417         if (ret != 0) {
418                 tevent_req_error(req, ret);
419                 return;
420         }
421
422         ctdb_req_control_db_get_health(&request, state->db->db_id);
423         subreq = ctdb_client_control_send(state, state->ev, state->client,
424                                           state->destnode, state->timeout,
425                                           &request);
426         if (tevent_req_nomem(subreq, req)) {
427                 return;
428         }
429         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
430 }
431
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
433 {
434         struct tevent_req *req = tevent_req_callback_data(
435                 subreq, struct tevent_req);
436         struct ctdb_attach_state *state = tevent_req_data(
437                 req, struct ctdb_attach_state);
438         struct ctdb_reply_control *reply;
439         const char *reason;
440         bool status;
441         int ret;
442
443         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
444         TALLOC_FREE(subreq);
445         if (! status) {
446                 tevent_req_error(req, ret);
447                 return;
448         }
449
450         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451         if (ret != 0) {
452                 tevent_req_error(req, ret);
453                 return;
454         }
455
456         if (reason != NULL) {
457                 /* Database unhealthy, avoid attach */
458                 /* FIXME: Log here */
459                 tevent_req_error(req, EIO);
460                 return;
461         }
462
463         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464                                         state->destnode, state->timeout,
465                                         state->db->db_id, state->db_flags);
466         if (tevent_req_nomem(subreq, req)) {
467                 return;
468         }
469         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
470 }
471
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
473 {
474         struct tevent_req *req = tevent_req_callback_data(
475                 subreq, struct tevent_req);
476         struct ctdb_attach_state *state = tevent_req_data(
477                 req, struct ctdb_attach_state);
478         bool status;
479         int ret;
480
481         status = ctdb_set_db_flags_recv(subreq, &ret);
482         TALLOC_FREE(subreq);
483         if (! status) {
484                 tevent_req_error(req, ret);
485                 return;
486         }
487
488         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489                                         state->tdb_flags, O_RDWR, 0);
490         if (tevent_req_nomem(state->db->ltdb, req)) {
491                 return;
492         }
493         DLIST_ADD(state->client->db, state->db);
494
495         tevent_req_done(req);
496 }
497
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499                       struct ctdb_db_context **out)
500 {
501         struct ctdb_attach_state *state = tevent_req_data(
502                 req, struct ctdb_attach_state);
503         int err;
504
505         if (tevent_req_is_unix_error(req, &err)) {
506                 if (perr != NULL) {
507                         *perr = err;
508                 }
509                 return false;
510         }
511
512         if (out != NULL) {
513                 *out = state->db;
514         }
515         return true;
516 }
517
518 int ctdb_attach(struct tevent_context *ev,
519                 struct ctdb_client_context *client,
520                 struct timeval timeout,
521                 const char *db_name, uint8_t db_flags,
522                 struct ctdb_db_context **out)
523 {
524         TALLOC_CTX *mem_ctx;
525         struct tevent_req *req;
526         bool status;
527         int ret;
528
529         mem_ctx = talloc_new(client);
530         if (mem_ctx == NULL) {
531                 return ENOMEM;
532         }
533
534         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
535                                db_name, db_flags);
536         if (req == NULL) {
537                 talloc_free(mem_ctx);
538                 return ENOMEM;
539         }
540
541         tevent_req_poll(req, ev);
542
543         status = ctdb_attach_recv(req, &ret, out);
544         if (! status) {
545                 talloc_free(mem_ctx);
546                 return ret;
547         }
548
549         /*
550         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
551         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
552         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
553         */
554
555         talloc_free(mem_ctx);
556         return 0;
557 }
558
559 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
560                 struct ctdb_client_context *client,
561                 struct timeval timeout, uint32_t db_id)
562 {
563         struct ctdb_db_context *db;
564         int ret;
565
566         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
567                                   db_id);
568         if (ret != 0) {
569                 return ret;
570         }
571
572         for (db = client->db; db != NULL; db = db->next) {
573                 if (db->db_id == db_id) {
574                         DLIST_REMOVE(client->db, db);
575                         break;
576                 }
577         }
578
579         return 0;
580 }
581
582 uint32_t ctdb_db_id(struct ctdb_db_context *db)
583 {
584         return db->db_id;
585 }
586
587 struct ctdb_db_traverse_state {
588         ctdb_rec_parser_func_t parser;
589         void *private_data;
590         bool extract_header;
591         int error;
592 };
593
594 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
595                                     TDB_DATA data, void *private_data)
596 {
597         struct ctdb_db_traverse_state *state =
598                 (struct ctdb_db_traverse_state *)private_data;
599         int ret;
600
601         if (state->extract_header) {
602                 struct ctdb_ltdb_header header;
603
604                 ret = ctdb_ltdb_header_extract(&data, &header);
605                 if (ret != 0) {
606                         state->error = ret;
607                         return 1;
608                 }
609
610                 ret = state->parser(0, &header, key, data, state->private_data);
611         } else {
612                 ret = state->parser(0, NULL, key, data, state->private_data);
613         }
614
615         if (ret != 0) {
616                 state->error = ret;
617                 return 1;
618         }
619
620         return 0;
621 }
622
623 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
624                      bool extract_header,
625                      ctdb_rec_parser_func_t parser, void *private_data)
626 {
627         struct ctdb_db_traverse_state state;
628         int ret;
629
630         state.parser = parser;
631         state.private_data = private_data;
632         state.extract_header = extract_header;
633         state.error = 0;
634
635         if (readonly) {
636                 ret = tdb_traverse_read(db->ltdb->tdb,
637                                         ctdb_db_traverse_handler, &state);
638         } else {
639                 ret = tdb_traverse(db->ltdb->tdb,
640                                    ctdb_db_traverse_handler, &state);
641         }
642
643         if (ret == -1) {
644                 return EIO;
645         }
646
647         return state.error;
648 }
649
650 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
651                            struct ctdb_ltdb_header *header,
652                            TALLOC_CTX *mem_ctx, TDB_DATA *data)
653 {
654         TDB_DATA rec;
655         int ret;
656
657         rec = tdb_fetch(db->ltdb->tdb, key);
658         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
659                 /* No record present */
660                 if (rec.dptr != NULL) {
661                         free(rec.dptr);
662                 }
663
664                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
665                         return EIO;
666                 }
667
668                 header->rsn = 0;
669                 header->dmaster = CTDB_UNKNOWN_PNN;
670                 header->flags = 0;
671
672                 if (data != NULL) {
673                         *data = tdb_null;
674                 }
675                 return 0;
676         }
677
678         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
679         if (ret != 0) {
680                 return ret;
681         }
682
683         ret = 0;
684         if (data != NULL) {
685                 size_t offset = ctdb_ltdb_header_len(header);
686
687                 data->dsize = rec.dsize - offset;
688                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
689                                            data->dsize);
690                 if (data->dptr == NULL) {
691                         ret = ENOMEM;
692                 }
693         }
694
695         free(rec.dptr);
696         return ret;
697 }
698
699 /*
700  * Fetch a record from volatile database
701  *
702  * Steps:
703  *  1. Get a lock on the hash chain
704  *  2. If the record does not exist, migrate the record
705  *  3. If readonly=true and delegations do not exist, migrate the record.
706  *  4. If readonly=false and delegations exist, migrate the record.
707  *  5. If the local node is not dmaster, migrate the record.
708  *  6. Return record
709  */
710
711 struct ctdb_fetch_lock_state {
712         struct tevent_context *ev;
713         struct ctdb_client_context *client;
714         struct ctdb_record_handle *h;
715         bool readonly;
716         uint32_t pnn;
717 };
718
719 static int ctdb_fetch_lock_check(struct tevent_req *req);
720 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
721 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
722
723 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
724                                         struct tevent_context *ev,
725                                         struct ctdb_client_context *client,
726                                         struct ctdb_db_context *db,
727                                         TDB_DATA key, bool readonly)
728 {
729         struct ctdb_fetch_lock_state *state;
730         struct tevent_req *req;
731         int ret;
732
733         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
734         if (req == NULL) {
735                 return NULL;
736         }
737
738         state->ev = ev;
739         state->client = client;
740
741         state->h = talloc_zero(db, struct ctdb_record_handle);
742         if (tevent_req_nomem(state->h, req)) {
743                 return tevent_req_post(req, ev);
744         }
745         state->h->client = client;
746         state->h->db = db;
747         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
748         if (tevent_req_nomem(state->h->key.dptr, req)) {
749                 return tevent_req_post(req, ev);
750         }
751         state->h->key.dsize = key.dsize;
752         state->h->readonly = false;
753
754         state->readonly = readonly;
755         state->pnn = ctdb_client_pnn(client);
756
757         /* Check that database is not persistent */
758         if (db->persistent) {
759                 tevent_req_error(req, EINVAL);
760                 return tevent_req_post(req, ev);
761         }
762
763         ret = ctdb_fetch_lock_check(req);
764         if (ret == 0) {
765                 tevent_req_done(req);
766                 return tevent_req_post(req, ev);
767         }
768         if (ret != EAGAIN) {
769                 tevent_req_error(req, ret);
770                 return tevent_req_post(req, ev);
771         }
772         return req;
773 }
774
775 static int ctdb_fetch_lock_check(struct tevent_req *req)
776 {
777         struct ctdb_fetch_lock_state *state = tevent_req_data(
778                 req, struct ctdb_fetch_lock_state);
779         struct ctdb_record_handle *h = state->h;
780         struct ctdb_ltdb_header header;
781         TDB_DATA data = tdb_null;
782         int ret, err = 0;
783         bool do_migrate = false;
784
785         ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
786         if (ret != 0) {
787                 err = EIO;
788                 goto failed;
789         }
790
791         data = tdb_fetch(h->db->ltdb->tdb, h->key);
792         if (data.dptr == NULL) {
793                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
794                         goto migrate;
795                 } else {
796                         err = EIO;
797                         goto failed;
798                 }
799         }
800
801         /* Got the record */
802         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
803         if (ret != 0) {
804                 err = ret;
805                 goto failed;
806         }
807
808         if (! state->readonly) {
809                 /* Read/write access */
810                 if (header.dmaster == state->pnn &&
811                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
812                         goto migrate;
813                 }
814
815                 if (header.dmaster != state->pnn) {
816                         goto migrate;
817                 }
818         } else {
819                 /* Readonly access */
820                 if (header.dmaster != state->pnn &&
821                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
822                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
823                         goto migrate;
824                 }
825         }
826
827         /* We are the dmaster or readonly delegation */
828         h->header = header;
829         h->data = data;
830         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
831                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
832                 h->readonly = true;
833         }
834         return 0;
835
836 migrate:
837         do_migrate = true;
838         err = EAGAIN;
839
840 failed:
841         if (data.dptr != NULL) {
842                 free(data.dptr);
843         }
844         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
845         if (ret != 0) {
846                 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
847                                   h->db->db_name));
848                 return EIO;
849         }
850
851         if (do_migrate) {
852                 ctdb_fetch_lock_migrate(req);
853         }
854         return err;
855 }
856
857 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
858 {
859         struct ctdb_fetch_lock_state *state = tevent_req_data(
860                 req, struct ctdb_fetch_lock_state);
861         struct ctdb_req_call request;
862         struct tevent_req *subreq;
863
864         ZERO_STRUCT(request);
865         request.flags = CTDB_IMMEDIATE_MIGRATION;
866         if (state->readonly) {
867                 request.flags |= CTDB_WANT_READONLY;
868         }
869         request.db_id = state->h->db->db_id;
870         request.callid = CTDB_NULL_FUNC;
871         request.key = state->h->key;
872         request.calldata = tdb_null;
873
874         subreq = ctdb_client_call_send(state, state->ev, state->client,
875                                        &request);
876         if (tevent_req_nomem(subreq, req)) {
877                 return;
878         }
879
880         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
881 }
882
883 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
884 {
885         struct tevent_req *req = tevent_req_callback_data(
886                 subreq, struct tevent_req);
887         struct ctdb_fetch_lock_state *state = tevent_req_data(
888                 req, struct ctdb_fetch_lock_state);
889         struct ctdb_reply_call *reply;
890         int ret;
891         bool status;
892
893         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
894         TALLOC_FREE(subreq);
895         if (! status) {
896                 tevent_req_error(req, ret);
897                 return;
898         }
899
900         if (reply->status != 0) {
901                 tevent_req_error(req, EIO);
902                 return;
903         }
904         talloc_free(reply);
905
906         ret = ctdb_fetch_lock_check(req);
907         if (ret != 0) {
908                 if (ret != EAGAIN) {
909                         tevent_req_error(req, ret);
910                 }
911                 return;
912         }
913
914         tevent_req_done(req);
915 }
916
917 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
918 {
919         tdb_chainunlock(h->db->ltdb->tdb, h->key);
920         free(h->data.dptr);
921         return 0;
922 }
923
924 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
925                                                 struct ctdb_ltdb_header *header,
926                                                 TALLOC_CTX *mem_ctx,
927                                                 TDB_DATA *data, int *perr)
928 {
929         struct ctdb_fetch_lock_state *state = tevent_req_data(
930                 req, struct ctdb_fetch_lock_state);
931         struct ctdb_record_handle *h = state->h;
932         int err;
933
934         if (tevent_req_is_unix_error(req, &err)) {
935                 if (perr != NULL) {
936                         *perr = err;
937                 }
938                 return NULL;
939         }
940
941         if (header != NULL) {
942                 *header = h->header;
943         }
944         if (data != NULL) {
945                 size_t offset;
946
947                 offset = ctdb_ltdb_header_len(&h->header);
948
949                 data->dsize = h->data.dsize - offset;
950                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
951                                            data->dsize);
952                 if (data->dptr == NULL) {
953                         TALLOC_FREE(state->h);
954                         if (perr != NULL) {
955                                 *perr = ENOMEM;
956                         }
957                         return NULL;
958                 }
959         }
960
961         talloc_set_destructor(h, ctdb_record_handle_destructor);
962         return h;
963 }
964
965 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
966                     struct ctdb_client_context *client,
967                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
968                     struct ctdb_record_handle **out,
969                     struct ctdb_ltdb_header *header, TDB_DATA *data)
970 {
971         struct tevent_req *req;
972         struct ctdb_record_handle *h;
973         int ret;
974
975         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
976         if (req == NULL) {
977                 return ENOMEM;
978         }
979
980         tevent_req_poll(req, ev);
981
982         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
983         if (h == NULL) {
984                 return ret;
985         }
986
987         *out = h;
988         return 0;
989 }
990
991 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
992 {
993         TDB_DATA rec;
994         size_t offset;
995         int ret;
996
997         /* Cannot modify the record if it was obtained as a readonly copy */
998         if (h->readonly) {
999                 return EINVAL;
1000         }
1001
1002         /* Check if the new data is same */
1003         if (h->data.dsize == data.dsize &&
1004             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1005                 /* No need to do anything */
1006                 return 0;
1007         }
1008
1009         offset = ctdb_ltdb_header_len(&h->header);
1010         rec.dsize = offset + data.dsize;
1011         rec.dptr = talloc_size(h, rec.dsize);
1012         if (rec.dptr == NULL) {
1013                 return ENOMEM;
1014         }
1015
1016         ctdb_ltdb_header_push(&h->header, rec.dptr);
1017         memcpy(rec.dptr + offset, data.dptr, data.dsize);
1018
1019         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1020         if (ret != 0) {
1021                 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1022                                   h->db->db_name));
1023                 return EIO;
1024         }
1025
1026         talloc_free(rec.dptr);
1027         return 0;
1028 }
1029
1030 int ctdb_delete_record(struct ctdb_record_handle *h)
1031 {
1032         TDB_DATA rec;
1033         struct ctdb_key_data key;
1034         int ret;
1035
1036         /* Cannot delete the record if it was obtained as a readonly copy */
1037         if (h->readonly) {
1038                 return EINVAL;
1039         }
1040
1041         rec.dsize = ctdb_ltdb_header_len(&h->header);
1042         rec.dptr = talloc_size(h, rec.dsize);
1043         if (rec.dptr == NULL) {
1044                 return ENOMEM;
1045         }
1046
1047         ctdb_ltdb_header_push(&h->header, rec.dptr);
1048
1049         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1050         talloc_free(rec.dptr);
1051         if (ret != 0) {
1052                 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1053                                   h->db->db_name));
1054                 return EIO;
1055         }
1056
1057         key.db_id = h->db->db_id;
1058         key.header = h->header;
1059         key.key = h->key;
1060
1061         ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1062                                               h->client->pnn,
1063                                               tevent_timeval_zero(), &key);
1064         if (ret != 0) {
1065                 DEBUG(DEBUG_WARNING,
1066                       ("Failed to mark record to be deleted in DB %s\n",
1067                        h->db->db_name));
1068                 return ret;
1069         }
1070
1071         return 0;
1072 }
1073
1074 /*
1075  * Global lock functions
1076  */
1077
1078 struct ctdb_g_lock_lock_state {
1079         struct tevent_context *ev;
1080         struct ctdb_client_context *client;
1081         struct ctdb_db_context *db;
1082         TDB_DATA key;
1083         struct ctdb_server_id my_sid;
1084         enum ctdb_g_lock_type lock_type;
1085         struct ctdb_record_handle *h;
1086         /* state for verification of active locks */
1087         struct ctdb_g_lock_list *lock_list;
1088         unsigned int current;
1089 };
1090
1091 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1092 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1093 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1094 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1095 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1096
1097 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1098                                   enum ctdb_g_lock_type l2)
1099 {
1100         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1101                 return false;
1102         }
1103         return true;
1104 }
1105
1106 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1107                                          struct tevent_context *ev,
1108                                          struct ctdb_client_context *client,
1109                                          struct ctdb_db_context *db,
1110                                          const char *keyname,
1111                                          struct ctdb_server_id *sid,
1112                                          bool readonly)
1113 {
1114         struct tevent_req *req, *subreq;
1115         struct ctdb_g_lock_lock_state *state;
1116
1117         req = tevent_req_create(mem_ctx, &state,
1118                                 struct ctdb_g_lock_lock_state);
1119         if (req == NULL) {
1120                 return NULL;
1121         }
1122
1123         state->ev = ev;
1124         state->client = client;
1125         state->db = db;
1126         state->key.dptr = discard_const(keyname);
1127         state->key.dsize = strlen(keyname) + 1;
1128         state->my_sid = *sid;
1129         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1130
1131         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1132                                       false);
1133         if (tevent_req_nomem(subreq, req)) {
1134                 return tevent_req_post(req, ev);
1135         }
1136         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1137
1138         return req;
1139 }
1140
1141 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1142 {
1143         struct tevent_req *req = tevent_req_callback_data(
1144                 subreq, struct tevent_req);
1145         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1146                 req, struct ctdb_g_lock_lock_state);
1147         TDB_DATA data;
1148         int ret = 0;
1149
1150         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1151         TALLOC_FREE(subreq);
1152         if (state->h == NULL) {
1153                 tevent_req_error(req, ret);
1154                 return;
1155         }
1156
1157         if (state->lock_list != NULL) {
1158                 TALLOC_FREE(state->lock_list);
1159                 state->current = 0;
1160         }
1161
1162         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1163                                     &state->lock_list);
1164         talloc_free(data.dptr);
1165         if (ret != 0) {
1166                 tevent_req_error(req, ret);
1167                 return;
1168         }
1169
1170         ctdb_g_lock_lock_process_locks(req);
1171 }
1172
1173 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1174 {
1175         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1176                 req, struct ctdb_g_lock_lock_state);
1177         struct tevent_req *subreq;
1178         struct ctdb_g_lock *lock;
1179         bool check_server = false;
1180         int ret;
1181
1182         while (state->current < state->lock_list->num) {
1183                 lock = &state->lock_list->lock[state->current];
1184
1185                 /* We should not ask for the same lock more than once */
1186                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1187                         tevent_req_error(req, EDEADLK);
1188                         return;
1189                 }
1190
1191                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1192                         check_server = true;
1193                         break;
1194                 }
1195
1196                 state->current += 1;
1197         }
1198
1199         if (check_server) {
1200                 struct ctdb_req_control request;
1201                 struct ctdb_uint64_array u64_array;
1202
1203                 u64_array.num = 1;
1204                 u64_array.val = &lock->sid.unique_id;
1205
1206                 ctdb_req_control_check_srvids(&request, &u64_array);
1207                 subreq = ctdb_client_control_send(state, state->ev,
1208                                                   state->client,
1209                                                   state->client->pnn,
1210                                                   tevent_timeval_zero(),
1211                                                   &request);
1212                 if (tevent_req_nomem(subreq, req)) {
1213                         return;
1214                 }
1215                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1216                 return;
1217         }
1218
1219         /* There is no conflict, add ourself to the lock_list */
1220         state->lock_list->lock = talloc_realloc(state->lock_list,
1221                                                 state->lock_list->lock,
1222                                                 struct ctdb_g_lock,
1223                                                 state->lock_list->num + 1);
1224         if (state->lock_list->lock == NULL) {
1225                 tevent_req_error(req, ENOMEM);
1226                 return;
1227         }
1228
1229         lock = &state->lock_list->lock[state->lock_list->num];
1230         lock->type = state->lock_type;
1231         lock->sid = state->my_sid;
1232         state->lock_list->num += 1;
1233
1234         ret = ctdb_g_lock_lock_update(req);
1235         if (ret != 0) {
1236                 tevent_req_error(req, ret);
1237                 return;
1238         }
1239
1240         tevent_req_done(req);
1241 }
1242
1243 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1244 {
1245         struct tevent_req *req = tevent_req_callback_data(
1246                 subreq, struct tevent_req);
1247         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1248                 req, struct ctdb_g_lock_lock_state);
1249         struct ctdb_reply_control *reply;
1250         struct ctdb_uint8_array *u8_array;
1251         int ret;
1252         bool status;
1253         int8_t val;
1254
1255         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1256         TALLOC_FREE(subreq);
1257         if (! status) {
1258                 tevent_req_error(req, ret);
1259                 return;
1260         }
1261
1262         ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1263         if (ret != 0) {
1264                 tevent_req_error(req, ENOMEM);
1265                 return;
1266         }
1267
1268         if (u8_array->num != 1) {
1269                 talloc_free(u8_array);
1270                 tevent_req_error(req, EIO);
1271                 return;
1272         }
1273
1274         val = u8_array->val[0];
1275         talloc_free(u8_array);
1276
1277         if (val == 1) {
1278                 /* server process exists, need to retry */
1279                 subreq = tevent_wakeup_send(state, state->ev,
1280                                             tevent_timeval_current_ofs(1,0));
1281                 if (tevent_req_nomem(subreq, req)) {
1282                         return;
1283                 }
1284                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1285                 return;
1286         }
1287
1288         /* server process does not exist, remove conflicting entry */
1289         state->lock_list->lock[state->current] =
1290                 state->lock_list->lock[state->lock_list->num-1];
1291         state->lock_list->num -= 1;
1292
1293         ret = ctdb_g_lock_lock_update(req);
1294         if (ret != 0) {
1295                 tevent_req_error(req, ret);
1296                 return;
1297         }
1298
1299         ctdb_g_lock_lock_process_locks(req);
1300 }
1301
1302 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1303 {
1304         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1305                 req, struct ctdb_g_lock_lock_state);
1306         TDB_DATA data;
1307         int ret;
1308
1309         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1310         data.dptr = talloc_size(state, data.dsize);
1311         if (data.dptr == NULL) {
1312                 return ENOMEM;
1313         }
1314
1315         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1316         ret = ctdb_store_record(state->h, data);
1317         talloc_free(data.dptr);
1318         return ret;
1319 }
1320
1321 #if 0
1322 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1323                                    struct ctdb_g_lock_list *lock_list,
1324                                    struct ctdb_record_handle *h)
1325 {
1326         struct ctdb_g_lock *lock;
1327         bool conflict = false;
1328         bool modified = false;
1329         int ret, i;
1330
1331         for (i=0; i<lock_list->num; i++) {
1332                 lock = &lock_list->lock[i];
1333
1334                 /* We should not ask for lock more than once */
1335                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1336                         return EDEADLK;
1337                 }
1338
1339                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1340                         bool exists;
1341
1342                         conflict = true;
1343                         ret = ctdb_server_id_exists(state->client, &lock->sid,
1344                                                     &exists);
1345                         if (ret != 0) {
1346                                 return ret;
1347                         }
1348
1349                         if (exists) {
1350                                 break;
1351                         }
1352
1353                         /* Server does not exist, delete conflicting entry */
1354                         lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1355                         lock_list->num -= 1;
1356                         modified = true;
1357                 }
1358         }
1359
1360         if (! conflict) {
1361                 lock = talloc_realloc(lock_list, lock_list->lock,
1362                                       struct ctdb_g_lock, lock_list->num+1);
1363                 if (lock == NULL) {
1364                         return ENOMEM;
1365                 }
1366
1367                 lock[lock_list->num].type = state->lock_type;
1368                 lock[lock_list->num].sid = state->my_sid;
1369                 lock_list->lock = lock;
1370                 lock_list->num += 1;
1371                 modified = true;
1372         }
1373
1374         if (modified) {
1375                 TDB_DATA data;
1376
1377                 data.dsize = ctdb_g_lock_list_len(lock_list);
1378                 data.dptr = talloc_size(state, data.dsize);
1379                 if (data.dptr == NULL) {
1380                         return ENOMEM;
1381                 }
1382
1383                 ctdb_g_lock_list_push(lock_list, data.dptr);
1384                 ret = ctdb_store_record(h, data);
1385                 talloc_free(data.dptr);
1386                 if (ret != 0) {
1387                         return ret;
1388                 }
1389         }
1390
1391         if (conflict) {
1392                 return EAGAIN;
1393         }
1394         return 0;
1395 }
1396 #endif
1397
1398 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1399 {
1400         struct tevent_req *req = tevent_req_callback_data(
1401                 subreq, struct tevent_req);
1402         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1403                 req, struct ctdb_g_lock_lock_state);
1404         bool success;
1405
1406         success = tevent_wakeup_recv(subreq);
1407         TALLOC_FREE(subreq);
1408         if (! success) {
1409                 tevent_req_error(req, ENOMEM);
1410                 return;
1411         }
1412
1413         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1414                                       state->db, state->key, false);
1415         if (tevent_req_nomem(subreq, req)) {
1416                 return;
1417         }
1418         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1419 }
1420
1421 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1422 {
1423         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1424                 req, struct ctdb_g_lock_lock_state);
1425         int err;
1426
1427         TALLOC_FREE(state->h);
1428
1429         if (tevent_req_is_unix_error(req, &err)) {
1430                 if (perr != NULL) {
1431                         *perr = err;
1432                 }
1433                 return false;
1434         }
1435
1436         return true;
1437 }
1438
1439 struct ctdb_g_lock_unlock_state {
1440         struct tevent_context *ev;
1441         struct ctdb_client_context *client;
1442         struct ctdb_db_context *db;
1443         TDB_DATA key;
1444         struct ctdb_server_id my_sid;
1445         struct ctdb_record_handle *h;
1446         struct ctdb_g_lock_list *lock_list;
1447 };
1448
1449 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1450 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1451
1452 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1453                                            struct tevent_context *ev,
1454                                            struct ctdb_client_context *client,
1455                                            struct ctdb_db_context *db,
1456                                            const char *keyname,
1457                                            struct ctdb_server_id sid)
1458 {
1459         struct tevent_req *req, *subreq;
1460         struct ctdb_g_lock_unlock_state *state;
1461
1462         req = tevent_req_create(mem_ctx, &state,
1463                                 struct ctdb_g_lock_unlock_state);
1464         if (req == NULL) {
1465                 return NULL;
1466         }
1467
1468         state->ev = ev;
1469         state->client = client;
1470         state->db = db;
1471         state->key.dptr = discard_const(keyname);
1472         state->key.dsize = strlen(keyname) + 1;
1473         state->my_sid = sid;
1474
1475         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1476                                       false);
1477         if (tevent_req_nomem(subreq, req)) {
1478                 return tevent_req_post(req, ev);
1479         }
1480         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1481
1482         return req;
1483 }
1484
1485 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1486 {
1487         struct tevent_req *req = tevent_req_callback_data(
1488                 subreq, struct tevent_req);
1489         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1490                 req, struct ctdb_g_lock_unlock_state);
1491         TDB_DATA data;
1492         int ret = 0;
1493
1494         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1495         TALLOC_FREE(subreq);
1496         if (state->h == NULL) {
1497                 tevent_req_error(req, ret);
1498                 return;
1499         }
1500
1501         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1502                                     &state->lock_list);
1503         if (ret != 0) {
1504                 tevent_req_error(req, ret);
1505                 return;
1506         }
1507
1508         ret = ctdb_g_lock_unlock_update(req);
1509         if (ret != 0) {
1510                 tevent_req_error(req, ret);
1511                 return;
1512         }
1513
1514         tevent_req_done(req);
1515 }
1516
1517 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1518 {
1519         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1520                 req, struct ctdb_g_lock_unlock_state);
1521         struct ctdb_g_lock *lock;
1522         int ret, i;
1523
1524         for (i=0; i<state->lock_list->num; i++) {
1525                 lock = &state->lock_list->lock[i];
1526
1527                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1528                         break;
1529                 }
1530         }
1531
1532         if (i < state->lock_list->num) {
1533                 state->lock_list->lock[i] =
1534                         state->lock_list->lock[state->lock_list->num-1];
1535                 state->lock_list->num -= 1;
1536         }
1537
1538         if (state->lock_list->num == 0) {
1539                 ctdb_delete_record(state->h);
1540         } else {
1541                 TDB_DATA data;
1542
1543                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1544                 data.dptr = talloc_size(state, data.dsize);
1545                 if (data.dptr == NULL) {
1546                         return ENOMEM;
1547                 }
1548
1549                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1550                 ret = ctdb_store_record(state->h, data);
1551                 talloc_free(data.dptr);
1552                 if (ret != 0) {
1553                         return ret;
1554                 }
1555         }
1556
1557         return 0;
1558 }
1559
1560 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1561 {
1562         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1563                 req, struct ctdb_g_lock_unlock_state);
1564         int err;
1565
1566         TALLOC_FREE(state->h);
1567
1568         if (tevent_req_is_unix_error(req, &err)) {
1569                 if (perr != NULL) {
1570                         *perr = err;
1571                 }
1572                 return false;
1573         }
1574
1575         return true;
1576 }
1577
1578 /*
1579  * Persistent database functions
1580  */
1581 struct ctdb_transaction_start_state {
1582         struct tevent_context *ev;
1583         struct ctdb_client_context *client;
1584         struct timeval timeout;
1585         struct ctdb_transaction_handle *h;
1586         uint32_t destnode;
1587 };
1588
1589 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1590 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1591 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1592 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1593
1594 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1595                                                struct tevent_context *ev,
1596                                                struct ctdb_client_context *client,
1597                                                struct timeval timeout,
1598                                                struct ctdb_db_context *db,
1599                                                bool readonly)
1600 {
1601         struct ctdb_transaction_start_state *state;
1602         struct tevent_req *req, *subreq;
1603         struct ctdb_transaction_handle *h;
1604
1605         req = tevent_req_create(mem_ctx, &state,
1606                                 struct ctdb_transaction_start_state);
1607         if (req == NULL) {
1608                 return NULL;
1609         }
1610
1611         if (! db->persistent) {
1612                 tevent_req_error(req, EINVAL);
1613                 return tevent_req_post(req, ev);
1614         }
1615
1616         state->ev = ev;
1617         state->client = client;
1618         state->destnode = ctdb_client_pnn(client);
1619
1620         h = talloc_zero(db, struct ctdb_transaction_handle);
1621         if (tevent_req_nomem(h, req)) {
1622                 return tevent_req_post(req, ev);
1623         }
1624
1625         h->ev = ev;
1626         h->client = client;
1627         h->db = db;
1628         h->readonly = readonly;
1629         h->updated = false;
1630
1631         /* SRVID is unique for databases, so client can have transactions active
1632          * for multiple databases */
1633         h->sid.pid = getpid();
1634         h->sid.task_id = db->db_id;
1635         h->sid.vnn = state->destnode;
1636         h->sid.unique_id = h->sid.task_id;
1637         h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1638
1639         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1640         if (tevent_req_nomem(h->recbuf, req)) {
1641                 return tevent_req_post(req, ev);
1642         }
1643
1644         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1645         if (tevent_req_nomem(h->lock_name, req)) {
1646                 return tevent_req_post(req, ev);
1647         }
1648
1649         state->h = h;
1650
1651         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1652         if (tevent_req_nomem(subreq, req)) {
1653                 return tevent_req_post(req, ev);
1654         }
1655         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1656
1657         return req;
1658 }
1659
1660 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1661 {
1662         struct tevent_req *req = tevent_req_callback_data(
1663                 subreq, struct tevent_req);
1664         struct ctdb_transaction_start_state *state = tevent_req_data(
1665                 req, struct ctdb_transaction_start_state);
1666         struct ctdb_req_control request;
1667         bool status;
1668         int ret;
1669
1670         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1671         TALLOC_FREE(subreq);
1672         if (! status) {
1673                 tevent_req_error(req, ret);
1674                 return;
1675         }
1676
1677         ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1678         subreq = ctdb_client_control_send(state, state->ev, state->client,
1679                                           state->destnode, state->timeout,
1680                                           &request);
1681         if (tevent_req_nomem(subreq, req)) {
1682                 return;
1683         }
1684         tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1685 }
1686
1687 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1688 {
1689         struct tevent_req *req = tevent_req_callback_data(
1690                 subreq, struct tevent_req);
1691         struct ctdb_transaction_start_state *state = tevent_req_data(
1692                 req, struct ctdb_transaction_start_state);
1693         struct ctdb_reply_control *reply;
1694         bool status;
1695         int ret;
1696
1697         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1698         TALLOC_FREE(subreq);
1699         if (! status) {
1700                 tevent_req_error(req, ret);
1701                 return;
1702         }
1703
1704         ret = ctdb_reply_control_register_srvid(reply);
1705         talloc_free(reply);
1706         if (ret != 0) {
1707                 tevent_req_error(req, ret);
1708                 return;
1709         }
1710
1711         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1712                                        state->h->db_g_lock, state->h->lock_name,
1713                                        &state->h->sid, state->h->readonly);
1714         if (tevent_req_nomem(subreq, req)) {
1715                 return;
1716         }
1717         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1718 }
1719
1720 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1721 {
1722         struct tevent_req *req = tevent_req_callback_data(
1723                 subreq, struct tevent_req);
1724         int ret;
1725         bool status;
1726
1727         status = ctdb_g_lock_lock_recv(subreq, &ret);
1728         TALLOC_FREE(subreq);
1729         if (! status) {
1730                 tevent_req_error(req, ret);
1731                 return;
1732         }
1733
1734         tevent_req_done(req);
1735 }
1736
1737 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1738                                         struct tevent_req *req,
1739                                         int *perr)
1740 {
1741         struct ctdb_transaction_start_state *state = tevent_req_data(
1742                 req, struct ctdb_transaction_start_state);
1743         struct ctdb_transaction_handle *h = state->h;
1744         int err;
1745
1746         if (tevent_req_is_unix_error(req, &err)) {
1747                 if (perr != NULL) {
1748                         *perr = err;
1749                 }
1750                 return NULL;
1751         }
1752
1753         talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1754         return h;
1755 }
1756
1757 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1758 {
1759         int ret;
1760
1761         ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1762                                          tevent_timeval_zero(),
1763                                          h->sid.unique_id);
1764         if (ret != 0) {
1765                 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1766         }
1767
1768         return 0;
1769 }
1770
1771 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1772                            struct ctdb_client_context *client,
1773                            struct timeval timeout,
1774                            struct ctdb_db_context *db, bool readonly,
1775                            struct ctdb_transaction_handle **out)
1776 {
1777         struct tevent_req *req;
1778         struct ctdb_transaction_handle *h;
1779         int ret;
1780
1781         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1782                                           readonly);
1783         if (req == NULL) {
1784                 return ENOMEM;
1785         }
1786
1787         tevent_req_poll(req, ev);
1788
1789         h = ctdb_transaction_start_recv(req, &ret);
1790         if (h == NULL) {
1791                 return ret;
1792         }
1793
1794         *out = h;
1795         return 0;
1796 }
1797
1798 struct ctdb_transaction_record_fetch_state {
1799         TDB_DATA key, data;
1800         struct ctdb_ltdb_header header;
1801         bool found;
1802 };
1803
1804 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1805                                                   struct ctdb_ltdb_header *header,
1806                                                   TDB_DATA key,
1807                                                   TDB_DATA data,
1808                                                   void *private_data)
1809 {
1810         struct ctdb_transaction_record_fetch_state *state =
1811                 (struct ctdb_transaction_record_fetch_state *)private_data;
1812
1813         if (state->key.dsize == key.dsize &&
1814             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1815                 state->data = data;
1816                 state->header = *header;
1817                 state->found = true;
1818         }
1819
1820         return 0;
1821 }
1822
1823 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1824                                          TDB_DATA key,
1825                                          struct ctdb_ltdb_header *header,
1826                                          TDB_DATA *data)
1827 {
1828         struct ctdb_transaction_record_fetch_state state;
1829         int ret;
1830
1831         state.key = key;
1832         state.found = false;
1833
1834         ret = ctdb_rec_buffer_traverse(h->recbuf,
1835                                        ctdb_transaction_record_fetch_traverse,
1836                                        &state);
1837         if (ret != 0) {
1838                 return ret;
1839         }
1840
1841         if (state.found) {
1842                 if (header != NULL) {
1843                         *header = state.header;
1844                 }
1845                 if (data != NULL) {
1846                         *data = state.data;
1847                 }
1848                 return 0;
1849         }
1850
1851         return ENOENT;
1852 }
1853
1854 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1855                                   TDB_DATA key,
1856                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1857 {
1858         TDB_DATA tmp_data;
1859         struct ctdb_ltdb_header header;
1860         int ret;
1861
1862         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1863         if (ret == 0) {
1864                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1865                                            tmp_data.dsize);
1866                 if (data->dptr == NULL) {
1867                         return ENOMEM;
1868                 }
1869                 data->dsize = tmp_data.dsize;
1870                 return 0;
1871         }
1872
1873         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1874         if (ret != 0) {
1875                 return ret;
1876         }
1877
1878         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1879         if (ret != 0) {
1880                 return ret;
1881         }
1882
1883         return 0;
1884 }
1885
1886 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1887                                   TDB_DATA key, TDB_DATA data)
1888 {
1889         TALLOC_CTX *tmp_ctx;
1890         struct ctdb_ltdb_header header;
1891         TDB_DATA old_data;
1892         int ret;
1893
1894         if (h->readonly) {
1895                 return EINVAL;
1896         }
1897
1898         tmp_ctx = talloc_new(h);
1899         if (tmp_ctx == NULL) {
1900                 return ENOMEM;
1901         }
1902
1903         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1904         if (ret != 0) {
1905                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1906                 if (ret != 0) {
1907                         return ret;
1908                 }
1909         }
1910
1911         if (old_data.dsize == data.dsize &&
1912             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1913                 talloc_free(tmp_ctx);
1914                 return 0;
1915         }
1916
1917         header.dmaster = ctdb_client_pnn(h->client);
1918         header.rsn += 1;
1919
1920         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1921         talloc_free(tmp_ctx);
1922         if (ret != 0) {
1923                 return ret;
1924         }
1925         h->updated = true;
1926
1927         return 0;
1928 }
1929
1930 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1931                                    TDB_DATA key)
1932 {
1933         return ctdb_transaction_store_record(h, key, tdb_null);
1934 }
1935
1936 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1937                                             uint64_t seqnum)
1938 {
1939         const char *keyname = CTDB_DB_SEQNUM_KEY;
1940         TDB_DATA key, data;
1941
1942         key.dptr = discard_const(keyname);
1943         key.dsize = strlen(keyname) + 1;
1944
1945         data.dptr = (uint8_t *)&seqnum;
1946         data.dsize = sizeof(seqnum);
1947
1948         return ctdb_transaction_store_record(h, key, data);
1949 }
1950
1951 struct ctdb_transaction_commit_state {
1952         struct tevent_context *ev;
1953         struct ctdb_transaction_handle *h;
1954         uint64_t seqnum;
1955 };
1956
1957 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq);
1958 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
1959 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1960 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq);
1961
1962 struct tevent_req *ctdb_transaction_commit_send(
1963                                         TALLOC_CTX *mem_ctx,
1964                                         struct tevent_context *ev,
1965                                         struct ctdb_transaction_handle *h)
1966 {
1967         struct tevent_req *req, *subreq;
1968         struct ctdb_transaction_commit_state *state;
1969         struct ctdb_req_control request;
1970
1971         req = tevent_req_create(mem_ctx, &state,
1972                                 struct ctdb_transaction_commit_state);
1973         if (req == NULL) {
1974                 return NULL;
1975         }
1976
1977         state->ev = ev;
1978         state->h = h;
1979
1980         ctdb_req_control_get_db_seqnum(&request, h->db->db_id);
1981         subreq = ctdb_client_control_send(state, ev, h->client,
1982                                           h->client->pnn,
1983                                           tevent_timeval_zero(), &request);
1984         if (tevent_req_nomem(subreq, req)) {
1985                 return tevent_req_post(req, ev);
1986         }
1987         tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum_done,
1988                                 req);
1989
1990         return req;
1991 }
1992
1993 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq)
1994 {
1995         struct tevent_req *req = tevent_req_callback_data(
1996                 subreq, struct tevent_req);
1997         struct ctdb_transaction_commit_state *state = tevent_req_data(
1998                 req, struct ctdb_transaction_commit_state);
1999         struct ctdb_reply_control *reply;
2000         int ret;
2001         bool status;
2002
2003         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2004         TALLOC_FREE(subreq);
2005         if (! status) {
2006                 tevent_req_error(req, ret);
2007                 return;
2008         }
2009
2010         ret = ctdb_reply_control_get_db_seqnum(reply, &state->seqnum);
2011         if (ret != 0) {
2012                 tevent_req_error(req, ret);
2013                 return;
2014         }
2015
2016         ret = ctdb_transaction_store_db_seqnum(state->h, state->seqnum+1);
2017         if (ret != 0) {
2018                 tevent_req_error(req, ret);
2019                 return;
2020         }
2021
2022         subreq = ctdb_recovery_wait_send(state, state->ev, state->h->client);
2023         if (tevent_req_nomem(subreq, req)) {
2024                 return;
2025         }
2026         tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
2027 }
2028
2029 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
2030 {
2031         struct tevent_req *req = tevent_req_callback_data(
2032                 subreq, struct tevent_req);
2033         struct ctdb_transaction_commit_state *state = tevent_req_data(
2034                 req, struct ctdb_transaction_commit_state);
2035         struct ctdb_req_control request;
2036         int ret;
2037         bool status;
2038
2039         status = ctdb_recovery_wait_recv(subreq, &ret);
2040         TALLOC_FREE(subreq);
2041         if (! status) {
2042                 tevent_req_error(req, ret);
2043                 return;
2044         }
2045
2046         ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2047         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2048                                           state->h->client->pnn,
2049                                           tevent_timeval_zero(), &request);
2050         if (tevent_req_nomem(subreq, req)) {
2051                 return;
2052         }
2053         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2054 }
2055
2056 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2057 {
2058         struct tevent_req *req = tevent_req_callback_data(
2059                 subreq, struct tevent_req);
2060         struct ctdb_transaction_commit_state *state = tevent_req_data(
2061                 req, struct ctdb_transaction_commit_state);
2062         struct ctdb_reply_control *reply;
2063         struct ctdb_req_control request;
2064         int ret;
2065         bool status;
2066
2067         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2068         TALLOC_FREE(subreq);
2069         if (! status) {
2070                 tevent_req_error(req, ret);
2071                 return;
2072         }
2073
2074         ret = ctdb_reply_control_trans3_commit(reply);
2075         if (ret < 0) {
2076                 /* Control failed due to recovery */
2077                 subreq = ctdb_recovery_wait_send(state, state->ev,
2078                                                  state->h->client);
2079                 if (tevent_req_nomem(subreq, req)) {
2080                         return;
2081                 }
2082                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2083                                         req);
2084                 return;
2085         }
2086
2087         ctdb_req_control_get_db_seqnum(&request, state->h->db->db_id);
2088         subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2089                                           state->h->client->pnn,
2090                                           tevent_timeval_zero(), &request);
2091         if (tevent_req_nomem(subreq, req)) {
2092                 return;
2093         }
2094         tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum2_done,
2095                                 req);
2096 }
2097
2098 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq)
2099 {
2100         struct tevent_req *req = tevent_req_callback_data(
2101                 subreq, struct tevent_req);
2102         struct ctdb_transaction_commit_state *state = tevent_req_data(
2103                 req, struct ctdb_transaction_commit_state);
2104         struct ctdb_reply_control *reply;
2105         uint64_t seqnum;
2106         int ret;
2107         bool status;
2108
2109         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2110         TALLOC_FREE(subreq);
2111         if (! status) {
2112                 tevent_req_error(req, ret);
2113                 return;
2114         }
2115
2116         ret = ctdb_reply_control_get_db_seqnum(reply, &seqnum);
2117         if (ret != 0) {
2118                 tevent_req_error(req, ret);
2119                 return;
2120         }
2121
2122         if (seqnum == state->seqnum) {
2123                 subreq = ctdb_recovery_wait_send(state, state->ev,
2124                                                  state->h->client);
2125                 if (tevent_req_nomem(subreq, req)) {
2126                         return;
2127                 }
2128                 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2129                                         req);
2130                 return;
2131         }
2132
2133         if (seqnum != state->seqnum + 1) {
2134                 tevent_req_error(req, EIO);
2135                 return;
2136         }
2137
2138         tevent_req_done(req);
2139 }
2140
2141 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2142 {
2143         int err;
2144
2145         if (tevent_req_is_unix_error(req, &err)) {
2146                 if (perr != NULL) {
2147                         *perr = err;
2148                 }
2149                 return false;
2150         }
2151
2152         return true;
2153 }
2154
2155 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2156 {
2157         struct tevent_req *req;
2158         int ret;
2159         bool status;
2160
2161         if (h->readonly || ! h->updated) {
2162                 talloc_free(h);
2163                 return 0;
2164         }
2165
2166         req = ctdb_transaction_commit_send(h, h->ev, h);
2167         if (req == NULL) {
2168                 talloc_free(h);
2169                 return ENOMEM;
2170         }
2171
2172         tevent_req_poll(req, h->ev);
2173
2174         status = ctdb_transaction_commit_recv(req, &ret);
2175         if (! status) {
2176                 talloc_free(h);
2177                 return ret;
2178         }
2179
2180         talloc_free(h);
2181         return 0;
2182 }
2183
2184 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2185 {
2186         talloc_free(h);
2187         return 0;
2188 }
2189
2190 /*
2191  * TODO:
2192  *
2193  * In future Samba should register SERVER_ID.
2194  * Make that structure same as struct srvid {}.
2195  */