2 Unix SMB/CIFS implementation.
4 generic byte range locking code - ctdb backend
6 Copyright (C) Andrew Tridgell 2006
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "system/filesys.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "messaging/messaging.h"
28 #include "lib/messaging/irpc.h"
29 #include "libcli/libcli.h"
30 #include "cluster/cluster.h"
31 #include "ntvfs/ntvfs.h"
32 #include "ntvfs/common/brlock.h"
33 #include "include/ctdb.h"
/* ctdb call ids registered for the brlock database; each maps to one of the
   brl_ctdb_*_func handlers below via ctdb_set_call().
   NOTE(review): the listing is truncated here — FUNC_BRL_CLOSE (used later)
   and the closing brace are missing from this view. */
35 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2,
36 FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
40 in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
41 a file. For a local posix filesystem this will usually be a combination
42 of the device and inode numbers of the file, but it can be anything
43 that uniquely identifies a file for locking purposes, as long
44 as it is applied consistently.
/* Per-connection locking state for this backend: the ctdb context/db used
   for all brlock calls, our cluster-wide server id, and the messaging
   context used to deliver pending-lock retry notifications.
   NOTE(review): the 'struct brl_context {' opener line is missing from
   this view (original line numbering jumps from 47 to 49). */
47 /* this struct is typically attached to tcon */
49 struct ctdb_context *ctdb;
50 struct ctdb_db_context *ctdb_db;
51 struct server_id server;
52 struct messaging_context *messaging_ctx;
/* Identity of a lock owner: cluster server id, smbpid and owning
   brl_context. Two locks with equal contexts belong to the same owner
   (see brl_ctdb_same_context()).
   NOTE(review): the 'struct lock_context {' opener and the smbpid member
   appear to be missing from this view. */
56 the lock context contains the elements that define whether one
57 lock is the same as another lock
60 struct server_id server;
62 struct brl_context *ctx;
/* One byte-range lock record as stored in the brlock ctdb record. The
   record's data is simply an array of these; count = dsize / sizeof().
   NOTE(review): the opener plus the start/size/notify_ptr members are
   missing from this view (numbering jumps 66->69 and 70->73), but later
   code references lock.start, lock.size and lock.notify_ptr. */
65 /* The data in brlock records is an unsorted linear array of these
66 records. It is unnecessary to store the count as tdb provides the
69 struct lock_context context;
70 struct ntvfs_handle *ntvfs;
73 enum brl_type lock_type;
/* Per-open-file locking handle: the file key (used as the ctdb record key),
   the ntvfs handle, and the last failed lock (for the W2K3-compatible
   error-code selection in brl_ctdb_lock_failed()).
   NOTE(review): the 'struct brl_handle {' opener and the DATA_BLOB key
   member are missing from this view. */
77 /* this struct is attached to on open file handle */
80 struct ntvfs_handle *ntvfs;
81 struct lock_struct last_lock;
/*
  Debug helper: dump an array of lock records at debug level 0, one line
  per lock (index, start, size, owner node.id.smbpid, pointers).
  Safe to call with locks == NULL.
  NOTE(review): the function body is truncated in this view — the opening
  brace, 'int i;' declaration and the tail of the DEBUG argument list
  (presumably ntvfs/notify_ptr for the two %p's) are missing.
*/
85 static void show_locks(const char *op, struct lock_struct *locks, int count)
88 DEBUG(0,("OP: %s\n", op));
89 if (locks == NULL) return;
90 for (i=0;i<count;i++) {
91 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
92 i, (int)locks[i].start, (int)locks[i].size,
93 locks[i].context.server.node,
94 locks[i].context.server.id,
95 locks[i].context.smbpid,
103 Open up the brlock.tdb database. Close it down using
104 talloc_free(). We need the messaging_ctx to allow for
105 pending lock notifications.
/*
  Create a brl_context bound to the cluster's ctdb "brlock" database.
  Freed with talloc_free(). messaging_ctx is kept so pending-lock retry
  notifications can be sent later (brl_ctdb_notify_send()).
  Returns NULL on allocation or db-attach failure.
  NOTE(review): the brl==NULL check, the error-path returns and the final
  'return brl;' are missing from this view (numbering gaps 114->120,
  122->126, 127->end).
*/
107 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
108 struct messaging_context *messaging_ctx)
110 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
111 struct ctdb_context);
112 struct brl_context *brl;
114 brl = talloc(mem_ctx, struct brl_context);
120 brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
121 if (brl->ctdb_db == NULL) {
122 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
126 brl->server = server;
127 brl->messaging_ctx = messaging_ctx;
/*
  Create a per-open-file brl_handle. The file_key blob (device/inode or
  equivalent unique file identifier) becomes the ctdb record key; last_lock
  starts zeroed so the first lock failure cannot spuriously match it.
  NOTE(review): the NULL check after talloc, the ntvfs assignment and the
  'return brlh;' are missing from this view.
*/
132 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs,
135 struct brl_handle *brlh;
137 brlh = talloc(mem_ctx, struct brl_handle);
142 brlh->key = *file_key;
144 ZERO_STRUCT(brlh->last_lock);
150 see if two locking contexts are equal
/*
  Return True when two lock contexts identify the same lock owner:
  same cluster server id, same smbpid and same brl_context pointer.
*/
152 static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
154 return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
155 ctx1->smbpid == ctx2->smbpid &&
156 ctx1->ctx == ctx2->ctx);
160 see if lck1 and lck2 overlap
/*
  Return True when the byte ranges of lck1 and lck2 overlap.
  The identical-range special case handles ranges whose start+size would
  wrap past the end of 64 bit file space.
  NOTE(review): the 'return True;' / 'return False;' bodies of the two
  if-statements and the final return are missing from this view.
*/
162 static BOOL brl_ctdb_overlap(struct lock_struct *lck1,
163 struct lock_struct *lck2)
165 /* this extra check is not redundant - it copes with locks
166 that go beyond the end of 64 bit file space */
167 if (lck1->size != 0 &&
168 lck1->start == lck2->start &&
169 lck1->size == lck2->size) {
173 if (lck1->start >= (lck2->start+lck2->size) ||
174 lck2->start >= (lck1->start+lck1->size)) {
181 See if lock2 can be added when lock1 is in place.
/*
  Decide whether a new lock (lck2) conflicts with an existing lock (lck1).
  Rules visible here: pending locks never conflict; two READ locks never
  conflict; a READ lock by the same owner on the same handle never
  conflicts with its own existing lock. Otherwise conflict iff the byte
  ranges overlap.
  NOTE(review): the early 'return False;' bodies are missing from this view.
*/
183 static BOOL brl_ctdb_conflict(struct lock_struct *lck1,
184 struct lock_struct *lck2)
186 /* pending locks don't conflict with anything */
187 if (lck1->lock_type >= PENDING_READ_LOCK ||
188 lck2->lock_type >= PENDING_READ_LOCK) {
192 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
196 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
197 lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
201 return brl_ctdb_overlap(lck1, lck2);
206 Check to see if this lock conflicts, but ignore our own locks on the
/*
  Conflict check used by locktest (IO permission checks): like
  brl_ctdb_conflict() but our own locks on the same handle only count as
  non-conflicting when the incoming operation is a READ against our READ
  lock, or we already hold a WRITE lock — an incoming WRITE still
  conflicts with an existing READ even from the same context (LOCKTEST7).
  NOTE(review): the early-return bodies are missing from this view.
*/
209 static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
211 /* pending locks don't conflict with anything */
212 if (lck1->lock_type >= PENDING_READ_LOCK ||
213 lck2->lock_type >= PENDING_READ_LOCK) {
217 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK)
221 * note that incoming write calls conflict with existing READ
222 * locks even if the context is the same. JRA. See LOCKTEST7
225 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
226 lck1->ntvfs == lck2->ntvfs &&
227 (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
236 amazingly enough, w2k3 "remembers" whether the last lock failure
237 is the same as this one and changes its error code. I wonder if any
/*
  Choose the NT status for a failed (non-pending) lock request, mimicking
  W2K3's behaviour of "remembering" the last lock failure per handle:
   - SMB2 always gets NT_STATUS_LOCK_NOT_GRANTED;
   - a failed retry of a timed-out pending lock (notify_ptr set) always
     gets FILE_LOCK_CONFLICT and is remembered;
   - offsets >= 0xEF000000 with the top bit clear always get
     FILE_LOCK_CONFLICT (empirically determined server quirk);
   - a repeat of the previously failed lock (same owner/handle/offset)
     gets FILE_LOCK_CONFLICT;
   - everything else gets LOCK_NOT_GRANTED and becomes the new last_lock.
*/
240 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
243 * this function is only called for non pending lock!
246 /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
247 if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
248 return NT_STATUS_LOCK_NOT_GRANTED;
252 * if the notify_ptr is non NULL,
253 * it means that we're at the end of a pending lock
254 * and the real lock is requested after the timeout went by
255 * In this case we need to remember the last_lock and always
256 * give FILE_LOCK_CONFLICT
258 if (lock->notify_ptr) {
259 brlh->last_lock = *lock;
260 return NT_STATUS_FILE_LOCK_CONFLICT;
264 * amazing the little things you learn with a test
265 * suite. Locks beyond this offset (as a 64 bit
266 * number!) always generate the conflict error code,
267 * unless the top bit is set
269 if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
270 brlh->last_lock = *lock;
271 return NT_STATUS_FILE_LOCK_CONFLICT;
275 * if the current lock matches the last failed lock on the file handle
276 * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
278 if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
279 lock->context.ctx == brlh->last_lock.context.ctx &&
280 lock->ntvfs == brlh->last_lock.ntvfs &&
281 lock->start == brlh->last_lock.start) {
282 return NT_STATUS_FILE_LOCK_CONFLICT;
285 brlh->last_lock = *lock;
286 return NT_STATUS_LOCK_NOT_GRANTED;
/* Marshalled request for FUNC_BRL_LOCK, passed by value as the ctdb
   call_data blob (pointers are only meaningful back on the sending node).
   NOTE(review): smbpid/start/size and notify_ptr members are missing from
   this view (numbering gaps 289->293, 293->295), but brl_ctdb_lock_func
   reads req->smbpid, req->start, req->size and req->notify_ptr. */
289 struct ctdb_lock_req {
293 enum brl_type lock_type;
295 struct server_id server;
296 struct brl_context *brl;
297 struct ntvfs_handle *ntvfs;
301 ctdb call handling brl_lock()
/*
  ctdb call handler for brl_lock(): runs on the node holding the brlock
  record, with the record chainlock held.

  For a PENDING_* request it first retries as a real lock (closing the
  race where the blocking lock vanishes between the conflict and the
  pending lock being recorded); only if that still conflicts is the
  pending record appended. Otherwise the new lock is appended to the
  record's lock array unless it conflicts with an existing entry.

  Returns 0 with call->status carrying the NTSTATUS; CTDB_ERR_NOMEM on
  allocation failure. call->new_data replaces the record contents.
  NOTE(review): several lines are missing from this view (declarations of
  i/count/dbuf, the early 'return 0;' after a successful pending->real
  upgrade, and the conflict-path jump to setting call->status).
*/
303 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
305 struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
308 struct lock_struct lock, *locks=NULL;
309 NTSTATUS status = NT_STATUS_OK;
311 /* if this is a pending lock, then with the chainlock held we
312 try to get the real lock. If we succeed then we don't need
313 to make it pending. This prevents a possible race condition
314 where the pending lock gets created after the lock that is
315 preventing the real lock gets removed */
316 if (req->lock_type >= PENDING_READ_LOCK) {
317 enum brl_type lock_type = req->lock_type;
318 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
319 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
322 req->lock_type = lock_type;
325 dbuf = call->record_data;
/* build the candidate lock record from the marshalled request */
328 lock.context.smbpid = req->smbpid;
329 lock.context.server = req->server;
330 lock.context.ctx = req->brl;
331 lock.ntvfs = req->ntvfs;
332 lock.start = req->start;
333 lock.size = req->size;
334 lock.lock_type = req->lock_type;
335 lock.notify_ptr = req->notify_ptr;
338 /* there are existing locks - make sure they don't conflict */
339 locks = (struct lock_struct *)dbuf.dptr;
340 count = dbuf.dsize / sizeof(*locks);
342 for (i=0; i<count; i++) {
343 if (brl_ctdb_conflict(&locks[i], &lock)) {
344 status = NT_STATUS_LOCK_NOT_GRANTED;
/* append the new lock: old record contents + one more lock_struct */
350 call->new_data = talloc(call, TDB_DATA);
351 if (call->new_data == NULL) {
352 return CTDB_ERR_NOMEM;
355 call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
356 if (call->new_data->dptr == NULL) {
357 return CTDB_ERR_NOMEM;
359 memcpy(call->new_data->dptr, locks, dbuf.dsize);
360 memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
361 call->new_data->dsize = dbuf.dsize + sizeof(lock);
/* a pending lock that got recorded still reports NOT_GRANTED so the
   caller knows to wait for the retry notification */
363 if (req->lock_type >= PENDING_READ_LOCK) {
364 status = NT_STATUS_LOCK_NOT_GRANTED;
368 call->status = NT_STATUS_V(status);
375 Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in
376 which case a real lock is first tried, and if that fails then a
377 pending lock is created. When the pending lock is triggered (by
378 someone else closing an overlapping lock range) a messaging
379 notification is sent, identified by the notify_ptr
/*
  Lock a byte range via a FUNC_BRL_LOCK ctdb call on the record keyed by
  brlh->key. lock_type may be PENDING_*: the remote handler then tries the
  real lock first and records a pending lock on conflict; the retry is
  later signalled via MSG_BRL_RETRY carrying notify_ptr.
  On NT_STATUS_LOCK_NOT_GRANTED the W2K3 last-lock quirk is applied via
  brl_ctdb_lock_failed().
  NOTE(review): missing from this view: the smbpid parameter line, ZERO
  init of call/req, req.smbpid/start/size/brl assignments, the ret==-1
  check guarding the INTERNAL_DB_CORRUPTION return, lock.start/size
  assignments, and the final 'return status;'.
*/
381 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
382 struct brl_handle *brlh,
384 uint64_t start, uint64_t size,
385 enum brl_type lock_type,
388 struct ctdb_lock_req req;
389 struct ctdb_call call;
393 call.call_id = FUNC_BRL_LOCK;
394 call.key.dptr = brlh->key.data;
395 call.key.dsize = brlh->key.length;
396 call.call_data.dptr = (uint8_t *)&req;
397 call.call_data.dsize = sizeof(req);
405 req.lock_type = lock_type;
406 req.notify_ptr = notify_ptr;
407 req.server = brl->server;
409 req.ntvfs = brlh->ntvfs;
411 ret = ctdb_call(brl->ctdb_db, &call);
413 return NT_STATUS_INTERNAL_DB_CORRUPTION;
416 status = NT_STATUS(call.status);
418 if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
419 struct lock_struct lock;
420 lock.context.smbpid = smbpid;
421 lock.context.server = brl->server;
422 lock.context.ctx = brl;
423 lock.ntvfs = brlh->ntvfs;
426 lock.lock_type = lock_type;
427 lock.notify_ptr = notify_ptr;
428 status = brl_ctdb_lock_failed(brlh, &lock);
435 we are removing a lock that might be holding up a pending lock. Scan
436 for pending locks that cover this range and if we find any then
437 notify the server that it should retry the lock. In this backend, we
438 notify by sending the list of locks that need to be notified on back
439 in the reply_data of the ctdb call. The caller then does the
/*
  Scan for pending locks whose range overlaps a just-removed lock and
  queue them for retry notification. The notifications are not sent here
  (we are inside a ctdb call handler); they are accumulated in
  call->reply_data as an array of lock_struct, and the caller sends them
  via brl_ctdb_notify_send() after the ctdb call returns.

  The last_notice index suppresses repeat notifies on overlapping ranges
  to avoid stampedes; a PENDING_WRITE_LOCK updates it (only one writer
  can win, so further notifies for the same range are pointless).

  Returns 0 or CTDB_ERR_NOMEM.
  NOTE(review): 'int i, ncount;', 'int last_notice = -1;', the 'continue;'
  after the last_notice overlap test, the 'last_notice = i;' assignment
  and the final 'return 0;' are missing from this view.
*/
442 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
443 struct lock_struct *locks, int count,
444 struct lock_struct *removed_lock)
448 /* the last_notice logic is to prevent stampeding on a lock
449 range. It prevents us sending hundreds of notifies on the
450 same range of bytes. It doesn't prevent all possible
451 stampedes, but it does prevent the most common problem */
454 for (i=0;i<count;i++) {
455 if (locks[i].lock_type >= PENDING_READ_LOCK &&
456 brl_ctdb_overlap(&locks[i], removed_lock)) {
457 struct lock_struct *nlocks;
460 if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
463 if (locks[i].lock_type == PENDING_WRITE_LOCK) {
466 if (call->reply_data == NULL) {
467 call->reply_data = talloc_zero(call, TDB_DATA);
468 if (call->reply_data == NULL) {
469 return CTDB_ERR_NOMEM;
472 /* add to the list of pending locks to notify caller of */
473 ncount = call->reply_data->dsize / sizeof(struct lock_struct);
474 nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr,
475 struct lock_struct, ncount + 1);
476 if (nlocks == NULL) {
477 return CTDB_ERR_NOMEM;
479 call->reply_data->dptr = (uint8_t *)nlocks;
480 nlocks[ncount] = locks[i];
481 call->reply_data->dsize += sizeof(struct lock_struct);
489 send notifications for all pending locks - the file is being closed by this
492 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
493 struct lock_struct *locks, int count)
496 for (i=0;i<count;i++) {
497 if (locks->lock_type >= PENDING_READ_LOCK) {
498 int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
499 if (ret != 0) return ret;
506 send off any messages needed to notify of pending locks that should now retry
/*
  Deliver the retry notifications accumulated by brl_ctdb_notify_unlock():
  reply_data holds an array of pending lock_structs; send each owner a
  MSG_BRL_RETRY message carrying its notify_ptr so it retries the lock.
*/
508 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
510 struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
511 int i, count = reply_data->dsize / sizeof(struct lock_struct);
512 for (i=0;i<count;i++) {
513 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
514 MSG_BRL_RETRY, locks[i].notify_ptr);
/* Marshalled request for FUNC_BRL_UNLOCK.
   NOTE(review): smbpid/start/size members are missing from this view
   (numbering gap 519->523), but brl_ctdb_unlock_func reads req->smbpid,
   req->start and req->size. */
519 struct ctdb_unlock_req {
523 struct server_id server;
524 struct brl_context *brl;
525 struct ntvfs_handle *ntvfs;
529 Unlock a range of bytes.
/*
  ctdb call handler for brl_unlock(). Finds the matching lock record —
  preferring an exact WRITE_LOCK match first (Windows semantics: an
  unlock releases a write lock before a read lock on the same range),
  then any non-pending match — removes it from the array, and queues
  retry notifications for pending locks that overlapped the removed range.

  Returns 0 with call->status = NT_STATUS_OK on success,
  NT_STATUS_RANGE_NOT_LOCKED when no match, CTDB_ERR_NOMEM on allocation
  failure. call->new_data carries the shrunk record.
  NOTE(review): 'lock = &locks[i];' inside both search loops, the 'found:'
  label, the notify conditional and the final 'return 0;' are among the
  lines missing from this view.
*/
531 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
533 struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
536 struct lock_struct *locks, *lock;
537 struct lock_context context;
538 NTSTATUS status = NT_STATUS_OK;
540 dbuf = call->record_data;
/* reconstruct the owner identity from the marshalled request */
542 context.smbpid = req->smbpid;
543 context.server = req->server;
544 context.ctx = req->brl;
546 /* there are existing locks - find a match */
547 locks = (struct lock_struct *)dbuf.dptr;
548 count = dbuf.dsize / sizeof(*locks);
/* first pass: prefer an exact WRITE_LOCK match */
550 for (i=0; i<count; i++) {
552 if (brl_ctdb_same_context(&lock->context, &context) &&
553 lock->ntvfs == req->ntvfs &&
554 lock->start == req->start &&
555 lock->size == req->size &&
556 lock->lock_type == WRITE_LOCK) {
560 if (i < count) goto found;
/* second pass: any non-pending lock with matching range/owner */
562 for (i=0; i<count; i++) {
564 if (brl_ctdb_same_context(&lock->context, &context) &&
565 lock->ntvfs == req->ntvfs &&
566 lock->start == req->start &&
567 lock->size == req->size &&
568 lock->lock_type < PENDING_READ_LOCK) {
575 struct lock_struct removed_lock = *lock;
/* build the record minus the removed entry */
577 call->new_data = talloc(call, TDB_DATA);
578 if (call->new_data == NULL) {
579 return CTDB_ERR_NOMEM;
582 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
583 if (call->new_data->dptr == NULL) {
584 return CTDB_ERR_NOMEM;
586 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
588 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
589 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
590 (count-(i+1))*sizeof(*lock));
/* wake any pending locks that overlapped the removed range */
593 int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
594 if (ret != 0) return ret;
599 /* we didn't find it */
600 status = NT_STATUS_RANGE_NOT_LOCKED;
603 call->status = NT_STATUS_V(status);
610 Unlock a range of bytes.
/*
  Unlock a byte range via a FUNC_BRL_UNLOCK ctdb call, then send any
  pending-lock retry notifications the remote handler queued in
  call.reply_data. Returns the NTSTATUS produced by the handler, or
  NT_STATUS_INTERNAL_DB_CORRUPTION if the ctdb call itself failed.
  NOTE(review): the smbpid parameter line, ZERO init, the
  req.smbpid/start/size/brl assignments and the 'if (ret == -1) {' guard
  before the DEBUG are missing from this view.
*/
612 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
613 struct brl_handle *brlh,
615 uint64_t start, uint64_t size)
617 struct ctdb_call call;
618 struct ctdb_unlock_req req;
621 call.call_id = FUNC_BRL_UNLOCK;
622 call.key.dptr = brlh->key.data;
623 call.key.dsize = brlh->key.length;
624 call.call_data.dptr = (uint8_t *)&req;
625 call.call_data.dsize = sizeof(req);
631 req.server = brl->server;
633 req.ntvfs = brlh->ntvfs;
635 ret = ctdb_call(brl->ctdb_db, &call);
637 DEBUG(0,("ctdb_call failed - %s\n", __location__));
638 return NT_STATUS_INTERNAL_DB_CORRUPTION;
641 brl_ctdb_notify_send(brl, &call.reply_data);
643 return NT_STATUS(call.status);
/* Marshalled request for FUNC_BRL_REMOVE_PENDING.
   NOTE(review): the notify_ptr member and closing brace are missing from
   this view; the handler matches on req->notify_ptr and req->server. */
647 struct ctdb_remove_pending_req {
648 struct server_id server;
653 remove a pending lock. This is called when the caller has either
654 given up trying to establish a lock or when they have succeeded in
655 getting it. In either case they no longer need to be notified.
/*
  ctdb call handler for brl_remove_pending(): delete the pending lock
  identified by (notify_ptr, server) from the record. No retry
  notifications are needed — a pending lock blocks nobody.

  Returns 0 with call->status = NT_STATUS_OK on success,
  NT_STATUS_RANGE_NOT_LOCKED when no match, CTDB_ERR_NOMEM on allocation
  failure.
  NOTE(review): declarations of i/count/dbuf and the 'goto done;' /
  'done:'-style control flow implied by the numbering gaps (690->696) are
  missing from this view.
*/
657 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
659 struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
662 struct lock_struct *locks;
663 NTSTATUS status = NT_STATUS_OK;
665 dbuf = call->record_data;
667 /* there are existing locks - find a match */
668 locks = (struct lock_struct *)dbuf.dptr;
669 count = dbuf.dsize / sizeof(*locks);
671 for (i=0; i<count; i++) {
672 struct lock_struct *lock = &locks[i];
674 if (lock->lock_type >= PENDING_READ_LOCK &&
675 lock->notify_ptr == req->notify_ptr &&
676 cluster_id_equal(&lock->context.server, &req->server)) {
/* found it - rebuild the record without this entry */
677 call->new_data = talloc(call, TDB_DATA);
678 if (call->new_data == NULL) {
679 return CTDB_ERR_NOMEM;
682 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
683 if (call->new_data->dptr == NULL) {
684 return CTDB_ERR_NOMEM;
686 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
688 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
689 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
690 (count-(i+1))*sizeof(*lock));
696 /* we didn't find it */
697 status = NT_STATUS_RANGE_NOT_LOCKED;
700 call->status = NT_STATUS_V(status);
/*
  Remove a pending lock via a FUNC_BRL_REMOVE_PENDING ctdb call. Called
  when the waiter gives up, or after it finally acquired the lock —
  either way it no longer wants a MSG_BRL_RETRY notification.
  NOTE(review): the notify_ptr parameter line, ZERO init and the
  'if (ret == -1) {' guard are missing from this view.
*/
705 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
706 struct brl_handle *brlh,
709 struct ctdb_call call;
710 struct ctdb_remove_pending_req req;
713 call.call_id = FUNC_BRL_REMOVE_PENDING;
714 call.key.dptr = brlh->key.data;
715 call.key.dsize = brlh->key.length;
716 call.call_data.dptr = (uint8_t *)&req;
717 call.call_data.dsize = sizeof(req);
720 req.notify_ptr = notify_ptr;
721 req.server = brl->server;
723 ret = ctdb_call(brl->ctdb_db, &call);
725 DEBUG(0,("ctdb_call failed - %s\n", __location__));
726 return NT_STATUS_INTERNAL_DB_CORRUPTION;
729 return NT_STATUS(call.status);
/* Marshalled request for FUNC_BRL_LOCKTEST.
   NOTE(review): smbpid/start/size members are missing from this view
   (numbering gap 733->737); the handler reads req->smbpid, req->start
   and req->size. */
733 struct ctdb_locktest_req {
737 enum brl_type lock_type;
738 struct brl_context *brl;
739 struct server_id server;
740 struct ntvfs_handle *ntvfs;
744 ctdb call handling brl_locktest(). Tests whether the requested lock
745 would conflict with any existing lock held by another owner, without
746 actually taking a lock. (Previous comment was copy-pasted from remove_pending.)
/*
  ctdb call handler for brl_locktest(): build a candidate lock from the
  request and test it against every existing record with
  brl_ctdb_conflict_other() (which ignores our own compatible locks).
  No record modification — this is a read-only IO permission check.

  Sets call->status to NT_STATUS_OK or NT_STATUS_FILE_LOCK_CONFLICT.
  NOTE(review): declarations of i/count/dbuf, the 'break;' after the
  conflict is found and the final 'return 0;' are missing from this view.
*/
748 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
750 struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
753 struct lock_struct *locks, lock;
754 NTSTATUS status = NT_STATUS_OK;
756 lock.context.smbpid = req->smbpid;
757 lock.context.server = req->server;
758 lock.context.ctx = req->brl;
759 lock.ntvfs = req->ntvfs;
760 lock.start = req->start;
761 lock.size = req->size;
762 lock.lock_type = req->lock_type;
764 dbuf = call->record_data;
766 /* there are existing locks - find a match */
767 locks = (struct lock_struct *)dbuf.dptr;
768 count = dbuf.dsize / sizeof(*locks);
770 for (i=0; i<count; i++) {
771 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
772 status = NT_STATUS_FILE_LOCK_CONFLICT;
777 call->status = NT_STATUS_V(status);
783 Test if we are allowed to perform IO on a region of an open file
/*
  Test whether IO of type lock_type on [start, start+size) would be
  allowed, via a FUNC_BRL_LOCKTEST ctdb call. Purely advisory — no lock
  is taken.
  NOTE(review): the smbpid parameter line, ZERO init, the
  req.smbpid/start/size/brl assignments and the 'if (ret == -1) {' guard
  are missing from this view.
*/
785 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
786 struct brl_handle *brlh,
788 uint64_t start, uint64_t size,
789 enum brl_type lock_type)
791 struct ctdb_call call;
792 struct ctdb_locktest_req req;
795 call.call_id = FUNC_BRL_LOCKTEST;
796 call.key.dptr = brlh->key.data;
797 call.key.dsize = brlh->key.length;
798 call.call_data.dptr = (uint8_t *)&req;
799 call.call_data.dsize = sizeof(req);
805 req.lock_type = lock_type;
806 req.server = brl->server;
808 req.ntvfs = brlh->ntvfs;
810 ret = ctdb_call(brl->ctdb_db, &call);
812 DEBUG(0,("ctdb_call failed - %s\n", __location__));
813 return NT_STATUS_INTERNAL_DB_CORRUPTION;
816 return NT_STATUS(call.status);
/* Marshalled request for FUNC_BRL_CLOSE: identifies the closing handle
   by owning brl_context, cluster server id and ntvfs handle. */
820 struct ctdb_close_req {
821 struct brl_context *brl;
822 struct server_id server;
823 struct ntvfs_handle *ntvfs;
827 ctdb call handling brl_close(). Removes every lock owned by the closing
828 file handle from the record and queues retry notifications for any
829 pending locks. (Previous comment was copy-pasted from remove_pending.)
/*
  ctdb call handler for brl_close(): delete every lock owned by the given
  (brl, server, ntvfs) handle from the record, compacting the array in
  place with memmove, then queue retry notifications for any remaining
  pending locks and write back the shrunk record.

  Returns 0 with call->status set, CTDB_ERR_NOMEM on allocation failure.
  NOTE(review): lines decrementing count/incrementing dcount, the
  'if (dcount == 0) goto done;'-style early exit implied by the numbering
  gap (854->863), and the final 'return 0;' are missing from this view.
*/
831 static int brl_ctdb_close_func(struct ctdb_call_info *call)
833 struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
835 int count, dcount=0, i;
836 struct lock_struct *locks;
837 NTSTATUS status = NT_STATUS_OK;
839 dbuf = call->record_data;
841 /* there are existing locks - find a match */
842 locks = (struct lock_struct *)dbuf.dptr;
843 count = dbuf.dsize / sizeof(*locks);
845 for (i=0; i<count; i++) {
846 struct lock_struct *lock = &locks[i];
848 if (lock->context.ctx == req->brl &&
849 cluster_id_equal(&lock->context.server, &req->server) &&
850 lock->ntvfs == req->ntvfs) {
851 /* found it - delete it */
852 if (count > 1 && i < count-1) {
853 memmove(&locks[i], &locks[i+1],
854 sizeof(*locks)*((count-1) - i));
/* write back the compacted record and wake pending waiters */
863 call->new_data = talloc(call, TDB_DATA);
864 if (call->new_data == NULL) {
865 return CTDB_ERR_NOMEM;
868 brl_ctdb_notify_all(call, locks, count);
870 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
871 if (call->new_data->dptr == NULL) {
872 return CTDB_ERR_NOMEM;
874 call->new_data->dsize = count*sizeof(struct lock_struct);
876 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
879 call->status = NT_STATUS_V(status);
885 Remove all locks for a file handle when it is closed
/*
  Remove all locks held through this file handle (called on close) via a
  FUNC_BRL_CLOSE ctdb call, then send the pending-lock retry
  notifications the handler queued in call.reply_data.
  NOTE(review): ZERO init, 'req.brl = brl;' and the 'if (ret == -1) {'
  guard are missing from this view.
*/
887 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
888 struct brl_handle *brlh)
890 struct ctdb_call call;
891 struct ctdb_close_req req;
894 call.call_id = FUNC_BRL_CLOSE;
895 call.key.dptr = brlh->key.data;
896 call.key.dsize = brlh->key.length;
897 call.call_data.dptr = (uint8_t *)&req;
898 call.call_data.dsize = sizeof(req);
902 req.server = brl->server;
903 req.ntvfs = brlh->ntvfs;
905 ret = ctdb_call(brl->ctdb_db, &call);
907 DEBUG(0,("ctdb_call failed - %s\n", __location__));
908 return NT_STATUS_INTERNAL_DB_CORRUPTION;
911 brl_ctdb_notify_send(brl, &call.reply_data);
913 return NT_STATUS(call.status);
/* vtable wiring this ctdb backend into the generic brlock interface.
   NOTE(review): the variable is named brlock_tdb_ops although every
   member is a ctdb function — presumably copied from the tdb backend;
   harmless since it is static, but worth confirming before renaming. */
917 static const struct brlock_ops brlock_tdb_ops = {
918 .brl_init = brl_ctdb_init,
919 .brl_create_handle = brl_ctdb_create_handle,
920 .brl_lock = brl_ctdb_lock,
921 .brl_unlock = brl_ctdb_unlock,
922 .brl_remove_pending = brl_ctdb_remove_pending,
923 .brl_locktest = brl_ctdb_locktest,
924 .brl_close = brl_ctdb_close
928 void brl_ctdb_init_ops(void)
930 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
931 struct ctdb_context);
932 struct ctdb_db_context *ctdb_db;
934 brl_set_ops(&brlock_tdb_ops);
936 ctdb_db = ctdb_db_handle(ctdb, "brlock");
937 if (ctdb_db == NULL) {
938 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
942 ctdb_set_call(ctdb_db, brl_ctdb_lock_func, FUNC_BRL_LOCK);
943 ctdb_set_call(ctdb_db, brl_ctdb_unlock_func, FUNC_BRL_UNLOCK);
944 ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func, FUNC_BRL_REMOVE_PENDING);
945 ctdb_set_call(ctdb_db, brl_ctdb_locktest_func, FUNC_BRL_LOCKTEST);
946 ctdb_set_call(ctdb_db, brl_ctdb_close_func, FUNC_BRL_CLOSE);