locking: Schedule a new lock request every time a lock is released
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Limit checked against ctdb->lock_num_current before another lock
 * child process is started.
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB               (100)

/* What a lock request is asking to lock */
enum lock_type {
	LOCK_RECORD,		/* one record (hash chain) of one database */
	LOCK_DB,		/* one complete database */
	LOCK_ALLDB_PRIO,	/* all databases of one priority */
	LOCK_ALLDB,		/* all databases */
};
55
56 struct lock_request;
57
58 /* lock_context is the common part for a lock request */
59 struct lock_context {
60         struct lock_context *next, *prev;
61         enum lock_type type;
62         struct ctdb_context *ctdb;
63         struct ctdb_db_context *ctdb_db;
64         TDB_DATA key;
65         uint32_t priority;
66         bool auto_mark;
67         struct lock_request *req_queue;
68         pid_t child;
69         int fd[2];
70         struct tevent_fd *tfd;
71         struct tevent_timer *ttimer;
72         pid_t block_child;
73         int block_fd[2];
74         struct timeval start_time;
75 };
76
/*
 * lock_request is the client specific part for a lock request: one
 * entry per caller, queued on the owning lock_context.
 */
struct lock_request {
	/* linkage for lock_context->req_queue */
	struct lock_request *next;
	struct lock_request *prev;

	/* the context this request is queued on */
	struct lock_context *lctx;

	/* invoked with private_data and the lock status (true = locked) */
	void (*callback)(void *, bool);
	void *private_data;
};
84
85
86 /*
87  * lock all databases
88  */
89 int ctdb_lockall_prio(struct ctdb_context *ctdb, uint32_t priority)
90 {
91         struct ctdb_db_context *ctdb_db;
92
93         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
94                 if (ctdb_db->priority != priority) {
95                         continue;
96                 }
97                 DEBUG(DEBUG_INFO, ("locking database %s, priority:%u\n",
98                                    ctdb_db->db_name, priority));
99                 if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
100                         DEBUG(DEBUG_ERR, ("Failed to lock database %s\n",
101                                           ctdb_db->db_name));
102                         return -1;
103                 }
104         }
105
106         return 0;
107 }
108
109 static int ctdb_lockall(struct ctdb_context *ctdb)
110 {
111         uint32_t priority;
112
113         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
114                 if (ctdb_lockall_prio(ctdb, priority) != 0) {
115                         return -1;
116                 }
117         }
118
119         return 0;
120 }
121
122
123 /*
124  * unlock all databases
125  */
126 int ctdb_unlockall_prio(struct ctdb_context *ctdb, uint32_t priority)
127 {
128         struct ctdb_db_context *ctdb_db;
129
130         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
131                 if (ctdb_db->priority != priority) {
132                         continue;
133                 }
134                 DEBUG(DEBUG_INFO, ("unlocking database %s, priority:%u\n",
135                                    ctdb_db->db_name, priority));
136                 if (tdb_unlockall(ctdb_db->ltdb->tdb) != 0) {
137                         DEBUG(DEBUG_ERR, ("Failed to unlock database %s\n",
138                                           ctdb_db->db_name));
139                         return -1;
140                 }
141         }
142
143         return 0;
144 }
145
146 static int ctdb_unlockall(struct ctdb_context *ctdb)
147 {
148         uint32_t priority;
149
150         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
151                 if (ctdb_unlockall_prio(ctdb, priority) != 0) {
152                         return -1;
153                 }
154         }
155
156         return 0;
157 }
158
159
160 /*
161  * lock all databases - mark only
162  */
163 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
164 {
165         struct ctdb_db_context *ctdb_db;
166         int tdb_transaction_write_lock_mark(struct tdb_context *);
167
168         /*
169          * This function is only used by the main dameon during recovery.
170          * At this stage, the databases have already been locked, by a
171          * dedicated child process. The freeze_mode variable is used to track
172          * whether the actual locks are held by the child process or not.
173          */
174
175         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
176                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
177                 return -1;
178         }
179
180         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
181                 if (ctdb_db->priority != priority) {
182                         continue;
183                 }
184                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
185                         return -1;
186                 }
187                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
188                         /* FIXME: Shouldn't we unmark here? */
189                         return -1;
190                 }
191         }
192
193         return 0;
194 }
195
196 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
197 {
198         uint32_t priority;
199
200         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
201                 if (ctdb_lockall_mark_prio(ctdb, priority) != 0) {
202                         return -1;
203                 }
204         }
205
206         return 0;
207 }
208
209
210 /*
211  * lock all databases - unmark only
212  */
213 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
214 {
215         struct ctdb_db_context *ctdb_db;
216         int tdb_transaction_write_lock_unmark(struct tdb_context *);
217
218         /*
219          * This function is only used by the main dameon during recovery.
220          * At this stage, the databases have already been locked, by a
221          * dedicated child process. The freeze_mode variable is used to track
222          * whether the actual locks are held by the child process or not.
223          */
224
225         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
226                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
227                 return -1;
228         }
229
230         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
231                 if (ctdb_db->priority != priority) {
232                         continue;
233                 }
234                 if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
235                         return -1;
236                 }
237                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
238                         return -1;
239                 }
240         }
241
242         return 0;
243 }
244
245 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
246 {
247         uint32_t priority;
248
249         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
250                 if (ctdb_lockall_unmark_prio(ctdb, priority) != 0) {
251                         return -1;
252                 }
253         }
254
255         return 0;
256 }
257
258
259 /*
260  * Lock record / db depending on lock_ctx->type
261  * Called from child context.
262  */
263 static bool ctdb_lock_item(struct lock_context *lock_ctx)
264 {
265         bool status = false;
266
267         switch (lock_ctx->type) {
268         case LOCK_RECORD:
269                 if (tdb_chainlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key) == 0) {
270                         status = true;
271                 }
272                 break;
273
274         case LOCK_DB:
275                 if (tdb_lockall(lock_ctx->ctdb_db->ltdb->tdb) == 0) {
276                         status = true;
277                 }
278                 break;
279
280         case LOCK_ALLDB_PRIO:
281                 if (ctdb_lockall_prio(lock_ctx->ctdb, lock_ctx->priority) == 0) {
282                         status = true;
283                 }
284                 break;
285
286         case LOCK_ALLDB:
287                 if (ctdb_lockall(lock_ctx->ctdb) == 0) {
288                         status = true;
289                 }
290                 break;
291         }
292
293         return status;
294 }
295
296
297 /*
298  * Unlock record / db depending on lock_ctx->type
299  */
300 void ctdb_unlock_item(struct lock_context *lock_ctx)
301 {
302         switch (lock_ctx->type) {
303         case LOCK_RECORD:
304                 tdb_chainunlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
305                 break;
306
307         case LOCK_DB:
308                 tdb_unlockall(lock_ctx->ctdb_db->ltdb->tdb);
309                 break;
310
311         case LOCK_ALLDB_PRIO:
312                 ctdb_unlockall_prio(lock_ctx->ctdb, lock_ctx->priority);
313                 break;
314
315         case LOCK_ALLDB:
316                 ctdb_unlockall(lock_ctx->ctdb);
317                 break;
318         }
319 }
320
321 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
322
323 /*
324  * Destructor to kill the child locking process
325  */
326 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
327 {
328         if (lock_ctx->child > 0) {
329                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
330                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
331                 lock_ctx->ctdb->lock_num_current--;
332                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
333                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
334                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
335                 }
336         } else {
337                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
338                 lock_ctx->ctdb->lock_num_pending--;
339                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
340                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
341                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
342                 }
343         }
344
345         ctdb_lock_schedule(lock_ctx->ctdb);
346
347         return 0;
348 }
349
350
351 /*
352  * Destructor to remove lock request
353  */
354 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
355 {
356         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
357         return 0;
358 }
359
360
361 void ctdb_lock_free_request_context(struct lock_request *lock_req)
362 {
363         struct lock_context *lock_ctx;
364
365         lock_ctx = lock_req->lctx;
366         talloc_free(lock_req);
367         talloc_free(lock_ctx);
368 }
369
370
371 /*
372  * Process all the callbacks waiting for lock
373  *
374  * If lock has failed, callback is executed with locked=false
375  */
376 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
377 {
378         struct lock_request *request, *next;
379
380         if (lock_ctx->auto_mark && locked) {
381                 switch (lock_ctx->type) {
382                 case LOCK_RECORD:
383                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
384                         break;
385
386                 case LOCK_DB:
387                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
388                         break;
389
390                 case LOCK_ALLDB_PRIO:
391                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
392                         break;
393
394                 case LOCK_ALLDB:
395                         ctdb_lockall_mark(lock_ctx->ctdb);
396                         break;
397                 }
398         }
399
400         /* Iterate through all callbacks */
401         request = lock_ctx->req_queue;
402         while (request) {
403                 if (lock_ctx->auto_mark) {
404                         /* Reset the destructor, so request is not removed from the list */
405                         talloc_set_destructor(request, NULL);
406                 }
407
408                 /* In case, callback frees the request, store next */
409                 next = request->next;
410                 request->callback(request->private_data, locked);
411                 request = next;
412         }
413
414         if (lock_ctx->auto_mark && locked) {
415                 switch (lock_ctx->type) {
416                 case LOCK_RECORD:
417                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
418                         break;
419
420                 case LOCK_DB:
421                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
422                         break;
423
424                 case LOCK_ALLDB_PRIO:
425                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
426                         break;
427
428                 case LOCK_ALLDB:
429                         ctdb_lockall_unmark(lock_ctx->ctdb);
430                         break;
431                 }
432         }
433 }
434
435
/*
 * Map a lock latency t (in seconds) to a histogram bucket index.
 *
 * Bucket i holds latencies below the i-th upper bound:
 * 1us, 10us, 100us, 1ms, 10ms, 100ms, 1s, 10s.  Anything that is not
 * below 10s lands in the last bucket, 8.
 */
static int lock_bucket_id(double t)
{
	const double us = 1.e-6, ms = 1.e-3, s = 1;
	const double upper[] = {
		1*us, 10*us, 100*us,
		1*ms, 10*ms, 100*ms,
		1*s, 10*s,
	};
	const int num_bounded = (int)(sizeof(upper) / sizeof(upper[0]));
	int id;

	/* Stop at the first bound t is below; fall through to 8 otherwise */
	for (id = 0; id < num_bounded; id++) {
		if (t < upper[id]) {
			break;
		}
	}

	return id;
}
463
464 /*
465  * Callback routine when the required locks are obtained.
466  * Called from parent context
467  */
468 static void ctdb_lock_handler(struct tevent_context *ev,
469                             struct tevent_fd *tfd,
470                             uint16_t flags,
471                             void *private_data)
472 {
473         struct lock_context *lock_ctx;
474         TALLOC_CTX *tmp_ctx;
475         char c;
476         bool locked;
477         double t;
478         int id;
479
480         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
481
482         /* cancel the timeout event */
483         if (lock_ctx->ttimer) {
484                 TALLOC_FREE(lock_ctx->ttimer);
485         }
486
487         t = timeval_elapsed(&lock_ctx->start_time);
488         id = lock_bucket_id(t);
489
490         if (lock_ctx->auto_mark) {
491                 tmp_ctx = talloc_new(ev);
492                 talloc_steal(tmp_ctx, lock_ctx);
493         }
494
495         /* Read the status from the child process */
496         read(lock_ctx->fd[0], &c, 1);
497         locked = (c == 0 ? true : false);
498
499         /* Update statistics */
500         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
501         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
502         if (lock_ctx->ctdb_db) {
503                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
504                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
505         }
506
507         if (locked) {
508                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
509                 CTDB_UPDATE_RECLOCK_LATENCY(lock_ctx->ctdb, "lock()", locks.latency, t);
510                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
511                 if (lock_ctx->ctdb_db) {
512                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
513                         CTDB_UPDATE_DB_RECLOCK_LATENCY(lock_ctx->ctdb_db, "lock()", locks.latency, t);
514                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
515                 }
516         } else {
517                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
518                 if (lock_ctx->ctdb_db) {
519                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
520                 }
521         }
522
523         process_callbacks(lock_ctx, locked);
524
525         if (lock_ctx->auto_mark) {
526                 talloc_free(tmp_ctx);
527         }
528 }
529
530
531 static void ctdb_lock_find_blocker(struct lock_context *lock_ctx);
532
533 /*
534  * Callback routine when required locks are not obtained within timeout
535  * Called from parent context
536  */
537 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
538                                     struct tevent_timer *ttimer,
539                                     struct timeval current_time,
540                                     void *private_data)
541 {
542         struct lock_context *lock_ctx;
543         struct ctdb_context *ctdb;
544
545         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
546         ctdb = lock_ctx->ctdb;
547
548         /* fire a child process to find the blocking process */
549         if (lock_ctx->block_child == -1) {
550                 ctdb_lock_find_blocker(lock_ctx);
551         }
552
553         /* reset the timeout timer */
554         // talloc_free(lock_ctx->ttimer);
555         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
556                                             lock_ctx,
557                                             timeval_current_ofs(10, 0),
558                                             ctdb_lock_timeout_handler,
559                                             (void *)lock_ctx);
560 }
561
562
563 static char *lock_child_log_prefix(struct lock_context *lock_ctx)
564 {
565         char *prefix;
566         pid_t pid;
567
568         pid = getpid();
569
570         switch (lock_ctx->type) {
571         case LOCK_RECORD:
572                 prefix = talloc_asprintf(NULL, "lockR(%d): ", pid);
573                 break;
574
575         case LOCK_DB:
576                 prefix = talloc_asprintf(NULL, "lockD(%d): ", pid);
577                 break;
578
579         case LOCK_ALLDB_PRIO:
580                 prefix = talloc_asprintf(NULL, "lockP(%d): ", pid);
581                 break;
582
583         case LOCK_ALLDB:
584                 prefix = talloc_asprintf(NULL, "lockA(%d): ", pid);
585                 break;
586         }
587
588         return prefix;
589 }
590
591
592 /*
593  * Schedule a new lock child process
594  * Set up callback handler and timeout handler
595  */
596 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
597 {
598         struct lock_context *lock_ctx, *next_ctx;
599         int ret;
600         pid_t parent;
601
602         if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
603                 return;
604         }
605
606         if (ctdb->lock_pending == NULL) {
607                 return;
608         }
609
610         /* Find a lock context with requests */
611         lock_ctx = ctdb->lock_pending;
612         while (lock_ctx != NULL) {
613                 if (! lock_ctx->req_queue) {
614                         next_ctx = lock_ctx->next;
615                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
616                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
617                         ctdb->lock_num_pending--;
618                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
619                         if (lock_ctx->ctdb_db) {
620                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
621                         }
622                         talloc_free(lock_ctx);
623                         lock_ctx = next_ctx;
624                         continue;
625                 } else {
626                         /* Found a lock context with lock requests */
627                         break;
628                 }
629         }
630
631         if (lock_ctx == NULL) {
632                 return;
633         }
634
635         lock_ctx->child = -1;
636         ret = pipe(lock_ctx->fd);
637         if (ret != 0) {
638                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
639                 return;
640         }
641
642         parent = getpid();
643         lock_ctx->child = ctdb_fork(ctdb);
644
645         if (lock_ctx->child == (pid_t)-1) {
646                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
647                 close(lock_ctx->fd[0]);
648                 close(lock_ctx->fd[1]);
649                 return;
650         }
651
652         /* Child process */
653         if (lock_ctx->child == 0) {
654                 char c;
655                 close(lock_ctx->fd[0]);
656                 debug_extra = lock_child_log_prefix(lock_ctx);
657                 if (ctdb_lock_item(lock_ctx)) {
658                         c = 0;
659                 } else {
660                         c = 1;
661                 }
662                 write(lock_ctx->fd[1], &c, 1);
663
664                 /* Hang around, but if parent dies, terminate */
665                 while (kill(parent, 0) == 0 || errno != ESRCH) {
666                         sleep(5);
667                 }
668                 _exit(0);
669         }
670
671         /* Parent process */
672         close(lock_ctx->fd[1]);
673         set_close_on_exec(lock_ctx->fd[0]);
674
675         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
676
677         /* Set up timeout handler */
678         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
679                                             lock_ctx,
680                                             timeval_current_ofs(10, 0),
681                                             ctdb_lock_timeout_handler,
682                                             (void *)lock_ctx);
683         if (lock_ctx->ttimer == NULL) {
684                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
685                 lock_ctx->child = -1;
686                 talloc_set_destructor(lock_ctx, NULL);
687                 close(lock_ctx->fd[0]);
688                 return;
689         }
690
691         /* Set up callback */
692         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
693                                       lock_ctx,
694                                       lock_ctx->fd[0],
695                                       EVENT_FD_READ,
696                                       ctdb_lock_handler,
697                                       (void *)lock_ctx);
698         if (lock_ctx->tfd == NULL) {
699                 TALLOC_FREE(lock_ctx->ttimer);
700                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
701                 lock_ctx->child = -1;
702                 talloc_set_destructor(lock_ctx, NULL);
703                 close(lock_ctx->fd[0]);
704                 return;
705         }
706         tevent_fd_set_auto_close(lock_ctx->tfd);
707
708         /* Move the context from pending to current */
709         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
710         ctdb->lock_num_pending--;
711         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
712         ctdb->lock_num_current++;
713 }
714
715
716 /*
717  * Find the lock context of a given type
718  */
719 static struct lock_context *find_lock_context(struct lock_context *lock_list,
720                                               struct ctdb_db_context *ctdb_db,
721                                               TDB_DATA key,
722                                               uint32_t priority,
723                                               enum lock_type type)
724 {
725         struct lock_context *lock_ctx;
726
727         /* Search active locks */
728         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
729                 if (lock_ctx->type != type) {
730                         continue;
731                 }
732
733                 switch (lock_ctx->type) {
734                 case LOCK_RECORD:
735                         if (ctdb_db == lock_ctx->ctdb_db &&
736                             key.dsize == lock_ctx->key.dsize &&
737                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
738                                 goto done;
739                         }
740                         break;
741
742                 case LOCK_DB:
743                         if (ctdb_db == lock_ctx->ctdb_db) {
744                                 goto done;
745                         }
746                         break;
747
748                 case LOCK_ALLDB_PRIO:
749                         if (priority == lock_ctx->priority) {
750                                 goto done;
751                         }
752                         break;
753
754                 case LOCK_ALLDB:
755                         goto done;
756                         break;
757                 }
758         }
759
760         /* Did not find the lock context we are searching for */
761         lock_ctx = NULL;
762
763 done:
764         return lock_ctx;
765
766 }
767
768
769 /*
770  * Lock record / db depending on type
771  */
772 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
773                                                struct ctdb_db_context *ctdb_db,
774                                                TDB_DATA key,
775                                                uint32_t priority,
776                                                void (*callback)(void *, bool),
777                                                void *private_data,
778                                                enum lock_type type,
779                                                bool auto_mark)
780 {
781         struct lock_context *lock_ctx;
782         struct lock_request *request;
783
784         if (callback == NULL) {
785                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking"));
786                 return NULL;
787         }
788
789         /* get a context for this key - search only the pending contexts,
790          * current contexts might in the middle of processing callbacks */
791         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
792
793         /* No existing context, create one */
794         if (lock_ctx == NULL) {
795                 lock_ctx = talloc_zero(ctdb, struct lock_context);
796                 if (lock_ctx == NULL) {
797                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context"));
798                         return NULL;
799                 }
800
801                 lock_ctx->type = type;
802                 lock_ctx->ctdb = ctdb;
803                 lock_ctx->ctdb_db = ctdb_db;
804                 lock_ctx->key.dsize = key.dsize;
805                 if (key.dsize > 0) {
806                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
807                 } else {
808                         lock_ctx->key.dptr = NULL;
809                 }
810                 lock_ctx->priority = priority;
811                 lock_ctx->auto_mark = auto_mark;
812
813                 lock_ctx->child = -1;
814                 lock_ctx->block_child = -1;
815
816                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
817                 ctdb->lock_num_pending++;
818                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
819                 if (ctdb_db) {
820                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
821                 }
822
823                 /* Start the timer when we activate the context */
824                 lock_ctx->start_time = timeval_current();
825         }
826
827         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
828                 return NULL;
829         }
830
831         request->lctx = lock_ctx;
832         request->callback = callback;
833         request->private_data = private_data;
834
835         talloc_set_destructor(request, ctdb_lock_request_destructor);
836         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
837
838         ctdb_lock_schedule(ctdb);
839
840         return request;
841 }
842
843
844 /*
845  * obtain a lock on a record in a database
846  */
847 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
848                                       TDB_DATA key,
849                                       bool auto_mark,
850                                       void (*callback)(void *, bool),
851                                       void *private_data)
852 {
853         return ctdb_lock_internal(ctdb_db->ctdb,
854                                   ctdb_db,
855                                   key,
856                                   0,
857                                   callback,
858                                   private_data,
859                                   LOCK_RECORD,
860                                   auto_mark);
861 }
862
863
864 /*
865  * obtain a lock on a database
866  */
867 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
868                                   bool auto_mark,
869                                   void (*callback)(void *, bool),
870                                   void *private_data)
871 {
872         return ctdb_lock_internal(ctdb_db->ctdb,
873                                   ctdb_db,
874                                   tdb_null,
875                                   0,
876                                   callback,
877                                   private_data,
878                                   LOCK_DB,
879                                   auto_mark);
880 }
881
882
883 /*
884  * obtain locks on all databases of specified priority
885  */
886 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
887                                           uint32_t priority,
888                                           bool auto_mark,
889                                           void (*callback)(void *, bool),
890                                           void *private_data)
891 {
892         if (priority < 0 || priority > NUM_DB_PRIORITIES) {
893                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
894                 return NULL;
895         }
896
897         return ctdb_lock_internal(ctdb,
898                                   NULL,
899                                   tdb_null,
900                                   priority,
901                                   callback,
902                                   private_data,
903                                   LOCK_ALLDB_PRIO,
904                                   auto_mark);
905 }
906
907
908 /*
909  * obtain locks on all databases
910  */
911 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
912                                      bool auto_mark,
913                                      void (*callback)(void *, bool),
914                                      void *private_data)
915 {
916         return ctdb_lock_internal(ctdb,
917                                   NULL,
918                                   tdb_null,
919                                   0,
920                                   callback,
921                                   private_data,
922                                   LOCK_ALLDB,
923                                   auto_mark);
924 }
925
926 /*
927  * Callback routine to read the PID of blocking process from the child and log
928  *
929  */
930 void ctdb_lock_blocked_handler(struct tevent_context *ev,
931                                 struct tevent_fd *tfd,
932                                 uint16_t flags,
933                                 void *private_data)
934 {
935         struct lock_context *lock_ctx;
936         pid_t blocker_pid = -1;
937         char *process_name = NULL;
938         const char *db_name = NULL;
939         ino_t inode;
940         struct ctdb_db_context *ctdb_db;
941         int fd;
942         struct stat stat_buf;
943
944         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
945
946         if (read(lock_ctx->block_fd[0], &blocker_pid, sizeof(blocker_pid)) != sizeof(blocker_pid)) {
947                 DEBUG(DEBUG_ERR, ("Error reading blocker process pid from child\n"));
948                 goto failed;
949         }
950         if (read(lock_ctx->block_fd[0], &inode, sizeof(inode)) != sizeof(inode)) {
951                 DEBUG(DEBUG_ERR, ("Error reading blocked inode from child\n"));
952                 goto failed;
953         }
954
955         if (blocker_pid < 0) {
956                 goto failed;
957         }
958
959         process_name = ctdb_get_process_name(blocker_pid);
960
961         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
962                 db_name = lock_ctx->ctdb_db->ltdb->name;
963         } else {
964                 for (ctdb_db = lock_ctx->ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
965                         fd = tdb_fd(ctdb_db->ltdb->tdb);
966                         if (fstat(fd, &stat_buf) == 0) {
967                                 if (stat_buf.st_ino == inode) {
968                                         db_name = ctdb_db->ltdb->name;
969                                         break;
970                                 }
971                         }
972                 }
973         }
974
975         if (db_name) {
976                 DEBUG(DEBUG_WARNING,
977                       ("Process (pid=%d) blocked in locking\n", lock_ctx->child));
978                 DEBUG(DEBUG_WARNING,
979                       ("Process %s (pid=%d) locked database %s (inode %lu) for %.0lf seconds\n",
980                        (process_name ? process_name : "unknown"),
981                        blocker_pid, db_name, inode,
982                        timeval_elapsed(&lock_ctx->start_time)));
983         } else {
984                 DEBUG(DEBUG_WARNING,
985                       ("Process %s (pid=%d) locked database (inode %lu) for %.0lf seconds\n",
986                        (process_name ? process_name : "unknown"),
987                        blocker_pid, inode,
988                        timeval_elapsed(&lock_ctx->start_time)));
989         }
990
991         /*
992          * If ctdb is blocked by smbd for deadlock_interval, detect it as a deadlock
993          * and kill smbd process.
994          */
995         if (lock_ctx->ctdb->tunable.deadlock_timeout > 0 &&
996             timeval_elapsed(&lock_ctx->start_time) > lock_ctx->ctdb->tunable.deadlock_timeout &&
997             process_name && strstr(process_name, "smbd")) {
998                 DEBUG(DEBUG_WARNING,
999                       ("Deadlock detected. Killing smbd process (pid=%d)", blocker_pid));
1000                 ctdb_kill(lock_ctx->ctdb, blocker_pid, SIGKILL);
1001         }
1002
1003         free(process_name);
1004
1005 failed:
1006         if (lock_ctx->block_child > 0) {
1007                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1008         }
1009         lock_ctx->block_child = -1;
1010         talloc_free(tfd);
1011 }
1012
1013
1014 /*
1015  * Find processes that holds lock we are interested in
1016  */
1017 void ctdb_lock_find_blocker(struct lock_context *lock_ctx)
1018 {
1019         struct tevent_fd *tfd;
1020         pid_t parent;
1021
1022         if (pipe(lock_ctx->block_fd) < 0) {
1023                 return;
1024         }
1025
1026         parent = getpid();
1027
1028         lock_ctx->block_child = ctdb_fork(lock_ctx->ctdb);
1029         if (lock_ctx->block_child == -1) {
1030                 close(lock_ctx->block_fd[0]);
1031                 close(lock_ctx->block_fd[1]);
1032                 return;
1033         }
1034
1035         /* Child process */
1036         if (lock_ctx->block_child == 0) {
1037                 struct ctdb_lock_info reqlock;
1038                 pid_t blocker_pid = -1;
1039                 bool status;
1040
1041                 close(lock_ctx->block_fd[0]);
1042                 if (ctdb_get_lock_info(lock_ctx->child, &reqlock)) {
1043                         status = ctdb_get_blocker_pid(&reqlock, &blocker_pid);
1044                         if (!status) {
1045                                 /* Could not find blocker pid */
1046                                 blocker_pid = -2;
1047                         }
1048                 }
1049                 write(lock_ctx->block_fd[1], &blocker_pid, sizeof(blocker_pid));
1050                 write(lock_ctx->block_fd[1], &reqlock.inode, sizeof(reqlock.inode));
1051
1052                 /* Hang around till parent dies */
1053                 while (kill(parent, 0) == 0 || errno != ESRCH) {
1054                         sleep(5);
1055                 }
1056                 _exit(0);
1057         }
1058
1059         /* Parent process */
1060         close(lock_ctx->block_fd[1]);
1061         set_close_on_exec(lock_ctx->block_fd[0]);
1062
1063         tfd = tevent_add_fd(lock_ctx->ctdb->ev,
1064                                 lock_ctx,
1065                                 lock_ctx->block_fd[0],
1066                                 EVENT_FD_READ,
1067                                 ctdb_lock_blocked_handler,
1068                                 (void *)lock_ctx);
1069         if (tfd == NULL) {
1070                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1071                 close(lock_ctx->block_fd[0]);
1072         }
1073 }