ctdb-locking: Instead of comparing key, compare key hash
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
46 /* FIXME: Add a tunable max_lock_processes_per_db */
47 #define MAX_LOCK_PROCESSES_PER_DB               (100)
48
/*
 * Kind of lock a request asks for.
 * The numeric value is also used as an index into lock_type_str[].
 */
enum lock_type {
        LOCK_RECORD = 0,        /* a single record of one database */
        LOCK_DB,                /* one whole database */
        LOCK_ALLDB_PRIO,        /* all databases of a given priority */
        LOCK_ALLDB              /* all databases */
};
55
/*
 * Printable names for the lock types, indexed by enum lock_type.
 * The entries must stay in the same order as the enum constants.
 */
static const char * const lock_type_str[] = {
        "lock_record",          /* LOCK_RECORD */
        "lock_db",              /* LOCK_DB */
        "lock_alldb_prio",      /* LOCK_ALLDB_PRIO */
        "lock_alldb",           /* LOCK_ALLDB */
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82         uint32_t key_hash;
83 };
84
/*
 * lock_request is the client specific part for a lock request.
 * Requests are queued on lock_context.req_queue.
 */
struct lock_request {
        /* linkage for the context's request queue */
        struct lock_request *next, *prev;
        /* context this request is queued on */
        struct lock_context *lctx;
        /* invoked as callback(private_data, locked) with the lock status */
        void (*callback)(void *, bool);
        /* opaque pointer handed back to the callback */
        void *private_data;
};
92
93
94 /*
95  * Support samba 3.6.x (and older) versions which do not set db priority.
96  *
97  * By default, all databases are set to priority 1. So only when priority
98  * is set to 1, check for databases that need higher priority.
99  */
100 static bool later_db(struct ctdb_context *ctdb, const char *name)
101 {
102         if (ctdb->tunable.samba3_hack == 0) {
103                 return false;
104         }
105
106         if (strstr(name, "brlock") ||
107             strstr(name, "g_lock") ||
108             strstr(name, "notify_onelevel") ||
109             strstr(name, "serverid") ||
110             strstr(name, "xattr_tdb")) {
111                 return true;
112         }
113
114         return false;
115 }
116
117 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
118                             uint32_t priority,
119                             void *private_data);
120
121 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
122                             db_handler_t handler, void *private_data)
123 {
124         struct ctdb_db_context *ctdb_db;
125         int ret;
126
127         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
128                 if (ctdb_db->priority != priority) {
129                         continue;
130                 }
131                 if (later_db(ctdb, ctdb_db->db_name)) {
132                         continue;
133                 }
134                 ret = handler(ctdb_db, priority, private_data);
135                 if (ret != 0) {
136                         return -1;
137                 }
138         }
139
140         /* If priority != 1, later_db check is not required and can return */
141         if (priority != 1) {
142                 return 0;
143         }
144
145         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
146                 if (!later_db(ctdb, ctdb_db->db_name)) {
147                         continue;
148                 }
149                 ret = handler(ctdb_db, priority, private_data);
150                 if (ret != 0) {
151                         return -1;
152                 }
153         }
154
155         return 0;
156 }
157
158
159 /*
160  * lock all databases - mark only
161  */
162 static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
163                                 void *private_data)
164 {
165         int tdb_transaction_write_lock_mark(struct tdb_context *);
166
167         DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
168                            ctdb_db->db_name, priority));
169
170         if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
171                 DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
172                                   ctdb_db->db_name));
173                 return -1;
174         }
175
176         if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
177                 DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
178                                   ctdb_db->db_name));
179                 return -1;
180         }
181
182         return 0;
183 }
184
185 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
186 {
187         /*
188          * This function is only used by the main dameon during recovery.
189          * At this stage, the databases have already been locked, by a
190          * dedicated child process. The freeze_mode variable is used to track
191          * whether the actual locks are held by the child process or not.
192          */
193
194         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
195                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
196                 return -1;
197         }
198
199         return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
200 }
201
202 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
203 {
204         uint32_t priority;
205
206         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
207                 if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
208                         return -1;
209                 }
210         }
211
212         return 0;
213 }
214
215
216 /*
217  * lock all databases - unmark only
218  */
219 static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
220                                   void *private_data)
221 {
222         int tdb_transaction_write_lock_unmark(struct tdb_context *);
223
224         DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
225                            ctdb_db->db_name, priority));
226
227         if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
228                 DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
229                                   ctdb_db->db_name));
230                 return -1;
231         }
232
233         if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
234                 DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
235                                   ctdb_db->db_name));
236                 return -1;
237         }
238
239         return 0;
240 }
241
242 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
243 {
244         /*
245          * This function is only used by the main dameon during recovery.
246          * At this stage, the databases have already been locked, by a
247          * dedicated child process. The freeze_mode variable is used to track
248          * whether the actual locks are held by the child process or not.
249          */
250
251         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
252                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
253                 return -1;
254         }
255
256         return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
257 }
258
259 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
260 {
261         uint32_t priority;
262
263         for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
264                 if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
265                         return -1;
266                 }
267         }
268
269         return 0;
270 }
271
272
273 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
274
275 /*
276  * Destructor to kill the child locking process
277  */
278 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
279 {
280         if (lock_ctx->child > 0) {
281                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
282                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
283                 if (lock_ctx->ctdb_db) {
284                         lock_ctx->ctdb_db->lock_num_current--;
285                 }
286                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
287                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
288                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
289                 }
290         } else {
291                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
292                 lock_ctx->ctdb->lock_num_pending--;
293                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
294                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
295                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
296                 }
297         }
298
299         ctdb_lock_schedule(lock_ctx->ctdb);
300
301         return 0;
302 }
303
304
305 /*
306  * Destructor to remove lock request
307  */
308 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
309 {
310         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
311         return 0;
312 }
313
314
315 void ctdb_lock_free_request_context(struct lock_request *lock_req)
316 {
317         struct lock_context *lock_ctx;
318
319         lock_ctx = lock_req->lctx;
320         talloc_free(lock_req);
321         talloc_free(lock_ctx);
322 }
323
324
325 /*
326  * Process all the callbacks waiting for lock
327  *
328  * If lock has failed, callback is executed with locked=false
329  */
330 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
331 {
332         struct lock_request *request, *next;
333
334         if (lock_ctx->auto_mark && locked) {
335                 switch (lock_ctx->type) {
336                 case LOCK_RECORD:
337                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
338                         break;
339
340                 case LOCK_DB:
341                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
342                         break;
343
344                 case LOCK_ALLDB_PRIO:
345                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
346                         break;
347
348                 case LOCK_ALLDB:
349                         ctdb_lockall_mark(lock_ctx->ctdb);
350                         break;
351                 }
352         }
353
354         /* Iterate through all callbacks */
355         request = lock_ctx->req_queue;
356         while (request) {
357                 if (lock_ctx->auto_mark) {
358                         /* Reset the destructor, so request is not removed from the list */
359                         talloc_set_destructor(request, NULL);
360                 }
361
362                 /* In case, callback frees the request, store next */
363                 next = request->next;
364                 request->callback(request->private_data, locked);
365                 request = next;
366         }
367
368         if (lock_ctx->auto_mark && locked) {
369                 switch (lock_ctx->type) {
370                 case LOCK_RECORD:
371                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
372                         break;
373
374                 case LOCK_DB:
375                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
376                         break;
377
378                 case LOCK_ALLDB_PRIO:
379                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
380                         break;
381
382                 case LOCK_ALLDB:
383                         ctdb_lockall_unmark(lock_ctx->ctdb);
384                         break;
385                 }
386         }
387 }
388
389
/*
 * Map a lock latency in seconds to a statistics bucket.
 *
 * Bucket i covers latencies below the i-th upper bound (1ms, 10ms,
 * 100ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s); everything else, including
 * values that compare below no bound, goes into bucket 10.
 */
static int lock_bucket_id(double t)
{
        static const double upper_bound[] = {
                1 * 1.e-3, 10 * 1.e-3, 100 * 1.e-3,
                1, 2, 4, 8, 16, 32, 64,
        };
        const int num_bounds = (int)(sizeof(upper_bound) / sizeof(upper_bound[0]));
        int id;

        for (id = 0; id < num_bounds; id++) {
                if (t < upper_bound[id]) {
                        break;
                }
        }

        return id;
}
421
422 /*
423  * Callback routine when the required locks are obtained.
424  * Called from parent context
425  */
426 static void ctdb_lock_handler(struct tevent_context *ev,
427                             struct tevent_fd *tfd,
428                             uint16_t flags,
429                             void *private_data)
430 {
431         struct lock_context *lock_ctx;
432         TALLOC_CTX *tmp_ctx = NULL;
433         char c;
434         bool locked;
435         double t;
436         int id;
437
438         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
439
440         /* cancel the timeout event */
441         if (lock_ctx->ttimer) {
442                 TALLOC_FREE(lock_ctx->ttimer);
443         }
444
445         t = timeval_elapsed(&lock_ctx->start_time);
446         id = lock_bucket_id(t);
447
448         if (lock_ctx->auto_mark) {
449                 tmp_ctx = talloc_new(ev);
450                 talloc_steal(tmp_ctx, lock_ctx);
451         }
452
453         /* Read the status from the child process */
454         if (read(lock_ctx->fd[0], &c, 1) != 1) {
455                 locked = false;
456         } else {
457                 locked = (c == 0 ? true : false);
458         }
459
460         /* Update statistics */
461         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
462         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
463         if (lock_ctx->ctdb_db) {
464                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
465                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
466         }
467
468         if (locked) {
469                 if (lock_ctx->ctdb_db) {
470                         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
471                         CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
472                                             lock_type_str[lock_ctx->type], locks.latency,
473                                             lock_ctx->start_time);
474
475                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
476                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
477                 }
478         } else {
479                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
480                 if (lock_ctx->ctdb_db) {
481                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
482                 }
483         }
484
485         process_callbacks(lock_ctx, locked);
486
487         if (lock_ctx->auto_mark) {
488                 talloc_free(tmp_ctx);
489         }
490 }
491
492
493 /*
494  * Callback routine when required locks are not obtained within timeout
495  * Called from parent context
496  */
497 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
498                                     struct tevent_timer *ttimer,
499                                     struct timeval current_time,
500                                     void *private_data)
501 {
502         static const char * debug_locks = NULL;
503         struct lock_context *lock_ctx;
504         struct ctdb_context *ctdb;
505         pid_t pid;
506
507         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
508         ctdb = lock_ctx->ctdb;
509
510         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
511                 DEBUG(DEBUG_WARNING,
512                       ("Unable to get %s lock on database %s for %.0lf seconds\n",
513                        (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
514                        lock_ctx->ctdb_db->db_name,
515                        timeval_elapsed(&lock_ctx->start_time)));
516         } else {
517                 DEBUG(DEBUG_WARNING,
518                       ("Unable to get ALLDB locks for %.0lf seconds\n",
519                        timeval_elapsed(&lock_ctx->start_time)));
520         }
521
522         /* Fire a child process to find the blocking process. */
523         if (debug_locks == NULL) {
524                 debug_locks = getenv("CTDB_DEBUG_LOCKS");
525                 if (debug_locks == NULL) {
526                         debug_locks = talloc_asprintf(ctdb,
527                                                       "%s/debug_locks.sh",
528                                                       getenv("CTDB_BASE"));
529                 }
530         }
531         if (debug_locks != NULL) {
532                 pid = vfork();
533                 if (pid == 0) {
534                         execl(debug_locks, debug_locks, NULL);
535                         _exit(0);
536                 }
537                 ctdb_track_child(ctdb, pid);
538         } else {
539                 DEBUG(DEBUG_WARNING,
540                       (__location__
541                        " Unable to setup lock debugging - no memory?\n"));
542         }
543
544         /* reset the timeout timer */
545         // talloc_free(lock_ctx->ttimer);
546         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
547                                             lock_ctx,
548                                             timeval_current_ofs(10, 0),
549                                             ctdb_lock_timeout_handler,
550                                             (void *)lock_ctx);
551 }
552
553
/*
 * Iterator handler that counts databases: private_data points to an
 * int which is incremented once per call.  Always returns 0.
 */
static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
                            void *private_data)
{
        int *counter = private_data;

        *counter += 1;

        return 0;
}
563
/* Cursor into a string array that db_name_handler() fills in */
struct db_namelist {
        char **names;   /* array receiving the strings */
        int n;          /* index of the next slot to fill */
};
568
569 static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
570                            void *private_data)
571 {
572         struct db_namelist *list = (struct db_namelist *)private_data;
573
574         list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
575         list->n++;
576
577         return 0;
578 }
579
580 static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
581 {
582         struct ctdb_context *ctdb = lock_ctx->ctdb;
583         char **args = NULL;
584         int nargs, i;
585         int priority;
586         struct db_namelist list;
587
588         switch (lock_ctx->type) {
589         case LOCK_RECORD:
590                 nargs = 6;
591                 break;
592
593         case LOCK_DB:
594                 nargs = 5;
595                 break;
596
597         case LOCK_ALLDB_PRIO:
598                 nargs = 4;
599                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
600                 break;
601
602         case LOCK_ALLDB:
603                 nargs = 4;
604                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
605                         ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
606                 }
607                 break;
608         }
609
610         /* Add extra argument for null termination */
611         nargs++;
612
613         args = talloc_array(mem_ctx, char *, nargs);
614         if (args == NULL) {
615                 return NULL;
616         }
617
618         args[0] = talloc_strdup(args, "ctdb_lock_helper");
619         args[1] = talloc_asprintf(args, "%d", getpid());
620         args[2] = talloc_asprintf(args, "%d", fd);
621
622         switch (lock_ctx->type) {
623         case LOCK_RECORD:
624                 args[3] = talloc_strdup(args, "RECORD");
625                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
626                 if (lock_ctx->key.dsize == 0) {
627                         args[5] = talloc_strdup(args, "NULL");
628                 } else {
629                         args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
630                 }
631                 break;
632
633         case LOCK_DB:
634                 args[3] = talloc_strdup(args, "DB");
635                 args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
636                 break;
637
638         case LOCK_ALLDB_PRIO:
639                 args[3] = talloc_strdup(args, "DB");
640                 list.names = args;
641                 list.n = 4;
642                 ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
643                 break;
644
645         case LOCK_ALLDB:
646                 args[3] = talloc_strdup(args, "DB");
647                 list.names = args;
648                 list.n = 4;
649                 for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
650                         ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
651                 }
652                 break;
653         }
654
655         /* Make sure last argument is NULL */
656         args[nargs-1] = NULL;
657
658         for (i=0; i<nargs-1; i++) {
659                 if (args[i] == NULL) {
660                         talloc_free(args);
661                         return NULL;
662                 }
663         }
664
665         return args;
666 }
667
668
669 /*
670  * Find the lock context of a given type
671  */
672 static struct lock_context *find_lock_context(struct lock_context *lock_list,
673                                               struct ctdb_db_context *ctdb_db,
674                                               TDB_DATA key,
675                                               uint32_t priority,
676                                               enum lock_type type,
677                                               uint32_t key_hash)
678 {
679         struct lock_context *lock_ctx;
680
681         /* Search active locks */
682         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
683                 if (lock_ctx->type != type) {
684                         continue;
685                 }
686
687                 switch (lock_ctx->type) {
688                 case LOCK_RECORD:
689                         if (ctdb_db == lock_ctx->ctdb_db &&
690                             key_hash == lock_ctx->key_hash) {
691                                 goto done;
692                         }
693                         break;
694
695                 case LOCK_DB:
696                         if (ctdb_db == lock_ctx->ctdb_db) {
697                                 goto done;
698                         }
699                         break;
700
701                 case LOCK_ALLDB_PRIO:
702                         if (priority == lock_ctx->priority) {
703                                 goto done;
704                         }
705                         break;
706
707                 case LOCK_ALLDB:
708                         goto done;
709                         break;
710                 }
711         }
712
713         /* Did not find the lock context we are searching for */
714         lock_ctx = NULL;
715
716 done:
717         return lock_ctx;
718
719 }
720
721
722 /*
723  * Schedule a new lock child process
724  * Set up callback handler and timeout handler
725  */
726 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
727 {
728         struct lock_context *lock_ctx, *next_ctx, *active_ctx;
729         int ret;
730         TALLOC_CTX *tmp_ctx;
731         const char *helper = BINDIR "/ctdb_lock_helper";
732         static const char *prog = NULL;
733         char **args;
734
735         if (prog == NULL) {
736                 const char *t;
737
738                 t = getenv("CTDB_LOCK_HELPER");
739                 if (t != NULL) {
740                         prog = talloc_strdup(ctdb, t);
741                 } else {
742                         prog = talloc_strdup(ctdb, helper);
743                 }
744                 CTDB_NO_MEMORY_VOID(ctdb, prog);
745         }
746
747         if (ctdb->lock_pending == NULL) {
748                 return;
749         }
750
751         /* Find a lock context with requests */
752         lock_ctx = ctdb->lock_pending;
753         while (lock_ctx != NULL) {
754                 next_ctx = lock_ctx->next;
755                 if (! lock_ctx->req_queue) {
756                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
757                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
758                         ctdb->lock_num_pending--;
759                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
760                         if (lock_ctx->ctdb_db) {
761                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
762                         }
763                         talloc_free(lock_ctx);
764                 } else {
765                         active_ctx = find_lock_context(ctdb->lock_current, lock_ctx->ctdb_db,
766                                                        lock_ctx->key, lock_ctx->priority,
767                                                        lock_ctx->type, lock_ctx->key_hash);
768                         if (active_ctx == NULL) {
769                                 if (lock_ctx->ctdb_db == NULL ||
770                                     lock_ctx->ctdb_db->lock_num_current < MAX_LOCK_PROCESSES_PER_DB) {
771                                         /* Found a lock context with lock requests */
772                                         break;
773                                 }
774                         }
775
776                         /* There is already a child waiting for the
777                          * same key.  So don't schedule another child
778                          * just yet.
779                          */
780                 }
781                 lock_ctx = next_ctx;
782         }
783
784         if (lock_ctx == NULL) {
785                 return;
786         }
787
788         lock_ctx->child = -1;
789         ret = pipe(lock_ctx->fd);
790         if (ret != 0) {
791                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
792                 return;
793         }
794
795         set_close_on_exec(lock_ctx->fd[0]);
796
797         /* Create data for child process */
798         tmp_ctx = talloc_new(lock_ctx);
799         if (tmp_ctx == NULL) {
800                 DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
801                 close(lock_ctx->fd[0]);
802                 close(lock_ctx->fd[1]);
803                 return;
804         }
805
806         /* Create arguments for lock helper */
807         args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
808         if (args == NULL) {
809                 DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
810                 close(lock_ctx->fd[0]);
811                 close(lock_ctx->fd[1]);
812                 talloc_free(tmp_ctx);
813                 return;
814         }
815
816         lock_ctx->child = vfork();
817
818         if (lock_ctx->child == (pid_t)-1) {
819                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
820                 close(lock_ctx->fd[0]);
821                 close(lock_ctx->fd[1]);
822                 talloc_free(tmp_ctx);
823                 return;
824         }
825
826
827         /* Child process */
828         if (lock_ctx->child == 0) {
829                 ret = execv(prog, args);
830                 if (ret < 0) {
831                         DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
832                                           prog, errno, strerror(errno)));
833                 }
834                 _exit(1);
835         }
836
837         /* Parent process */
838         ctdb_track_child(ctdb, lock_ctx->child);
839         close(lock_ctx->fd[1]);
840
841         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
842
843         talloc_free(tmp_ctx);
844
845         /* Set up timeout handler */
846         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
847                                             lock_ctx,
848                                             timeval_current_ofs(10, 0),
849                                             ctdb_lock_timeout_handler,
850                                             (void *)lock_ctx);
851         if (lock_ctx->ttimer == NULL) {
852                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
853                 lock_ctx->child = -1;
854                 talloc_set_destructor(lock_ctx, NULL);
855                 close(lock_ctx->fd[0]);
856                 return;
857         }
858
859         /* Set up callback */
860         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
861                                       lock_ctx,
862                                       lock_ctx->fd[0],
863                                       EVENT_FD_READ,
864                                       ctdb_lock_handler,
865                                       (void *)lock_ctx);
866         if (lock_ctx->tfd == NULL) {
867                 TALLOC_FREE(lock_ctx->ttimer);
868                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
869                 lock_ctx->child = -1;
870                 talloc_set_destructor(lock_ctx, NULL);
871                 close(lock_ctx->fd[0]);
872                 return;
873         }
874         tevent_fd_set_auto_close(lock_ctx->tfd);
875
876         /* Move the context from pending to current */
877         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
878         ctdb->lock_num_pending--;
879         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
880         if (lock_ctx->ctdb_db) {
881                 lock_ctx->ctdb_db->lock_num_current++;
882                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
883                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
884         }
885 }
886
887
888 /*
889  * Lock record / db depending on type
890  */
891 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
892                                                struct ctdb_db_context *ctdb_db,
893                                                TDB_DATA key,
894                                                uint32_t priority,
895                                                void (*callback)(void *, bool),
896                                                void *private_data,
897                                                enum lock_type type,
898                                                bool auto_mark)
899 {
900         struct lock_context *lock_ctx = NULL;
901         struct lock_request *request;
902
903         if (callback == NULL) {
904                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
905                 return NULL;
906         }
907
908 #if 0
909         /* Disable this optimization to ensure first-in-first-out fair
910          * scheduling of lock requests */
911
912         /* get a context for this key - search only the pending contexts,
913          * current contexts might in the middle of processing callbacks */
914         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
915 #endif
916
917         /* No existing context, create one */
918         if (lock_ctx == NULL) {
919                 lock_ctx = talloc_zero(ctdb, struct lock_context);
920                 if (lock_ctx == NULL) {
921                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
922                         return NULL;
923                 }
924
925                 lock_ctx->type = type;
926                 lock_ctx->ctdb = ctdb;
927                 lock_ctx->ctdb_db = ctdb_db;
928                 lock_ctx->key.dsize = key.dsize;
929                 if (key.dsize > 0) {
930                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
931                         if (lock_ctx->key.dptr == NULL) {
932                                 DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
933                                 talloc_free(lock_ctx);
934                                 return NULL;
935                         }
936                         lock_ctx->key_hash = ctdb_hash(&key);
937                 } else {
938                         lock_ctx->key.dptr = NULL;
939                 }
940                 lock_ctx->priority = priority;
941                 lock_ctx->auto_mark = auto_mark;
942
943                 lock_ctx->child = -1;
944                 lock_ctx->block_child = -1;
945
946                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
947                 ctdb->lock_num_pending++;
948                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
949                 if (ctdb_db) {
950                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
951                 }
952
953                 /* Start the timer when we activate the context */
954                 lock_ctx->start_time = timeval_current();
955         }
956
957         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
958                 talloc_free(lock_ctx);
959                 return NULL;
960         }
961
962         request->lctx = lock_ctx;
963         request->callback = callback;
964         request->private_data = private_data;
965
966         talloc_set_destructor(request, ctdb_lock_request_destructor);
967         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
968
969         ctdb_lock_schedule(ctdb);
970
971         return request;
972 }
973
974
975 /*
976  * obtain a lock on a record in a database
977  */
978 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
979                                       TDB_DATA key,
980                                       bool auto_mark,
981                                       void (*callback)(void *, bool),
982                                       void *private_data)
983 {
984         return ctdb_lock_internal(ctdb_db->ctdb,
985                                   ctdb_db,
986                                   key,
987                                   0,
988                                   callback,
989                                   private_data,
990                                   LOCK_RECORD,
991                                   auto_mark);
992 }
993
994
995 /*
996  * obtain a lock on a database
997  */
998 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
999                                   bool auto_mark,
1000                                   void (*callback)(void *, bool),
1001                                   void *private_data)
1002 {
1003         return ctdb_lock_internal(ctdb_db->ctdb,
1004                                   ctdb_db,
1005                                   tdb_null,
1006                                   0,
1007                                   callback,
1008                                   private_data,
1009                                   LOCK_DB,
1010                                   auto_mark);
1011 }
1012
1013
1014 /*
1015  * obtain locks on all databases of specified priority
1016  */
1017 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
1018                                           uint32_t priority,
1019                                           bool auto_mark,
1020                                           void (*callback)(void *, bool),
1021                                           void *private_data)
1022 {
1023         if (priority < 1 || priority > NUM_DB_PRIORITIES) {
1024                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
1025                 return NULL;
1026         }
1027
1028         return ctdb_lock_internal(ctdb,
1029                                   NULL,
1030                                   tdb_null,
1031                                   priority,
1032                                   callback,
1033                                   private_data,
1034                                   LOCK_ALLDB_PRIO,
1035                                   auto_mark);
1036 }
1037
1038
1039 /*
1040  * obtain locks on all databases
1041  */
1042 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1043                                      bool auto_mark,
1044                                      void (*callback)(void *, bool),
1045                                      void *private_data)
1046 {
1047         return ctdb_lock_internal(ctdb,
1048                                   NULL,
1049                                   tdb_null,
1050                                   0,
1051                                   callback,
1052                                   private_data,
1053                                   LOCK_ALLDB,
1054                                   auto_mark);
1055 }
1056