s3: use TDB_INCOMPATIBLE_HASH (the jenkins hash) on all TDB_CLEAR_IF_FIRST tdb's.
[obnox/samba-ctdb.git] / source3 / lib / messages_local.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) 2007 by Volker Lendecke
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 /**
21   @defgroup messages Internal messaging framework
22   @{
23   @file messages.c
24   
25   @brief  Module for internal messaging between Samba daemons. 
26
27    The idea is that if a part of Samba wants to do communication with
28    another Samba process then it will do a message_register() of a
29    dispatch function, and use message_send_pid() to send messages to
30    that process.
31
32    The dispatch function is given the pid of the sender, and it can
33    use that to reply by message_send_pid().  See ping_message() for a
34    simple example.
35
36    @caution Dispatch functions must be able to cope with incoming
37    messages on an *odd* byte boundary.
38
39    This system doesn't have any inherent size limitations but is not
40    very efficient for large messages or when messages are sent in very
41    quick succession.
42
43 */
44
45 #include "includes.h"
46 #include "librpc/gen_ndr/messaging.h"
47 #include "librpc/gen_ndr/ndr_messaging.h"
48
49 struct messaging_tdb_context {
50         struct messaging_context *msg_ctx;
51         struct tdb_wrap *tdb;
52         struct tevent_signal *se;
53         int received_messages;
54 };
55
56 static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
57                                    struct server_id pid, int msg_type,
58                                    const DATA_BLOB *data,
59                                    struct messaging_backend *backend);
60 static void message_dispatch(struct messaging_context *msg_ctx);
61
62 static void messaging_tdb_signal_handler(struct tevent_context *ev_ctx,
63                                          struct tevent_signal *se,
64                                          int signum, int count,
65                                          void *_info, void *private_data)
66 {
67         struct messaging_tdb_context *ctx = talloc_get_type(private_data,
68                                             struct messaging_tdb_context);
69
70         ctx->received_messages++;
71
72         DEBUG(10, ("messaging_tdb_signal_handler: sig[%d] count[%d] msgs[%d]\n",
73                    signum, count, ctx->received_messages));
74
75         message_dispatch(ctx->msg_ctx);
76 }
77
78 /****************************************************************************
79  Initialise the messaging functions. 
80 ****************************************************************************/
81
82 NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
83                             TALLOC_CTX *mem_ctx,
84                             struct messaging_backend **presult)
85 {
86         struct messaging_backend *result;
87         struct messaging_tdb_context *ctx;
88
89         if (!(result = TALLOC_P(mem_ctx, struct messaging_backend))) {
90                 DEBUG(0, ("talloc failed\n"));
91                 return NT_STATUS_NO_MEMORY;
92         }
93
94         ctx = TALLOC_ZERO_P(result, struct messaging_tdb_context);
95         if (!ctx) {
96                 DEBUG(0, ("talloc failed\n"));
97                 TALLOC_FREE(result);
98                 return NT_STATUS_NO_MEMORY;
99         }
100         result->private_data = ctx;
101         result->send_fn = messaging_tdb_send;
102
103         ctx->msg_ctx = msg_ctx;
104
105         ctx->tdb = tdb_wrap_open(ctx, lock_path("messages.tdb"),
106                                  0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH|TDB_DEFAULT,
107                                  O_RDWR|O_CREAT,0600);
108
109         if (!ctx->tdb) {
110                 NTSTATUS status = map_nt_error_from_unix(errno);
111                 DEBUG(0, ("ERROR: Failed to initialise messages database: "
112                           "%s\n", strerror(errno)));
113                 TALLOC_FREE(result);
114                 return status;
115         }
116
117         ctx->se = tevent_add_signal(msg_ctx->event_ctx,
118                                     ctx,
119                                     SIGUSR1, 0,
120                                     messaging_tdb_signal_handler,
121                                     ctx);
122         if (!ctx->se) {
123                 NTSTATUS status = map_nt_error_from_unix(errno);
124                 DEBUG(0, ("ERROR: Failed to initialise messages signal handler: "
125                           "%s\n", strerror(errno)));
126                 TALLOC_FREE(result);
127                 return status;
128         }
129
130         sec_init();
131
132         /* Activate the per-hashchain freelist */
133         tdb_set_max_dead(ctx->tdb->tdb, 5);
134
135         *presult = result;
136         return NT_STATUS_OK;
137 }
138
139 bool messaging_tdb_parent_init(void)
140 {
141         struct tdb_wrap *db;
142
143         /*
144          * Open the tdb in the parent process (smbd) so that our
145          * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly
146          * work.
147          */
148
149         db = tdb_wrap_open(talloc_autofree_context(),
150                            lock_path("messages.tdb"), 0,
151                            TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH|TDB_DEFAULT|TDB_VOLATILE,
152                            O_RDWR|O_CREAT,0600);
153         if (db == NULL) {
154                 DEBUG(1, ("could not open messaging.tdb: %s\n",
155                           strerror(errno)));
156                 return false;
157         }
158         return true;
159 }
160
161 /*******************************************************************
162  Form a static tdb key from a pid.
163 ******************************************************************/
164
165 static TDB_DATA message_key_pid(TALLOC_CTX *mem_ctx, struct server_id pid)
166 {
167         char *key;
168         TDB_DATA kbuf;
169
170         key = talloc_asprintf(talloc_tos(), "PID/%s", procid_str_static(&pid));
171
172         SMB_ASSERT(key != NULL);
173         
174         kbuf.dptr = (uint8 *)key;
175         kbuf.dsize = strlen(key)+1;
176         return kbuf;
177 }
178
179 /*
180   Fetch the messaging array for a process
181  */
182
183 static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
184                                     TDB_DATA key,
185                                     TALLOC_CTX *mem_ctx,
186                                     struct messaging_array **presult)
187 {
188         struct messaging_array *result;
189         TDB_DATA data;
190         DATA_BLOB blob;
191         enum ndr_err_code ndr_err;
192
193         if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) {
194                 return NT_STATUS_NO_MEMORY;
195         }
196
197         data = tdb_fetch(msg_tdb, key);
198
199         if (data.dptr == NULL) {
200                 *presult = result;
201                 return NT_STATUS_OK;
202         }
203
204         blob = data_blob_const(data.dptr, data.dsize);
205
206         ndr_err = ndr_pull_struct_blob(
207                 &blob, result, NULL, result,
208                 (ndr_pull_flags_fn_t)ndr_pull_messaging_array);
209
210         SAFE_FREE(data.dptr);
211
212         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
213                 TALLOC_FREE(result);
214                 return ndr_map_error2ntstatus(ndr_err);
215         }
216
217         if (DEBUGLEVEL >= 10) {
218                 DEBUG(10, ("messaging_tdb_fetch:\n"));
219                 NDR_PRINT_DEBUG(messaging_array, result);
220         }
221
222         *presult = result;
223         return NT_STATUS_OK;
224 }
225
226 /*
227   Store a messaging array for a pid
228 */
229
230 static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
231                                     TDB_DATA key,
232                                     struct messaging_array *array)
233 {
234         TDB_DATA data;
235         DATA_BLOB blob;
236         enum ndr_err_code ndr_err;
237         TALLOC_CTX *mem_ctx;
238         int ret;
239
240         if (array->num_messages == 0) {
241                 tdb_delete(msg_tdb, key);
242                 return NT_STATUS_OK;
243         }
244
245         if (!(mem_ctx = talloc_new(array))) {
246                 return NT_STATUS_NO_MEMORY;
247         }
248
249         ndr_err = ndr_push_struct_blob(
250                 &blob, mem_ctx, NULL, array,
251                 (ndr_push_flags_fn_t)ndr_push_messaging_array);
252
253         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
254                 talloc_free(mem_ctx);
255                 return ndr_map_error2ntstatus(ndr_err);
256         }
257
258         if (DEBUGLEVEL >= 10) {
259                 DEBUG(10, ("messaging_tdb_store:\n"));
260                 NDR_PRINT_DEBUG(messaging_array, array);
261         }
262
263         data.dptr = blob.data;
264         data.dsize = blob.length;
265
266         ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
267         TALLOC_FREE(mem_ctx);
268
269         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
270 }
271
272 /****************************************************************************
273  Notify a process that it has a message. If the process doesn't exist 
274  then delete its record in the database.
275 ****************************************************************************/
276
277 static NTSTATUS message_notify(struct server_id procid)
278 {
279         pid_t pid = procid.pid;
280         int ret;
281         uid_t euid = geteuid();
282
283         /*
284          * Doing kill with a non-positive pid causes messages to be
285          * sent to places we don't want.
286          */
287
288         SMB_ASSERT(pid > 0);
289
290         if (euid != 0) {
291                 /* If we're not root become so to send the message. */
292                 save_re_uid();
293                 set_effective_uid(0);
294         }
295
296         ret = kill(pid, SIGUSR1);
297
298         if (euid != 0) {
299                 /* Go back to who we were. */
300                 int saved_errno = errno;
301                 restore_re_uid_fromroot();
302                 errno = saved_errno;
303         }
304
305         if (ret == 0) {
306                 return NT_STATUS_OK;
307         }
308
309         /*
310          * Something has gone wrong
311          */
312
313         DEBUG(2,("message to process %d failed - %s\n", (int)pid,
314                  strerror(errno)));
315
316         /*
317          * No call to map_nt_error_from_unix -- don't want to link in
318          * errormap.o into lots of utils.
319          */
320
321         if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
322         if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
323         if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
324         return NT_STATUS_UNSUCCESSFUL;
325 }
326
327 /****************************************************************************
328  Send a message to a particular pid.
329 ****************************************************************************/
330
331 static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
332                                    struct server_id pid, int msg_type,
333                                    const DATA_BLOB *data,
334                                    struct messaging_backend *backend)
335 {
336         struct messaging_tdb_context *ctx = talloc_get_type(backend->private_data,
337                                             struct messaging_tdb_context);
338         struct messaging_array *msg_array;
339         struct messaging_rec *rec;
340         NTSTATUS status;
341         TDB_DATA key;
342         struct tdb_wrap *tdb = ctx->tdb;
343         TALLOC_CTX *frame = talloc_stackframe();
344
345         /* NULL pointer means implicit length zero. */
346         if (!data->data) {
347                 SMB_ASSERT(data->length == 0);
348         }
349
350         /*
351          * Doing kill with a non-positive pid causes messages to be
352          * sent to places we don't want.
353          */
354
355         SMB_ASSERT(procid_to_pid(&pid) > 0);
356
357         key = message_key_pid(frame, pid);
358
359         if (tdb_chainlock(tdb->tdb, key) == -1) {
360                 TALLOC_FREE(frame);
361                 return NT_STATUS_LOCK_NOT_GRANTED;
362         }
363
364         status = messaging_tdb_fetch(tdb->tdb, key, talloc_tos(), &msg_array);
365
366         if (!NT_STATUS_IS_OK(status)) {
367                 goto done;
368         }
369
370         if ((msg_type & MSG_FLAG_LOWPRIORITY)
371             && (msg_array->num_messages > 1000)) {
372                 DEBUG(5, ("Dropping message for PID %s\n",
373                           procid_str_static(&pid)));
374                 status = NT_STATUS_INSUFFICIENT_RESOURCES;
375                 goto done;
376         }
377
378         if (!(rec = TALLOC_REALLOC_ARRAY(talloc_tos(), msg_array->messages,
379                                          struct messaging_rec,
380                                          msg_array->num_messages+1))) {
381                 status = NT_STATUS_NO_MEMORY;
382                 goto done;
383         }
384
385         rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
386         rec[msg_array->num_messages].msg_type = msg_type & MSG_TYPE_MASK;
387         rec[msg_array->num_messages].dest = pid;
388         rec[msg_array->num_messages].src = procid_self();
389         rec[msg_array->num_messages].buf = *data;
390
391         msg_array->messages = rec;
392         msg_array->num_messages += 1;
393
394         status = messaging_tdb_store(tdb->tdb, key, msg_array);
395
396         if (!NT_STATUS_IS_OK(status)) {
397                 goto done;
398         }
399         
400         status = message_notify(pid);
401
402         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
403                 DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
404                           procid_str_static(&pid)));
405                 tdb_delete(tdb->tdb, message_key_pid(talloc_tos(), pid));
406         }
407
408  done:
409         tdb_chainunlock(tdb->tdb, key);
410         TALLOC_FREE(frame);
411         return status;
412 }
413
414 /****************************************************************************
415  Retrieve all messages for the current process.
416 ****************************************************************************/
417
418 static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
419                                       TALLOC_CTX *mem_ctx,
420                                       struct messaging_array **presult)
421 {
422         struct messaging_array *result;
423         TDB_DATA key = message_key_pid(mem_ctx, procid_self());
424         NTSTATUS status;
425
426         if (tdb_chainlock(msg_tdb, key) == -1) {
427                 TALLOC_FREE(key.dptr);
428                 return NT_STATUS_LOCK_NOT_GRANTED;
429         }
430
431         status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
432
433         /*
434          * We delete the record here, tdb_set_max_dead keeps it around
435          */
436         tdb_delete(msg_tdb, key);
437         tdb_chainunlock(msg_tdb, key);
438
439         if (NT_STATUS_IS_OK(status)) {
440                 *presult = result;
441         }
442
443         TALLOC_FREE(key.dptr);
444
445         return status;
446 }
447
448 /****************************************************************************
449  Receive and dispatch any messages pending for this process.
450  JRA changed Dec 13 2006. Only one message handler now permitted per type.
451  *NOTE*: Dispatch functions must be able to cope with incoming
452  messages on an *odd* byte boundary.
453 ****************************************************************************/
454
455 static void message_dispatch(struct messaging_context *msg_ctx)
456 {
457         struct messaging_tdb_context *ctx = talloc_get_type(msg_ctx->local->private_data,
458                                             struct messaging_tdb_context);
459         struct messaging_array *msg_array = NULL;
460         struct tdb_wrap *tdb = ctx->tdb;
461         NTSTATUS status;
462         uint32 i;
463
464         if (ctx->received_messages == 0) {
465                 return;
466         }
467
468         DEBUG(10, ("message_dispatch: received_messages = %d\n",
469                    ctx->received_messages));
470
471         status = retrieve_all_messages(tdb->tdb, NULL, &msg_array);
472         if (!NT_STATUS_IS_OK(status)) {
473                 DEBUG(0, ("message_dispatch: failed to retrieve messages: %s\n",
474                            nt_errstr(status)));
475                 return;
476         }
477
478         ctx->received_messages = 0;
479
480         for (i=0; i<msg_array->num_messages; i++) {
481                 messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
482         }
483
484         TALLOC_FREE(msg_array);
485 }
486
487 /** @} **/