s4:libcli/wrepl: rewrite the low level request handling to use tevent_queue and tstre...
[kamenim/samba.git] / source4 / libcli / wrepl / winsrepl.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    low level WINS replication client code
5
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher 2005-2010
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/events/events.h"
25 #include "../lib/util/dlinklist.h"
26 #include "libcli/wrepl/winsrepl.h"
27 #include "librpc/gen_ndr/ndr_winsrepl.h"
28 #include "lib/stream/packet.h"
29 #include "system/network.h"
30 #include "lib/socket/netif.h"
31 #include "param/param.h"
32 #include "lib/util/tevent_ntstatus.h"
33 #include "lib/tsocket/tsocket.h"
34 #include "libcli/util/tstream.h"
35
36 /*
37   main context structure for the wins replication client library
38 */
39 struct wrepl_socket {
40         struct {
41                 struct tevent_context *ctx;
42         } event;
43
44         /* the default timeout for requests, 0 means no timeout */
45 #define WREPL_SOCKET_REQUEST_TIMEOUT    (60)
46         uint32_t request_timeout;
47
48         struct smb_iconv_convenience *iconv_convenience;
49
50         struct tevent_queue *request_queue;
51
52         struct tstream_context *stream;
53 };
54
55 bool wrepl_socket_is_connected(struct wrepl_socket *wrepl_sock)
56 {
57         if (!wrepl_sock) {
58                 return false;
59         }
60
61         if (!wrepl_sock->stream) {
62                 return false;
63         }
64
65         return true;
66 }
67
68 /*
69   initialise a wrepl_socket. The event_ctx is optional, if provided then
70   operations will use that event context
71 */
72 struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx,
73                                        struct tevent_context *event_ctx,
74                                        struct smb_iconv_convenience *iconv_convenience)
75 {
76         struct wrepl_socket *wrepl_socket;
77
78         wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
79         if (!wrepl_socket) {
80                 return NULL;
81         }
82
83         wrepl_socket->event.ctx = event_ctx;
84         if (!wrepl_socket->event.ctx) {
85                 goto failed;
86         }
87
88         wrepl_socket->request_queue = tevent_queue_create(wrepl_socket,
89                                                           "wrepl request queue");
90         if (wrepl_socket->request_queue == NULL) {
91                 goto failed;
92         }
93
94         wrepl_socket->iconv_convenience = iconv_convenience;
95
96         wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
97
98         return wrepl_socket;
99
100 failed:
101         talloc_free(wrepl_socket);
102         return NULL;
103 }
104
105 /*
106   initialise a wrepl_socket from an already existing connection
107 */
108 NTSTATUS wrepl_socket_donate_stream(struct wrepl_socket *wrepl_socket,
109                                     struct tstream_context **stream)
110 {
111         if (wrepl_socket->stream) {
112                 return NT_STATUS_CONNECTION_ACTIVE;
113         }
114
115         wrepl_socket->stream = talloc_move(wrepl_socket, stream);
116         return NT_STATUS_OK;
117 }
118
119 /*
120   initialise a wrepl_socket from an already existing connection
121 */
122 NTSTATUS wrepl_socket_split_stream(struct wrepl_socket *wrepl_socket,
123                                    TALLOC_CTX *mem_ctx,
124                                    struct tstream_context **stream)
125 {
126         size_t num_requests;
127
128         if (!wrepl_socket->stream) {
129                 return NT_STATUS_CONNECTION_INVALID;
130         }
131
132         num_requests = tevent_queue_length(wrepl_socket->request_queue);
133         if (num_requests > 0) {
134                 return NT_STATUS_CONNECTION_IN_USE;
135         }
136
137         *stream = talloc_move(wrepl_socket, &wrepl_socket->stream);
138         return NT_STATUS_OK;
139 }
140
141 const char *wrepl_best_ip(struct loadparm_context *lp_ctx, const char *peer_ip)
142 {
143         struct interface *ifaces;
144         load_interfaces(lp_ctx, lp_interfaces(lp_ctx), &ifaces);
145         return iface_best_ip(ifaces, peer_ip);
146 }
147
148 struct wrepl_connect_state {
149         struct {
150                 struct wrepl_socket *wrepl_socket;
151                 struct tevent_context *ev;
152         } caller;
153         struct tsocket_address *local_address;
154         struct tsocket_address *remote_address;
155         struct tstream_context *stream;
156 };
157
158 static void wrepl_connect_trigger(struct tevent_req *req,
159                                   void *private_date);
160
161 struct tevent_req *wrepl_connect_send(TALLOC_CTX *mem_ctx,
162                                       struct tevent_context *ev,
163                                       struct wrepl_socket *wrepl_socket,
164                                       const char *our_ip, const char *peer_ip)
165 {
166         struct tevent_req *req;
167         struct wrepl_connect_state *state;
168         int ret;
169         bool ok;
170
171         req = tevent_req_create(mem_ctx, &state,
172                                 struct wrepl_connect_state);
173         if (req == NULL) {
174                 return NULL;
175         }
176
177         state->caller.wrepl_socket = wrepl_socket;
178         state->caller.ev = ev;
179
180         if (wrepl_socket->stream) {
181                 tevent_req_nterror(req, NT_STATUS_CONNECTION_ACTIVE);
182                 return tevent_req_post(req, ev);
183         }
184
185         ret = tsocket_address_inet_from_strings(state, "ipv4",
186                                                 our_ip, 0,
187                                                 &state->local_address);
188         if (ret != 0) {
189                 NTSTATUS status = map_nt_error_from_unix(errno);
190                 tevent_req_nterror(req, status);
191                 return tevent_req_post(req, ev);
192         }
193
194         ret = tsocket_address_inet_from_strings(state, "ipv4",
195                                                 peer_ip, WINS_REPLICATION_PORT,
196                                                 &state->remote_address);
197         if (ret != 0) {
198                 NTSTATUS status = map_nt_error_from_unix(errno);
199                 tevent_req_nterror(req, status);
200                 return tevent_req_post(req, ev);
201         }
202
203         ok = tevent_queue_add(wrepl_socket->request_queue,
204                               ev,
205                               req,
206                               wrepl_connect_trigger,
207                               NULL);
208         if (!ok) {
209                 tevent_req_nomem(NULL, req);
210                 return tevent_req_post(req, ev);
211         }
212
213         if (wrepl_socket->request_timeout > 0) {
214                 struct timeval endtime;
215                 endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
216                 ok = tevent_req_set_endtime(req, ev, endtime);
217                 if (!ok) {
218                         return tevent_req_post(req, ev);
219                 }
220         }
221
222         return req;
223 }
224
225 static void wrepl_connect_done(struct tevent_req *subreq);
226
227 static void wrepl_connect_trigger(struct tevent_req *req,
228                                   void *private_date)
229 {
230         struct wrepl_connect_state *state = tevent_req_data(req,
231                                             struct wrepl_connect_state);
232         struct tevent_req *subreq;
233
234         subreq = tstream_inet_tcp_connect_send(state,
235                                                state->caller.ev,
236                                                state->local_address,
237                                                state->remote_address);
238         if (tevent_req_nomem(subreq, req)) {
239                 return;
240         }
241         tevent_req_set_callback(subreq, wrepl_connect_done, req);
242
243         return;
244 }
245
246 static void wrepl_connect_done(struct tevent_req *subreq)
247 {
248         struct tevent_req *req = tevent_req_callback_data(subreq,
249                                  struct tevent_req);
250         struct wrepl_connect_state *state = tevent_req_data(req,
251                                             struct wrepl_connect_state);
252         int ret;
253         int sys_errno;
254
255         ret = tstream_inet_tcp_connect_recv(subreq, &sys_errno,
256                                             state, &state->stream);
257         if (ret != 0) {
258                 NTSTATUS status = map_nt_error_from_unix(sys_errno);
259                 tevent_req_nterror(req, status);
260                 return;
261         }
262
263         tevent_req_done(req);
264 }
265
266 /*
267   connect a wrepl_socket to a WINS server - recv side
268 */
269 NTSTATUS wrepl_connect_recv(struct tevent_req *req)
270 {
271         struct wrepl_connect_state *state = tevent_req_data(req,
272                                             struct wrepl_connect_state);
273         struct wrepl_socket *wrepl_socket = state->caller.wrepl_socket;
274         NTSTATUS status;
275
276         if (tevent_req_is_nterror(req, &status)) {
277                 tevent_req_received(req);
278                 return status;
279         }
280
281         wrepl_socket->stream = talloc_move(wrepl_socket, &state->stream);
282
283         tevent_req_received(req);
284         return NT_STATUS_OK;
285 }
286
287 /*
288   connect a wrepl_socket to a WINS server - sync API
289 */
290 NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket,
291                        const char *our_ip, const char *peer_ip)
292 {
293         struct tevent_req *subreq;
294         bool ok;
295         NTSTATUS status;
296
297         subreq = wrepl_connect_send(wrepl_socket, wrepl_socket->event.ctx,
298                                     wrepl_socket, our_ip, peer_ip);
299         NT_STATUS_HAVE_NO_MEMORY(subreq);
300
301         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
302         if (!ok) {
303                 TALLOC_FREE(subreq);
304                 return NT_STATUS_INTERNAL_ERROR;
305         }
306
307         status = wrepl_connect_recv(subreq);
308         TALLOC_FREE(subreq);
309         NT_STATUS_NOT_OK_RETURN(status);
310
311         return NT_STATUS_OK;
312 }
313
314 struct wrepl_request_state {
315         struct {
316                 struct wrepl_socket *wrepl_socket;
317                 struct tevent_context *ev;
318         } caller;
319         struct wrepl_send_ctrl ctrl;
320         struct {
321                 struct wrepl_wrap wrap;
322                 DATA_BLOB blob;
323                 struct iovec iov;
324         } req;
325         bool one_way;
326         struct {
327                 DATA_BLOB blob;
328                 struct wrepl_packet *packet;
329         } rep;
330 };
331
332 static void wrepl_request_trigger(struct tevent_req *req,
333                                   void *private_data);
334
335 struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
336                                       struct tevent_context *ev,
337                                       struct wrepl_socket *wrepl_socket,
338                                       const struct wrepl_packet *packet,
339                                       const struct wrepl_send_ctrl *ctrl)
340 {
341         struct tevent_req *req;
342         struct wrepl_request_state *state;
343         NTSTATUS status;
344         enum ndr_err_code ndr_err;
345         bool ok;
346
347         if (wrepl_socket->event.ctx != ev) {
348                 /* TODO: remove wrepl_socket->event.ctx !!! */
349                 smb_panic("wrepl_associate_stop_send event context mismatch!");
350                 return NULL;
351         }
352
353         req = tevent_req_create(mem_ctx, &state,
354                                 struct wrepl_request_state);
355         if (req == NULL) {
356                 return NULL;
357         }
358
359         state->caller.wrepl_socket = wrepl_socket;
360         state->caller.ev = ev;
361
362         if (ctrl) {
363                 state->ctrl = *ctrl;
364         }
365
366         if (wrepl_socket->stream == NULL) {
367                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
368                 return tevent_req_post(req, ev);
369         }
370
371         state->req.wrap.packet = *packet;
372         ndr_err = ndr_push_struct_blob(&state->req.blob, state,
373                                        wrepl_socket->iconv_convenience,
374                                        &state->req.wrap,
375                                        (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
376         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
377                 status = ndr_map_error2ntstatus(ndr_err);
378                 tevent_req_nterror(req, status);
379                 return tevent_req_post(req, ev);
380         }
381
382         state->req.iov.iov_base = state->req.blob.data;
383         state->req.iov.iov_len = state->req.blob.length;
384
385         ok = tevent_queue_add(wrepl_socket->request_queue,
386                               ev,
387                               req,
388                               wrepl_request_trigger,
389                               NULL);
390         if (!ok) {
391                 tevent_req_nomem(NULL, req);
392                 return tevent_req_post(req, ev);
393         }
394
395         if (wrepl_socket->request_timeout > 0) {
396                 struct timeval endtime;
397                 endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
398                 ok = tevent_req_set_endtime(req, ev, endtime);
399                 if (!ok) {
400                         return tevent_req_post(req, ev);
401                 }
402         }
403
404         return req;
405 }
406
407 static void wrepl_request_writev_done(struct tevent_req *subreq);
408
409 static void wrepl_request_trigger(struct tevent_req *req,
410                                   void *private_data)
411 {
412         struct wrepl_request_state *state = tevent_req_data(req,
413                                             struct wrepl_request_state);
414         struct tevent_req *subreq;
415
416         if (DEBUGLVL(10)) {
417                 DEBUG(10,("Sending WINS packet of length %u\n",
418                           (unsigned)state->req.blob.length));
419                 NDR_PRINT_DEBUG(wrepl_packet, &state->req.wrap.packet);
420         }
421
422         subreq = tstream_writev_send(state,
423                                      state->caller.ev,
424                                      state->caller.wrepl_socket->stream,
425                                      &state->req.iov, 1);
426         if (tevent_req_nomem(subreq, req)) {
427                 return;
428         }
429         tevent_req_set_callback(subreq, wrepl_request_writev_done, req);
430 }
431
432 static void wrepl_request_disconnect_done(struct tevent_req *subreq);
433 static void wrepl_request_read_pdu_done(struct tevent_req *subreq);
434
435 static void wrepl_request_writev_done(struct tevent_req *subreq)
436 {
437         struct tevent_req *req = tevent_req_callback_data(subreq,
438                                  struct tevent_req);
439         struct wrepl_request_state *state = tevent_req_data(req,
440                                             struct wrepl_request_state);
441         int ret;
442         int sys_errno;
443
444         ret = tstream_writev_recv(subreq, &sys_errno);
445         TALLOC_FREE(subreq);
446         if (ret == -1) {
447                 NTSTATUS status = map_nt_error_from_unix(sys_errno);
448                 tevent_req_nterror(req, status);
449                 return;
450         }
451
452         if (state->ctrl.disconnect_after_send) {
453                 subreq = tstream_disconnect_send(state,
454                                                  state->caller.ev,
455                                                  state->caller.wrepl_socket->stream);
456                 if (tevent_req_nomem(subreq, req)) {
457                         return;
458                 }
459                 tevent_req_set_callback(subreq, wrepl_request_disconnect_done, req);
460                 return;
461         }
462
463         if (state->ctrl.send_only) {
464                 tevent_req_done(req);
465                 return;
466         }
467
468         subreq = tstream_read_pdu_blob_send(state,
469                                             state->caller.ev,
470                                             state->caller.wrepl_socket->stream,
471                                             4, /* initial_read_size */
472                                             packet_full_request_u32,
473                                             NULL);
474         if (tevent_req_nomem(subreq, req)) {
475                 return;
476         }
477         tevent_req_set_callback(subreq, wrepl_request_read_pdu_done, req);
478 }
479
480 static void wrepl_request_disconnect_done(struct tevent_req *subreq)
481 {
482         struct tevent_req *req = tevent_req_callback_data(subreq,
483                                  struct tevent_req);
484         struct wrepl_request_state *state = tevent_req_data(req,
485                                             struct wrepl_request_state);
486         int ret;
487         int sys_errno;
488
489         ret = tstream_disconnect_recv(subreq, &sys_errno);
490         TALLOC_FREE(subreq);
491         if (ret == -1) {
492                 NTSTATUS status = map_nt_error_from_unix(sys_errno);
493                 tevent_req_nterror(req, status);
494                 return;
495         }
496
497         DEBUG(10,("WINS connection disconnected\n"));
498         state->caller.wrepl_socket->stream = NULL;
499
500         tevent_req_done(req);
501 }
502
503 static void wrepl_request_read_pdu_done(struct tevent_req *subreq)
504 {
505         struct tevent_req *req = tevent_req_callback_data(subreq,
506                                  struct tevent_req);
507         struct wrepl_request_state *state = tevent_req_data(req,
508                                             struct wrepl_request_state);
509         NTSTATUS status;
510         DATA_BLOB blob;
511         enum ndr_err_code ndr_err;
512
513         status = tstream_read_pdu_blob_recv(subreq, state, &state->rep.blob);
514         if (!NT_STATUS_IS_OK(status)) {
515                 tevent_req_nterror(req, status);
516                 return;
517         }
518
519         state->rep.packet = talloc(state, struct wrepl_packet);
520         if (tevent_req_nomem(state->rep.packet, req)) {
521                 return;
522         }
523
524         blob.data = state->rep.blob.data + 4;
525         blob.length = state->rep.blob.length - 4;
526
527         /* we have a full request - parse it */
528         ndr_err = ndr_pull_struct_blob(&blob,
529                                        state->rep.packet,
530                                        state->caller.wrepl_socket->iconv_convenience,
531                                        state->rep.packet,
532                                        (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
533         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
534                 status = ndr_map_error2ntstatus(ndr_err);
535                 tevent_req_nterror(req, status);
536                 return;
537         }
538
539         if (DEBUGLVL(10)) {
540                 DEBUG(10,("Received WINS packet of length %u\n",
541                           (unsigned)state->rep.blob.length));
542                 NDR_PRINT_DEBUG(wrepl_packet, state->rep.packet);
543         }
544
545         tevent_req_done(req);
546 }
547
548 NTSTATUS wrepl_request_recv(struct tevent_req *req,
549                             TALLOC_CTX *mem_ctx,
550                             struct wrepl_packet **packet)
551 {
552         struct wrepl_request_state *state = tevent_req_data(req,
553                                             struct wrepl_request_state);
554         NTSTATUS status;
555
556         if (tevent_req_is_nterror(req, &status)) {
557                 tevent_req_received(req);
558                 return status;
559         }
560
561         if (packet) {
562                 *packet = talloc_move(mem_ctx, &state->rep.packet);
563         }
564
565         tevent_req_received(req);
566         return NT_STATUS_OK;
567 }
568
569 /*
570   a full WINS replication request/response
571 */
572 NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
573                        TALLOC_CTX *mem_ctx,
574                        const struct wrepl_packet *req_packet,
575                        struct wrepl_packet **reply_packet)
576 {
577         struct tevent_req *subreq;
578         bool ok;
579         NTSTATUS status;
580
581         subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
582                                     wrepl_socket, req_packet, NULL);
583         NT_STATUS_HAVE_NO_MEMORY(subreq);
584
585         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
586         if (!ok) {
587                 TALLOC_FREE(subreq);
588                 return NT_STATUS_INTERNAL_ERROR;
589         }
590
591         status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
592         TALLOC_FREE(subreq);
593         NT_STATUS_NOT_OK_RETURN(status);
594
595         return NT_STATUS_OK;
596 }
597
598
599 struct wrepl_associate_state {
600         struct wrepl_packet packet;
601         uint32_t assoc_ctx;
602         uint16_t major_version;
603 };
604
605 static void wrepl_associate_done(struct tevent_req *subreq);
606
607 struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
608                                         struct tevent_context *ev,
609                                         struct wrepl_socket *wrepl_socket,
610                                         const struct wrepl_associate *io)
611 {
612         struct tevent_req *req;
613         struct wrepl_associate_state *state;
614         struct tevent_req *subreq;
615
616         if (wrepl_socket->event.ctx != ev) {
617                 /* TODO: remove wrepl_socket->event.ctx !!! */
618                 smb_panic("wrepl_associate_send event context mismatch!");
619                 return NULL;
620         }
621
622         req = tevent_req_create(mem_ctx, &state,
623                                 struct wrepl_associate_state);
624         if (req == NULL) {
625                 return NULL;
626         };
627
628         state->packet.opcode                            = WREPL_OPCODE_BITS;
629         state->packet.mess_type                         = WREPL_START_ASSOCIATION;
630         state->packet.message.start.minor_version       = 2;
631         state->packet.message.start.major_version       = 5;
632
633         /*
634          * nt4 uses 41 bytes for the start_association call
635          * so do it the same and as we don't know th emeanings of this bytes
636          * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
637          *
638          * if we don't do this nt4 uses an old version of the wins replication protocol
639          * and that would break nt4 <-> samba replication
640          */
641         state->packet.padding   = data_blob_talloc(state, NULL, 21);
642         if (tevent_req_nomem(state->packet.padding.data, req)) {
643                 return tevent_req_post(req, ev);
644         }
645         memset(state->packet.padding.data, 0, state->packet.padding.length);
646
647         subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
648         if (tevent_req_nomem(subreq, req)) {
649                 return tevent_req_post(req, ev);
650         }
651         tevent_req_set_callback(subreq, wrepl_associate_done, req);
652
653         return req;
654 }
655
656 static void wrepl_associate_done(struct tevent_req *subreq)
657 {
658         struct tevent_req *req = tevent_req_callback_data(subreq,
659                                  struct tevent_req);
660         struct wrepl_associate_state *state = tevent_req_data(req,
661                                               struct wrepl_associate_state);
662         NTSTATUS status;
663         struct wrepl_packet *packet;
664
665         status = wrepl_request_recv(subreq, state, &packet);
666         TALLOC_FREE(subreq);
667         if (!NT_STATUS_IS_OK(status)) {
668                 tevent_req_nterror(req, status);
669                 return;
670         }
671
672         if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
673                 tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
674                 return;
675         }
676
677         state->assoc_ctx = packet->message.start_reply.assoc_ctx;
678         state->major_version = packet->message.start_reply.major_version;
679
680         tevent_req_done(req);
681 }
682
683 /*
684   setup an association - recv
685 */
686 NTSTATUS wrepl_associate_recv(struct tevent_req *req,
687                               struct wrepl_associate *io)
688 {
689         struct wrepl_associate_state *state = tevent_req_data(req,
690                                               struct wrepl_associate_state);
691         NTSTATUS status;
692
693         if (tevent_req_is_nterror(req, &status)) {
694                 tevent_req_received(req);
695                 return status;
696         }
697
698         io->out.assoc_ctx = state->assoc_ctx;
699         io->out.major_version = state->major_version;
700
701         tevent_req_received(req);
702         return NT_STATUS_OK;
703 }
704
705 /*
706   setup an association - sync api
707 */
708 NTSTATUS wrepl_associate(struct wrepl_socket *wrepl_socket,
709                          struct wrepl_associate *io)
710 {
711         struct tevent_req *subreq;
712         bool ok;
713         NTSTATUS status;
714
715         subreq = wrepl_associate_send(wrepl_socket, wrepl_socket->event.ctx,
716                                       wrepl_socket, io);
717         NT_STATUS_HAVE_NO_MEMORY(subreq);
718
719         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
720         if (!ok) {
721                 TALLOC_FREE(subreq);
722                 return NT_STATUS_INTERNAL_ERROR;
723         }
724
725         status = wrepl_associate_recv(subreq, io);
726         TALLOC_FREE(subreq);
727         NT_STATUS_NOT_OK_RETURN(status);
728
729         return NT_STATUS_OK;
730 }
731
732 struct wrepl_associate_stop_state {
733         struct wrepl_packet packet;
734         struct wrepl_send_ctrl ctrl;
735 };
736
737 static void wrepl_associate_stop_done(struct tevent_req *subreq);
738
739 struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
740                                              struct tevent_context *ev,
741                                              struct wrepl_socket *wrepl_socket,
742                                              const struct wrepl_associate_stop *io)
743 {
744         struct tevent_req *req;
745         struct wrepl_associate_stop_state *state;
746         struct tevent_req *subreq;
747
748         if (wrepl_socket->event.ctx != ev) {
749                 /* TODO: remove wrepl_socket->event.ctx !!! */
750                 smb_panic("wrepl_associate_stop_send event context mismatch!");
751                 return NULL;
752         }
753
754         req = tevent_req_create(mem_ctx, &state,
755                                 struct wrepl_associate_stop_state);
756         if (req == NULL) {
757                 return NULL;
758         };
759
760         state->packet.opcode                    = WREPL_OPCODE_BITS;
761         state->packet.assoc_ctx                 = io->in.assoc_ctx;
762         state->packet.mess_type                 = WREPL_STOP_ASSOCIATION;
763         state->packet.message.stop.reason       = io->in.reason;
764
765         if (io->in.reason == 0) {
766                 state->ctrl.send_only                   = true;
767                 state->ctrl.disconnect_after_send       = true;
768         }
769
770         subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
771         if (tevent_req_nomem(subreq, req)) {
772                 return tevent_req_post(req, ev);
773         }
774         tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
775
776         return req;
777 }
778
779 static void wrepl_associate_stop_done(struct tevent_req *subreq)
780 {
781         struct tevent_req *req = tevent_req_callback_data(subreq,
782                                  struct tevent_req);
783         struct wrepl_associate_stop_state *state = tevent_req_data(req,
784                                                    struct wrepl_associate_stop_state);
785         NTSTATUS status;
786
787         /* currently we don't care about a possible response */
788         status = wrepl_request_recv(subreq, state, NULL);
789         TALLOC_FREE(subreq);
790         if (!NT_STATUS_IS_OK(status)) {
791                 tevent_req_nterror(req, status);
792                 return;
793         }
794
795         tevent_req_done(req);
796 }
797
798 /*
799   stop an association - recv
800 */
801 NTSTATUS wrepl_associate_stop_recv(struct tevent_req *req,
802                                    struct wrepl_associate_stop *io)
803 {
804         NTSTATUS status;
805
806         if (tevent_req_is_nterror(req, &status)) {
807                 tevent_req_received(req);
808                 return status;
809         }
810
811         tevent_req_received(req);
812         return NT_STATUS_OK;
813 }
814
815 /*
816   setup an association - sync api
817 */
818 NTSTATUS wrepl_associate_stop(struct wrepl_socket *wrepl_socket,
819                               struct wrepl_associate_stop *io)
820 {
821         struct tevent_req *subreq;
822         bool ok;
823         NTSTATUS status;
824
825         subreq = wrepl_associate_stop_send(wrepl_socket, wrepl_socket->event.ctx,
826                                            wrepl_socket, io);
827         NT_STATUS_HAVE_NO_MEMORY(subreq);
828
829         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
830         if (!ok) {
831                 TALLOC_FREE(subreq);
832                 return NT_STATUS_INTERNAL_ERROR;
833         }
834
835         status = wrepl_associate_stop_recv(subreq, io);
836         TALLOC_FREE(subreq);
837         NT_STATUS_NOT_OK_RETURN(status);
838
839         return NT_STATUS_OK;
840 }
841
842 struct wrepl_pull_table_state {
843         struct wrepl_packet packet;
844         uint32_t num_partners;
845         struct wrepl_wins_owner *partners;
846 };
847
848 static void wrepl_pull_table_done(struct tevent_req *subreq);
849
850 struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
851                                          struct tevent_context *ev,
852                                          struct wrepl_socket *wrepl_socket,
853                                          const struct wrepl_pull_table *io)
854 {
855         struct tevent_req *req;
856         struct wrepl_pull_table_state *state;
857         struct tevent_req *subreq;
858
859         if (wrepl_socket->event.ctx != ev) {
860                 /* TODO: remove wrepl_socket->event.ctx !!! */
861                 smb_panic("wrepl_pull_table_send event context mismatch!");
862                 return NULL;
863         }
864
865         req = tevent_req_create(mem_ctx, &state,
866                                 struct wrepl_pull_table_state);
867         if (req == NULL) {
868                 return NULL;
869         };
870
871         state->packet.opcode                            = WREPL_OPCODE_BITS;
872         state->packet.assoc_ctx                         = io->in.assoc_ctx;
873         state->packet.mess_type                         = WREPL_REPLICATION;
874         state->packet.message.replication.command       = WREPL_REPL_TABLE_QUERY;
875
876         subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
877         if (tevent_req_nomem(subreq, req)) {
878                 return tevent_req_post(req, ev);
879         }
880         tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
881
882         return req;
883 }
884
885 static void wrepl_pull_table_done(struct tevent_req *subreq)
886 {
887         struct tevent_req *req = tevent_req_callback_data(subreq,
888                                  struct tevent_req);
889         struct wrepl_pull_table_state *state = tevent_req_data(req,
890                                                struct wrepl_pull_table_state);
891         NTSTATUS status;
892         struct wrepl_packet *packet;
893         struct wrepl_table *table;
894
895         status = wrepl_request_recv(subreq, state, &packet);
896         TALLOC_FREE(subreq);
897         if (!NT_STATUS_IS_OK(status)) {
898                 tevent_req_nterror(req, status);
899                 return;
900         }
901
902         if (packet->mess_type != WREPL_REPLICATION) {
903                 tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
904                 return;
905         }
906
907         if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
908                 tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
909                 return;
910         }
911
912         table = &packet->message.replication.info.table;
913
914         state->num_partners = table->partner_count;
915         state->partners = talloc_move(state, &table->partners);
916
917         tevent_req_done(req);
918 }
919
920 /*
921   fetch the partner tables - recv
922 */
923 NTSTATUS wrepl_pull_table_recv(struct tevent_req *req,
924                                TALLOC_CTX *mem_ctx,
925                                struct wrepl_pull_table *io)
926 {
927         struct wrepl_pull_table_state *state = tevent_req_data(req,
928                                                struct wrepl_pull_table_state);
929         NTSTATUS status;
930
931         if (tevent_req_is_nterror(req, &status)) {
932                 tevent_req_received(req);
933                 return status;
934         }
935
936         io->out.num_partners = state->num_partners;
937         io->out.partners = talloc_move(mem_ctx, &state->partners);
938
939         tevent_req_received(req);
940         return NT_STATUS_OK;
941 }
942
943 /*
944   fetch the partner table - sync api
945 */
946 NTSTATUS wrepl_pull_table(struct wrepl_socket *wrepl_socket,
947                           TALLOC_CTX *mem_ctx,
948                           struct wrepl_pull_table *io)
949 {
950         struct tevent_req *subreq;
951         bool ok;
952         NTSTATUS status;
953
954         subreq = wrepl_pull_table_send(mem_ctx, wrepl_socket->event.ctx,
955                                        wrepl_socket, io);
956         NT_STATUS_HAVE_NO_MEMORY(subreq);
957
958         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
959         if (!ok) {
960                 TALLOC_FREE(subreq);
961                 return NT_STATUS_INTERNAL_ERROR;
962         }
963
964         status = wrepl_pull_table_recv(subreq, mem_ctx, io);
965         TALLOC_FREE(subreq);
966         NT_STATUS_NOT_OK_RETURN(status);
967
968         return NT_STATUS_OK;
969 }
970
971
972 struct wrepl_pull_names_state {
973         struct {
974                 const struct wrepl_pull_names *io;
975         } caller;
976         struct wrepl_packet packet;
977         uint32_t num_names;
978         struct wrepl_name *names;
979 };
980
981 static void wrepl_pull_names_done(struct tevent_req *subreq);
982
983 struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
984                                          struct tevent_context *ev,
985                                          struct wrepl_socket *wrepl_socket,
986                                          const struct wrepl_pull_names *io)
987 {
988         struct tevent_req *req;
989         struct wrepl_pull_names_state *state;
990         struct tevent_req *subreq;
991
992         if (wrepl_socket->event.ctx != ev) {
993                 /* TODO: remove wrepl_socket->event.ctx !!! */
994                 smb_panic("wrepl_pull_names_send event context mismatch!");
995                 return NULL;
996         }
997
998         req = tevent_req_create(mem_ctx, &state,
999                                 struct wrepl_pull_names_state);
1000         if (req == NULL) {
1001                 return NULL;
1002         };
1003         state->caller.io = io;
1004
1005         state->packet.opcode                            = WREPL_OPCODE_BITS;
1006         state->packet.assoc_ctx                         = io->in.assoc_ctx;
1007         state->packet.mess_type                         = WREPL_REPLICATION;
1008         state->packet.message.replication.command       = WREPL_REPL_SEND_REQUEST;
1009         state->packet.message.replication.info.owner    = io->in.partner;
1010
1011         subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
1012         if (tevent_req_nomem(subreq, req)) {
1013                 return tevent_req_post(req, ev);
1014         }
1015         tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
1016
1017         return req;
1018 }
1019
1020 static void wrepl_pull_names_done(struct tevent_req *subreq)
1021 {
1022         struct tevent_req *req = tevent_req_callback_data(subreq,
1023                                  struct tevent_req);
1024         struct wrepl_pull_names_state *state = tevent_req_data(req,
1025                                                struct wrepl_pull_names_state);
1026         NTSTATUS status;
1027         struct wrepl_packet *packet;
1028         uint32_t i;
1029
1030         status = wrepl_request_recv(subreq, state, &packet);
1031         TALLOC_FREE(subreq);
1032         if (!NT_STATUS_IS_OK(status)) {
1033                 tevent_req_nterror(req, status);
1034                 return;
1035         }
1036
1037         if (packet->mess_type != WREPL_REPLICATION) {
1038                 tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
1039                 return;
1040         }
1041
1042         if (packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
1043                 tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
1044                 return;
1045         }
1046
1047         state->num_names = packet->message.replication.info.reply.num_names;
1048
1049         state->names = talloc_array(state, struct wrepl_name, state->num_names);
1050         if (tevent_req_nomem(state->names, req)) {
1051                 return;
1052         }
1053
1054         /* convert the list of names and addresses to a sane format */
1055         for (i=0; i < state->num_names; i++) {
1056                 struct wrepl_wins_name *wname = &packet->message.replication.info.reply.names[i];
1057                 struct wrepl_name *name = &state->names[i];
1058
1059                 name->name      = *wname->name;
1060                 talloc_steal(state->names, wname->name);
1061                 name->type      = WREPL_NAME_TYPE(wname->flags);
1062                 name->state     = WREPL_NAME_STATE(wname->flags);
1063                 name->node      = WREPL_NAME_NODE(wname->flags);
1064                 name->is_static = WREPL_NAME_IS_STATIC(wname->flags);
1065                 name->raw_flags = wname->flags;
1066                 name->version_id= wname->id;
1067                 name->owner     = talloc_strdup(state->names,
1068                                                 state->caller.io->in.partner.address);
1069                 if (tevent_req_nomem(name->owner, req)) {
1070                         return;
1071                 }
1072
1073                 /* trying to save 1 or 2 bytes on the wire isn't a good idea */
1074                 if (wname->flags & 2) {
1075                         uint32_t j;
1076
1077                         name->num_addresses = wname->addresses.addresses.num_ips;
1078                         name->addresses = talloc_array(state->names,
1079                                                        struct wrepl_address,
1080                                                        name->num_addresses);
1081                         if (tevent_req_nomem(name->addresses, req)) {
1082                                 return;
1083                         }
1084
1085                         for (j=0;j<name->num_addresses;j++) {
1086                                 name->addresses[j].owner =
1087                                         talloc_move(name->addresses,
1088                                                     &wname->addresses.addresses.ips[j].owner);
1089                                 name->addresses[j].address = 
1090                                         talloc_move(name->addresses,
1091                                                     &wname->addresses.addresses.ips[j].ip);
1092                         }
1093                 } else {
1094                         name->num_addresses = 1;
1095                         name->addresses = talloc_array(state->names,
1096                                                        struct wrepl_address,
1097                                                        name->num_addresses);
1098                         if (tevent_req_nomem(name->addresses, req)) {
1099                                 return;
1100                         }
1101
1102                         name->addresses[0].owner = talloc_strdup(name->addresses, name->owner);
1103                         if (tevent_req_nomem(name->addresses[0].owner, req)) {
1104                                 return;
1105                         }
1106                         name->addresses[0].address = talloc_move(name->addresses,
1107                                                                  &wname->addresses.ip);
1108                 }
1109         }
1110
1111         tevent_req_done(req);
1112 }
1113
1114 /*
1115   fetch the names for a WINS partner - recv
1116 */
1117 NTSTATUS wrepl_pull_names_recv(struct tevent_req *req,
1118                                TALLOC_CTX *mem_ctx,
1119                                struct wrepl_pull_names *io)
1120 {
1121         struct wrepl_pull_names_state *state = tevent_req_data(req,
1122                                                struct wrepl_pull_names_state);
1123         NTSTATUS status;
1124
1125         if (tevent_req_is_nterror(req, &status)) {
1126                 tevent_req_received(req);
1127                 return status;
1128         }
1129
1130         io->out.num_names = state->num_names;
1131         io->out.names = talloc_move(mem_ctx, &state->names);
1132
1133         tevent_req_received(req);
1134         return NT_STATUS_OK;
1135 }
1136
1137
1138
1139 /*
1140   fetch the names for a WINS partner - sync api
1141 */
1142 NTSTATUS wrepl_pull_names(struct wrepl_socket *wrepl_socket,
1143                           TALLOC_CTX *mem_ctx,
1144                           struct wrepl_pull_names *io)
1145 {
1146         struct tevent_req *subreq;
1147         bool ok;
1148         NTSTATUS status;
1149
1150         subreq = wrepl_pull_names_send(mem_ctx, wrepl_socket->event.ctx,
1151                                        wrepl_socket, io);
1152         NT_STATUS_HAVE_NO_MEMORY(subreq);
1153
1154         ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
1155         if (!ok) {
1156                 TALLOC_FREE(subreq);
1157                 return NT_STATUS_INTERNAL_ERROR;
1158         }
1159
1160         status = wrepl_pull_names_recv(subreq, mem_ctx, io);
1161         TALLOC_FREE(subreq);
1162         NT_STATUS_NOT_OK_RETURN(status);
1163
1164         return NT_STATUS_OK;
1165 }