Merge 2610c05b5b95cc7036b3d6dfb894c6cfbdb68483 as Samba-4.0alpha16
[mat/samba.git] / source3 / rpc_client / rpc_transport_tstream.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  RPC client transport over tstream
4  *  Copyright (C) Simo Sorce 2010
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libsmb/cli_np_tstream.h"
25 #include "cli_pipe.h"
26
27 #undef DBGC_CLASS
28 #define DBGC_CLASS DBGC_RPC_CLI
29
/*
 * Private data of a tstream based RPC client transport
 * (stored as rpc_cli_transport->priv).
 */
struct rpc_tstream_state {
        /* the stream all requests go through; freed on disconnect */
        struct tstream_context *stream;
        /* queue handed to the readv_pdu requests */
        struct tevent_queue *read_queue;
        /* queue handed to the writev requests */
        struct tevent_queue *write_queue;
        /* per request timeout, fed to timeval_current_ofs_msec() */
        unsigned int timeout;
};
36
37 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
38 {
39         TALLOC_FREE(s->stream);
40 }
41
42 static bool rpc_tstream_is_connected(void *priv)
43 {
44         struct rpc_tstream_state *transp =
45                 talloc_get_type_abort(priv, struct rpc_tstream_state);
46         ssize_t ret;
47
48         if (!transp->stream) {
49                 return false;
50         }
51
52         if (!tstream_is_cli_np(transp->stream)) {
53                 return true;
54         }
55
56         ret = tstream_pending_bytes(transp->stream);
57         if (ret == -1) {
58                 return false;
59         }
60
61         return true;
62 }
63
64 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
65 {
66         struct rpc_tstream_state *transp =
67                 talloc_get_type_abort(priv, struct rpc_tstream_state);
68         int orig_timeout;
69         bool ok;
70
71         ok = rpc_tstream_is_connected(transp);
72         if (!ok) {
73                 return 0;
74         }
75
76         if (tstream_is_cli_np(transp->stream)) {
77                 transp->timeout = timeout;
78                 return tstream_cli_np_set_timeout(transp->stream, timeout);
79         }
80
81         orig_timeout = transp->timeout;
82
83         transp->timeout = timeout;
84
85         return orig_timeout;
86 }
87
/*
 * Cursor used by rpc_tstream_next_vector() while filling a caller
 * supplied buffer.
 */
struct rpc_tstream_next_vector_state {
        /* destination buffer */
        uint8_t *buf;
        /* number of bytes wanted in buf */
        size_t len;
        /*
         * Number of bytes of buf already handed out. This is an index
         * into buf that is compared with and subtracted from len, so it
         * uses the same unsigned type instead of a signed file offset.
         */
        size_t ofs;
        /* bytes the stream had pending beyond what still fits into buf */
        size_t remaining;
};
94
95 static void rpc_tstream_next_vector_init(
96                                 struct rpc_tstream_next_vector_state *s,
97                                 uint8_t *buf, size_t len)
98 {
99         ZERO_STRUCTP(s);
100
101         s->buf = buf;
102         s->len = MIN(len, UINT16_MAX);
103 }
104
105 static int rpc_tstream_next_vector(struct tstream_context *stream,
106                                    void *private_data,
107                                    TALLOC_CTX *mem_ctx,
108                                    struct iovec **_vector,
109                                    size_t *count)
110 {
111         struct rpc_tstream_next_vector_state *state =
112                 (struct rpc_tstream_next_vector_state *)private_data;
113         struct iovec *vector;
114         ssize_t pending;
115         size_t wanted;
116
117         if (state->ofs == state->len) {
118                 *_vector = NULL;
119                 *count = 0;
120                 return 0;
121         }
122
123         pending = tstream_pending_bytes(stream);
124         if (pending == -1) {
125                 return -1;
126         }
127
128         if (pending == 0 && state->ofs != 0) {
129                 /* return a short read */
130                 *_vector = NULL;
131                 *count = 0;
132                 return 0;
133         }
134
135         if (pending == 0) {
136                 /* we want at least one byte and recheck again */
137                 wanted = 1;
138         } else {
139                 size_t missing = state->len - state->ofs;
140                 if (pending > missing) {
141                         /* there's more available */
142                         state->remaining = pending - missing;
143                         wanted = missing;
144                 } else {
145                         /* read what we can get and recheck in the next cycle */
146                         wanted = pending;
147                 }
148         }
149
150         vector = talloc_array(mem_ctx, struct iovec, 1);
151         if (!vector) {
152                 return -1;
153         }
154
155         vector[0].iov_base = state->buf + state->ofs;
156         vector[0].iov_len = wanted;
157
158         state->ofs += wanted;
159
160         *_vector = vector;
161         *count = 1;
162         return 0;
163 }
164
165 struct rpc_tstream_read_state {
166         struct rpc_tstream_state *transp;
167         struct rpc_tstream_next_vector_state next_vector;
168         ssize_t nread;
169 };
170
171 static void rpc_tstream_read_done(struct tevent_req *subreq);
172
173 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
174                                              struct event_context *ev,
175                                              uint8_t *data, size_t size,
176                                              void *priv)
177 {
178         struct rpc_tstream_state *transp =
179                 talloc_get_type_abort(priv, struct rpc_tstream_state);
180         struct tevent_req *req, *subreq;
181         struct rpc_tstream_read_state *state;
182         struct timeval endtime;
183
184         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
185         if (req == NULL) {
186                 return NULL;
187         }
188         if (!rpc_tstream_is_connected(transp)) {
189                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
190                 return tevent_req_post(req, ev);
191         }
192         state->transp = transp;
193         rpc_tstream_next_vector_init(&state->next_vector, data, size);
194
195         subreq = tstream_readv_pdu_queue_send(state, ev,
196                                               transp->stream,
197                                               transp->read_queue,
198                                               rpc_tstream_next_vector,
199                                               &state->next_vector);
200         if (subreq == NULL) {
201                 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
202                 return tevent_req_post(req, ev);
203         }
204
205         endtime = timeval_current_ofs_msec(transp->timeout);
206         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
207                 goto fail;
208         }
209
210         tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
211         return req;
212  fail:
213         TALLOC_FREE(req);
214         return NULL;
215 }
216
217 static void rpc_tstream_read_done(struct tevent_req *subreq)
218 {
219         struct tevent_req *req =
220                 tevent_req_callback_data(subreq, struct tevent_req);
221         struct rpc_tstream_read_state *state =
222                 tevent_req_data(req, struct rpc_tstream_read_state);
223         int err;
224
225         state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
226         TALLOC_FREE(subreq);
227         if (state->nread < 0) {
228                 rpc_tstream_disconnect(state->transp);
229                 tevent_req_nterror(req, map_nt_error_from_unix(err));
230                 return;
231         }
232         tevent_req_done(req);
233 }
234
235 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
236 {
237         struct rpc_tstream_read_state *state = tevent_req_data(
238                 req, struct rpc_tstream_read_state);
239         NTSTATUS status;
240
241         if (tevent_req_is_nterror(req, &status)) {
242                 return status;
243         }
244         *size = state->nread;
245         return NT_STATUS_OK;
246 }
247
/* Per request state of rpc_tstream_write_send() / rpc_tstream_write_recv(). */
struct rpc_tstream_write_state {
        /* event context the request was started with */
        struct event_context *ev;
        /* the transport the write was issued on */
        struct rpc_tstream_state *transp;
        /* the single vector describing the caller's data */
        struct iovec iov;
        /* result of tstream_writev_queue_recv() */
        ssize_t nwritten;
};
254
255 static void rpc_tstream_write_done(struct tevent_req *subreq);
256
257 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
258                                               struct event_context *ev,
259                                               const uint8_t *data, size_t size,
260                                               void *priv)
261 {
262         struct rpc_tstream_state *transp =
263                 talloc_get_type_abort(priv, struct rpc_tstream_state);
264         struct tevent_req *req, *subreq;
265         struct rpc_tstream_write_state *state;
266         struct timeval endtime;
267
268         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
269         if (req == NULL) {
270                 return NULL;
271         }
272         if (!rpc_tstream_is_connected(transp)) {
273                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
274                 return tevent_req_post(req, ev);
275         }
276         state->ev = ev;
277         state->transp = transp;
278         state->iov.iov_base = discard_const_p(void *, data);
279         state->iov.iov_len = size;
280
281         subreq = tstream_writev_queue_send(state, ev,
282                                            transp->stream,
283                                            transp->write_queue,
284                                            &state->iov, 1);
285         if (subreq == NULL) {
286                 goto fail;
287         }
288
289         endtime = timeval_current_ofs_msec(transp->timeout);
290         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
291                 goto fail;
292         }
293
294         tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
295         return req;
296  fail:
297         TALLOC_FREE(req);
298         return NULL;
299 }
300
301 static void rpc_tstream_write_done(struct tevent_req *subreq)
302 {
303         struct tevent_req *req =
304                 tevent_req_callback_data(subreq, struct tevent_req);
305         struct rpc_tstream_write_state *state =
306                 tevent_req_data(req, struct rpc_tstream_write_state);
307         int err;
308
309         state->nwritten = tstream_writev_queue_recv(subreq, &err);
310         TALLOC_FREE(subreq);
311         if (state->nwritten < 0) {
312                 rpc_tstream_disconnect(state->transp);
313                 tevent_req_nterror(req, map_nt_error_from_unix(err));
314                 return;
315         }
316         tevent_req_done(req);
317 }
318
319 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
320 {
321         struct rpc_tstream_write_state *state =
322                 tevent_req_data(req, struct rpc_tstream_write_state);
323         NTSTATUS status;
324
325         if (tevent_req_is_nterror(req, &status)) {
326                 return status;
327         }
328         *sent = state->nwritten;
329         return NT_STATUS_OK;
330 }
331
/* Per request state of rpc_tstream_trans_send() / rpc_tstream_trans_recv(). */
struct rpc_tstream_trans_state {
        /* event context the request was started with */
        struct tevent_context *ev;
        /* the transport the transaction runs on */
        struct rpc_tstream_state *transp;
        /* the request data to be written */
        struct iovec req;
        /* size of the reply buffer to allocate and read into */
        uint32_t max_rdata_len;
        /* reply buffer, set up by rpc_tstream_trans_next_vector() */
        struct iovec rep;
};
339
340 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
341 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
342
343 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
344                                          void *private_data,
345                                          TALLOC_CTX *mem_ctx,
346                                          struct iovec **_vector,
347                                          size_t *count);
348
349 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
350                                                  struct tevent_context *ev,
351                                                  uint8_t *data, size_t data_len,
352                                                  uint32_t max_rdata_len,
353                                                  void *priv)
354 {
355         struct rpc_tstream_state *transp =
356                 talloc_get_type_abort(priv, struct rpc_tstream_state);
357         struct tevent_req *req, *subreq;
358         struct rpc_tstream_trans_state *state;
359         struct timeval endtime;
360
361         req = tevent_req_create(mem_ctx, &state,
362                                 struct rpc_tstream_trans_state);
363         if (req == NULL) {
364                 return NULL;
365         }
366
367         if (!rpc_tstream_is_connected(transp)) {
368                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
369                 return tevent_req_post(req, ev);
370         }
371         state->ev = ev;
372         state->transp = transp;
373         state->req.iov_len = data_len;
374         state->req.iov_base = discard_const_p(void *, data);
375         state->max_rdata_len = max_rdata_len;
376
377         endtime = timeval_current_ofs_msec(transp->timeout);
378
379         subreq = tstream_writev_queue_send(state, ev,
380                                            transp->stream,
381                                            transp->write_queue,
382                                            &state->req, 1);
383         if (tevent_req_nomem(subreq, req)) {
384                 return tevent_req_post(req, ev);
385         }
386         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
387                 return tevent_req_post(req, ev);
388         }
389         tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
390
391         if (tstream_is_cli_np(transp->stream)) {
392                 tstream_cli_np_use_trans(transp->stream);
393         }
394
395         subreq = tstream_readv_pdu_queue_send(state, ev,
396                                               transp->stream,
397                                               transp->read_queue,
398                                               rpc_tstream_trans_next_vector,
399                                               state);
400         if (tevent_req_nomem(subreq, req)) {
401                 return tevent_req_post(req, ev);
402         }
403         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
404                 return tevent_req_post(req, ev);
405         }
406         tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
407
408         return req;
409 }
410
411 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
412 {
413         struct tevent_req *req =
414                 tevent_req_callback_data(subreq,
415                 struct tevent_req);
416         struct rpc_tstream_trans_state *state =
417                 tevent_req_data(req,
418                 struct rpc_tstream_trans_state);
419         int ret;
420         int err;
421
422         ret = tstream_writev_queue_recv(subreq, &err);
423         TALLOC_FREE(subreq);
424         if (ret == -1) {
425                 rpc_tstream_disconnect(state->transp);
426                 tevent_req_nterror(req, map_nt_error_from_unix(err));
427                 return;
428         }
429 }
430
431 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
432                                          void *private_data,
433                                          TALLOC_CTX *mem_ctx,
434                                          struct iovec **_vector,
435                                          size_t *count)
436 {
437         struct rpc_tstream_trans_state *state =
438                 talloc_get_type_abort(private_data,
439                 struct rpc_tstream_trans_state);
440         struct iovec *vector;
441
442         if (state->max_rdata_len == state->rep.iov_len) {
443                 *_vector = NULL;
444                 *count = 0;
445                 return 0;
446         }
447
448         state->rep.iov_base = talloc_array(state, uint8_t,
449                                            state->max_rdata_len);
450         if (state->rep.iov_base == NULL) {
451                 return -1;
452         }
453         state->rep.iov_len = state->max_rdata_len;
454
455         vector = talloc_array(mem_ctx, struct iovec, 1);
456         if (!vector) {
457                 return -1;
458         }
459
460         vector[0] = state->rep;
461
462         *_vector = vector;
463         *count = 1;
464         return 0;
465 }
466
467 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
468 {
469         struct tevent_req *req =
470                 tevent_req_callback_data(subreq,
471                 struct tevent_req);
472         struct rpc_tstream_trans_state *state =
473                 tevent_req_data(req,
474                 struct rpc_tstream_trans_state);
475         int ret;
476         int err;
477
478         ret = tstream_readv_pdu_queue_recv(subreq, &err);
479         TALLOC_FREE(subreq);
480         if (ret == -1) {
481                 rpc_tstream_disconnect(state->transp);
482                 tevent_req_nterror(req, map_nt_error_from_unix(err));
483                 return;
484         }
485
486         tevent_req_done(req);
487 }
488
489 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
490                                        uint8_t **prdata, uint32_t *prdata_len)
491 {
492         struct rpc_tstream_trans_state *state =
493                 tevent_req_data(req,
494                 struct rpc_tstream_trans_state);
495         NTSTATUS status;
496
497         if (tevent_req_is_nterror(req, &status)) {
498                 return status;
499         }
500
501         *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
502         *prdata_len = state->rep.iov_len;
503         return NT_STATUS_OK;
504 }
505
506 /**
507 * @brief Initialize a tstream transport facility
508 *        NOTE: this function will talloc_steal, the stream and the queues.
509 *
510 * @param mem_ctx        - memory context used to allocate the transport
511 * @param stream         - a ready to use tstream
512 * @param presult        - the transport structure
513 *
514 * @return               - a NT Status error code.
515 */
516 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
517                                 struct tstream_context **stream,
518                                 struct rpc_cli_transport **presult)
519 {
520         struct rpc_cli_transport *result;
521         struct rpc_tstream_state *state;
522
523         result = talloc(mem_ctx, struct rpc_cli_transport);
524         if (result == NULL) {
525                 return NT_STATUS_NO_MEMORY;
526         }
527         state = talloc(result, struct rpc_tstream_state);
528         if (state == NULL) {
529                 TALLOC_FREE(result);
530                 return NT_STATUS_NO_MEMORY;
531         }
532         result->priv = state;
533
534         state->read_queue = tevent_queue_create(state, "read_queue");
535         if (state->read_queue == NULL) {
536                 TALLOC_FREE(result);
537                 return NT_STATUS_NO_MEMORY;
538         }
539         state->write_queue = tevent_queue_create(state, "write_queue");
540         if (state->write_queue == NULL) {
541                 TALLOC_FREE(result);
542                 return NT_STATUS_NO_MEMORY;
543         }
544
545         state->stream = talloc_move(state, stream);
546         state->timeout = 10000; /* 10 seconds. */
547
548         if (tstream_is_cli_np(state->stream)) {
549                 result->trans_send = rpc_tstream_trans_send;
550                 result->trans_recv = rpc_tstream_trans_recv;
551         } else {
552                 result->trans_send = NULL;
553                 result->trans_recv = NULL;
554         }
555         result->write_send = rpc_tstream_write_send;
556         result->write_recv = rpc_tstream_write_recv;
557         result->read_send = rpc_tstream_read_send;
558         result->read_recv = rpc_tstream_read_recv;
559         result->is_connected = rpc_tstream_is_connected;
560         result->set_timeout = rpc_tstream_set_timeout;
561
562         *presult = result;
563         return NT_STATUS_OK;
564 }
565
566 struct cli_state *rpc_pipe_np_smb_conn(struct rpc_pipe_client *p)
567 {
568         struct rpc_tstream_state *transp =
569                 talloc_get_type_abort(p->transport->priv,
570                 struct rpc_tstream_state);
571         bool ok;
572
573         ok = rpccli_is_connected(p);
574         if (!ok) {
575                 return NULL;
576         }
577
578         if (!tstream_is_cli_np(transp->stream)) {
579                 return NULL;
580         }
581
582         return tstream_cli_np_get_cli_state(transp->stream);
583 }