2 MAPI Proxy - Cache module
6 Copyright (C) Julien Kerihuel 2008-2011
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
25 \brief Cache messages and attachments so we can reduce WAN traffic
28 #include "libmapi/libmapi.h"
29 #include "libmapi/libmapi_private.h"
30 #include "mapiproxy/dcesrv_mapiproxy.h"
31 #include "mapiproxy/libmapiproxy/libmapiproxy.h"
32 #include "mapiproxy/modules/mpm_cache.h"
34 #include <sys/types.h>
/* Module-wide cache state shared by every function in this file.
 * It starts as NULL and is assigned by cache_init() through talloc_zero();
 * it is also cast to TALLOC_CTX * and used as the parent context for the
 * message, attachment and stream entries allocated below. */
39 struct mpm_cache *mpm = NULL;
42 \details Find the position of the given MAPI call in a serialized
45 If the request includes a Release call, then request and replies
46 indexes for other calls will mismatch.
48 \param opnum The MAPI opnum to seek
49 \param mapi_req Pointer to the MAPI request calls array
51 \return On success, returns the call position, otherwise -1.
53 static uint32_t cache_find_call_request_index(uint8_t opnum, struct EcDoRpc_MAPI_REQ *mapi_req)
57 for (i = 0; mapi_req[i].opnum; i++) {
58 if (mapi_req[i].opnum == opnum) {
67 \details Dump time statistic between OpenStream and Release
69 This function monitors the effective time required to open, read
72 \param stream the mpm_stream entry
74 static void cache_dump_stream_stat(struct mpm_stream *stream)
77 struct timeval tv_end;
83 mem_ctx = (TALLOC_CTX *)mpm;
85 if (stream->attachment) {
86 name = talloc_asprintf(mem_ctx, "0x%"PRIx64"/0x%"PRIx64"/%d",
87 stream->attachment->message->FolderId,
88 stream->attachment->message->MessageId,
89 stream->attachment->AttachmentID);
90 } else if (stream->message) {
91 name = talloc_asprintf(mem_ctx, "0x%"PRIx64"/0x%"PRIx64,
92 stream->message->FolderId,
93 stream->message->MessageId);
98 gettimeofday(&tv_end, NULL);
99 sec = tv_end.tv_sec - stream->tv_start.tv_sec;
100 if ((tv_end.tv_usec - stream->tv_start.tv_usec) < 0) {
102 usec = tv_end.tv_usec + stream->tv_start.tv_usec;
103 while (usec > 1000000) {
108 usec = tv_end.tv_usec - stream->tv_start.tv_usec;
111 if (stream->ahead == true) {
112 stage = "[read ahead]";
113 } else if ((stream->ahead == false) && (stream->cached == true)) {
114 stage = "[cached mode]";
116 stage = "[non cached]";
119 DEBUG(1, ("STATISTIC: %-20s %s The difference is %ld seconds %ld microseconds\n",
120 stage, name, (long int)sec, (long int)usec));
128 1. close the existing FILE *
129 2. build complete file path
130 3. replace __FILE__ arguments with complete file path
132 5. stat the sync'd file
133 6. open the stream again
134 7. mark the file as cached
136 \param stream pointer on the mpm_stream entry
/* Run the configured sync command for a stream and, when the resulting file
 * has the expected size, reopen the stream and mark it as cached.
 * Returns NT_STATUS_INVALID_PARAMETER on the failure paths visible below.
 * NOTE(review): several lines of this function (declarations, case labels,
 * the handling after execve()) are not visible here - confirm against the
 * full source before changing the control flow. */
138 static NTSTATUS cache_exec_sync_cmd(struct mpm_stream *stream)
/* Close the stream's current file before the external command runs */
147 mpm_cache_stream_close(stream);
/* Empty-bodied loop: only counts the entries of the NULL-terminated sync_cmd list */
149 for (i = 0; mpm->sync_cmd[i]; i++);
/* One extra slot is reserved, presumably for the terminating NULL - TODO confirm it is set */
151 args = talloc_array((TALLOC_CTX *)mpm, char *, i + 1);
/* Copy each argument, substituting the "__FILE__" placeholder with the stream file name */
153 for (i = 0; mpm->sync_cmd[i]; i++){
154 if (strstr(mpm->sync_cmd[i], "__FILE__")) {
155 args[i] = string_sub_talloc((TALLOC_CTX *)args, mpm->sync_cmd[i], "__FILE__", stream->filename);
157 args[i] = talloc_strdup((TALLOC_CTX *)args, mpm->sync_cmd[i]);
/* Log the resulting command line; this loop relies on args being NULL-terminated */
162 for (i = 0; args[i]; i++){
163 DEBUG(0, ("'%s' ", args[i]));
/* Spawn the command: args[0] is executed with args as argv and a NULL environment */
167 switch(pid = fork()) {
169 DEBUG(0, ("Failed to fork\n"));
172 ret = execve(args[0], args, NULL);
/* NOTE(review): the condition guarding this return is not visible; if it is
 * reached in the forked child, the child would return into the caller - verify */
181 return NT_STATUS_INVALID_PARAMETER;
/* Check the file produced by the sync command */
184 ret = stat(stream->filename, &sb);
187 return NT_STATUS_INVALID_PARAMETER;
/* The synced file must be exactly as large as the size recorded for the stream */
190 if (sb.st_size != stream->StreamSize) {
191 DEBUG(0, ("Sync'd file size is 0x%x and 0x%x was expected\n",
192 (uint32_t)sb.st_size, stream->StreamSize));
193 return NT_STATUS_INVALID_PARAMETER;
/* Success: reopen the stream file and flag the stream as served from cache */
196 mpm_cache_stream_open(mpm, stream);
197 stream->cached = true;
204 \details Track down Release calls and update the mpm_cache global
205 list - removing associated entries.
207 This function recursively removes child entries whenever necessary.
209 \param dce_call pointer to the session context
210 \param EcDoRpc pointer to the EcDoRpc operation
211 \param handle_idx the handle to track down
/* Handle a Release call for the handle at handle_idx of the request handle
 * table. The released handle is looked up, for the calling session, first in
 * the message list, then in the attachment list, then in the stream list.
 * A matching entry is unlinked together with the entries whose parent_handle
 * points at it.
 * NOTE(review): the inner loop headers, else branches and return statements
 * are not visible here; the comments below describe only the visible lines. */
215 static NTSTATUS cache_pull_Release(struct dcesrv_call_state *dce_call,
216 struct EcDoRpc *EcDoRpc,
219 struct mpm_message *message;
220 struct mpm_attachment *attach;
221 struct mpm_stream *stream;
/* Case 1: the released handle belongs to a registered message */
223 /* Look over messages */
224 for (message = mpm->messages; message; message = message->next) {
225 if ((mpm_session_cmp(message->session, dce_call) == true) &&
226 (EcDoRpc->in.mapi_request->handles[handle_idx] == message->handle)) {
227 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Message 0x%"PRIx64" 0x%"PRIx64": 0x%x\n",
228 MPM_LOCATION, MPM_SESSION(message), message->FolderId,
229 message->MessageId, message->handle));
/* Drop every attachment of this session whose parent is the message */
231 /* Loop over children attachments */
232 attach = mpm->attachments;
234 if ((mpm_session_cmp(attach->session, dce_call) == true) &&
235 (message->handle == attach->parent_handle)) {
236 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del recursive 1: Attachment %d: 0x%x\n", MPM_LOCATION,
237 MPM_SESSION(attach), attach->AttachmentID, attach->handle));
/* ... and, for each such attachment, every stream opened on it */
239 /* Loop over children streams */
240 stream = mpm->streams;
242 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
243 (attach->handle == stream->parent_handle)) {
244 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del recursive 1-2: Stream 0x%x\n",
245 MPM_LOCATION, MPM_SESSION(stream), stream->handle));
246 mpm_session_release(stream->session);
247 mpm_cache_stream_close(stream);
248 talloc_free(stream->filename);
249 DLIST_REMOVE(mpm->streams, stream);
/* After a removal the scan restarts from the list head; otherwise it advances */
251 stream = mpm->streams;
253 stream = stream->next;
257 mpm_session_release(attach->session);
258 DLIST_REMOVE(mpm->attachments, attach);
/* Same restart-or-advance pattern for the attachment list */
260 attach = mpm->attachments;
262 attach = attach->next;
/* Streams opened directly on the message are dropped as well */
266 /* Look over children streams */
267 stream = mpm->streams;
269 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
270 (message->handle == stream->parent_handle)) {
271 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del recursive 1: Stream 0x%x\n",
272 MPM_LOCATION, MPM_SESSION(stream), stream->handle));
273 mpm_session_release(stream->session);
274 mpm_cache_stream_close(stream);
275 DLIST_REMOVE(mpm->streams, stream);
276 talloc_free(stream->filename);
278 stream = mpm->streams;
280 stream = stream->next;
/* Finally release the session reference and free the message entry itself */
284 mpm_session_release(message->session);
285 DLIST_REMOVE(mpm->messages, message);
286 talloc_free(message);
/* Case 2: the released handle belongs to a registered attachment */
291 /* Look over attachments */
292 for (attach = mpm->attachments; attach; attach = attach->next) {
293 if ((mpm_session_cmp(attach->session, dce_call) == true) &&
294 (EcDoRpc->in.mapi_request->handles[handle_idx] == attach->handle)) {
295 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Attachment %d: 0x%x\n", MPM_LOCATION,
296 MPM_SESSION(attach), attach->AttachmentID, attach->handle));
/* Drop the streams of this session opened on the attachment */
299 /* Loop over children streams */
300 stream = mpm->streams;
302 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
303 (attach->handle == stream->parent_handle)) {
304 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del recursive 2: Stream 0x%x\n",
305 MPM_LOCATION, MPM_SESSION(stream), stream->handle));
306 mpm_session_release(stream->session);
307 mpm_cache_stream_close(stream);
308 DLIST_REMOVE(mpm->streams, stream);
309 talloc_free(stream->filename);
311 stream = mpm->streams;
313 stream = stream->next;
317 mpm_session_release(attach->session);
318 DLIST_REMOVE(mpm->attachments, attach);
/* Case 3: the released handle belongs to a registered stream */
324 /* Look over streams */
325 for (stream = mpm->streams; stream; stream = stream->next) {
326 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
327 (EcDoRpc->in.mapi_request->handles[handle_idx] == stream->handle)) {
328 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Stream 0x%x\n", MPM_LOCATION,
329 MPM_SESSION(stream), stream->handle));
/* Close the backing file, unlink the entry and free its file name */
330 mpm_session_release(stream->session);
331 mpm_cache_stream_close(stream);
332 DLIST_REMOVE(mpm->streams, stream);
333 talloc_free(stream->filename);
344 \details Monitor OpenMessage requests and register a message in the
347 This is the first step for message registration:
348 * set Folder ID and Message ID
349 * set the handle to 0xFFFFFFFF
350 * Insert the message to the list
352 \param dce_call pointer to the session context
353 \param mem_ctx the memory context
354 \param request reference to the OpenMessage request
356 \return NT_STATUS_OK on success
358 static NTSTATUS cache_pull_OpenMessage(struct dcesrv_call_state *dce_call,
360 struct OpenMessage_req request)
362 struct mpm_message *message;
364 /* Check if the message has already been registered */
365 for (message = mpm->messages; message; message = message->next) {
366 if ((mpm_session_cmp(message->session, dce_call) == true) &&
367 (request.FolderId == message->FolderId) &&
368 (request.MessageId == message->MessageId)) {
369 DLIST_REMOVE(mpm->messages, message);
373 message = talloc((TALLOC_CTX *)mpm, struct mpm_message);
374 NT_STATUS_HAVE_NO_MEMORY(message);
376 message->session = mpm_session_init((TALLOC_CTX *)mpm, dce_call);
377 NT_STATUS_HAVE_NO_MEMORY(message->session);
379 message->FolderId = request.FolderId;
380 message->MessageId = request.MessageId;
381 message->handle = 0xFFFFFFFF;
383 DLIST_ADD_END(mpm->messages, message, struct mpm_message *);
390 \details Monitor OpenMessage replies and store OpenMessage MAPI
393 This is the second step for message registration:
395 * Seek for a given FolderId/MessageId in the mpm_message list
397 * If a match is found (expected) and MAPI retval is set to
398 MAPI_E_SUCCESS, update the handle field for the element and
399 commit this message in the tdb store.
401 * If retval is different from MAPI_E_SUCCESS, then delete the
405 \param dce_call pointer to the session context
406 \param mapi_req reference to the OpenMessage MAPI request entry
407 \param mapi_repl reference to the OpenMessage MAPI response entry
408 \param EcDoRpc pointer to the current EcDoRpc operation
/* Second step of message registration: match an OpenMessage reply against the
 * entry registered for the request (same FolderId, MessageId and session).
 * NOTE(review): the closing lines and return statements are not visible here. */
412 static NTSTATUS cache_push_OpenMessage(struct dcesrv_call_state *dce_call,
413 struct EcDoRpc_MAPI_REQ mapi_req,
414 struct EcDoRpc_MAPI_REPL mapi_repl,
415 struct EcDoRpc *EcDoRpc)
417 struct mpm_message *el;
418 struct mapi_response *mapi_response;
419 struct OpenMessage_req request;
421 request = mapi_req.u.mapi_OpenMessage;
423 mapi_response = EcDoRpc->out.mapi_response;
425 for (el = mpm->messages; el; el = el->next) {
426 if ((el->FolderId == request.FolderId) && (el->MessageId == request.MessageId) &&
427 (mpm_session_cmp(el->session, dce_call) == true)) {
/* Success: record the message in the ldb store, then keep the handle found
 * in the response handle table at the request's handle_idx */
428 if (mapi_repl.error_code == MAPI_E_SUCCESS) {
429 mpm_cache_ldb_add_message((TALLOC_CTX *)mpm, mpm->ldb_ctx, el);
430 el->handle = mapi_response->handles[request.handle_idx];
431 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Add: Message 0x%"PRIx64" 0x%"PRIx64" 0x%x\n",
432 MPM_LOCATION, MPM_SESSION(el), el->FolderId, el->MessageId, el->handle));
/* Failure: log the MAPI error and unlink the pending entry.
 * NOTE(review): no free or session release of el is visible here - verify */
434 DEBUG(0, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Message OpenMessage returned %s\n",
435 MPM_LOCATION, MPM_SESSION(el), mapi_get_errstr(mapi_repl.error_code)));
436 DLIST_REMOVE(mpm->messages, el);
446 \details Monitor OpenAttach requests and register an attachment in
447 the mpm_messages list.
449 This is the first step for attachment registration. This
450 function first ensures the attachment is not already registered,
451 otherwise delete it. It next creates the attachment entry in the
452 global mpm_message structure.
454 \param dce_call pointer to the session context
455 \param mem_ctx the memory context
456 \param mapi_req reference to the OpenAttach EcDoRpc_MAPI_REQ entry
457 \param EcDoRpc pointer to the EcDoRpc operation
459 \return NT_STATUS_OK on success, otherwise NT_STATUS_NO_MEMORY
461 static NTSTATUS cache_pull_OpenAttach(struct dcesrv_call_state *dce_call,
463 struct EcDoRpc_MAPI_REQ mapi_req,
464 struct EcDoRpc *EcDoRpc)
466 struct mpm_message *el;
467 struct mpm_attachment *attach;
468 struct mapi_request *mapi_request;
469 struct OpenAttach_req request;
471 mapi_request = EcDoRpc->in.mapi_request;
472 request = mapi_req.u.mapi_OpenAttach;
474 for (attach = mpm->attachments; attach; attach = attach->next) {
475 /* Check if the attachment has already been registered */
476 if ((mpm_session_cmp(attach->session, dce_call) == true) &&
477 (mapi_request->handles[mapi_req.handle_idx] == attach->parent_handle) && (request.AttachmentID == attach->AttachmentID)) {
478 DLIST_REMOVE(mpm->attachments, attach);
482 attach = talloc((TALLOC_CTX *)mpm, struct mpm_attachment);
483 NT_STATUS_HAVE_NO_MEMORY(attach);
485 attach->session = mpm_session_init((TALLOC_CTX *)mpm, dce_call);
486 NT_STATUS_HAVE_NO_MEMORY(attach->session);
488 attach->AttachmentID = request.AttachmentID;
489 attach->parent_handle = mapi_request->handles[mapi_req.handle_idx];
490 attach->handle = 0xFFFFFFFF;
492 for (el = mpm->messages; el; el = el->next) {
493 if ((mpm_session_cmp(el->session, dce_call) == true) && attach->parent_handle == el->handle) {
494 attach->message = el;
499 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Add [1]: Attachment %d parent handle (0x%x) 0x%"PRIx64", 0x%"PRIx64" added to the list\n",
500 MPM_LOCATION, MPM_SESSION(attach), request.AttachmentID, attach->parent_handle,
501 attach->message->FolderId, attach->message->MessageId));
502 DLIST_ADD_END(mpm->attachments, attach, struct mpm_attachment *);
509 \details Monitor OpenAttach replies and store OpenAttach MAPI
512 This is the second step for attachment registration:
514 * Seek for a given parent_handle/attachmentID in the
515 mpm_attachments list.
517 * if a match is found (expected) and MAPI retval is set to
518 MAPI_E_SUCCESS, update the handle parameter for the element and
519 commit this attachment in the tdb store.
521 * If retval is different from MAPI_E_SUCCESS, then delete the
524 \param dce_call pointer to the session context
525 \param mapi_req reference to the OpenAttach request entry
526 \param mapi_repl reference to the OpenAttach MAPI response entry
527 \param EcDoRpc pointer to the current EcDoRpc operation
/* Second step of attachment registration: match an OpenAttach reply against
 * the entry registered for the request (same session, parent handle and
 * AttachmentID).
 * NOTE(review): the closing lines and return statements are not visible here. */
532 static NTSTATUS cache_push_OpenAttach(struct dcesrv_call_state *dce_call,
533 struct EcDoRpc_MAPI_REQ mapi_req,
534 struct EcDoRpc_MAPI_REPL mapi_repl,
535 struct EcDoRpc *EcDoRpc)
537 struct mpm_attachment *el;
538 struct mapi_request *mapi_request;
539 struct mapi_response *mapi_response;
540 struct OpenAttach_req request;
542 mapi_request = EcDoRpc->in.mapi_request;
543 mapi_response = EcDoRpc->out.mapi_response;
544 request = mapi_req.u.mapi_OpenAttach;
546 for (el = mpm->attachments; el; el = el->next) {
547 if ((mpm_session_cmp(el->session, dce_call) == true) &&
548 (mapi_request->handles[mapi_req.handle_idx] == el->parent_handle) &&
549 (request.AttachmentID == el->AttachmentID)) {
/* Success: keep the handle found in the response handle table at the
 * request's handle_idx, then record the attachment in the ldb store */
550 if (mapi_repl.error_code == MAPI_E_SUCCESS) {
551 el->handle = mapi_response->handles[request.handle_idx];
552 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Add [2]: Attachment %d with handle 0x%x and parent handle 0x%x\n",
553 MPM_LOCATION, MPM_SESSION(el), el->AttachmentID, el->handle, el->parent_handle));
554 mpm_cache_ldb_add_attachment((TALLOC_CTX *)mpm, mpm->ldb_ctx, el);
/* Failure: log the MAPI error and unlink the pending entry.
 * NOTE(review): no free or session release of el is visible here - verify */
556 DEBUG(0, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Attachment OpenAttach returned %s\n",
557 MPM_LOCATION, MPM_SESSION(el), mapi_get_errstr(mapi_repl.error_code)));
558 DLIST_REMOVE(mpm->attachments, el);
569 \details Monitor OpenStream requests and register a stream in the
572 We are only interested in monitoring streams related to attachments
573 or messages. This is the first step for stream registration:
575 * Look whether this stream inherits from a message or attachment
576 * Fill the stream element according to previous statement
577 * Add it to the mpm_stream list
579 \param dce_call pointer to the session context
580 \param mem_ctx the memory context
581 \param mapi_req reference to the OpenStream MAPI request
582 \param EcDoRpc pointer to the current EcDoRpc operation
584 \return NT_STATUS_OK on success, otherwise NT_STATUS_NO_MEMORY
/* First step of stream registration: the handle the stream is opened on is
 * looked up among this session's attachments, then among its messages. On a
 * match a new mpm_stream entry is created and appended to mpm->streams; when
 * nothing matches only a debug message is emitted.
 * NOTE(review): the return statements and closing lines are not visible here. */
586 static NTSTATUS cache_pull_OpenStream(struct dcesrv_call_state *dce_call,
588 struct EcDoRpc_MAPI_REQ mapi_req,
589 struct EcDoRpc *EcDoRpc)
591 struct mpm_stream *stream;
592 struct mpm_attachment *attach;
593 struct mpm_message *message;
594 struct mapi_request *mapi_request;
595 struct OpenStream_req request;
597 mapi_request = EcDoRpc->in.mapi_request;
598 request = mapi_req.u.mapi_OpenStream;
/* Pass 1: is the stream opened on a registered attachment? */
600 for (attach = mpm->attachments; attach; attach = attach->next) {
601 if ((mpm_session_cmp(attach->session, dce_call) == true) &&
602 mapi_request->handles[mapi_req.handle_idx] == attach->handle) {
603 stream = talloc((TALLOC_CTX *)mpm, struct mpm_stream);
604 NT_STATUS_HAVE_NO_MEMORY(stream);
/* NOTE(review): if this allocation fails the macro returns and stream is not freed */
606 stream->session = mpm_session_init((TALLOC_CTX *)mpm, dce_call);
607 NT_STATUS_HAVE_NO_MEMORY(stream->session);
/* 0xFFFFFFFF is a placeholder; cache_push_OpenStream() stores the real handle.
 * NOTE(review): stream->fp and stream->offset are not set in the visible
 * lines although talloc() is used - confirm they are initialised elsewhere */
609 stream->handle = 0xFFFFFFFF;
610 stream->parent_handle = attach->handle;
611 stream->PropertyTag = request.PropertyTag;
612 stream->StreamSize = 0;
613 stream->filename = NULL;
614 stream->attachment = attach;
615 stream->cached = false;
616 stream->message = NULL;
/* Read-ahead mode is inherited from the module configuration */
617 stream->ahead = (mpm->ahead == true) ? true : false;
/* Start of the interval reported by cache_dump_stream_stat() */
618 gettimeofday(&stream->tv_start, NULL);
/* This log line dereferences attach->message */
619 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Stream::attachment added 0x%x 0x%"PRIx64" 0x%"PRIx64"\n",
620 MPM_LOCATION, MPM_SESSION(stream), stream->parent_handle,
621 stream->attachment->message->FolderId, stream->attachment->message->MessageId));
622 DLIST_ADD_END(mpm->streams, stream, struct mpm_stream *);
/* Pass 2: is the stream opened directly on a registered message? */
627 for (message = mpm->messages; message; message = message->next) {
628 if ((mpm_session_cmp(message->session, dce_call) == true) &&
629 mapi_request->handles[mapi_req.handle_idx] == message->handle) {
630 stream = talloc((TALLOC_CTX *)mpm, struct mpm_stream);
631 NT_STATUS_HAVE_NO_MEMORY(stream);
633 stream->session = mpm_session_init((TALLOC_CTX *)mpm, dce_call);
634 NT_STATUS_HAVE_NO_MEMORY(stream->session);
636 stream->handle = 0xFFFFFFFF;
637 stream->parent_handle = message->handle;
638 stream->PropertyTag = request.PropertyTag;
639 stream->StreamSize = 0;
640 stream->filename = NULL;
641 stream->attachment = NULL;
642 stream->cached = false;
643 stream->ahead = (mpm->ahead == true) ? true : false;
644 gettimeofday(&stream->tv_start, NULL);
645 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Stream::message added 0x%x\n",
646 MPM_LOCATION, MPM_SESSION(stream), stream->parent_handle));
647 stream->message = message;
648 DLIST_ADD_END(mpm->streams, stream, struct mpm_stream *);
/* Neither an attachment nor a message owns the handle: nothing is registered */
653 DEBUG(1, ("* [%s:%d] Stream: Not related to any attachment or message ?!?\n",
660 \details Monitor OpenStream replies and store the OpenStream MAPI
663 This is the second step for stream registration:
665 * Seek the parent_handle/PropertyTag couple in the mpm_streams
668 * If a match is found (expected) and MAPI retval is set to
669 MAPI_E_SUCCESS, update the handle field and StreamSize parameters
670 for the element and commit this stream in the tdb store.
672 * If retval is different from MAPI_E_SUCCESS, then delete the
675 \param dce_call pointer to the session context
676 \param mapi_req reference to the OpenStream MAPI request entry
677 \param mapi_repl reference to the OpenStream MAPI response entry
678 \param EcDoRpc pointer to the current EcDoRpc operation
/* Second step of stream registration: match an OpenStream reply against the
 * pending entry with the same session, parent handle and PropertyTag.
 * NOTE(review): the closing lines and return statements are not visible here. */
682 static NTSTATUS cache_push_OpenStream(struct dcesrv_call_state *dce_call,
683 struct EcDoRpc_MAPI_REQ mapi_req,
684 struct EcDoRpc_MAPI_REPL mapi_repl,
685 struct EcDoRpc *EcDoRpc)
687 struct mpm_stream *el;
688 struct mapi_request *mapi_request;
689 struct mapi_response *mapi_response;
690 struct OpenStream_req request;
691 struct OpenStream_repl response;
693 mapi_request = EcDoRpc->in.mapi_request;
694 mapi_response = EcDoRpc->out.mapi_response;
695 request = mapi_req.u.mapi_OpenStream;
696 response = mapi_repl.u.mapi_OpenStream;
698 for (el = mpm->streams; el; el = el->next) {
699 if ((mpm_session_cmp(el->session, dce_call) == true) &&
700 (mapi_request->handles[mapi_req.handle_idx] == el->parent_handle)) {
701 if (request.PropertyTag == el->PropertyTag) {
/* Success: store the server handle and the stream size from the reply,
 * then record the stream through mpm_cache_ldb_add_stream() */
702 if (mapi_repl.error_code == MAPI_E_SUCCESS) {
703 el->handle = mapi_response->handles[request.handle_idx];
704 el->StreamSize = response.StreamSize;
705 DEBUG(2, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Add [2]: Stream for Property Tag 0x%x, handle 0x%x and size = %d\n",
706 MPM_LOCATION, MPM_SESSION(el), el->PropertyTag, el->handle, el->StreamSize));
707 mpm_cache_ldb_add_stream(mpm, mpm->ldb_ctx, el);
/* Failure: log the MAPI error and unlink the pending entry */
709 DEBUG(0, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Del: Stream OpenStream returned %s\n",
710 MPM_LOCATION, MPM_SESSION(el), mapi_get_errstr(mapi_repl.error_code)));
711 DLIST_REMOVE(mpm->streams, el);
723 \details Monitor ReadStream replies.
725 This function writes ReadStream data received from remote server
726 and associated to messages or attachments to the opened associated
727 file. This function only writes data if the file is not already
728 cached, otherwise it just returns.
730 \param dce_call pointer to the session context
731 \param mapi_req reference to the ReadStream MAPI request
732 \param mapi_repl reference to the ReadStream MAPI reply
733 \param EcDoRpc pointer to the current EcDoRpc operation
/* Handle a ReadStream reply: when the reply's handle belongs to a registered
 * stream of this session that has an open file and is not cached yet, the
 * returned data is written to that file.
 * NOTE(review): else branches, closing lines and return statements are not
 * visible here. */
739 static NTSTATUS cache_push_ReadStream(struct dcesrv_call_state *dce_call,
740 struct EcDoRpc_MAPI_REQ mapi_req,
741 struct EcDoRpc_MAPI_REPL mapi_repl,
742 struct EcDoRpc *EcDoRpc)
744 struct mpm_stream *stream;
745 struct mapi_response *mapi_response;
746 struct ReadStream_repl response;
747 struct ReadStream_req request;
749 mapi_response = EcDoRpc->out.mapi_response;
750 response = mapi_repl.u.mapi_ReadStream;
751 request = mapi_req.u.mapi_ReadStream;
753 /* Check if the handle is registered */
754 for (stream = mpm->streams; stream; stream = stream->next) {
755 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
756 mapi_response->handles[mapi_repl.handle_idx] == stream->handle) {
757 if (stream->fp && stream->cached == false) {
/* With sync enabled, streams larger than sync_min are fetched by the
 * external sync command; its return status is not checked here */
758 if (mpm->sync == true && stream->StreamSize > mpm->sync_min) {
759 cache_exec_sync_cmd(stream);
/* Append the data returned by the server to the cache file */
761 DEBUG(5, ("* [%s:%d] [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] %zd bytes from remove server\n",
762 MPM_LOCATION, MPM_SESSION(stream), response.data.length));
763 mpm_cache_stream_write(stream, response.data.length, response.data.data);
/* Offset equal to the stream size with a non-empty reply: log the timing */
764 if (stream->offset == stream->StreamSize) {
765 if (response.data.length) {
766 cache_dump_stream_stat(stream);
770 } else if (stream->cached == true) {
771 /* This is managed by the dispatch routine */
781 \details Analyze EcDoRpc MAPI requests
783 This function loops over EcDoRpc MAPI calls and search for the
784 opnums required by the cache module to monitor Stream traffic
787 \param dce_call the session context
788 \param mem_ctx the memory context
789 \param r generic pointer on EcDoRpc operation
793 static NTSTATUS cache_pull(struct dcesrv_call_state *dce_call, TALLOC_CTX *mem_ctx, void *r)
795 struct EcDoRpc *EcDoRpc;
796 struct EcDoRpc_MAPI_REQ *mapi_req;
799 if (dce_call->pkt.u.request.opnum != 0x2) {
803 EcDoRpc = (struct EcDoRpc *) r;
804 if (!EcDoRpc) return NT_STATUS_OK;
805 if (!&(EcDoRpc->in)) return NT_STATUS_OK;
806 if (!EcDoRpc->in.mapi_request) return NT_STATUS_OK;
807 if (!EcDoRpc->in.mapi_request->mapi_req) return NT_STATUS_OK;
809 /* If this is an idle request, do not go further */
810 if (EcDoRpc->in.mapi_request->length == 2) {
814 mapi_req = EcDoRpc->in.mapi_request->mapi_req;
815 for (i = 0; mapi_req[i].opnum; i++) {
816 switch (mapi_req[i].opnum) {
817 case op_MAPI_OpenMessage:
818 cache_pull_OpenMessage(dce_call, (TALLOC_CTX *)mpm, mapi_req[i].u.mapi_OpenMessage);
820 case op_MAPI_OpenAttach:
821 cache_pull_OpenAttach(dce_call, (TALLOC_CTX *)mpm, mapi_req[i], EcDoRpc);
823 case op_MAPI_OpenStream:
824 cache_pull_OpenStream(dce_call, (TALLOC_CTX *)mpm, mapi_req[i], EcDoRpc);
826 case op_MAPI_Release:
827 cache_pull_Release(dce_call, EcDoRpc, mapi_req[i].handle_idx);
837 \details Analyze EcDoRpc MAPI responses
839 This function loops over EcDoRpc MAPI calls and search for the
840 opnums required by the cache module to monitor Stream traffic
843 \param dce_call pointer to the session context
844 \param mem_ctx the memory context
845 \param r generic pointer on EcDoRpc operation
849 static NTSTATUS cache_push(struct dcesrv_call_state *dce_call, TALLOC_CTX *mem_ctx, void *r)
851 struct EcDoRpc *EcDoRpc;
852 struct EcDoRpc_MAPI_REPL *mapi_repl;
853 struct EcDoRpc_MAPI_REQ *mapi_req;
857 if (dce_call->pkt.u.request.opnum != 0x2) {
861 EcDoRpc = (struct EcDoRpc *) r;
862 if (!EcDoRpc) return NT_STATUS_OK;
863 if (!&(EcDoRpc->out)) return NT_STATUS_OK;
864 if (!EcDoRpc->out.mapi_response) return NT_STATUS_OK;
865 if (!EcDoRpc->out.mapi_response->mapi_repl) return NT_STATUS_OK;
867 /* If this is an idle request, do not got further */
868 if (EcDoRpc->out.mapi_response->length == 2) {
872 mapi_repl = EcDoRpc->out.mapi_response->mapi_repl;
873 mapi_req = EcDoRpc->in.mapi_request->mapi_req;
875 for (i = 0; mapi_repl[i].opnum; i++) {
876 switch (mapi_repl[i].opnum) {
877 case op_MAPI_OpenMessage:
878 index = cache_find_call_request_index(op_MAPI_OpenMessage, mapi_req);
879 if (index == -1) break;
880 cache_push_OpenMessage(dce_call, mapi_req[index], mapi_repl[i], EcDoRpc);
882 case op_MAPI_OpenAttach:
883 index = cache_find_call_request_index(op_MAPI_OpenAttach, mapi_req);
884 if (index == -1) break;
885 cache_push_OpenAttach(dce_call, mapi_req[index], mapi_repl[i], EcDoRpc);
887 case op_MAPI_OpenStream:
888 index = cache_find_call_request_index(op_MAPI_OpenStream, mapi_req);
889 if (index == -1) break;
890 cache_push_OpenStream(dce_call, mapi_req[index], mapi_repl[i], EcDoRpc);
892 case op_MAPI_ReadStream:
893 index = cache_find_call_request_index(op_MAPI_ReadStream, mapi_req);
894 if (index == -1) break;
895 cache_push_ReadStream(dce_call, mapi_req[index], mapi_repl[i], EcDoRpc);
907 \details Dispatch function.
909 This function avoids calling dcerpc_ndr_request - understand
910 forwarding client request to remote server - when the client is
911 reading a message/attachment stream available in the cache.
913 This function can also be used to loop over dcerpc_ndr_request and
914 perform a read-ahead operation.
916 \param dce_call the session context
917 \param mem_ctx the memory context
918 \param r pointer on EcDoRpc operation
919 \param mapiproxy pointer to a mapiproxy structure controlling
/* Dispatch hook: for EcDoRpc requests containing ReadStream calls on streams
 * registered by this session, either answer from the cache (building the
 * reply locally and setting mapiproxy->norelay) or drive the read-ahead
 * mode through mapiproxy->ahead.
 * NOTE(review): declarations, break statements, else branches and some
 * return statements are not visible here. */
924 static NTSTATUS cache_dispatch(struct dcesrv_call_state *dce_call, TALLOC_CTX *mem_ctx,
925 void *r, struct mapiproxy *mapiproxy)
927 struct EcDoRpc *EcDoRpc;
928 struct mapi_request *mapi_request;
929 struct mapi_response *mapi_response;
930 struct EcDoRpc_MAPI_REQ *mapi_req;
931 struct mpm_stream *stream;
/* Only the EcDoRpc operation (opnum 0x2) is handled */
935 if (dce_call->pkt.u.request.opnum != 0x2) {
/* NOTE(review): unlike cache_pull(), neither EcDoRpc nor in.mapi_request is
 * checked against NULL before being dereferenced here */
939 EcDoRpc = (struct EcDoRpc *) r;
940 if (!EcDoRpc->in.mapi_request->mapi_req) return NT_STATUS_OK;
942 /* If this is an idle request, do not go further */
943 if (EcDoRpc->in.mapi_request->length == 2) {
947 mapi_request = EcDoRpc->in.mapi_request;
948 mapi_response = EcDoRpc->out.mapi_response;
949 mapi_req = mapi_request->mapi_req;
/* First pass over the calls; the body of the ReadStream case is not visible
 * (presumably it increments count - TODO confirm) */
951 for (count = 0, i = 0; mapi_req[i].opnum; i++) {
952 switch (mapi_req[i].opnum) {
953 case op_MAPI_ReadStream:
/* i now holds the total number of calls in the request */
959 /* If we have more than count cached calls, forward to Exchange */
960 if (i > count) return NT_STATUS_OK;
/* Second pass: process each ReadStream call */
962 for (i = 0; mapi_req[i].opnum; i++) {
963 switch (mapi_req[i].opnum) {
964 case op_MAPI_ReadStream:
966 struct ReadStream_req request;
968 request = mapi_req[i].u.mapi_ReadStream;
/* Look for the stream of this session the call reads from */
969 for (stream = mpm->streams; stream; stream = stream->next) {
970 if ((mpm_session_cmp(stream->session, dce_call) == true) &&
971 (mapi_request->handles[mapi_req[i].handle_idx] == stream->handle)) {
/* Cached stream: do not relay the request, answer locally */
972 if (stream->cached == true) {
974 mapiproxy->norelay = true;
975 mapiproxy->ahead = false;
976 /* Create a fake ReadStream reply */
/* NOTE(review): the talloc_array()/talloc_size() results are used unchecked */
977 mapi_response->mapi_repl = talloc_array(mem_ctx, struct EcDoRpc_MAPI_REPL, i + 2);
978 mapi_response->mapi_repl[i].opnum = op_MAPI_ReadStream;
979 mapi_response->mapi_repl[i].handle_idx = mapi_req[i].handle_idx;
980 mapi_response->mapi_repl[i].error_code = MAPI_E_SUCCESS;
981 mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length = 0;
/* Buffer sized by the byte count the client asked for */
982 mapi_response->mapi_repl[i].u.mapi_ReadStream.data.data = (uint8_t *) talloc_size(mem_ctx, request.ByteCount);
983 mpm_cache_stream_read(stream, (size_t) request.ByteCount,
984 &mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length,
985 &mapi_response->mapi_repl[i].u.mapi_ReadStream.data.data);
/* Offset equal to the stream size with a non-empty read: log the timing */
986 if (stream->offset == stream->StreamSize) {
987 if (mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length) {
988 cache_dump_stream_stat(stream);
991 DEBUG(5, ("* [%s:%d] %zd bytes read from cache\n", MPM_LOCATION,
992 mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length));
/* The fake reply carries a single handle: the stream's own */
993 mapi_response->handles = talloc_array(mem_ctx, uint32_t, 1);
994 mapi_response->handles[0] = stream->handle;
/* NOTE(review): 0xE is presumably the fixed size of the reply around the
 * data payload - confirm against the EcDoRpc wire format */
995 mapi_response->mapi_len = 0xE + mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length;
996 mapi_response->length = mapi_response->mapi_len - 4;
997 *EcDoRpc->out.length = mapi_response->mapi_len;
998 EcDoRpc->out.size = EcDoRpc->in.size;
/* Read-ahead stream not cached yet */
1000 } else if ((stream->cached == false) && (stream->ahead == true)) {
/* Read-ahead already running: store the data of the reply just received */
1001 if (mapiproxy->ahead == true) {
1002 mpm_cache_stream_write(stream,
1003 mapi_response->mapi_repl[i].u.mapi_ReadStream.data.length,
1004 mapi_response->mapi_repl[i].u.mapi_ReadStream.data.data);
1005 /* When read ahead is over */
1006 if (stream->offset == stream->StreamSize) {
1007 cache_dump_stream_stat(stream);
1008 mpm_cache_stream_reset(stream);
1009 stream->cached = true;
1010 stream->ahead = false;
/* NOTE(review): the branch this assignment belongs to is not visible */
1014 mapiproxy->ahead = true;
1024 return NT_STATUS_OK;
/* Unbind hook: drop every message, attachment and stream entry still
 * registered for the session identified by server_id/context_id.
 * Each list is scanned with the same pattern: after removing an entry the
 * scan restarts from the list head, otherwise it advances to the next entry.
 * NOTE(review): the loop headers and else lines are not visible here. */
1031 static NTSTATUS cache_unbind(struct server_id server_id, uint32_t context_id)
1033 struct mpm_message *message;
1034 struct mpm_attachment *attach;
1035 struct mpm_stream *stream;
1037 /* Look over messages still attached to the session */
1038 message = mpm->messages;
1040 if ((mpm_session_cmp_sub(message->session, server_id, context_id) == true)) {
1041 DEBUG(2, ("[%s:%d]: [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Message - 0x%"PRIx64"/0x%"PRIx64" handle(0x%x)\n",
1042 MPM_LOCATION, MPM_SESSION(message), message->FolderId, message->MessageId,
/* Release the session reference, unlink the entry and free it */
1044 mpm_session_release(message->session);
1045 DLIST_REMOVE(mpm->messages, message);
1046 talloc_free(message);
1047 message = mpm->messages;
1049 message = message->next;
1053 /* Look over attachments still attached to the session */
1054 attach = mpm->attachments;
1056 if ((mpm_session_cmp_sub(attach->session, server_id, context_id) == true)) {
1057 DEBUG(2, ("[%s:%d]: [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Attachment - AttachmentID(0x%x) handle(0x%x)\n",
1058 MPM_LOCATION, MPM_SESSION(attach), attach->AttachmentID, attach->handle));
1059 mpm_session_release(attach->session);
1060 DLIST_REMOVE(mpm->attachments, attach);
1061 talloc_free(attach);
1062 attach = mpm->attachments;
1064 attach = attach->next;
/* Streams still attached to the session: the backing file is closed and the
 * file name freed before the entry itself */
1068 stream = mpm->streams;
1070 if ((mpm_session_cmp_sub(stream->session, server_id, context_id) == true)) {
1071 DEBUG(2, ("[%s:%d]: [s(0x%"PRIx64"-0x%x-0x%x),c(0x%x)] Stream - handle(0x%x)\n", MPM_LOCATION,
1072 MPM_SESSION(stream), stream->handle));
1073 mpm_session_release(stream->session);
1074 mpm_cache_stream_close(stream);
1075 talloc_free(stream->filename);
1076 DLIST_REMOVE(mpm->streams, stream);
1077 talloc_free(stream);
1078 stream = mpm->streams;
1080 stream = stream->next;
1084 return NT_STATUS_OK;
1089 \details Initialize the cache module and retrieve configuration from
1092 Possible smb.conf parameters:
1093 * mpm_cache:database
1095 \param dce_ctx the session context
1097 \return NT_STATUS_OK on success otherwise
1098 NT_STATUS_INVALID_PARAMETER, NT_STATUS_NO_MEMORY
/* Module init hook: allocate the global mpm state, read the module options
 * from the loadparm context, validate them and create the ldb database.
 * NOTE(review): local declarations and some cleanup lines on the error paths
 * are not visible here. */
1100 static NTSTATUS cache_init(struct dcesrv_context *dce_ctx)
1103 struct loadparm_context *lp_ctx;
/* The global state is a zeroed talloc child of dce_ctx */
1106 mpm = talloc_zero(dce_ctx, struct mpm_cache);
1107 if (!mpm) return NT_STATUS_NO_MEMORY;
1108 mpm->messages = NULL;
1109 mpm->attachments = NULL;
1110 mpm->streams = NULL;
/* Options read under the MPM_NAME section: "ahead" and "sync" default to
 * false, "sync_min" defaults to 500000, "sync_cmd" is split on spaces into
 * a string list, "path" is the cache directory */
1112 mpm->ahead = lpcfg_parm_bool(dce_ctx->lp_ctx, NULL, MPM_NAME, "ahead", false);
1113 mpm->sync = lpcfg_parm_bool(dce_ctx->lp_ctx, NULL, MPM_NAME, "sync", false);
1114 mpm->sync_min = lpcfg_parm_int(dce_ctx->lp_ctx, NULL, MPM_NAME, "sync_min", 500000);
1115 mpm->sync_cmd = str_list_make(dce_ctx, lpcfg_parm_string(dce_ctx->lp_ctx, NULL, MPM_NAME, "sync_cmd"), " ");
1116 mpm->dbpath = lpcfg_parm_string(dce_ctx->lp_ctx, NULL, MPM_NAME, "path");
/* Read-ahead and sync modes cannot be enabled together */
1118 if ((mpm->ahead == true) && mpm->sync) {
1119 DEBUG(0, ("%s: cache:ahead and cache:sync are exclusive!\n", MPM_ERROR));
1121 return NT_STATUS_INVALID_PARAMETER;
/* NOTE(review): the condition guarding this message is not visible;
 * presumably it tests for a missing path option - confirm */
1125 DEBUG(0, ("%s: Missing mpm_cache:path parameter\n", MPM_ERROR));
1127 return NT_STATUS_INVALID_PARAMETER;
/* Database URL: tdb://<path>/<MPM_DB>; the talloc_asprintf() result is used unchecked */
1130 database = talloc_asprintf(dce_ctx->lp_ctx, "tdb://%s/%s", mpm->dbpath, MPM_DB);
1131 status = mpm_cache_ldb_createdb(dce_ctx, database, &mpm->ldb_ctx);
/* Any createdb failure is reported as NT_STATUS_NO_MEMORY, not as status */
1132 if (!NT_STATUS_IS_OK(status)) {
1133 talloc_free(database);
1135 return NT_STATUS_NO_MEMORY;
/* A fresh loadparm context with default settings is handed to dcerpc_init() */
1138 lp_ctx = loadparm_init(dce_ctx);
1139 lpcfg_load_default(lp_ctx);
1140 dcerpc_init(lp_ctx);
1142 talloc_free(database);
1144 return NT_STATUS_OK;
1149 \details Entry point for the cache mapiproxy module
1151 \return NT_STATUS_OK on success, otherwise NTSTATUS error
/* Module entry point: describe the "cache" module and register it with the
 * mapiproxy subsystem.
 * NOTE(review): the declaration of ret and the end of this function are not
 * visible here. */
1153 NTSTATUS samba_init_module(void)
1155 struct mapiproxy_module module;
1158 /* Fill in our name */
1159 module.name = "cache";
1160 module.description = "Cache MAPI messages and attachments";
1161 module.endpoint = "exchange_emsmdb";
1163 /* Fill in all the operations */
/* Hooks implemented in this file; no ndr_pull hook is provided */
1164 module.init = cache_init;
1165 module.unbind = cache_unbind;
1166 module.push = cache_push;
1167 module.ndr_pull = NULL;
1168 module.pull = cache_pull;
1169 module.dispatch = cache_dispatch;
1171 /* Register ourselves with the MAPIPROXY subsystem */
1172 ret = mapiproxy_module_register(&module);
1173 if (!NT_STATUS_IS_OK(ret)) {
1174 DEBUG(0, ("Failed to register the 'cache' mapiproxy module!\n"));