server: trans2_active: don't report a transaction active on the node that performs...
[metze/ctdb/wip.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit request while the
  CTDB_CONTROL_UPDATE_RECORD controls sent to the other nodes are
  still outstanding
 */
struct ctdb_persistent_state {
	struct ctdb_context *ctdb;
	/* the request that gets the reply once every node has answered */
	struct ctdb_req_control *c;
	/* error text reported by the most recent failing node */
	const char *errormsg;
	/* controls sent that have not been acknowledged yet */
	uint32_t num_pending;
	/* status reported by the most recent failing node */
	int32_t status;
	/* number of nodes that answered with a non-zero status */
	uint32_t num_failed;
	/* number of controls that were sent out */
	uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
110                 return -1;
111         }
112
113         ctdb_db = find_ctdb_db(ctdb, m->db_id);
114         if (ctdb_db == NULL) {
115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116                                  "Unknown database 0x%08x\n", m->db_id));
117                 return -1;
118         }
119
120         if (client == NULL) {
121                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
122                 return -1;
123         }
124
125         /* handling num_persistent_updates is a bit strange - 
126            there are 3 cases
127              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128                 They don't expect num_persistent_updates to be used at all
129
130              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
131                 this commit to then decrement it
132
133              3) new clients which use TRANS2 commit functions, and
134                 expect this function to increment the counter, and
135                 then have it decremented in ctdb_control_trans2_error
136                 or ctdb_control_trans2_finished
137         */
138         switch (c->opcode) {
139         case CTDB_CONTROL_PERSISTENT_STORE:
140                 if (ctdb_db->transaction_active) {
141                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
142                                 client->db_id));
143                         return -1;
144                 }
145                 if (client->num_persistent_updates > 0) {
146                         client->num_persistent_updates--;
147                 }
148                 break;
149         case CTDB_CONTROL_TRANS2_COMMIT:
150                 if (ctdb_db->transaction_active) {
151                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152                                          "already has a transaction commit "
153                                          "active on db_id[%d]\n",
154                                          client->db_id));
155                         return -1;
156                 }
157                 if (client->db_id != 0) {
158                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159                                          "client-db_id[%d] != 0\n",
160                                          client->db_id));
161                         return -1;
162                 }
163                 client->num_persistent_updates++;
164                 ctdb_db->transaction_active = true;
165                 client->db_id = m->db_id;
166                 break;
167         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
168                 /* already updated from the first commit */
169                 if (client->db_id != m->db_id) {
170                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
171                                          "retry: client-db_id[%d] != db_id[%d]"
172                                          "\n", client->db_id, m->db_id));
173                         return -1;
174                 }
175                 break;
176         }
177
178         state = talloc_zero(ctdb, struct ctdb_persistent_state);
179         CTDB_NO_MEMORY(ctdb, state);
180
181         state->ctdb = ctdb;
182         state->c    = c;
183
184         for (i=0;i<ctdb->vnn_map->size;i++) {
185                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
186                 int ret;
187
188                 /* only send to active nodes */
189                 if (node->flags & NODE_FLAGS_INACTIVE) {
190                         continue;
191                 }
192
193                 /* don't send to ourselves */
194                 if (node->pnn == ctdb->pnn) {
195                         continue;
196                 }
197                 
198                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
199                                                c->client_id, 0, recdata, 
200                                                ctdb_persistent_callback, state);
201                 if (ret == -1) {
202                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
203                         talloc_free(state);
204                         return -1;
205                 }
206
207                 state->num_pending++;
208                 state->num_sent++;
209         }
210
211         if (state->num_pending == 0) {
212                 talloc_free(state);
213                 return 0;
214         }
215         
216         /* we need to wait for the replies */
217         *async_reply = true;
218
219         /* need to keep the control structure around */
220         talloc_steal(state, c);
221
222         /* but we won't wait forever */
223         event_add_timed(ctdb->ev, state, 
224                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
225                         ctdb_persistent_store_timeout, state);
226
227         return 0;
228 }
229
230
/*
  what the write child needs in order to apply an update, plus the
  request that has to be answered when the write is done
 */
struct ctdb_persistent_write_state {
	/* database the records are written to */
	struct ctdb_db_context *ctdb_db;
	/* marshalled records to store */
	struct ctdb_marshall_buffer *m;
	/* request to reply to once the write has completed or timed out */
	struct ctdb_req_control *c;
};
236
237
238 /*
239   called from a child process to write the data
240  */
241 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
242 {
243         int ret, i;
244         struct ctdb_rec_data *rec = NULL;
245         struct ctdb_marshall_buffer *m = state->m;
246
247         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
248         if (ret == -1) {
249                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
250                                  state->ctdb_db->db_id));
251                 return -1;
252         }
253
254         for (i=0;i<m->count;i++) {
255                 struct ctdb_ltdb_header oldheader;
256                 struct ctdb_ltdb_header header;
257                 TDB_DATA key, data, olddata;
258                 TALLOC_CTX *tmp_ctx = talloc_new(state);
259
260                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
261                 
262                 if (rec == NULL) {
263                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
264                                          i, state->ctdb_db->db_id));
265                         talloc_free(tmp_ctx);
266                         goto failed;                    
267                 }
268
269                 /* fetch the old header and ensure the rsn is less than the new rsn */
270                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
271                 if (ret != 0) {
272                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
273                                          state->ctdb_db->db_id));
274                         talloc_free(tmp_ctx);
275                         goto failed;
276                 }
277
278                 if (oldheader.rsn >= header.rsn &&
279                     (olddata.dsize != data.dsize || 
280                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
281                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
282                                           state->ctdb_db->db_id, 
283                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
284                         talloc_free(tmp_ctx);
285                         goto failed;
286                 }
287
288                 talloc_free(tmp_ctx);
289
290                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
291                 if (ret != 0) {
292                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
293                                           state->ctdb_db->db_id));
294                         goto failed;
295                 }
296         }
297
298         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
299         if (ret == -1) {
300                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
301                                  state->ctdb_db->db_id));
302                 return -1;
303         }
304
305         return 0;
306         
307 failed:
308         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
309         return -1;
310 }
311
312
313 /*
314   called when we the child has completed the persistent write
315   on our behalf
316  */
317 static void ctdb_persistent_write_callback(int status, void *private_data)
318 {
319         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
320                                                                    struct ctdb_persistent_write_state);
321
322
323         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
324
325         talloc_free(state);
326 }
327
328 /*
329   called if our lockwait child times out
330  */
331 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
332                                          struct timeval t, void *private_data)
333 {
334         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
335                                                                    struct ctdb_persistent_write_state);
336         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
337         talloc_free(state);
338 }
339
/*
  parent side handle for one write child
 */
struct childwrite_handle {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* fd event watching the read end of the pipe */
	struct fd_event *fde;
	/* pipe to the child: the parent reads fd[0], the child writes fd[1] */
	int fd[2];
	/* pid of the write child */
	pid_t child;
	/* handed to callback together with the child's result byte */
	void *private_data;
	void (*callback)(int status, void *private_data);
	/* when the handle was set up, used for latency statistics */
	struct timeval start_time;
};
350
351 static int childwrite_destructor(struct childwrite_handle *h)
352 {
353         h->ctdb->statistics.pending_childwrite_calls--;
354         kill(h->child, SIGKILL);
355         return 0;
356 }
357
358 /* called when the child process has finished writing the record to the
359    database
360 */
361 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
362                              uint16_t flags, void *private_data)
363 {
364         struct childwrite_handle *h = talloc_get_type(private_data, 
365                                                      struct childwrite_handle);
366         void *p = h->private_data;
367         void (*callback)(int, void *) = h->callback;
368         pid_t child = h->child;
369         TALLOC_CTX *tmp_ctx = talloc_new(ev);
370         int ret;
371         char c;
372
373         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
374         h->ctdb->statistics.pending_childwrite_calls--;
375
376         /* the handle needs to go away when the context is gone - when
377            the handle goes away this implicitly closes the pipe, which
378            kills the child */
379         talloc_steal(tmp_ctx, h);
380
381         talloc_set_destructor(h, NULL);
382
383         ret = read(h->fd[0], &c, 1);
384         if (ret < 1) {
385                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
386                 c = 1;
387         }
388
389         callback(c, p);
390
391         kill(child, SIGKILL);
392         talloc_free(tmp_ctx);
393 }
394
395 /* this creates a child process which will take out a tdb transaction
396    and write the record to the database.
397 */
398 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
399                                 void (*callback)(int, void *private_data),
400                                 struct ctdb_persistent_write_state *state)
401 {
402         struct childwrite_handle *result;
403         int ret;
404         pid_t parent = getpid();
405
406         ctdb_db->ctdb->statistics.childwrite_calls++;
407         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
408
409         if (!(result = talloc_zero(state, struct childwrite_handle))) {
410                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
411                 return NULL;
412         }
413
414         ret = pipe(result->fd);
415
416         if (ret != 0) {
417                 talloc_free(result);
418                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
419                 return NULL;
420         }
421
422         result->child = fork();
423
424         if (result->child == (pid_t)-1) {
425                 close(result->fd[0]);
426                 close(result->fd[1]);
427                 talloc_free(result);
428                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
429                 return NULL;
430         }
431
432         result->callback = callback;
433         result->private_data = state;
434         result->ctdb = ctdb_db->ctdb;
435         result->ctdb_db = ctdb_db;
436
437         if (result->child == 0) {
438                 char c = 0;
439
440                 close(result->fd[0]);
441                 ret = ctdb_persistent_store(state);
442                 if (ret != 0) {
443                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
444                         c = 1;
445                 }
446
447                 write(result->fd[1], &c, 1);
448
449                 /* make sure we die when our parent dies */
450                 while (kill(parent, 0) == 0 || errno != ESRCH) {
451                         sleep(5);
452                 }
453                 _exit(0);
454         }
455
456         close(result->fd[1]);
457         set_close_on_exec(result->fd[0]);
458
459         talloc_set_destructor(result, childwrite_destructor);
460
461         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
462
463         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
464                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
465                                    (void *)result);
466         if (result->fde == NULL) {
467                 talloc_free(result);
468                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
469                 return NULL;
470         }
471
472         result->start_time = timeval_current();
473
474         return result;
475 }
476
477 /* 
478    update a record on this node if the new record has a higher rsn than the
479    current record
480  */
481 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
482                                    struct ctdb_req_control *c, TDB_DATA recdata, 
483                                    bool *async_reply)
484 {
485         struct ctdb_db_context *ctdb_db;
486         struct ctdb_persistent_write_state *state;
487         struct childwrite_handle *handle;
488         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
489
490         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
491                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
492                 return -1;
493         }
494
495         ctdb_db = find_ctdb_db(ctdb, m->db_id);
496         if (ctdb_db == NULL) {
497                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
498                 return -1;
499         }
500
501         state = talloc(ctdb, struct ctdb_persistent_write_state);
502         CTDB_NO_MEMORY(ctdb, state);
503
504         state->ctdb_db = ctdb_db;
505         state->c       = c;
506         state->m       = m;
507
508         /* create a child process to take out a transaction and 
509            write the data.
510         */
511         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
512         if (handle == NULL) {
513                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
514                 talloc_free(state);
515                 return -1;
516         }
517
518         /* we need to wait for the replies */
519         *async_reply = true;
520
521         /* need to keep the control structure around */
522         talloc_steal(state, c);
523
524         /* but we won't wait forever */
525         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
526                         ctdb_persistent_lock_timeout, state);
527
528         return 0;
529 }
530
531
532 /*
533   called when a client has finished a local commit in a transaction to 
534   a persistent database
535  */
536 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
537                                      struct ctdb_req_control *c)
538 {
539         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
540         struct ctdb_db_context *ctdb_db;
541
542         ctdb_db = find_ctdb_db(ctdb, client->db_id);
543         if (ctdb_db == NULL) {
544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
545                                  "Unknown database 0x%08x\n", client->db_id));
546                 return -1;
547         }
548         if (!ctdb_db->transaction_active) {
549                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
550                                  "Database 0x%08x has no transaction commit "
551                                  "started\n", client->db_id));
552                 return -1;
553         }
554
555         ctdb_db->transaction_active = false;
556         client->db_id = 0;
557
558         if (client->num_persistent_updates == 0) {
559                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
560                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
561                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
562                 return -1;
563         }
564         client->num_persistent_updates--;
565
566         return 0;
567 }
568
569 /*
570   called when a client gets an error committing its database
571   during a transaction commit
572  */
573 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
574                                   struct ctdb_req_control *c)
575 {
576         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
577         struct ctdb_db_context *ctdb_db;
578
579         ctdb_db = find_ctdb_db(ctdb, client->db_id);
580         if (ctdb_db == NULL) {
581                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
582                                  "Unknown database 0x%08x\n", client->db_id));
583                 return -1;
584         }
585         if (!ctdb_db->transaction_active) {
586                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
587                                  "Database 0x%08x has no transaction commit "
588                                  "started\n", client->db_id));
589                 return -1;
590         }
591
592         ctdb_db->transaction_active = false;
593         client->db_id = 0;
594
595         if (client->num_persistent_updates == 0) {
596                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
597         } else {
598                 client->num_persistent_updates--;
599         }
600
601         DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
602         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
603
604         return 0;
605 }
606
607 /**
608  * Tell whether a transaction is active on this node on the give DB.
609  */
610 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
611                                    struct ctdb_req_control *c,
612                                    uint32_t db_id)
613 {
614         struct ctdb_db_context *ctdb_db;
615         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
616
617         ctdb_db = find_ctdb_db(ctdb, db_id);
618         if (!ctdb_db) {
619                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
620                 return -1;
621         }
622
623         if (client->db_id == db_id) {
624                 return 0;
625         }
626
627         if (ctdb_db->transaction_active) {
628                 return 1;
629         } else {
630                 return 0;
631         }
632 }
633
634 /*
635   backwards compatibility:
636
637   start a persistent store operation. passing both the key, header and
638   data to the daemon. If the client disconnects before it has issued
639   a persistent_update call to the daemon we trigger a full recovery
640   to ensure the databases are brought back in sync.
641   for now we ignore the recdata that the client has passed to us.
642  */
643 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
644                                       struct ctdb_req_control *c,
645                                       TDB_DATA recdata)
646 {
647         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
648
649         if (client == NULL) {
650                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
651                 return -1;
652         }
653
654         client->num_persistent_updates++;
655
656         return 0;
657 }
658
659 /* 
660   backwards compatibility:
661
662   called to tell ctdbd that it is no longer doing a persistent update 
663 */
664 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
665                                               struct ctdb_req_control *c,
666                                               TDB_DATA recdata)
667 {
668         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
669
670         if (client == NULL) {
671                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
672                 return -1;
673         }
674
675         if (client->num_persistent_updates > 0) {
676                 client->num_persistent_updates--;
677         }
678
679         return 0;
680 }
681
682
683 /*
684   backwards compatibility:
685
686   single record varient of ctdb_control_trans2_commit for older clients
687  */
688 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
689                                       struct ctdb_req_control *c, 
690                                       TDB_DATA recdata, bool *async_reply)
691 {
692         struct ctdb_marshall_buffer *m;
693         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
694         TDB_DATA key, data;
695
696         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
697             rec->keylen + rec->datalen) {
698                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
699                 return -1;
700         }
701
702         key.dptr = &rec->data[0];
703         key.dsize = rec->keylen;
704         data.dptr = &rec->data[rec->keylen];
705         data.dsize = rec->datalen;
706
707         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
708         CTDB_NO_MEMORY(ctdb, m);
709
710         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
711 }
712
713