66342a8ac4ebe872442567c2f89d3ccc262b92f3
[metze/samba/wip.git] / source4 / cluster / ctdb / brlock_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - ctdb backend
5
6    Copyright (C) Andrew Tridgell 2006
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "system/filesys.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "messaging/messaging.h"
27 #include "db_wrap.h"
28 #include "lib/messaging/irpc.h"
29 #include "libcli/libcli.h"
30 #include "cluster/cluster.h"
31 #include "ntvfs/ntvfs.h"
32 #include "ntvfs/common/brlock.h"
33 #include "include/ctdb.h"
34
35 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, 
36                    FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
37                    FUNC_BRL_CLOSE=5};
38
39 /*
40   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
41   a file. For a local posix filesystem this will usually be a combination
42   of the device and inode numbers of the file, but it can be anything 
43   that uniquely idetifies a file for locking purposes, as long
44   as it is applied consistently.
45 */
46
47 /* this struct is typically attached to tcon */
48 struct brl_context {
49         struct ctdb_context *ctdb;
50         struct ctdb_db_context *ctdb_db;
51         struct server_id server;
52         struct messaging_context *messaging_ctx;
53 };
54
55 /*
56   the lock context contains the elements that define whether one
57   lock is the same as another lock
58 */
59 struct lock_context {
60         struct server_id server;
61         uint16_t smbpid;
62         struct brl_context *ctx;
63 };
64
65 /* The data in brlock records is an unsorted linear array of these
66    records.  It is unnecessary to store the count as tdb provides the
67    size of the record */
68 struct lock_struct {
69         struct lock_context context;
70         struct ntvfs_handle *ntvfs;
71         uint64_t start;
72         uint64_t size;
73         enum brl_type lock_type;
74         void *notify_ptr;
75 };
76
77 /* this struct is attached to on open file handle */
78 struct brl_handle {
79         DATA_BLOB key;
80         struct ntvfs_handle *ntvfs;
81         struct lock_struct last_lock;
82 };
83
84 #if 0
85 static void show_locks(const char *op, struct lock_struct *locks, int count)
86 {
87         int i;
88         DEBUG(0,("OP: %s\n", op));
89         if (locks == NULL) return;
90         for (i=0;i<count;i++) {
91                 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
92                          i, (int)locks[i].start, (int)locks[i].size, 
93                          locks[i].context.server.node,
94                          locks[i].context.server.id,
95                          locks[i].context.smbpid,
96                          locks[i].context.ctx,
97                          locks[i].ntvfs));
98         }
99 }
100 #endif
101
102 /*
103   Open up the brlock.tdb database. Close it down using
104   talloc_free(). We need the messaging_ctx to allow for
105   pending lock notifications.
106 */
107 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
108                                     struct messaging_context *messaging_ctx)
109 {
110         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
111                                                     struct ctdb_context);
112         struct brl_context *brl;
113
114         brl = talloc(mem_ctx, struct brl_context);
115         if (brl == NULL) {
116                 return NULL;
117         }
118
119         brl->ctdb = ctdb;
120         brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
121         if (brl->ctdb_db == NULL) {
122                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
123                 talloc_free(brl);
124                 return NULL;
125         }
126         brl->server = server;
127         brl->messaging_ctx = messaging_ctx;
128
129         return brl;
130 }
131
132 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
133                                                     DATA_BLOB *file_key)
134 {
135         struct brl_handle *brlh;
136
137         brlh = talloc(mem_ctx, struct brl_handle);
138         if (brlh == NULL) {
139                 return NULL;
140         }
141
142         brlh->key = *file_key;
143         brlh->ntvfs = ntvfs;
144         ZERO_STRUCT(brlh->last_lock);
145
146         return brlh;
147 }
148
149 /*
150   see if two locking contexts are equal
151 */
152 static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
153 {
154         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
155                 ctx1->smbpid == ctx2->smbpid &&
156                 ctx1->ctx == ctx2->ctx);
157 }
158
159 /*
160   see if lck1 and lck2 overlap
161 */
162 static BOOL brl_ctdb_overlap(struct lock_struct *lck1, 
163                         struct lock_struct *lck2)
164 {
165         /* this extra check is not redundent - it copes with locks
166            that go beyond the end of 64 bit file space */
167         if (lck1->size != 0 &&
168             lck1->start == lck2->start &&
169             lck1->size == lck2->size) {
170                 return True;
171         }
172             
173         if (lck1->start >= (lck2->start+lck2->size) ||
174             lck2->start >= (lck1->start+lck1->size)) {
175                 return False;
176         }
177         return True;
178
179
180 /*
181  See if lock2 can be added when lock1 is in place.
182 */
183 static BOOL brl_ctdb_conflict(struct lock_struct *lck1, 
184                          struct lock_struct *lck2)
185 {
186         /* pending locks don't conflict with anything */
187         if (lck1->lock_type >= PENDING_READ_LOCK ||
188             lck2->lock_type >= PENDING_READ_LOCK) {
189                 return False;
190         }
191
192         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
193                 return False;
194         }
195
196         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
197             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
198                 return False;
199         }
200
201         return brl_ctdb_overlap(lck1, lck2);
202
203
204
205 /*
206  Check to see if this lock conflicts, but ignore our own locks on the
207  same fnum only.
208 */
209 static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
210 {
211         /* pending locks don't conflict with anything */
212         if (lck1->lock_type >= PENDING_READ_LOCK ||
213             lck2->lock_type >= PENDING_READ_LOCK) {
214                 return False;
215         }
216
217         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
218                 return False;
219
220         /*
221          * note that incoming write calls conflict with existing READ
222          * locks even if the context is the same. JRA. See LOCKTEST7
223          * in smbtorture.
224          */
225         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
226             lck1->ntvfs == lck2->ntvfs &&
227             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
228                 return False;
229         }
230
231         return brl_ctdb_overlap(lck1, lck2);
232
233
234
235 /*
236   amazingly enough, w2k3 "remembers" whether the last lock failure
237   is the same as this one and changes its error code. I wonder if any
238   app depends on this?
239 */
240 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
241 {
242         /*
243          * this function is only called for non pending lock!
244          */
245
246         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
247         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
248                 return NT_STATUS_LOCK_NOT_GRANTED;
249         }
250
251         /* 
252          * if the notify_ptr is non NULL,
253          * it means that we're at the end of a pending lock
254          * and the real lock is requested after the timeout went by
255          * In this case we need to remember the last_lock and always
256          * give FILE_LOCK_CONFLICT
257          */
258         if (lock->notify_ptr) {
259                 brlh->last_lock = *lock;
260                 return NT_STATUS_FILE_LOCK_CONFLICT;
261         }
262
263         /* 
264          * amazing the little things you learn with a test
265          * suite. Locks beyond this offset (as a 64 bit
266          * number!) always generate the conflict error code,
267          * unless the top bit is set
268          */
269         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
270                 brlh->last_lock = *lock;
271                 return NT_STATUS_FILE_LOCK_CONFLICT;
272         }
273
274         /*
275          * if the current lock matches the last failed lock on the file handle
276          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
277          */
278         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
279             lock->context.ctx == brlh->last_lock.context.ctx &&
280             lock->ntvfs == brlh->last_lock.ntvfs &&
281             lock->start == brlh->last_lock.start) {
282                 return NT_STATUS_FILE_LOCK_CONFLICT;
283         }
284
285         brlh->last_lock = *lock;
286         return NT_STATUS_LOCK_NOT_GRANTED;
287 }
288
289 struct ctdb_lock_req {
290         uint16_t smbpid;
291         uint64_t start;
292         uint64_t size;
293         enum brl_type lock_type;
294         void *notify_ptr;
295         struct server_id server;
296         struct brl_context *brl;
297         struct ntvfs_handle *ntvfs;
298 };
299
300 /*
301   ctdb call handling brl_lock()
302 */
303 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
304 {
305         struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
306         TDB_DATA dbuf;
307         int count=0, i;
308         struct lock_struct lock, *locks=NULL;
309         NTSTATUS status = NT_STATUS_OK;
310
311         /* if this is a pending lock, then with the chainlock held we
312            try to get the real lock. If we succeed then we don't need
313            to make it pending. This prevents a possible race condition
314            where the pending lock gets created after the lock that is
315            preventing the real lock gets removed */
316         if (req->lock_type >= PENDING_READ_LOCK) {
317                 enum brl_type lock_type = req->lock_type;
318                 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
319                 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
320                         return 0;
321                 }
322                 req->lock_type = lock_type;
323         }
324
325         dbuf = call->record_data;
326
327         ZERO_STRUCT(lock);
328         lock.context.smbpid = req->smbpid;
329         lock.context.server = req->server;
330         lock.context.ctx = req->brl;
331         lock.ntvfs = req->ntvfs;
332         lock.start = req->start;
333         lock.size = req->size;
334         lock.lock_type = req->lock_type;
335         lock.notify_ptr = req->notify_ptr;
336
337         if (dbuf.dptr) {
338                 /* there are existing locks - make sure they don't conflict */
339                 locks = (struct lock_struct *)dbuf.dptr;
340                 count = dbuf.dsize / sizeof(*locks);
341
342                 for (i=0; i<count; i++) {
343                         if (brl_ctdb_conflict(&locks[i], &lock)) {
344                                 status = NT_STATUS_LOCK_NOT_GRANTED;
345                                 goto reply;
346                         }
347                 }
348         }
349
350         call->new_data = talloc(call, TDB_DATA);
351         if (call->new_data == NULL) {
352                 return CTDB_ERR_NOMEM;
353         }
354
355         call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
356         if (call->new_data->dptr == NULL) {
357                 return CTDB_ERR_NOMEM;
358         }
359         memcpy(call->new_data->dptr, locks, dbuf.dsize);
360         memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
361         call->new_data->dsize = dbuf.dsize + sizeof(lock);
362
363         if (req->lock_type >= PENDING_READ_LOCK) {
364                 status = NT_STATUS_LOCK_NOT_GRANTED;
365         }
366
367 reply:
368         call->status = NT_STATUS_V(status);
369
370         return 0;
371 }
372
373
374 /*
375   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
376   which case a real lock is first tried, and if that fails then a
377   pending lock is created. When the pending lock is triggered (by
378   someone else closing an overlapping lock range) a messaging
379   notification is sent, identified by the notify_ptr
380 */
381 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
382                               struct brl_handle *brlh,
383                               uint16_t smbpid,
384                               uint64_t start, uint64_t size, 
385                               enum brl_type lock_type,
386                               void *notify_ptr)
387 {
388         struct ctdb_lock_req req;
389         struct ctdb_call call;
390         int ret;
391         NTSTATUS status;
392
393         call.call_id = FUNC_BRL_LOCK;
394         call.key.dptr = brlh->key.data;
395         call.key.dsize = brlh->key.length;
396         call.call_data.dptr = (uint8_t *)&req;
397         call.call_data.dsize = sizeof(req);
398         call.flags = 0;
399         call.status = 0;
400
401         ZERO_STRUCT(req);
402         req.smbpid = smbpid;
403         req.start  = start;
404         req.size   = size;
405         req.lock_type = lock_type;
406         req.notify_ptr = notify_ptr;
407         req.server = brl->server;
408         req.brl = brl;
409         req.ntvfs = brlh->ntvfs;
410
411         ret = ctdb_call(brl->ctdb_db, &call);
412         if (ret == -1) {
413                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
414         }
415
416         status = NT_STATUS(call.status);
417
418         if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
419                 struct lock_struct lock;
420                 lock.context.smbpid = smbpid;
421                 lock.context.server = brl->server;
422                 lock.context.ctx = brl;
423                 lock.ntvfs = brlh->ntvfs;
424                 lock.start = start;
425                 lock.size = size;
426                 lock.lock_type = lock_type;
427                 lock.notify_ptr = notify_ptr;
428                 status = brl_ctdb_lock_failed(brlh, &lock);
429         }
430
431         return status;
432 }
433
434 /*
435   we are removing a lock that might be holding up a pending lock. Scan
436   for pending locks that cover this range and if we find any then
437   notify the server that it should retry the lock. In this backend, we
438   notify by sending the list of locks that need to be notified on back
439   in the reply_data of the ctdb call. The caller then does the
440   messaging for us. 
441 */
442 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
443                                   struct lock_struct *locks, int count, 
444                                    struct lock_struct *removed_lock)
445 {
446         int i, last_notice;
447
448         /* the last_notice logic is to prevent stampeding on a lock
449            range. It prevents us sending hundreds of notifies on the
450            same range of bytes. It doesn't prevent all possible
451            stampedes, but it does prevent the most common problem */
452         last_notice = -1;
453
454         for (i=0;i<count;i++) {
455                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
456                     brl_ctdb_overlap(&locks[i], removed_lock)) {
457                         struct lock_struct *nlocks;
458                         int ncount;
459
460                         if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
461                                 continue;
462                         }
463                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
464                                 last_notice = i;
465                         }
466                         if (call->reply_data == NULL) {
467                                 call->reply_data = talloc_zero(call, TDB_DATA);
468                                 if (call->reply_data == NULL) {
469                                         return CTDB_ERR_NOMEM;
470                                 }
471                         }
472                         /* add to the list of pending locks to notify caller of */
473                         ncount = call->reply_data->dsize / sizeof(struct lock_struct);
474                         nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr, 
475                                                 struct lock_struct, ncount + 1);
476                         if (nlocks == NULL) {
477                                 return CTDB_ERR_NOMEM;
478                         }
479                         call->reply_data->dptr = (uint8_t *)nlocks;
480                         nlocks[ncount] = locks[i];
481                         call->reply_data->dsize += sizeof(struct lock_struct);
482                 }
483         }
484
485         return 0;
486 }
487
488 /*
489   send notifications for all pending locks - the file is being closed by this
490   user
491 */
492 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
493                                 struct lock_struct *locks, int count)
494 {
495         int i;
496         for (i=0;i<count;i++) {
497                 if (locks->lock_type >= PENDING_READ_LOCK) {
498                         int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
499                         if (ret != 0) return ret;
500                 }
501         }
502         return 0;
503 }
504
505 /*
506   send off any messages needed to notify of pending locks that should now retry
507 */
508 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
509 {
510         struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
511         int i, count = reply_data->dsize / sizeof(struct lock_struct);
512         for (i=0;i<count;i++) {
513                 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
514                                    MSG_BRL_RETRY, locks[i].notify_ptr);
515         }
516 }
517
518
519 struct ctdb_unlock_req {
520         uint16_t smbpid;
521         uint64_t start;
522         uint64_t size;
523         struct server_id server;
524         struct brl_context *brl;
525         struct ntvfs_handle *ntvfs;
526 };
527
528 /*
529  Unlock a range of bytes.
530 */
531 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
532 {
533         struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
534         TDB_DATA dbuf;
535         int count, i;
536         struct lock_struct *locks, *lock;
537         struct lock_context context;
538         NTSTATUS status = NT_STATUS_OK;
539
540         dbuf = call->record_data;
541
542         context.smbpid = req->smbpid;
543         context.server = req->server;
544         context.ctx = req->brl;
545
546         /* there are existing locks - find a match */
547         locks = (struct lock_struct *)dbuf.dptr;
548         count = dbuf.dsize / sizeof(*locks);
549
550         for (i=0; i<count; i++) {
551                 lock = &locks[i];
552                 if (brl_ctdb_same_context(&lock->context, &context) &&
553                     lock->ntvfs == req->ntvfs &&
554                     lock->start == req->start &&
555                     lock->size == req->size &&
556                     lock->lock_type == WRITE_LOCK) {
557                         break;
558                 }
559         }
560         if (i < count) goto found;
561
562         for (i=0; i<count; i++) {
563                 lock = &locks[i];
564                 if (brl_ctdb_same_context(&lock->context, &context) &&
565                     lock->ntvfs == req->ntvfs &&
566                     lock->start == req->start &&
567                     lock->size == req->size &&
568                     lock->lock_type < PENDING_READ_LOCK) {
569                         break;
570                 }
571         }
572
573 found:
574         if (i < count) {
575                 struct lock_struct removed_lock = *lock;
576
577                 call->new_data = talloc(call, TDB_DATA);
578                 if (call->new_data == NULL) {
579                         return CTDB_ERR_NOMEM;
580                 }
581                 
582                 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
583                 if (call->new_data->dptr == NULL) {
584                         return CTDB_ERR_NOMEM;
585                 }
586                 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
587                 
588                 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
589                 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
590                        (count-(i+1))*sizeof(*lock));
591                 
592                 if (count > 1) {
593                         int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
594                         if (ret != 0) return ret;
595                 }
596         }
597
598         if (i == count) {
599                 /* we didn't find it */
600                 status = NT_STATUS_RANGE_NOT_LOCKED;
601         }
602
603         call->status = NT_STATUS_V(status);
604
605         return 0;
606 }
607
608
609 /*
610  Unlock a range of bytes.
611 */
612 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
613                                 struct brl_handle *brlh, 
614                                 uint16_t smbpid,
615                                 uint64_t start, uint64_t size)
616 {
617         struct ctdb_call call;
618         struct ctdb_unlock_req req;
619         int ret;
620
621         call.call_id = FUNC_BRL_UNLOCK;
622         call.key.dptr = brlh->key.data;
623         call.key.dsize = brlh->key.length;
624         call.call_data.dptr = (uint8_t *)&req;
625         call.call_data.dsize = sizeof(req);
626
627         ZERO_STRUCT(req);
628         req.smbpid = smbpid;
629         req.start  = start;
630         req.size   = size;
631         req.server = brl->server;
632         req.brl = brl;
633         req.ntvfs = brlh->ntvfs;
634                 
635         ret = ctdb_call(brl->ctdb_db, &call);
636         if (ret == -1) {
637                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
638                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
639         }
640
641         brl_ctdb_notify_send(brl, &call.reply_data);
642
643         return NT_STATUS(call.status);
644 }
645
646
647 struct ctdb_remove_pending_req {
648         struct server_id server;
649         void *notify_ptr;
650 };
651
652 /*
653   remove a pending lock. This is called when the caller has either
654   given up trying to establish a lock or when they have succeeded in
655   getting it. In either case they no longer need to be notified.
656 */
657 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
658 {
659         struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
660         TDB_DATA dbuf;
661         int count, i;
662         struct lock_struct *locks;
663         NTSTATUS status = NT_STATUS_OK;
664
665         dbuf = call->record_data;
666
667         /* there are existing locks - find a match */
668         locks = (struct lock_struct *)dbuf.dptr;
669         count = dbuf.dsize / sizeof(*locks);
670
671         for (i=0; i<count; i++) {
672                 struct lock_struct *lock = &locks[i];
673                 
674                 if (lock->lock_type >= PENDING_READ_LOCK &&
675                     lock->notify_ptr == req->notify_ptr &&
676                     cluster_id_equal(&lock->context.server, &req->server)) {
677                         call->new_data = talloc(call, TDB_DATA);
678                         if (call->new_data == NULL) {
679                                 return CTDB_ERR_NOMEM;
680                         }
681
682                         call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
683                         if (call->new_data->dptr == NULL) {
684                                 return CTDB_ERR_NOMEM;
685                         }
686                         call->new_data->dsize = dbuf.dsize - sizeof(*lock);
687
688                         memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
689                         memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
690                                (count-(i+1))*sizeof(*lock));
691                         break;
692                 }
693         }
694         
695         if (i == count) {
696                 /* we didn't find it */
697                 status = NT_STATUS_RANGE_NOT_LOCKED;
698         }
699
700         call->status = NT_STATUS_V(status);
701
702         return 0;
703 }
704
705 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
706                                         struct brl_handle *brlh, 
707                                         void *notify_ptr)
708 {
709         struct ctdb_call call;
710         struct ctdb_remove_pending_req req;
711         int ret;
712
713         call.call_id = FUNC_BRL_REMOVE_PENDING;
714         call.key.dptr = brlh->key.data;
715         call.key.dsize = brlh->key.length;
716         call.call_data.dptr = (uint8_t *)&req;
717         call.call_data.dsize = sizeof(req);
718
719         ZERO_STRUCT(req);
720         req.notify_ptr = notify_ptr;
721         req.server = brl->server;
722                 
723         ret = ctdb_call(brl->ctdb_db, &call);
724         if (ret == -1) {
725                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
726                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
727         }
728
729         return NT_STATUS(call.status);
730 }
731
732
733 struct ctdb_locktest_req {
734         uint16_t smbpid;
735         uint64_t start;
736         uint64_t size;
737         enum brl_type lock_type;
738         struct brl_context *brl;
739         struct server_id server;
740         struct ntvfs_handle *ntvfs;
741 };
742
743 /*
744   remove a pending lock. This is called when the caller has either
745   given up trying to establish a lock or when they have succeeded in
746   getting it. In either case they no longer need to be notified.
747 */
748 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
749 {
750         struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
751         TDB_DATA dbuf;
752         int count, i;
753         struct lock_struct *locks, lock;
754         NTSTATUS status = NT_STATUS_OK;
755
756         lock.context.smbpid = req->smbpid;
757         lock.context.server = req->server;
758         lock.context.ctx = req->brl;
759         lock.ntvfs = req->ntvfs;
760         lock.start = req->start;
761         lock.size = req->size;
762         lock.lock_type = req->lock_type;
763
764         dbuf = call->record_data;
765
766         /* there are existing locks - find a match */
767         locks = (struct lock_struct *)dbuf.dptr;
768         count = dbuf.dsize / sizeof(*locks);
769
770         for (i=0; i<count; i++) {
771                 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
772                         status = NT_STATUS_FILE_LOCK_CONFLICT;
773                         break;
774                 }
775         }
776         
777         call->status = NT_STATUS_V(status);
778
779         return 0;
780 }
781
782 /*
783   Test if we are allowed to perform IO on a region of an open file
784 */
785 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
786                                   struct brl_handle *brlh,
787                                   uint16_t smbpid, 
788                                   uint64_t start, uint64_t size, 
789                                   enum brl_type lock_type)
790 {
791         struct ctdb_call call;
792         struct ctdb_locktest_req req;
793         int ret;
794
795         call.call_id = FUNC_BRL_LOCKTEST;
796         call.key.dptr = brlh->key.data;
797         call.key.dsize = brlh->key.length;
798         call.call_data.dptr = (uint8_t *)&req;
799         call.call_data.dsize = sizeof(req);
800
801         ZERO_STRUCT(req);
802         req.smbpid = smbpid;
803         req.start  = start;
804         req.size   = size;
805         req.lock_type = lock_type;
806         req.server = brl->server;
807         req.brl = brl;
808         req.ntvfs = brlh->ntvfs;
809
810         ret = ctdb_call(brl->ctdb_db, &call);
811         if (ret == -1) {
812                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
813                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
814         }
815
816         return NT_STATUS(call.status);
817 }
818
819
820 struct ctdb_close_req {
821         struct brl_context *brl;
822         struct server_id server;
823         struct ntvfs_handle *ntvfs;
824 };
825
826 /*
827   remove a pending lock. This is called when the caller has either
828   given up trying to establish a lock or when they have succeeded in
829   getting it. In either case they no longer need to be notified.
830 */
831 static int brl_ctdb_close_func(struct ctdb_call_info *call)
832 {
833         struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
834         TDB_DATA dbuf;
835         int count, dcount=0, i;
836         struct lock_struct *locks;
837         NTSTATUS status = NT_STATUS_OK;
838
839         dbuf = call->record_data;
840
841         /* there are existing locks - find a match */
842         locks = (struct lock_struct *)dbuf.dptr;
843         count = dbuf.dsize / sizeof(*locks);
844
845         for (i=0; i<count; i++) {
846                 struct lock_struct *lock = &locks[i];
847
848                 if (lock->context.ctx == req->brl &&
849                     cluster_id_equal(&lock->context.server, &req->server) &&
850                     lock->ntvfs == req->ntvfs) {
851                         /* found it - delete it */
852                         if (count > 1 && i < count-1) {
853                                 memmove(&locks[i], &locks[i+1], 
854                                         sizeof(*locks)*((count-1) - i));
855                         }
856                         count--;
857                         i--;
858                         dcount++;
859                 }
860         }
861
862         if (dcount > 0) {
863                 call->new_data = talloc(call, TDB_DATA);
864                 if (call->new_data == NULL) {
865                         return CTDB_ERR_NOMEM;
866                 }
867
868                 brl_ctdb_notify_all(call, locks, count);
869                 
870                 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
871                 if (call->new_data->dptr == NULL) {
872                         return CTDB_ERR_NOMEM;
873                 }
874                 call->new_data->dsize = count*sizeof(struct lock_struct);
875
876                 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
877         }
878
879         call->status = NT_STATUS_V(status);
880
881         return 0;
882 }
883
884 /*
885   Test if we are allowed to perform IO on a region of an open file
886 */
887 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
888                                struct brl_handle *brlh)
889 {
890         struct ctdb_call call;
891         struct ctdb_close_req req;
892         int ret;
893
894         call.call_id = FUNC_BRL_CLOSE;
895         call.key.dptr = brlh->key.data;
896         call.key.dsize = brlh->key.length;
897         call.call_data.dptr = (uint8_t *)&req;
898         call.call_data.dsize = sizeof(req);
899
900         ZERO_STRUCT(req);
901         req.brl = brl;
902         req.server = brl->server;
903         req.ntvfs = brlh->ntvfs;
904
905         ret = ctdb_call(brl->ctdb_db, &call);
906         if (ret == -1) {
907                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
908                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
909         }
910
911         brl_ctdb_notify_send(brl, &call.reply_data);
912
913         return NT_STATUS(call.status);
914 }
915
916
917 static const struct brlock_ops brlock_tdb_ops = {
918         .brl_init           = brl_ctdb_init,
919         .brl_create_handle  = brl_ctdb_create_handle,
920         .brl_lock           = brl_ctdb_lock,
921         .brl_unlock         = brl_ctdb_unlock,
922         .brl_remove_pending = brl_ctdb_remove_pending,
923         .brl_locktest       = brl_ctdb_locktest,
924         .brl_close          = brl_ctdb_close
925 };
926
927
928 void brl_ctdb_init_ops(void)
929 {
930         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
931                                                     struct ctdb_context);
932         struct ctdb_db_context *ctdb_db;
933
934         brl_set_ops(&brlock_tdb_ops);
935
936         ctdb_db = ctdb_db_handle(ctdb, "brlock");
937         if (ctdb_db == NULL) {
938                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
939                 return;
940         }
941
942         ctdb_set_call(ctdb_db, brl_ctdb_lock_func,  FUNC_BRL_LOCK);
943         ctdb_set_call(ctdb_db, brl_ctdb_unlock_func,  FUNC_BRL_UNLOCK);
944         ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func,  FUNC_BRL_REMOVE_PENDING);
945         ctdb_set_call(ctdb_db, brl_ctdb_locktest_func,  FUNC_BRL_LOCKTEST);
946         ctdb_set_call(ctdb_db, brl_ctdb_close_func,  FUNC_BRL_CLOSE);
947 }