Merge commit 'martins/master'
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that has been forwarded to the other nodes
  as CTDB_CONTROL_UPDATE_RECORD controls
 */
struct ctdb_persistent_state {
        /* daemon context, needed when the reply is finally sent */
        struct ctdb_context *ctdb;

        /* the client control that is still waiting for its reply */
        struct ctdb_req_control *c;

        /* error message of the most recent failed acknowledgement */
        const char *errormsg;

        /* controls that were sent and have not been acknowledged yet */
        uint32_t num_pending;

        /* status of the most recent failed acknowledgement */
        int32_t status;

        /* acknowledgements that carried a non-zero status */
        uint32_t num_failed;

        /* controls that were handed to ctdb_daemon_send_control */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
110                 return -1;
111         }
112
113         ctdb_db = find_ctdb_db(ctdb, m->db_id);
114         if (ctdb_db == NULL) {
115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116                                  "Unknown database 0x%08x\n", m->db_id));
117                 return -1;
118         }
119
120         if (client == NULL) {
121                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
122                 return -1;
123         }
124
125         /* handling num_persistent_updates is a bit strange - 
126            there are 3 cases
127              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128                 They don't expect num_persistent_updates to be used at all
129
130              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
131                 this commit to then decrement it
132
133              3) new clients which use TRANS2 commit functions, and
134                 expect this function to increment the counter, and
135                 then have it decremented in ctdb_control_trans2_error
136                 or ctdb_control_trans2_finished
137         */
138         switch (c->opcode) {
139         case CTDB_CONTROL_PERSISTENT_STORE:
140                 if (ctdb_db->transaction_active) {
141                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
142                                 client->db_id));
143                         return -1;
144                 }
145                 if (client->num_persistent_updates > 0) {
146                         client->num_persistent_updates--;
147                 }
148                 break;
149         case CTDB_CONTROL_TRANS2_COMMIT:
150                 if (ctdb_db->transaction_active) {
151                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152                                          "already has a transaction commit "
153                                          "active on db_id[%d]\n",
154                                          client->db_id));
155                         return -1;
156                 }
157                 if (client->db_id != 0) {
158                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159                                          "client-db_id[%d] != 0\n",
160                                          client->db_id));
161                         return -1;
162                 }
163                 client->num_persistent_updates++;
164                 ctdb_db->transaction_active = true;
165                 client->db_id = m->db_id;
166                 break;
167         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
168                 /* already updated from the first commit */
169                 if (client->db_id != m->db_id) {
170                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
171                                          "retry: client-db_id[%d] != db_id[%d]"
172                                          "\n", client->db_id, m->db_id));
173                         return -1;
174                 }
175                 break;
176         }
177
178         state = talloc_zero(ctdb, struct ctdb_persistent_state);
179         CTDB_NO_MEMORY(ctdb, state);
180
181         state->ctdb = ctdb;
182         state->c    = c;
183
184         for (i=0;i<ctdb->vnn_map->size;i++) {
185                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
186                 int ret;
187
188                 /* only send to active nodes */
189                 if (node->flags & NODE_FLAGS_INACTIVE) {
190                         continue;
191                 }
192
193                 /* don't send to ourselves */
194                 if (node->pnn == ctdb->pnn) {
195                         continue;
196                 }
197                 
198                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
199                                                c->client_id, 0, recdata, 
200                                                ctdb_persistent_callback, state);
201                 if (ret == -1) {
202                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
203                         talloc_free(state);
204                         return -1;
205                 }
206
207                 state->num_pending++;
208                 state->num_sent++;
209         }
210
211         if (state->num_pending == 0) {
212                 talloc_free(state);
213                 return 0;
214         }
215         
216         /* we need to wait for the replies */
217         *async_reply = true;
218
219         /* need to keep the control structure around */
220         talloc_steal(state, c);
221
222         /* but we won't wait forever */
223         event_add_timed(ctdb->ev, state, 
224                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
225                         ctdb_persistent_store_timeout, state);
226
227         return 0;
228 }
229
230
/*
  everything needed to write one marshalled set of records locally and
  to answer the control that asked for it
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;

        /* the marshalled records to write */
        struct ctdb_marshall_buffer *m;

        /* the control to reply to once the write has finished */
        struct ctdb_req_control *c;
};
236
237
238 /*
239   called from a child process to write the data
240  */
241 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
242 {
243         int ret, i;
244         struct ctdb_rec_data *rec = NULL;
245         struct ctdb_marshall_buffer *m = state->m;
246
247         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
248         if (ret == -1) {
249                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
250                                  state->ctdb_db->db_id));
251                 return -1;
252         }
253
254         for (i=0;i<m->count;i++) {
255                 struct ctdb_ltdb_header oldheader;
256                 struct ctdb_ltdb_header header;
257                 TDB_DATA key, data, olddata;
258                 TALLOC_CTX *tmp_ctx = talloc_new(state);
259
260                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
261                 
262                 if (rec == NULL) {
263                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
264                                          i, state->ctdb_db->db_id));
265                         talloc_free(tmp_ctx);
266                         goto failed;                    
267                 }
268
269                 /* fetch the old header and ensure the rsn is less than the new rsn */
270                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
271                 if (ret != 0) {
272                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
273                                          state->ctdb_db->db_id));
274                         talloc_free(tmp_ctx);
275                         goto failed;
276                 }
277
278                 if (oldheader.rsn >= header.rsn &&
279                     (olddata.dsize != data.dsize || 
280                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
281                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
282                                           state->ctdb_db->db_id, 
283                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
284                         talloc_free(tmp_ctx);
285                         goto failed;
286                 }
287
288                 talloc_free(tmp_ctx);
289
290                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
291                 if (ret != 0) {
292                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
293                                           state->ctdb_db->db_id));
294                         return -1;
295                 }
296         }
297
298         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
299         if (ret == -1) {
300                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
301                                  state->ctdb_db->db_id));
302                 return -1;
303         }
304
305         return 0;
306         
307 failed:
308         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
309         return -1;
310 }
311
312
313 /*
314   called when we the child has completed the persistent write
315   on our behalf
316  */
317 static void ctdb_persistent_write_callback(int status, void *private_data)
318 {
319         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
320                                                                    struct ctdb_persistent_write_state);
321
322
323         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
324
325         talloc_free(state);
326 }
327
328 /*
329   called if our lockwait child times out
330  */
331 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
332                                          struct timeval t, void *private_data)
333 {
334         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
335                                                                    struct ctdb_persistent_write_state);
336         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
337         talloc_free(state);
338 }
339
/*
  handle for one forked child that writes records to a database
 */
struct childwrite_handle {
        /* daemon context; its statistics are updated through this */
        struct ctdb_context *ctdb;

        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;

        /* fd event watching the read end of the pipe */
        struct fd_event *fde;

        /* pipe: the child writes its status byte to fd[1],
           the parent reads it from fd[0] */
        int fd[2];

        /* pid of the forked child */
        pid_t child;

        /* second argument handed to callback */
        void *private_data;

        /* invoked with the status byte once the child has reported */
        void (*callback)(int, void *);

        /* when the child was started, for the latency statistics */
        struct timeval start_time;
};
350
351 static int childwrite_destructor(struct childwrite_handle *h)
352 {
353         h->ctdb->statistics.pending_childwrite_calls--;
354         kill(h->child, SIGKILL);
355         return 0;
356 }
357
358 /* called when the child process has finished writing the record to the
359    database
360 */
361 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
362                              uint16_t flags, void *private_data)
363 {
364         struct childwrite_handle *h = talloc_get_type(private_data, 
365                                                      struct childwrite_handle);
366         void *p = h->private_data;
367         void (*callback)(int, void *) = h->callback;
368         pid_t child = h->child;
369         TALLOC_CTX *tmp_ctx = talloc_new(ev);
370         int ret;
371         char c;
372
373         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
374         h->ctdb->statistics.pending_childwrite_calls--;
375
376         /* the handle needs to go away when the context is gone - when
377            the handle goes away this implicitly closes the pipe, which
378            kills the child */
379         talloc_steal(tmp_ctx, h);
380
381         talloc_set_destructor(h, NULL);
382
383         ret = read(h->fd[0], &c, 1);
384         if (ret < 1) {
385                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
386                 c = 1;
387         }
388
389         callback(c, p);
390
391         kill(child, SIGKILL);
392         talloc_free(tmp_ctx);
393 }
394
395 /* this creates a child process which will take out a tdb transaction
396    and write the record to the database.
397 */
398 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
399                                 void (*callback)(int, void *private_data),
400                                 struct ctdb_persistent_write_state *state)
401 {
402         struct childwrite_handle *result;
403         int ret;
404         pid_t parent = getpid();
405
406         ctdb_db->ctdb->statistics.childwrite_calls++;
407         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
408
409         if (!(result = talloc_zero(state, struct childwrite_handle))) {
410                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
411                 return NULL;
412         }
413
414         ret = pipe(result->fd);
415
416         if (ret != 0) {
417                 talloc_free(result);
418                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
419                 return NULL;
420         }
421
422         result->child = fork();
423
424         if (result->child == (pid_t)-1) {
425                 close(result->fd[0]);
426                 close(result->fd[1]);
427                 talloc_free(result);
428                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
429                 return NULL;
430         }
431
432         result->callback = callback;
433         result->private_data = state;
434         result->ctdb = ctdb_db->ctdb;
435         result->ctdb_db = ctdb_db;
436
437         if (result->child == 0) {
438                 char c = 0;
439
440                 close(result->fd[0]);
441                 ret = ctdb_persistent_store(state);
442                 if (ret != 0) {
443                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
444                         c = 1;
445                 }
446
447                 write(result->fd[1], &c, 1);
448
449                 /* make sure we die when our parent dies */
450                 while (kill(parent, 0) == 0 || errno != ESRCH) {
451                         sleep(5);
452                 }
453                 _exit(0);
454         }
455
456         close(result->fd[1]);
457         talloc_set_destructor(result, childwrite_destructor);
458
459         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
460                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
461                                    (void *)result);
462         if (result->fde == NULL) {
463                 talloc_free(result);
464                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
465                 return NULL;
466         }
467
468         result->start_time = timeval_current();
469
470         return result;
471 }
472
473 /* 
474    update a record on this node if the new record has a higher rsn than the
475    current record
476  */
477 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
478                                    struct ctdb_req_control *c, TDB_DATA recdata, 
479                                    bool *async_reply)
480 {
481         struct ctdb_db_context *ctdb_db;
482         struct ctdb_persistent_write_state *state;
483         struct childwrite_handle *handle;
484         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
485
486         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
487                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
488                 return -1;
489         }
490
491         ctdb_db = find_ctdb_db(ctdb, m->db_id);
492         if (ctdb_db == NULL) {
493                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
494                 return -1;
495         }
496
497         state = talloc(ctdb, struct ctdb_persistent_write_state);
498         CTDB_NO_MEMORY(ctdb, state);
499
500         state->ctdb_db = ctdb_db;
501         state->c       = c;
502         state->m       = m;
503
504         /* create a child process to take out a transaction and 
505            write the data.
506         */
507         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
508         if (handle == NULL) {
509                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
510                 talloc_free(state);
511                 return -1;
512         }
513
514         /* we need to wait for the replies */
515         *async_reply = true;
516
517         /* need to keep the control structure around */
518         talloc_steal(state, c);
519
520         /* but we won't wait forever */
521         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
522                         ctdb_persistent_lock_timeout, state);
523
524         return 0;
525 }
526
527
528 /*
529   called when a client has finished a local commit in a transaction to 
530   a persistent database
531  */
532 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
533                                      struct ctdb_req_control *c)
534 {
535         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
536         struct ctdb_db_context *ctdb_db;
537
538         ctdb_db = find_ctdb_db(ctdb, client->db_id);
539         if (ctdb_db == NULL) {
540                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
541                                  "Unknown database 0x%08x\n", client->db_id));
542                 return -1;
543         }
544         if (!ctdb_db->transaction_active) {
545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
546                                  "Database 0x%08x has no transaction commit "
547                                  "started\n", client->db_id));
548                 return -1;
549         }
550
551         ctdb_db->transaction_active = false;
552         client->db_id = 0;
553
554         if (client->num_persistent_updates == 0) {
555                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
556                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
557                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
558                 return -1;
559         }
560         client->num_persistent_updates--;
561
562         return 0;
563 }
564
565 /*
566   called when a client gets an error committing its database
567   during a transaction commit
568  */
569 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
570                                   struct ctdb_req_control *c)
571 {
572         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
573         struct ctdb_db_context *ctdb_db;
574
575         ctdb_db = find_ctdb_db(ctdb, client->db_id);
576         if (ctdb_db == NULL) {
577                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
578                                  "Unknown database 0x%08x\n", client->db_id));
579                 return -1;
580         }
581         if (!ctdb_db->transaction_active) {
582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
583                                  "Database 0x%08x has no transaction commit "
584                                  "started\n", client->db_id));
585                 return -1;
586         }
587
588         ctdb_db->transaction_active = false;
589         client->db_id = 0;
590
591         if (client->num_persistent_updates == 0) {
592                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
593         } else {
594                 client->num_persistent_updates--;
595         }
596
597         DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
598         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
599
600         return 0;
601 }
602
603
604 /*
605   backwards compatibility:
606
607   start a persistent store operation. passing both the key, header and
608   data to the daemon. If the client disconnects before it has issued
609   a persistent_update call to the daemon we trigger a full recovery
610   to ensure the databases are brought back in sync.
611   for now we ignore the recdata that the client has passed to us.
612  */
613 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
614                                       struct ctdb_req_control *c,
615                                       TDB_DATA recdata)
616 {
617         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
618
619         if (client == NULL) {
620                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
621                 return -1;
622         }
623
624         client->num_persistent_updates++;
625
626         return 0;
627 }
628
629 /* 
630   backwards compatibility:
631
632   called to tell ctdbd that it is no longer doing a persistent update 
633 */
634 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
635                                               struct ctdb_req_control *c,
636                                               TDB_DATA recdata)
637 {
638         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
639
640         if (client == NULL) {
641                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
642                 return -1;
643         }
644
645         if (client->num_persistent_updates > 0) {
646                 client->num_persistent_updates--;
647         }
648
649         return 0;
650 }
651
652
653 /*
654   backwards compatibility:
655
656   single record varient of ctdb_control_trans2_commit for older clients
657  */
658 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
659                                       struct ctdb_req_control *c, 
660                                       TDB_DATA recdata, bool *async_reply)
661 {
662         struct ctdb_marshall_buffer *m;
663         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
664         TDB_DATA key, data;
665
666         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
667             rec->keylen + rec->datalen) {
668                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
669                 return -1;
670         }
671
672         key.dptr = &rec->data[0];
673         key.dsize = rec->keylen;
674         data.dptr = &rec->data[rec->keylen];
675         data.dsize = rec->datalen;
676
677         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
678         CTDB_NO_MEMORY(ctdb, m);
679
680         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
681 }
682
683