f6220e02033a0bb8cff30d9aade4b77a381abc66
[obnox/ctdb.git] / server / ctdb_lock.c
1 /*
2    ctdb lock handling
3    provide API to do non-blocking locks for single or all databases
4
5    Copyright (C) Amitay Isaacs  2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "include/ctdb_private.h"
22 #include "include/ctdb_protocol.h"
23 #include "tevent.h"
24 #include "tdb.h"
25 #include "db_wrap.h"
26 #include "system/filesys.h"
27 #include "lib/util/dlinklist.h"
28
29 /*
30  * Non-blocking Locking API
31  *
32  * 1. Create a child process to do blocking locks.
33  * 2. Once the locks are obtained, signal parent process via fd.
34  * 3. Invoke registered callback routine with locking status.
35  * 4. If the child process cannot get locks within certain time,
36  *    diagnose using /proc/locks and log warning message
37  *
38  * ctdb_lock_record()      - get a lock on a record
39  * ctdb_lock_db()          - get a lock on a DB
40  * ctdb_lock_alldb_prio()  - get a lock on all DBs with given priority
41  * ctdb_lock_alldb()       - get a lock on all DBs
42  *
43  *  auto_mark              - whether to mark/unmark DBs in before/after callback
44  */
45
/*
 * Upper limit on the number of lock child processes that may be active
 * at the same time; ctdb_lock_schedule() refuses to start another one
 * once ctdb->lock_num_current reaches this value.
 *
 * FIXME: Add a tunable max_lock_processes_per_db
 */
#define MAX_LOCK_PROCESSES_PER_DB               100
48
/* The kinds of lock that can be requested; also indexes lock_type_str[] */
enum lock_type {
        LOCK_RECORD,
        LOCK_DB,
        LOCK_ALLDB_PRIO,
        LOCK_ALLDB,
};

/*
 * Printable name for each lock_type, used to label lock latency
 * statistics.  The entries are keyed by the enum value so that the
 * table cannot silently drift out of step with enum lock_type.
 */
static const char * const lock_type_str[] = {
        [LOCK_RECORD]     = "lock_record",
        [LOCK_DB]         = "lock_db",
        [LOCK_ALLDB_PRIO] = "lock_alldb_prio",
        [LOCK_ALLDB]      = "lock_alldb",
};
62
63 struct lock_request;
64
65 /* lock_context is the common part for a lock request */
66 struct lock_context {
67         struct lock_context *next, *prev;
68         enum lock_type type;
69         struct ctdb_context *ctdb;
70         struct ctdb_db_context *ctdb_db;
71         TDB_DATA key;
72         uint32_t priority;
73         bool auto_mark;
74         struct lock_request *req_queue;
75         pid_t child;
76         int fd[2];
77         struct tevent_fd *tfd;
78         struct tevent_timer *ttimer;
79         pid_t block_child;
80         int block_fd[2];
81         struct timeval start_time;
82 };
83
/*
 * lock_request is the per-client part of a lock request: one entry on
 * the req_queue of the lock_context it is waiting for.
 */
struct lock_request {
        /* linkage on lctx->req_queue */
        struct lock_request *next;
        struct lock_request *prev;
        /* context this request is queued on */
        struct lock_context *lctx;
        /* invoked with private_data and the lock status */
        void (*callback)(void *, bool);
        void *private_data;
};
91
92
93 /*
94  * Support samba 3.6.x (and older) versions which do not set db priority.
95  *
96  * By default, all databases are set to priority 1. So only when priority
97  * is set to 1, check for databases that need higher priority.
98  */
99 static bool later_db(struct ctdb_context *ctdb, const char *name)
100 {
101         if (ctdb->tunable.samba3_hack == 0) {
102                 return false;
103         }
104
105         if (strstr(name, "brlock") ||
106             strstr(name, "g_lock") ||
107             strstr(name, "notify_onelevel") ||
108             strstr(name, "serverid") ||
109             strstr(name, "xattr_tdb")) {
110                 return true;
111         }
112
113         return false;
114 }
115
116 typedef int (*db_handler_t)(struct ctdb_db_context *ctdb_db,
117                             uint32_t priority,
118                             void *private_data);
119
120 static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
121                             db_handler_t handler, void *private_data)
122 {
123         struct ctdb_db_context *ctdb_db;
124         int ret;
125
126         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
127                 if (ctdb_db->priority != priority) {
128                         continue;
129                 }
130                 if (later_db(ctdb, ctdb_db->db_name)) {
131                         continue;
132                 }
133                 ret = handler(ctdb_db, priority, private_data);
134                 if (ret != 0) {
135                         return -1;
136                 }
137         }
138
139         /* If priority != 1, later_db check is not required and can return */
140         if (priority != 1) {
141                 return 0;
142         }
143
144         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
145                 if (!later_db(ctdb, ctdb_db->db_name)) {
146                         continue;
147                 }
148                 ret = handler(ctdb_db, priority, private_data);
149                 if (ret != 0) {
150                         return -1;
151                 }
152         }
153
154         return 0;
155 }
156
157
158 /*
159  * lock all databases
160  */
161 static int db_lock_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
162                            void *private_data)
163 {
164         if (priority == 0) {
165                 DEBUG(DEBUG_INFO, ("locking database %s\n",
166                                    ctdb_db->db_name));
167         } else {
168                 DEBUG(DEBUG_INFO, ("locking database %s, priority:%u\n",
169                                    ctdb_db->db_name, priority));
170         }
171
172         if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
173                 DEBUG(DEBUG_ERR, ("Failed to lock database %s\n",
174                                   ctdb_db->db_name));
175                 return -1;
176         }
177
178         return 0;
179 }
180
181 int ctdb_lockall_prio(struct ctdb_context *ctdb, uint32_t priority)
182 {
183         return ctdb_db_iterator(ctdb, priority, db_lock_handler, NULL);
184 }
185
186 static int ctdb_lockall(struct ctdb_context *ctdb)
187 {
188         uint32_t priority;
189
190         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
191                 if (ctdb_db_iterator(ctdb, priority, db_lock_handler, NULL) != 0) {
192                         return -1;
193                 }
194         }
195
196         return 0;
197 }
198
199
200 /*
201  * unlock all databases
202  */
203 static int db_unlock_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
204                              void *private_data)
205 {
206         if (priority == 0) {
207                 DEBUG(DEBUG_INFO, ("unlocking database %s\n",
208                                    ctdb_db->db_name));
209         } else {
210                 DEBUG(DEBUG_INFO, ("unlocking database %s, priority:%u\n",
211                                    ctdb_db->db_name, priority));
212         }
213
214         if (tdb_unlockall(ctdb_db->ltdb->tdb) != 0) {
215                 DEBUG(DEBUG_ERR, ("Failed to unlock database %s\n",
216                                   ctdb_db->db_name));
217                 return -1;
218         }
219
220         return 0;
221 }
222
223 int ctdb_unlockall_prio(struct ctdb_context *ctdb, uint32_t priority)
224 {
225         return ctdb_db_iterator(ctdb, priority, db_unlock_handler, NULL);
226 }
227
228 static int ctdb_unlockall(struct ctdb_context *ctdb)
229 {
230         uint32_t priority;
231
232         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
233                 if (ctdb_db_iterator(ctdb, priority, db_unlock_handler, NULL) != 0) {
234                         return -1;
235                 }
236         }
237
238         return 0;
239 }
240
241
242 /*
243  * lock all databases - mark only
244  */
245 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
246 {
247         struct ctdb_db_context *ctdb_db;
248         int tdb_transaction_write_lock_mark(struct tdb_context *);
249
250         /*
251          * This function is only used by the main dameon during recovery.
252          * At this stage, the databases have already been locked, by a
253          * dedicated child process. The freeze_mode variable is used to track
254          * whether the actual locks are held by the child process or not.
255          */
256
257         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
258                 DEBUG(DEBUG_ERR, ("Attempt to mark all databases locked when not frozen\n"));
259                 return -1;
260         }
261
262         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
263                 if (ctdb_db->priority != priority) {
264                         continue;
265                 }
266                 if (later_db(ctdb, ctdb_db->db_name)) {
267                         continue;
268                 }
269                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
270                         return -1;
271                 }
272                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
273                         /* FIXME: Shouldn't we unmark here? */
274                         return -1;
275                 }
276         }
277
278         /* If priority != 1, later_db check is not required and can return */
279         if (priority != 1) {
280                 return 0;
281         }
282
283         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
284                 if (!later_db(ctdb, ctdb_db->db_name)) {
285                         continue;
286                 }
287                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
288                         return -1;
289                 }
290                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
291                         /* FIXME: Shouldn't we unmark here? */
292                         return -1;
293                 }
294         }
295
296         return 0;
297 }
298
299 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
300 {
301         uint32_t priority;
302
303         for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
304                 if (ctdb_lockall_mark_prio(ctdb, priority) != 0) {
305                         return -1;
306                 }
307         }
308
309         return 0;
310 }
311
312
313 /*
314  * lock all databases - unmark only
315  */
316 int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
317 {
318         struct ctdb_db_context *ctdb_db;
319         int tdb_transaction_write_lock_unmark(struct tdb_context *);
320
321         /*
322          * This function is only used by the main dameon during recovery.
323          * At this stage, the databases have already been locked, by a
324          * dedicated child process. The freeze_mode variable is used to track
325          * whether the actual locks are held by the child process or not.
326          */
327
328         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
329                 DEBUG(DEBUG_ERR, ("Attempt to unmark all databases locked when not frozen\n"));
330                 return -1;
331         }
332
333         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
334                 if (ctdb_db->priority != priority) {
335                         continue;
336                 }
337                 if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
338                         return -1;
339                 }
340                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
341                         return -1;
342                 }
343         }
344
345         return 0;
346 }
347
348 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
349 {
350         uint32_t priority;
351
352         for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
353                 if (ctdb_lockall_unmark_prio(ctdb, priority) != 0) {
354                         return -1;
355                 }
356         }
357
358         return 0;
359 }
360
361
362 /*
363  * Lock record / db depending on lock_ctx->type
364  * Called from child context.
365  */
366 static bool ctdb_lock_item(struct lock_context *lock_ctx)
367 {
368         bool status = false;
369
370         switch (lock_ctx->type) {
371         case LOCK_RECORD:
372                 if (tdb_chainlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key) == 0) {
373                         status = true;
374                 }
375                 break;
376
377         case LOCK_DB:
378                 if (tdb_lockall(lock_ctx->ctdb_db->ltdb->tdb) == 0) {
379                         status = true;
380                 }
381                 break;
382
383         case LOCK_ALLDB_PRIO:
384                 if (ctdb_lockall_prio(lock_ctx->ctdb, lock_ctx->priority) == 0) {
385                         status = true;
386                 }
387                 break;
388
389         case LOCK_ALLDB:
390                 if (ctdb_lockall(lock_ctx->ctdb) == 0) {
391                         status = true;
392                 }
393                 break;
394         }
395
396         return status;
397 }
398
399
400 /*
401  * Unlock record / db depending on lock_ctx->type
402  */
403 void ctdb_unlock_item(struct lock_context *lock_ctx)
404 {
405         switch (lock_ctx->type) {
406         case LOCK_RECORD:
407                 tdb_chainunlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
408                 break;
409
410         case LOCK_DB:
411                 tdb_unlockall(lock_ctx->ctdb_db->ltdb->tdb);
412                 break;
413
414         case LOCK_ALLDB_PRIO:
415                 ctdb_unlockall_prio(lock_ctx->ctdb, lock_ctx->priority);
416                 break;
417
418         case LOCK_ALLDB:
419                 ctdb_unlockall(lock_ctx->ctdb);
420                 break;
421         }
422 }
423
424 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
425
426 /*
427  * Destructor to kill the child locking process
428  */
429 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
430 {
431         if (lock_ctx->child > 0) {
432                 ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
433                 DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
434                 lock_ctx->ctdb->lock_num_current--;
435                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
436                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
437                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
438                 }
439         } else {
440                 DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
441                 lock_ctx->ctdb->lock_num_pending--;
442                 CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
443                 if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
444                         CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
445                 }
446         }
447
448         ctdb_lock_schedule(lock_ctx->ctdb);
449
450         return 0;
451 }
452
453
454 /*
455  * Destructor to remove lock request
456  */
457 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
458 {
459         DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
460         return 0;
461 }
462
463
464 void ctdb_lock_free_request_context(struct lock_request *lock_req)
465 {
466         struct lock_context *lock_ctx;
467
468         lock_ctx = lock_req->lctx;
469         talloc_free(lock_req);
470         talloc_free(lock_ctx);
471 }
472
473
474 /*
475  * Process all the callbacks waiting for lock
476  *
477  * If lock has failed, callback is executed with locked=false
478  */
479 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
480 {
481         struct lock_request *request, *next;
482
483         if (lock_ctx->auto_mark && locked) {
484                 switch (lock_ctx->type) {
485                 case LOCK_RECORD:
486                         tdb_chainlock_mark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
487                         break;
488
489                 case LOCK_DB:
490                         tdb_lockall_mark(lock_ctx->ctdb_db->ltdb->tdb);
491                         break;
492
493                 case LOCK_ALLDB_PRIO:
494                         ctdb_lockall_mark_prio(lock_ctx->ctdb, lock_ctx->priority);
495                         break;
496
497                 case LOCK_ALLDB:
498                         ctdb_lockall_mark(lock_ctx->ctdb);
499                         break;
500                 }
501         }
502
503         /* Iterate through all callbacks */
504         request = lock_ctx->req_queue;
505         while (request) {
506                 if (lock_ctx->auto_mark) {
507                         /* Reset the destructor, so request is not removed from the list */
508                         talloc_set_destructor(request, NULL);
509                 }
510
511                 /* In case, callback frees the request, store next */
512                 next = request->next;
513                 request->callback(request->private_data, locked);
514                 request = next;
515         }
516
517         if (lock_ctx->auto_mark && locked) {
518                 switch (lock_ctx->type) {
519                 case LOCK_RECORD:
520                         tdb_chainlock_unmark(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
521                         break;
522
523                 case LOCK_DB:
524                         tdb_lockall_unmark(lock_ctx->ctdb_db->ltdb->tdb);
525                         break;
526
527                 case LOCK_ALLDB_PRIO:
528                         ctdb_lockall_unmark_prio(lock_ctx->ctdb, lock_ctx->priority);
529                         break;
530
531                 case LOCK_ALLDB:
532                         ctdb_lockall_unmark(lock_ctx->ctdb);
533                         break;
534                 }
535         }
536 }
537
538
/*
 * Map an elapsed time in seconds to a latency bucket index.
 *
 * Bucket i (0..7) holds times below the i-th bound in the list
 * 1us, 10us, 100us, 1ms, 10ms, 100ms, 1s, 10s; everything else goes
 * into bucket 8.
 */
static int lock_bucket_id(double t)
{
        double us = 1.e-6, ms = 1.e-3, s = 1;
        const double upper[] = {
                1*us, 10*us, 100*us,
                1*ms, 10*ms, 100*ms,
                1*s, 10*s,
        };
        const int nbounds = (int)(sizeof(upper) / sizeof(upper[0]));
        int id;

        /* the first bound that t falls below decides the bucket */
        for (id = 0; id < nbounds; id++) {
                if (t < upper[id]) {
                        break;
                }
        }

        return id;
}
566
567 /*
568  * Callback routine when the required locks are obtained.
569  * Called from parent context
570  */
571 static void ctdb_lock_handler(struct tevent_context *ev,
572                             struct tevent_fd *tfd,
573                             uint16_t flags,
574                             void *private_data)
575 {
576         struct lock_context *lock_ctx;
577         TALLOC_CTX *tmp_ctx;
578         char c;
579         bool locked;
580         double t;
581         int id;
582
583         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
584
585         /* cancel the timeout event */
586         if (lock_ctx->ttimer) {
587                 TALLOC_FREE(lock_ctx->ttimer);
588         }
589
590         t = timeval_elapsed(&lock_ctx->start_time);
591         id = lock_bucket_id(t);
592
593         if (lock_ctx->auto_mark) {
594                 tmp_ctx = talloc_new(ev);
595                 talloc_steal(tmp_ctx, lock_ctx);
596         }
597
598         /* Read the status from the child process */
599         read(lock_ctx->fd[0], &c, 1);
600         locked = (c == 0 ? true : false);
601
602         /* Update statistics */
603         CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
604         CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
605         if (lock_ctx->ctdb_db) {
606                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
607                 CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
608         }
609
610         if (locked) {
611                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
612                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
613                 if (lock_ctx->ctdb_db) {
614                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
615                         CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
616                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
617                 }
618         } else {
619                 CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_failed);
620                 if (lock_ctx->ctdb_db) {
621                         CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_failed);
622                 }
623         }
624
625         process_callbacks(lock_ctx, locked);
626
627         if (lock_ctx->auto_mark) {
628                 talloc_free(tmp_ctx);
629         }
630 }
631
632
633 static void ctdb_lock_find_blocker(struct lock_context *lock_ctx);
634
635 /*
636  * Callback routine when required locks are not obtained within timeout
637  * Called from parent context
638  */
639 static void ctdb_lock_timeout_handler(struct tevent_context *ev,
640                                     struct tevent_timer *ttimer,
641                                     struct timeval current_time,
642                                     void *private_data)
643 {
644         struct lock_context *lock_ctx;
645         struct ctdb_context *ctdb;
646
647         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
648         ctdb = lock_ctx->ctdb;
649
650         /* fire a child process to find the blocking process */
651         if (lock_ctx->block_child == -1) {
652                 ctdb_lock_find_blocker(lock_ctx);
653         }
654
655         /* reset the timeout timer */
656         // talloc_free(lock_ctx->ttimer);
657         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
658                                             lock_ctx,
659                                             timeval_current_ofs(10, 0),
660                                             ctdb_lock_timeout_handler,
661                                             (void *)lock_ctx);
662 }
663
664
665 static char *lock_child_log_prefix(struct lock_context *lock_ctx)
666 {
667         char *prefix;
668         pid_t pid;
669
670         pid = getpid();
671
672         switch (lock_ctx->type) {
673         case LOCK_RECORD:
674                 prefix = talloc_asprintf(NULL, "lockR(%d): ", pid);
675                 break;
676
677         case LOCK_DB:
678                 prefix = talloc_asprintf(NULL, "lockD(%d): ", pid);
679                 break;
680
681         case LOCK_ALLDB_PRIO:
682                 prefix = talloc_asprintf(NULL, "lockP(%d): ", pid);
683                 break;
684
685         case LOCK_ALLDB:
686                 prefix = talloc_asprintf(NULL, "lockA(%d): ", pid);
687                 break;
688         }
689
690         return prefix;
691 }
692
693
694 /*
695  * Schedule a new lock child process
696  * Set up callback handler and timeout handler
697  */
698 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
699 {
700         struct lock_context *lock_ctx, *next_ctx;
701         int ret;
702         pid_t parent;
703
704         if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
705                 return;
706         }
707
708         if (ctdb->lock_pending == NULL) {
709                 return;
710         }
711
712         /* Find a lock context with requests */
713         lock_ctx = ctdb->lock_pending;
714         while (lock_ctx != NULL) {
715                 if (! lock_ctx->req_queue) {
716                         next_ctx = lock_ctx->next;
717                         DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
718                         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
719                         ctdb->lock_num_pending--;
720                         CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
721                         if (lock_ctx->ctdb_db) {
722                                 CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
723                         }
724                         talloc_free(lock_ctx);
725                         lock_ctx = next_ctx;
726                         continue;
727                 } else {
728                         /* Found a lock context with lock requests */
729                         break;
730                 }
731         }
732
733         if (lock_ctx == NULL) {
734                 return;
735         }
736
737         lock_ctx->child = -1;
738         ret = pipe(lock_ctx->fd);
739         if (ret != 0) {
740                 DEBUG(DEBUG_ERR, ("Failed to create pipe in ctdb_lock_schedule\n"));
741                 return;
742         }
743
744         parent = getpid();
745         lock_ctx->child = ctdb_fork(ctdb);
746
747         if (lock_ctx->child == (pid_t)-1) {
748                 DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
749                 close(lock_ctx->fd[0]);
750                 close(lock_ctx->fd[1]);
751                 return;
752         }
753
754         /* Child process */
755         if (lock_ctx->child == 0) {
756                 char c;
757                 close(lock_ctx->fd[0]);
758                 debug_extra = lock_child_log_prefix(lock_ctx);
759                 if (ctdb_lock_item(lock_ctx)) {
760                         c = 0;
761                 } else {
762                         c = 1;
763                 }
764                 write(lock_ctx->fd[1], &c, 1);
765
766                 /* Hang around, but if parent dies, terminate */
767                 while (kill(parent, 0) == 0 || errno != ESRCH) {
768                         sleep(5);
769                 }
770                 _exit(0);
771         }
772
773         /* Parent process */
774         close(lock_ctx->fd[1]);
775         set_close_on_exec(lock_ctx->fd[0]);
776
777         talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
778
779         /* Set up timeout handler */
780         lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
781                                             lock_ctx,
782                                             timeval_current_ofs(10, 0),
783                                             ctdb_lock_timeout_handler,
784                                             (void *)lock_ctx);
785         if (lock_ctx->ttimer == NULL) {
786                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
787                 lock_ctx->child = -1;
788                 talloc_set_destructor(lock_ctx, NULL);
789                 close(lock_ctx->fd[0]);
790                 return;
791         }
792
793         /* Set up callback */
794         lock_ctx->tfd = tevent_add_fd(ctdb->ev,
795                                       lock_ctx,
796                                       lock_ctx->fd[0],
797                                       EVENT_FD_READ,
798                                       ctdb_lock_handler,
799                                       (void *)lock_ctx);
800         if (lock_ctx->tfd == NULL) {
801                 TALLOC_FREE(lock_ctx->ttimer);
802                 ctdb_kill(ctdb, lock_ctx->child, SIGKILL);
803                 lock_ctx->child = -1;
804                 talloc_set_destructor(lock_ctx, NULL);
805                 close(lock_ctx->fd[0]);
806                 return;
807         }
808         tevent_fd_set_auto_close(lock_ctx->tfd);
809
810         /* Move the context from pending to current */
811         DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
812         ctdb->lock_num_pending--;
813         DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
814         ctdb->lock_num_current++;
815 }
816
817
818 /*
819  * Find the lock context of a given type
820  */
821 static struct lock_context *find_lock_context(struct lock_context *lock_list,
822                                               struct ctdb_db_context *ctdb_db,
823                                               TDB_DATA key,
824                                               uint32_t priority,
825                                               enum lock_type type)
826 {
827         struct lock_context *lock_ctx;
828
829         /* Search active locks */
830         for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
831                 if (lock_ctx->type != type) {
832                         continue;
833                 }
834
835                 switch (lock_ctx->type) {
836                 case LOCK_RECORD:
837                         if (ctdb_db == lock_ctx->ctdb_db &&
838                             key.dsize == lock_ctx->key.dsize &&
839                             memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
840                                 goto done;
841                         }
842                         break;
843
844                 case LOCK_DB:
845                         if (ctdb_db == lock_ctx->ctdb_db) {
846                                 goto done;
847                         }
848                         break;
849
850                 case LOCK_ALLDB_PRIO:
851                         if (priority == lock_ctx->priority) {
852                                 goto done;
853                         }
854                         break;
855
856                 case LOCK_ALLDB:
857                         goto done;
858                         break;
859                 }
860         }
861
862         /* Did not find the lock context we are searching for */
863         lock_ctx = NULL;
864
865 done:
866         return lock_ctx;
867
868 }
869
870
871 /*
872  * Lock record / db depending on type
873  */
874 static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
875                                                struct ctdb_db_context *ctdb_db,
876                                                TDB_DATA key,
877                                                uint32_t priority,
878                                                void (*callback)(void *, bool),
879                                                void *private_data,
880                                                enum lock_type type,
881                                                bool auto_mark)
882 {
883         struct lock_context *lock_ctx;
884         struct lock_request *request;
885
886         if (callback == NULL) {
887                 DEBUG(DEBUG_WARNING, ("No callback function specified, not locking\n"));
888                 return NULL;
889         }
890
891         /* get a context for this key - search only the pending contexts,
892          * current contexts might in the middle of processing callbacks */
893         lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
894
895         /* No existing context, create one */
896         if (lock_ctx == NULL) {
897                 lock_ctx = talloc_zero(ctdb, struct lock_context);
898                 if (lock_ctx == NULL) {
899                         DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
900                         return NULL;
901                 }
902
903                 lock_ctx->type = type;
904                 lock_ctx->ctdb = ctdb;
905                 lock_ctx->ctdb_db = ctdb_db;
906                 lock_ctx->key.dsize = key.dsize;
907                 if (key.dsize > 0) {
908                         lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
909                 } else {
910                         lock_ctx->key.dptr = NULL;
911                 }
912                 lock_ctx->priority = priority;
913                 lock_ctx->auto_mark = auto_mark;
914
915                 lock_ctx->child = -1;
916                 lock_ctx->block_child = -1;
917
918                 DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
919                 ctdb->lock_num_pending++;
920                 CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
921                 if (ctdb_db) {
922                         CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
923                 }
924
925                 /* Start the timer when we activate the context */
926                 lock_ctx->start_time = timeval_current();
927         }
928
929         if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
930                 return NULL;
931         }
932
933         request->lctx = lock_ctx;
934         request->callback = callback;
935         request->private_data = private_data;
936
937         talloc_set_destructor(request, ctdb_lock_request_destructor);
938         DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
939
940         ctdb_lock_schedule(ctdb);
941
942         return request;
943 }
944
945
946 /*
947  * obtain a lock on a record in a database
948  */
949 struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
950                                       TDB_DATA key,
951                                       bool auto_mark,
952                                       void (*callback)(void *, bool),
953                                       void *private_data)
954 {
955         return ctdb_lock_internal(ctdb_db->ctdb,
956                                   ctdb_db,
957                                   key,
958                                   0,
959                                   callback,
960                                   private_data,
961                                   LOCK_RECORD,
962                                   auto_mark);
963 }
964
965
966 /*
967  * obtain a lock on a database
968  */
969 struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
970                                   bool auto_mark,
971                                   void (*callback)(void *, bool),
972                                   void *private_data)
973 {
974         return ctdb_lock_internal(ctdb_db->ctdb,
975                                   ctdb_db,
976                                   tdb_null,
977                                   0,
978                                   callback,
979                                   private_data,
980                                   LOCK_DB,
981                                   auto_mark);
982 }
983
984
985 /*
986  * obtain locks on all databases of specified priority
987  */
988 struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
989                                           uint32_t priority,
990                                           bool auto_mark,
991                                           void (*callback)(void *, bool),
992                                           void *private_data)
993 {
994         if (priority < 0 || priority > NUM_DB_PRIORITIES) {
995                 DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
996                 return NULL;
997         }
998
999         return ctdb_lock_internal(ctdb,
1000                                   NULL,
1001                                   tdb_null,
1002                                   priority,
1003                                   callback,
1004                                   private_data,
1005                                   LOCK_ALLDB_PRIO,
1006                                   auto_mark);
1007 }
1008
1009
1010 /*
1011  * obtain locks on all databases
1012  */
1013 struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
1014                                      bool auto_mark,
1015                                      void (*callback)(void *, bool),
1016                                      void *private_data)
1017 {
1018         return ctdb_lock_internal(ctdb,
1019                                   NULL,
1020                                   tdb_null,
1021                                   0,
1022                                   callback,
1023                                   private_data,
1024                                   LOCK_ALLDB,
1025                                   auto_mark);
1026 }
1027
1028 /*
1029  * Callback routine to read the PID of blocking process from the child and log
1030  *
1031  */
1032 void ctdb_lock_blocked_handler(struct tevent_context *ev,
1033                                 struct tevent_fd *tfd,
1034                                 uint16_t flags,
1035                                 void *private_data)
1036 {
1037         struct lock_context *lock_ctx;
1038         pid_t blocker_pid = -1;
1039         char *process_name = NULL;
1040         const char *db_name = NULL;
1041         ino_t inode;
1042         struct ctdb_db_context *ctdb_db;
1043         int fd;
1044         struct stat stat_buf;
1045
1046         lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
1047
1048         if (read(lock_ctx->block_fd[0], &blocker_pid, sizeof(blocker_pid)) != sizeof(blocker_pid)) {
1049                 DEBUG(DEBUG_ERR, ("Error reading blocker process pid from child\n"));
1050                 goto failed;
1051         }
1052         if (read(lock_ctx->block_fd[0], &inode, sizeof(inode)) != sizeof(inode)) {
1053                 DEBUG(DEBUG_ERR, ("Error reading blocked inode from child\n"));
1054                 goto failed;
1055         }
1056
1057         if (blocker_pid < 0) {
1058                 goto failed;
1059         }
1060
1061         process_name = ctdb_get_process_name(blocker_pid);
1062
1063         if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
1064                 db_name = lock_ctx->ctdb_db->ltdb->name;
1065         } else {
1066                 for (ctdb_db = lock_ctx->ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1067                         fd = tdb_fd(ctdb_db->ltdb->tdb);
1068                         if (fstat(fd, &stat_buf) == 0) {
1069                                 if (stat_buf.st_ino == inode) {
1070                                         db_name = ctdb_db->ltdb->name;
1071                                         break;
1072                                 }
1073                         }
1074                 }
1075         }
1076
1077         if (db_name) {
1078                 DEBUG(DEBUG_WARNING,
1079                       ("Process (pid=%d) blocked in locking\n", lock_ctx->child));
1080                 DEBUG(DEBUG_WARNING,
1081                       ("Process %s (pid=%d) locked database %s (inode %lu) for %.0lf seconds\n",
1082                        (process_name ? process_name : "unknown"),
1083                        blocker_pid, db_name, (unsigned long)inode,
1084                        timeval_elapsed(&lock_ctx->start_time)));
1085         } else {
1086                 DEBUG(DEBUG_WARNING,
1087                       ("Process %s (pid=%d) locked database (inode %lu) for %.0lf seconds\n",
1088                        (process_name ? process_name : "unknown"),
1089                        blocker_pid, (unsigned long)inode,
1090                        timeval_elapsed(&lock_ctx->start_time)));
1091         }
1092
1093         /*
1094          * If ctdb is blocked by smbd for deadlock_interval, detect it as a deadlock
1095          * and kill smbd process.
1096          */
1097         if (lock_ctx->ctdb->tunable.deadlock_timeout > 0 &&
1098             timeval_elapsed(&lock_ctx->start_time) > lock_ctx->ctdb->tunable.deadlock_timeout &&
1099             process_name && strstr(process_name, "smbd")) {
1100                 DEBUG(DEBUG_WARNING,
1101                       ("Deadlock detected. Killing smbd process (pid=%d)", blocker_pid));
1102                 kill(blocker_pid, SIGKILL);
1103         }
1104
1105         free(process_name);
1106
1107 failed:
1108         if (lock_ctx->block_child > 0) {
1109                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1110         }
1111         lock_ctx->block_child = -1;
1112         talloc_free(tfd);
1113 }
1114
1115
1116 /*
1117  * Find processes that holds lock we are interested in
1118  */
1119 void ctdb_lock_find_blocker(struct lock_context *lock_ctx)
1120 {
1121         struct tevent_fd *tfd;
1122         pid_t parent;
1123
1124         if (pipe(lock_ctx->block_fd) < 0) {
1125                 return;
1126         }
1127
1128         parent = getpid();
1129
1130         lock_ctx->block_child = ctdb_fork(lock_ctx->ctdb);
1131         if (lock_ctx->block_child == -1) {
1132                 close(lock_ctx->block_fd[0]);
1133                 close(lock_ctx->block_fd[1]);
1134                 return;
1135         }
1136
1137         /* Child process */
1138         if (lock_ctx->block_child == 0) {
1139                 struct ctdb_lock_info reqlock;
1140                 pid_t blocker_pid = -1;
1141                 bool status;
1142
1143                 close(lock_ctx->block_fd[0]);
1144                 if (ctdb_get_lock_info(lock_ctx->child, &reqlock)) {
1145                         status = ctdb_get_blocker_pid(&reqlock, &blocker_pid);
1146                         if (!status) {
1147                                 /* Could not find blocker pid */
1148                                 blocker_pid = -2;
1149                         }
1150                 }
1151                 write(lock_ctx->block_fd[1], &blocker_pid, sizeof(blocker_pid));
1152                 write(lock_ctx->block_fd[1], &reqlock.inode, sizeof(reqlock.inode));
1153
1154                 /* Hang around till parent dies */
1155                 while (kill(parent, 0) == 0 || errno != ESRCH) {
1156                         sleep(5);
1157                 }
1158                 _exit(0);
1159         }
1160
1161         /* Parent process */
1162         close(lock_ctx->block_fd[1]);
1163         set_close_on_exec(lock_ctx->block_fd[0]);
1164
1165         tfd = tevent_add_fd(lock_ctx->ctdb->ev,
1166                                 lock_ctx,
1167                                 lock_ctx->block_fd[0],
1168                                 EVENT_FD_READ,
1169                                 ctdb_lock_blocked_handler,
1170                                 (void *)lock_ctx);
1171         if (tfd == NULL) {
1172                 ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
1173                 close(lock_ctx->block_fd[0]);
1174         }
1175 }