ctdb_call: don't bump the rsn in ctdb_become_dmaster() any more
[ctdb.git] / server / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 /*
20   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21   protocol design and packet details
22 */
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
30
31 struct ctdb_sticky_record {
32         struct ctdb_context *ctdb;
33         struct ctdb_db_context *ctdb_db;
34         TDB_CONTEXT *pindown;
35 };
36
37 /*
38   find the ctdb_db from a db index
39  */
40  struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
41 {
42         struct ctdb_db_context *ctdb_db;
43
44         for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45                 if (ctdb_db->db_id == id) {
46                         break;
47                 }
48         }
49         return ctdb_db;
50 }
51
52 /*
53   a varient of input packet that can be used in lock requeue
54 */
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
56 {
57         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58         ctdb_input_pkt(ctdb, hdr);
59 }
60
61
62 /*
63   send an error reply
64 */
65 static void ctdb_send_error(struct ctdb_context *ctdb, 
66                             struct ctdb_req_header *hdr, uint32_t status,
67                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb, 
69                             struct ctdb_req_header *hdr, uint32_t status,
70                             const char *fmt, ...)
71 {
72         va_list ap;
73         struct ctdb_reply_error *r;
74         char *msg;
75         int msglen, len;
76
77         if (ctdb->methods == NULL) {
78                 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79                 return;
80         }
81
82         va_start(ap, fmt);
83         msg = talloc_vasprintf(ctdb, fmt, ap);
84         if (msg == NULL) {
85                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
86         }
87         va_end(ap);
88
89         msglen = strlen(msg)+1;
90         len = offsetof(struct ctdb_reply_error, msg);
91         r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
92                                     struct ctdb_reply_error);
93         CTDB_NO_MEMORY_FATAL(ctdb, r);
94
95         r->hdr.destnode  = hdr->srcnode;
96         r->hdr.reqid     = hdr->reqid;
97         r->status        = status;
98         r->msglen        = msglen;
99         memcpy(&r->msg[0], msg, msglen);
100
101         ctdb_queue_packet(ctdb, &r->hdr);
102
103         talloc_free(msg);
104 }
105
106
107 /**
108  * send a redirect reply
109  *
110  * The logic behind this function is this:
111  *
112  * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113  * to its local ctdb (ctdb_request_call). If the node is not itself
114  * the record's DMASTER, it first redirects the packet to  the
115  * record's LMASTER. The LMASTER then redirects the call packet to
116  * the current DMASTER. But there is a race: The record may have
117  * been migrated off the DMASTER while the redirected packet is
118  * on the wire (or in the local queue). So in case the record has
119  * migrated off the new destinaton of the call packet, instead of
120  * going back to the LMASTER to get the new DMASTER, we try to
121  * reduce round-trips by first chasing the record a couple of times
122  * before giving up the direct chase and finally going back to the
123  * LMASTER (again). Note that this works because of this: When
124  * a record is migrated off a node, then the new DMASTER is stored
125  * in the record's copy on the former DMASTER.
126  *
127  * The maximum number of attempts for direct chase to make before
128  * going back to the LMASTER is configurable by the tunable
129  * "MaxRedirectCount".
130  */
131 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
132                                     struct ctdb_db_context *ctdb_db,
133                                     TDB_DATA key,
134                                     struct ctdb_req_call *c, 
135                                     struct ctdb_ltdb_header *header)
136 {
137         
138         uint32_t lmaster = ctdb_lmaster(ctdb, &key);
139
140         c->hdr.destnode = lmaster;
141         if (ctdb->pnn == lmaster) {
142                 c->hdr.destnode = header->dmaster;
143         }
144         c->hopcount++;
145
146         if (c->hopcount%100 == 99) {
147                 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
148                         "key:0x%08x pnn:%d src:%d lmaster:%d "
149                         "header->dmaster:%d dst:%d\n",
150                         c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
151                         ctdb->pnn, c->hdr.srcnode, lmaster,
152                         header->dmaster, c->hdr.destnode));
153         }
154
155         ctdb_queue_packet(ctdb, &c->hdr);
156 }
157
158
159 /*
160   send a dmaster reply
161
162   caller must have the chainlock before calling this routine. Caller must be
163   the lmaster
164 */
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166                                     struct ctdb_ltdb_header *header,
167                                     TDB_DATA key, TDB_DATA data,
168                                     uint32_t new_dmaster,
169                                     uint32_t reqid)
170 {
171         struct ctdb_context *ctdb = ctdb_db->ctdb;
172         struct ctdb_reply_dmaster *r;
173         int ret, len;
174         TALLOC_CTX *tmp_ctx;
175
176         if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177                 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
178                 return;
179         }
180
181         header->dmaster = new_dmaster;
182         ret = ctdb_ltdb_store(ctdb_db, key, header, data);
183         if (ret != 0) {
184                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
185                 return;
186         }
187
188         if (ctdb->methods == NULL) {
189                 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
190                 return;
191         }
192
193         /* put the packet on a temporary context, allowing us to safely free
194            it below even if ctdb_reply_dmaster() has freed it already */
195         tmp_ctx = talloc_new(ctdb);
196
197         /* send the CTDB_REPLY_DMASTER */
198         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
199         r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200                                     struct ctdb_reply_dmaster);
201         CTDB_NO_MEMORY_FATAL(ctdb, r);
202
203         r->hdr.destnode  = new_dmaster;
204         r->hdr.reqid     = reqid;
205         r->rsn           = header->rsn;
206         r->keylen        = key.dsize;
207         r->datalen       = data.dsize;
208         r->db_id         = ctdb_db->db_id;
209         memcpy(&r->data[0], key.dptr, key.dsize);
210         memcpy(&r->data[key.dsize], data.dptr, data.dsize);
211         memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
212
213         ctdb_queue_packet(ctdb, &r->hdr);
214
215         talloc_free(tmp_ctx);
216 }
217
218 /*
219   send a dmaster request (give another node the dmaster for a record)
220
221   This is always sent to the lmaster, which ensures that the lmaster
222   always knows who the dmaster is. The lmaster will then send a
223   CTDB_REPLY_DMASTER to the new dmaster
224 */
225 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
226                                    struct ctdb_req_call *c, 
227                                    struct ctdb_ltdb_header *header,
228                                    TDB_DATA *key, TDB_DATA *data)
229 {
230         struct ctdb_req_dmaster *r;
231         struct ctdb_context *ctdb = ctdb_db->ctdb;
232         int len;
233         uint32_t lmaster = ctdb_lmaster(ctdb, key);
234
235         if (ctdb->methods == NULL) {
236                 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
237                 return;
238         }
239
240         if (data->dsize != 0) {
241                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
242         }
243
244         if (lmaster == ctdb->pnn) {
245                 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
246                                         c->hdr.srcnode, c->hdr.reqid);
247                 return;
248         }
249         
250         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
251                         + sizeof(uint32_t);
252         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
253                                     struct ctdb_req_dmaster);
254         CTDB_NO_MEMORY_FATAL(ctdb, r);
255         r->hdr.destnode  = lmaster;
256         r->hdr.reqid     = c->hdr.reqid;
257         r->db_id         = c->db_id;
258         r->rsn           = header->rsn;
259         r->dmaster       = c->hdr.srcnode;
260         r->keylen        = key->dsize;
261         r->datalen       = data->dsize;
262         memcpy(&r->data[0], key->dptr, key->dsize);
263         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
264         memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
265
266         header->dmaster = c->hdr.srcnode;
267         if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
268                 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
269         }
270         
271         ctdb_queue_packet(ctdb, &r->hdr);
272
273         talloc_free(r);
274 }
275
276 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
277                                        struct timeval t, void *private_data)
278 {
279         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
280                                                        struct ctdb_sticky_record);
281
282         DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
283         if (sr->pindown != NULL) {
284                 talloc_free(sr->pindown);
285                 sr->pindown = NULL;
286         }
287 }
288
289 static int
290 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
291 {
292         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
293         uint32_t *k;
294         struct ctdb_sticky_record *sr;
295
296         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
297         if (k == NULL) {
298                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
299                 talloc_free(tmp_ctx);
300                 return -1;
301         }
302
303         k[0] = (key.dsize + 3) / 4 + 1;
304         memcpy(&k[1], key.dptr, key.dsize);
305
306         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
307         if (sr == NULL) {
308                 talloc_free(tmp_ctx);
309                 return 0;
310         }
311
312         talloc_free(tmp_ctx);
313
314         if (sr->pindown == NULL) {
315                 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316                 sr->pindown = talloc_new(sr);
317                 if (sr->pindown == NULL) {
318                         DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
319                         return -1;
320                 }
321                 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
322         }
323
324         return 0;
325 }
326
327 /*
328   called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
329   gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
330
331   must be called with the chainlock held. This function releases the chainlock
332 */
333 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
334                                 struct ctdb_req_header *hdr,
335                                 TDB_DATA key, TDB_DATA data,
336                                 uint64_t rsn, uint32_t record_flags)
337 {
338         struct ctdb_call_state *state;
339         struct ctdb_context *ctdb = ctdb_db->ctdb;
340         struct ctdb_ltdb_header header;
341         int ret;
342
343         DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
344
345         ZERO_STRUCT(header);
346         header.rsn = rsn;
347         header.dmaster = ctdb->pnn;
348         header.flags = record_flags;
349
350         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
351
352         if (state) {
353                 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
354                         /*
355                          * We temporarily add the VACUUM_MIGRATED flag to
356                          * the record flags, so that ctdb_ltdb_store can
357                          * decide whether the record should be stored or
358                          * deleted.
359                          */
360                         header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
361                 }
362         }
363
364         if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
365                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
366
367                 ret = ctdb_ltdb_unlock(ctdb_db, key);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
370                 }
371                 return;
372         }
373
374         /* we just became DMASTER and this database is "sticky",
375            see if the record is flagged as "hot" and set up a pin-down
376            context to stop migrations for a little while if so
377         */
378         if (ctdb_db->sticky) {
379                 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
380         }
381
382         if (state == NULL) {
383                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
384                          ctdb->pnn, hdr->reqid, hdr->srcnode));
385
386                 ret = ctdb_ltdb_unlock(ctdb_db, key);
387                 if (ret != 0) {
388                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
389                 }
390                 return;
391         }
392
393         if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
394                 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
395
396                 ret = ctdb_ltdb_unlock(ctdb_db, key);
397                 if (ret != 0) {
398                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
399                 }
400                 return;
401         }
402
403         if (hdr->reqid != state->reqid) {
404                 /* we found a record  but it was the wrong one */
405                 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
406
407                 ret = ctdb_ltdb_unlock(ctdb_db, key);
408                 if (ret != 0) {
409                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
410                 }
411                 return;
412         }
413
414         ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
415
416         ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
417         if (ret != 0) {
418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
419         }
420
421         state->state = CTDB_CALL_DONE;
422         if (state->async.fn) {
423                 state->async.fn(state);
424         }
425 }
426
427
428
429 /*
430   called when a CTDB_REQ_DMASTER packet comes in
431
432   this comes into the lmaster for a record when the current dmaster
433   wants to give up the dmaster role and give it to someone else
434 */
435 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
436 {
437         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
438         TDB_DATA key, data, data2;
439         struct ctdb_ltdb_header header;
440         struct ctdb_db_context *ctdb_db;
441         uint32_t record_flags = 0;
442         size_t len;
443         int ret;
444
445         key.dptr = c->data;
446         key.dsize = c->keylen;
447         data.dptr = c->data + c->keylen;
448         data.dsize = c->datalen;
449         len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
450                         + sizeof(uint32_t);
451         if (len <= c->hdr.length) {
452                 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
453         }
454
455         ctdb_db = find_ctdb_db(ctdb, c->db_id);
456         if (!ctdb_db) {
457                 ctdb_send_error(ctdb, hdr, -1,
458                                 "Unknown database in request. db_id==0x%08x",
459                                 c->db_id);
460                 return;
461         }
462         
463         /* fetch the current record */
464         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
465                                            ctdb_call_input_pkt, ctdb, false);
466         if (ret == -1) {
467                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
468                 return;
469         }
470         if (ret == -2) {
471                 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
472                 return;
473         }
474
475         if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
476                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
477                          ctdb->pnn, ctdb_lmaster(ctdb, &key), 
478                          hdr->generation, ctdb->vnn_map->generation));
479                 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
480         }
481
482         DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
483                  ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
484
485         /* its a protocol error if the sending node is not the current dmaster */
486         if (header.dmaster != hdr->srcnode) {
487                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
488                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
489                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
490                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
491                          (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
492                 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
493                         DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
494
495                         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
496                         ctdb_ltdb_unlock(ctdb_db, key);
497                         return;
498                 }
499         }
500
501         if (header.rsn > c->rsn) {
502                 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
503                          ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
504                          ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
505                          (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
506         }
507
508         /* use the rsn from the sending node */
509         header.rsn = c->rsn;
510
511         /* store the record flags from the sending node */
512         header.flags = record_flags;
513
514         /* check if the new dmaster is the lmaster, in which case we
515            skip the dmaster reply */
516         if (c->dmaster == ctdb->pnn) {
517                 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
518         } else {
519                 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
520
521                 ret = ctdb_ltdb_unlock(ctdb_db, key);
522                 if (ret != 0) {
523                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
524                 }
525         }
526 }
527
528 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
529                                        struct timeval t, void *private_data)
530 {
531         struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
532                                                        struct ctdb_sticky_record);
533         talloc_free(sr);
534 }
535
536 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
537 {
538         if (data) {
539                 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
540                 talloc_free(data);
541         }
542         return parm;
543 }
544
545 static int
546 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
547 {
548         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
549         uint32_t *k;
550         struct ctdb_sticky_record *sr;
551
552         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
553         if (k == NULL) {
554                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
555                 talloc_free(tmp_ctx);
556                 return -1;
557         }
558
559         k[0] = (key.dsize + 3) / 4 + 1;
560         memcpy(&k[1], key.dptr, key.dsize);
561
562         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
563         if (sr != NULL) {
564                 talloc_free(tmp_ctx);
565                 return 0;
566         }
567
568         sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
569         if (sr == NULL) {
570                 talloc_free(tmp_ctx);
571                 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
572                 return -1;
573         }
574
575         sr->ctdb    = ctdb;
576         sr->ctdb_db = ctdb_db;
577         sr->pindown = NULL;
578
579         DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
580
581         trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
582
583         event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
584
585         talloc_free(tmp_ctx);
586         return 0;
587 }
588
589 struct pinned_down_requeue_handle {
590         struct ctdb_context *ctdb;
591         struct ctdb_req_header *hdr;
592 };
593
594 struct pinned_down_deferred_call {
595         struct ctdb_context *ctdb;
596         struct ctdb_req_header *hdr;
597 };
598
599 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
600                        struct timeval t, void *private_data)
601 {
602         struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
603         struct ctdb_context *ctdb = handle->ctdb;
604
605         talloc_steal(ctdb, handle->hdr);
606         ctdb_call_input_pkt(ctdb, handle->hdr);
607
608         talloc_free(handle);
609 }
610
611 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
612 {
613         struct ctdb_context *ctdb = pinned_down->ctdb;
614         struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
615
616         handle->ctdb = pinned_down->ctdb;
617         handle->hdr  = pinned_down->hdr;
618         talloc_steal(handle, handle->hdr);
619
620         event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
621
622         return 0;
623 }
624
625 static int
626 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
627 {
628         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
629         uint32_t *k;
630         struct ctdb_sticky_record *sr;
631         struct pinned_down_deferred_call *pinned_down;
632
633         k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
634         if (k == NULL) {
635                 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
636                 talloc_free(tmp_ctx);
637                 return -1;
638         }
639
640         k[0] = (key.dsize + 3) / 4 + 1;
641         memcpy(&k[1], key.dptr, key.dsize);
642
643         sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
644         if (sr == NULL) {
645                 talloc_free(tmp_ctx);
646                 return -1;
647         }
648
649         talloc_free(tmp_ctx);
650
651         if (sr->pindown == NULL) {
652                 return -1;
653         }
654         
655         pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
656         if (pinned_down == NULL) {
657                 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
658                 return -1;
659         }
660
661         pinned_down->ctdb = ctdb;
662         pinned_down->hdr  = hdr;
663
664         talloc_set_destructor(pinned_down, pinned_down_destructor);
665         talloc_steal(pinned_down, hdr);
666
667         return 0;
668 }
669
670 static void
671 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
672 {
673         int i;
674
675         /* smallest value is always at index 0 */
676         if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
677                 return;
678         }
679
680         /* see if we already know this key */
681         for (i = 0; i < MAX_HOT_KEYS; i++) {
682                 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
683                         continue;
684                 }
685                 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
686                         continue;
687                 }
688                 /* found an entry for this key */
689                 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
690                         return;
691                 }
692                 ctdb_db->statistics.hot_keys[i].count = hopcount;
693                 goto sort_keys;
694         }
695
696         if (ctdb_db->statistics.hot_keys[0].key.dptr != NULL) {
697                 talloc_free(ctdb_db->statistics.hot_keys[0].key.dptr);
698         }
699         ctdb_db->statistics.hot_keys[0].key.dsize = key.dsize;
700         ctdb_db->statistics.hot_keys[0].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
701         ctdb_db->statistics.hot_keys[0].count = hopcount;
702
703
704 sort_keys:
705         for (i = 2; i < MAX_HOT_KEYS; i++) {
706                 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
707                         hopcount = ctdb_db->statistics.hot_keys[i].count;
708                         ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
709                         ctdb_db->statistics.hot_keys[0].count = hopcount;
710
711                         key = ctdb_db->statistics.hot_keys[i].key;
712                         ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
713                         ctdb_db->statistics.hot_keys[0].key = key;
714                 }
715         }
716 }
717
718 /*
719   called when a CTDB_REQ_CALL packet comes in
720 */
721 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
722 {
723         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
724         TDB_DATA data;
725         struct ctdb_reply_call *r;
726         int ret, len;
727         struct ctdb_ltdb_header header;
728         struct ctdb_call *call;
729         struct ctdb_db_context *ctdb_db;
730         int tmp_count, bucket;
731
732         if (ctdb->methods == NULL) {
733                 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
734                 return;
735         }
736
737
738         ctdb_db = find_ctdb_db(ctdb, c->db_id);
739         if (!ctdb_db) {
740                 ctdb_send_error(ctdb, hdr, -1,
741                                 "Unknown database in request. db_id==0x%08x",
742                                 c->db_id);
743                 return;
744         }
745
746         call = talloc(hdr, struct ctdb_call);
747         CTDB_NO_MEMORY_FATAL(ctdb, call);
748
749         call->call_id  = c->callid;
750         call->key.dptr = c->data;
751         call->key.dsize = c->keylen;
752         call->call_data.dptr = c->data + c->keylen;
753         call->call_data.dsize = c->calldatalen;
754         call->reply_data.dptr  = NULL;
755         call->reply_data.dsize = 0;
756
757
758         /* If this record is pinned down we should defer the
759            request until the pindown times out
760         */
761         if (ctdb_db->sticky) {
762                 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
763                   DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
764                         return;
765                 }
766         }
767
768
769         /* determine if we are the dmaster for this key. This also
770            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
771            if the call will be answered locally */
772
773         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
774                                            ctdb_call_input_pkt, ctdb, false);
775         if (ret == -1) {
776                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
777                 return;
778         }
779         if (ret == -2) {
780                 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
781                 return;
782         }
783
784         /* Dont do READONLY if we dont have a tracking database */
785         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
786                 c->flags &= ~CTDB_WANT_READONLY;
787         }
788
789         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
790                 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
791                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
792                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
793                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
794                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
795                 }
796                 /* and clear out the tracking data */
797                 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
798                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
799                 }
800         }
801
802         /* if we are revoking, we must defer all other calls until the revoke
803          * had completed.
804          */
805         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
806                 talloc_free(data.dptr);
807                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
808
809                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
810                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
811                 }
812                 talloc_free(call);
813                 return;
814         }
815
816         /* if we are not the dmaster and are not hosting any delegations,
817            then send a redirect to the requesting node */
818         if ((header.dmaster != ctdb->pnn) 
819             && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
820                 talloc_free(data.dptr);
821                 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
822
823                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
824                 if (ret != 0) {
825                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
826                 }
827                 return;
828         }
829
830         if ( (!(c->flags & CTDB_WANT_READONLY))
831         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
832                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
833                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
834                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
835                 }
836                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
837
838                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
839                         ctdb_fatal(ctdb, "Failed to start record revoke");
840                 }
841                 talloc_free(data.dptr);
842
843                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
844                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
845                 }
846                 talloc_free(call);
847
848                 return;
849         }               
850
851         /* If this is the first request for delegation. bump rsn and set
852          * the delegations flag
853          */
854         if ((c->flags & CTDB_WANT_READONLY)
855         &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
856         &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
857                 header.rsn     += 3;
858                 header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
859                 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
860                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
861                 }
862         }
863         if ((c->flags & CTDB_WANT_READONLY) 
864         &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
865                 TDB_DATA tdata;
866
867                 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
868                 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
869                         ctdb_fatal(ctdb, "Failed to add node to trackingdb");
870                 }
871                 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
872                         ctdb_fatal(ctdb, "Failed to store trackingdb data");
873                 }
874                 free(tdata.dptr);
875
876                 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
877                 if (ret != 0) {
878                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
879                 }
880
881                 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
882                 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
883                                             struct ctdb_reply_call);
884                 CTDB_NO_MEMORY_FATAL(ctdb, r);
885                 r->hdr.destnode  = c->hdr.srcnode;
886                 r->hdr.reqid     = c->hdr.reqid;
887                 r->status        = 0;
888                 r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
889                 header.rsn      -= 2;
890                 header.flags   |= CTDB_REC_RO_HAVE_READONLY;
891                 header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
892                 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
893
894                 if (data.dsize) {
895                         memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
896                 }
897
898                 ctdb_queue_packet(ctdb, &r->hdr);
899                 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
900                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
901
902                 talloc_free(r);
903                 return;
904         }
905
906         CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
907         tmp_count = c->hopcount;
908         bucket = 0;
909         while (tmp_count) {
910                 tmp_count >>= 2;
911                 bucket++;
912         }
913         if (bucket >= MAX_COUNT_BUCKETS) {
914                 bucket = MAX_COUNT_BUCKETS - 1;
915         }
916         CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
917         CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
918         ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
919
920         /* If this database supports sticky records, then check if the
921            hopcount is big. If it is it means the record is hot and we
922            should make it sticky.
923         */
924         if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
925                 DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
926                 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
927         }
928
929
930         /* if this nodes has done enough consecutive calls on the same record
931            then give them the record
932            or if the node requested an immediate migration
933         */
934         if ( c->hdr.srcnode != ctdb->pnn &&
935              ((header.laccessor == c->hdr.srcnode
936                && header.lacount >= ctdb->tunable.max_lacount
937                && ctdb->tunable.max_lacount != 0)
938               || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
939                 if (ctdb_db->transaction_active) {
940                         DEBUG(DEBUG_INFO, (__location__ " refusing migration"
941                               " of key %s while transaction is active\n",
942                               (char *)call->key.dptr));
943                 } else {
944                         DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
945                                  ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
946                         ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
947                         talloc_free(data.dptr);
948
949                         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
950                         if (ret != 0) {
951                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
952                         }
953                         return;
954                 }
955         }
956
957         ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
958         if (ret != 0) {
959                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
960                 call->status = -1;
961         }
962
963         ret = ctdb_ltdb_unlock(ctdb_db, call->key);
964         if (ret != 0) {
965                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
966         }
967
968         len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
969         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
970                                     struct ctdb_reply_call);
971         CTDB_NO_MEMORY_FATAL(ctdb, r);
972         r->hdr.destnode  = hdr->srcnode;
973         r->hdr.reqid     = hdr->reqid;
974         r->status        = call->status;
975         r->datalen       = call->reply_data.dsize;
976         if (call->reply_data.dsize) {
977                 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
978         }
979
980         ctdb_queue_packet(ctdb, &r->hdr);
981
982         talloc_free(r);
983 }
984
985 /*
986   called when a CTDB_REPLY_CALL packet comes in
987
988   This packet comes in response to a CTDB_REQ_CALL request packet. It
989   contains any reply data from the call
990 */
991 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
992 {
993         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
994         struct ctdb_call_state *state;
995
996         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
997         if (state == NULL) {
998                 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
999                 return;
1000         }
1001
1002         if (hdr->reqid != state->reqid) {
1003                 /* we found a record  but it was the wrong one */
1004                 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1005                 return;
1006         }
1007
1008
1009         /* read only delegation processing */
1010         /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1011          * delegation since we may need to update the record header
1012          */
1013         if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1014                 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1015                 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1016                 struct ctdb_ltdb_header oldheader;
1017                 TDB_DATA key, data, olddata;
1018                 int ret;
1019
1020                 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1021                         goto finished_ro;
1022                         return;
1023                 }
1024
1025                 key.dsize = state->c->keylen;
1026                 key.dptr  = state->c->data;
1027                 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1028                                      ctdb_call_input_pkt, ctdb, false);
1029                 if (ret == -2) {
1030                         return;
1031                 }
1032                 if (ret != 0) {
1033                         DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1034                         return;
1035                 }
1036
1037                 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1038                 if (ret != 0) {
1039                         DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1040                         ctdb_ltdb_unlock(ctdb_db, key);
1041                         goto finished_ro;
1042                 }                       
1043
1044                 if (header->rsn <= oldheader.rsn) {
1045                         ctdb_ltdb_unlock(ctdb_db, key);
1046                         goto finished_ro;
1047                 }
1048
1049                 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1050                         DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1051                         ctdb_ltdb_unlock(ctdb_db, key);
1052                         goto finished_ro;
1053                 }
1054
1055                 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1056                 data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
1057                 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1058                 if (ret != 0) {
1059                         DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1060                         ctdb_ltdb_unlock(ctdb_db, key);
1061                         goto finished_ro;
1062                 }                       
1063
1064                 ctdb_ltdb_unlock(ctdb_db, key);
1065         }
1066 finished_ro:
1067
1068         state->call->reply_data.dptr = c->data;
1069         state->call->reply_data.dsize = c->datalen;
1070         state->call->status = c->status;
1071
1072         talloc_steal(state, c);
1073
1074         state->state = CTDB_CALL_DONE;
1075         if (state->async.fn) {
1076                 state->async.fn(state);
1077         }
1078 }
1079
1080
1081 /*
1082   called when a CTDB_REPLY_DMASTER packet comes in
1083
1084   This packet comes in from the lmaster response to a CTDB_REQ_CALL
1085   request packet. It means that the current dmaster wants to give us
1086   the dmaster role
1087 */
1088 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1089 {
1090         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1091         struct ctdb_db_context *ctdb_db;
1092         TDB_DATA key, data;
1093         uint32_t record_flags = 0;
1094         size_t len;
1095         int ret;
1096
1097         ctdb_db = find_ctdb_db(ctdb, c->db_id);
1098         if (ctdb_db == NULL) {
1099                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1100                 return;
1101         }
1102         
1103         key.dptr = c->data;
1104         key.dsize = c->keylen;
1105         data.dptr = &c->data[key.dsize];
1106         data.dsize = c->datalen;
1107         len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1108                 + sizeof(uint32_t);
1109         if (len <= c->hdr.length) {
1110                 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1111         }
1112
1113         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1114                                      ctdb_call_input_pkt, ctdb, false);
1115         if (ret == -2) {
1116                 return;
1117         }
1118         if (ret != 0) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1120                 return;
1121         }
1122
1123         ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1124 }
1125
1126
1127 /*
1128   called when a CTDB_REPLY_ERROR packet comes in
1129 */
1130 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1131 {
1132         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1133         struct ctdb_call_state *state;
1134
1135         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1136         if (state == NULL) {
1137                 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1138                          ctdb->pnn, hdr->reqid));
1139                 return;
1140         }
1141
1142         if (hdr->reqid != state->reqid) {
1143                 /* we found a record  but it was the wrong one */
1144                 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1145                 return;
1146         }
1147
1148         talloc_steal(state, c);
1149
1150         state->state  = CTDB_CALL_ERROR;
1151         state->errmsg = (char *)c->msg;
1152         if (state->async.fn) {
1153                 state->async.fn(state);
1154         }
1155 }
1156
1157
1158 /*
1159   destroy a ctdb_call
1160 */
1161 static int ctdb_call_destructor(struct ctdb_call_state *state)
1162 {
1163         DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1164         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1165         return 0;
1166 }
1167
1168
1169 /*
1170   called when a ctdb_call needs to be resent after a reconfigure event
1171 */
1172 static void ctdb_call_resend(struct ctdb_call_state *state)
1173 {
1174         struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1175
1176         state->generation = ctdb->vnn_map->generation;
1177
1178         /* use a new reqid, in case the old reply does eventually come in */
1179         ctdb_reqid_remove(ctdb, state->reqid);
1180         state->reqid = ctdb_reqid_new(ctdb, state);
1181         state->c->hdr.reqid = state->reqid;
1182
1183         /* update the generation count for this request, so its valid with the new vnn_map */
1184         state->c->hdr.generation = state->generation;
1185
1186         /* send the packet to ourselves, it will be redirected appropriately */
1187         state->c->hdr.destnode = ctdb->pnn;
1188
1189         ctdb_queue_packet(ctdb, &state->c->hdr);
1190         DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1191 }
1192
1193 /*
1194   resend all pending calls on recovery
1195  */
1196 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1197 {
1198         struct ctdb_call_state *state, *next;
1199         for (state=ctdb->pending_calls;state;state=next) {
1200                 next = state->next;
1201                 ctdb_call_resend(state);
1202         }
1203 }
1204
1205 /*
1206   this allows the caller to setup a async.fn 
1207 */
1208 static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
1209                        struct timeval t, void *private_data)
1210 {
1211         struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1212         if (state->async.fn) {
1213                 state->async.fn(state);
1214         }
1215 }       
1216
1217
1218 /*
1219   construct an event driven local ctdb_call
1220
1221   this is used so that locally processed ctdb_call requests are processed
1222   in an event driven manner
1223 */
1224 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, 
1225                                              struct ctdb_call *call,
1226                                              struct ctdb_ltdb_header *header,
1227                                              TDB_DATA *data)
1228 {
1229         struct ctdb_call_state *state;
1230         struct ctdb_context *ctdb = ctdb_db->ctdb;
1231         int ret;
1232
1233         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1234         CTDB_NO_MEMORY_NULL(ctdb, state);
1235
1236         talloc_steal(state, data->dptr);
1237
1238         state->state = CTDB_CALL_DONE;
1239         state->call  = talloc(state, struct ctdb_call);
1240         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1241         *(state->call) = *call;
1242         state->ctdb_db = ctdb_db;
1243
1244         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1245         if (ret != 0) {
1246                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1247         }
1248
1249         event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1250
1251         return state;
1252 }
1253
1254
1255 /*
1256   make a remote ctdb call - async send. Called in daemon context.
1257
1258   This constructs a ctdb_call request and queues it for processing. 
1259   This call never blocks.
1260 */
1261 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, 
1262                                                      struct ctdb_call *call, 
1263                                                      struct ctdb_ltdb_header *header)
1264 {
1265         uint32_t len;
1266         struct ctdb_call_state *state;
1267         struct ctdb_context *ctdb = ctdb_db->ctdb;
1268
1269         if (ctdb->methods == NULL) {
1270                 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1271                 return NULL;
1272         }
1273
1274         state = talloc_zero(ctdb_db, struct ctdb_call_state);
1275         CTDB_NO_MEMORY_NULL(ctdb, state);
1276         state->call = talloc(state, struct ctdb_call);
1277         CTDB_NO_MEMORY_NULL(ctdb, state->call);
1278
1279         state->reqid = ctdb_reqid_new(ctdb, state);
1280         state->ctdb_db = ctdb_db;
1281         talloc_set_destructor(state, ctdb_call_destructor);
1282
1283         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1284         state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
1285                                            struct ctdb_req_call);
1286         CTDB_NO_MEMORY_NULL(ctdb, state->c);
1287         state->c->hdr.destnode  = header->dmaster;
1288
1289         /* this limits us to 16k outstanding messages - not unreasonable */
1290         state->c->hdr.reqid     = state->reqid;
1291         state->c->flags         = call->flags;
1292         state->c->db_id         = ctdb_db->db_id;
1293         state->c->callid        = call->call_id;
1294         state->c->hopcount      = 0;
1295         state->c->keylen        = call->key.dsize;
1296         state->c->calldatalen   = call->call_data.dsize;
1297         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1298         memcpy(&state->c->data[call->key.dsize], 
1299                call->call_data.dptr, call->call_data.dsize);
1300         *(state->call)              = *call;
1301         state->call->call_data.dptr = &state->c->data[call->key.dsize];
1302         state->call->key.dptr       = &state->c->data[0];
1303
1304         state->state  = CTDB_CALL_WAIT;
1305         state->generation = ctdb->vnn_map->generation;
1306
1307         DLIST_ADD(ctdb->pending_calls, state);
1308
1309         ctdb_queue_packet(ctdb, &state->c->hdr);
1310
1311         return state;
1312 }
1313
1314 /*
1315   make a remote ctdb call - async recv - called in daemon context
1316
1317   This is called when the program wants to wait for a ctdb_call to complete and get the 
1318   results. This call will block unless the call has already completed.
1319 */
1320 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1321 {
1322         while (state->state < CTDB_CALL_DONE) {
1323                 event_loop_once(state->ctdb_db->ctdb->ev);
1324         }
1325         if (state->state != CTDB_CALL_DONE) {
1326                 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1327                 talloc_free(state);
1328                 return -1;
1329         }
1330
1331         if (state->call->reply_data.dsize) {
1332                 call->reply_data.dptr = talloc_memdup(call,
1333                                                       state->call->reply_data.dptr,
1334                                                       state->call->reply_data.dsize);
1335                 call->reply_data.dsize = state->call->reply_data.dsize;
1336         } else {
1337                 call->reply_data.dptr = NULL;
1338                 call->reply_data.dsize = 0;
1339         }
1340         call->status = state->call->status;
1341         talloc_free(state);
1342         return 0;
1343 }
1344
1345
1346 /* 
1347    send a keepalive packet to the other node
1348 */
1349 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1350 {
1351         struct ctdb_req_keepalive *r;
1352         
1353         if (ctdb->methods == NULL) {
1354                 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1355                 return;
1356         }
1357
1358         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1359                                     sizeof(struct ctdb_req_keepalive), 
1360                                     struct ctdb_req_keepalive);
1361         CTDB_NO_MEMORY_FATAL(ctdb, r);
1362         r->hdr.destnode  = destnode;
1363         r->hdr.reqid     = 0;
1364         
1365         CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1366
1367         ctdb_queue_packet(ctdb, &r->hdr);
1368
1369         talloc_free(r);
1370 }
1371
1372
1373
1374 struct revokechild_deferred_call {
1375         struct ctdb_context *ctdb;
1376         struct ctdb_req_header *hdr;
1377         deferred_requeue_fn fn;
1378         void *ctx;
1379 };
1380
1381 struct revokechild_handle {
1382         struct revokechild_handle *next, *prev;
1383         struct ctdb_context *ctdb;
1384         struct ctdb_db_context *ctdb_db;
1385         struct fd_event *fde;
1386         int status;
1387         int fd[2];
1388         pid_t child;
1389         TDB_DATA key;
1390 };
1391
1392 struct revokechild_requeue_handle {
1393         struct ctdb_context *ctdb;
1394         struct ctdb_req_header *hdr;
1395         deferred_requeue_fn fn;
1396         void *ctx;
1397 };
1398
1399 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
1400                        struct timeval t, void *private_data)
1401 {
1402         struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1403
1404         requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1405         talloc_free(requeue_handle);
1406 }
1407
1408 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1409 {
1410         struct ctdb_context *ctdb = deferred_call->ctdb;
1411         struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1412         struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1413
1414         requeue_handle->ctdb = ctdb;
1415         requeue_handle->hdr  = deferred_call->hdr;
1416         requeue_handle->fn   = deferred_call->fn;
1417         requeue_handle->ctx  = deferred_call->ctx;
1418         talloc_steal(requeue_handle, requeue_handle->hdr);
1419
1420         /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1421         event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1422
1423         return 0;
1424 }
1425
1426
1427 static int revokechild_destructor(struct revokechild_handle *rc)
1428 {
1429         if (rc->fde != NULL) {
1430                 talloc_free(rc->fde);
1431         }
1432
1433         if (rc->fd[0] != -1) {
1434                 close(rc->fd[0]);
1435         }
1436         if (rc->fd[1] != -1) {
1437                 close(rc->fd[1]);
1438         }
1439         ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1440
1441         DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1442         return 0;
1443 }
1444
1445 static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
1446                              uint16_t flags, void *private_data)
1447 {
1448         struct revokechild_handle *rc = talloc_get_type(private_data, 
1449                                                      struct revokechild_handle);
1450         int ret;
1451         char c;
1452
1453         ret = read(rc->fd[0], &c, 1);
1454         if (ret != 1) {
1455                 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1456                 rc->status = -1;
1457                 talloc_free(rc);
1458                 return;
1459         }
1460         if (c != 0) {
1461                 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1462                 rc->status = -1;
1463                 talloc_free(rc);
1464                 return;
1465         }
1466
1467         talloc_free(rc);
1468 }
1469
1470 struct ctdb_revoke_state {
1471         struct ctdb_db_context *ctdb_db;
1472         TDB_DATA key;
1473         struct ctdb_ltdb_header *header;
1474         TDB_DATA data;
1475         int count;
1476         int status;
1477         int finished;
1478 };
1479
1480 static void update_record_cb(struct ctdb_client_control_state *state)
1481 {
1482         struct ctdb_revoke_state *revoke_state;
1483         int ret;
1484         int32_t res;
1485
1486         if (state == NULL) {
1487                 return;
1488         }
1489         revoke_state = state->async.private_data;
1490
1491         state->async.fn = NULL;
1492         ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1493         if ((ret != 0) || (res != 0)) {
1494                 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1495                 revoke_state->status = -1;
1496         }
1497
1498         revoke_state->count--;
1499         if (revoke_state->count <= 0) {
1500                 revoke_state->finished = 1;
1501         }
1502 }
1503
1504 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1505 {
1506         struct ctdb_revoke_state *revoke_state = private_data;
1507         struct ctdb_client_control_state *state;
1508
1509         state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1510         if (state == NULL) {
1511                 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1512                 revoke_state->status = -1;
1513                 return;
1514         }
1515         state->async.fn           = update_record_cb;
1516         state->async.private_data = revoke_state;
1517
1518         revoke_state->count++;
1519
1520 }
1521
1522 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
1523                               struct timeval yt, void *private_data)
1524 {
1525         struct ctdb_revoke_state *state = private_data;
1526
1527         DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1528         state->finished = 1;
1529         state->status   = -1;
1530 }
1531
1532 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1533 {
1534         struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1535         int status;
1536
1537         state->ctdb_db = ctdb_db;
1538         state->key     = key;
1539         state->header  = header;
1540         state->data    = data;
1541  
1542         ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1543
1544         event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
1545
1546         while (state->finished == 0) {
1547                 event_loop_once(ctdb->ev);
1548         }
1549
1550         status = state->status;
1551
1552         if (status == 0) {
1553                 struct ctdb_ltdb_header new_header;
1554                 TDB_DATA new_data;
1555
1556                 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1557                         DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1558                         talloc_free(state);
1559                         return -1;
1560                 }
1561                 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1562                         ctdb_ltdb_unlock(ctdb_db, key);
1563                         DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1564                         talloc_free(state);
1565                         return -1;
1566                 }
1567                 header->rsn++;
1568                 if (new_header.rsn > header->rsn) {
1569                         ctdb_ltdb_unlock(ctdb_db, key);
1570                         DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1571                         talloc_free(state);
1572                         return -1;
1573                 }
1574                 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1575                         ctdb_ltdb_unlock(ctdb_db, key);
1576                         DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1577                         talloc_free(state);
1578                         return -1;
1579                 }
1580                 new_header.rsn++;
1581                 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1582                 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1583                         ctdb_ltdb_unlock(ctdb_db, key);
1584                         DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1585                         talloc_free(state);
1586                         return -1;
1587                 }
1588                 ctdb_ltdb_unlock(ctdb_db, key);
1589         }
1590
1591         talloc_free(state);
1592         return status;
1593 }
1594
1595
1596 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1597 {
1598         TDB_DATA tdata;
1599         struct revokechild_handle *rc;
1600         pid_t parent = getpid();
1601         int ret;
1602
1603         header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1604         header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1605         header->rsn   -= 1;
1606
1607         if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1608                 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1609                 return -1;
1610         }
1611
1612         tdata = tdb_fetch(ctdb_db->rottdb, key);
1613         if (tdata.dsize > 0) {
1614                 uint8_t *tmp;
1615
1616                 tmp = tdata.dptr;
1617                 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1618                 free(tmp);
1619         }
1620
1621         rc->status    = 0;
1622         rc->ctdb      = ctdb;
1623         rc->ctdb_db   = ctdb_db;
1624         rc->fd[0]     = -1;
1625         rc->fd[1]     = -1;
1626
1627         talloc_set_destructor(rc, revokechild_destructor);
1628
1629         rc->key.dsize = key.dsize;
1630         rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
1631         if (rc->key.dptr == NULL) {
1632                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1633                 talloc_free(rc);
1634                 return -1;
1635         }
1636
1637         ret = pipe(rc->fd);
1638         if (ret != 0) {
1639                 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1640                 talloc_free(rc);
1641                 return -1;
1642         }
1643
1644
1645         rc->child = ctdb_fork(ctdb);
1646         if (rc->child == (pid_t)-1) {
1647                 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1648                 talloc_free(rc);
1649                 return -1;
1650         }
1651
1652         if (rc->child == 0) {
1653                 char c = 0;
1654                 close(rc->fd[0]);
1655                 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1656
1657                 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1658                         DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1659                         c = 1;
1660                         goto child_finished;
1661                 }
1662
1663                 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1664
1665 child_finished:
1666                 write(rc->fd[1], &c, 1);
1667                 /* make sure we die when our parent dies */
1668                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1669                         sleep(5);
1670                 }
1671                 _exit(0);
1672         }
1673
1674         close(rc->fd[1]);
1675         rc->fd[1] = -1;
1676         set_close_on_exec(rc->fd[0]);
1677
1678         /* This is an active revokechild child process */
1679         DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1680
1681         rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1682                                    EVENT_FD_READ, revokechild_handler,
1683                                    (void *)rc);
1684         if (rc->fde == NULL) {
1685                 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1686                 talloc_free(rc);
1687         }
1688         tevent_fd_set_auto_close(rc->fde);
1689
1690         return 0;
1691 }
1692
1693 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1694 {
1695         struct revokechild_handle *rc;
1696         struct revokechild_deferred_call *deferred_call;
1697
1698         for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1699                 if (rc->key.dsize == 0) {
1700                         continue;
1701                 }
1702                 if (rc->key.dsize != key.dsize) {
1703                         continue;
1704                 }
1705                 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1706                         break;
1707                 }
1708         }
1709
1710         if (rc == NULL) {
1711                 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1712                 return -1;
1713         }
1714
1715         deferred_call = talloc(rc, struct revokechild_deferred_call);
1716         if (deferred_call == NULL) {
1717                 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1718                 return -1;
1719         }
1720
1721         deferred_call->ctdb = ctdb;
1722         deferred_call->hdr  = hdr;
1723         deferred_call->fn   = fn;
1724         deferred_call->ctx  = call_context;
1725
1726         talloc_set_destructor(deferred_call, deferred_call_destructor);
1727         talloc_steal(deferred_call, hdr);
1728
1729         return 0;
1730 }