ctdb: Use ctdb_wait_for_process_to_exit()
[samba.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/util_process.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 struct ctdb_sticky_record {
45         struct ctdb_context *ctdb;
46         struct ctdb_db_context *ctdb_db;
47         TDB_CONTEXT *pindown;
48 };
49
50 /*
51   find the ctdb_db from a db index
52  */
53  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
54 {
55         struct ctdb_db_context *ctdb_db;
56
57         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
58                 if (ctdb_db->db_id == id) {
59                         break;
60                 }
61         }
62         return ctdb_db;
63 }
64
65 /*
66   a varient of input packet that can be used in lock requeue
67 */
68 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
69 {
70         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
71         ctdb_input_pkt(ctdb, hdr);
72 }
73
74
75 /*
76   send an error reply
77 */
78 static void ctdb_send_error(struct ctdb_context *ctdb, 
79                             struct ctdb_req_header *hdr, uint32_t status,
80                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
81 static void ctdb_send_error(struct ctdb_context *ctdb, 
82                             struct ctdb_req_header *hdr, uint32_t status,
83                             const char *fmt, ...)
84 {
85         va_list ap;
86         struct ctdb_reply_error_old *r;
87         char *msg;
88         int msglen, len;
89
90         if (ctdb->methods == NULL) {
91                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
92                 return;
93         }
94
95         va_start(ap, fmt);
96         msg = talloc_vasprintf(ctdb, fmt, ap);
97         if (msg == NULL) {
98                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
99         }
100         va_end(ap);
101
102         msglen = strlen(msg)+1;
103         len = offsetof(struct ctdb_reply_error_old, msg);
104         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
105                                     struct ctdb_reply_error_old);
106         CTDB_NO_MEMORY_FATAL(ctdb, r);
107
108         r->hdr.destnode  = hdr->srcnode;
109         r->hdr.reqid     = hdr->reqid;
110         r->status        = status;
111         r->msglen        = msglen;
112         memcpy(&r->msg[0], msg, msglen);
113
114         ctdb_queue_packet(ctdb, &r->hdr);
115
116         talloc_free(msg);
117 }
118
119
120 /**
121  * send a redirect reply
122  *
123  * The logic behind this function is this:
124  *
125  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
126  * to its local ctdb (ctdb_request_call). If the node is not itself
127  * the record's DMASTER, it first redirects the packet to  the
128  * record's LMASTER. The LMASTER then redirects the call packet to
129  * the current DMASTER. Note that this works because of this: When
130  * a record is migrated off a node, then the new DMASTER is stored
131  * in the record's copy on the former DMASTER.
132  */
133 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
134                                     struct ctdb_db_context *ctdb_db,
135                                     TDB_DATA key,
136                                     struct ctdb_req_call_old *c, 
137                                     struct ctdb_ltdb_header *header)
138 {
139         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
140
141         c->hdr.destnode = lmaster;
142         if (ctdb->pnn == lmaster) {
143                 c->hdr.destnode = header->dmaster;
144         }
145         c->hopcount++;
146
147         if (c->hopcount%100 > 95) {
148                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
149                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
150                         "header->dmaster:%d dst:%d\n",
151                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
152                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
153                         header->dmaster, c->hdr.destnode));
154         }
155
156         ctdb_queue_packet(ctdb, &c->hdr);
157 }
158
159
160 /*
161   send a dmaster reply
162
163   caller must have the chainlock before calling this routine. Caller must be
164   the lmaster
165 */
166 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
167                                     struct ctdb_ltdb_header *header,
168                                     TDB_DATA key, TDB_DATA data,
169                                     uint32_t new_dmaster,
170                                     uint32_t reqid)
171 {
172         struct ctdb_context *ctdb = ctdb_db->ctdb;
173         struct ctdb_reply_dmaster_old *r;
174         int ret, len;
175         TALLOC_CTX *tmp_ctx;
176
177         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
178                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
179                 return;
180         }
181
182         header->dmaster = new_dmaster;
183         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
184         if (ret != 0) {
185                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
186                 return;
187         }
188
189         if (ctdb->methods == NULL) {
190                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
191                 return;
192         }
193
194         /* put the packet on a temporary context, allowing us to safely free
195            it below even if ctdb_reply_dmaster() has freed it already */
196         tmp_ctx = talloc_new(ctdb);
197
198         /* send the CTDB_REPLY_DMASTER */
199         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
200         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
201                                     struct ctdb_reply_dmaster_old);
202         CTDB_NO_MEMORY_FATAL(ctdb, r);
203
204         r->hdr.destnode  = new_dmaster;
205         r->hdr.reqid     = reqid;
206         r->hdr.generation = ctdb_db->generation;
207         r->rsn           = header->rsn;
208         r->keylen        = key.dsize;
209         r->datalen       = data.dsize;
210         r->db_id         = ctdb_db->db_id;
211         memcpy(&r->data[0], key.dptr, key.dsize);
212         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
213         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
214
215         ctdb_queue_packet(ctdb, &r->hdr);
216
217         talloc_free(tmp_ctx);
218 }
219
220 /*
221   send a dmaster request (give another node the dmaster for a record)
222
223   This is always sent to the lmaster, which ensures that the lmaster
224   always knows who the dmaster is. The lmaster will then send a
225   CTDB_REPLY_DMASTER to the new dmaster
226 */
227 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
228                                    struct ctdb_req_call_old *c, 
229                                    struct ctdb_ltdb_header *header,
230                                    TDB_DATA *key, TDB_DATA *data)
231 {
232         struct ctdb_req_dmaster_old *r;
233         struct ctdb_context *ctdb = ctdb_db->ctdb;
234         int len;
235         uint32_t lmaster = ctdb_lmaster(ctdb, key);
236
237         if (ctdb->methods == NULL) {
238                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
239                 return;
240         }
241
242         if (data->dsize != 0) {
243                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
244         }
245
246         if (lmaster == ctdb->pnn) {
247                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
248                                         c->hdr.srcnode, c->hdr.reqid);
249                 return;
250         }
251         
252         len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
253                         + sizeof(uint32_t);
254         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
255                                     struct ctdb_req_dmaster_old);
256         CTDB_NO_MEMORY_FATAL(ctdb, r);
257         r->hdr.destnode  = lmaster;
258         r->hdr.reqid     = c->hdr.reqid;
259         r->hdr.generation = ctdb_db->generation;
260         r->db_id         = c->db_id;
261         r->rsn           = header->rsn;
262         r->dmaster       = c->hdr.srcnode;
263         r->keylen        = key->dsize;
264         r->datalen       = data->dsize;
265         memcpy(&r->data[0], key->dptr, key->dsize);
266         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
267         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
268
269         header->dmaster = c->hdr.srcnode;
270         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
271                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
272         }
273         
274         ctdb_queue_packet(ctdb, &r->hdr);
275
276         talloc_free(r);
277 }
278
279 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
280                                         struct tevent_timer *te,
281                                         struct timeval t, void *private_data)
282 {
283         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
284                                                        struct ctdb_sticky_record);
285
286         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
287         if (sr->pindown != NULL) {
288                 talloc_free(sr->pindown);
289                 sr->pindown = NULL;
290         }
291 }
292
293 static int
294 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
295 {
296         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
297         uint32_t *k;
298         struct ctdb_sticky_record *sr;
299
300         k = ctdb_key_to_idkey(tmp_ctx, key);
301         if (k == NULL) {
302                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
303                 talloc_free(tmp_ctx);
304                 return -1;
305         }
306
307         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
308         if (sr == NULL) {
309                 talloc_free(tmp_ctx);
310                 return 0;
311         }
312
313         talloc_free(tmp_ctx);
314
315         if (sr->pindown == NULL) {
316                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
317                 sr->pindown = talloc_new(sr);
318                 if (sr->pindown == NULL) {
319                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
320                         return -1;
321                 }
322                 tevent_add_timer(ctdb->ev, sr->pindown,
323                                  timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
324                                                      (ctdb->tunable.sticky_pindown * 1000) % 1000000),
325                                  ctdb_sticky_pindown_timeout, sr);
326         }
327
328         return 0;
329 }
330
331 /*
332   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
333   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
334
335   must be called with the chainlock held. This function releases the chainlock
336 */
337 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
338                                 struct ctdb_req_header *hdr,
339                                 TDB_DATA key, TDB_DATA data,
340                                 uint64_t rsn, uint32_t record_flags)
341 {
342         struct ctdb_call_state *state;
343         struct ctdb_context *ctdb = ctdb_db->ctdb;
344         struct ctdb_ltdb_header header;
345         int ret;
346
347         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
348
349         ZERO_STRUCT(header);
350         header.rsn = rsn;
351         header.dmaster = ctdb->pnn;
352         header.flags = record_flags;
353
354         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
355
356         if (state) {
357                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
358                         /*
359                          * We temporarily add the VACUUM_MIGRATED flag to
360                          * the record flags, so that ctdb_ltdb_store can
361                          * decide whether the record should be stored or
362                          * deleted.
363                          */
364                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
365                 }
366         }
367
368         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
369                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
370
371                 ret = ctdb_ltdb_unlock(ctdb_db, key);
372                 if (ret != 0) {
373                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
374                 }
375                 return;
376         }
377
378         /* we just became DMASTER and this database is "sticky",
379            see if the record is flagged as "hot" and set up a pin-down
380            context to stop migrations for a little while if so
381         */
382         if (ctdb_db->sticky) {
383                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
384         }
385
386         if (state == NULL) {
387                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
388                          ctdb->pnn, hdr->reqid, hdr->srcnode));
389
390                 ret = ctdb_ltdb_unlock(ctdb_db, key);
391                 if (ret != 0) {
392                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
393                 }
394                 return;
395         }
396
397         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
398                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
399
400                 ret = ctdb_ltdb_unlock(ctdb_db, key);
401                 if (ret != 0) {
402                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
403                 }
404                 return;
405         }
406
407         if (hdr->reqid != state->reqid) {
408                 /* we found a record  but it was the wrong one */
409                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
410
411                 ret = ctdb_ltdb_unlock(ctdb_db, key);
412                 if (ret != 0) {
413                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
414                 }
415                 return;
416         }
417
418         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
419
420         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
421         if (ret != 0) {
422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
423         }
424
425         state->state = CTDB_CALL_DONE;
426         if (state->async.fn) {
427                 state->async.fn(state);
428         }
429 }
430
/*
  one request packet that is parked until the dmaster update for its
  key has completed
 */
struct dmaster_defer_call {
	/* linkage within dmaster_defer_queue.deferred_calls */
	struct dmaster_defer_call *next;
	struct dmaster_defer_call *prev;

	/* daemon context used to re-inject the packet */
	struct ctdb_context *ctdb;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;
};
436
/*
  per-key queue of requests deferred while a dmaster update is in flight
 */
struct dmaster_defer_queue {
	/* database the key belongs to */
	struct ctdb_db_context *ctdb_db;

	/* generation recorded when the queue was set up; compared with
	   the database generation to detect an intervening recovery */
	uint32_t generation;

	/* list of parked requests, NULL when empty */
	struct dmaster_defer_call *deferred_calls;
};
442
443 static void dmaster_defer_reprocess(struct tevent_context *ev,
444                                     struct tevent_timer *te,
445                                     struct timeval t,
446                                     void *private_data)
447 {
448         struct dmaster_defer_call *call = talloc_get_type(
449                 private_data, struct dmaster_defer_call);
450
451         ctdb_input_pkt(call->ctdb, call->hdr);
452         talloc_free(call);
453 }
454
455 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
456 {
457         /* Ignore requests, if database recovery happens in-between. */
458         if (ddq->generation != ddq->ctdb_db->generation) {
459                 return 0;
460         }
461
462         while (ddq->deferred_calls != NULL) {
463                 struct dmaster_defer_call *call = ddq->deferred_calls;
464
465                 DLIST_REMOVE(ddq->deferred_calls, call);
466
467                 talloc_steal(call->ctdb, call);
468                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
469                                  dmaster_defer_reprocess, call);
470         }
471         return 0;
472 }
473
/*
  tree insert callback: frees the entry previously stored under the key
  (if there was one) and returns the new entry to be stored
 */
static void *insert_ddq_callback(void *parm, void *data)
{
	if (data != NULL) {
		talloc_free(data);
	}

	return parm;
}
481
482 /**
483  * This function is used to reigster a key in database that needs to be updated.
484  * Any requests for that key should get deferred till this is completed.
485  */
486 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
487                                struct ctdb_req_header *hdr,
488                                TDB_DATA key)
489 {
490         uint32_t *k;
491         struct dmaster_defer_queue *ddq;
492
493         k = ctdb_key_to_idkey(hdr, key);
494         if (k == NULL) {
495                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
496                 return -1;
497         }
498
499         /* Already exists */
500         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
501         if (ddq != NULL) {
502                 if (ddq->generation == ctdb_db->generation) {
503                         talloc_free(k);
504                         return 0;
505                 }
506
507                 /* Recovery ocurred - get rid of old queue. All the deferred
508                  * requests will be resent anyway from ctdb_call_resend_db.
509                  */
510                 talloc_free(ddq);
511         }
512
513         ddq = talloc(hdr, struct dmaster_defer_queue);
514         if (ddq == NULL) {
515                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
516                 talloc_free(k);
517                 return -1;
518         }
519         ddq->ctdb_db = ctdb_db;
520         ddq->generation = hdr->generation;
521         ddq->deferred_calls = NULL;
522
523         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
524                                     insert_ddq_callback, ddq);
525         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
526
527         talloc_free(k);
528         return 0;
529 }
530
531 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
532                              struct ctdb_req_header *hdr,
533                              TDB_DATA key)
534 {
535         struct dmaster_defer_queue *ddq;
536         struct dmaster_defer_call *call;
537         uint32_t *k;
538
539         k = ctdb_key_to_idkey(hdr, key);
540         if (k == NULL) {
541                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
542                 return -1;
543         }
544
545         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
546         if (ddq == NULL) {
547                 talloc_free(k);
548                 return -1;
549         }
550
551         talloc_free(k);
552
553         if (ddq->generation != hdr->generation) {
554                 talloc_set_destructor(ddq, NULL);
555                 talloc_free(ddq);
556                 return -1;
557         }
558
559         call = talloc(ddq, struct dmaster_defer_call);
560         if (call == NULL) {
561                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
562                 return -1;
563         }
564
565         call->ctdb = ctdb_db->ctdb;
566         call->hdr = talloc_steal(call, hdr);
567
568         DLIST_ADD_END(ddq->deferred_calls, call);
569
570         return 0;
571 }
572
573 /*
574   called when a CTDB_REQ_DMASTER packet comes in
575
576   this comes into the lmaster for a record when the current dmaster
577   wants to give up the dmaster role and give it to someone else
578 */
579 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
580 {
581         struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
582         TDB_DATA key, data, data2;
583         struct ctdb_ltdb_header header;
584         struct ctdb_db_context *ctdb_db;
585         uint32_t record_flags = 0;
586         size_t len;
587         int ret;
588
589         key.dptr = c->data;
590         key.dsize = c->keylen;
591         data.dptr = c->data + c->keylen;
592         data.dsize = c->datalen;
593         len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
594                         + sizeof(uint32_t);
595         if (len <= c->hdr.length) {
596                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
597                        sizeof(record_flags));
598         }
599
600         ctdb_db = find_ctdb_db(ctdb, c->db_id);
601         if (!ctdb_db) {
602                 ctdb_send_error(ctdb, hdr, -1,
603                                 "Unknown database in request. db_id==0x%08x",
604                                 c->db_id);
605                 return;
606         }
607
608         dmaster_defer_setup(ctdb_db, hdr, key);
609
610         /* fetch the current record */
611         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
612                                            ctdb_call_input_pkt, ctdb, false);
613         if (ret == -1) {
614                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
615                 return;
616         }
617         if (ret == -2) {
618                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
619                 return;
620         }
621
622         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
623                 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
624                                   "db=%s lmaster=%u gen=%u curgen=%u\n",
625                                   ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
626                                   hdr->generation, ctdb_db->generation));
627                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
628         }
629
630         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
631                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
632
633         /* its a protocol error if the sending node is not the current dmaster */
634         if (header.dmaster != hdr->srcnode) {
635                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
636                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
637                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
638                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
639                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
640                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
641                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
642
643                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
644                         ctdb_ltdb_unlock(ctdb_db, key);
645                         return;
646                 }
647         }
648
649         if (header.rsn > c->rsn) {
650                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
651                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
652                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
653                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
654         }
655
656         /* use the rsn from the sending node */
657         header.rsn = c->rsn;
658
659         /* store the record flags from the sending node */
660         header.flags = record_flags;
661
662         /* check if the new dmaster is the lmaster, in which case we
663            skip the dmaster reply */
664         if (c->dmaster == ctdb->pnn) {
665                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
666         } else {
667                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
668
669                 ret = ctdb_ltdb_unlock(ctdb_db, key);
670                 if (ret != 0) {
671                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
672                 }
673         }
674 }
675
676 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
677                                        struct tevent_timer *te,
678                                        struct timeval t, void *private_data)
679 {
680         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
681                                                        struct ctdb_sticky_record);
682         talloc_free(sr);
683 }
684
685 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
686 {
687         if (data) {
688                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
689                 talloc_free(data);
690         }
691         return parm;
692 }
693
694 static int
695 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
696 {
697         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
698         uint32_t *k;
699         struct ctdb_sticky_record *sr;
700
701         k = ctdb_key_to_idkey(tmp_ctx, key);
702         if (k == NULL) {
703                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
704                 talloc_free(tmp_ctx);
705                 return -1;
706         }
707
708         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
709         if (sr != NULL) {
710                 talloc_free(tmp_ctx);
711                 return 0;
712         }
713
714         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
715         if (sr == NULL) {
716                 talloc_free(tmp_ctx);
717                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
718                 return -1;
719         }
720
721         sr->ctdb    = ctdb;
722         sr->ctdb_db = ctdb_db;
723         sr->pindown = NULL;
724
725         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
726                          ctdb->tunable.sticky_duration,
727                          ctdb_db->db_name, ctdb_hash(&key)));
728
729         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
730
731         tevent_add_timer(ctdb->ev, sr,
732                          timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
733                          ctdb_sticky_record_timeout, sr);
734
735         talloc_free(tmp_ctx);
736         return 0;
737 }
738
/*
  carries a deferred request from pinned_down_destructor() to the
  pinned_down_requeue() timer callback
 */
struct pinned_down_requeue_handle {
	/* daemon context the packet is re-injected into */
	struct ctdb_context *ctdb;

	/* the request packet waiting to be reprocessed */
	struct ctdb_req_header *hdr;
};
743
/*
  a request parked on a sticky record's pindown context; its destructor
  (pinned_down_destructor) schedules the request for reprocessing
 */
struct pinned_down_deferred_call {
	/* daemon context used for the requeue */
	struct ctdb_context *ctdb;

	/* the parked request packet */
	struct ctdb_req_header *hdr;
};
748
749 static void pinned_down_requeue(struct tevent_context *ev,
750                                 struct tevent_timer *te,
751                                 struct timeval t, void *private_data)
752 {
753         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
754         struct ctdb_context *ctdb = handle->ctdb;
755
756         talloc_steal(ctdb, handle->hdr);
757         ctdb_call_input_pkt(ctdb, handle->hdr);
758
759         talloc_free(handle);
760 }
761
762 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
763 {
764         struct ctdb_context *ctdb = pinned_down->ctdb;
765         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
766
767         handle->ctdb = pinned_down->ctdb;
768         handle->hdr  = pinned_down->hdr;
769         talloc_steal(handle, handle->hdr);
770
771         tevent_add_timer(ctdb->ev, handle, timeval_zero(),
772                          pinned_down_requeue, handle);
773
774         return 0;
775 }
776
777 static int
778 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
779 {
780         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
781         uint32_t *k;
782         struct ctdb_sticky_record *sr;
783         struct pinned_down_deferred_call *pinned_down;
784
785         k = ctdb_key_to_idkey(tmp_ctx, key);
786         if (k == NULL) {
787                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
788                 talloc_free(tmp_ctx);
789                 return -1;
790         }
791
792         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
793         if (sr == NULL) {
794                 talloc_free(tmp_ctx);
795                 return -1;
796         }
797
798         talloc_free(tmp_ctx);
799
800         if (sr->pindown == NULL) {
801                 return -1;
802         }
803         
804         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
805         if (pinned_down == NULL) {
806                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
807                 return -1;
808         }
809
810         pinned_down->ctdb = ctdb;
811         pinned_down->hdr  = hdr;
812
813         talloc_set_destructor(pinned_down, pinned_down_destructor);
814         talloc_steal(pinned_down, hdr);
815
816         return 0;
817 }
818
819 static void
820 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
821 {
822         int i, id;
823
824         /* smallest value is always at index 0 */
825         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
826                 return;
827         }
828
829         /* see if we already know this key */
830         for (i = 0; i < MAX_HOT_KEYS; i++) {
831                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
832                         continue;
833                 }
834                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
835                         continue;
836                 }
837                 /* found an entry for this key */
838                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
839                         return;
840                 }
841                 ctdb_db->statistics.hot_keys[i].count = hopcount;
842                 goto sort_keys;
843         }
844
845         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
846                 id = ctdb_db->statistics.num_hot_keys;
847                 ctdb_db->statistics.num_hot_keys++;
848         } else {
849                 id = 0;
850         }
851
852         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
853                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
854         }
855         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
856         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
857         ctdb_db->statistics.hot_keys[id].count = hopcount;
858         DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
859                             ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
860
861 sort_keys:
862         for (i = 1; i < MAX_HOT_KEYS; i++) {
863                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
864                         continue;
865                 }
866                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
867                         hopcount = ctdb_db->statistics.hot_keys[i].count;
868                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
869                         ctdb_db->statistics.hot_keys[0].count = hopcount;
870
871                         key = ctdb_db->statistics.hot_keys[i].key;
872                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
873                         ctdb_db->statistics.hot_keys[0].key = key;
874                 }
875         }
876 }
877
878 /*
879   called when a CTDB_REQ_CALL packet comes in
880 */
881 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
882 {
883         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
884         TDB_DATA data;
885         struct ctdb_reply_call_old *r;
886         int ret, len;
887         struct ctdb_ltdb_header header;
888         struct ctdb_call *call;
889         struct ctdb_db_context *ctdb_db;
890         int tmp_count, bucket;
891
892         if (ctdb->methods == NULL) {
893                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
894                 return;
895         }
896
897
898         ctdb_db = find_ctdb_db(ctdb, c->db_id);
899         if (!ctdb_db) {
900                 ctdb_send_error(ctdb, hdr, -1,
901                                 "Unknown database in request. db_id==0x%08x",
902                                 c->db_id);
903                 return;
904         }
905
906         call = talloc(hdr, struct ctdb_call);
907         CTDB_NO_MEMORY_FATAL(ctdb, call);
908
909         call->call_id  = c->callid;
910         call->key.dptr = c->data;
911         call->key.dsize = c->keylen;
912         call->call_data.dptr = c->data + c->keylen;
913         call->call_data.dsize = c->calldatalen;
914         call->reply_data.dptr  = NULL;
915         call->reply_data.dsize = 0;
916
917
918         /* If this record is pinned down we should defer the
919            request until the pindown times out
920         */
921         if (ctdb_db->sticky) {
922                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
923                         DEBUG(DEBUG_WARNING,
924                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
925                         talloc_free(call);
926                         return;
927                 }
928         }
929
930         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
931                 talloc_free(call);
932                 return;
933         }
934
935         /* determine if we are the dmaster for this key. This also
936            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
937            if the call will be answered locally */
938
939         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
940                                            ctdb_call_input_pkt, ctdb, false);
941         if (ret == -1) {
942                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
943                 talloc_free(call);
944                 return;
945         }
946         if (ret == -2) {
947                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
948                 talloc_free(call);
949                 return;
950         }
951
952         /* Dont do READONLY if we don't have a tracking database */
953         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
954                 c->flags &= ~CTDB_WANT_READONLY;
955         }
956
957         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
958                 header.flags &= ~CTDB_REC_RO_FLAGS;
959                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
960                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
961                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
962                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
963                 }
964                 /* and clear out the tracking data */
965                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
966                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
967                 }
968         }
969
970         /* if we are revoking, we must defer all other calls until the revoke
971          * had completed.
972          */
973         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
974                 talloc_free(data.dptr);
975                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
976
977                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
978                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
979                 }
980                 talloc_free(call);
981                 return;
982         }
983
984         /*
985          * If we are not the dmaster and are not hosting any delegations,
986          * then we redirect the request to the node than can answer it
987          * (the lmaster or the dmaster).
988          */
989         if ((header.dmaster != ctdb->pnn) 
990             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
991                 talloc_free(data.dptr);
992                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
993
994                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
995                 if (ret != 0) {
996                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
997                 }
998                 talloc_free(call);
999                 return;
1000         }
1001
1002         if ( (!(c->flags & CTDB_WANT_READONLY))
1003         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1004                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1005                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1006                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1007                 }
1008                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1009
1010                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1011                         ctdb_fatal(ctdb, "Failed to start record revoke");
1012                 }
1013                 talloc_free(data.dptr);
1014
1015                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1016                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1017                 }
1018                 talloc_free(call);
1019
1020                 return;
1021         }               
1022
1023         /* If this is the first request for delegation. bump rsn and set
1024          * the delegations flag
1025          */
1026         if ((c->flags & CTDB_WANT_READONLY)
1027         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1028         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1029                 header.rsn     += 3;
1030                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1031                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1032                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1033                 }
1034         }
1035         if ((c->flags & CTDB_WANT_READONLY) 
1036         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1037                 TDB_DATA tdata;
1038
1039                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1040                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1041                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1042                 }
1043                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1044                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1045                 }
1046                 free(tdata.dptr);
1047
1048                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1049                 if (ret != 0) {
1050                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1051                 }
1052
1053                 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1054                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1055                                             struct ctdb_reply_call_old);
1056                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1057                 r->hdr.destnode  = c->hdr.srcnode;
1058                 r->hdr.reqid     = c->hdr.reqid;
1059                 r->hdr.generation = ctdb_db->generation;
1060                 r->status        = 0;
1061                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1062                 header.rsn      -= 2;
1063                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1064                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1065                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1066
1067                 if (data.dsize) {
1068                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1069                 }
1070
1071                 ctdb_queue_packet(ctdb, &r->hdr);
1072                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1073                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1074
1075                 talloc_free(r);
1076                 talloc_free(call);
1077                 return;
1078         }
1079
1080         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1081         tmp_count = c->hopcount;
1082         bucket = 0;
1083         while (tmp_count) {
1084                 tmp_count >>= 2;
1085                 bucket++;
1086         }
1087         if (bucket >= MAX_COUNT_BUCKETS) {
1088                 bucket = MAX_COUNT_BUCKETS - 1;
1089         }
1090         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1091         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1092         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1093
1094         /* If this database supports sticky records, then check if the
1095            hopcount is big. If it is it means the record is hot and we
1096            should make it sticky.
1097         */
1098         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1099                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1100         }
1101
1102
1103         /* Try if possible to migrate the record off to the caller node.
1104          * From the clients perspective a fetch of the data is just as 
1105          * expensive as a migration.
1106          */
1107         if (c->hdr.srcnode != ctdb->pnn) {
1108                 if (ctdb_db->persistent_state) {
1109                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1110                               " of key %s while transaction is active\n",
1111                               (char *)call->key.dptr));
1112                 } else {
1113                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1114                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1115                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1116                         talloc_free(data.dptr);
1117
1118                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1119                         if (ret != 0) {
1120                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1121                         }
1122                 }
1123                 talloc_free(call);
1124                 return;
1125         }
1126
1127         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1128         if (ret != 0) {
1129                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1130                 call->status = -1;
1131         }
1132
1133         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1134         if (ret != 0) {
1135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1136         }
1137
1138         len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1139         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1140                                     struct ctdb_reply_call_old);
1141         CTDB_NO_MEMORY_FATAL(ctdb, r);
1142         r->hdr.destnode  = hdr->srcnode;
1143         r->hdr.reqid     = hdr->reqid;
1144         r->hdr.generation = ctdb_db->generation;
1145         r->status        = call->status;
1146         r->datalen       = call->reply_data.dsize;
1147         if (call->reply_data.dsize) {
1148                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1149         }
1150
1151         ctdb_queue_packet(ctdb, &r->hdr);
1152
1153         talloc_free(r);
1154         talloc_free(call);
1155 }
1156
1157 /**
1158  * called when a CTDB_REPLY_CALL packet comes in
1159  *
1160  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1161  * contains any reply data from the call
1162  */
1163 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1164 {
1165         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1166         struct ctdb_call_state *state;
1167
1168         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1169         if (state == NULL) {
1170                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1171                 return;
1172         }
1173
1174         if (hdr->reqid != state->reqid) {
1175                 /* we found a record  but it was the wrong one */
1176                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1177                 return;
1178         }
1179
1180
1181         /* read only delegation processing */
1182         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1183          * delegation since we may need to update the record header
1184          */
1185         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1186                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1187                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1188                 struct ctdb_ltdb_header oldheader;
1189                 TDB_DATA key, data, olddata;
1190                 int ret;
1191
1192                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1193                         goto finished_ro;
1194                         return;
1195                 }
1196
1197                 key.dsize = state->c->keylen;
1198                 key.dptr  = state->c->data;
1199                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1200                                      ctdb_call_input_pkt, ctdb, false);
1201                 if (ret == -2) {
1202                         return;
1203                 }
1204                 if (ret != 0) {
1205                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1206                         return;
1207                 }
1208
1209                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1210                 if (ret != 0) {
1211                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1212                         ctdb_ltdb_unlock(ctdb_db, key);
1213                         goto finished_ro;
1214                 }                       
1215
1216                 if (header->rsn <= oldheader.rsn) {
1217                         ctdb_ltdb_unlock(ctdb_db, key);
1218                         goto finished_ro;
1219                 }
1220
1221                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1222                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1223                         ctdb_ltdb_unlock(ctdb_db, key);
1224                         goto finished_ro;
1225                 }
1226
1227                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1228                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1229                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1230                 if (ret != 0) {
1231                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1232                         ctdb_ltdb_unlock(ctdb_db, key);
1233                         goto finished_ro;
1234                 }                       
1235
1236                 ctdb_ltdb_unlock(ctdb_db, key);
1237         }
1238 finished_ro:
1239
1240         state->call->reply_data.dptr = c->data;
1241         state->call->reply_data.dsize = c->datalen;
1242         state->call->status = c->status;
1243
1244         talloc_steal(state, c);
1245
1246         state->state = CTDB_CALL_DONE;
1247         if (state->async.fn) {
1248                 state->async.fn(state);
1249         }
1250 }
1251
1252
1253 /**
1254  * called when a CTDB_REPLY_DMASTER packet comes in
1255  *
1256  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1257  * request packet. It means that the current dmaster wants to give us
1258  * the dmaster role.
1259  */
1260 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1261 {
1262         struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1263         struct ctdb_db_context *ctdb_db;
1264         TDB_DATA key, data;
1265         uint32_t record_flags = 0;
1266         size_t len;
1267         int ret;
1268
1269         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1270         if (ctdb_db == NULL) {
1271                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1272                 return;
1273         }
1274         
1275         key.dptr = c->data;
1276         key.dsize = c->keylen;
1277         data.dptr = &c->data[key.dsize];
1278         data.dsize = c->datalen;
1279         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1280                 + sizeof(uint32_t);
1281         if (len <= c->hdr.length) {
1282                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1283                        sizeof(record_flags));
1284         }
1285
1286         dmaster_defer_setup(ctdb_db, hdr, key);
1287
1288         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1289                                      ctdb_call_input_pkt, ctdb, false);
1290         if (ret == -2) {
1291                 return;
1292         }
1293         if (ret != 0) {
1294                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1295                 return;
1296         }
1297
1298         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1299 }
1300
1301
1302 /*
1303   called when a CTDB_REPLY_ERROR packet comes in
1304 */
1305 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1306 {
1307         struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1308         struct ctdb_call_state *state;
1309
1310         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1311         if (state == NULL) {
1312                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1313                          ctdb->pnn, hdr->reqid));
1314                 return;
1315         }
1316
1317         if (hdr->reqid != state->reqid) {
1318                 /* we found a record  but it was the wrong one */
1319                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1320                 return;
1321         }
1322
1323         talloc_steal(state, c);
1324
1325         state->state  = CTDB_CALL_ERROR;
1326         state->errmsg = (char *)c->msg;
1327         if (state->async.fn) {
1328                 state->async.fn(state);
1329         }
1330 }
1331
1332
1333 /*
1334   destroy a ctdb_call
1335 */
1336 static int ctdb_call_destructor(struct ctdb_call_state *state)
1337 {
1338         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1339         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1340         return 0;
1341 }
1342
1343
1344 /*
1345   called when a ctdb_call needs to be resent after a reconfigure event
1346 */
1347 static void ctdb_call_resend(struct ctdb_call_state *state)
1348 {
1349         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1350
1351         state->generation = state->ctdb_db->generation;
1352
1353         /* use a new reqid, in case the old reply does eventually come in */
1354         reqid_remove(ctdb->idr, state->reqid);
1355         state->reqid = reqid_new(ctdb->idr, state);
1356         state->c->hdr.reqid = state->reqid;
1357
1358         /* update the generation count for this request, so its valid with the new vnn_map */
1359         state->c->hdr.generation = state->generation;
1360
1361         /* send the packet to ourselves, it will be redirected appropriately */
1362         state->c->hdr.destnode = ctdb->pnn;
1363
1364         ctdb_queue_packet(ctdb, &state->c->hdr);
1365         DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1366                             state->ctdb_db->db_name, state->reqid, state->generation));
1367 }
1368
1369 /*
1370   resend all pending calls on recovery
1371  */
1372 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1373 {
1374         struct ctdb_call_state *state, *next;
1375
1376         for (state = ctdb_db->pending_calls; state; state = next) {
1377                 next = state->next;
1378                 ctdb_call_resend(state);
1379         }
1380 }
1381
1382 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1383 {
1384         struct ctdb_db_context *ctdb_db;
1385
1386         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1387                 ctdb_call_resend_db(ctdb_db);
1388         }
1389 }
1390
1391 /*
1392   this allows the caller to setup a async.fn 
1393 */
1394 static void call_local_trigger(struct tevent_context *ev,
1395                                struct tevent_timer *te,
1396                                struct timeval t, void *private_data)
1397 {
1398         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1399         if (state->async.fn) {
1400                 state->async.fn(state);
1401         }
1402 }       
1403
1404
1405 /*
1406   construct an event driven local ctdb_call
1407
1408   this is used so that locally processed ctdb_call requests are processed
1409   in an event driven manner
1410 */
1411 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1412                                              struct ctdb_call *call,
1413                                              struct ctdb_ltdb_header *header,
1414                                              TDB_DATA *data)
1415 {
1416         struct ctdb_call_state *state;
1417         struct ctdb_context *ctdb = ctdb_db->ctdb;
1418         int ret;
1419
1420         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1421         CTDB_NO_MEMORY_NULL(ctdb, state);
1422
1423         talloc_steal(state, data->dptr);
1424
1425         state->state = CTDB_CALL_DONE;
1426         state->call  = talloc(state, struct ctdb_call);
1427         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1428         *(state->call) = *call;
1429         state->ctdb_db = ctdb_db;
1430
1431         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1432         if (ret != 0) {
1433                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1434         }
1435
1436         tevent_add_timer(ctdb->ev, state, timeval_zero(),
1437                          call_local_trigger, state);
1438
1439         return state;
1440 }
1441
1442
1443 /*
1444   make a remote ctdb call - async send. Called in daemon context.
1445
1446   This constructs a ctdb_call request and queues it for processing. 
1447   This call never blocks.
1448 */
1449 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1450                                                      struct ctdb_call *call, 
1451                                                      struct ctdb_ltdb_header *header)
1452 {
1453         uint32_t len;
1454         struct ctdb_call_state *state;
1455         struct ctdb_context *ctdb = ctdb_db->ctdb;
1456
1457         if (ctdb->methods == NULL) {
1458                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1459                 return NULL;
1460         }
1461
1462         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1463         CTDB_NO_MEMORY_NULL(ctdb, state);
1464         state->call = talloc(state, struct ctdb_call);
1465         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1466
1467         state->reqid = reqid_new(ctdb->idr, state);
1468         state->ctdb_db = ctdb_db;
1469         talloc_set_destructor(state, ctdb_call_destructor);
1470
1471         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1472         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1473                                            struct ctdb_req_call_old);
1474         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1475         state->c->hdr.destnode  = header->dmaster;
1476
1477         /* this limits us to 16k outstanding messages - not unreasonable */
1478         state->c->hdr.reqid     = state->reqid;
1479         state->c->hdr.generation = ctdb_db->generation;
1480         state->c->flags         = call->flags;
1481         state->c->db_id         = ctdb_db->db_id;
1482         state->c->callid        = call->call_id;
1483         state->c->hopcount      = 0;
1484         state->c->keylen        = call->key.dsize;
1485         state->c->calldatalen   = call->call_data.dsize;
1486         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1487         memcpy(&state->c->data[call->key.dsize], 
1488                call->call_data.dptr, call->call_data.dsize);
1489         *(state->call)              = *call;
1490         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1491         state->call->key.dptr       = &state->c->data[0];
1492
1493         state->state  = CTDB_CALL_WAIT;
1494         state->generation = ctdb_db->generation;
1495
1496         DLIST_ADD(ctdb_db->pending_calls, state);
1497
1498         ctdb_queue_packet(ctdb, &state->c->hdr);
1499
1500         return state;
1501 }
1502
1503 /*
1504   make a remote ctdb call - async recv - called in daemon context
1505
1506   This is called when the program wants to wait for a ctdb_call to complete and get the 
1507   results. This call will block unless the call has already completed.
1508 */
1509 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1510 {
1511         while (state->state < CTDB_CALL_DONE) {
1512                 tevent_loop_once(state->ctdb_db->ctdb->ev);
1513         }
1514         if (state->state != CTDB_CALL_DONE) {
1515                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1516                 talloc_free(state);
1517                 return -1;
1518         }
1519
1520         if (state->call->reply_data.dsize) {
1521                 call->reply_data.dptr = talloc_memdup(call,
1522                                                       state->call->reply_data.dptr,
1523                                                       state->call->reply_data.dsize);
1524                 call->reply_data.dsize = state->call->reply_data.dsize;
1525         } else {
1526                 call->reply_data.dptr = NULL;
1527                 call->reply_data.dsize = 0;
1528         }
1529         call->status = state->call->status;
1530         talloc_free(state);
1531         return 0;
1532 }
1533
1534
1535 /* 
1536    send a keepalive packet to the other node
1537 */
1538 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1539 {
1540         struct ctdb_req_keepalive_old *r;
1541         
1542         if (ctdb->methods == NULL) {
1543                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1544                 return;
1545         }
1546
1547         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1548                                     sizeof(struct ctdb_req_keepalive_old), 
1549                                     struct ctdb_req_keepalive_old);
1550         CTDB_NO_MEMORY_FATAL(ctdb, r);
1551         r->hdr.destnode  = destnode;
1552         r->hdr.reqid     = 0;
1553         
1554         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1555
1556         ctdb_queue_packet(ctdb, &r->hdr);
1557
1558         talloc_free(r);
1559 }
1560
1561
1562
/*
  a request parked until it can be processed again

  When an instance is freed, deferred_call_destructor copies hdr, fn
  and ctx into a revokechild_requeue_handle and arms a timer that calls
  fn(ctx, hdr).
*/
1563 struct revokechild_deferred_call {
1564         struct ctdb_context *ctdb; /* daemon context; supplies the event context for the requeue timer */
1565         struct ctdb_req_header *hdr; /* the deferred packet; the destructor reads it as a ctdb_req_call_old */
1566         deferred_requeue_fn fn; /* callback that reprocesses the packet */
1567         void *ctx; /* first argument passed to fn */
1568 };
1569
/*
  Parent-side bookkeeping for one forked "revokechild" process that is
  revoking the read-only delegations of a single record.
*/
struct revokechild_handle {
        struct revokechild_handle *next, *prev; /* links in ctdb_db->revokechild_active */
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;        /* database owning the record */
        struct tevent_fd *fde;                  /* read event on fd[0] */
        int status;                             /* 0 initially, -1 once the child reported failure */
        int fd[2];                              /* pipe: child writes a status byte to fd[1], parent reads fd[0]; -1 when not open */
        pid_t child;                            /* pid of the revokechild process */
        TDB_DATA key;                           /* private copy of the record key being revoked */
};
1580
/*
  State carried from deferred_call_destructor() to the timer callback
  deferred_call_requeue(), which performs the actual requeue.
*/
struct revokechild_requeue_handle {
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;    /* request packet; talloc-owned by this handle */
        deferred_requeue_fn fn;         /* called with (ctx, hdr) when the timer fires */
        void *ctx;                      /* opaque first argument for fn */
};
1587
1588 static void deferred_call_requeue(struct tevent_context *ev,
1589                                   struct tevent_timer *te,
1590                                   struct timeval t, void *private_data)
1591 {
1592         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1593
1594         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1595         talloc_free(requeue_handle);
1596 }
1597
1598 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1599 {
1600         struct ctdb_context *ctdb = deferred_call->ctdb;
1601         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1602         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
1603
1604         requeue_handle->ctdb = ctdb;
1605         requeue_handle->hdr  = deferred_call->hdr;
1606         requeue_handle->fn   = deferred_call->fn;
1607         requeue_handle->ctx  = deferred_call->ctx;
1608         talloc_steal(requeue_handle, requeue_handle->hdr);
1609
1610         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1611         tevent_add_timer(ctdb->ev, requeue_handle,
1612                          timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1613                          deferred_call_requeue, requeue_handle);
1614
1615         return 0;
1616 }
1617
1618
1619 static int revokechild_destructor(struct revokechild_handle *rc)
1620 {
1621         if (rc->fde != NULL) {
1622                 talloc_free(rc->fde);
1623         }
1624
1625         if (rc->fd[0] != -1) {
1626                 close(rc->fd[0]);
1627         }
1628         if (rc->fd[1] != -1) {
1629                 close(rc->fd[1]);
1630         }
1631         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1632
1633         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1634         return 0;
1635 }
1636
1637 static void revokechild_handler(struct tevent_context *ev,
1638                                 struct tevent_fd *fde,
1639                                 uint16_t flags, void *private_data)
1640 {
1641         struct revokechild_handle *rc = talloc_get_type(private_data, 
1642                                                      struct revokechild_handle);
1643         int ret;
1644         char c;
1645
1646         ret = sys_read(rc->fd[0], &c, 1);
1647         if (ret != 1) {
1648                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1649                 rc->status = -1;
1650                 talloc_free(rc);
1651                 return;
1652         }
1653         if (c != 0) {
1654                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1655                 rc->status = -1;
1656                 talloc_free(rc);
1657                 return;
1658         }
1659
1660         talloc_free(rc);
1661 }
1662
/*
  State shared by the update-record controls sent while revoking all
  delegations of one record (see ctdb_revoke_all_delegations()).
*/
struct ctdb_revoke_state {
        struct ctdb_db_context *ctdb_db;
        TDB_DATA key;                           /* key of the record being revoked */
        struct ctdb_ltdb_header *header;        /* header sent with each update-record control */
        TDB_DATA data;                          /* record data sent with each update-record control */
        int count;                              /* controls sent and not yet answered */
        int status;                             /* 0 while all is well, -1 after any failure or timeout */
        int finished;                           /* set to 1 to end the wait loop */
};
1672
1673 static void update_record_cb(struct ctdb_client_control_state *state)
1674 {
1675         struct ctdb_revoke_state *revoke_state;
1676         int ret;
1677         int32_t res;
1678
1679         if (state == NULL) {
1680                 return;
1681         }
1682         revoke_state = state->async.private_data;
1683
1684         state->async.fn = NULL;
1685         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1686         if ((ret != 0) || (res != 0)) {
1687                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1688                 revoke_state->status = -1;
1689         }
1690
1691         revoke_state->count--;
1692         if (revoke_state->count <= 0) {
1693                 revoke_state->finished = 1;
1694         }
1695 }
1696
1697 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1698 {
1699         struct ctdb_revoke_state *revoke_state = private_data;
1700         struct ctdb_client_control_state *state;
1701
1702         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1703         if (state == NULL) {
1704                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1705                 revoke_state->status = -1;
1706                 return;
1707         }
1708         state->async.fn           = update_record_cb;
1709         state->async.private_data = revoke_state;
1710
1711         revoke_state->count++;
1712
1713 }
1714
1715 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1716                                         struct tevent_timer *te,
1717                                         struct timeval yt, void *private_data)
1718 {
1719         struct ctdb_revoke_state *state = private_data;
1720
1721         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1722         state->finished = 1;
1723         state->status   = -1;
1724 }
1725
1726 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1727 {
1728         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1729         struct ctdb_ltdb_header new_header;
1730         TDB_DATA new_data;
1731
1732         state->ctdb_db = ctdb_db;
1733         state->key     = key;
1734         state->header  = header;
1735         state->data    = data;
1736  
1737         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1738
1739         tevent_add_timer(ctdb->ev, state,
1740                          timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1741                          ctdb_revoke_timeout_handler, state);
1742
1743         while (state->finished == 0) {
1744                 tevent_loop_once(ctdb->ev);
1745         }
1746
1747         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1748                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1749                 talloc_free(state);
1750                 return -1;
1751         }
1752         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1753                 ctdb_ltdb_unlock(ctdb_db, key);
1754                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1755                 talloc_free(state);
1756                 return -1;
1757         }
1758         header->rsn++;
1759         if (new_header.rsn > header->rsn) {
1760                 ctdb_ltdb_unlock(ctdb_db, key);
1761                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1762                 talloc_free(state);
1763                 return -1;
1764         }
1765         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1766                 ctdb_ltdb_unlock(ctdb_db, key);
1767                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1768                 talloc_free(state);
1769                 return -1;
1770         }
1771
1772         /*
1773          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1774          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1775          */
1776         if (state->status == 0) {
1777                 new_header.rsn++;
1778                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1779         } else {
1780                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1781                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1782         }
1783         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1784                 ctdb_ltdb_unlock(ctdb_db, key);
1785                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1786                 talloc_free(state);
1787                 return -1;
1788         }
1789         ctdb_ltdb_unlock(ctdb_db, key);
1790
1791         talloc_free(state);
1792         return 0;
1793 }
1794
1795
1796 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1797 {
1798         TDB_DATA tdata;
1799         struct revokechild_handle *rc;
1800         pid_t parent = getpid();
1801         int ret;
1802
1803         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1804         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1805         header->rsn   -= 1;
1806
1807         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1808                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1809                 return -1;
1810         }
1811
1812         tdata = tdb_fetch(ctdb_db->rottdb, key);
1813         if (tdata.dsize > 0) {
1814                 uint8_t *tmp;
1815
1816                 tmp = tdata.dptr;
1817                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1818                 free(tmp);
1819         }
1820
1821         rc->status    = 0;
1822         rc->ctdb      = ctdb;
1823         rc->ctdb_db   = ctdb_db;
1824         rc->fd[0]     = -1;
1825         rc->fd[1]     = -1;
1826
1827         talloc_set_destructor(rc, revokechild_destructor);
1828
1829         rc->key.dsize = key.dsize;
1830         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1831         if (rc->key.dptr == NULL) {
1832                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1833                 talloc_free(rc);
1834                 return -1;
1835         }
1836
1837         ret = pipe(rc->fd);
1838         if (ret != 0) {
1839                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1840                 talloc_free(rc);
1841                 return -1;
1842         }
1843
1844
1845         rc->child = ctdb_fork(ctdb);
1846         if (rc->child == (pid_t)-1) {
1847                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1848                 talloc_free(rc);
1849                 return -1;
1850         }
1851
1852         if (rc->child == 0) {
1853                 char c = 0;
1854                 close(rc->fd[0]);
1855                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1856
1857                 prctl_set_comment("ctdb_revokechild");
1858                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1859                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1860                         c = 1;
1861                         goto child_finished;
1862                 }
1863
1864                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1865
1866 child_finished:
1867                 sys_write(rc->fd[1], &c, 1);
1868                 ctdb_wait_for_process_to_exit(parent);
1869                 _exit(0);
1870         }
1871
1872         close(rc->fd[1]);
1873         rc->fd[1] = -1;
1874         set_close_on_exec(rc->fd[0]);
1875
1876         /* This is an active revokechild child process */
1877         DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1878
1879         rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1880                                 revokechild_handler, (void *)rc);
1881         if (rc->fde == NULL) {
1882                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1883                 talloc_free(rc);
1884         }
1885         tevent_fd_set_auto_close(rc->fde);
1886
1887         return 0;
1888 }
1889
1890 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1891 {
1892         struct revokechild_handle *rc;
1893         struct revokechild_deferred_call *deferred_call;
1894
1895         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1896                 if (rc->key.dsize == 0) {
1897                         continue;
1898                 }
1899                 if (rc->key.dsize != key.dsize) {
1900                         continue;
1901                 }
1902                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1903                         break;
1904                 }
1905         }
1906
1907         if (rc == NULL) {
1908                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1909                 return -1;
1910         }
1911
1912         deferred_call = talloc(rc, struct revokechild_deferred_call);
1913         if (deferred_call == NULL) {
1914                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1915                 return -1;
1916         }
1917
1918         deferred_call->ctdb = ctdb;
1919         deferred_call->hdr  = hdr;
1920         deferred_call->fn   = fn;
1921         deferred_call->ctx  = call_context;
1922
1923         talloc_set_destructor(deferred_call, deferred_call_destructor);
1924         talloc_steal(deferred_call, hdr);
1925
1926         return 0;
1927 }