d7392a32353c5c3bfccffec54c44e004e32a9f1d
[kai/samba.git] / source3 / lib / dbwrap / dbwrap_watch.c
1 /*
2    Unix SMB/CIFS implementation.
3    Watch dbwrap record changes
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
28
29 static struct db_context *dbwrap_record_watchers_db(void)
30 {
31         static struct db_context *watchers_db;
32
33         if (watchers_db == NULL) {
34                 watchers_db = db_open(NULL, lock_path("dbwrap_watchers.tdb"),
35                                       0, TDB_CLEAR_IF_FIRST, O_RDWR|O_CREAT,
36                                       0600, DBWRAP_LOCK_ORDER_3);
37         }
38         return watchers_db;
39 }
40
41 static TDB_DATA dbwrap_record_watchers_key(TALLOC_CTX *mem_ctx,
42                                            struct db_context *db,
43                                            struct db_record *rec,
44                                            TDB_DATA *rec_key)
45 {
46         const uint8_t *db_id;
47         size_t db_id_len;
48         TDB_DATA key, wkey;
49
50         dbwrap_db_id(db, &db_id, &db_id_len);
51         key = dbwrap_record_get_key(rec);
52
53         wkey.dsize = sizeof(uint32_t) + db_id_len + key.dsize;
54         wkey.dptr = talloc_array(mem_ctx, uint8_t, wkey.dsize);
55         if (wkey.dptr == NULL) {
56                 return make_tdb_data(NULL, 0);
57         }
58         SIVAL(wkey.dptr, 0, db_id_len);
59         memcpy(wkey.dptr + sizeof(uint32_t), db_id, db_id_len);
60         memcpy(wkey.dptr + sizeof(uint32_t) + db_id_len, key.dptr, key.dsize);
61
62         if (rec_key != NULL) {
63                 rec_key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
64                 rec_key->dsize = key.dsize;
65         }
66
67         return wkey;
68 }
69
70 static bool dbwrap_record_watchers_key_parse(
71         TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
72 {
73         size_t db_id_len;
74
75         if (wkey.dsize < sizeof(uint32_t)) {
76                 DEBUG(1, ("Invalid watchers key\n"));
77                 return false;
78         }
79         db_id_len = IVAL(wkey.dptr, 0);
80         if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
81                 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
82                           "db_id_len=%d\n", (int)wkey.dsize, (int)db_id_len));
83                 return false;
84         }
85         *p_db_id = wkey.dptr + sizeof(uint32_t);
86         *p_db_id_len = db_id_len;
87         key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
88         key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
89         return true;
90 }
91
92 static NTSTATUS dbwrap_record_add_watcher(TDB_DATA w_key, struct server_id id)
93 {
94         struct TALLOC_CTX *frame = talloc_stackframe();
95         struct db_context *db;
96         struct db_record *rec;
97         TDB_DATA value;
98         struct server_id *ids;
99         size_t num_ids;
100         NTSTATUS status;
101
102         db = dbwrap_record_watchers_db();
103         if (db == NULL) {
104                 status = map_nt_error_from_unix(errno);
105                 goto fail;
106         }
107         rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
108         if (rec == NULL) {
109                 status = map_nt_error_from_unix(errno);
110                 goto fail;
111         }
112         value = dbwrap_record_get_value(rec);
113
114         if ((value.dsize % sizeof(struct server_id)) != 0) {
115                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
116                 goto fail;
117         }
118
119         ids = (struct server_id *)value.dptr;
120         num_ids = value.dsize / sizeof(struct server_id);
121
122         ids = talloc_array(talloc_tos(), struct server_id,
123                            num_ids + 1);
124         if (ids == NULL) {
125                 status = NT_STATUS_NO_MEMORY;
126                 goto fail;
127         }
128         memcpy(ids, value.dptr, value.dsize);
129         ids[num_ids] = id;
130         num_ids += 1;
131
132         status = dbwrap_record_store(
133                 rec, make_tdb_data((uint8_t *)ids, talloc_get_size(ids)), 0);
134 fail:
135         TALLOC_FREE(frame);
136         return status;
137 }
138
139 static NTSTATUS dbwrap_record_del_watcher(TDB_DATA w_key, struct server_id id)
140 {
141         struct TALLOC_CTX *frame = talloc_stackframe();
142         struct db_context *db;
143         struct db_record *rec;
144         struct server_id *ids;
145         size_t i, num_ids;
146         TDB_DATA value;
147         NTSTATUS status;
148
149         db = dbwrap_record_watchers_db();
150         if (db == NULL) {
151                 status = map_nt_error_from_unix(errno);
152                 goto fail;
153         }
154         rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
155         if (rec == NULL) {
156                 status = map_nt_error_from_unix(errno);
157                 goto fail;
158         }
159         value = dbwrap_record_get_value(rec);
160
161         if ((value.dsize % sizeof(struct server_id)) != 0) {
162                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
163                 goto fail;
164         }
165
166         ids = (struct server_id *)value.dptr;
167         num_ids = value.dsize / sizeof(struct server_id);
168
169         for (i=0; i<num_ids; i++) {
170                 if (serverid_equal(&id, &ids[i])) {
171                         ids[i] = ids[num_ids-1];
172                         value.dsize -= sizeof(struct server_id);
173                         break;
174                 }
175         }
176         if (value.dsize == 0) {
177                 status = dbwrap_record_delete(rec);
178                 goto done;
179         }
180         status = dbwrap_record_store(rec, value, 0);
181 fail:
182 done:
183         TALLOC_FREE(frame);
184         return status;
185 }
186
187 static NTSTATUS dbwrap_record_get_watchers(struct db_context *db,
188                                            struct db_record *rec,
189                                            TALLOC_CTX *mem_ctx,
190                                            struct server_id **p_ids,
191                                            size_t *p_num_ids)
192 {
193         struct db_context *w_db;
194         TDB_DATA key = { 0, };
195         TDB_DATA value = { 0, };
196         struct server_id *ids;
197         NTSTATUS status;
198
199         key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
200         if (key.dptr == NULL) {
201                 status = NT_STATUS_NO_MEMORY;
202                 goto fail;
203         }
204         w_db = dbwrap_record_watchers_db();
205         if (w_db == NULL) {
206                 status = NT_STATUS_INTERNAL_ERROR;
207                 goto fail;
208         }
209         status = dbwrap_fetch(w_db, mem_ctx, key, &value);
210         TALLOC_FREE(key.dptr);
211         if (!NT_STATUS_IS_OK(status)) {
212                 goto fail;
213         }
214         if ((value.dsize % sizeof(struct server_id)) != 0) {
215                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
216                 goto fail;
217         }
218         ids = (struct server_id *)value.dptr;
219         *p_ids = talloc_move(mem_ctx, &ids);
220         *p_num_ids = value.dsize / sizeof(struct server_id);
221         return NT_STATUS_OK;
222 fail:
223         TALLOC_FREE(key.dptr);
224         TALLOC_FREE(value.dptr);
225         return status;
226 }
227
228 struct dbwrap_record_watch_state {
229         struct tevent_context *ev;
230         struct db_context *db;
231         struct tevent_req *req;
232         struct messaging_context *msg;
233         struct msg_channel *channel;
234         TDB_DATA key;
235         TDB_DATA w_key;
236 };
237
238 static void dbwrap_record_watch_done(struct tevent_req *subreq);
239 static int dbwrap_record_watch_state_destructor(
240         struct dbwrap_record_watch_state *state);
241
242 struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
243                                             struct tevent_context *ev,
244                                             struct db_record *rec,
245                                             struct messaging_context *msg)
246 {
247         struct tevent_req *req, *subreq;
248         struct dbwrap_record_watch_state *state;
249         struct db_context *watchers_db;
250         NTSTATUS status;
251         int ret;
252
253         req = tevent_req_create(mem_ctx, &state,
254                                 struct dbwrap_record_watch_state);
255         if (req == NULL) {
256                 return NULL;
257         }
258         state->db = dbwrap_record_get_db(rec);
259         state->ev = ev;
260         state->req = req;
261         state->msg = msg;
262
263         watchers_db = dbwrap_record_watchers_db();
264         if (watchers_db == NULL) {
265                 tevent_req_nterror(req, map_nt_error_from_unix(errno));
266                 return tevent_req_post(req, ev);
267         }
268
269         state->w_key = dbwrap_record_watchers_key(state, state->db, rec,
270                                                   &state->key);
271         if (tevent_req_nomem(state->w_key.dptr, req)) {
272                 return tevent_req_post(req, ev);
273         }
274
275         ret = msg_channel_init(state, state->msg, MSG_DBWRAP_MODIFIED,
276                                &state->channel);
277         if (ret != 0) {
278                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
279                 return tevent_req_post(req, ev);
280         }
281
282         status = dbwrap_record_add_watcher(
283                 state->w_key, messaging_server_id(state->msg));
284         if (tevent_req_nterror(req, status)) {
285                 return tevent_req_post(req, ev);
286         }
287         talloc_set_destructor(state, dbwrap_record_watch_state_destructor);
288
289         subreq = msg_read_send(state, state->ev, state->channel);
290         if (tevent_req_nomem(subreq, req)) {
291                 return tevent_req_post(req, ev);
292         }
293         tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
294         return req;
295 }
296
297 static int dbwrap_record_watch_state_destructor(
298         struct dbwrap_record_watch_state *s)
299 {
300         if (s->msg != NULL) {
301                 dbwrap_record_del_watcher(
302                         s->w_key, messaging_server_id(s->msg));
303         }
304         return 0;
305 }
306
307 static void dbwrap_watch_record_stored(struct db_context *db,
308                                        struct db_record *rec,
309                                        void *private_data)
310 {
311         struct messaging_context *msg = talloc_get_type_abort(
312                 private_data, struct messaging_context);
313         struct server_id *ids = NULL;
314         size_t num_ids = 0;
315         TDB_DATA w_key = { 0, };
316         DATA_BLOB w_blob;
317         NTSTATUS status;
318         uint32_t i;
319
320         status = dbwrap_record_get_watchers(db, rec, talloc_tos(),
321                                             &ids, &num_ids);
322         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
323                 goto done;
324         }
325         if (!NT_STATUS_IS_OK(status)) {
326                 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
327                           nt_errstr(status)));
328                 goto done;
329         }
330         w_key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
331         if (w_key.dptr == NULL) {
332                 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
333                 goto done;
334         }
335         w_blob.data = w_key.dptr;
336         w_blob.length = w_key.dsize;
337
338         for (i=0; i<num_ids; i++) {
339                 status = messaging_send(msg, ids[i], MSG_DBWRAP_MODIFIED,
340                                         &w_blob);
341                 if (!NT_STATUS_IS_OK(status)) {
342                         char *str = procid_str_static(&ids[i]);
343                         DEBUG(1, ("messaging_send to %s failed: %s\n",
344                                   str, nt_errstr(status)));
345                         TALLOC_FREE(str);
346                 }
347         }
348 done:
349         TALLOC_FREE(w_key.dptr);
350         TALLOC_FREE(ids);
351         return;
352 }
353
354 void dbwrap_watch_db(struct db_context *db, struct messaging_context *msg)
355 {
356         dbwrap_set_stored_callback(db, dbwrap_watch_record_stored, msg);
357 }
358
359 static void dbwrap_record_watch_done(struct tevent_req *subreq)
360 {
361         struct tevent_req *req = tevent_req_callback_data(
362                 subreq, struct tevent_req);
363         struct dbwrap_record_watch_state *state = tevent_req_data(
364                 req, struct dbwrap_record_watch_state);
365         struct messaging_rec *rec;
366         int ret;
367
368         ret = msg_read_recv(subreq, talloc_tos(), &rec);
369         TALLOC_FREE(subreq);
370         if (ret != 0) {
371                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
372                 return;
373         }
374
375         if ((rec->buf.length == state->w_key.dsize) &&
376             (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
377                 tevent_req_done(req);
378                 return;
379         }
380
381         /*
382          * Not our record, wait for the next one
383          */
384         subreq = msg_read_send(state, state->ev, state->channel);
385         if (tevent_req_nomem(subreq, req)) {
386                 return;
387         }
388         tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
389 }
390
391 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
392                                   TALLOC_CTX *mem_ctx,
393                                   struct db_record **prec)
394 {
395         struct dbwrap_record_watch_state *state = tevent_req_data(
396                 req, struct dbwrap_record_watch_state);
397         NTSTATUS status;
398         struct db_record *rec;
399
400         if (tevent_req_is_nterror(req, &status)) {
401                 return status;
402         }
403         rec = dbwrap_fetch_locked(state->db, mem_ctx, state->key);
404         if (rec == NULL) {
405                 return NT_STATUS_INTERNAL_DB_ERROR;
406         }
407         *prec = rec;
408         return NT_STATUS_OK;
409 }
410
411 struct dbwrap_watchers_traverse_read_state {
412         int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
413                   const struct server_id *watchers, size_t num_watchers,
414                   void *private_data);
415         void *private_data;
416 };
417
418 static int dbwrap_watchers_traverse_read_callback(
419         struct db_record *rec, void *private_data)
420 {
421         struct dbwrap_watchers_traverse_read_state *state =
422                 (struct dbwrap_watchers_traverse_read_state *)private_data;
423         uint8_t *db_id;
424         size_t db_id_len;
425         TDB_DATA w_key, key, w_data;
426         int res;
427
428         w_key = dbwrap_record_get_key(rec);
429         w_data = dbwrap_record_get_value(rec);
430
431         if (!dbwrap_record_watchers_key_parse(w_key, &db_id, &db_id_len,
432                                               &key)) {
433                 return 0;
434         }
435         if ((w_data.dsize % sizeof(struct server_id)) != 0) {
436                 return 0;
437         }
438         res = state->fn(db_id, db_id_len, key,
439                         (struct server_id *)w_data.dptr,
440                         w_data.dsize / sizeof(struct server_id),
441                         state->private_data);
442         return res;
443 }
444
445 void dbwrap_watchers_traverse_read(
446         int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
447                   const struct server_id *watchers, size_t num_watchers,
448                   void *private_data),
449         void *private_data)
450 {
451         struct dbwrap_watchers_traverse_read_state state;
452         struct db_context *db;
453
454         db = dbwrap_record_watchers_db();
455         if (db == NULL) {
456                 return;
457         }
458         state.fn = fn;
459         state.private_data = private_data;
460         dbwrap_traverse_read(db, dbwrap_watchers_traverse_read_callback,
461                              &state, NULL);
462 }
463
464 static int dbwrap_wakeall_cb(const uint8_t *db_id, size_t db_id_len,
465                              const TDB_DATA key,
466                              const struct server_id *watchers,
467                              size_t num_watchers,
468                              void *private_data)
469 {
470         struct messaging_context *msg = talloc_get_type_abort(
471                 private_data, struct messaging_context);
472         uint32_t i;
473         DATA_BLOB blob;
474
475         blob.data = key.dptr;
476         blob.length = key.dsize;
477
478         for (i=0; i<num_watchers; i++) {
479                 messaging_send(msg, watchers[i], MSG_DBWRAP_MODIFIED, &blob);
480         }
481         return 0;
482 }
483
484 void dbwrap_watchers_wakeall(struct messaging_context *msg)
485 {
486         dbwrap_watchers_traverse_read(dbwrap_wakeall_cb, msg);
487 }