trigger SIGPIPE...
[metze/samba/wip.git] / source4 / libcli / wrepl / winsrepl.c
index 0409607aa95f85a0a8fcfc5c6cd7c6b2af76dc6f..3d6db236a73acda47e19382e6418597dfb4c4e74 100644 (file)
@@ -4,10 +4,11 @@
    low level WINS replication client code
 
    Copyright (C) Andrew Tridgell 2005
+   Copyright (C) Stefan Metzmacher 2005-2010
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
-#include "dlinklist.h"
-#include "lib/socket/socket.h"
+#include "../lib/util/dlinklist.h"
 #include "libcli/wrepl/winsrepl.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
 #include "lib/stream/packet.h"
-#include "libcli/composite/composite.h"
-
-static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status);
+#include "system/network.h"
+#include "lib/socket/netif.h"
+#include "param/param.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "lib/tsocket/tsocket.h"
+#include "libcli/util/tstream.h"
 
 /*
-  mark all pending requests as dead - called when a socket error happens
+  main context structure for the wins replication client library
 */
-static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status)
-{
-       wrepl_socket->dead = True;
+struct wrepl_socket {
+       struct {
+               struct tevent_context *ctx;
+       } event;
 
-       if (wrepl_socket->packet) {
-               packet_recv_disable(wrepl_socket->packet);
-               packet_set_fde(wrepl_socket->packet, NULL);
-               packet_set_socket(wrepl_socket->packet, NULL);
-       }
+       /* the default timeout for requests, 0 means no timeout */
+#define WREPL_SOCKET_REQUEST_TIMEOUT   (60)
+       uint32_t request_timeout;
 
-       if (wrepl_socket->event.fde) {
-               talloc_free(wrepl_socket->event.fde);
-               wrepl_socket->event.fde = NULL;
-       }
+       struct tevent_queue *request_queue;
 
-       if (wrepl_socket->sock) {
-               talloc_free(wrepl_socket->sock);
-               wrepl_socket->sock = NULL;
-       }
+       struct tstream_context *stream;
+};
 
-       if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       while (wrepl_socket->recv_queue) {
-               struct wrepl_request *req = wrepl_socket->recv_queue;
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-               wrepl_request_finished(req, status);
+bool wrepl_socket_is_connected(struct wrepl_socket *wrepl_sock)
+{
+       if (!wrepl_sock) {
+               return false;
        }
 
-       talloc_set_destructor(wrepl_socket, NULL);
-       if (wrepl_socket->free_skipped) {
-               talloc_free(wrepl_socket);
+       if (!wrepl_sock->stream) {
+               return false;
        }
-}
 
-static void wrepl_request_timeout_handler(struct event_context *ev, struct timed_event *te,
-                                         struct timeval t, void *ptr)
-{
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       wrepl_socket_dead(req->wrepl_socket, NT_STATUS_IO_TIMEOUT);
+       return true;
 }
 
 /*
-  handle recv events 
+  initialise a wrepl_socket. The event_ctx is optional, if provided then
+  operations will use that event context
 */
-static NTSTATUS wrepl_finish_recv(void *private, DATA_BLOB packet_blob_in)
+struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *event_ctx)
 {
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, struct wrepl_socket);
-       struct wrepl_request *req = wrepl_socket->recv_queue;
-       DATA_BLOB blob;
+       struct wrepl_socket *wrepl_socket;
 
-       if (!req) {
-               DEBUG(1,("Received unexpected WINS packet of length %u!\n", packet_blob_in.length));
-               return NT_STATUS_INVALID_NETWORK_RESPONSE;
+       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
+       if (!wrepl_socket) {
+               return NULL;
        }
 
-       req->packet = talloc(req, struct wrepl_packet);
-       NT_STATUS_HAVE_NO_MEMORY(req->packet);
-
-       blob.data = packet_blob_in.data + 4;
-       blob.length = packet_blob_in.length - 4;
-       
-       /* we have a full request - parse it */
-       req->status = ndr_pull_struct_blob(&blob,
-                                          req->packet, req->packet,
-                                          (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               wrepl_request_finished(req, req->status);
-               return NT_STATUS_OK;
+       wrepl_socket->event.ctx = event_ctx;
+       if (!wrepl_socket->event.ctx) {
+               goto failed;
        }
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS packet of length %u\n", packet_blob_in.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
+       wrepl_socket->request_queue = tevent_queue_create(wrepl_socket,
+                                                         "wrepl request queue");
+       if (wrepl_socket->request_queue == NULL) {
+               goto failed;
        }
 
-       wrepl_request_finished(req, req->status);
-       return NT_STATUS_OK;
-}
+       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
 
-/*
-  handler for winrepl events
-*/
-static void wrepl_handler(struct event_context *ev, struct fd_event *fde, 
-                         uint16_t flags, void *private)
-{
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       if (flags & EVENT_FD_READ) {
-               packet_recv(wrepl_socket->packet);
-               return;
-       }
-       if (flags & EVENT_FD_WRITE) {
-               packet_queue_run(wrepl_socket->packet);
-       }
-}
+       return wrepl_socket;
 
-static void wrepl_error(void *private, NTSTATUS status)
-{
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       wrepl_socket_dead(wrepl_socket, status);
+failed:
+       talloc_free(wrepl_socket);
+       return NULL;
 }
 
-
 /*
-  destroy a wrepl_socket destructor
+  initialise a wrepl_socket from an already existing connection
 */
-static int wrepl_socket_destructor(void *ptr)
+NTSTATUS wrepl_socket_donate_stream(struct wrepl_socket *wrepl_socket,
+                                   struct tstream_context **stream)
 {
-       struct wrepl_socket *sock = talloc_get_type(ptr, struct wrepl_socket);
-       if (sock->dead) {
-               sock->free_skipped = True;
-               return -1;
+       if (wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_ACTIVE;
        }
-       wrepl_socket_dead(sock, NT_STATUS_LOCAL_DISCONNECT);
-       return 0;
+
+       wrepl_socket->stream = talloc_move(wrepl_socket, stream);
+       return NT_STATUS_OK;
 }
 
 /*
-  initialise a wrepl_socket. The event_ctx is optional, if provided then
-  operations will use that event context
+  initialise a wrepl_socket from an already existing connection
 */
-struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx, 
-                                      struct event_context *event_ctx)
+NTSTATUS wrepl_socket_split_stream(struct wrepl_socket *wrepl_socket,
+                                  TALLOC_CTX *mem_ctx,
+                                  struct tstream_context **stream)
 {
-       struct wrepl_socket *wrepl_socket;
-       NTSTATUS status;
+       size_t num_requests;
 
-       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
-       if (!wrepl_socket) return NULL;
-
-       if (event_ctx == NULL) {
-               wrepl_socket->event.ctx = event_context_init(wrepl_socket);
-       } else {
-               wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx);
+       if (!wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_INVALID;
        }
-       if (!wrepl_socket->event.ctx) goto failed;
-
-       status = socket_create("ip", SOCKET_TYPE_STREAM, &wrepl_socket->sock, 0);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       talloc_steal(wrepl_socket, wrepl_socket->sock);
+       num_requests = tevent_queue_length(wrepl_socket->request_queue);
+       if (num_requests > 0) {
+               return NT_STATUS_CONNECTION_IN_USE;
+       }
 
-       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
+       *stream = talloc_move(wrepl_socket, &wrepl_socket->stream);
+       return NT_STATUS_OK;
+}
 
-       talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
+const char *wrepl_best_ip(struct loadparm_context *lp_ctx, const char *peer_ip)
+{
+       struct interface *ifaces;
+       load_interface_list(lp_ctx, lp_ctx, &ifaces);
+       return iface_list_best_ip(ifaces, peer_ip);
+}
 
-       return wrepl_socket;
+struct wrepl_connect_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct tsocket_address *local_address;
+       struct tsocket_address *remote_address;
+       struct tstream_context *stream;
+};
 
-failed:
-       talloc_free(wrepl_socket);
-       return NULL;
-}
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date);
 
-/*
-  initialise a wrepl_socket from an already existing connection
-*/
-struct wrepl_socket *wrepl_socket_merge(TALLOC_CTX *mem_ctx, 
-                                       struct event_context *event_ctx,
-                                       struct socket_context *sock,
-                                       struct packet_context *pack)
+struct tevent_req *wrepl_connect_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const char *our_ip, const char *peer_ip)
 {
-       struct wrepl_socket *wrepl_socket;
-
-       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
-       if (wrepl_socket == NULL) goto failed;
+       struct tevent_req *req;
+       struct wrepl_connect_state *state;
+       int ret;
+       bool ok;
 
-       wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx);
-       if (wrepl_socket->event.ctx == NULL) goto failed;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_connect_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
-       wrepl_socket->sock = sock;
-       talloc_steal(wrepl_socket, wrepl_socket->sock);
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
+       if (wrepl_socket->stream) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_ACTIVE);
+               return tevent_req_post(req, ev);
+       }
 
-       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               our_ip, 0,
+                                               &state->local_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix_common(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
+       }
 
-       wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket,
-                                              socket_get_fd(wrepl_socket->sock), 
-                                              EVENT_FD_READ,
-                                              wrepl_handler, wrepl_socket);
-       if (wrepl_socket->event.fde == NULL) {
-               goto failed;
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               peer_ip, WINS_REPLICATION_PORT,
+                                               &state->remote_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix_common(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
        }
 
-       wrepl_socket->packet = pack;
-       talloc_steal(wrepl_socket, wrepl_socket->packet);
-       packet_set_private(wrepl_socket->packet, wrepl_socket);
-       packet_set_socket(wrepl_socket->packet, wrepl_socket->sock);
-       packet_set_callback(wrepl_socket->packet, wrepl_finish_recv);
-       packet_set_full_request(wrepl_socket->packet, packet_full_request_u32);
-       packet_set_error_handler(wrepl_socket->packet, wrepl_error);
-       packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx);
-       packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde);
-       packet_set_serialise(wrepl_socket->packet);
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_connect_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_oom(req);
+               return tevent_req_post(req, ev);
+       }
 
-       talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
-       
-       return wrepl_socket;
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
+       }
 
-failed:
-       talloc_free(wrepl_socket);
-       return NULL;
+       return req;
 }
 
-/*
-  destroy a wrepl_request
-*/
-static int wrepl_request_destructor(void *ptr)
+static void wrepl_connect_done(struct tevent_req *subreq);
+
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date)
 {
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       struct tevent_req *subreq;
+
+       subreq = tstream_inet_tcp_connect_send(state,
+                                              state->caller.ev,
+                                              state->local_address,
+                                              state->remote_address);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
-       req->state = WREPL_REQUEST_ERROR;
-       return 0;
+       tevent_req_set_callback(subreq, wrepl_connect_done, req);
+
+       return;
 }
 
-/*
-  wait for a request to complete
-*/
-static NTSTATUS wrepl_request_wait(struct wrepl_request *req)
+static void wrepl_connect_done(struct tevent_req *subreq)
 {
-       NT_STATUS_HAVE_NO_MEMORY(req);
-       while (req->state < WREPL_REQUEST_DONE) {
-               event_loop_once(req->wrepl_socket->event.ctx);
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_inet_tcp_connect_recv(subreq, &sys_errno,
+                                           state, &state->stream, NULL);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix_common(sys_errno);
+               tevent_req_nterror(req, status);
+               return;
        }
-       return req->status;
-}
 
-struct wrepl_connect_state {
-       struct composite_context *result;
-       struct wrepl_socket *wrepl_socket;
-       struct composite_context *creq;
-};
+       smb_msleep(5000);
+       tevent_req_done(req);
+}
 
 /*
-  handler for winrepl connection completion
+  connect a wrepl_socket to a WINS server - recv side
 */
-static void wrepl_connect_handler(struct composite_context *creq)
+NTSTATUS wrepl_connect_recv(struct tevent_req *req)
 {
-       struct wrepl_connect_state *state = talloc_get_type(creq->async.private_data, 
+       struct wrepl_connect_state *state = tevent_req_data(req,
                                            struct wrepl_connect_state);
-       struct wrepl_socket *wrepl_socket = state->wrepl_socket;
-       struct composite_context *result = state->result;
-
-       result->status = socket_connect_recv(state->creq);
-       if (!composite_is_ok(result)) return;
+       struct wrepl_socket *wrepl_socket = state->caller.wrepl_socket;
+       NTSTATUS status;
 
-       wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket, 
-                                              socket_get_fd(wrepl_socket->sock), 
-                                              EVENT_FD_READ,
-                                              wrepl_handler, wrepl_socket);
-       if (composite_nomem(wrepl_socket->event.fde, result)) return;
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
 
-       /* setup the stream -> packet parser */
-       wrepl_socket->packet = packet_init(wrepl_socket);
-       if (composite_nomem(wrepl_socket->packet, result)) return;
-       packet_set_private(wrepl_socket->packet, wrepl_socket);
-       packet_set_socket(wrepl_socket->packet, wrepl_socket->sock);
-       packet_set_callback(wrepl_socket->packet, wrepl_finish_recv);
-       packet_set_full_request(wrepl_socket->packet, packet_full_request_u32);
-       packet_set_error_handler(wrepl_socket->packet, wrepl_error);
-       packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx);
-       packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde);
-       packet_set_serialise(wrepl_socket->packet);
+       wrepl_socket->stream = talloc_move(wrepl_socket, &state->stream);
 
-       composite_done(result);
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
-  connect a wrepl_socket to a WINS server
+  connect a wrepl_socket to a WINS server - sync API
 */
-struct composite_context *wrepl_connect_send(struct wrepl_socket *wrepl_socket,
-                                            const char *our_ip, const char *peer_ip)
+NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket,
+                      const char *our_ip, const char *peer_ip)
 {
-       struct composite_context *result;
-       struct wrepl_connect_state *state;
-
-       result = talloc_zero(wrepl_socket, struct composite_context);
-       if (!result) return NULL;
-
-       result->state           = COMPOSITE_STATE_IN_PROGRESS;
-       result->event_ctx       = wrepl_socket->event.ctx;
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
 
-       state = talloc_zero(result, struct wrepl_connect_state);
-       if (composite_nomem(state, result)) return result;
-       result->private_data    = state;
-       state->result           = result;
-       state->wrepl_socket     = wrepl_socket;
+       subreq = wrepl_connect_send(wrepl_socket, wrepl_socket->event.ctx,
+                                   wrepl_socket, our_ip, peer_ip);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
 
-       if (!our_ip) {
-               our_ip = iface_best_ip(peer_ip);
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
        }
 
-       state->creq = socket_connect_send(wrepl_socket->sock, our_ip, 0,
-                                         peer_ip, WINS_REPLICATION_PORT,
-                                         0, wrepl_socket->event.ctx);
-       composite_continue(result, state->creq, wrepl_connect_handler, state);
-       return result;
+       status = wrepl_connect_recv(subreq);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
-/*
-  connect a wrepl_socket to a WINS server - recv side
-*/
-NTSTATUS wrepl_connect_recv(struct composite_context *result)
+struct wrepl_request_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct wrepl_send_ctrl ctrl;
+       struct {
+               struct wrepl_wrap wrap;
+               DATA_BLOB blob;
+               struct iovec iov;
+       } req;
+       bool one_way;
+       struct {
+               DATA_BLOB blob;
+               struct wrepl_packet *packet;
+       } rep;
+};
+
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data);
+
+struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const struct wrepl_packet *packet,
+                                     const struct wrepl_send_ctrl *ctrl)
 {
-       struct wrepl_connect_state *state = talloc_get_type(result->private_data,
-                                           struct wrepl_connect_state);
-       struct wrepl_socket *wrepl_socket = state->wrepl_socket;
-       NTSTATUS status = composite_wait(result);
+       struct tevent_req *req;
+       struct wrepl_request_state *state;
+       NTSTATUS status;
+       enum ndr_err_code ndr_err;
+       bool ok;
 
-       if (!NT_STATUS_IS_OK(status)) {
-               wrepl_socket_dead(wrepl_socket, status);
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
        }
 
-       talloc_free(result);
-       return status;
-}
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_request_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
-/*
-  connect a wrepl_socket to a WINS server - sync API
-*/
-NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, const char *our_ip, const char *peer_ip)
-{
-       struct composite_context *c_req = wrepl_connect_send(wrepl_socket, our_ip, peer_ip);
-       return wrepl_connect_recv(c_req);
-}
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
-/* 
-   callback from wrepl_request_trigger() 
-*/
-static void wrepl_request_trigger_handler(struct event_context *ev, struct timed_event *te,
-                                         struct timeval t, void *ptr)
-{
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->async.fn) {
-               req->async.fn(req);
+       if (ctrl) {
+               state->ctrl = *ctrl;
+       }
+
+       if (wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
+               return tevent_req_post(req, ev);
+       }
+
+       state->req.wrap.packet = *packet;
+       ndr_err = ndr_push_struct_blob(&state->req.blob, state,
+                                      &state->req.wrap,
+                                      (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
+       }
+
+       state->req.iov.iov_base = (char *) state->req.blob.data;
+       state->req.iov.iov_len = state->req.blob.length;
+
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_request_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_oom(req);
+               return tevent_req_post(req, ev);
+       }
+
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
        }
+
+       return req;
 }
 
-/*
-  trigger an immediate event on a wrepl_request
-  the return value should only be used in wrepl_request_send()
-  this is the only place where req->trigger is True
-*/
-static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status)
+static void wrepl_request_writev_done(struct tevent_req *subreq);
+
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data)
 {
-       struct timed_event *te;
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       struct tevent_req *subreq;
 
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
+               return;
        }
 
-       if (!NT_STATUS_IS_OK(status)) {
-               req->state      = WREPL_REQUEST_ERROR;
-       } else {
-               req->state      = WREPL_REQUEST_DONE;
-       }
-
-       req->status     = status;
-
-       if (req->trigger) {
-               req->trigger = False;
-               /* a zero timeout means immediate */
-               te = event_add_timed(req->wrepl_socket->event.ctx,
-                                    req, timeval_zero(),
-                                    wrepl_request_trigger_handler, req);
-               if (!te) {
-                       talloc_free(req);
-                       return NULL;
-               }
-               return req;
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Sending WINS packet of length %u\n",
+                         (unsigned)state->req.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, &state->req.wrap.packet);
        }
 
-       if (req->async.fn) {
-               req->async.fn(req);
+       subreq = tstream_writev_send(state,
+                                    state->caller.ev,
+                                    state->caller.wrepl_socket->stream,
+                                    &state->req.iov, 1);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
-       return NULL;
+       tevent_req_set_callback(subreq, wrepl_request_writev_done, req);
 }
 
-struct wrepl_send_ctrl_state {
-       struct wrepl_send_ctrl ctrl;
-       struct wrepl_request *req;
-       struct wrepl_socket *wrepl_sock;
-};
+static void wrepl_request_disconnect_done(struct tevent_req *subreq);
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq);
 
-static int wrepl_send_ctrl_destructor(void *ptr)
+static void wrepl_request_writev_done(struct tevent_req *subreq)
 {
-       struct wrepl_send_ctrl_state *s = talloc_get_type(ptr, struct wrepl_send_ctrl_state);
-       struct wrepl_request *req = s->wrepl_sock->recv_queue;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_writev_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix_common(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       /* check if the request is still in WREPL_STATE_RECV,
-        * we need this here because the caller has may called 
-        * talloc_free(req) and wrepl_send_ctrl_state isn't
-        * a talloc child of the request, so our s->req pointer
-        * is maybe invalid!
-        */
-       for (; req; req = req->next) {
-               if (req == s->req) break;
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
+               return;
        }
-       if (!req) return 0;
 
-       /* here, we need to make sure the async request handler is called
-        * later in the next event_loop and now now
-        */
-       req->trigger = True;
-       wrepl_request_finished(req, NT_STATUS_OK);
+       if (state->ctrl.disconnect_after_send) {
+               subreq = tstream_disconnect_send(state,
+                                                state->caller.ev,
+                                                state->caller.wrepl_socket->stream);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, wrepl_request_disconnect_done, req);
+               return;
+       }
 
-       if (s->ctrl.disconnect_after_send) {
-               wrepl_socket_dead(s->wrepl_sock, NT_STATUS_LOCAL_DISCONNECT);
+       if (state->ctrl.send_only) {
+               tevent_req_done(req);
+               return;
        }
 
-       return 0;
+       subreq = tstream_read_pdu_blob_send(state,
+                                           state->caller.ev,
+                                           state->caller.wrepl_socket->stream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, wrepl_request_read_pdu_done, req);
 }
 
-/*
-  send a generic wins replication request
-*/
-struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
-                                        struct wrepl_packet *packet,
-                                        struct wrepl_send_ctrl *ctrl)
+static void wrepl_request_disconnect_done(struct tevent_req *subreq)
 {
-       struct wrepl_request *req;
-       struct wrepl_wrap wrap;
-       DATA_BLOB blob;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_disconnect_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix_common(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (!req) return NULL;
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_RECV;
-       req->trigger      = True;
+       DEBUG(10,("WINS connection disconnected\n"));
+       TALLOC_FREE(state->caller.wrepl_socket->stream);
 
-       DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *);
-       talloc_set_destructor(req, wrepl_request_destructor);
+       tevent_req_done(req);
+}
 
-       if (wrepl_socket->dead) {
-               return wrepl_request_finished(req, NT_STATUS_INVALID_CONNECTION);
-       }
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+       DATA_BLOB blob;
+       enum ndr_err_code ndr_err;
 
-       wrap.packet = *packet;
-       req->status = ndr_push_struct_blob(&blob, req, &wrap,
-                                          (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               return wrepl_request_finished(req, req->status);
+       status = tstream_read_pdu_blob_recv(subreq, state, &state->rep.blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
        }
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS packet of length %u\n", blob.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &wrap.packet);
+       state->rep.packet = talloc(state, struct wrepl_packet);
+       if (tevent_req_nomem(state->rep.packet, req)) {
+               return;
        }
 
-       if (wrepl_socket->request_timeout > 0) {
-               req->te = event_add_timed(wrepl_socket->event.ctx, req, 
-                                         timeval_current_ofs(wrepl_socket->request_timeout, 0), 
-                                         wrepl_request_timeout_handler, req);
-               if (!req->te) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY);
-       }
+       blob.data = state->rep.blob.data + 4;
+       blob.length = state->rep.blob.length - 4;
 
-       if (ctrl && (ctrl->send_only || ctrl->disconnect_after_send)) {
-               struct wrepl_send_ctrl_state *s = talloc(blob.data, struct wrepl_send_ctrl_state);
-               if (!s) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY);
-               s->ctrl         = *ctrl;
-               s->req          = req;
-               s->wrepl_sock   = wrepl_socket;
-               talloc_set_destructor(s, wrepl_send_ctrl_destructor);
+       /* we have a full request - parse it */
+       ndr_err = ndr_pull_struct_blob(&blob,
+                                      state->rep.packet,
+                                      state->rep.packet,
+                                      (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return;
        }
 
-       req->status = packet_send(wrepl_socket->packet, blob);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               return wrepl_request_finished(req, req->status);
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Received WINS packet of length %u\n",
+                         (unsigned)state->rep.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, state->rep.packet);
        }
 
-       req->trigger = False;
-       return req;
+       tevent_req_done(req);
 }
 
-/*
-  receive a generic WINS replication reply
-*/
-NTSTATUS wrepl_request_recv(struct wrepl_request *req,
+NTSTATUS wrepl_request_recv(struct tevent_req *req,
                            TALLOC_CTX *mem_ctx,
                            struct wrepl_packet **packet)
 {
-       NTSTATUS status = wrepl_request_wait(req);
-       if (NT_STATUS_IS_OK(status) && packet) {
-               *packet = talloc_steal(mem_ctx, req->packet);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_received(req);
+               return status;
+       }
+
+       if (packet) {
+               *packet = talloc_move(mem_ctx, &state->rep.packet);
        }
-       talloc_free(req);
-       return status;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -530,56 +579,135 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req,
 */
 NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
                       TALLOC_CTX *mem_ctx,
-                      struct wrepl_packet *req_packet,
+                      const struct wrepl_packet *req_packet,
                       struct wrepl_packet **reply_packet)
 {
-       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL);
-       return wrepl_request_recv(req, mem_ctx, reply_packet);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
+                                   wrepl_socket, req_packet, NULL);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
 
-/*
-  setup an association - send
-*/
-struct wrepl_request *wrepl_associate_send(struct wrepl_socket *wrepl_socket,
-                                          struct wrepl_associate *io)
+struct wrepl_associate_state {
+       struct wrepl_packet packet;
+       uint32_t assoc_ctx;
+       uint16_t major_version;
+};
+
+static void wrepl_associate_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct wrepl_socket *wrepl_socket,
+                                       const struct wrepl_associate *io)
 {
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct tevent_req *req;
+       struct wrepl_associate_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_send event context mismatch!");
+               return NULL;
+       }
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.mess_type                         = WREPL_START_ASSOCIATION;
+       state->packet.message.start.minor_version       = 2;
+       state->packet.message.start.major_version       = 5;
+
+       /*
+        * nt4 uses 41 bytes for the start_association call
+        * so do it the same and as we don't know th emeanings of this bytes
+        * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
+        *
+        * if we don't do this nt4 uses an old version of the wins replication protocol
+        * and that would break nt4 <-> samba replication
+        */
+       state->packet.padding   = data_blob_talloc(state, NULL, 21);
+       if (tevent_req_nomem(state->packet.padding.data, req)) {
+               return tevent_req_post(req, ev);
+       }
+       memset(state->packet.padding.data, 0, state->packet.padding.length);
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->mess_type                   = WREPL_START_ASSOCIATION;
-       packet->message.start.minor_version = 2;
-       packet->message.start.major_version = 5;
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_associate_done, req);
 
-       req = wrepl_request_send(wrepl_socket, packet, NULL);
+       return req;
+}
+
+static void wrepl_associate_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
+       NTSTATUS status;
+       struct wrepl_packet *packet;
 
-       talloc_free(packet);
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       return req;     
+       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
+
+       state->assoc_ctx = packet->message.start_reply.assoc_ctx;
+       state->major_version = packet->message.start_reply.major_version;
+
+       tevent_req_done(req);
 }
 
 /*
   setup an association - recv
 */
-NTSTATUS wrepl_associate_recv(struct wrepl_request *req,
+NTSTATUS wrepl_associate_recv(struct tevent_req *req,
                              struct wrepl_associate *io)
 {
-       struct wrepl_packet *packet=NULL;
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
        NTSTATUS status;
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       if (NT_STATUS_IS_OK(status)) {
-               io->out.assoc_ctx = packet->message.start_reply.assoc_ctx;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       talloc_free(packet);
-       return status;
+
+       io->out.assoc_ctx = state->assoc_ctx;
+       io->out.major_version = state->major_version;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -588,54 +716,108 @@ NTSTATUS wrepl_associate_recv(struct wrepl_request *req,
 NTSTATUS wrepl_associate(struct wrepl_socket *wrepl_socket,
                         struct wrepl_associate *io)
 {
-       struct wrepl_request *req = wrepl_associate_send(wrepl_socket, io);
-       return wrepl_associate_recv(req, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_send(wrepl_socket, wrepl_socket->event.ctx,
+                                     wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
+struct wrepl_associate_stop_state {
+       struct wrepl_packet packet;
+       struct wrepl_send_ctrl ctrl;
+};
 
-/*
-  stop an association - send
-*/
-struct wrepl_request *wrepl_associate_stop_send(struct wrepl_socket *wrepl_socket,
-                                               struct wrepl_associate_stop *io)
+static void wrepl_associate_stop_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
+                                            struct tevent_context *ev,
+                                            struct wrepl_socket *wrepl_socket,
+                                            const struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
-       struct wrepl_send_ctrl ctrl;
+       struct tevent_req *req;
+       struct wrepl_associate_stop_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
+       }
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_stop_state);
+       if (req == NULL) {
+               return NULL;
+       };
 
-       packet->opcode                  = WREPL_OPCODE_BITS;
-       packet->assoc_ctx               = io->in.assoc_ctx;
-       packet->mess_type               = WREPL_STOP_ASSOCIATION;
-       packet->message.stop.reason     = io->in.reason;
+       state->packet.opcode                    = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                 = io->in.assoc_ctx;
+       state->packet.mess_type                 = WREPL_STOP_ASSOCIATION;
+       state->packet.message.stop.reason       = io->in.reason;
 
-       ZERO_STRUCT(ctrl);
        if (io->in.reason == 0) {
-               ctrl.send_only                  = True;
-               ctrl.disconnect_after_send      = True;
+               state->ctrl.send_only                   = true;
+               state->ctrl.disconnect_after_send       = true;
+       }
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
        }
+       tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
 
-       req = wrepl_request_send(wrepl_socket, packet, &ctrl);
+       return req;
+}
 
-       talloc_free(packet);
+static void wrepl_associate_stop_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_stop_state *state = tevent_req_data(req,
+                                                  struct wrepl_associate_stop_state);
+       NTSTATUS status;
+
+       /* currently we don't care about a possible response */
+       status = wrepl_request_recv(subreq, state, NULL);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       return req;     
+       tevent_req_done(req);
 }
 
 /*
   stop an association - recv
 */
-NTSTATUS wrepl_associate_stop_recv(struct wrepl_request *req,
+NTSTATUS wrepl_associate_stop_recv(struct tevent_req *req,
                                   struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet=NULL;
        NTSTATUS status;
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       talloc_free(packet);
-       return status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -644,69 +826,128 @@ NTSTATUS wrepl_associate_stop_recv(struct wrepl_request *req,
 NTSTATUS wrepl_associate_stop(struct wrepl_socket *wrepl_socket,
                              struct wrepl_associate_stop *io)
 {
-       struct wrepl_request *req = wrepl_associate_stop_send(wrepl_socket, io);
-       return wrepl_associate_stop_recv(req, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_stop_send(wrepl_socket, wrepl_socket->event.ctx,
+                                          wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_stop_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
-/*
-  fetch the partner tables - send
-*/
-struct wrepl_request *wrepl_pull_table_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_table *io)
+struct wrepl_pull_table_state {
+       struct wrepl_packet packet;
+       uint32_t num_partners;
+       struct wrepl_wins_owner *partners;
+};
+
+static void wrepl_pull_table_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_table *io)
 {
+       struct tevent_req *req;
+       struct wrepl_pull_table_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_table_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_table_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_TABLE_QUERY;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
+
+       return req;
+}
+
+static void wrepl_pull_table_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
+       NTSTATUS status;
        struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct wrepl_table *table;
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                   = io->in.assoc_ctx;
-       packet->mess_type                   = WREPL_REPLICATION;
-       packet->message.replication.command = WREPL_REPL_TABLE_QUERY;
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
+       }
 
-       req = wrepl_request_send(wrepl_socket, packet, NULL);
+       if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       talloc_free(packet);
+       table = &packet->message.replication.info.table;
 
-       return req;     
-}
+       state->num_partners = table->partner_count;
+       state->partners = talloc_move(state, &table->partners);
 
+       tevent_req_done(req);
+}
 
 /*
   fetch the partner tables - recv
 */
-NTSTATUS wrepl_pull_table_recv(struct wrepl_request *req,
+NTSTATUS wrepl_pull_table_recv(struct tevent_req *req,
                               TALLOC_CTX *mem_ctx,
                               struct wrepl_pull_table *io)
 {
-       struct wrepl_packet *packet=NULL;
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
        NTSTATUS status;
-       struct wrepl_table *table;
-       int i;
 
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_REPLICATION) {
-               status = NT_STATUS_NETWORK_ACCESS_DENIED;
-       } else if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       table = &packet->message.replication.info.table;
-       io->out.num_partners = table->partner_count;
-       io->out.partners = talloc_steal(mem_ctx, table->partners);
-       for (i=0;i<io->out.num_partners;i++) {
-               talloc_steal(io->out.partners, io->out.partners[i].address);
-       }
+       io->out.num_partners = state->num_partners;
+       io->out.partners = talloc_move(mem_ctx, &state->partners);
 
-failed:
-       talloc_free(packet);
-       return status;
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
-
 /*
   fetch the partner table - sync api
 */
@@ -714,112 +955,191 @@ NTSTATUS wrepl_pull_table(struct wrepl_socket *wrepl_socket,
                          TALLOC_CTX *mem_ctx,
                          struct wrepl_pull_table *io)
 {
-       struct wrepl_request *req = wrepl_pull_table_send(wrepl_socket, io);
-       return wrepl_pull_table_recv(req, mem_ctx, io);
-}
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
 
+       subreq = wrepl_pull_table_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
 
-/*
-  fetch the names for a WINS partner - send
-*/
-struct wrepl_request *wrepl_pull_names_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_names *io)
-{
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       status = wrepl_pull_table_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
+}
 
-       packet->opcode                         = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                      = io->in.assoc_ctx;
-       packet->mess_type                      = WREPL_REPLICATION;
-       packet->message.replication.command    = WREPL_REPL_SEND_REQUEST;
-       packet->message.replication.info.owner = io->in.partner;
 
-       req = wrepl_request_send(wrepl_socket, packet, NULL);
+struct wrepl_pull_names_state {
+       struct {
+               const struct wrepl_pull_names *io;
+       } caller;
+       struct wrepl_packet packet;
+       uint32_t num_names;
+       struct wrepl_name *names;
+};
 
-       talloc_free(packet);
+static void wrepl_pull_names_done(struct tevent_req *subreq);
 
-       return req;     
+struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_names *io)
+{
+       struct tevent_req *req;
+       struct wrepl_pull_names_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_names_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_names_state);
+       if (req == NULL) {
+               return NULL;
+       };
+       state->caller.io = io;
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_SEND_REQUEST;
+       state->packet.message.replication.info.owner    = io->in.partner;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
+
+       return req;
 }
 
-/*
-  fetch the names for a WINS partner - recv
-*/
-NTSTATUS wrepl_pull_names_recv(struct wrepl_request *req,
-                              TALLOC_CTX *mem_ctx,
-                              struct wrepl_pull_names *io)
+static void wrepl_pull_names_done(struct tevent_req *subreq)
 {
-       struct wrepl_packet *packet=NULL;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
        NTSTATUS status;
-       int i;
+       struct wrepl_packet *packet;
+       uint32_t i;
 
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_REPLICATION ||
-           packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
        }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       io->out.num_names = packet->message.replication.info.reply.num_names;
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
+       }
 
-       io->out.names = talloc_array(packet, struct wrepl_name, io->out.num_names);
-       if (io->out.names == NULL) goto nomem;
+       if (packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
+
+       state->num_names = packet->message.replication.info.reply.num_names;
+
+       state->names = talloc_array(state, struct wrepl_name, state->num_names);
+       if (tevent_req_nomem(state->names, req)) {
+               return;
+       }
 
        /* convert the list of names and addresses to a sane format */
-       for (i=0;i<io->out.num_names;i++) {
+       for (i=0; i < state->num_names; i++) {
                struct wrepl_wins_name *wname = &packet->message.replication.info.reply.names[i];
-               struct wrepl_name *name = &io->out.names[i];
+               struct wrepl_name *name = &state->names[i];
 
                name->name      = *wname->name;
-               talloc_steal(io->out.names, wname->name);
+               talloc_steal(state->names, wname->name);
                name->type      = WREPL_NAME_TYPE(wname->flags);
                name->state     = WREPL_NAME_STATE(wname->flags);
                name->node      = WREPL_NAME_NODE(wname->flags);
                name->is_static = WREPL_NAME_IS_STATIC(wname->flags);
                name->raw_flags = wname->flags;
                name->version_id= wname->id;
-               name->owner     = talloc_strdup(io->out.names, io->in.partner.address);
-               if (name->owner == NULL) goto nomem;
+               name->owner     = talloc_strdup(state->names,
+                                               state->caller.io->in.partner.address);
+               if (tevent_req_nomem(name->owner, req)) {
+                       return;
+               }
 
                /* trying to save 1 or 2 bytes on the wire isn't a good idea */
                if (wname->flags & 2) {
-                       int j;
+                       uint32_t j;
 
                        name->num_addresses = wname->addresses.addresses.num_ips;
-                       name->addresses = talloc_array(io->out.names, 
-                                                      struct wrepl_address, 
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
                                                       name->num_addresses);
-                       if (name->addresses == NULL) goto nomem;
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
                        for (j=0;j<name->num_addresses;j++) {
-                               name->addresses[j].owner = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].owner);
+                               name->addresses[j].owner =
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].owner);
                                name->addresses[j].address = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].ip);
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].ip);
                        }
                } else {
                        name->num_addresses = 1;
-                       name->addresses = talloc(io->out.names, struct wrepl_address);
-                       if (name->addresses == NULL) goto nomem;
-                       name->addresses[0].owner = talloc_strdup(name->addresses,io->in.partner.address);
-                       if (name->addresses[0].owner == NULL) goto nomem;
-                       name->addresses[0].address = talloc_steal(name->addresses,
-                                                                 wname->addresses.ip);
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
+                                                      name->num_addresses);
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
+                       name->addresses[0].owner = talloc_strdup(name->addresses, name->owner);
+                       if (tevent_req_nomem(name->addresses[0].owner, req)) {
+                               return;
+                       }
+                       name->addresses[0].address = talloc_move(name->addresses,
+                                                                &wname->addresses.ip);
                }
        }
 
-       talloc_steal(mem_ctx, io->out.names);
-       talloc_free(packet);
+       tevent_req_done(req);
+}
+
+/*
+  fetch the names for a WINS partner - recv
+*/
+NTSTATUS wrepl_pull_names_recv(struct tevent_req *req,
+                              TALLOC_CTX *mem_ctx,
+                              struct wrepl_pull_names *io)
+{
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       io->out.num_names = state->num_names;
+       io->out.names = talloc_move(mem_ctx, &state->names);
+
+       tevent_req_received(req);
        return NT_STATUS_OK;
-nomem:
-       status = NT_STATUS_NO_MEMORY;
-failed:
-       talloc_free(packet);
-       return status;
 }
 
 
@@ -831,6 +1151,23 @@ NTSTATUS wrepl_pull_names(struct wrepl_socket *wrepl_socket,
                          TALLOC_CTX *mem_ctx,
                          struct wrepl_pull_names *io)
 {
-       struct wrepl_request *req = wrepl_pull_names_send(wrepl_socket, io);
-       return wrepl_pull_names_recv(req, mem_ctx, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_pull_names_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_pull_names_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }