server: add positive debug statements to trans2_commit and trans2_finished
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
29 struct ctdb_persistent_state {
30         struct ctdb_context *ctdb;
31         struct ctdb_req_control *c;
32         const char *errormsg;
33         uint32_t num_pending;
34         int32_t status;
35         uint32_t num_failed, num_sent;
36 };
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
110                 return -1;
111         }
112
113         ctdb_db = find_ctdb_db(ctdb, m->db_id);
114         if (ctdb_db == NULL) {
115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116                                  "Unknown database 0x%08x\n", m->db_id));
117                 return -1;
118         }
119
120         if (client == NULL) {
121                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
122                 return -1;
123         }
124
125         /* handling num_persistent_updates is a bit strange - 
126            there are 3 cases
127              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128                 They don't expect num_persistent_updates to be used at all
129
130              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
131                 this commit to then decrement it
132
133              3) new clients which use TRANS2 commit functions, and
134                 expect this function to increment the counter, and
135                 then have it decremented in ctdb_control_trans2_error
136                 or ctdb_control_trans2_finished
137         */
138         switch (c->opcode) {
139         case CTDB_CONTROL_PERSISTENT_STORE:
140                 if (ctdb_db->transaction_active) {
141                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
142                                 client->db_id));
143                         return -1;
144                 }
145                 if (client->num_persistent_updates > 0) {
146                         client->num_persistent_updates--;
147                 }
148                 break;
149         case CTDB_CONTROL_TRANS2_COMMIT:
150                 if (ctdb_db->transaction_active) {
151                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152                                          "already has a transaction commit "
153                                          "active on db_id[%d]\n",
154                                          client->db_id));
155                         return -1;
156                 }
157                 if (client->db_id != 0) {
158                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159                                          "client-db_id[%d] != 0\n",
160                                          client->db_id));
161                         return -1;
162                 }
163                 client->num_persistent_updates++;
164                 ctdb_db->transaction_active = true;
165                 client->db_id = m->db_id;
166                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
167                                   " commit transaction on db id[0x%08x]\n",
168                                   client->client_id, client->db_id));
169                 break;
170         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
171                 /* already updated from the first commit */
172                 if (client->db_id != m->db_id) {
173                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
174                                          "retry: client-db_id[%d] != db_id[%d]"
175                                          "\n", client->db_id, m->db_id));
176                         return -1;
177                 }
178                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
179                                     "transaction commit retry on "
180                                     "db_id[0x%08x]\n",
181                                     client->client_id, client->db_id));
182                 break;
183         }
184
185         state = talloc_zero(ctdb, struct ctdb_persistent_state);
186         CTDB_NO_MEMORY(ctdb, state);
187
188         state->ctdb = ctdb;
189         state->c    = c;
190
191         for (i=0;i<ctdb->vnn_map->size;i++) {
192                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
193                 int ret;
194
195                 /* only send to active nodes */
196                 if (node->flags & NODE_FLAGS_INACTIVE) {
197                         continue;
198                 }
199
200                 /* don't send to ourselves */
201                 if (node->pnn == ctdb->pnn) {
202                         continue;
203                 }
204                 
205                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
206                                                c->client_id, 0, recdata, 
207                                                ctdb_persistent_callback, state);
208                 if (ret == -1) {
209                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
210                         talloc_free(state);
211                         return -1;
212                 }
213
214                 state->num_pending++;
215                 state->num_sent++;
216         }
217
218         if (state->num_pending == 0) {
219                 talloc_free(state);
220                 return 0;
221         }
222         
223         /* we need to wait for the replies */
224         *async_reply = true;
225
226         /* need to keep the control structure around */
227         talloc_steal(state, c);
228
229         /* but we won't wait forever */
230         event_add_timed(ctdb->ev, state, 
231                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
232                         ctdb_persistent_store_timeout, state);
233
234         return 0;
235 }
236
237
238 struct ctdb_persistent_write_state {
239         struct ctdb_db_context *ctdb_db;
240         struct ctdb_marshall_buffer *m;
241         struct ctdb_req_control *c;
242 };
243
244
245 /*
246   called from a child process to write the data
247  */
248 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
249 {
250         int ret, i;
251         struct ctdb_rec_data *rec = NULL;
252         struct ctdb_marshall_buffer *m = state->m;
253
254         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
255         if (ret == -1) {
256                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
257                                  state->ctdb_db->db_id));
258                 return -1;
259         }
260
261         for (i=0;i<m->count;i++) {
262                 struct ctdb_ltdb_header oldheader;
263                 struct ctdb_ltdb_header header;
264                 TDB_DATA key, data, olddata;
265                 TALLOC_CTX *tmp_ctx = talloc_new(state);
266
267                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
268                 
269                 if (rec == NULL) {
270                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
271                                          i, state->ctdb_db->db_id));
272                         talloc_free(tmp_ctx);
273                         goto failed;                    
274                 }
275
276                 /* fetch the old header and ensure the rsn is less than the new rsn */
277                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
278                 if (ret != 0) {
279                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
280                                          state->ctdb_db->db_id));
281                         talloc_free(tmp_ctx);
282                         goto failed;
283                 }
284
285                 if (oldheader.rsn >= header.rsn &&
286                     (olddata.dsize != data.dsize || 
287                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
288                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
289                                           state->ctdb_db->db_id, 
290                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
291                         talloc_free(tmp_ctx);
292                         goto failed;
293                 }
294
295                 talloc_free(tmp_ctx);
296
297                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
298                 if (ret != 0) {
299                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
300                                           state->ctdb_db->db_id));
301                         goto failed;
302                 }
303         }
304
305         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
306         if (ret == -1) {
307                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
308                                  state->ctdb_db->db_id));
309                 return -1;
310         }
311
312         return 0;
313         
314 failed:
315         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
316         return -1;
317 }
318
319
320 /*
321   called when we the child has completed the persistent write
322   on our behalf
323  */
324 static void ctdb_persistent_write_callback(int status, void *private_data)
325 {
326         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
327                                                                    struct ctdb_persistent_write_state);
328
329
330         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
331
332         talloc_free(state);
333 }
334
335 /*
336   called if our lockwait child times out
337  */
338 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
339                                          struct timeval t, void *private_data)
340 {
341         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
342                                                                    struct ctdb_persistent_write_state);
343         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
344         talloc_free(state);
345 }
346
347 struct childwrite_handle {
348         struct ctdb_context *ctdb;
349         struct ctdb_db_context *ctdb_db;
350         struct fd_event *fde;
351         int fd[2];
352         pid_t child;
353         void *private_data;
354         void (*callback)(int, void *);
355         struct timeval start_time;
356 };
357
358 static int childwrite_destructor(struct childwrite_handle *h)
359 {
360         h->ctdb->statistics.pending_childwrite_calls--;
361         kill(h->child, SIGKILL);
362         return 0;
363 }
364
365 /* called when the child process has finished writing the record to the
366    database
367 */
368 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
369                              uint16_t flags, void *private_data)
370 {
371         struct childwrite_handle *h = talloc_get_type(private_data, 
372                                                      struct childwrite_handle);
373         void *p = h->private_data;
374         void (*callback)(int, void *) = h->callback;
375         pid_t child = h->child;
376         TALLOC_CTX *tmp_ctx = talloc_new(ev);
377         int ret;
378         char c;
379
380         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
381         h->ctdb->statistics.pending_childwrite_calls--;
382
383         /* the handle needs to go away when the context is gone - when
384            the handle goes away this implicitly closes the pipe, which
385            kills the child */
386         talloc_steal(tmp_ctx, h);
387
388         talloc_set_destructor(h, NULL);
389
390         ret = read(h->fd[0], &c, 1);
391         if (ret < 1) {
392                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
393                 c = 1;
394         }
395
396         callback(c, p);
397
398         kill(child, SIGKILL);
399         talloc_free(tmp_ctx);
400 }
401
402 /* this creates a child process which will take out a tdb transaction
403    and write the record to the database.
404 */
405 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
406                                 void (*callback)(int, void *private_data),
407                                 struct ctdb_persistent_write_state *state)
408 {
409         struct childwrite_handle *result;
410         int ret;
411         pid_t parent = getpid();
412
413         ctdb_db->ctdb->statistics.childwrite_calls++;
414         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
415
416         if (!(result = talloc_zero(state, struct childwrite_handle))) {
417                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
418                 return NULL;
419         }
420
421         ret = pipe(result->fd);
422
423         if (ret != 0) {
424                 talloc_free(result);
425                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
426                 return NULL;
427         }
428
429         result->child = fork();
430
431         if (result->child == (pid_t)-1) {
432                 close(result->fd[0]);
433                 close(result->fd[1]);
434                 talloc_free(result);
435                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
436                 return NULL;
437         }
438
439         result->callback = callback;
440         result->private_data = state;
441         result->ctdb = ctdb_db->ctdb;
442         result->ctdb_db = ctdb_db;
443
444         if (result->child == 0) {
445                 char c = 0;
446
447                 close(result->fd[0]);
448                 ret = ctdb_persistent_store(state);
449                 if (ret != 0) {
450                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
451                         c = 1;
452                 }
453
454                 write(result->fd[1], &c, 1);
455
456                 /* make sure we die when our parent dies */
457                 while (kill(parent, 0) == 0 || errno != ESRCH) {
458                         sleep(5);
459                 }
460                 _exit(0);
461         }
462
463         close(result->fd[1]);
464         set_close_on_exec(result->fd[0]);
465
466         talloc_set_destructor(result, childwrite_destructor);
467
468         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
469
470         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
471                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
472                                    (void *)result);
473         if (result->fde == NULL) {
474                 talloc_free(result);
475                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
476                 return NULL;
477         }
478
479         result->start_time = timeval_current();
480
481         return result;
482 }
483
484 /* 
485    update a record on this node if the new record has a higher rsn than the
486    current record
487  */
488 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
489                                    struct ctdb_req_control *c, TDB_DATA recdata, 
490                                    bool *async_reply)
491 {
492         struct ctdb_db_context *ctdb_db;
493         struct ctdb_persistent_write_state *state;
494         struct childwrite_handle *handle;
495         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
496
497         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
498                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
499                 return -1;
500         }
501
502         ctdb_db = find_ctdb_db(ctdb, m->db_id);
503         if (ctdb_db == NULL) {
504                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
505                 return -1;
506         }
507
508         state = talloc(ctdb, struct ctdb_persistent_write_state);
509         CTDB_NO_MEMORY(ctdb, state);
510
511         state->ctdb_db = ctdb_db;
512         state->c       = c;
513         state->m       = m;
514
515         /* create a child process to take out a transaction and 
516            write the data.
517         */
518         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
519         if (handle == NULL) {
520                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
521                 talloc_free(state);
522                 return -1;
523         }
524
525         /* we need to wait for the replies */
526         *async_reply = true;
527
528         /* need to keep the control structure around */
529         talloc_steal(state, c);
530
531         /* but we won't wait forever */
532         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
533                         ctdb_persistent_lock_timeout, state);
534
535         return 0;
536 }
537
538
539 /*
540   called when a client has finished a local commit in a transaction to 
541   a persistent database
542  */
543 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
544                                      struct ctdb_req_control *c)
545 {
546         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
547         struct ctdb_db_context *ctdb_db;
548
549         ctdb_db = find_ctdb_db(ctdb, client->db_id);
550         if (ctdb_db == NULL) {
551                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
552                                  "Unknown database 0x%08x\n", client->db_id));
553                 return -1;
554         }
555         if (!ctdb_db->transaction_active) {
556                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
557                                  "Database 0x%08x has no transaction commit "
558                                  "started\n", client->db_id));
559                 return -1;
560         }
561
562         ctdb_db->transaction_active = false;
563         client->db_id = 0;
564
565         if (client->num_persistent_updates == 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
567                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
568                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
569                 return -1;
570         }
571         client->num_persistent_updates--;
572
573         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
574                             "transaction commit db_id[0x%08x]\n",
575                             client->client_id, ctdb_db->db_id));
576
577         return 0;
578 }
579
580 /*
581   called when a client gets an error committing its database
582   during a transaction commit
583  */
584 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
585                                   struct ctdb_req_control *c)
586 {
587         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
588         struct ctdb_db_context *ctdb_db;
589
590         ctdb_db = find_ctdb_db(ctdb, client->db_id);
591         if (ctdb_db == NULL) {
592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
593                                  "Unknown database 0x%08x\n", client->db_id));
594                 return -1;
595         }
596         if (!ctdb_db->transaction_active) {
597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
598                                  "Database 0x%08x has no transaction commit "
599                                  "started\n", client->db_id));
600                 return -1;
601         }
602
603         ctdb_db->transaction_active = false;
604         client->db_id = 0;
605
606         if (client->num_persistent_updates == 0) {
607                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
608         } else {
609                 client->num_persistent_updates--;
610         }
611
612         DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
613         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
614
615         return 0;
616 }
617
618 /**
619  * Tell whether a transaction is active on this node on the give DB.
620  */
621 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
622                                    struct ctdb_req_control *c,
623                                    uint32_t db_id)
624 {
625         struct ctdb_db_context *ctdb_db;
626         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
627
628         ctdb_db = find_ctdb_db(ctdb, db_id);
629         if (!ctdb_db) {
630                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
631                 return -1;
632         }
633
634         if (client->db_id == db_id) {
635                 return 0;
636         }
637
638         if (ctdb_db->transaction_active) {
639                 return 1;
640         } else {
641                 return 0;
642         }
643 }
644
645 /*
646   backwards compatibility:
647
648   start a persistent store operation. passing both the key, header and
649   data to the daemon. If the client disconnects before it has issued
650   a persistent_update call to the daemon we trigger a full recovery
651   to ensure the databases are brought back in sync.
652   for now we ignore the recdata that the client has passed to us.
653  */
654 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
655                                       struct ctdb_req_control *c,
656                                       TDB_DATA recdata)
657 {
658         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
659
660         if (client == NULL) {
661                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
662                 return -1;
663         }
664
665         client->num_persistent_updates++;
666
667         return 0;
668 }
669
670 /* 
671   backwards compatibility:
672
673   called to tell ctdbd that it is no longer doing a persistent update 
674 */
675 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
676                                               struct ctdb_req_control *c,
677                                               TDB_DATA recdata)
678 {
679         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
680
681         if (client == NULL) {
682                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
683                 return -1;
684         }
685
686         if (client->num_persistent_updates > 0) {
687                 client->num_persistent_updates--;
688         }
689
690         return 0;
691 }
692
693
694 /*
695   backwards compatibility:
696
697   single record varient of ctdb_control_trans2_commit for older clients
698  */
699 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
700                                       struct ctdb_req_control *c, 
701                                       TDB_DATA recdata, bool *async_reply)
702 {
703         struct ctdb_marshall_buffer *m;
704         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
705         TDB_DATA key, data;
706
707         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
708             rec->keylen + rec->datalen) {
709                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
710                 return -1;
711         }
712
713         key.dptr = &rec->data[0];
714         key.dsize = rec->keylen;
715         data.dptr = &rec->data[rec->keylen];
716         data.dsize = rec->datalen;
717
718         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
719         CTDB_NO_MEMORY(ctdb, m);
720
721         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
722 }
723
724