Add context for libcli_resolve.
[samba-svnmirror.git] / source / librpc / rpc / dcerpc_sock.c
index 361d0d4a5fd20f4804bd1e365b5d57866beadf5a..2e5a8388a1bb977fb5a7e7a9d06631c0799d6480 100644 (file)
@@ -5,10 +5,11 @@
 
    Copyright (C) Andrew Tridgell 2003
    Copyright (C) Jelmer Vernooij 2004
+   Copyright (C) Rafal Szczesniak 2006
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
-#include "dlinklist.h"
-#include "events.h"
-#include "librpc/gen_ndr/ndr_epmapper.h"
-
-#define MIN_HDR_SIZE 16
-
-struct sock_blob {
-       struct sock_blob *next, *prev;
-       DATA_BLOB data;
-};
+#include "lib/events/events.h"
+#include "lib/socket/socket.h"
+#include "lib/stream/packet.h"
+#include "libcli/composite/composite.h"
+#include "librpc/rpc/dcerpc.h"
+#include "libcli/resolve/resolve.h"
+#include "param/param.h"
 
 /* transport private information used by general socket pipe transports */
 struct sock_private {
-       struct event_context *event_ctx;
        struct fd_event *fde;
        struct socket_context *sock;
        char *server_name;
-       uint32_t port;
 
-       struct sock_blob *pending_send;
-
-       struct {
-               size_t received;
-               DATA_BLOB data;
-               uint_t pending_count;
-       } recv;
+       struct packet_context *packet;
+       uint32_t pending_reads;
 };
 
 
 /*
   mark the socket dead
 */
-static void sock_dead(struct dcerpc_pipe *p, NTSTATUS status)
+static void sock_dead(struct dcerpc_connection *p, NTSTATUS status)
 {
-       struct sock_private *sock = p->transport.private;
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+
+       if (!sock) return;
 
-       if (sock && sock->sock != NULL) {
+       if (sock->packet) {
+               packet_recv_disable(sock->packet);
+               packet_set_fde(sock->packet, NULL);
+               packet_set_socket(sock->packet, NULL);
+       }
+
+       if (sock->fde) {
+               talloc_free(sock->fde);
+               sock->fde = NULL;
+       }
+
+       if (sock->sock) {
                talloc_free(sock->sock);
                sock->sock = NULL;
        }
 
-       /* wipe any pending sends */
-       while (sock->pending_send) {
-               struct sock_blob *blob = sock->pending_send;
-               DLIST_REMOVE(sock->pending_send, blob);
-               talloc_free(blob);
+       if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) {
+               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
        }
 
-       if (!NT_STATUS_IS_OK(status)) {
+       if (NT_STATUS_EQUAL(NT_STATUS_OK, status)) {
+               status = NT_STATUS_END_OF_FILE;
+       }
+
+       if (p->transport.recv_data) {
                p->transport.recv_data(p, NULL, status);
        }
 }
 
+
 /*
-  process send requests
+  handle socket recv errors
 */
-static void sock_process_send(struct dcerpc_pipe *p)
+static void sock_error_handler(void *private, NTSTATUS status)
 {
-       struct sock_private *sock = p->transport.private;
-
-       while (sock->pending_send) {
-               struct sock_blob *blob = sock->pending_send;
-               NTSTATUS status;
-               size_t sent;
-               status = socket_send(sock->sock, &blob->data, &sent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-                       break;
-               }
-               if (sent == 0) {
-                       break;
-               }
-
-               blob->data.data += sent;
-               blob->data.length -= sent;
-
-               if (blob->data.length != 0) {
-                       break;
-               }
-
-               DLIST_REMOVE(sock->pending_send, blob);
-               talloc_free(blob);
-       }
-
-       if (sock->pending_send == NULL) {
-               sock->fde->flags &= ~EVENT_FD_WRITE;
-       }
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
+       sock_dead(p, status);
 }
 
-
 /*
-  process recv requests
+  check if a blob is a complete packet
 */
-static void sock_process_recv(struct dcerpc_pipe *p)
+static NTSTATUS sock_complete_packet(void *private, DATA_BLOB blob, size_t *size)
 {
-       struct sock_private *sock = p->transport.private;
-       NTSTATUS status;
-       size_t nread;
-
-       if (sock->recv.data.data == NULL) {
-               sock->recv.data = data_blob_talloc(sock, NULL, MIN_HDR_SIZE);
-       }
-
-       /* read in the base header to get the fragment length */
-       if (sock->recv.received < MIN_HDR_SIZE) {
-               uint32_t frag_length;
-
-               status = socket_recv(sock->sock, 
-                                    sock->recv.data.data + sock->recv.received, 
-                                    MIN_HDR_SIZE - sock->recv.received, 
-                                    &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-                       return;
-               }
-               if (nread == 0) {
-                       return;
-               }
-               
-               sock->recv.received += nread;
-
-               if (sock->recv.received != MIN_HDR_SIZE) {
-                       return;
-               }
-               frag_length = dcerpc_get_frag_length(&sock->recv.data);
-
-               sock->recv.data.data = talloc_realloc(sock, sock->recv.data.data,
-                                                    frag_length);
-               if (sock->recv.data.data == NULL) {
-                       sock_dead(p, NT_STATUS_NO_MEMORY);
-                       return;
-               }
-               sock->recv.data.length = frag_length;
+       if (blob.length < DCERPC_FRAG_LEN_OFFSET+2) {
+               return STATUS_MORE_ENTRIES;
        }
-
-       /* read in the rest of the packet */
-       status = socket_recv(sock->sock, 
-                            sock->recv.data.data + sock->recv.received, 
-                            sock->recv.data.length - sock->recv.received, 
-                            &nread, 0);
-       if (NT_STATUS_IS_ERR(status)) {
-               sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-               return;
-       }
-       if (nread == 0) {
-               return;
-       }
-       sock->recv.received += nread;
-
-       if (sock->recv.received != sock->recv.data.length) {
-               return;
+       *size = dcerpc_get_frag_length(&blob);
+       if (*size > blob.length) {
+               return STATUS_MORE_ENTRIES;
        }
+       return NT_STATUS_OK;
+}
 
-       /* we have a full packet */
-       p->transport.recv_data(p, &sock->recv.data, NT_STATUS_OK);
-       talloc_free(sock->recv.data.data);
-       sock->recv.data = data_blob(NULL, 0);
-       sock->recv.received = 0;
-       sock->recv.pending_count--;
-       if (sock->recv.pending_count == 0) {
-               sock->fde->flags &= ~EVENT_FD_READ;
+/*
+  process recv requests
+*/
+static NTSTATUS sock_process_recv(void *private, DATA_BLOB blob)
+{
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       sock->pending_reads--;
+       if (sock->pending_reads == 0) {
+               packet_recv_disable(sock->packet);
        }
+       p->transport.recv_data(p, &blob, NT_STATUS_OK);
+       return NT_STATUS_OK;
 }
 
 /*
   called when a IO is triggered by the events system
 */
 static void sock_io_handler(struct event_context *ev, struct fd_event *fde, 
-                           struct timeval t, uint16_t flags)
+                           uint16_t flags, void *private)
 {
-       struct dcerpc_pipe *p = fde->private;
-       struct sock_private *sock = p->transport.private;
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
 
        if (flags & EVENT_FD_WRITE) {
-               sock_process_send(p);
+               packet_queue_run(sock->packet);
+               return;
        }
 
        if (sock->sock == NULL) {
@@ -204,20 +141,19 @@ static void sock_io_handler(struct event_context *ev, struct fd_event *fde,
        }
 
        if (flags & EVENT_FD_READ) {
-               sock_process_recv(p);
+               packet_recv(sock->packet);
        }
 }
 
 /* 
-   initiate a read request 
+   initiate a read request - not needed for dcerpc sockets
 */
-static NTSTATUS sock_send_read(struct dcerpc_pipe *p)
+static NTSTATUS sock_send_read(struct dcerpc_connection *p)
 {
-       struct sock_private *sock = p->transport.private;
-
-       sock->recv.pending_count++;
-       if (sock->recv.pending_count == 1) {
-               sock->fde->flags |= EVENT_FD_READ;
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       sock->pending_reads++;
+       if (sock->pending_reads == 1) {
+               packet_recv_enable(sock->packet);
        }
        return NT_STATUS_OK;
 }
@@ -225,25 +161,26 @@ static NTSTATUS sock_send_read(struct dcerpc_pipe *p)
 /* 
    send an initial pdu in a multi-pdu sequence
 */
-static NTSTATUS sock_send_request(struct dcerpc_pipe *p, DATA_BLOB *data, BOOL trigger_read)
+static NTSTATUS sock_send_request(struct dcerpc_connection *p, DATA_BLOB *data, 
+                                 bool trigger_read)
 {
-       struct sock_private *sock = p->transport.private;
-       struct sock_blob *blob;
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       DATA_BLOB blob;
+       NTSTATUS status;
 
-       blob = talloc_p(sock, struct sock_blob);
-       if (blob == NULL) {
-               return NT_STATUS_NO_MEMORY;
+       if (sock->sock == NULL) {
+               return NT_STATUS_CONNECTION_DISCONNECTED;
        }
 
-       blob->data = data_blob_talloc(blob, data->data, data->length);
-       if (blob->data.data == NULL) {
-               talloc_free(blob);
+       blob = data_blob_talloc(sock->packet, data->data, data->length);
+       if (blob.data == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
 
-       DLIST_ADD_END(sock->pending_send, blob, struct sock_blob *);
-
-       sock->fde->flags |= EVENT_FD_WRITE;
+       status = packet_send(sock->packet, blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
 
        if (trigger_read) {
                sock_send_read(p);
@@ -252,148 +189,470 @@ static NTSTATUS sock_send_request(struct dcerpc_pipe *p, DATA_BLOB *data, BOOL t
        return NT_STATUS_OK;
 }
 
-/* 
-   return the event context so the caller can process asynchronously
-*/
-static struct event_context *sock_event_context(struct dcerpc_pipe *p)
-{
-       struct sock_private *sock = p->transport.private;
-
-       return sock->event_ctx;
-}
-
 /* 
    shutdown sock pipe connection
 */
-static NTSTATUS sock_shutdown_pipe(struct dcerpc_pipe *p)
+static NTSTATUS sock_shutdown_pipe(struct dcerpc_connection *p, NTSTATUS status)
 {
-       sock_dead(p, NT_STATUS_OK);
+       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
 
-       return NT_STATUS_OK;
+       if (sock && sock->sock) {
+               sock_dead(p, status);
+       }
+
+       return status;
 }
 
 /*
   return sock server name
 */
-static const char *sock_peer_name(struct dcerpc_pipe *p)
+static const char *sock_peer_name(struct dcerpc_connection *p)
 {
-       struct sock_private *sock = p->transport.private;
+       struct sock_private *sock = talloc_get_type(p->transport.private_data, struct sock_private);
        return sock->server_name;
 }
 
-/* 
-   open a rpc connection using the generic socket library
+/*
+  return remote name we make the actual connection (good for kerberos) 
 */
-static NTSTATUS dcerpc_pipe_open_socket(struct dcerpc_pipe **p, 
-                                       const char *server,
-                                       uint32_t port, 
-                                       const char *type,
-                                       enum dcerpc_transport_t transport)
+static const char *sock_target_hostname(struct dcerpc_connection *p)
 {
-       struct sock_private *sock;
-       struct socket_context *socket_ctx;
-       struct fd_event fde;
-       NTSTATUS status;
+       struct sock_private *sock = talloc_get_type(p->transport.private_data, struct sock_private);
+       return sock->server_name;
+}
 
-       if (!(*p = dcerpc_pipe_init())) {
-                return NT_STATUS_NO_MEMORY;
-       }
-       sock = talloc_p((*p), struct sock_private);
-       if (!sock) {
-               talloc_free(*p);
-               return NT_STATUS_NO_MEMORY;
-       }
 
-       status = socket_create(type, SOCKET_TYPE_STREAM, &socket_ctx, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(*p);
-               return status;
-       }
-       talloc_steal(sock, socket_ctx);
+struct pipe_open_socket_state {
+       struct dcerpc_connection *conn;
+       struct socket_context *socket_ctx;
+       struct sock_private *sock;
+       struct socket_address *server;
+       const char *target_hostname;
+       enum dcerpc_transport_t transport;
+};
 
-       status = socket_connect(socket_ctx, NULL, 0, server, port, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(*p);
-               return status;
+
+static void continue_socket_connect(struct composite_context *ctx)
+{
+       struct dcerpc_connection *conn;
+       struct sock_private *sock;
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+       struct pipe_open_socket_state *s = talloc_get_type(c->private_data,
+                                                          struct pipe_open_socket_state);
+
+       /* make it easier to write a function calls */
+       conn = s->conn;
+       sock = s->sock;
+
+       c->status = socket_connect_recv(ctx);
+       if (!NT_STATUS_IS_OK(c->status)) {
+               DEBUG(0, ("Failed to connect host %s on port %d - %s\n", 
+                         s->server->addr, s->server->port,
+                         nt_errstr(c->status)));
+               composite_error(c, c->status);
+               return;
        }
 
        /*
          fill in the transport methods
        */
-       (*p)->transport.transport = transport;
-       (*p)->transport.private = NULL;
+       conn->transport.transport       = s->transport;
+       conn->transport.private_data    = NULL;
 
-       (*p)->transport.send_request = sock_send_request;
-       (*p)->transport.send_read = sock_send_read;
-       (*p)->transport.event_context = sock_event_context;
-       (*p)->transport.recv_data = NULL;
+       conn->transport.send_request    = sock_send_request;
+       conn->transport.send_read       = sock_send_read;
+       conn->transport.recv_data       = NULL;
 
-       (*p)->transport.shutdown_pipe = sock_shutdown_pipe;
-       (*p)->transport.peer_name = sock_peer_name;
-       
-       sock->sock = socket_ctx;
-       sock->server_name = talloc_strdup((*p), server);
-       sock->event_ctx = event_context_init(sock);
-       sock->pending_send = NULL;
-       sock->recv.received = 0;
-       sock->recv.data = data_blob(NULL, 0);
-       sock->recv.pending_count = 0;
+       conn->transport.shutdown_pipe   = sock_shutdown_pipe;
+       conn->transport.peer_name       = sock_peer_name;
+       conn->transport.target_hostname = sock_target_hostname;
+
+       sock->sock          = s->socket_ctx;
+       sock->pending_reads = 0;
+       sock->server_name   = strupper_talloc(sock, s->target_hostname);
 
-       fde.fd = socket_get_fd(sock->sock);
-       fde.flags = 0;
-       fde.handler = sock_io_handler;
-       fde.private = *p;
+       sock->fde = event_add_fd(conn->event_ctx, sock->sock, socket_get_fd(sock->sock),
+                                EVENT_FD_READ, sock_io_handler, conn);
+       
+       conn->transport.private_data = sock;
 
-       sock->fde = event_add_fd(sock->event_ctx, &fde);
+       sock->packet = packet_init(sock);
+       if (sock->packet == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               talloc_free(sock);
+               return;
+       }
 
-       (*p)->transport.private = sock;
+       packet_set_private(sock->packet, conn);
+       packet_set_socket(sock->packet, sock->sock);
+       packet_set_callback(sock->packet, sock_process_recv);
+       packet_set_full_request(sock->packet, sock_complete_packet);
+       packet_set_error_handler(sock->packet, sock_error_handler);
+       packet_set_event_context(sock->packet, conn->event_ctx);
+       packet_set_fde(sock->packet, sock->fde);
+       packet_set_serialise(sock->packet);
+       packet_set_initial_read(sock->packet, 16);
 
        /* ensure we don't get SIGPIPE */
-       BlockSignals(True,SIGPIPE);
+       BlockSignals(true, SIGPIPE);
 
-       return NT_STATUS_OK;
+       composite_done(c);
 }
 
-/* 
-   open a rpc connection using tcp
+
+static struct composite_context *dcerpc_pipe_open_socket_send(TALLOC_CTX *mem_ctx,
+                                                      struct dcerpc_connection *cn,
+                                                      struct socket_address *server,
+                                                      const char *target_hostname,
+                                                      enum dcerpc_transport_t transport)
+{
+       struct composite_context *c;
+       struct pipe_open_socket_state *s;
+       struct composite_context *conn_req;
+
+       c = composite_create(mem_ctx, cn->event_ctx);
+       if (c == NULL) return NULL;
+
+       s = talloc_zero(c, struct pipe_open_socket_state);
+       if (composite_nomem(s, c)) return c;
+       c->private_data = s;
+
+       s->conn      = cn;
+       s->transport = transport;
+       s->server    = talloc_reference(c, server);
+       if (composite_nomem(s->server, c)) return c;
+       s->target_hostname = talloc_reference(s, target_hostname);
+
+       s->sock = talloc(cn, struct sock_private);
+       if (composite_nomem(s->sock, c)) return c;
+
+       c->status = socket_create(server->family, SOCKET_TYPE_STREAM, &s->socket_ctx, 0);
+       if (!composite_is_ok(c)) return c;
+
+       talloc_steal(s->sock, s->socket_ctx);
+
+       conn_req = socket_connect_send(s->socket_ctx, NULL, s->server, 0, 
+                                      lp_resolve_context(global_loadparm), 
+                                      c->event_ctx);
+       composite_continue(c, conn_req, continue_socket_connect, c);
+       return c;
+}
+
+
+static NTSTATUS dcerpc_pipe_open_socket_recv(struct composite_context *c)
+{
+       NTSTATUS status = composite_wait(c);
+
+       talloc_free(c);
+       return status;
+}
+
+struct pipe_tcp_state {
+       const char *server;
+       const char *target_hostname;
+       const char *address;
+       uint32_t port;
+       struct socket_address *srvaddr;
+       struct dcerpc_connection *conn;
+};
+
+
+#if 0 /* disabled till we can resolve names to ipv6 addresses */
+static void continue_ipv6_open_socket(struct composite_context *ctx);
+#endif
+static void continue_ipv4_open_socket(struct composite_context *ctx);
+static void continue_ip_resolve_name(struct composite_context *ctx);
+
+static void continue_ip_resolve_name(struct composite_context *ctx)
+{
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+       struct pipe_tcp_state *s = talloc_get_type(c->private_data,
+                                                  struct pipe_tcp_state);
+       struct composite_context *sock_ipv4_req;
+
+       c->status = resolve_name_recv(ctx, s, &s->address);
+       if (!composite_is_ok(c)) return;
+
+       /* prepare server address using host ip:port and transport name */
+       s->srvaddr = socket_address_from_strings(s->conn, "ipv4", s->address, s->port);
+       if (composite_nomem(s->srvaddr, c)) return;
+
+       /* resolve_nbt_name gives only ipv4 ... - send socket open request */
+       sock_ipv4_req = dcerpc_pipe_open_socket_send(c, s->conn,
+                                                    s->srvaddr, s->target_hostname,
+                                                    NCACN_IP_TCP);
+       composite_continue(c, sock_ipv4_req, continue_ipv4_open_socket, c);
+}
+
+/*
+  Stage 2 of dcerpc_pipe_open_tcp_send: receive result of pipe open request
+  on IPv6 and send the request on IPv4 unless IPv6 transport succeeded.
 */
-NTSTATUS dcerpc_pipe_open_tcp(struct dcerpc_pipe **p, const char *server, uint32_t port)
+#if 0 /* disabled till we can resolve names to ipv6 addresses */
+static void continue_ipv6_open_socket(struct composite_context *ctx)
 {
-       NTSTATUS status;
-       
-       /* Try IPv6 first */
-       status = dcerpc_pipe_open_socket(p, server, port, "ipv6", NCACN_IP_TCP);
-       if (NT_STATUS_IS_OK(status)) {
-               return status;
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+       struct pipe_tcp_state *s = talloc_get_type(c->private_data,
+                                                  struct pipe_tcp_state);
+       struct composite_context *sock_ipv4_req;
+
+       /* receive result of socket open request */
+       c->status = dcerpc_pipe_open_socket_recv(ctx);
+       if (NT_STATUS_IS_OK(c->status)) {
+               composite_done(c);
+               return;
        }
+
+       talloc_free(s->srvaddr);
+
+       /* prepare server address using host:ip and transport name */
+       s->srvaddr = socket_address_from_strings(s->conn, "ipv4", s->address, s->port);
+       if (composite_nomem(s->srvaddr, c)) return;
+
+       /* try IPv4 if IPv6 fails */
+       sock_ipv4_req = dcerpc_pipe_open_socket_send(c, s->conn, 
+                                                    s->srvaddr, s->target_hostname, 
+                                                    NCACN_IP_TCP);
+       composite_continue(c, sock_ipv4_req, continue_ipv4_open_socket, c);
+}
+#endif
+
+/*
+  Stage 2 of dcerpc_pipe_open_tcp_send: receive result of pipe open request
+  on IPv4 transport.
+*/
+static void continue_ipv4_open_socket(struct composite_context *ctx)
+{
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+       struct pipe_tcp_state *s = talloc_get_type(c->private_data,
+                                                  struct pipe_tcp_state);
        
-       return dcerpc_pipe_open_socket(p, server, port, "ipv4", NCACN_IP_TCP);
+       /* receive result socket open request */
+       c->status = dcerpc_pipe_open_socket_recv(ctx);
+       if (!NT_STATUS_IS_OK(c->status)) {
+               /* something went wrong... */
+               DEBUG(0, ("Failed to connect host %s (%s) on port %d - %s.\n",
+                         s->address, s->target_hostname, 
+                         s->port, nt_errstr(c->status)));
+
+               composite_error(c, c->status);
+               return;
+       }
+
+       composite_done(c);
 }
 
-/* 
-   open a rpc connection to a unix socket 
+
+/*
+  Send rpc pipe open request to given host:port using
+  tcp/ip transport
 */
-NTSTATUS dcerpc_pipe_open_unix_stream(struct dcerpc_pipe **p, const char *path)
+struct composite_context* dcerpc_pipe_open_tcp_send(struct dcerpc_connection *conn,
+                                                   const char *server,
+                                                   const char *target_hostname,
+                                                   uint32_t port,
+                                                   struct resolve_context *resolve_ctx)
 {
-       return dcerpc_pipe_open_socket(p, path, 0, "unix", NCACN_UNIX_STREAM);
+       struct composite_context *c;
+       struct pipe_tcp_state *s;
+       struct composite_context *resolve_req;
+       struct nbt_name name;
+
+       /* composite context allocation and setup */
+       c = composite_create(conn, conn->event_ctx);
+       if (c == NULL) return NULL;
+
+       s = talloc_zero(c, struct pipe_tcp_state);
+       if (composite_nomem(s, c)) return c;
+       c->private_data = s;
+
+       /* store input parameters in state structure */
+       s->server          = talloc_strdup(c, server);
+       if (composite_nomem(s->server, c)) return c;
+       if (target_hostname) {
+               s->target_hostname = talloc_strdup(c, target_hostname);
+               if (composite_nomem(s->target_hostname, c)) return c;
+       }
+       s->port            = port;
+       s->conn            = conn;
+
+       make_nbt_name_server(&name, server);
+       resolve_req = resolve_name_send(resolve_ctx, &name, c->event_ctx);
+       composite_continue(c, resolve_req, continue_ip_resolve_name, c);
+       return c;
 }
 
-/* 
-   open a rpc connection to a named pipe 
+/*
+  Receive result of pipe open request on tcp/ip
 */
-NTSTATUS dcerpc_pipe_open_pipe(struct dcerpc_pipe **p, const char *identifier)
+NTSTATUS dcerpc_pipe_open_tcp_recv(struct composite_context *c)
 {
        NTSTATUS status;
-       char *canon, *full_path;
+       status = composite_wait(c);
+
+       talloc_free(c);
+       return status;
+}
+
+
+struct pipe_unix_state {
+       const char *path;
+       struct socket_address *srvaddr;
+       struct dcerpc_connection *conn;
+};
+
+
+/*
+  Stage 2 of dcerpc_pipe_open_unix_stream_send: receive result of pipe open
+  request on unix socket.
+*/
+static void continue_unix_open_socket(struct composite_context *ctx)
+{
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
 
-       canon = talloc_strdup(NULL, identifier);
+       c->status = dcerpc_pipe_open_socket_recv(ctx);
+       if (NT_STATUS_IS_OK(c->status)) {
+               composite_done(c);
+               return;
+       }
+
+       composite_error(c, c->status);
+}
+
+
+/*
+  Send pipe open request on unix socket
+*/
+struct composite_context *dcerpc_pipe_open_unix_stream_send(struct dcerpc_connection *conn,
+                                                           const char *path)
+{
+       struct composite_context *c;
+       struct composite_context *sock_unix_req;
+       struct pipe_unix_state *s;
+
+       /* composite context allocation and setup */
+       c = composite_create(conn, conn->event_ctx);
+       if (c == NULL) return NULL;
+
+       s = talloc_zero(c, struct pipe_unix_state);
+       if (composite_nomem(s, c)) return c;
+       c->private_data = s;
+
+       /* store parameters in state structure */
+       s->path = talloc_strdup(c, path);
+       if (composite_nomem(s->path, c)) return c;
+       s->conn = conn;
+
+       /* prepare server address using socket path and transport name */
+       s->srvaddr = socket_address_from_strings(conn, "unix", s->path, 0);
+       if (composite_nomem(s->srvaddr, c)) return c;
+
+       /* send socket open request */
+       sock_unix_req = dcerpc_pipe_open_socket_send(c, s->conn, 
+                                                    s->srvaddr, NULL,
+                                                    NCALRPC);
+       composite_continue(c, sock_unix_req, continue_unix_open_socket, c);
+       return c;
+}
+
+
+/*
+  Receive result of pipe open request on unix socket
+*/
+NTSTATUS dcerpc_pipe_open_unix_stream_recv(struct composite_context *c)
+{
+       NTSTATUS status = composite_wait(c);
+
+       talloc_free(c);
+       return status;
+}
+
+
+struct pipe_np_state {
+       char *full_path;
+       struct socket_address *srvaddr;
+       struct dcerpc_connection *conn;
+};
+
+
+/*
+  Stage 2 of dcerpc_pipe_open_pipe_send: receive socket open request
+*/
+static void continue_np_open_socket(struct composite_context *ctx)
+{
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+
+       c->status = dcerpc_pipe_open_socket_recv(ctx);
+       if (!composite_is_ok(c)) return;
+
+       composite_done(c);
+}
+
+
+/*
+  Send pipe open request on ncalrpc
+*/
+struct composite_context* dcerpc_pipe_open_pipe_send(struct dcerpc_connection *conn,
+                                                    const char *ncalrpc_dir,
+                                                    const char *identifier)
+{
+       char *canon = NULL;
+
+       struct composite_context *c;
+       struct composite_context *sock_np_req;
+       struct pipe_np_state *s;
+
+       /* composite context allocation and setup */
+       c = composite_create(conn, conn->event_ctx);
+       if (c == NULL) return NULL;
+
+       s = talloc_zero(c, struct pipe_np_state);
+       if (composite_nomem(s, c)) return c;
+       c->private_data = s;
+
+       /* store parameters in state structure */
+       canon = talloc_strdup(s, identifier);
+       if (composite_nomem(canon, c)) return c;
+       s->conn = conn;
 
        string_replace(canon, '/', '\\');
-       full_path = talloc_asprintf(canon, "%s/%s", lp_ncalrpc_dir(), canon);
+       s->full_path = talloc_asprintf(canon, "%s/%s", ncalrpc_dir, canon);
+       if (composite_nomem(s->full_path, c)) return c;
+
+       /* prepare server address using path and transport name */
+       s->srvaddr = socket_address_from_strings(conn, "unix", s->full_path, 0);
+       if (composite_nomem(s->srvaddr, c)) return c;
+
+       /* send socket open request */
+       sock_np_req = dcerpc_pipe_open_socket_send(c, s->conn, s->srvaddr, NULL, NCALRPC);
+       composite_continue(c, sock_np_req, continue_np_open_socket, c);
+       return c;
+}
 
-       status = dcerpc_pipe_open_socket(p, full_path, 0, "unix", NCALRPC);
-       talloc_free(canon);
 
+/*
+  Receive result of pipe open request on ncalrpc
+*/
+NTSTATUS dcerpc_pipe_open_pipe_recv(struct composite_context *c)
+{
+       NTSTATUS status = composite_wait(c);
+       
+       talloc_free(c);
        return status;
 }
+
+
+/*
+  Open a rpc pipe on a named pipe - sync version
+*/
+NTSTATUS dcerpc_pipe_open_pipe(struct dcerpc_connection *conn, const char *ncalrpc_dir, const char *identifier)
+{
+       struct composite_context *c = dcerpc_pipe_open_pipe_send(conn, ncalrpc_dir, identifier);
+       return dcerpc_pipe_open_pipe_recv(c);
+}