server: extend a debug message in ctdb_control_trans2_error()
[metze/ctdb/wip.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
/*
  bookkeeping for one commit that is being pushed out to the other nodes
 */
struct ctdb_persistent_state {
        /* daemon context used when the client is finally answered */
        struct ctdb_context *ctdb;
        /* the client control that is answered once every node has replied */
        struct ctdb_req_control *c;
        /* error text taken from the most recent failed reply */
        const char *errormsg;
        /* update controls sent out that have not been answered yet */
        uint32_t num_pending;
        /* status taken from the most recent failed reply */
        int32_t status;
        /* number of replies that carried a non-zero status */
        uint32_t num_failed;
        /* number of update controls sent out in total */
        uint32_t num_sent;
};
37
38 /*
39   1) all nodes fail, and all nodes reply
40   2) some nodes fail, all nodes reply
41   3) some nodes timeout
42   4) all nodes succeed
43  */
44
45 /*
46   called when a node has acknowledged a ctdb_control_update_record call
47  */
48 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
49                                      int32_t status, TDB_DATA data, 
50                                      const char *errormsg,
51                                      void *private_data)
52 {
53         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
54                                                               struct ctdb_persistent_state);
55
56         if (status != 0) {
57                 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
58                          status, errormsg));
59                 state->status = status;
60                 state->errormsg = errormsg;
61                 state->num_failed++;
62         }
63         state->num_pending--;
64         if (state->num_pending == 0) {
65                 enum ctdb_trans2_commit_error etype;
66                 if (state->num_failed == state->num_sent) {
67                         etype = CTDB_TRANS2_COMMIT_ALLFAIL;
68                 } else if (state->num_failed != 0) {
69                         etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
70                 } else {
71                         etype = CTDB_TRANS2_COMMIT_SUCCESS;
72                 }
73                 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
74                 talloc_free(state);
75         }
76 }
77
78 /*
79   called if persistent store times out
80  */
81 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
82                                          struct timeval t, void *private_data)
83 {
84         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
85         
86         ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
87                                    "timeout in ctdb_persistent_state");
88
89         talloc_free(state);
90 }
91
92 /*
93   store a set of persistent records - called from a ctdb client when it has updated
94   some records in a persistent database. The client will have the record
95   locked for the duration of this call. The client is the dmaster when 
96   this call is made
97  */
98 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb, 
99                                    struct ctdb_req_control *c, 
100                                    TDB_DATA recdata, bool *async_reply)
101 {
102         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
103         struct ctdb_persistent_state *state;
104         int i;
105         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
106         struct ctdb_db_context *ctdb_db;
107
108         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
109                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
110                 return -1;
111         }
112
113         ctdb_db = find_ctdb_db(ctdb, m->db_id);
114         if (ctdb_db == NULL) {
115                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
116                                  "Unknown database 0x%08x\n", m->db_id));
117                 return -1;
118         }
119
120         if (client == NULL) {
121                 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
122                 return -1;
123         }
124
125         /* handling num_persistent_updates is a bit strange - 
126            there are 3 cases
127              1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
128                 They don't expect num_persistent_updates to be used at all
129
130              2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
131                 this commit to then decrement it
132
133              3) new clients which use TRANS2 commit functions, and
134                 expect this function to increment the counter, and
135                 then have it decremented in ctdb_control_trans2_error
136                 or ctdb_control_trans2_finished
137         */
138         switch (c->opcode) {
139         case CTDB_CONTROL_PERSISTENT_STORE:
140                 if (ctdb_db->transaction_active) {
141                         DEBUG(DEBUG_ERR, (__location__ " trans2_commit client db_id[%d] transaction active - refusing persistent store\n",
142                                 client->db_id));
143                         return -1;
144                 }
145                 if (client->num_persistent_updates > 0) {
146                         client->num_persistent_updates--;
147                 }
148                 break;
149         case CTDB_CONTROL_TRANS2_COMMIT:
150                 if (ctdb_db->transaction_active) {
151                         DEBUG(DEBUG_ERR,(__location__ " trans2_commit: client "
152                                          "already has a transaction commit "
153                                          "active on db_id[%d]\n",
154                                          client->db_id));
155                         return -1;
156                 }
157                 if (client->db_id != 0) {
158                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
159                                          "client-db_id[%d] != 0\n",
160                                          client->db_id));
161                         return -1;
162                 }
163                 client->num_persistent_updates++;
164                 ctdb_db->transaction_active = true;
165                 client->db_id = m->db_id;
166                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
167                                   " commit transaction on db id[0x%08x]\n",
168                                   client->client_id, client->db_id));
169                 break;
170         case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
171                 /* already updated from the first commit */
172                 if (client->db_id != m->db_id) {
173                         DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
174                                          "retry: client-db_id[%d] != db_id[%d]"
175                                          "\n", client->db_id, m->db_id));
176                         return -1;
177                 }
178                 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
179                                     "transaction commit retry on "
180                                     "db_id[0x%08x]\n",
181                                     client->client_id, client->db_id));
182                 break;
183         }
184
185         state = talloc_zero(ctdb, struct ctdb_persistent_state);
186         CTDB_NO_MEMORY(ctdb, state);
187
188         state->ctdb = ctdb;
189         state->c    = c;
190
191         for (i=0;i<ctdb->vnn_map->size;i++) {
192                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
193                 int ret;
194
195                 /* only send to active nodes */
196                 if (node->flags & NODE_FLAGS_INACTIVE) {
197                         continue;
198                 }
199
200                 /* don't send to ourselves */
201                 if (node->pnn == ctdb->pnn) {
202                         continue;
203                 }
204                 
205                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
206                                                c->client_id, 0, recdata, 
207                                                ctdb_persistent_callback, state);
208                 if (ret == -1) {
209                         DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
210                         talloc_free(state);
211                         return -1;
212                 }
213
214                 state->num_pending++;
215                 state->num_sent++;
216         }
217
218         if (state->num_pending == 0) {
219                 talloc_free(state);
220                 return 0;
221         }
222         
223         /* we need to wait for the replies */
224         *async_reply = true;
225
226         /* need to keep the control structure around */
227         talloc_steal(state, c);
228
229         /* but we won't wait forever */
230         event_add_timed(ctdb->ev, state, 
231                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
232                         ctdb_persistent_store_timeout, state);
233
234         return 0;
235 }
236
237
/*
  what is needed to apply one update-record control on this node
 */
struct ctdb_persistent_write_state {
        /* database the records are written to */
        struct ctdb_db_context *ctdb_db;
        /* the marshalled records to store */
        struct ctdb_marshall_buffer *m;
        /* control that is answered when the write has completed */
        struct ctdb_req_control *c;
};
243
244
245 /*
246   called from a child process to write the data
247  */
248 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
249 {
250         int ret, i;
251         struct ctdb_rec_data *rec = NULL;
252         struct ctdb_marshall_buffer *m = state->m;
253
254         ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
255         if (ret == -1) {
256                 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
257                                  state->ctdb_db->db_id));
258                 return -1;
259         }
260
261         for (i=0;i<m->count;i++) {
262                 struct ctdb_ltdb_header oldheader;
263                 struct ctdb_ltdb_header header;
264                 TDB_DATA key, data, olddata;
265                 TALLOC_CTX *tmp_ctx = talloc_new(state);
266
267                 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
268                 
269                 if (rec == NULL) {
270                         DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
271                                          i, state->ctdb_db->db_id));
272                         talloc_free(tmp_ctx);
273                         goto failed;                    
274                 }
275
276                 /* fetch the old header and ensure the rsn is less than the new rsn */
277                 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
278                 if (ret != 0) {
279                         DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
280                                          state->ctdb_db->db_id));
281                         talloc_free(tmp_ctx);
282                         goto failed;
283                 }
284
285                 if (oldheader.rsn >= header.rsn &&
286                     (olddata.dsize != data.dsize || 
287                      memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
288                         DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
289                                           state->ctdb_db->db_id, 
290                                           (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
291                         talloc_free(tmp_ctx);
292                         goto failed;
293                 }
294
295                 talloc_free(tmp_ctx);
296
297                 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
298                 if (ret != 0) {
299                         DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
300                                           state->ctdb_db->db_id));
301                         goto failed;
302                 }
303         }
304
305         ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
306         if (ret == -1) {
307                 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
308                                  state->ctdb_db->db_id));
309                 return -1;
310         }
311
312         return 0;
313         
314 failed:
315         tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
316         return -1;
317 }
318
319
320 /*
321   called when we the child has completed the persistent write
322   on our behalf
323  */
324 static void ctdb_persistent_write_callback(int status, void *private_data)
325 {
326         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
327                                                                    struct ctdb_persistent_write_state);
328
329
330         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
331
332         talloc_free(state);
333 }
334
335 /*
336   called if our lockwait child times out
337  */
338 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
339                                          struct timeval t, void *private_data)
340 {
341         struct ctdb_persistent_write_state *state = talloc_get_type(private_data, 
342                                                                    struct ctdb_persistent_write_state);
343         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
344         talloc_free(state);
345 }
346
/*
  handle for one child process that writes records on our behalf
 */
struct childwrite_handle {
        /* daemon context, used for the childwrite statistics */
        struct ctdb_context *ctdb;
        /* database the child writes to */
        struct ctdb_db_context *ctdb_db;
        /* fd event watching the read end of the pipe */
        struct fd_event *fde;
        /* pipe between child and parent, the parent reads from fd[0] */
        int fd[2];
        /* pid of the writing child */
        pid_t child;
        /* argument handed back to callback */
        void *private_data;
        /* invoked with the result byte read from the child */
        void (*callback)(int status, void *private_data);
        /* when the child write was set up, for latency accounting */
        struct timeval start_time;
};
357
358 static int childwrite_destructor(struct childwrite_handle *h)
359 {
360         h->ctdb->statistics.pending_childwrite_calls--;
361         kill(h->child, SIGKILL);
362         return 0;
363 }
364
365 /* called when the child process has finished writing the record to the
366    database
367 */
368 static void childwrite_handler(struct event_context *ev, struct fd_event *fde, 
369                              uint16_t flags, void *private_data)
370 {
371         struct childwrite_handle *h = talloc_get_type(private_data, 
372                                                      struct childwrite_handle);
373         void *p = h->private_data;
374         void (*callback)(int, void *) = h->callback;
375         pid_t child = h->child;
376         TALLOC_CTX *tmp_ctx = talloc_new(ev);
377         int ret;
378         char c;
379
380         ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
381         h->ctdb->statistics.pending_childwrite_calls--;
382
383         /* the handle needs to go away when the context is gone - when
384            the handle goes away this implicitly closes the pipe, which
385            kills the child */
386         talloc_steal(tmp_ctx, h);
387
388         talloc_set_destructor(h, NULL);
389
390         ret = read(h->fd[0], &c, 1);
391         if (ret < 1) {
392                 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
393                 c = 1;
394         }
395
396         callback(c, p);
397
398         kill(child, SIGKILL);
399         talloc_free(tmp_ctx);
400 }
401
402 /* this creates a child process which will take out a tdb transaction
403    and write the record to the database.
404 */
405 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
406                                 void (*callback)(int, void *private_data),
407                                 struct ctdb_persistent_write_state *state)
408 {
409         struct childwrite_handle *result;
410         int ret;
411         pid_t parent = getpid();
412
413         ctdb_db->ctdb->statistics.childwrite_calls++;
414         ctdb_db->ctdb->statistics.pending_childwrite_calls++;
415
416         if (!(result = talloc_zero(state, struct childwrite_handle))) {
417                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
418                 return NULL;
419         }
420
421         ret = pipe(result->fd);
422
423         if (ret != 0) {
424                 talloc_free(result);
425                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
426                 return NULL;
427         }
428
429         result->child = fork();
430
431         if (result->child == (pid_t)-1) {
432                 close(result->fd[0]);
433                 close(result->fd[1]);
434                 talloc_free(result);
435                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
436                 return NULL;
437         }
438
439         result->callback = callback;
440         result->private_data = state;
441         result->ctdb = ctdb_db->ctdb;
442         result->ctdb_db = ctdb_db;
443
444         if (result->child == 0) {
445                 char c = 0;
446
447                 close(result->fd[0]);
448                 ret = ctdb_persistent_store(state);
449                 if (ret != 0) {
450                         DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
451                         c = 1;
452                 }
453
454                 write(result->fd[1], &c, 1);
455
456                 /* make sure we die when our parent dies */
457                 while (kill(parent, 0) == 0 || errno != ESRCH) {
458                         sleep(5);
459                 }
460                 _exit(0);
461         }
462
463         close(result->fd[1]);
464         set_close_on_exec(result->fd[0]);
465
466         talloc_set_destructor(result, childwrite_destructor);
467
468         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
469
470         result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
471                                    EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
472                                    (void *)result);
473         if (result->fde == NULL) {
474                 talloc_free(result);
475                 ctdb_db->ctdb->statistics.pending_childwrite_calls--;
476                 return NULL;
477         }
478
479         result->start_time = timeval_current();
480
481         return result;
482 }
483
484 /* 
485    update a record on this node if the new record has a higher rsn than the
486    current record
487  */
488 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
489                                    struct ctdb_req_control *c, TDB_DATA recdata, 
490                                    bool *async_reply)
491 {
492         struct ctdb_db_context *ctdb_db;
493         struct ctdb_persistent_write_state *state;
494         struct childwrite_handle *handle;
495         struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
496
497         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
498                 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
499                 return -1;
500         }
501
502         ctdb_db = find_ctdb_db(ctdb, m->db_id);
503         if (ctdb_db == NULL) {
504                 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
505                 return -1;
506         }
507
508         state = talloc(ctdb, struct ctdb_persistent_write_state);
509         CTDB_NO_MEMORY(ctdb, state);
510
511         state->ctdb_db = ctdb_db;
512         state->c       = c;
513         state->m       = m;
514
515         /* create a child process to take out a transaction and 
516            write the data.
517         */
518         handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
519         if (handle == NULL) {
520                 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
521                 talloc_free(state);
522                 return -1;
523         }
524
525         /* we need to wait for the replies */
526         *async_reply = true;
527
528         /* need to keep the control structure around */
529         talloc_steal(state, c);
530
531         /* but we won't wait forever */
532         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
533                         ctdb_persistent_lock_timeout, state);
534
535         return 0;
536 }
537
538
539 /*
540   called when a client has finished a local commit in a transaction to 
541   a persistent database
542  */
543 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb, 
544                                      struct ctdb_req_control *c)
545 {
546         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
547         struct ctdb_db_context *ctdb_db;
548
549         ctdb_db = find_ctdb_db(ctdb, client->db_id);
550         if (ctdb_db == NULL) {
551                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
552                                  "Unknown database 0x%08x\n", client->db_id));
553                 return -1;
554         }
555         if (!ctdb_db->transaction_active) {
556                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
557                                  "Database 0x%08x has no transaction commit "
558                                  "started\n", client->db_id));
559                 return -1;
560         }
561
562         ctdb_db->transaction_active = false;
563         client->db_id = 0;
564
565         if (client->num_persistent_updates == 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
567                 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
568                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
569                 return -1;
570         }
571         client->num_persistent_updates--;
572
573         DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
574                             "transaction commit db_id[0x%08x]\n",
575                             client->client_id, ctdb_db->db_id));
576
577         return 0;
578 }
579
580 /*
581   called when a client gets an error committing its database
582   during a transaction commit
583  */
584 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb, 
585                                   struct ctdb_req_control *c)
586 {
587         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
588         struct ctdb_db_context *ctdb_db;
589
590         ctdb_db = find_ctdb_db(ctdb, client->db_id);
591         if (ctdb_db == NULL) {
592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
593                                  "Unknown database 0x%08x\n", client->db_id));
594                 return -1;
595         }
596         if (!ctdb_db->transaction_active) {
597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
598                                  "Database 0x%08x has no transaction commit "
599                                  "started\n", client->db_id));
600                 return -1;
601         }
602
603         ctdb_db->transaction_active = false;
604         client->db_id = 0;
605
606         if (client->num_persistent_updates == 0) {
607                 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
608         } else {
609                 client->num_persistent_updates--;
610         }
611
612         DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
613                          " db_id[0x%08x] - forcing recovery\n",
614                          ctdb_db->db_id));
615         client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
616
617         return 0;
618 }
619
620 /**
621  * Tell whether a transaction is active on this node on the give DB.
622  */
623 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
624                                    struct ctdb_req_control *c,
625                                    uint32_t db_id)
626 {
627         struct ctdb_db_context *ctdb_db;
628         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
629
630         ctdb_db = find_ctdb_db(ctdb, db_id);
631         if (!ctdb_db) {
632                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
633                 return -1;
634         }
635
636         if (client->db_id == db_id) {
637                 return 0;
638         }
639
640         if (ctdb_db->transaction_active) {
641                 return 1;
642         } else {
643                 return 0;
644         }
645 }
646
647 /*
648   backwards compatibility:
649
650   start a persistent store operation. passing both the key, header and
651   data to the daemon. If the client disconnects before it has issued
652   a persistent_update call to the daemon we trigger a full recovery
653   to ensure the databases are brought back in sync.
654   for now we ignore the recdata that the client has passed to us.
655  */
656 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
657                                       struct ctdb_req_control *c,
658                                       TDB_DATA recdata)
659 {
660         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
661
662         if (client == NULL) {
663                 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
664                 return -1;
665         }
666
667         client->num_persistent_updates++;
668
669         return 0;
670 }
671
672 /* 
673   backwards compatibility:
674
675   called to tell ctdbd that it is no longer doing a persistent update 
676 */
677 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb, 
678                                               struct ctdb_req_control *c,
679                                               TDB_DATA recdata)
680 {
681         struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
682
683         if (client == NULL) {
684                 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
685                 return -1;
686         }
687
688         if (client->num_persistent_updates > 0) {
689                 client->num_persistent_updates--;
690         }
691
692         return 0;
693 }
694
695
696 /*
697   backwards compatibility:
698
699   single record varient of ctdb_control_trans2_commit for older clients
700  */
701 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
702                                       struct ctdb_req_control *c, 
703                                       TDB_DATA recdata, bool *async_reply)
704 {
705         struct ctdb_marshall_buffer *m;
706         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
707         TDB_DATA key, data;
708
709         if (recdata.dsize != offsetof(struct ctdb_rec_data, data) + 
710             rec->keylen + rec->datalen) {
711                 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
712                 return -1;
713         }
714
715         key.dptr = &rec->data[0];
716         key.dsize = rec->keylen;
717         data.dptr = &rec->data[rec->keylen];
718         data.dsize = rec->datalen;
719
720         m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
721         CTDB_NO_MEMORY(ctdb, m);
722
723         return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
724 }
725
726