ea75427ffc63d6868dc79b63db34b57ae0145b46
[samba.git] / source3 / lib / dbwrap / dbwrap_watch.c
1 /*
2    Unix SMB/CIFS implementation.
3    Watch dbwrap record changes
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "msg_channel.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
28
29 static struct db_context *dbwrap_record_watchers_db(void)
30 {
31         static struct db_context *watchers_db;
32
33         if (watchers_db == NULL) {
34                 watchers_db = db_open(NULL, lock_path("dbwrap_watchers.tdb"),
35                                       0, TDB_CLEAR_IF_FIRST, O_RDWR|O_CREAT,
36                                       0600, DBWRAP_LOCK_ORDER_3);
37         }
38         return watchers_db;
39 }
40
41 static TDB_DATA dbwrap_record_watchers_key(TALLOC_CTX *mem_ctx,
42                                            struct db_context *db,
43                                            struct db_record *rec,
44                                            TDB_DATA *rec_key)
45 {
46         const uint8_t *db_id;
47         size_t db_id_len;
48         TDB_DATA key, wkey;
49
50         dbwrap_db_id(db, &db_id, &db_id_len);
51         key = dbwrap_record_get_key(rec);
52
53         wkey.dsize = sizeof(uint32_t) + db_id_len + key.dsize;
54         wkey.dptr = talloc_array(mem_ctx, uint8_t, wkey.dsize);
55         if (wkey.dptr == NULL) {
56                 return make_tdb_data(NULL, 0);
57         }
58         SIVAL(wkey.dptr, 0, db_id_len);
59         memcpy(wkey.dptr + sizeof(uint32_t), db_id, db_id_len);
60         memcpy(wkey.dptr + sizeof(uint32_t) + db_id_len, key.dptr, key.dsize);
61
62         if (rec_key != NULL) {
63                 rec_key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
64                 rec_key->dsize = key.dsize;
65         }
66
67         return wkey;
68 }
69
70 static bool dbwrap_record_watchers_key_parse(
71         TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
72 {
73         size_t db_id_len;
74
75         if (wkey.dsize < sizeof(uint32_t)) {
76                 DEBUG(1, ("Invalid watchers key\n"));
77                 return false;
78         }
79         db_id_len = IVAL(wkey.dptr, 0);
80         if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
81                 DEBUG(1, ("Invalid watchers key, wkey.dsize=%d, "
82                           "db_id_len=%d\n", (int)wkey.dsize, (int)db_id_len));
83                 return false;
84         }
85         *p_db_id = wkey.dptr + sizeof(uint32_t);
86         *p_db_id_len = db_id_len;
87         key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
88         key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
89         return true;
90 }
91
92 static NTSTATUS dbwrap_record_add_watcher(TDB_DATA w_key, struct server_id id)
93 {
94         struct TALLOC_CTX *frame = talloc_stackframe();
95         struct db_context *db;
96         struct db_record *rec;
97         TDB_DATA value;
98         struct server_id *ids;
99         size_t num_ids;
100         NTSTATUS status;
101
102         db = dbwrap_record_watchers_db();
103         if (db == NULL) {
104                 status = map_nt_error_from_unix(errno);
105                 goto fail;
106         }
107         rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
108         if (rec == NULL) {
109                 status = map_nt_error_from_unix(errno);
110                 goto fail;
111         }
112         value = dbwrap_record_get_value(rec);
113
114         if ((value.dsize % sizeof(struct server_id)) != 0) {
115                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
116                 goto fail;
117         }
118
119         ids = (struct server_id *)value.dptr;
120         num_ids = value.dsize / sizeof(struct server_id);
121
122         ids = talloc_realloc(talloc_tos(), ids, struct server_id,
123                              num_ids + 1);
124         if (ids == NULL) {
125                 status = NT_STATUS_NO_MEMORY;
126                 goto fail;
127         }
128         ids[num_ids] = id;
129         num_ids += 1;
130
131         status = dbwrap_record_store(
132                 rec, make_tdb_data((uint8_t *)ids, talloc_get_size(ids)), 0);
133 fail:
134         TALLOC_FREE(frame);
135         return status;
136 }
137
138 static NTSTATUS dbwrap_record_del_watcher(TDB_DATA w_key, struct server_id id)
139 {
140         struct TALLOC_CTX *frame = talloc_stackframe();
141         struct db_context *db;
142         struct db_record *rec;
143         struct server_id *ids;
144         size_t i, num_ids;
145         TDB_DATA value;
146         NTSTATUS status;
147
148         db = dbwrap_record_watchers_db();
149         if (db == NULL) {
150                 status = map_nt_error_from_unix(errno);
151                 goto fail;
152         }
153         rec = dbwrap_fetch_locked(db, talloc_tos(), w_key);
154         if (rec == NULL) {
155                 status = map_nt_error_from_unix(errno);
156                 goto fail;
157         }
158         value = dbwrap_record_get_value(rec);
159
160         if ((value.dsize % sizeof(struct server_id)) != 0) {
161                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
162                 goto fail;
163         }
164
165         ids = (struct server_id *)value.dptr;
166         num_ids = value.dsize / sizeof(struct server_id);
167
168         for (i=0; i<num_ids; i++) {
169                 if (procid_equal(&id, &ids[i])) {
170                         ids[i] = ids[num_ids-1];
171                         value.dsize -= sizeof(struct server_id);
172                         break;
173                 }
174         }
175         if (value.dsize == 0) {
176                 status = dbwrap_record_delete(rec);
177                 goto done;
178         }
179         status = dbwrap_record_store(rec, value, 0);
180 fail:
181 done:
182         TALLOC_FREE(frame);
183         return status;
184 }
185
186 static NTSTATUS dbwrap_record_get_watchers(struct db_context *db,
187                                            struct db_record *rec,
188                                            TALLOC_CTX *mem_ctx,
189                                            struct server_id **p_ids,
190                                            size_t *p_num_ids)
191 {
192         struct db_context *w_db;
193         TDB_DATA key = { 0, };
194         TDB_DATA value = { 0, };
195         struct server_id *ids;
196         NTSTATUS status;
197
198         key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
199         if (key.dptr == NULL) {
200                 status = NT_STATUS_NO_MEMORY;
201                 goto fail;
202         }
203         w_db = dbwrap_record_watchers_db();
204         if (w_db == NULL) {
205                 status = NT_STATUS_INTERNAL_ERROR;
206                 goto fail;
207         }
208         status = dbwrap_fetch(w_db, mem_ctx, key, &value);
209         TALLOC_FREE(key.dptr);
210         if (!NT_STATUS_IS_OK(status)) {
211                 goto fail;
212         }
213         if ((value.dsize % sizeof(struct server_id)) != 0) {
214                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
215                 goto fail;
216         }
217         ids = (struct server_id *)value.dptr;
218         *p_ids = talloc_move(mem_ctx, &ids);
219         *p_num_ids = value.dsize / sizeof(struct server_id);
220         return NT_STATUS_OK;
221 fail:
222         TALLOC_FREE(key.dptr);
223         TALLOC_FREE(value.dptr);
224         return status;
225 }
226
227 struct dbwrap_record_watch_state {
228         struct tevent_context *ev;
229         struct db_context *db;
230         struct tevent_req *req;
231         struct messaging_context *msg;
232         struct msg_channel *channel;
233         TDB_DATA key;
234         TDB_DATA w_key;
235 };
236
237 static void dbwrap_record_watch_done(struct tevent_req *subreq);
238 static int dbwrap_record_watch_state_destructor(
239         struct dbwrap_record_watch_state *state);
240
241 struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
242                                             struct tevent_context *ev,
243                                             struct db_record *rec,
244                                             struct messaging_context *msg)
245 {
246         struct tevent_req *req, *subreq;
247         struct dbwrap_record_watch_state *state;
248         struct db_context *watchers_db;
249         NTSTATUS status;
250         int ret;
251
252         req = tevent_req_create(mem_ctx, &state,
253                                 struct dbwrap_record_watch_state);
254         if (req == NULL) {
255                 return NULL;
256         }
257         state->db = dbwrap_record_get_db(rec);
258         state->ev = ev;
259         state->req = req;
260         state->msg = msg;
261
262         watchers_db = dbwrap_record_watchers_db();
263         if (watchers_db == NULL) {
264                 tevent_req_nterror(req, map_nt_error_from_unix(errno));
265                 return tevent_req_post(req, ev);
266         }
267
268         state->w_key = dbwrap_record_watchers_key(state, state->db, rec,
269                                                   &state->key);
270         if (tevent_req_nomem(state->w_key.dptr, req)) {
271                 return tevent_req_post(req, ev);
272         }
273
274         ret = msg_channel_init(state, state->msg, MSG_DBWRAP_MODIFIED,
275                                &state->channel);
276         if (ret != 0) {
277                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
278                 return tevent_req_post(req, ev);
279         }
280
281         status = dbwrap_record_add_watcher(
282                 state->w_key, messaging_server_id(state->msg));
283         if (tevent_req_nterror(req, status)) {
284                 return tevent_req_post(req, ev);
285         }
286         talloc_set_destructor(state, dbwrap_record_watch_state_destructor);
287
288         subreq = msg_read_send(state, state->ev, state->channel);
289         if (tevent_req_nomem(subreq, req)) {
290                 return tevent_req_post(req, ev);
291         }
292         tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
293         return req;
294 }
295
296 static int dbwrap_record_watch_state_destructor(
297         struct dbwrap_record_watch_state *s)
298 {
299         if (s->msg != NULL) {
300                 dbwrap_record_del_watcher(
301                         s->w_key, messaging_server_id(s->msg));
302         }
303         return 0;
304 }
305
306 static void dbwrap_watch_record_stored(struct db_context *db,
307                                        struct db_record *rec,
308                                        void *private_data)
309 {
310         struct messaging_context *msg = talloc_get_type_abort(
311                 private_data, struct messaging_context);
312         struct server_id *ids = NULL;
313         size_t num_ids = 0;
314         TDB_DATA w_key = { 0, };
315         DATA_BLOB w_blob;
316         NTSTATUS status;
317         uint32_t i;
318
319         status = dbwrap_record_get_watchers(db, rec, talloc_tos(),
320                                             &ids, &num_ids);
321         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
322                 goto done;
323         }
324         if (!NT_STATUS_IS_OK(status)) {
325                 DEBUG(1, ("dbwrap_record_get_watchers failed: %s\n",
326                           nt_errstr(status)));
327                 goto done;
328         }
329         w_key = dbwrap_record_watchers_key(talloc_tos(), db, rec, NULL);
330         if (w_key.dptr == NULL) {
331                 DEBUG(1, ("dbwrap_record_watchers_key failed\n"));
332                 goto done;
333         }
334         w_blob.data = w_key.dptr;
335         w_blob.length = w_key.dsize;
336
337         for (i=0; i<num_ids; i++) {
338                 status = messaging_send(msg, ids[i], MSG_DBWRAP_MODIFIED,
339                                         &w_blob);
340                 if (!NT_STATUS_IS_OK(status)) {
341                         char *str = procid_str_static(&ids[i]);
342                         DEBUG(1, ("messaging_send to %s failed: %s\n",
343                                   str, nt_errstr(status)));
344                         TALLOC_FREE(str);
345                 }
346         }
347 done:
348         TALLOC_FREE(w_key.dptr);
349         TALLOC_FREE(ids);
350         return;
351 }
352
353 void dbwrap_watch_db(struct db_context *db, struct messaging_context *msg)
354 {
355         dbwrap_set_stored_callback(db, dbwrap_watch_record_stored, msg);
356 }
357
358 static void dbwrap_record_watch_done(struct tevent_req *subreq)
359 {
360         struct tevent_req *req = tevent_req_callback_data(
361                 subreq, struct tevent_req);
362         struct dbwrap_record_watch_state *state = tevent_req_data(
363                 req, struct dbwrap_record_watch_state);
364         struct messaging_rec *rec;
365         int ret;
366
367         ret = msg_read_recv(subreq, talloc_tos(), &rec);
368         TALLOC_FREE(subreq);
369         if (ret != 0) {
370                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
371                 return;
372         }
373
374         if ((rec->buf.length == state->w_key.dsize) &&
375             (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
376                 tevent_req_done(req);
377                 return;
378         }
379
380         /*
381          * Not our record, wait for the next one
382          */
383         subreq = msg_read_send(state, state->ev, state->channel);
384         if (tevent_req_nomem(subreq, req)) {
385                 return;
386         }
387         tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
388 }
389
390 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
391                                   TALLOC_CTX *mem_ctx,
392                                   struct db_record **prec)
393 {
394         struct dbwrap_record_watch_state *state = tevent_req_data(
395                 req, struct dbwrap_record_watch_state);
396         NTSTATUS status;
397         struct db_record *rec;
398
399         if (tevent_req_is_nterror(req, &status)) {
400                 return status;
401         }
402         rec = dbwrap_fetch_locked(state->db, mem_ctx, state->key);
403         if (rec == NULL) {
404                 return NT_STATUS_INTERNAL_DB_ERROR;
405         }
406         *prec = rec;
407         return NT_STATUS_OK;
408 }
409
410 struct dbwrap_watchers_traverse_read_state {
411         int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
412                   const struct server_id *watchers, size_t num_watchers,
413                   void *private_data);
414         void *private_data;
415 };
416
417 static int dbwrap_watchers_traverse_read_callback(
418         struct db_record *rec, void *private_data)
419 {
420         struct dbwrap_watchers_traverse_read_state *state =
421                 (struct dbwrap_watchers_traverse_read_state *)private_data;
422         uint8_t *db_id;
423         size_t db_id_len;
424         TDB_DATA w_key, key, w_data;
425         int res;
426
427         w_key = dbwrap_record_get_key(rec);
428         w_data = dbwrap_record_get_value(rec);
429
430         if (!dbwrap_record_watchers_key_parse(w_key, &db_id, &db_id_len,
431                                               &key)) {
432                 return 0;
433         }
434         if ((w_data.dsize % sizeof(struct server_id)) != 0) {
435                 return 0;
436         }
437         res = state->fn(db_id, db_id_len, key,
438                         (struct server_id *)w_data.dptr,
439                         w_data.dsize / sizeof(struct server_id),
440                         state->private_data);
441         return res;
442 }
443
444 void dbwrap_watchers_traverse_read(
445         int (*fn)(const uint8_t *db_id, size_t db_id_len, const TDB_DATA key,
446                   const struct server_id *watchers, size_t num_watchers,
447                   void *private_data),
448         void *private_data)
449 {
450         struct dbwrap_watchers_traverse_read_state state;
451         struct db_context *db;
452
453         db = dbwrap_record_watchers_db();
454         if (db == NULL) {
455                 return;
456         }
457         state.fn = fn;
458         state.private_data = private_data;
459         dbwrap_traverse_read(db, dbwrap_watchers_traverse_read_callback,
460                              &state, NULL);
461 }
462
463 static int dbwrap_wakeall_cb(const uint8_t *db_id, size_t db_id_len,
464                              const TDB_DATA key,
465                              const struct server_id *watchers,
466                              size_t num_watchers,
467                              void *private_data)
468 {
469         struct messaging_context *msg = talloc_get_type_abort(
470                 private_data, struct messaging_context);
471         uint32_t i;
472         DATA_BLOB blob;
473
474         blob.data = key.dptr;
475         blob.length = key.dsize;
476
477         for (i=0; i<num_watchers; i++) {
478                 messaging_send(msg, watchers[i], MSG_DBWRAP_MODIFIED, &blob);
479         }
480         return 0;
481 }
482
483 void dbwrap_watchers_wakeall(struct messaging_context *msg)
484 {
485         dbwrap_watchers_traverse_read(dbwrap_wakeall_cb, msg);
486 }