daemon: Add a tunable to enable automatic database priority setting
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Upper bound on the number of lock contexts that may have a running
 * child process at the same time.  Despite the name, the scheduler
 * compares it against the global ctdb->lock_num_current counter.
 *
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB 100
48
/*
 * Kind of lock a request asks for.  The values are used as indices into
 * lock_type_str[], so the two must be kept in the same order.
 */
enum lock_type {
        LOCK_RECORD     = 0,    /* chain lock on a single record */
        LOCK_DB         = 1,    /* lock on one whole database */
        LOCK_ALLDB_PRIO = 2,    /* lock on all databases of one priority */
        LOCK_ALLDB      = 3,    /* lock on all databases */
};
55
/*
 * Printable names for enum lock_type, indexed by the enum value.
 * The lock handler passes the name as the label when it records
 * per-database lock latency.
 */
static const char * const lock_type_str[] = {
        "lock_record",          /* LOCK_RECORD */
        "lock_db",              /* LOCK_DB */
        "lock_alldb_prio",      /* LOCK_ALLDB_PRIO */
        "lock_alldb",           /* LOCK_ALLDB */
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82 };
83
/*
 * lock_request is the client specific part for a lock request:
 * one entry per caller waiting on a lock_context.
 */
struct lock_request {
        /* Linkage in lock_context->req_queue */
        struct lock_request *next;
        struct lock_request *prev;

        /* Context this request is queued on */
        struct lock_context *lctx;

        /* Invoked with private_data and the lock status (true = locked) */
        void (*callback)(void *, bool);
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 /*
117  * lock all databases
118  */
119 int ctdb_lockall_prio(struct ctdb_context *ctdb, uint32_t priority)
120 {
121         struct ctdb_db_context *ctdb_db;
122
123         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
124                 if (ctdb_db->priority != priority) {
125                         continue;
126                 }
127                 if (later_db(ctdb, ctdb_db->db_name)) {
128                         continue;
129                 }
130                 DEBUG(DEBUG_INFO, ("locking database %s, priority:%u\n",
131                                    ctdb_db->db_name, priority));
132                 if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
133                         DEBUG(DEBUG_ERR, ("Failed to lock database %s\n",
134                                           ctdb_db->db_name));
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 DEBUG(DEBUG_INFO, ("locking database %s, priority:%u\n",
149                                    ctdb_db->db_name, priority));
150                 if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
151                         DEBUG(DEBUG_ERR, ("Failed to lock database %s\n",
152                                           ctdb_db->db_name));
153                         return -1;
154                 }
155         }
156
157         return 0;
158 }
159
160 static int ctdb_lockall(struct ctdb_context *ctdb)
161 {
162         uint32_t priority;
163
164         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
165                 if (ctdb_lockall_prio(ctdb, priority) != 0) {
166                         return -1;
167                 }
168         }
169
170         return 0;
171 }
172
173
174 /*
175  * unlock all databases
176  */
177 int ctdb_unlockall_prio(struct ctdb_context *ctdb, uint32_t priority)
178 {
179         struct ctdb_db_context *ctdb_db;
180
181         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
182                 if (ctdb_db->priority != priority) {
183                         continue;
184                 }
185                 DEBUG(DEBUG_INFO, ("unlocking database %s, priority:%u\n",
186                                    ctdb_db->db_name, priority));
187                 if (tdb_unlockall(ctdb_db->ltdb->tdb) != 0) {
188                         DEBUG(DEBUG_ERR, ("Failed to unlock database %s\n",
189                                           ctdb_db->db_name));
190                         return -1;
191                 }
192         }
193
194         return 0;
195 }
196
197 static int ctdb_unlockall(struct ctdb_context *ctdb)
198 {
199         uint32_t priority;
200
201         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
202                 if (ctdb_unlockall_prio(ctdb, priority) != 0) {
203                         return -1;
204                 }
205         }
206
207         return 0;
208 }
209
210
211 /*
212  * lock all databases - mark only
213  */
214 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
215 {
216         struct ctdb_db_context *ctdb_db;
217         int tdb_transaction_write_lock_mark(struct tdb_context *);
218
219         /*
220          * This function is only used by the main dameon during recovery.
221          * At this stage, the databases have already been locked, by a
222          * dedicated child process. The freeze_mode variable is used to track
223          * whether the actual locks are held by the child process or not.
224          */
225
226         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
227                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
228                 return -1;
229         }
230
231         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
232                 if (ctdb_db->priority != priority) {
233                         continue;
234                 }
235                 if (later_db(ctdb, ctdb_db->db_name)) {
236                         continue;
237                 }
238                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
239                         return -1;
240                 }
241                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
242                         /* FIXME: Shouldn't we unmark here? */
243                         return -1;
244                 }
245         }
246
247         /* If priority != 1, later_db check is not required and can return */
248         if (priority != 1) {
249                 return 0;
250         }
251
252         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
253                 if (!later_db(ctdb, ctdb_db->db_name)) {
254                         continue;
255                 }
256                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
257                         return -1;
258                 }
259                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
260                         /* FIXME: Shouldn't we unmark here? */
261                         return -1;
262                 }
263         }
264
265         return 0;
266 }
267
268 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
269 {
270         uint32_t priority;
271
272         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
273                 if (ctdb_lockall_mark_prio(ctdb, priority) != 0) {
274                         return -1;
275                 }
276         }
277
278         return 0;
279 }
280
281
282 /*
283  * lock all databases - unmark only
284  */
285 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
286 {
287         struct ctdb_db_context *ctdb_db;
288         int tdb_transaction_write_lock_unmark(struct tdb_context *);
289
290         /*
291          * This function is only used by the main dameon during recovery.
292          * At this stage, the databases have already been locked, by a
293          * dedicated child process. The freeze_mode variable is used to track
294          * whether the actual locks are held by the child process or not.
295          */
296
297         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
298                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
299                 return -1;
300         }
301
302         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
303                 if (ctdb_db->priority != priority) {
304                         continue;
305                 }
306                 if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
307                         return -1;
308                 }
309                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
310                         return -1;
311                 }
312         }
313
314         return 0;
315 }
316
317 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
318 {
319         uint32_t priority;
320
321         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
322                 if (ctdb_lockall_unmark_prio(ctdb, priority) != 0) {
323                         return -1;
324                 }
325         }
326
327         return 0;
328 }
329
330
331 /*
332  * Lock record / db depending on lock_ctx->type
333  * Called from child context.
334  */
335 static bool ctdb_lock_item(struct lock_context *lock_ctx)
336 {
337         bool status = false;
338
339         switch (lock_ctx->type) {
340         case LOCK_RECORD:
341                 if (tdb_chainlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key) == 0) {
342                         status = true;
343                 }
344                 break;
345
346         case LOCK_DB:
347                 if (tdb_lockall(lock_ctx->ctdb_db->ltdb->tdb) == 0) {
348                         status = true;
349                 }
350                 break;
351
352         case LOCK_ALLDB_PRIO:
353                 if (ctdb_lockall_prio(lock_ctx->ctdb, lock_ctx->priority) == 0) {
354                         status = true;
355                 }
356                 break;
357
358         case LOCK_ALLDB:
359                 if (ctdb_lockall(lock_ctx->ctdb) == 0) {
360                         status = true;
361                 }
362                 break;
363         }
364
365         return status;
366 }
367
368
369 /*
370  * Unlock record / db depending on lock_ctx->type
371  */
372 void ctdb_unlock_item(struct lock_context *lock_ctx)
373 {
374         switch (lock_ctx->type) {
375         case LOCK_RECORD:
376                 tdb_chainunlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
377                 break;
378
379         case LOCK_DB:
380                 tdb_unlockall(lock_ctx->ctdb_db->ltdb->tdb);
381                 break;
382
383         case LOCK_ALLDB_PRIO:
384                 ctdb_unlockall_prio(lock_ctx->ctdb, lock_ctx->priority);
385                 break;
386
387         case LOCK_ALLDB:
388                 ctdb_unlockall(lock_ctx->ctdb);
389                 break;
390         }
391 }
392
393 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
394
395 /*
396  * Destructor to kill the child locking process
397  */
398 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
399 {
400         if (lock_ctx->child > 0) {
401                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
402                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
403                 lock_ctx->ctdb->lock_num_current--;
404                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
405                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
406                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
407                 }
408         } else {
409                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
410                 lock_ctx->ctdb->lock_num_pending--;
411                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
412                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
413                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
414                 }
415         }
416
417         ctdb_lock_schedule(lock_ctx->ctdb);
418
419         return 0;
420 }
421
422
423 /*
424  * Destructor to remove lock request
425  */
426 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
427 {
428         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
429         return 0;
430 }
431
432
433 void ctdb_lock_free_request_context(struct lock_request *lock_req)
434 {
435         struct lock_context *lock_ctx;
436
437         lock_ctx = lock_req->lctx;
438         talloc_free(lock_req);
439         talloc_free(lock_ctx);
440 }
441
442
443 /*
444  * Process all the callbacks waiting for lock
445  *
446  * If lock has failed, callback is executed with locked=false
447  */
448 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
449 {
450         struct lock_request *request, *next;
451
452         if (lock_ctx->auto_mark && locked) {
453                 switch (lock_ctx->type) {
454                 case LOCK_RECORD:
455                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
456                         break;
457
458                 case LOCK_DB:
459                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
460                         break;
461
462                 case LOCK_ALLDB_PRIO:
463                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
464                         break;
465
466                 case LOCK_ALLDB:
467                         ctdb_lockall_mark(lock_ctx->ctdb);
468                         break;
469                 }
470         }
471
472         /* Iterate through all callbacks */
473         request = lock_ctx->req_queue;
474         while (request) {
475                 if (lock_ctx->auto_mark) {
476                         /* Reset the destructor, so request is not removed from the list */
477                         talloc_set_destructor(request, NULL);
478                 }
479
480                 /* In case, callback frees the request, store next */
481                 next = request->next;
482                 request->callback(request->private_data, locked);
483                 request = next;
484         }
485
486         if (lock_ctx->auto_mark && locked) {
487                 switch (lock_ctx->type) {
488                 case LOCK_RECORD:
489                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
490                         break;
491
492                 case LOCK_DB:
493                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
494                         break;
495
496                 case LOCK_ALLDB_PRIO:
497                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
498                         break;
499
500                 case LOCK_ALLDB:
501                         ctdb_lockall_unmark(lock_ctx->ctdb);
502                         break;
503                 }
504         }
505 }
506
507
/*
 * Map an elapsed time (in seconds) to a latency bucket index.
 *
 * Bucket i (0..7) holds times below the i-th upper bound in the table
 * (1us, 10us, 100us, 1ms, 10ms, 100ms, 1s, 10s); everything else lands
 * in bucket 8.
 */
static int lock_bucket_id(double t)
{
        double us = 1.e-6, ms = 1.e-3, s = 1;
        double upper[] = {
                1*us, 10*us, 100*us,
                1*ms, 10*ms, 100*ms,
                1*s,  10*s,
        };
        int nbounds = (int)(sizeof(upper) / sizeof(upper[0]));
        int id;

        /* The first bound that t stays below selects the bucket */
        for (id = 0; id < nbounds; id++) {
                if (t < upper[id]) {
                        break;
                }
        }

        return id;
}
535
536 /*
537  * Callback routine when the required locks are obtained.
538  * Called from parent context
539  */
540 static void ctdb_lock_handler(struct tevent_context *ev,
541                             struct tevent_fd *tfd,
542                             uint16_t flags,
543                             void *private_data)
544 {
545         struct lock_context *lock_ctx;
546         TALLOC_CTX *tmp_ctx;
547         char c;
548         bool locked;
549         double t;
550         int id;
551
552         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
553
554         /* cancel the timeout event */
555         if (lock_ctx->ttimer) {
556                 TALLOC_FREE(lock_ctx->ttimer);
557         }
558
559         t = timeval_elapsed(&lock_ctx->start_time);
560         id = lock_bucket_id(t);
561
562         if (lock_ctx->auto_mark) {
563                 tmp_ctx = talloc_new(ev);
564                 talloc_steal(tmp_ctx, lock_ctx);
565         }
566
567         /* Read the status from the child process */
568         read(lock_ctx->fd[0], &c, 1);
569         locked = (c == 0 ? true : false);
570
571         /* Update statistics */
572         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
573         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
574         if (lock_ctx->ctdb_db) {
575                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
576                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
577         }
578
579         if (locked) {
580                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
581                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
582                 if (lock_ctx->ctdb_db) {
583                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
584                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
585                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
586                 }
587         } else {
588                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
589                 if (lock_ctx->ctdb_db) {
590                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
591                 }
592         }
593
594         process_callbacks(lock_ctx, locked);
595
596         if (lock_ctx->auto_mark) {
597                 talloc_free(tmp_ctx);
598         }
599 }
600
601
602 static void ctdb_lock_find_blocker(struct lock_context *lock_ctx);
603
604 /*
605  * Callback routine when required locks are not obtained within timeout
606  * Called from parent context
607  */
608 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
609                                     struct tevent_timer *ttimer,
610                                     struct timeval current_time,
611                                     void *private_data)
612 {
613         struct lock_context *lock_ctx;
614         struct ctdb_context *ctdb;
615
616         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
617         ctdb = lock_ctx->ctdb;
618
619         /* fire a child process to find the blocking process */
620         if (lock_ctx->block_child == -1) {
621                 ctdb_lock_find_blocker(lock_ctx);
622         }
623
624         /* reset the timeout timer */
625         // talloc_free(lock_ctx->ttimer);
626         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
627                                             lock_ctx,
628                                             timeval_current_ofs(10, 0),
629                                             ctdb_lock_timeout_handler,
630                                             (void *)lock_ctx);
631 }
632
633
634 static char *lock_child_log_prefix(struct lock_context *lock_ctx)
635 {
636         char *prefix;
637         pid_t pid;
638
639         pid = getpid();
640
641         switch (lock_ctx->type) {
642         case LOCK_RECORD:
643                 prefix = talloc_asprintf(NULL, "lockR(%d): ", pid);
644                 break;
645
646         case LOCK_DB:
647                 prefix = talloc_asprintf(NULL, "lockD(%d): ", pid);
648                 break;
649
650         case LOCK_ALLDB_PRIO:
651                 prefix = talloc_asprintf(NULL, "lockP(%d): ", pid);
652                 break;
653
654         case LOCK_ALLDB:
655                 prefix = talloc_asprintf(NULL, "lockA(%d): ", pid);
656                 break;
657         }
658
659         return prefix;
660 }
661
662
663 /*
664  * Schedule a new lock child process
665  * Set up callback handler and timeout handler
666  */
667 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
668 {
669         struct lock_context *lock_ctx, *next_ctx;
670         int ret;
671         pid_t parent;
672
673         if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
674                 return;
675         }
676
677         if (ctdb->lock_pending == NULL) {
678                 return;
679         }
680
681         /* Find a lock context with requests */
682         lock_ctx = ctdb->lock_pending;
683         while (lock_ctx != NULL) {
684                 if (! lock_ctx->req_queue) {
685                         next_ctx = lock_ctx->next;
686                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
687                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
688                         ctdb->lock_num_pending--;
689                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
690                         if (lock_ctx->ctdb_db) {
691                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
692                         }
693                         talloc_free(lock_ctx);
694                         lock_ctx = next_ctx;
695                         continue;
696                 } else {
697                         /* Found a lock context with lock requests */
698                         break;
699                 }
700         }
701
702         if (lock_ctx == NULL) {
703                 return;
704         }
705
706         lock_ctx->child = -1;
707         ret = pipe(lock_ctx->fd);
708         if (ret != 0) {
709                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
710                 return;
711         }
712
713         parent = getpid();
714         lock_ctx->child = ctdb_fork(ctdb);
715
716         if (lock_ctx->child == (pid_t)-1) {
717                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
718                 close(lock_ctx->fd[0]);
719                 close(lock_ctx->fd[1]);
720                 return;
721         }
722
723         /* Child process */
724         if (lock_ctx->child == 0) {
725                 char c;
726                 close(lock_ctx->fd[0]);
727                 debug_extra = lock_child_log_prefix(lock_ctx);
728                 if (ctdb_lock_item(lock_ctx)) {
729                         c = 0;
730                 } else {
731                         c = 1;
732                 }
733                 write(lock_ctx->fd[1], &c, 1);
734
735                 /* Hang around, but if parent dies, terminate */
736                 while (kill(parent, 0) == 0 || errno != ESRCH) {
737                         sleep(5);
738                 }
739                 _exit(0);
740         }
741
742         /* Parent process */
743         close(lock_ctx->fd[1]);
744         set_close_on_exec(lock_ctx->fd[0]);
745
746         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
747
748         /* Set up timeout handler */
749         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
750                                             lock_ctx,
751                                             timeval_current_ofs(10, 0),
752                                             ctdb_lock_timeout_handler,
753                                             (void *)lock_ctx);
754         if (lock_ctx->ttimer == NULL) {
755                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
756                 lock_ctx->child = -1;
757                 talloc_set_destructor(lock_ctx, NULL);
758                 close(lock_ctx->fd[0]);
759                 return;
760         }
761
762         /* Set up callback */
763         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
764                                       lock_ctx,
765                                       lock_ctx->fd[0],
766                                       EVENT_FD_READ,
767                                       ctdb_lock_handler,
768                                       (void *)lock_ctx);
769         if (lock_ctx->tfd == NULL) {
770                 TALLOC_FREE(lock_ctx->ttimer);
771                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
772                 lock_ctx->child = -1;
773                 talloc_set_destructor(lock_ctx, NULL);
774                 close(lock_ctx->fd[0]);
775                 return;
776         }
777         tevent_fd_set_auto_close(lock_ctx->tfd);
778
779         /* Move the context from pending to current */
780         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
781         ctdb->lock_num_pending--;
782         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
783         ctdb->lock_num_current++;
784 }
785
786
787 /*
788  * Find the lock context of a given type
789  */
790 static struct lock_context *find_lock_context(struct lock_context *lock_list,
791                                               struct ctdb_db_context *ctdb_db,
792                                               TDB_DATA key,
793                                               uint32_t priority,
794                                               enum lock_type type)
795 {
796         struct lock_context *lock_ctx;
797
798         /* Search active locks */
799         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
800                 if (lock_ctx->type != type) {
801                         continue;
802                 }
803
804                 switch (lock_ctx->type) {
805                 case LOCK_RECORD:
806                         if (ctdb_db == lock_ctx->ctdb_db &&
807                             key.dsize == lock_ctx->key.dsize &&
808                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
809                                 goto done;
810                         }
811                         break;
812
813                 case LOCK_DB:
814                         if (ctdb_db == lock_ctx->ctdb_db) {
815                                 goto done;
816                         }
817                         break;
818
819                 case LOCK_ALLDB_PRIO:
820                         if (priority == lock_ctx->priority) {
821                                 goto done;
822                         }
823                         break;
824
825                 case LOCK_ALLDB:
826                         goto done;
827                         break;
828                 }
829         }
830
831         /* Did not find the lock context we are searching for */
832         lock_ctx = NULL;
833
834 done:
835         return lock_ctx;
836
837 }
838
839
840 /*
841  * Lock record / db depending on type
842  */
843 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
844                                                struct ctdb_db_context *ctdb_db,
845                                                TDB_DATA key,
846                                                uint32_t priority,
847                                                void (*callback)(void *, bool),
848                                                void *private_data,
849                                                enum lock_type type,
850                                                bool auto_mark)
851 {
852         struct lock_context *lock_ctx;
853         struct lock_request *request;
854
855         if (callback == NULL) {
856                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking"));
857                 return NULL;
858         }
859
860         /* get a context for this key - search only the pending contexts,
861          * current contexts might in the middle of processing callbacks */
862         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
863
864         /* No existing context, create one */
865         if (lock_ctx == NULL) {
866                 lock_ctx = talloc_zero(ctdb, struct lock_context);
867                 if (lock_ctx == NULL) {
868                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context"));
869                         return NULL;
870                 }
871
872                 lock_ctx->type = type;
873                 lock_ctx->ctdb = ctdb;
874                 lock_ctx->ctdb_db = ctdb_db;
875                 lock_ctx->key.dsize = key.dsize;
876                 if (key.dsize > 0) {
877                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
878                 } else {
879                         lock_ctx->key.dptr = NULL;
880                 }
881                 lock_ctx->priority = priority;
882                 lock_ctx->auto_mark = auto_mark;
883
884                 lock_ctx->child = -1;
885                 lock_ctx->block_child = -1;
886
887                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
888                 ctdb->lock_num_pending++;
889                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
890                 if (ctdb_db) {
891                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
892                 }
893
894                 /* Start the timer when we activate the context */
895                 lock_ctx->start_time = timeval_current();
896         }
897
898         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
899                 return NULL;
900         }
901
902         request->lctx = lock_ctx;
903         request->callback = callback;
904         request->private_data = private_data;
905
906         talloc_set_destructor(request, ctdb_lock_request_destructor);
907         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
908
909         ctdb_lock_schedule(ctdb);
910
911         return request;
912 }
913
914
915 /*
916  * obtain a lock on a record in a database
917  */
918 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
919                                       TDB_DATA key,
920                                       bool auto_mark,
921                                       void (*callback)(void *, bool),
922                                       void *private_data)
923 {
924         return ctdb_lock_internal(ctdb_db->ctdb,
925                                   ctdb_db,
926                                   key,
927                                   0,
928                                   callback,
929                                   private_data,
930                                   LOCK_RECORD,
931                                   auto_mark);
932 }
933
934
935 /*
936  * obtain a lock on a database
937  */
938 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
939                                   bool auto_mark,
940                                   void (*callback)(void *, bool),
941                                   void *private_data)
942 {
943         return ctdb_lock_internal(ctdb_db->ctdb,
944                                   ctdb_db,
945                                   tdb_null,
946                                   0,
947                                   callback,
948                                   private_data,
949                                   LOCK_DB,
950                                   auto_mark);
951 }
952
953
954 /*
955  * obtain locks on all databases of specified priority
956  */
957 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
958                                           uint32_t priority,
959                                           bool auto_mark,
960                                           void (*callback)(void *, bool),
961                                           void *private_data)
962 {
963         if (priority < 0 || priority > NUM_DB_PRIORITIES) {
964                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
965                 return NULL;
966         }
967
968         return ctdb_lock_internal(ctdb,
969                                   NULL,
970                                   tdb_null,
971                                   priority,
972                                   callback,
973                                   private_data,
974                                   LOCK_ALLDB_PRIO,
975                                   auto_mark);
976 }
977
978
979 /*
980  * obtain locks on all databases
981  */
982 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
983                                      bool auto_mark,
984                                      void (*callback)(void *, bool),
985                                      void *private_data)
986 {
987         return ctdb_lock_internal(ctdb,
988                                   NULL,
989                                   tdb_null,
990                                   0,
991                                   callback,
992                                   private_data,
993                                   LOCK_ALLDB,
994                                   auto_mark);
995 }
996
997 /*
998  * Callback routine to read the PID of blocking process from the child and log
999  *
1000  */
1001 void ctdb_lock_blocked_handler(struct tevent_context *ev,
1002                                 struct tevent_fd *tfd,
1003                                 uint16_t flags,
1004                                 void *private_data)
1005 {
1006         struct lock_context *lock_ctx;
1007         pid_t blocker_pid = -1;
1008         char *process_name = NULL;
1009         const char *db_name = NULL;
1010         ino_t inode;
1011         struct ctdb_db_context *ctdb_db;
1012         int fd;
1013         struct stat stat_buf;
1014
1015         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
1016
1017         if (read(lock_ctx->block_fd[0], &blocker_pid, sizeof(blocker_pid)) != sizeof(blocker_pid)) {
1018                 DEBUG(DEBUG_ERR, ("Error reading blocker process pid from child\n"));
1019                 goto failed;
1020         }
1021         if (read(lock_ctx->block_fd[0], &inode, sizeof(inode)) != sizeof(inode)) {
1022                 DEBUG(DEBUG_ERR, ("Error reading blocked inode from child\n"));
1023                 goto failed;
1024         }
1025
1026         if (blocker_pid < 0) {
1027                 goto failed;
1028         }
1029
1030         process_name = ctdb_get_process_name(blocker_pid);
1031
1032         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
1033                 db_name = lock_ctx->ctdb_db->ltdb->name;
1034         } else {
1035                 for (ctdb_db = lock_ctx->ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1036                         fd = tdb_fd(ctdb_db->ltdb->tdb);
1037                         if (fstat(fd, &stat_buf) == 0) {
1038                                 if (stat_buf.st_ino == inode) {
1039                                         db_name = ctdb_db->ltdb->name;
1040                                         break;
1041                                 }
1042                         }
1043                 }
1044         }
1045
1046         if (db_name) {
1047                 DEBUG(DEBUG_WARNING,
1048                       ("Process (pid=%d) blocked in locking\n", lock_ctx->child));
1049                 DEBUG(DEBUG_WARNING,
1050                       ("Process %s (pid=%d) locked database %s (inode %lu) for %.0lf seconds\n",
1051                        (process_name ? process_name : "unknown"),
1052                        blocker_pid, db_name, (unsigned long)inode,
1053                        timeval_elapsed(&lock_ctx->start_time)));
1054         } else {
1055                 DEBUG(DEBUG_WARNING,
1056                       ("Process %s (pid=%d) locked database (inode %lu) for %.0lf seconds\n",
1057                        (process_name ? process_name : "unknown"),
1058                        blocker_pid, (unsigned long)inode,
1059                        timeval_elapsed(&lock_ctx->start_time)));
1060         }
1061
1062         /*
1063          * If ctdb is blocked by smbd for deadlock_interval, detect it as a deadlock
1064          * and kill smbd process.
1065          */
1066         if (lock_ctx->ctdb->tunable.deadlock_timeout > 0 &&
1067             timeval_elapsed(&lock_ctx->start_time) > lock_ctx->ctdb->tunable.deadlock_timeout &&
1068             process_name && strstr(process_name, "smbd")) {
1069                 DEBUG(DEBUG_WARNING,
1070                       ("Deadlock detected. Killing smbd process (pid=%d)", blocker_pid));
1071                 kill(blocker_pid, SIGKILL);
1072         }
1073
1074         free(process_name);
1075
1076 failed:
1077         if (lock_ctx->block_child > 0) {
1078                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1079         }
1080         lock_ctx->block_child = -1;
1081         talloc_free(tfd);
1082 }
1083
1084
1085 /*
1086  * Find processes that holds lock we are interested in
1087  */
1088 void ctdb_lock_find_blocker(struct lock_context *lock_ctx)
1089 {
1090         struct tevent_fd *tfd;
1091         pid_t parent;
1092
1093         if (pipe(lock_ctx->block_fd) < 0) {
1094                 return;
1095         }
1096
1097         parent = getpid();
1098
1099         lock_ctx->block_child = ctdb_fork(lock_ctx->ctdb);
1100         if (lock_ctx->block_child == -1) {
1101                 close(lock_ctx->block_fd[0]);
1102                 close(lock_ctx->block_fd[1]);
1103                 return;
1104         }
1105
1106         /* Child process */
1107         if (lock_ctx->block_child == 0) {
1108                 struct ctdb_lock_info reqlock;
1109                 pid_t blocker_pid = -1;
1110                 bool status;
1111
1112                 close(lock_ctx->block_fd[0]);
1113                 if (ctdb_get_lock_info(lock_ctx->child, &reqlock)) {
1114                         status = ctdb_get_blocker_pid(&reqlock, &blocker_pid);
1115                         if (!status) {
1116                                 /* Could not find blocker pid */
1117                                 blocker_pid = -2;
1118                         }
1119                 }
1120                 write(lock_ctx->block_fd[1], &blocker_pid, sizeof(blocker_pid));
1121                 write(lock_ctx->block_fd[1], &reqlock.inode, sizeof(reqlock.inode));
1122
1123                 /* Hang around till parent dies */
1124                 while (kill(parent, 0) == 0 || errno != ESRCH) {
1125                         sleep(5);
1126                 }
1127                 _exit(0);
1128         }
1129
1130         /* Parent process */
1131         close(lock_ctx->block_fd[1]);
1132         set_close_on_exec(lock_ctx->block_fd[0]);
1133
1134         tfd = tevent_add_fd(lock_ctx->ctdb->ev,
1135                                 lock_ctx,
1136                                 lock_ctx->block_fd[0],
1137                                 EVENT_FD_READ,
1138                                 ctdb_lock_blocked_handler,
1139                                 (void *)lock_ctx);
1140         if (tfd == NULL) {
1141                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1142                 close(lock_ctx->block_fd[0]);
1143         }
1144 }