ctdb-daemon: Add tracking of migration records
[samba.git] / ctdb / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
45
46 struct ctdb_sticky_record {
47         struct ctdb_context *ctdb;
48         struct ctdb_db_context *ctdb_db;
49         TDB_CONTEXT *pindown;
50 };
51
52 /*
53   find the ctdb_db from a db index
54  */
55  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
56 {
57         struct ctdb_db_context *ctdb_db;
58
59         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60                 if (ctdb_db->db_id == id) {
61                         break;
62                 }
63         }
64         return ctdb_db;
65 }
66
67 /*
68   a varient of input packet that can be used in lock requeue
69 */
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
71 {
72         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73         ctdb_input_pkt(ctdb, hdr);
74 }
75
76
77 /*
78   send an error reply
79 */
80 static void ctdb_send_error(struct ctdb_context *ctdb, 
81                             struct ctdb_req_header *hdr, uint32_t status,
82                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb, 
84                             struct ctdb_req_header *hdr, uint32_t status,
85                             const char *fmt, ...)
86 {
87         va_list ap;
88         struct ctdb_reply_error_old *r;
89         char *msg;
90         int msglen, len;
91
92         if (ctdb->methods == NULL) {
93                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
94                 return;
95         }
96
97         va_start(ap, fmt);
98         msg = talloc_vasprintf(ctdb, fmt, ap);
99         if (msg == NULL) {
100                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
101         }
102         va_end(ap);
103
104         msglen = strlen(msg)+1;
105         len = offsetof(struct ctdb_reply_error_old, msg);
106         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
107                                     struct ctdb_reply_error_old);
108         CTDB_NO_MEMORY_FATAL(ctdb, r);
109
110         r->hdr.destnode  = hdr->srcnode;
111         r->hdr.reqid     = hdr->reqid;
112         r->status        = status;
113         r->msglen        = msglen;
114         memcpy(&r->msg[0], msg, msglen);
115
116         ctdb_queue_packet(ctdb, &r->hdr);
117
118         talloc_free(msg);
119 }
120
121
122 /**
123  * send a redirect reply
124  *
125  * The logic behind this function is this:
126  *
127  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128  * to its local ctdb (ctdb_request_call). If the node is not itself
129  * the record's DMASTER, it first redirects the packet to  the
130  * record's LMASTER. The LMASTER then redirects the call packet to
131  * the current DMASTER. Note that this works because of this: When
132  * a record is migrated off a node, then the new DMASTER is stored
133  * in the record's copy on the former DMASTER.
134  */
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136                                     struct ctdb_db_context *ctdb_db,
137                                     TDB_DATA key,
138                                     struct ctdb_req_call_old *c, 
139                                     struct ctdb_ltdb_header *header)
140 {
141         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
142
143         c->hdr.destnode = lmaster;
144         if (ctdb->pnn == lmaster) {
145                 c->hdr.destnode = header->dmaster;
146         }
147         c->hopcount++;
148
149         if (c->hopcount%100 > 95) {
150                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151                         "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152                         "header->dmaster:%d dst:%d\n",
153                         c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154                         c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155                         header->dmaster, c->hdr.destnode));
156         }
157
158         ctdb_queue_packet(ctdb, &c->hdr);
159 }
160
161
162 /*
163   send a dmaster reply
164
165   caller must have the chainlock before calling this routine. Caller must be
166   the lmaster
167 */
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169                                     struct ctdb_ltdb_header *header,
170                                     TDB_DATA key, TDB_DATA data,
171                                     uint32_t new_dmaster,
172                                     uint32_t reqid)
173 {
174         struct ctdb_context *ctdb = ctdb_db->ctdb;
175         struct ctdb_reply_dmaster_old *r;
176         int ret, len;
177         TALLOC_CTX *tmp_ctx;
178
179         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181                 return;
182         }
183
184         header->dmaster = new_dmaster;
185         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
186         if (ret != 0) {
187                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188                 return;
189         }
190
191         if (ctdb->methods == NULL) {
192                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193                 return;
194         }
195
196         /* put the packet on a temporary context, allowing us to safely free
197            it below even if ctdb_reply_dmaster() has freed it already */
198         tmp_ctx = talloc_new(ctdb);
199
200         /* send the CTDB_REPLY_DMASTER */
201         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203                                     struct ctdb_reply_dmaster_old);
204         CTDB_NO_MEMORY_FATAL(ctdb, r);
205
206         r->hdr.destnode  = new_dmaster;
207         r->hdr.reqid     = reqid;
208         r->hdr.generation = ctdb_db->generation;
209         r->rsn           = header->rsn;
210         r->keylen        = key.dsize;
211         r->datalen       = data.dsize;
212         r->db_id         = ctdb_db->db_id;
213         memcpy(&r->data[0], key.dptr, key.dsize);
214         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
216
217         ctdb_queue_packet(ctdb, &r->hdr);
218
219         talloc_free(tmp_ctx);
220 }
221
222 /*
223   send a dmaster request (give another node the dmaster for a record)
224
225   This is always sent to the lmaster, which ensures that the lmaster
226   always knows who the dmaster is. The lmaster will then send a
227   CTDB_REPLY_DMASTER to the new dmaster
228 */
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
230                                    struct ctdb_req_call_old *c, 
231                                    struct ctdb_ltdb_header *header,
232                                    TDB_DATA *key, TDB_DATA *data)
233 {
234         struct ctdb_req_dmaster_old *r;
235         struct ctdb_context *ctdb = ctdb_db->ctdb;
236         int len;
237         uint32_t lmaster = ctdb_lmaster(ctdb, key);
238
239         if (ctdb->methods == NULL) {
240                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
241                 return;
242         }
243
244         if (data->dsize != 0) {
245                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
246         }
247
248         if (lmaster == ctdb->pnn) {
249                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
250                                         c->hdr.srcnode, c->hdr.reqid);
251                 return;
252         }
253         
254         len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
255                         + sizeof(uint32_t);
256         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
257                                     struct ctdb_req_dmaster_old);
258         CTDB_NO_MEMORY_FATAL(ctdb, r);
259         r->hdr.destnode  = lmaster;
260         r->hdr.reqid     = c->hdr.reqid;
261         r->hdr.generation = ctdb_db->generation;
262         r->db_id         = c->db_id;
263         r->rsn           = header->rsn;
264         r->dmaster       = c->hdr.srcnode;
265         r->keylen        = key->dsize;
266         r->datalen       = data->dsize;
267         memcpy(&r->data[0], key->dptr, key->dsize);
268         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
270
271         header->dmaster = c->hdr.srcnode;
272         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
274         }
275         
276         ctdb_queue_packet(ctdb, &r->hdr);
277
278         talloc_free(r);
279 }
280
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282                                         struct tevent_timer *te,
283                                         struct timeval t, void *private_data)
284 {
285         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
286                                                        struct ctdb_sticky_record);
287
288         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
289         if (sr->pindown != NULL) {
290                 talloc_free(sr->pindown);
291                 sr->pindown = NULL;
292         }
293 }
294
295 static int
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
297 {
298         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
299         uint32_t *k;
300         struct ctdb_sticky_record *sr;
301
302         k = ctdb_key_to_idkey(tmp_ctx, key);
303         if (k == NULL) {
304                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305                 talloc_free(tmp_ctx);
306                 return -1;
307         }
308
309         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
310         if (sr == NULL) {
311                 talloc_free(tmp_ctx);
312                 return 0;
313         }
314
315         talloc_free(tmp_ctx);
316
317         if (sr->pindown == NULL) {
318                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319                 sr->pindown = talloc_new(sr);
320                 if (sr->pindown == NULL) {
321                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
322                         return -1;
323                 }
324                 tevent_add_timer(ctdb->ev, sr->pindown,
325                                  timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326                                                      (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327                                  ctdb_sticky_pindown_timeout, sr);
328         }
329
330         return 0;
331 }
332
333 /*
334   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
336
337   must be called with the chainlock held. This function releases the chainlock
338 */
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340                                 struct ctdb_req_header *hdr,
341                                 TDB_DATA key, TDB_DATA data,
342                                 uint64_t rsn, uint32_t record_flags)
343 {
344         struct ctdb_call_state *state;
345         struct ctdb_context *ctdb = ctdb_db->ctdb;
346         struct ctdb_ltdb_header header;
347         int ret;
348
349         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
350
351         ZERO_STRUCT(header);
352         header.rsn = rsn;
353         header.dmaster = ctdb->pnn;
354         header.flags = record_flags;
355
356         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
357
358         if (state) {
359                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
360                         /*
361                          * We temporarily add the VACUUM_MIGRATED flag to
362                          * the record flags, so that ctdb_ltdb_store can
363                          * decide whether the record should be stored or
364                          * deleted.
365                          */
366                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
367                 }
368         }
369
370         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
372
373                 ret = ctdb_ltdb_unlock(ctdb_db, key);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
376                 }
377                 return;
378         }
379
380         /* we just became DMASTER and this database is "sticky",
381            see if the record is flagged as "hot" and set up a pin-down
382            context to stop migrations for a little while if so
383         */
384         if (ctdb_db->sticky) {
385                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
386         }
387
388         if (state == NULL) {
389                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390                          ctdb->pnn, hdr->reqid, hdr->srcnode));
391
392                 ret = ctdb_ltdb_unlock(ctdb_db, key);
393                 if (ret != 0) {
394                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
395                 }
396                 return;
397         }
398
399         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
401
402                 ret = ctdb_ltdb_unlock(ctdb_db, key);
403                 if (ret != 0) {
404                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
405                 }
406                 return;
407         }
408
409         if (hdr->reqid != state->reqid) {
410                 /* we found a record  but it was the wrong one */
411                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
412
413                 ret = ctdb_ltdb_unlock(ctdb_db, key);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
416                 }
417                 return;
418         }
419
420         (void) hash_count_increment(ctdb_db->migratedb, key);
421
422         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
423
424         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
425         if (ret != 0) {
426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427         }
428
429         state->state = CTDB_CALL_DONE;
430         if (state->async.fn) {
431                 state->async.fn(state);
432         }
433 }
434
/*
  one request packet parked on a dmaster_defer_queue
*/
struct dmaster_defer_call {
        /* links in the queue's doubly linked list of deferred calls */
        struct dmaster_defer_call *next;
        struct dmaster_defer_call *prev;

        /* daemon context the packet is fed back into */
        struct ctdb_context *ctdb;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;
};
440
/*
  requests deferred for one key while a dmaster update for that key is
  in progress
*/
struct dmaster_defer_queue {
        /* database the key lives in */
        struct ctdb_db_context *ctdb_db;

        /* generation the queue was created under; compared against the
           current generation to detect that the queue has gone stale */
        uint32_t generation;

        /* head of the list of deferred calls */
        struct dmaster_defer_call *deferred_calls;
};
446
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448                                     struct tevent_timer *te,
449                                     struct timeval t,
450                                     void *private_data)
451 {
452         struct dmaster_defer_call *call = talloc_get_type(
453                 private_data, struct dmaster_defer_call);
454
455         ctdb_input_pkt(call->ctdb, call->hdr);
456         talloc_free(call);
457 }
458
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
460 {
461         /* Ignore requests, if database recovery happens in-between. */
462         if (ddq->generation != ddq->ctdb_db->generation) {
463                 return 0;
464         }
465
466         while (ddq->deferred_calls != NULL) {
467                 struct dmaster_defer_call *call = ddq->deferred_calls;
468
469                 DLIST_REMOVE(ddq->deferred_calls, call);
470
471                 talloc_steal(call->ctdb, call);
472                 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473                                  dmaster_defer_reprocess, call);
474         }
475         return 0;
476 }
477
/*
  tree insert callback for defer queues: free whatever was stored under
  the key before (data) and store the new queue (parm) in its place
*/
static void *insert_ddq_callback(void *parm, void *data)
{
        if (data != NULL) {
                talloc_free(data);
        }

        return parm;
}
485
486 /**
487  * This function is used to reigster a key in database that needs to be updated.
488  * Any requests for that key should get deferred till this is completed.
489  */
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491                                struct ctdb_req_header *hdr,
492                                TDB_DATA key)
493 {
494         uint32_t *k;
495         struct dmaster_defer_queue *ddq;
496
497         k = ctdb_key_to_idkey(hdr, key);
498         if (k == NULL) {
499                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
500                 return -1;
501         }
502
503         /* Already exists */
504         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
505         if (ddq != NULL) {
506                 if (ddq->generation == ctdb_db->generation) {
507                         talloc_free(k);
508                         return 0;
509                 }
510
511                 /* Recovery ocurred - get rid of old queue. All the deferred
512                  * requests will be resent anyway from ctdb_call_resend_db.
513                  */
514                 talloc_free(ddq);
515         }
516
517         ddq = talloc(hdr, struct dmaster_defer_queue);
518         if (ddq == NULL) {
519                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
520                 talloc_free(k);
521                 return -1;
522         }
523         ddq->ctdb_db = ctdb_db;
524         ddq->generation = hdr->generation;
525         ddq->deferred_calls = NULL;
526
527         trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528                                     insert_ddq_callback, ddq);
529         talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
530
531         talloc_free(k);
532         return 0;
533 }
534
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536                              struct ctdb_req_header *hdr,
537                              TDB_DATA key)
538 {
539         struct dmaster_defer_queue *ddq;
540         struct dmaster_defer_call *call;
541         uint32_t *k;
542
543         k = ctdb_key_to_idkey(hdr, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
546                 return -1;
547         }
548
549         ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
550         if (ddq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555         talloc_free(k);
556
557         if (ddq->generation != hdr->generation) {
558                 talloc_set_destructor(ddq, NULL);
559                 talloc_free(ddq);
560                 return -1;
561         }
562
563         call = talloc(ddq, struct dmaster_defer_call);
564         if (call == NULL) {
565                 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
566                 return -1;
567         }
568
569         call->ctdb = ctdb_db->ctdb;
570         call->hdr = talloc_steal(call, hdr);
571
572         DLIST_ADD_END(ddq->deferred_calls, call);
573
574         return 0;
575 }
576
577 /*
578   called when a CTDB_REQ_DMASTER packet comes in
579
580   this comes into the lmaster for a record when the current dmaster
581   wants to give up the dmaster role and give it to someone else
582 */
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
584 {
585         struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586         TDB_DATA key, data, data2;
587         struct ctdb_ltdb_header header;
588         struct ctdb_db_context *ctdb_db;
589         uint32_t record_flags = 0;
590         size_t len;
591         int ret;
592
593         key.dptr = c->data;
594         key.dsize = c->keylen;
595         data.dptr = c->data + c->keylen;
596         data.dsize = c->datalen;
597         len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
598                         + sizeof(uint32_t);
599         if (len <= c->hdr.length) {
600                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601                        sizeof(record_flags));
602         }
603
604         ctdb_db = find_ctdb_db(ctdb, c->db_id);
605         if (!ctdb_db) {
606                 ctdb_send_error(ctdb, hdr, -1,
607                                 "Unknown database in request. db_id==0x%08x",
608                                 c->db_id);
609                 return;
610         }
611
612         dmaster_defer_setup(ctdb_db, hdr, key);
613
614         /* fetch the current record */
615         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616                                            ctdb_call_input_pkt, ctdb, false);
617         if (ret == -1) {
618                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
619                 return;
620         }
621         if (ret == -2) {
622                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
623                 return;
624         }
625
626         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627                 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628                                   "db=%s lmaster=%u gen=%u curgen=%u\n",
629                                   ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630                                   hdr->generation, ctdb_db->generation));
631                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
632         }
633
634         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
635                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
636
637         /* its a protocol error if the sending node is not the current dmaster */
638         if (header.dmaster != hdr->srcnode) {
639                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
644                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
646
647                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648                         ctdb_ltdb_unlock(ctdb_db, key);
649                         return;
650                 }
651         }
652
653         if (header.rsn > c->rsn) {
654                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
658         }
659
660         /* use the rsn from the sending node */
661         header.rsn = c->rsn;
662
663         /* store the record flags from the sending node */
664         header.flags = record_flags;
665
666         /* check if the new dmaster is the lmaster, in which case we
667            skip the dmaster reply */
668         if (c->dmaster == ctdb->pnn) {
669                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
670         } else {
671                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
672
673                 ret = ctdb_ltdb_unlock(ctdb_db, key);
674                 if (ret != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
676                 }
677         }
678 }
679
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681                                        struct tevent_timer *te,
682                                        struct timeval t, void *private_data)
683 {
684         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
685                                                        struct ctdb_sticky_record);
686         talloc_free(sr);
687 }
688
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
690 {
691         if (data) {
692                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
693                 talloc_free(data);
694         }
695         return parm;
696 }
697
698 static int
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
700 {
701         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
702         uint32_t *k;
703         struct ctdb_sticky_record *sr;
704
705         k = ctdb_key_to_idkey(tmp_ctx, key);
706         if (k == NULL) {
707                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708                 talloc_free(tmp_ctx);
709                 return -1;
710         }
711
712         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
713         if (sr != NULL) {
714                 talloc_free(tmp_ctx);
715                 return 0;
716         }
717
718         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
719         if (sr == NULL) {
720                 talloc_free(tmp_ctx);
721                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
722                 return -1;
723         }
724
725         sr->ctdb    = ctdb;
726         sr->ctdb_db = ctdb_db;
727         sr->pindown = NULL;
728
729         DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730                          ctdb->tunable.sticky_duration,
731                          ctdb_db->db_name, ctdb_hash(&key)));
732
733         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
734
735         tevent_add_timer(ctdb->ev, sr,
736                          timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737                          ctdb_sticky_record_timeout, sr);
738
739         talloc_free(tmp_ctx);
740         return 0;
741 }
742
/*
  carries a request from the moment its pin-down deferral ends until
  the zero-delay timer hands it back to the input path
*/
struct pinned_down_requeue_handle {
        /* daemon context the packet is re-injected into */
        struct ctdb_context *ctdb;

        /* the request packet waiting to be re-injected */
        struct ctdb_req_header *hdr;
};
747
/*
  a request held back while its record is pinned down; allocated under
  the record's pin-down context, so it is freed together with it
*/
struct pinned_down_deferred_call {
        /* daemon context the request arrived on */
        struct ctdb_context *ctdb;

        /* the held-back request packet */
        struct ctdb_req_header *hdr;
};
752
753 static void pinned_down_requeue(struct tevent_context *ev,
754                                 struct tevent_timer *te,
755                                 struct timeval t, void *private_data)
756 {
757         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758         struct ctdb_context *ctdb = handle->ctdb;
759
760         talloc_steal(ctdb, handle->hdr);
761         ctdb_call_input_pkt(ctdb, handle->hdr);
762
763         talloc_free(handle);
764 }
765
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
767 {
768         struct ctdb_context *ctdb = pinned_down->ctdb;
769         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
770
771         handle->ctdb = pinned_down->ctdb;
772         handle->hdr  = pinned_down->hdr;
773         talloc_steal(handle, handle->hdr);
774
775         tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776                          pinned_down_requeue, handle);
777
778         return 0;
779 }
780
781 static int
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
783 {
784         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
785         uint32_t *k;
786         struct ctdb_sticky_record *sr;
787         struct pinned_down_deferred_call *pinned_down;
788
789         k = ctdb_key_to_idkey(tmp_ctx, key);
790         if (k == NULL) {
791                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792                 talloc_free(tmp_ctx);
793                 return -1;
794         }
795
796         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
797         if (sr == NULL) {
798                 talloc_free(tmp_ctx);
799                 return -1;
800         }
801
802         talloc_free(tmp_ctx);
803
804         if (sr->pindown == NULL) {
805                 return -1;
806         }
807         
808         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809         if (pinned_down == NULL) {
810                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
811                 return -1;
812         }
813
814         pinned_down->ctdb = ctdb;
815         pinned_down->hdr  = hdr;
816
817         talloc_set_destructor(pinned_down, pinned_down_destructor);
818         talloc_steal(pinned_down, hdr);
819
820         return 0;
821 }
822
823 static void
824 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
825                              int count)
826 {
827         int i, id;
828
829         /* smallest value is always at index 0 */
830         if (count <= ctdb_db->statistics.hot_keys[0].count) {
831                 return;
832         }
833
834         /* see if we already know this key */
835         for (i = 0; i < MAX_HOT_KEYS; i++) {
836                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
837                         continue;
838                 }
839                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
840                         continue;
841                 }
842                 /* found an entry for this key */
843                 if (count <= ctdb_db->statistics.hot_keys[i].count) {
844                         return;
845                 }
846                 ctdb_db->statistics.hot_keys[i].count = count;
847                 goto sort_keys;
848         }
849
850         if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
851                 id = ctdb_db->statistics.num_hot_keys;
852                 ctdb_db->statistics.num_hot_keys++;
853         } else {
854                 id = 0;
855         }
856
857         if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
858                 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
859         }
860         ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
861         ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
862         ctdb_db->statistics.hot_keys[id].count = count;
863         DEBUG(DEBUG_NOTICE,
864               ("Updated hot key database=%s key=0x%08x id=%d count=%d\n",
865                ctdb_db->db_name, ctdb_hash(&key), id, count));
866
867 sort_keys:
868         for (i = 1; i < MAX_HOT_KEYS; i++) {
869                 if (ctdb_db->statistics.hot_keys[i].count == 0) {
870                         continue;
871                 }
872                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
873                         count = ctdb_db->statistics.hot_keys[i].count;
874                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
875                         ctdb_db->statistics.hot_keys[0].count = count;
876
877                         key = ctdb_db->statistics.hot_keys[i].key;
878                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
879                         ctdb_db->statistics.hot_keys[0].key = key;
880                 }
881         }
882 }
883
884 /*
885   called when a CTDB_REQ_CALL packet comes in
886 */
887 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
888 {
889         struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
890         TDB_DATA data;
891         struct ctdb_reply_call_old *r;
892         int ret, len;
893         struct ctdb_ltdb_header header;
894         struct ctdb_call *call;
895         struct ctdb_db_context *ctdb_db;
896         int tmp_count, bucket;
897
898         if (ctdb->methods == NULL) {
899                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
900                 return;
901         }
902
903
904         ctdb_db = find_ctdb_db(ctdb, c->db_id);
905         if (!ctdb_db) {
906                 ctdb_send_error(ctdb, hdr, -1,
907                                 "Unknown database in request. db_id==0x%08x",
908                                 c->db_id);
909                 return;
910         }
911
912         call = talloc(hdr, struct ctdb_call);
913         CTDB_NO_MEMORY_FATAL(ctdb, call);
914
915         call->call_id  = c->callid;
916         call->key.dptr = c->data;
917         call->key.dsize = c->keylen;
918         call->call_data.dptr = c->data + c->keylen;
919         call->call_data.dsize = c->calldatalen;
920         call->reply_data.dptr  = NULL;
921         call->reply_data.dsize = 0;
922
923
924         /* If this record is pinned down we should defer the
925            request until the pindown times out
926         */
927         if (ctdb_db->sticky) {
928                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
929                         DEBUG(DEBUG_WARNING,
930                               ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
931                         talloc_free(call);
932                         return;
933                 }
934         }
935
936         if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
937                 talloc_free(call);
938                 return;
939         }
940
941         /* determine if we are the dmaster for this key. This also
942            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
943            if the call will be answered locally */
944
945         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
946                                            ctdb_call_input_pkt, ctdb, false);
947         if (ret == -1) {
948                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
949                 talloc_free(call);
950                 return;
951         }
952         if (ret == -2) {
953                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
954                 talloc_free(call);
955                 return;
956         }
957
958         /* Dont do READONLY if we don't have a tracking database */
959         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
960                 c->flags &= ~CTDB_WANT_READONLY;
961         }
962
963         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
964                 header.flags &= ~CTDB_REC_RO_FLAGS;
965                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
966                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
967                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
968                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
969                 }
970                 /* and clear out the tracking data */
971                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
972                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
973                 }
974         }
975
976         /* if we are revoking, we must defer all other calls until the revoke
977          * had completed.
978          */
979         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
980                 talloc_free(data.dptr);
981                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
982
983                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
984                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
985                 }
986                 talloc_free(call);
987                 return;
988         }
989
990         /*
991          * If we are not the dmaster and are not hosting any delegations,
992          * then we redirect the request to the node than can answer it
993          * (the lmaster or the dmaster).
994          */
995         if ((header.dmaster != ctdb->pnn) 
996             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
997                 talloc_free(data.dptr);
998                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
999
1000                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1001                 if (ret != 0) {
1002                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1003                 }
1004                 talloc_free(call);
1005                 return;
1006         }
1007
1008         if ( (!(c->flags & CTDB_WANT_READONLY))
1009         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1010                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
1011                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1012                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1013                 }
1014                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1015
1016                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1017                         ctdb_fatal(ctdb, "Failed to start record revoke");
1018                 }
1019                 talloc_free(data.dptr);
1020
1021                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1022                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1023                 }
1024                 talloc_free(call);
1025
1026                 return;
1027         }               
1028
1029         /* If this is the first request for delegation. bump rsn and set
1030          * the delegations flag
1031          */
1032         if ((c->flags & CTDB_WANT_READONLY)
1033         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1034         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1035                 header.rsn     += 3;
1036                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
1037                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1038                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1039                 }
1040         }
1041         if ((c->flags & CTDB_WANT_READONLY) 
1042         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1043                 TDB_DATA tdata;
1044
1045                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1046                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1047                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1048                 }
1049                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1050                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
1051                 }
1052                 free(tdata.dptr);
1053
1054                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1055                 if (ret != 0) {
1056                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1057                 }
1058
1059                 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1060                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1061                                             struct ctdb_reply_call_old);
1062                 CTDB_NO_MEMORY_FATAL(ctdb, r);
1063                 r->hdr.destnode  = c->hdr.srcnode;
1064                 r->hdr.reqid     = c->hdr.reqid;
1065                 r->hdr.generation = ctdb_db->generation;
1066                 r->status        = 0;
1067                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
1068                 header.rsn      -= 2;
1069                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
1070                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1071                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1072
1073                 if (data.dsize) {
1074                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1075                 }
1076
1077                 ctdb_queue_packet(ctdb, &r->hdr);
1078                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1079                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1080
1081                 talloc_free(r);
1082                 talloc_free(call);
1083                 return;
1084         }
1085
1086         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1087         tmp_count = c->hopcount;
1088         bucket = 0;
1089         while (tmp_count) {
1090                 tmp_count >>= 2;
1091                 bucket++;
1092         }
1093         if (bucket >= MAX_COUNT_BUCKETS) {
1094                 bucket = MAX_COUNT_BUCKETS - 1;
1095         }
1096         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1097         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1098
1099         /* If this database supports sticky records, then check if the
1100            hopcount is big. If it is it means the record is hot and we
1101            should make it sticky.
1102         */
1103         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1104                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1105         }
1106
1107
1108         /* Try if possible to migrate the record off to the caller node.
1109          * From the clients perspective a fetch of the data is just as 
1110          * expensive as a migration.
1111          */
1112         if (c->hdr.srcnode != ctdb->pnn) {
1113                 if (ctdb_db->persistent_state) {
1114                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1115                               " of key %s while transaction is active\n",
1116                               (char *)call->key.dptr));
1117                 } else {
1118                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1119                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1120                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1121                         talloc_free(data.dptr);
1122
1123                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1124                         if (ret != 0) {
1125                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1126                         }
1127                 }
1128                 talloc_free(call);
1129                 return;
1130         }
1131
1132         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1133         if (ret != 0) {
1134                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1135                 call->status = -1;
1136         }
1137
1138         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1141         }
1142
1143         len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1144         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
1145                                     struct ctdb_reply_call_old);
1146         CTDB_NO_MEMORY_FATAL(ctdb, r);
1147         r->hdr.destnode  = hdr->srcnode;
1148         r->hdr.reqid     = hdr->reqid;
1149         r->hdr.generation = ctdb_db->generation;
1150         r->status        = call->status;
1151         r->datalen       = call->reply_data.dsize;
1152         if (call->reply_data.dsize) {
1153                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1154         }
1155
1156         ctdb_queue_packet(ctdb, &r->hdr);
1157
1158         talloc_free(r);
1159         talloc_free(call);
1160 }
1161
1162 /**
1163  * called when a CTDB_REPLY_CALL packet comes in
1164  *
1165  * This packet comes in response to a CTDB_REQ_CALL request packet. It
1166  * contains any reply data from the call
1167  */
1168 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1169 {
1170         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1171         struct ctdb_call_state *state;
1172
1173         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1174         if (state == NULL) {
1175                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1176                 return;
1177         }
1178
1179         if (hdr->reqid != state->reqid) {
1180                 /* we found a record  but it was the wrong one */
1181                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1182                 return;
1183         }
1184
1185
1186         /* read only delegation processing */
1187         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1188          * delegation since we may need to update the record header
1189          */
1190         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1191                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1192                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1193                 struct ctdb_ltdb_header oldheader;
1194                 TDB_DATA key, data, olddata;
1195                 int ret;
1196
1197                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1198                         goto finished_ro;
1199                         return;
1200                 }
1201
1202                 key.dsize = state->c->keylen;
1203                 key.dptr  = state->c->data;
1204                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1205                                      ctdb_call_input_pkt, ctdb, false);
1206                 if (ret == -2) {
1207                         return;
1208                 }
1209                 if (ret != 0) {
1210                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1211                         return;
1212                 }
1213
1214                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1215                 if (ret != 0) {
1216                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1217                         ctdb_ltdb_unlock(ctdb_db, key);
1218                         goto finished_ro;
1219                 }                       
1220
1221                 if (header->rsn <= oldheader.rsn) {
1222                         ctdb_ltdb_unlock(ctdb_db, key);
1223                         goto finished_ro;
1224                 }
1225
1226                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1227                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1228                         ctdb_ltdb_unlock(ctdb_db, key);
1229                         goto finished_ro;
1230                 }
1231
1232                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1233                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1234                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1235                 if (ret != 0) {
1236                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1237                         ctdb_ltdb_unlock(ctdb_db, key);
1238                         goto finished_ro;
1239                 }                       
1240
1241                 ctdb_ltdb_unlock(ctdb_db, key);
1242         }
1243 finished_ro:
1244
1245         state->call->reply_data.dptr = c->data;
1246         state->call->reply_data.dsize = c->datalen;
1247         state->call->status = c->status;
1248
1249         talloc_steal(state, c);
1250
1251         state->state = CTDB_CALL_DONE;
1252         if (state->async.fn) {
1253                 state->async.fn(state);
1254         }
1255 }
1256
1257
1258 /**
1259  * called when a CTDB_REPLY_DMASTER packet comes in
1260  *
1261  * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1262  * request packet. It means that the current dmaster wants to give us
1263  * the dmaster role.
1264  */
1265 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1266 {
1267         struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1268         struct ctdb_db_context *ctdb_db;
1269         TDB_DATA key, data;
1270         uint32_t record_flags = 0;
1271         size_t len;
1272         int ret;
1273
1274         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1275         if (ctdb_db == NULL) {
1276                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1277                 return;
1278         }
1279         
1280         key.dptr = c->data;
1281         key.dsize = c->keylen;
1282         data.dptr = &c->data[key.dsize];
1283         data.dsize = c->datalen;
1284         len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1285                 + sizeof(uint32_t);
1286         if (len <= c->hdr.length) {
1287                 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1288                        sizeof(record_flags));
1289         }
1290
1291         dmaster_defer_setup(ctdb_db, hdr, key);
1292
1293         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1294                                      ctdb_call_input_pkt, ctdb, false);
1295         if (ret == -2) {
1296                 return;
1297         }
1298         if (ret != 0) {
1299                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1300                 return;
1301         }
1302
1303         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1304 }
1305
1306
1307 /*
1308   called when a CTDB_REPLY_ERROR packet comes in
1309 */
1310 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1311 {
1312         struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1313         struct ctdb_call_state *state;
1314
1315         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1316         if (state == NULL) {
1317                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1318                          ctdb->pnn, hdr->reqid));
1319                 return;
1320         }
1321
1322         if (hdr->reqid != state->reqid) {
1323                 /* we found a record  but it was the wrong one */
1324                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1325                 return;
1326         }
1327
1328         talloc_steal(state, c);
1329
1330         state->state  = CTDB_CALL_ERROR;
1331         state->errmsg = (char *)c->msg;
1332         if (state->async.fn) {
1333                 state->async.fn(state);
1334         }
1335 }
1336
1337
1338 /*
1339   destroy a ctdb_call
1340 */
1341 static int ctdb_call_destructor(struct ctdb_call_state *state)
1342 {
1343         DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1344         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1345         return 0;
1346 }
1347
1348
1349 /*
1350   called when a ctdb_call needs to be resent after a reconfigure event
1351 */
1352 static void ctdb_call_resend(struct ctdb_call_state *state)
1353 {
1354         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1355
1356         state->generation = state->ctdb_db->generation;
1357
1358         /* use a new reqid, in case the old reply does eventually come in */
1359         reqid_remove(ctdb->idr, state->reqid);
1360         state->reqid = reqid_new(ctdb->idr, state);
1361         state->c->hdr.reqid = state->reqid;
1362
1363         /* update the generation count for this request, so its valid with the new vnn_map */
1364         state->c->hdr.generation = state->generation;
1365
1366         /* send the packet to ourselves, it will be redirected appropriately */
1367         state->c->hdr.destnode = ctdb->pnn;
1368
1369         ctdb_queue_packet(ctdb, &state->c->hdr);
1370         DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1371                             state->ctdb_db->db_name, state->reqid, state->generation));
1372 }
1373
1374 /*
1375   resend all pending calls on recovery
1376  */
1377 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1378 {
1379         struct ctdb_call_state *state, *next;
1380
1381         for (state = ctdb_db->pending_calls; state; state = next) {
1382                 next = state->next;
1383                 ctdb_call_resend(state);
1384         }
1385 }
1386
1387 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1388 {
1389         struct ctdb_db_context *ctdb_db;
1390
1391         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1392                 ctdb_call_resend_db(ctdb_db);
1393         }
1394 }
1395
1396 /*
1397   this allows the caller to setup a async.fn 
1398 */
1399 static void call_local_trigger(struct tevent_context *ev,
1400                                struct tevent_timer *te,
1401                                struct timeval t, void *private_data)
1402 {
1403         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1404         if (state->async.fn) {
1405                 state->async.fn(state);
1406         }
1407 }       
1408
1409
1410 /*
1411   construct an event driven local ctdb_call
1412
1413   this is used so that locally processed ctdb_call requests are processed
1414   in an event driven manner
1415 */
1416 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1417                                              struct ctdb_call *call,
1418                                              struct ctdb_ltdb_header *header,
1419                                              TDB_DATA *data)
1420 {
1421         struct ctdb_call_state *state;
1422         struct ctdb_context *ctdb = ctdb_db->ctdb;
1423         int ret;
1424
1425         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1426         CTDB_NO_MEMORY_NULL(ctdb, state);
1427
1428         talloc_steal(state, data->dptr);
1429
1430         state->state = CTDB_CALL_DONE;
1431         state->call  = talloc(state, struct ctdb_call);
1432         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1433         *(state->call) = *call;
1434         state->ctdb_db = ctdb_db;
1435
1436         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1437         if (ret != 0) {
1438                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1439         }
1440
1441         tevent_add_timer(ctdb->ev, state, timeval_zero(),
1442                          call_local_trigger, state);
1443
1444         return state;
1445 }
1446
1447
1448 /*
1449   make a remote ctdb call - async send. Called in daemon context.
1450
1451   This constructs a ctdb_call request and queues it for processing. 
1452   This call never blocks.
1453 */
1454 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1455                                                      struct ctdb_call *call, 
1456                                                      struct ctdb_ltdb_header *header)
1457 {
1458         uint32_t len;
1459         struct ctdb_call_state *state;
1460         struct ctdb_context *ctdb = ctdb_db->ctdb;
1461
1462         if (ctdb->methods == NULL) {
1463                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1464                 return NULL;
1465         }
1466
1467         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1468         CTDB_NO_MEMORY_NULL(ctdb, state);
1469         state->call = talloc(state, struct ctdb_call);
1470         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1471
1472         state->reqid = reqid_new(ctdb->idr, state);
1473         state->ctdb_db = ctdb_db;
1474         talloc_set_destructor(state, ctdb_call_destructor);
1475
1476         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1477         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1478                                            struct ctdb_req_call_old);
1479         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1480         state->c->hdr.destnode  = header->dmaster;
1481
1482         /* this limits us to 16k outstanding messages - not unreasonable */
1483         state->c->hdr.reqid     = state->reqid;
1484         state->c->hdr.generation = ctdb_db->generation;
1485         state->c->flags         = call->flags;
1486         state->c->db_id         = ctdb_db->db_id;
1487         state->c->callid        = call->call_id;
1488         state->c->hopcount      = 0;
1489         state->c->keylen        = call->key.dsize;
1490         state->c->calldatalen   = call->call_data.dsize;
1491         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1492         memcpy(&state->c->data[call->key.dsize], 
1493                call->call_data.dptr, call->call_data.dsize);
1494         *(state->call)              = *call;
1495         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1496         state->call->key.dptr       = &state->c->data[0];
1497
1498         state->state  = CTDB_CALL_WAIT;
1499         state->generation = ctdb_db->generation;
1500
1501         DLIST_ADD(ctdb_db->pending_calls, state);
1502
1503         ctdb_queue_packet(ctdb, &state->c->hdr);
1504
1505         return state;
1506 }
1507
1508 /*
1509   make a remote ctdb call - async recv - called in daemon context
1510
1511   This is called when the program wants to wait for a ctdb_call to complete and get the 
1512   results. This call will block unless the call has already completed.
1513 */
1514 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1515 {
1516         while (state->state < CTDB_CALL_DONE) {
1517                 tevent_loop_once(state->ctdb_db->ctdb->ev);
1518         }
1519         if (state->state != CTDB_CALL_DONE) {
1520                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1521                 talloc_free(state);
1522                 return -1;
1523         }
1524
1525         if (state->call->reply_data.dsize) {
1526                 call->reply_data.dptr = talloc_memdup(call,
1527                                                       state->call->reply_data.dptr,
1528                                                       state->call->reply_data.dsize);
1529                 call->reply_data.dsize = state->call->reply_data.dsize;
1530         } else {
1531                 call->reply_data.dptr = NULL;
1532                 call->reply_data.dsize = 0;
1533         }
1534         call->status = state->call->status;
1535         talloc_free(state);
1536         return 0;
1537 }
1538
1539
1540 /* 
1541    send a keepalive packet to the other node
1542 */
1543 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1544 {
1545         struct ctdb_req_keepalive_old *r;
1546         
1547         if (ctdb->methods == NULL) {
1548                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1549                 return;
1550         }
1551
1552         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1553                                     sizeof(struct ctdb_req_keepalive_old), 
1554                                     struct ctdb_req_keepalive_old);
1555         CTDB_NO_MEMORY_FATAL(ctdb, r);
1556         r->hdr.destnode  = destnode;
1557         r->hdr.reqid     = 0;
1558         
1559         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1560
1561         ctdb_queue_packet(ctdb, &r->hdr);
1562
1563         talloc_free(r);
1564 }
1565
1566
1567
1568 struct revokechild_deferred_call {
1569         struct ctdb_context *ctdb;
1570         struct ctdb_req_header *hdr;
1571         deferred_requeue_fn fn;
1572         void *ctx;
1573 };
1574
1575 struct revokechild_handle {
1576         struct revokechild_handle *next, *prev;
1577         struct ctdb_context *ctdb;
1578         struct ctdb_db_context *ctdb_db;
1579         struct tevent_fd *fde;
1580         int status;
1581         int fd[2];
1582         pid_t child;
1583         TDB_DATA key;
1584 };
1585
1586 struct revokechild_requeue_handle {
1587         struct ctdb_context *ctdb;
1588         struct ctdb_req_header *hdr;
1589         deferred_requeue_fn fn;
1590         void *ctx;
1591 };
1592
1593 static void deferred_call_requeue(struct tevent_context *ev,
1594                                   struct tevent_timer *te,
1595                                   struct timeval t, void *private_data)
1596 {
1597         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1598
1599         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1600         talloc_free(requeue_handle);
1601 }
1602
1603 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1604 {
1605         struct ctdb_context *ctdb = deferred_call->ctdb;
1606         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1607
1608         requeue_handle->ctdb = ctdb;
1609         requeue_handle->hdr  = deferred_call->hdr;
1610         requeue_handle->fn   = deferred_call->fn;
1611         requeue_handle->ctx  = deferred_call->ctx;
1612         talloc_steal(requeue_handle, requeue_handle->hdr);
1613
1614         /* Always delay revoke requests.  Either wait for the read/write
1615          * operation to complete, or if revoking failed wait for recovery to
1616          * complete
1617          */
1618         tevent_add_timer(ctdb->ev, requeue_handle,
1619                          timeval_current_ofs(1, 0),
1620                          deferred_call_requeue, requeue_handle);
1621
1622         return 0;
1623 }
1624
1625
1626 static int revokechild_destructor(struct revokechild_handle *rc)
1627 {
1628         if (rc->fde != NULL) {
1629                 talloc_free(rc->fde);
1630         }
1631
1632         if (rc->fd[0] != -1) {
1633                 close(rc->fd[0]);
1634         }
1635         if (rc->fd[1] != -1) {
1636                 close(rc->fd[1]);
1637         }
1638         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1639
1640         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1641         return 0;
1642 }
1643
1644 static void revokechild_handler(struct tevent_context *ev,
1645                                 struct tevent_fd *fde,
1646                                 uint16_t flags, void *private_data)
1647 {
1648         struct revokechild_handle *rc = talloc_get_type(private_data, 
1649                                                      struct revokechild_handle);
1650         int ret;
1651         char c;
1652
1653         ret = sys_read(rc->fd[0], &c, 1);
1654         if (ret != 1) {
1655                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1656                 rc->status = -1;
1657                 talloc_free(rc);
1658                 return;
1659         }
1660         if (c != 0) {
1661                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1662                 rc->status = -1;
1663                 talloc_free(rc);
1664                 return;
1665         }
1666
1667         talloc_free(rc);
1668 }
1669
1670 struct ctdb_revoke_state {
1671         struct ctdb_db_context *ctdb_db;
1672         TDB_DATA key;
1673         struct ctdb_ltdb_header *header;
1674         TDB_DATA data;
1675         int count;
1676         int status;
1677         int finished;
1678 };
1679
1680 static void update_record_cb(struct ctdb_client_control_state *state)
1681 {
1682         struct ctdb_revoke_state *revoke_state;
1683         int ret;
1684         int32_t res;
1685
1686         if (state == NULL) {
1687                 return;
1688         }
1689         revoke_state = state->async.private_data;
1690
1691         state->async.fn = NULL;
1692         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1693         if ((ret != 0) || (res != 0)) {
1694                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1695                 revoke_state->status = -1;
1696         }
1697
1698         revoke_state->count--;
1699         if (revoke_state->count <= 0) {
1700                 revoke_state->finished = 1;
1701         }
1702 }
1703
1704 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1705 {
1706         struct ctdb_revoke_state *revoke_state = private_data;
1707         struct ctdb_client_control_state *state;
1708
1709         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1710         if (state == NULL) {
1711                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1712                 revoke_state->status = -1;
1713                 return;
1714         }
1715         state->async.fn           = update_record_cb;
1716         state->async.private_data = revoke_state;
1717
1718         revoke_state->count++;
1719
1720 }
1721
1722 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1723                                         struct tevent_timer *te,
1724                                         struct timeval yt, void *private_data)
1725 {
1726         struct ctdb_revoke_state *state = private_data;
1727
1728         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1729         state->finished = 1;
1730         state->status   = -1;
1731 }
1732
1733 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1734 {
1735         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1736         struct ctdb_ltdb_header new_header;
1737         TDB_DATA new_data;
1738
1739         state->ctdb_db = ctdb_db;
1740         state->key     = key;
1741         state->header  = header;
1742         state->data    = data;
1743  
1744         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1745
1746         tevent_add_timer(ctdb->ev, state,
1747                          timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1748                          ctdb_revoke_timeout_handler, state);
1749
1750         while (state->finished == 0) {
1751                 tevent_loop_once(ctdb->ev);
1752         }
1753
1754         if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1755                 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1756                 talloc_free(state);
1757                 return -1;
1758         }
1759         if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1760                 ctdb_ltdb_unlock(ctdb_db, key);
1761                 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1762                 talloc_free(state);
1763                 return -1;
1764         }
1765         header->rsn++;
1766         if (new_header.rsn > header->rsn) {
1767                 ctdb_ltdb_unlock(ctdb_db, key);
1768                 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1769                 talloc_free(state);
1770                 return -1;
1771         }
1772         if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1773                 ctdb_ltdb_unlock(ctdb_db, key);
1774                 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1775                 talloc_free(state);
1776                 return -1;
1777         }
1778
1779         /*
1780          * If revoke on all nodes succeed, revoke is complete.  Otherwise,
1781          * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1782          */
1783         if (state->status == 0) {
1784                 new_header.rsn++;
1785                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1786         } else {
1787                 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1788                 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1789         }
1790         if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1791                 ctdb_ltdb_unlock(ctdb_db, key);
1792                 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1793                 talloc_free(state);
1794                 return -1;
1795         }
1796         ctdb_ltdb_unlock(ctdb_db, key);
1797
1798         talloc_free(state);
1799         return 0;
1800 }
1801
1802
1803 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1804 {
1805         TDB_DATA tdata;
1806         struct revokechild_handle *rc;
1807         pid_t parent = getpid();
1808         int ret;
1809
1810         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1811         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1812         header->rsn   -= 1;
1813
1814         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1815                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1816                 return -1;
1817         }
1818
1819         tdata = tdb_fetch(ctdb_db->rottdb, key);
1820         if (tdata.dsize > 0) {
1821                 uint8_t *tmp;
1822
1823                 tmp = tdata.dptr;
1824                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1825                 free(tmp);
1826         }
1827
1828         rc->status    = 0;
1829         rc->ctdb      = ctdb;
1830         rc->ctdb_db   = ctdb_db;
1831         rc->fd[0]     = -1;
1832         rc->fd[1]     = -1;
1833
1834         talloc_set_destructor(rc, revokechild_destructor);
1835
1836         rc->key.dsize = key.dsize;
1837         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1838         if (rc->key.dptr == NULL) {
1839                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1840                 talloc_free(rc);
1841                 return -1;
1842         }
1843
1844         ret = pipe(rc->fd);
1845         if (ret != 0) {
1846                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1847                 talloc_free(rc);
1848                 return -1;
1849         }
1850
1851
1852         rc->child = ctdb_fork(ctdb);
1853         if (rc->child == (pid_t)-1) {
1854                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1855                 talloc_free(rc);
1856                 return -1;
1857         }
1858
1859         if (rc->child == 0) {
1860                 char c = 0;
1861                 close(rc->fd[0]);
1862
1863                 prctl_set_comment("ctdb_revokechild");
1864                 if (switch_from_server_to_client(ctdb) != 0) {
1865                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1866                         c = 1;
1867                         goto child_finished;
1868                 }
1869
1870                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1871
1872 child_finished:
1873                 sys_write(rc->fd[1], &c, 1);
1874                 ctdb_wait_for_process_to_exit(parent);
1875                 _exit(0);
1876         }
1877
1878         close(rc->fd[1]);
1879         rc->fd[1] = -1;
1880         set_close_on_exec(rc->fd[0]);
1881
1882         /* This is an active revokechild child process */
1883         DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1884
1885         rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1886                                 revokechild_handler, (void *)rc);
1887         if (rc->fde == NULL) {
1888                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1889                 talloc_free(rc);
1890         }
1891         tevent_fd_set_auto_close(rc->fde);
1892
1893         return 0;
1894 }
1895
1896 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1897 {
1898         struct revokechild_handle *rc;
1899         struct revokechild_deferred_call *deferred_call;
1900
1901         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1902                 if (rc->key.dsize == 0) {
1903                         continue;
1904                 }
1905                 if (rc->key.dsize != key.dsize) {
1906                         continue;
1907                 }
1908                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1909                         break;
1910                 }
1911         }
1912
1913         if (rc == NULL) {
1914                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1915                 return -1;
1916         }
1917
1918         deferred_call = talloc(rc, struct revokechild_deferred_call);
1919         if (deferred_call == NULL) {
1920                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1921                 return -1;
1922         }
1923
1924         deferred_call->ctdb = ctdb;
1925         deferred_call->hdr  = hdr;
1926         deferred_call->fn   = fn;
1927         deferred_call->ctx  = call_context;
1928
1929         talloc_set_destructor(deferred_call, deferred_call_destructor);
1930         talloc_steal(deferred_call, hdr);
1931
1932         return 0;
1933 }
1934
1935 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
1936                                          void *private_data)
1937 {
1938         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1939                 private_data, struct ctdb_db_context);
1940         int value;
1941
1942         value = (counter < INT_MAX ? counter : INT_MAX);
1943         ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
1944 }
1945
1946 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
1947                                          struct tevent_timer *te,
1948                                          struct timeval current_time,
1949                                          void *private_data)
1950 {
1951         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1952                 private_data, struct ctdb_db_context);
1953
1954         if (ctdb_db->migratedb == NULL) {
1955                 return;
1956         }
1957
1958         hash_count_expire(ctdb_db->migratedb, NULL);
1959
1960         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1961                               tevent_timeval_current_ofs(10, 0),
1962                               ctdb_migration_cleandb_event, ctdb_db);
1963         if (te == NULL) {
1964                 DEBUG(DEBUG_ERR,
1965                       ("Memory error in migration cleandb event for %s\n",
1966                        ctdb_db->db_name));
1967                 TALLOC_FREE(ctdb_db->migratedb);
1968         }
1969 }
1970
1971 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
1972 {
1973         struct timeval one_second = { 1, 0 };
1974         struct tevent_timer *te;
1975         int ret;
1976
1977         if (ctdb_db->persistent) {
1978                 return 0;
1979         }
1980
1981         ret = hash_count_init(ctdb_db, one_second,
1982                               ctdb_migration_count_handler, ctdb_db,
1983                               &ctdb_db->migratedb);
1984         if (ret != 0) {
1985                 DEBUG(DEBUG_ERR,
1986                       ("Memory error in migration init for %s\n",
1987                        ctdb_db->db_name));
1988                 return -1;
1989         }
1990
1991         te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1992                               tevent_timeval_current_ofs(10, 0),
1993                               ctdb_migration_cleandb_event, ctdb_db);
1994         if (te == NULL) {
1995                 DEBUG(DEBUG_ERR,
1996                       ("Memory error in migration init for %s\n",
1997                        ctdb_db->db_name));
1998                 TALLOC_FREE(ctdb_db->migratedb);
1999                 return -1;
2000         }
2001
2002         return 0;
2003 }