ctdb-daemon: Rename struct ctdb_reply_dmaster to ctdb_reply_dmaster_old
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33
34 #include "ctdb_private.h"
35 #include "ctdb_client.h"
36 #include "ctdb_logging.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42
43 struct ctdb_sticky_record {
44         struct ctdb_context *ctdb;
45         struct ctdb_db_context *ctdb_db;
46         TDB_CONTEXT *pindown;
47 };
48
49 /*
50   find the ctdb_db from a db index
51  */
52  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
53 {
54         struct ctdb_db_context *ctdb_db;
55
56         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
57                 if (ctdb_db->db_id == id) {
58                         break;
59                 }
60         }
61         return ctdb_db;
62 }
63
64 /*
65   a varient of input packet that can be used in lock requeue
66 */
67 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
68 {
69         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
70         ctdb_input_pkt(ctdb, hdr);
71 }
72
73
74 /*
75   send an error reply
76 */
77 static void ctdb_send_error(struct ctdb_context *ctdb, 
78                             struct ctdb_req_header *hdr, uint32_t status,
79                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
80 static void ctdb_send_error(struct ctdb_context *ctdb, 
81                             struct ctdb_req_header *hdr, uint32_t status,
82                             const char *fmt, ...)
83 {
84         va_list ap;
85         struct ctdb_reply_error_old *r;
86         char *msg;
87         int msglen, len;
88
89         if (ctdb->methods == NULL) {
90                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
91                 return;
92         }
93
94         va_start(ap, fmt);
95         msg = talloc_vasprintf(ctdb, fmt, ap);
96         if (msg == NULL) {
97                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
98         }
99         va_end(ap);
100
101         msglen = strlen(msg)+1;
102         len = offsetof(struct ctdb_reply_error_old, msg);
103         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
104                                     struct ctdb_reply_error_old);
105         CTDB_NO_MEMORY_FATAL(ctdb, r);
106
107         r->hdr.destnode  = hdr->srcnode;
108         r->hdr.reqid     = hdr->reqid;
109         r->status        = status;
110         r->msglen        = msglen;
111         memcpy(&r->msg[0], msg, msglen);
112
113         ctdb_queue_packet(ctdb, &r->hdr);
114
115         talloc_free(msg);
116 }
117
118
119 /**
120  * send a redirect reply
121  *
122  * The logic behind this function is this:
123  *
124  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
125  * to its local ctdb (ctdb_request_call). If the node is not itself
126  * the record's DMASTER, it first redirects the packet to  the
127  * record's LMASTER. The LMASTER then redirects the call packet to
128  * the current DMASTER. Note that this works because of this: When
129  * a record is migrated off a node, then the new DMASTER is stored
130  * in the record's copy on the former DMASTER.
131  */
132 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
133                                     struct ctdb_db_context *ctdb_db,
134                                     TDB_DATA key,
135                                     struct ctdb_req_call_old *c, 
136                                     struct ctdb_ltdb_header *header)
137 {
138         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
139
140         c->hdr.destnode = lmaster;
141         if (ctdb->pnn == lmaster) {
142                 c->hdr.destnode = header->dmaster;
143         }
144         c->hopcount++;
145
146         if (c->hopcount%100 > 95) {
147                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
148                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
149                         "header->dmaster:%d dst:%d\n",
150                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
151                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
152                         header->dmaster, c->hdr.destnode));
153         }
154
155         ctdb_queue_packet(ctdb, &c->hdr);
156 }
157
158
159 /*
160   send a dmaster reply
161
162   caller must have the chainlock before calling this routine. Caller must be
163   the lmaster
164 */
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166                                     struct ctdb_ltdb_header *header,
167                                     TDB_DATA key, TDB_DATA data,
168                                     uint32_t new_dmaster,
169                                     uint32_t reqid)
170 {
171         struct ctdb_context *ctdb = ctdb_db->ctdb;
172         struct ctdb_reply_dmaster_old *r;
173         int ret, len;
174         TALLOC_CTX *tmp_ctx;
175
176         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
178                 return;
179         }
180
181         header->dmaster = new_dmaster;
182         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
183         if (ret != 0) {
184                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
185                 return;
186         }
187
188         if (ctdb->methods == NULL) {
189                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
190                 return;
191         }
192
193         /* put the packet on a temporary context, allowing us to safely free
194            it below even if ctdb_reply_dmaster() has freed it already */
195         tmp_ctx = talloc_new(ctdb);
196
197         /* send the CTDB_REPLY_DMASTER */
198         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
199         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200                                     struct ctdb_reply_dmaster_old);
201         CTDB_NO_MEMORY_FATAL(ctdb, r);
202
203         r->hdr.destnode  = new_dmaster;
204         r->hdr.reqid     = reqid;
205         r->hdr.generation = ctdb_db->generation;
206         r->rsn           = header->rsn;
207         r->keylen        = key.dsize;
208         r->datalen       = data.dsize;
209         r->db_id         = ctdb_db->db_id;
210         memcpy(&r->data[0], key.dptr, key.dsize);
211         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
212         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
213
214         ctdb_queue_packet(ctdb, &r->hdr);
215
216         talloc_free(tmp_ctx);
217 }
218
219 /*
220   send a dmaster request (give another node the dmaster for a record)
221
222   This is always sent to the lmaster, which ensures that the lmaster
223   always knows who the dmaster is. The lmaster will then send a
224   CTDB_REPLY_DMASTER to the new dmaster
225 */
226 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
227                                    struct ctdb_req_call_old *c, 
228                                    struct ctdb_ltdb_header *header,
229                                    TDB_DATA *key, TDB_DATA *data)
230 {
231         struct ctdb_req_dmaster_old *r;
232         struct ctdb_context *ctdb = ctdb_db->ctdb;
233         int len;
234         uint32_t lmaster = ctdb_lmaster(ctdb, key);
235
236         if (ctdb->methods == NULL) {
237                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
238                 return;
239         }
240
241         if (data->dsize != 0) {
242                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
243         }
244
245         if (lmaster == ctdb->pnn) {
246                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
247                                         c->hdr.srcnode, c->hdr.reqid);
248                 return;
249         }
250         
251         len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
252                         + sizeof(uint32_t);
253         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
254                                     struct ctdb_req_dmaster_old);
255         CTDB_NO_MEMORY_FATAL(ctdb, r);
256         r->hdr.destnode  = lmaster;
257         r->hdr.reqid     = c->hdr.reqid;
258         r->hdr.generation = ctdb_db->generation;
259         r->db_id         = c->db_id;
260         r->rsn           = header->rsn;
261         r->dmaster       = c->hdr.srcnode;
262         r->keylen        = key->dsize;
263         r->datalen       = data->dsize;
264         memcpy(&r->data[0], key->dptr, key->dsize);
265         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
266         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
267
268         header->dmaster = c->hdr.srcnode;
269         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
270                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
271         }
272         
273         ctdb_queue_packet(ctdb, &r->hdr);
274
275         talloc_free(r);
276 }
277
278 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
279                                         struct tevent_timer *te,
280                                         struct timeval t, void *private_data)
281 {
282         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
283                                                        struct ctdb_sticky_record);
284
285         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
286         if (sr->pindown != NULL) {
287                 talloc_free(sr->pindown);
288                 sr->pindown = NULL;
289         }
290 }
291
292 static int
293 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
294 {
295         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
296         uint32_t *k;
297         struct ctdb_sticky_record *sr;
298
299         k = ctdb_key_to_idkey(tmp_ctx, key);
300         if (k == NULL) {
301                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
302                 talloc_free(tmp_ctx);
303                 return -1;
304         }
305
306         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
307         if (sr == NULL) {
308                 talloc_free(tmp_ctx);
309                 return 0;
310         }
311
312         talloc_free(tmp_ctx);
313
314         if (sr->pindown == NULL) {
315                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316                 sr->pindown = talloc_new(sr);
317                 if (sr->pindown == NULL) {
318                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
319                         return -1;
320                 }
321                 tevent_add_timer(ctdb->ev, sr->pindown,
322                                  timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
323                                                      (ctdb->tunable.sticky_pindown * 1000) % 1000000),
324                                  ctdb_sticky_pindown_timeout, sr);
325         }
326
327         return 0;
328 }
329
330 /*
331   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
332   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
333
334   must be called with the chainlock held. This function releases the chainlock
335 */
336 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
337                                 struct ctdb_req_header *hdr,
338                                 TDB_DATA key, TDB_DATA data,
339                                 uint64_t rsn, uint32_t record_flags)
340 {
341         struct ctdb_call_state *state;
342         struct ctdb_context *ctdb = ctdb_db->ctdb;
343         struct ctdb_ltdb_header header;
344         int ret;
345
346         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
347
348         ZERO_STRUCT(header);
349         header.rsn = rsn;
350         header.dmaster = ctdb->pnn;
351         header.flags = record_flags;
352
353         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
354
355         if (state) {
356                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
357                         /*
358                          * We temporarily add the VACUUM_MIGRATED flag to
359                          * the record flags, so that ctdb_ltdb_store can
360                          * decide whether the record should be stored or
361                          * deleted.
362                          */
363                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
364                 }
365         }
366
367         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
368                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
369
370                 ret = ctdb_ltdb_unlock(ctdb_db, key);
371                 if (ret != 0) {
372                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
373                 }
374                 return;
375         }
376
377         /* we just became DMASTER and this database is "sticky",
378            see if the record is flagged as "hot" and set up a pin-down
379            context to stop migrations for a little while if so
380         */
381         if (ctdb_db->sticky) {
382                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
383         }
384
385         if (state == NULL) {
386                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
387                          ctdb->pnn, hdr->reqid, hdr->srcnode));
388
389                 ret = ctdb_ltdb_unlock(ctdb_db, key);
390                 if (ret != 0) {
391                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
392                 }
393                 return;
394         }
395
396         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
397                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
398
399                 ret = ctdb_ltdb_unlock(ctdb_db, key);
400                 if (ret != 0) {
401                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
402                 }
403                 return;
404         }
405
406         if (hdr->reqid != state->reqid) {
407                 /* we found a record  but it was the wrong one */
408                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
409
410                 ret = ctdb_ltdb_unlock(ctdb_db, key);
411                 if (ret != 0) {
412                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
413                 }
414                 return;
415         }
416
417         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
418
419         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
420         if (ret != 0) {
421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
422         }
423
424         state->state = CTDB_CALL_DONE;
425         if (state->async.fn) {
426                 state->async.fn(state);
427         }
428 }
429
/*
  one request packet that is waiting on a dmaster defer queue
 */
struct dmaster_defer_call {
	/* links for the queue's list of deferred calls */
	struct dmaster_defer_call *next, *prev;

	/* daemon context used to re-inject the packet */
	struct ctdb_context *ctdb;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;
};
435
/*
  queue of requests deferred for a single key
 */
struct dmaster_defer_queue {
	/* database the key belongs to */
	struct ctdb_db_context *ctdb_db;

	/* generation of the request that created the queue; compared
	   against the database generation to detect stale queues */
	uint32_t generation;

	/* list of deferred calls, reprocessed by the destructor */
	struct dmaster_defer_call *deferred_calls;
};
441
442 static void dmaster_defer_reprocess(struct tevent_context *ev,
443                                     struct tevent_timer *te,
444                                     struct timeval t,
445                                     void *private_data)
446 {
447         struct dmaster_defer_call *call = talloc_get_type(
448                 private_data, struct dmaster_defer_call);
449
450         ctdb_input_pkt(call->ctdb, call->hdr);
451         talloc_free(call);
452 }
453
454 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
455 {
456         /* Ignore requests, if database recovery happens in-between. */
457         if (ddq->generation != ddq->ctdb_db->generation) {
458                 return 0;
459         }
460
461         while (ddq->deferred_calls != NULL) {
462                 struct dmaster_defer_call *call = ddq->deferred_calls;
463
464                 DLIST_REMOVE(ddq->deferred_calls, call);
465
466                 talloc_steal(call->ctdb, call);
467                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
468                                  dmaster_defer_reprocess, call);
469         }
470         return 0;
471 }
472
/*
  insert callback for the defer_dmaster tree: frees the entry already
  stored under the key (data), if there is one, and returns the new
  entry (parm)
*/
static void *insert_ddq_callback(void *parm, void *data)
{
	if (data != NULL) {
		talloc_free(data);
	}

	return parm;
}
480
481 /**
482  * This function is used to reigster a key in database that needs to be updated.
483  * Any requests for that key should get deferred till this is completed.
484  */
485 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
486                                struct ctdb_req_header *hdr,
487                                TDB_DATA key)
488 {
489         uint32_t *k;
490         struct dmaster_defer_queue *ddq;
491
492         k = ctdb_key_to_idkey(hdr, key);
493         if (k == NULL) {
494                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
495                 return -1;
496         }
497
498         /* Already exists */
499         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
500         if (ddq != NULL) {
501                 if (ddq->generation == ctdb_db->generation) {
502                         talloc_free(k);
503                         return 0;
504                 }
505
506                 /* Recovery ocurred - get rid of old queue. All the deferred
507                  * requests will be resent anyway from ctdb_call_resend_db.
508                  */
509                 talloc_free(ddq);
510         }
511
512         ddq = talloc(hdr, struct dmaster_defer_queue);
513         if (ddq == NULL) {
514                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
515                 talloc_free(k);
516                 return -1;
517         }
518         ddq->ctdb_db = ctdb_db;
519         ddq->generation = hdr->generation;
520         ddq->deferred_calls = NULL;
521
522         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
523                                     insert_ddq_callback, ddq);
524         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
525
526         talloc_free(k);
527         return 0;
528 }
529
530 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
531                              struct ctdb_req_header *hdr,
532                              TDB_DATA key)
533 {
534         struct dmaster_defer_queue *ddq;
535         struct dmaster_defer_call *call;
536         uint32_t *k;
537
538         k = ctdb_key_to_idkey(hdr, key);
539         if (k == NULL) {
540                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
541                 return -1;
542         }
543
544         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
545         if (ddq == NULL) {
546                 talloc_free(k);
547                 return -1;
548         }
549
550         talloc_free(k);
551
552         if (ddq->generation != hdr->generation) {
553                 talloc_set_destructor(ddq, NULL);
554                 talloc_free(ddq);
555                 return -1;
556         }
557
558         call = talloc(ddq, struct dmaster_defer_call);
559         if (call == NULL) {
560                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
561                 return -1;
562         }
563
564         call->ctdb = ctdb_db->ctdb;
565         call->hdr = talloc_steal(call, hdr);
566
567         DLIST_ADD_END(ddq->deferred_calls, call, NULL);
568
569         return 0;
570 }
571
572 /*
573   called when a CTDB_REQ_DMASTER packet comes in
574
575   this comes into the lmaster for a record when the current dmaster
576   wants to give up the dmaster role and give it to someone else
577 */
578 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
579 {
580         struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
581         TDB_DATA key, data, data2;
582         struct ctdb_ltdb_header header;
583         struct ctdb_db_context *ctdb_db;
584         uint32_t record_flags = 0;
585         size_t len;
586         int ret;
587
588         ctdb_db = find_ctdb_db(ctdb, c->db_id);
589         if (!ctdb_db) {
590                 ctdb_send_error(ctdb, hdr, -1,
591                                 "Unknown database in request. db_id==0x%08x",
592                                 c->db_id);
593                 return;
594         }
595
596         if (hdr->generation != ctdb_db->generation) {
597                 DEBUG(DEBUG_DEBUG,
598                       ("ctdb operation %u request %u from node %u to %u had an"
599                        " invalid generation:%u while our generation is:%u\n",
600                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
601                        hdr->generation, ctdb_db->generation));
602                 return;
603         }
604
605         key.dptr = c->data;
606         key.dsize = c->keylen;
607         data.dptr = c->data + c->keylen;
608         data.dsize = c->datalen;
609         len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
610                         + sizeof(uint32_t);
611         if (len <= c->hdr.length) {
612                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
613                        sizeof(record_flags));
614         }
615
616         dmaster_defer_setup(ctdb_db, hdr, key);
617
618         /* fetch the current record */
619         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
620                                            ctdb_call_input_pkt, ctdb, false);
621         if (ret == -1) {
622                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
623                 return;
624         }
625         if (ret == -2) {
626                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
627                 return;
628         }
629
630         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
631                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
632                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
633                          hdr->generation, ctdb->vnn_map->generation));
634                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
635         }
636
637         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
638                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
639
640         /* its a protocol error if the sending node is not the current dmaster */
641         if (header.dmaster != hdr->srcnode) {
642                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
643                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
644                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
645                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
646                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
647                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
648                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
649
650                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
651                         ctdb_ltdb_unlock(ctdb_db, key);
652                         return;
653                 }
654         }
655
656         if (header.rsn > c->rsn) {
657                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
658                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
659                          ctdb_db->db_id, hdr->generation, ctdb_db->generation,
660                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
661         }
662
663         /* use the rsn from the sending node */
664         header.rsn = c->rsn;
665
666         /* store the record flags from the sending node */
667         header.flags = record_flags;
668
669         /* check if the new dmaster is the lmaster, in which case we
670            skip the dmaster reply */
671         if (c->dmaster == ctdb->pnn) {
672                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
673         } else {
674                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
675
676                 ret = ctdb_ltdb_unlock(ctdb_db, key);
677                 if (ret != 0) {
678                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
679                 }
680         }
681 }
682
683 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
684                                        struct tevent_timer *te,
685                                        struct timeval t, void *private_data)
686 {
687         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
688                                                        struct ctdb_sticky_record);
689         talloc_free(sr);
690 }
691
692 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
693 {
694         if (data) {
695                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
696                 talloc_free(data);
697         }
698         return parm;
699 }
700
701 static int
702 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
703 {
704         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
705         uint32_t *k;
706         struct ctdb_sticky_record *sr;
707
708         k = ctdb_key_to_idkey(tmp_ctx, key);
709         if (k == NULL) {
710                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
711                 talloc_free(tmp_ctx);
712                 return -1;
713         }
714
715         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
716         if (sr != NULL) {
717                 talloc_free(tmp_ctx);
718                 return 0;
719         }
720
721         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
722         if (sr == NULL) {
723                 talloc_free(tmp_ctx);
724                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
725                 return -1;
726         }
727
728         sr->ctdb    = ctdb;
729         sr->ctdb_db = ctdb_db;
730         sr->pindown = NULL;
731
732         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
733                          ctdb->tunable.sticky_duration,
734                          ctdb_db->db_name, ctdb_hash(&key)));
735
736         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
737
738         tevent_add_timer(ctdb->ev, sr,
739                          timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
740                          ctdb_sticky_record_timeout, sr);
741
742         talloc_free(tmp_ctx);
743         return 0;
744 }
745
/*
  carries a previously deferred request from pinned_down_destructor()
  to the pinned_down_requeue() timer callback
 */
struct pinned_down_requeue_handle {
	/* daemon context the packet is re-injected into */
	struct ctdb_context *ctdb;

	/* the request packet to requeue */
	struct ctdb_req_header *hdr;
};
750
/*
  a request that is held back while its record is pinned down; it is
  allocated on the sticky record's pindown context
 */
struct pinned_down_deferred_call {
	/* daemon context the request arrived on */
	struct ctdb_context *ctdb;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;
};
755
756 static void pinned_down_requeue(struct tevent_context *ev,
757                                 struct tevent_timer *te,
758                                 struct timeval t, void *private_data)
759 {
760         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
761         struct ctdb_context *ctdb = handle->ctdb;
762
763         talloc_steal(ctdb, handle->hdr);
764         ctdb_call_input_pkt(ctdb, handle->hdr);
765
766         talloc_free(handle);
767 }
768
769 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
770 {
771         struct ctdb_context *ctdb = pinned_down->ctdb;
772         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
773
774         handle->ctdb = pinned_down->ctdb;
775         handle->hdr  = pinned_down->hdr;
776         talloc_steal(handle, handle->hdr);
777
778         tevent_add_timer(ctdb->ev, handle, timeval_zero(),
779                          pinned_down_requeue, handle);
780
781         return 0;
782 }
783
784 static int
785 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
786 {
787         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
788         uint32_t *k;
789         struct ctdb_sticky_record *sr;
790         struct pinned_down_deferred_call *pinned_down;
791
792         k = ctdb_key_to_idkey(tmp_ctx, key);
793         if (k == NULL) {
794                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
795                 talloc_free(tmp_ctx);
796                 return -1;
797         }
798
799         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
800         if (sr == NULL) {
801                 talloc_free(tmp_ctx);
802                 return -1;
803         }
804
805         talloc_free(tmp_ctx);
806
807         if (sr->pindown == NULL) {
808                 return -1;
809         }
810         
811         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
812         if (pinned_down == NULL) {
813                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
814                 return -1;
815         }
816
817         pinned_down->ctdb = ctdb;
818         pinned_down->hdr  = hdr;
819
820         talloc_set_destructor(pinned_down, pinned_down_destructor);
821         talloc_steal(pinned_down, hdr);
822
823         return 0;
824 }
825
826 static void
827 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
828 {
829         int i, id;
830
831         /* smallest value is always at index 0 */
832         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
833                 return;
834         }
835
836         /* see if we already know this key */
837         for (i = 0; i < MAX_HOT_KEYS; i++) {
838                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
839                         continue;
840                 }
841                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
842                         continue;
843                 }
844                 /* found an entry for this key */
845                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
846                         return;
847                 }
848                 ctdb_db->statistics.hot_keys[i].count = hopcount;
849                 goto sort_keys;
850         }
851
852         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
853                 id = ctdb_db->statistics.num_hot_keys;
854                 ctdb_db->statistics.num_hot_keys++;
855         } else {
856                 id = 0;
857         }
858
859         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
860                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
861         }
862         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
863         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
864         ctdb_db->statistics.hot_keys[id].count = hopcount;
865         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
866                             ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
867
868 sort_keys:
869         for (i = 1; i < MAX_HOT_KEYS; i++) {
870                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
871                         continue;
872                 }
873                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
874                         hopcount = ctdb_db->statistics.hot_keys[i].count;
875                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
876                         ctdb_db->statistics.hot_keys[0].count = hopcount;
877
878                         key = ctdb_db->statistics.hot_keys[i].key;
879                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
880                         ctdb_db->statistics.hot_keys[0].key = key;
881                 }
882         }
883 }
884
885 /*
886   called when a CTDB_REQ_CALL packet comes in
887 */
888 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
889 {
890         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
891         TDB_DATA data;
892         struct ctdb_reply_call_old *r;
893         int ret, len;
894         struct ctdb_ltdb_header header;
895         struct ctdb_call *call;
896         struct ctdb_db_context *ctdb_db;
897         int tmp_count, bucket;
898
899         if (ctdb->methods == NULL) {
900                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
901                 return;
902         }
903
904         ctdb_db = find_ctdb_db(ctdb, c->db_id);
905         if (!ctdb_db) {
906                 ctdb_send_error(ctdb, hdr, -1,
907                                 "Unknown database in request. db_id==0x%08x",
908                                 c->db_id);
909                 return;
910         }
911
912         if (hdr->generation != ctdb_db->generation) {
913                 DEBUG(DEBUG_DEBUG,
914                       ("ctdb operation %u request %u from node %u to %u had an"
915                        " invalid generation:%u while our generation is:%u\n",
916                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
917                        hdr->generation, ctdb_db->generation));
918                 return;
919         }
920
921         call = talloc(hdr, struct ctdb_call);
922         CTDB_NO_MEMORY_FATAL(ctdb, call);
923
924         call->call_id  = c->callid;
925         call->key.dptr = c->data;
926         call->key.dsize = c->keylen;
927         call->call_data.dptr = c->data + c->keylen;
928         call->call_data.dsize = c->calldatalen;
929         call->reply_data.dptr  = NULL;
930         call->reply_data.dsize = 0;
931
932
933         /* If this record is pinned down we should defer the
934            request until the pindown times out
935         */
936         if (ctdb_db->sticky) {
937                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
938                         DEBUG(DEBUG_WARNING,
939                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
940                         talloc_free(call);
941                         return;
942                 }
943         }
944
945         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
946                 talloc_free(call);
947                 return;
948         }
949
950         /* determine if we are the dmaster for this key. This also
951            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
952            if the call will be answered locally */
953
954         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
955                                            ctdb_call_input_pkt, ctdb, false);
956         if (ret == -1) {
957                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
958                 talloc_free(call);
959                 return;
960         }
961         if (ret == -2) {
962                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
963                 talloc_free(call);
964                 return;
965         }
966
967         /* Dont do READONLY if we dont have a tracking database */
968         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
969                 c->flags &= ~CTDB_WANT_READONLY;
970         }
971
972         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
973                 header.flags &= ~CTDB_REC_RO_FLAGS;
974                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
975                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
976                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
977                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
978                 }
979                 /* and clear out the tracking data */
980                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
981                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
982                 }
983         }
984
985         /* if we are revoking, we must defer all other calls until the revoke
986          * had completed.
987          */
988         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
989                 talloc_free(data.dptr);
990                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
991
992                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
993                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
994                 }
995                 talloc_free(call);
996                 return;
997         }
998
999         /*
1000          * If we are not the dmaster and are not hosting any delegations,
1001          * then we redirect the request to the node than can answer it
1002          * (the lmaster or the dmaster).
1003          */
1004         if ((header.dmaster != ctdb->pnn) 
1005             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1006                 talloc_free(data.dptr);
1007                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1008
1009                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1010                 if (ret != 0) {
1011                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1012                 }
1013                 talloc_free(call);
1014                 return;
1015         }
1016
1017         if ( (!(c->flags & CTDB_WANT_READONLY))
1018         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1019                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1020                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1021                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1022                 }
1023                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1024
1025                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1026                         ctdb_fatal(ctdb, "Failed to start record revoke");
1027                 }
1028                 talloc_free(data.dptr);
1029
1030                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1031                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1032                 }
1033                 talloc_free(call);
1034
1035                 return;
1036         }               
1037
1038         /* If this is the first request for delegation. bump rsn and set
1039          * the delegations flag
1040          */
1041         if ((c->flags & CTDB_WANT_READONLY)
1042         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1043         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1044                 header.rsn     += 3;
1045                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1046                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1047                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1048                 }
1049         }
1050         if ((c->flags & CTDB_WANT_READONLY) 
1051         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1052                 TDB_DATA tdata;
1053
1054                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1055                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1056                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1057                 }
1058                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1059                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1060                 }
1061                 free(tdata.dptr);
1062
1063                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1064                 if (ret != 0) {
1065                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1066                 }
1067
1068                 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1069                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1070                                             struct ctdb_reply_call_old);
1071                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1072                 r->hdr.destnode  = c->hdr.srcnode;
1073                 r->hdr.reqid     = c->hdr.reqid;
1074                 r->hdr.generation = ctdb_db->generation;
1075                 r->status        = 0;
1076                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1077                 header.rsn      -= 2;
1078                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1079                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1080                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1081
1082                 if (data.dsize) {
1083                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1084                 }
1085
1086                 ctdb_queue_packet(ctdb, &r->hdr);
1087                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1088                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1089
1090                 talloc_free(r);
1091                 talloc_free(call);
1092                 return;
1093         }
1094
1095         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1096         tmp_count = c->hopcount;
1097         bucket = 0;
1098         while (tmp_count) {
1099                 tmp_count >>= 2;
1100                 bucket++;
1101         }
1102         if (bucket >= MAX_COUNT_BUCKETS) {
1103                 bucket = MAX_COUNT_BUCKETS - 1;
1104         }
1105         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1106         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1107         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1108
1109         /* If this database supports sticky records, then check if the
1110            hopcount is big. If it is it means the record is hot and we
1111            should make it sticky.
1112         */
1113         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1114                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1115         }
1116
1117
1118         /* Try if possible to migrate the record off to the caller node.
1119          * From the clients perspective a fetch of the data is just as 
1120          * expensive as a migration.
1121          */
1122         if (c->hdr.srcnode != ctdb->pnn) {
1123                 if (ctdb_db->persistent_state) {
1124                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1125                               " of key %s while transaction is active\n",
1126                               (char *)call->key.dptr));
1127                 } else {
1128                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1129                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1130                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1131                         talloc_free(data.dptr);
1132
1133                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1134                         if (ret != 0) {
1135                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1136                         }
1137                 }
1138                 talloc_free(call);
1139                 return;
1140         }
1141
1142         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1143         if (ret != 0) {
1144                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1145                 call->status = -1;
1146         }
1147
1148         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1149         if (ret != 0) {
1150                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1151         }
1152
1153         len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1154         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1155                                     struct ctdb_reply_call_old);
1156         CTDB_NO_MEMORY_FATAL(ctdb, r);
1157         r->hdr.destnode  = hdr->srcnode;
1158         r->hdr.reqid     = hdr->reqid;
1159         r->hdr.generation = ctdb_db->generation;
1160         r->status        = call->status;
1161         r->datalen       = call->reply_data.dsize;
1162         if (call->reply_data.dsize) {
1163                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1164         }
1165
1166         ctdb_queue_packet(ctdb, &r->hdr);
1167
1168         talloc_free(r);
1169         talloc_free(call);
1170 }
1171
1172 /**
1173  * called when a CTDB_REPLY_CALL packet comes in
1174  *
1175  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1176  * contains any reply data from the call
1177  */
1178 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1179 {
1180         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1181         struct ctdb_call_state *state;
1182
1183         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1184         if (state == NULL) {
1185                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1186                 return;
1187         }
1188
1189         if (hdr->reqid != state->reqid) {
1190                 /* we found a record  but it was the wrong one */
1191                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1192                 return;
1193         }
1194
1195         if (hdr->generation != state->generation) {
1196                 DEBUG(DEBUG_DEBUG,
1197                       ("ctdb operation %u request %u from node %u to %u had an"
1198                        " invalid generation:%u while our generation is:%u\n",
1199                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1200                        hdr->generation, state->generation));
1201                 return;
1202         }
1203
1204
1205         /* read only delegation processing */
1206         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1207          * delegation since we may need to update the record header
1208          */
1209         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1210                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1211                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1212                 struct ctdb_ltdb_header oldheader;
1213                 TDB_DATA key, data, olddata;
1214                 int ret;
1215
1216                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1217                         goto finished_ro;
1218                         return;
1219                 }
1220
1221                 key.dsize = state->c->keylen;
1222                 key.dptr  = state->c->data;
1223                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1224                                      ctdb_call_input_pkt, ctdb, false);
1225                 if (ret == -2) {
1226                         return;
1227                 }
1228                 if (ret != 0) {
1229                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1230                         return;
1231                 }
1232
1233                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1234                 if (ret != 0) {
1235                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1236                         ctdb_ltdb_unlock(ctdb_db, key);
1237                         goto finished_ro;
1238                 }                       
1239
1240                 if (header->rsn <= oldheader.rsn) {
1241                         ctdb_ltdb_unlock(ctdb_db, key);
1242                         goto finished_ro;
1243                 }
1244
1245                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1246                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1247                         ctdb_ltdb_unlock(ctdb_db, key);
1248                         goto finished_ro;
1249                 }
1250
1251                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1252                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1253                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1254                 if (ret != 0) {
1255                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1256                         ctdb_ltdb_unlock(ctdb_db, key);
1257                         goto finished_ro;
1258                 }                       
1259
1260                 ctdb_ltdb_unlock(ctdb_db, key);
1261         }
1262 finished_ro:
1263
1264         state->call->reply_data.dptr = c->data;
1265         state->call->reply_data.dsize = c->datalen;
1266         state->call->status = c->status;
1267
1268         talloc_steal(state, c);
1269
1270         state->state = CTDB_CALL_DONE;
1271         if (state->async.fn) {
1272                 state->async.fn(state);
1273         }
1274 }
1275
1276
1277 /**
1278  * called when a CTDB_REPLY_DMASTER packet comes in
1279  *
1280  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1281  * request packet. It means that the current dmaster wants to give us
1282  * the dmaster role.
1283  */
1284 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1285 {
1286         struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1287         struct ctdb_db_context *ctdb_db;
1288         TDB_DATA key, data;
1289         uint32_t record_flags = 0;
1290         size_t len;
1291         int ret;
1292
1293         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1294         if (ctdb_db == NULL) {
1295                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1296                 return;
1297         }
1298
1299         if (hdr->generation != ctdb_db->generation) {
1300                 DEBUG(DEBUG_DEBUG,
1301                       ("ctdb operation %u request %u from node %u to %u had an"
1302                        " invalid generation:%u while our generation is:%u\n",
1303                        hdr->operation, hdr->reqid, hdr->srcnode, hdr->destnode,
1304                        hdr->generation, ctdb_db->generation));
1305                 return;
1306         }
1307
1308         key.dptr = c->data;
1309         key.dsize = c->keylen;
1310         data.dptr = &c->data[key.dsize];
1311         data.dsize = c->datalen;
1312         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1313                 + sizeof(uint32_t);
1314         if (len <= c->hdr.length) {
1315                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1316                        sizeof(record_flags));
1317         }
1318
1319         dmaster_defer_setup(ctdb_db, hdr, key);
1320
1321         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1322                                      ctdb_call_input_pkt, ctdb, false);
1323         if (ret == -2) {
1324                 return;
1325         }
1326         if (ret != 0) {
1327                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1328                 return;
1329         }
1330
1331         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1332 }
1333
1334
1335 /*
1336   called when a CTDB_REPLY_ERROR packet comes in
1337 */
1338 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1339 {
1340         struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1341         struct ctdb_call_state *state;
1342
1343         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1344         if (state == NULL) {
1345                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1346                          ctdb->pnn, hdr->reqid));
1347                 return;
1348         }
1349
1350         if (hdr->reqid != state->reqid) {
1351                 /* we found a record  but it was the wrong one */
1352                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1353                 return;
1354         }
1355
1356         talloc_steal(state, c);
1357
1358         state->state  = CTDB_CALL_ERROR;
1359         state->errmsg = (char *)c->msg;
1360         if (state->async.fn) {
1361                 state->async.fn(state);
1362         }
1363 }
1364
1365
1366 /*
1367   destroy a ctdb_call
1368 */
1369 static int ctdb_call_destructor(struct ctdb_call_state *state)
1370 {
1371         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1372         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1373         return 0;
1374 }
1375
1376
1377 /*
1378   called when a ctdb_call needs to be resent after a reconfigure event
1379 */
1380 static void ctdb_call_resend(struct ctdb_call_state *state)
1381 {
1382         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1383
1384         state->generation = state->ctdb_db->generation;
1385
1386         /* use a new reqid, in case the old reply does eventually come in */
1387         reqid_remove(ctdb->idr, state->reqid);
1388         state->reqid = reqid_new(ctdb->idr, state);
1389         state->c->hdr.reqid = state->reqid;
1390
1391         /* update the generation count for this request, so its valid with the new vnn_map */
1392         state->c->hdr.generation = state->generation;
1393
1394         /* send the packet to ourselves, it will be redirected appropriately */
1395         state->c->hdr.destnode = ctdb->pnn;
1396
1397         ctdb_queue_packet(ctdb, &state->c->hdr);
1398         DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1399                             state->ctdb_db->db_name, state->reqid, state->generation));
1400 }
1401
1402 /*
1403   resend all pending calls on recovery
1404  */
1405 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1406 {
1407         struct ctdb_call_state *state, *next;
1408
1409         for (state = ctdb_db->pending_calls; state; state = next) {
1410                 next = state->next;
1411                 ctdb_call_resend(state);
1412         }
1413 }
1414
1415 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1416 {
1417         struct ctdb_db_context *ctdb_db;
1418
1419         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1420                 ctdb_call_resend_db(ctdb_db);
1421         }
1422 }
1423
1424 /*
1425   this allows the caller to setup a async.fn 
1426 */
1427 static void call_local_trigger(struct tevent_context *ev,
1428                                struct tevent_timer *te,
1429                                struct timeval t, void *private_data)
1430 {
1431         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1432         if (state->async.fn) {
1433                 state->async.fn(state);
1434         }
1435 }       
1436
1437
1438 /*
1439   construct an event driven local ctdb_call
1440
1441   this is used so that locally processed ctdb_call requests are processed
1442   in an event driven manner
1443 */
1444 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1445                                              struct ctdb_call *call,
1446                                              struct ctdb_ltdb_header *header,
1447                                              TDB_DATA *data)
1448 {
1449         struct ctdb_call_state *state;
1450         struct ctdb_context *ctdb = ctdb_db->ctdb;
1451         int ret;
1452
1453         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1454         CTDB_NO_MEMORY_NULL(ctdb, state);
1455
1456         talloc_steal(state, data->dptr);
1457
1458         state->state = CTDB_CALL_DONE;
1459         state->call  = talloc(state, struct ctdb_call);
1460         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1461         *(state->call) = *call;
1462         state->ctdb_db = ctdb_db;
1463
1464         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1465         if (ret != 0) {
1466                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1467         }
1468
1469         tevent_add_timer(ctdb->ev, state, timeval_zero(),
1470                          call_local_trigger, state);
1471
1472         return state;
1473 }
1474
1475
1476 /*
1477   make a remote ctdb call - async send. Called in daemon context.
1478
1479   This constructs a ctdb_call request and queues it for processing. 
1480   This call never blocks.
1481 */
1482 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1483                                                      struct ctdb_call *call, 
1484                                                      struct ctdb_ltdb_header *header)
1485 {
1486         uint32_t len;
1487         struct ctdb_call_state *state;
1488         struct ctdb_context *ctdb = ctdb_db->ctdb;
1489
1490         if (ctdb->methods == NULL) {
1491                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1492                 return NULL;
1493         }
1494
1495         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1496         CTDB_NO_MEMORY_NULL(ctdb, state);
1497         state->call = talloc(state, struct ctdb_call);
1498         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1499
1500         state->reqid = reqid_new(ctdb->idr, state);
1501         state->ctdb_db = ctdb_db;
1502         talloc_set_destructor(state, ctdb_call_destructor);
1503
1504         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1505         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1506                                            struct ctdb_req_call_old);
1507         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1508         state->c->hdr.destnode  = header->dmaster;
1509
1510         /* this limits us to 16k outstanding messages - not unreasonable */
1511         state->c->hdr.reqid     = state->reqid;
1512         state->c->hdr.generation = ctdb_db->generation;
1513         state->c->flags         = call->flags;
1514         state->c->db_id         = ctdb_db->db_id;
1515         state->c->callid        = call->call_id;
1516         state->c->hopcount      = 0;
1517         state->c->keylen        = call->key.dsize;
1518         state->c->calldatalen   = call->call_data.dsize;
1519         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1520         memcpy(&state->c->data[call->key.dsize], 
1521                call->call_data.dptr, call->call_data.dsize);
1522         *(state->call)              = *call;
1523         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1524         state->call->key.dptr       = &state->c->data[0];
1525
1526         state->state  = CTDB_CALL_WAIT;
1527         state->generation = ctdb_db->generation;
1528
1529         DLIST_ADD(ctdb_db->pending_calls, state);
1530
1531         ctdb_queue_packet(ctdb, &state->c->hdr);
1532
1533         return state;
1534 }
1535
1536 /*
1537   make a remote ctdb call - async recv - called in daemon context
1538
1539   This is called when the program wants to wait for a ctdb_call to complete and get the 
1540   results. This call will block unless the call has already completed.
1541 */
1542 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1543 {
1544         while (state->state < CTDB_CALL_DONE) {
1545                 tevent_loop_once(state->ctdb_db->ctdb->ev);
1546         }
1547         if (state->state != CTDB_CALL_DONE) {
1548                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1549                 talloc_free(state);
1550                 return -1;
1551         }
1552
1553         if (state->call->reply_data.dsize) {
1554                 call->reply_data.dptr = talloc_memdup(call,
1555                                                       state->call->reply_data.dptr,
1556                                                       state->call->reply_data.dsize);
1557                 call->reply_data.dsize = state->call->reply_data.dsize;
1558         } else {
1559                 call->reply_data.dptr = NULL;
1560                 call->reply_data.dsize = 0;
1561         }
1562         call->status = state->call->status;
1563         talloc_free(state);
1564         return 0;
1565 }
1566
1567
1568 /* 
1569    send a keepalive packet to the other node
1570 */
1571 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1572 {
1573         struct ctdb_req_keepalive *r;
1574         
1575         if (ctdb->methods == NULL) {
1576                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1577                 return;
1578         }
1579
1580         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1581                                     sizeof(struct ctdb_req_keepalive), 
1582                                     struct ctdb_req_keepalive);
1583         CTDB_NO_MEMORY_FATAL(ctdb, r);
1584         r->hdr.destnode  = destnode;
1585         r->hdr.reqid     = 0;
1586         
1587         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1588
1589         ctdb_queue_packet(ctdb, &r->hdr);
1590
1591         talloc_free(r);
1592 }
1593
1594
1595
/*
  A request that has been deferred while a record is being revoked.

  Instances are allocated as talloc children of the matching
  revokechild_handle (see ctdb_add_revoke_deferred_call) and carry a
  destructor that schedules the request to be requeued.
 */
struct revokechild_deferred_call {
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;    /* the deferred request packet */
        deferred_requeue_fn fn;         /* called as fn(ctx, hdr) to requeue */
        void *ctx;                      /* first argument passed to fn */
};
1602
/*
  State kept in the daemon for one running revoke child process.
 */
struct revokechild_handle {
        /* links in the ctdb_db->revokechild_active list */
        struct revokechild_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct tevent_fd *fde;  /* read event on fd[0] */
        int status;             /* set to -1 when the child reports failure */
        int fd[2];              /* pipe; -1 marks an end that is not open */
        pid_t child;            /* pid returned by ctdb_fork() */
        TDB_DATA key;           /* private copy of the key being revoked */
};
1613
/*
  Carries a deferred request from deferred_call_destructor() to the
  deferred_call_requeue() timer callback; it owns hdr (talloc_steal) until
  the callback has run.
 */
struct revokechild_requeue_handle {
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;    /* the request packet to requeue */
        deferred_requeue_fn fn;         /* called as fn(ctx, hdr) */
        void *ctx;                      /* first argument passed to fn */
};
1620
1621 static void deferred_call_requeue(struct tevent_context *ev,
1622                                   struct tevent_timer *te,
1623                                   struct timeval t, void *private_data)
1624 {
1625         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1626
1627         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1628         talloc_free(requeue_handle);
1629 }
1630
1631 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1632 {
1633         struct ctdb_context *ctdb = deferred_call->ctdb;
1634         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1635         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
1636
1637         requeue_handle->ctdb = ctdb;
1638         requeue_handle->hdr  = deferred_call->hdr;
1639         requeue_handle->fn   = deferred_call->fn;
1640         requeue_handle->ctx  = deferred_call->ctx;
1641         talloc_steal(requeue_handle, requeue_handle->hdr);
1642
1643         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1644         tevent_add_timer(ctdb->ev, requeue_handle,
1645                          timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1646                          deferred_call_requeue, requeue_handle);
1647
1648         return 0;
1649 }
1650
1651
1652 static int revokechild_destructor(struct revokechild_handle *rc)
1653 {
1654         if (rc->fde != NULL) {
1655                 talloc_free(rc->fde);
1656         }
1657
1658         if (rc->fd[0] != -1) {
1659                 close(rc->fd[0]);
1660         }
1661         if (rc->fd[1] != -1) {
1662                 close(rc->fd[1]);
1663         }
1664         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1665
1666         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1667         return 0;
1668 }
1669
1670 static void revokechild_handler(struct tevent_context *ev,
1671                                 struct tevent_fd *fde,
1672                                 uint16_t flags, void *private_data)
1673 {
1674         struct revokechild_handle *rc = talloc_get_type(private_data, 
1675                                                      struct revokechild_handle);
1676         int ret;
1677         char c;
1678
1679         ret = sys_read(rc->fd[0], &c, 1);
1680         if (ret != 1) {
1681                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1682                 rc->status = -1;
1683                 talloc_free(rc);
1684                 return;
1685         }
1686         if (c != 0) {
1687                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1688                 rc->status = -1;
1689                 talloc_free(rc);
1690                 return;
1691         }
1692
1693         talloc_free(rc);
1694 }
1695
/*
  Bookkeeping used by ctdb_revoke_all_delegations() while update-record
  controls are outstanding.
 */
struct ctdb_revoke_state {
        struct ctdb_db_context *ctdb_db;
        TDB_DATA key;
        struct ctdb_ltdb_header *header;
        TDB_DATA data;
        int count;      /* controls sent (revoke_send_cb) minus replies received (update_record_cb) */
        int status;     /* 0, or -1 once any send, reply or the timeout failed */
        int finished;   /* set to 1 when count drops to 0 or the timeout fires */
};
1705
1706 static void update_record_cb(struct ctdb_client_control_state *state)
1707 {
1708         struct ctdb_revoke_state *revoke_state;
1709         int ret;
1710         int32_t res;
1711
1712         if (state == NULL) {
1713                 return;
1714         }
1715         revoke_state = state->async.private_data;
1716
1717         state->async.fn = NULL;
1718         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1719         if ((ret != 0) || (res != 0)) {
1720                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1721                 revoke_state->status = -1;
1722         }
1723
1724         revoke_state->count--;
1725         if (revoke_state->count <= 0) {
1726                 revoke_state->finished = 1;
1727         }
1728 }
1729
1730 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1731 {
1732         struct ctdb_revoke_state *revoke_state = private_data;
1733         struct ctdb_client_control_state *state;
1734
1735         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1736         if (state == NULL) {
1737                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1738                 revoke_state->status = -1;
1739                 return;
1740         }
1741         state->async.fn           = update_record_cb;
1742         state->async.private_data = revoke_state;
1743
1744         revoke_state->count++;
1745
1746 }
1747
1748 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1749                                         struct tevent_timer *te,
1750                                         struct timeval yt, void *private_data)
1751 {
1752         struct ctdb_revoke_state *state = private_data;
1753
1754         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1755         state->finished = 1;
1756         state->status   = -1;
1757 }
1758
1759 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1760 {
1761         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1762         struct ctdb_ltdb_header new_header;
1763         TDB_DATA new_data;
1764
1765         state->ctdb_db = ctdb_db;
1766         state->key     = key;
1767         state->header  = header;
1768         state->data    = data;
1769  
1770         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1771
1772         tevent_add_timer(ctdb->ev, state,
1773                          timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1774                          ctdb_revoke_timeout_handler, state);
1775
1776         while (state->finished == 0) {
1777                 tevent_loop_once(ctdb->ev);
1778         }
1779
1780         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1781                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1782                 talloc_free(state);
1783                 return -1;
1784         }
1785         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1786                 ctdb_ltdb_unlock(ctdb_db, key);
1787                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1788                 talloc_free(state);
1789                 return -1;
1790         }
1791         header->rsn++;
1792         if (new_header.rsn > header->rsn) {
1793                 ctdb_ltdb_unlock(ctdb_db, key);
1794                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1795                 talloc_free(state);
1796                 return -1;
1797         }
1798         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1799                 ctdb_ltdb_unlock(ctdb_db, key);
1800                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1801                 talloc_free(state);
1802                 return -1;
1803         }
1804
1805         /*
1806          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1807          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1808          */
1809         if (state->status == 0) {
1810                 new_header.rsn++;
1811                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1812         } else {
1813                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1814                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1815         }
1816         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1817                 ctdb_ltdb_unlock(ctdb_db, key);
1818                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1819                 talloc_free(state);
1820                 return -1;
1821         }
1822         ctdb_ltdb_unlock(ctdb_db, key);
1823
1824         talloc_free(state);
1825         return 0;
1826 }
1827
1828
1829 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1830 {
1831         TDB_DATA tdata;
1832         struct revokechild_handle *rc;
1833         pid_t parent = getpid();
1834         int ret;
1835
1836         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1837         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1838         header->rsn   -= 1;
1839
1840         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1841                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1842                 return -1;
1843         }
1844
1845         tdata = tdb_fetch(ctdb_db->rottdb, key);
1846         if (tdata.dsize > 0) {
1847                 uint8_t *tmp;
1848
1849                 tmp = tdata.dptr;
1850                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1851                 free(tmp);
1852         }
1853
1854         rc->status    = 0;
1855         rc->ctdb      = ctdb;
1856         rc->ctdb_db   = ctdb_db;
1857         rc->fd[0]     = -1;
1858         rc->fd[1]     = -1;
1859
1860         talloc_set_destructor(rc, revokechild_destructor);
1861
1862         rc->key.dsize = key.dsize;
1863         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1864         if (rc->key.dptr == NULL) {
1865                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1866                 talloc_free(rc);
1867                 return -1;
1868         }
1869
1870         ret = pipe(rc->fd);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1873                 talloc_free(rc);
1874                 return -1;
1875         }
1876
1877
1878         rc->child = ctdb_fork(ctdb);
1879         if (rc->child == (pid_t)-1) {
1880                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1881                 talloc_free(rc);
1882                 return -1;
1883         }
1884
1885         if (rc->child == 0) {
1886                 char c = 0;
1887                 close(rc->fd[0]);
1888                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1889
1890                 ctdb_set_process_name("ctdb_revokechild");
1891                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1892                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1893                         c = 1;
1894                         goto child_finished;
1895                 }
1896
1897                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1898
1899 child_finished:
1900                 sys_write(rc->fd[1], &c, 1);
1901                 /* make sure we die when our parent dies */
1902                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1903                         sleep(5);
1904                 }
1905                 _exit(0);
1906         }
1907
1908         close(rc->fd[1]);
1909         rc->fd[1] = -1;
1910         set_close_on_exec(rc->fd[0]);
1911
1912         /* This is an active revokechild child process */
1913         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1914
1915         rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1916                                 revokechild_handler, (void *)rc);
1917         if (rc->fde == NULL) {
1918                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1919                 talloc_free(rc);
1920         }
1921         tevent_fd_set_auto_close(rc->fde);
1922
1923         return 0;
1924 }
1925
1926 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1927 {
1928         struct revokechild_handle *rc;
1929         struct revokechild_deferred_call *deferred_call;
1930
1931         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1932                 if (rc->key.dsize == 0) {
1933                         continue;
1934                 }
1935                 if (rc->key.dsize != key.dsize) {
1936                         continue;
1937                 }
1938                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1939                         break;
1940                 }
1941         }
1942
1943         if (rc == NULL) {
1944                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1945                 return -1;
1946         }
1947
1948         deferred_call = talloc(rc, struct revokechild_deferred_call);
1949         if (deferred_call == NULL) {
1950                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1951                 return -1;
1952         }
1953
1954         deferred_call->ctdb = ctdb;
1955         deferred_call->hdr  = hdr;
1956         deferred_call->fn   = fn;
1957         deferred_call->ctx  = call_context;
1958
1959         talloc_set_destructor(deferred_call, deferred_call_destructor);
1960         talloc_steal(deferred_call, hdr);
1961
1962         return 0;
1963 }