s3:rpc_server/witness: add handling of MSG_RPCD_WITNESS_REGISTRATION_UPDATE messages
authorStefan Metzmacher <metze@samba.org>
Wed, 20 Dec 2023 18:22:25 +0000 (19:22 +0100)
committerStefan Metzmacher <metze@samba.org>
Fri, 26 Jan 2024 17:00:33 +0000 (17:00 +0000)
This implements the server side features for the
'net witness [client-move,...]' commands in the end.

These are administrator driven notifications for the witness client.

RPCD_WITNESS_REGISTRATION_UPDATE_FORCE_RESPONSE and
RPCD_WITNESS_REGISTRATION_UPDATE_FORCE_UNREGISTER will be very useful
for later automated testing.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Günther Deschner <gd@samba.org>
source3/rpc_server/witness/srv_witness_nt.c

index 58d11f2f24bcfab70c2e0d44ed0dc008a234ff24..1d5f72070837b4ea17cf7e5705361d10b1175b4b 100644 (file)
@@ -85,6 +85,12 @@ struct swn_service_registration {
 
        struct swn_service_globals *swn;
 
+       struct {
+               struct tevent_context *ev_ctx;
+               struct messaging_context *msg_ctx;
+               struct tevent_req *subreq;
+       } msg;
+
        struct {
                struct policy_handle handle;
                void *ptr;
@@ -99,6 +105,12 @@ struct swn_service_registration {
        const char *share_name;
        struct samba_sockaddr ip_address;
 
+       struct {
+               bool triggered;
+               struct witness_notifyResponse *response;
+               WERROR result;
+       } forced_response;
+
        struct {
                bool triggered;
                /*
@@ -109,6 +121,28 @@ struct swn_service_registration {
                enum witness_interfaceInfo_state last_ip_state;
        } change_notification;
 
+       struct {
+               bool triggered;
+               uint32_t new_node;
+               struct samba_sockaddr new_ip;
+       } move_notification;
+
+       struct {
+               bool required;
+               bool triggered;
+               uint32_t new_node;
+               struct samba_sockaddr new_ip;
+       } share_notification;
+
+       struct {
+               bool required;
+               bool triggered;
+               /*
+                * TODO: find how this works on windows and implement
+                * Windows Server 2022 as client doesn't use it...
+                */
+       } ip_notification;
+
        struct {
                struct timeval create_time;
                struct timeval last_time;
@@ -966,6 +1000,257 @@ WERROR _witness_GetInterfaceList(struct pipes_struct *p,
        return WERR_OK;
 }
 
+static bool swn_server_registration_message_filter(struct messaging_rec *rec, void *private_data)
+{
+       struct swn_service_registration *reg = NULL;
+       struct policy_handle context_handle;
+       enum ndr_err_code ndr_err;
+       DATA_BLOB blob;
+       bool match;
+
+       if (rec->msg_type != MSG_RPCD_WITNESS_REGISTRATION_UPDATE) {
+               return false;
+       }
+
+       if (rec->num_fds != 0) {
+               return false;
+       }
+
+       if (rec->buf.length < 20) {
+               return false;
+       }
+
+       reg = talloc_get_type_abort(private_data, struct swn_service_registration);
+
+       blob = data_blob_const(rec->buf.data, 20);
+       ndr_err = ndr_pull_struct_blob_all_noalloc(&blob, &context_handle,
+                               (ndr_pull_flags_fn_t)ndr_pull_policy_handle);
+       SMB_ASSERT(NDR_ERR_CODE_IS_SUCCESS(ndr_err));
+
+       match = ndr_policy_handle_equal(&context_handle, &reg->key.handle);
+       if (!match) {
+               return false;
+       }
+
+       return true;
+}
+
+static void swn_server_registration_client_move_to_node(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_node *move)
+{
+       reg->move_notification.triggered = true;
+       reg->move_notification.new_node = move->new_node;
+       reg->move_notification.new_ip = (struct samba_sockaddr) {
+               .sa_socklen = 0,
+       };
+
+       tevent_queue_start(reg->async_notify.queue);
+}
+
+static void swn_server_registration_client_move_to_ip(
+       struct swn_service_registration *reg,
+       const char *new_ip_str)
+{
+       struct samba_sockaddr new_ip = {
+               .sa_socklen = 0,
+       };
+       bool ok;
+
+       ok = is_ipaddress(new_ip_str);
+       if (!ok) {
+               return;
+       }
+       ok = interpret_string_addr(&new_ip.u.ss,
+                                  new_ip_str,
+                                  AI_PASSIVE|AI_NUMERICHOST);
+       if (!ok) {
+               return;
+       }
+
+       reg->move_notification.triggered = true;
+       reg->move_notification.new_node = NONCLUSTER_VNN;
+       reg->move_notification.new_ip = new_ip;
+
+       tevent_queue_start(reg->async_notify.queue);
+}
+
+static void swn_server_registration_client_move_to_ipv4(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_ipv4 *move)
+{
+       swn_server_registration_client_move_to_ip(reg, move->new_ipv4);
+}
+
+static void swn_server_registration_client_move_to_ipv6(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_ipv6 *move)
+{
+       swn_server_registration_client_move_to_ip(reg, move->new_ipv6);
+}
+
+static void swn_server_registration_share_move_to_node(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_node *move)
+{
+       if (!reg->share_notification.required) {
+               return;
+       }
+
+       reg->share_notification.triggered = true;
+       reg->share_notification.new_node = move->new_node;
+       reg->share_notification.new_ip = (struct samba_sockaddr) {
+               .sa_socklen = 0,
+       };
+
+       tevent_queue_start(reg->async_notify.queue);
+}
+
+static void swn_server_registration_share_move_to_ip(
+       struct swn_service_registration *reg,
+       const char *new_ip_str)
+{
+       struct samba_sockaddr new_ip = {
+               .sa_socklen = 0,
+       };
+       bool ok;
+
+       ok = is_ipaddress(new_ip_str);
+       if (!ok) {
+               return;
+       }
+       ok = interpret_string_addr(&new_ip.u.ss,
+                                  new_ip_str,
+                                  AI_PASSIVE|AI_NUMERICHOST);
+       if (!ok) {
+               return;
+       }
+
+       if (!reg->share_notification.required) {
+               return;
+       }
+
+       reg->share_notification.triggered = true;
+       reg->share_notification.new_node = NONCLUSTER_VNN;
+       reg->share_notification.new_ip = new_ip;
+
+       tevent_queue_start(reg->async_notify.queue);
+}
+
+static void swn_server_registration_share_move_to_ipv4(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_ipv4 *move)
+{
+       swn_server_registration_share_move_to_ip(reg, move->new_ipv4);
+}
+
+static void swn_server_registration_share_move_to_ipv6(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_move_to_ipv6 *move)
+{
+       swn_server_registration_share_move_to_ip(reg, move->new_ipv6);
+}
+
+static void swn_server_registration_force_response(
+       struct swn_service_registration *reg,
+       struct rpcd_witness_registration_update_force_response *response)
+{
+       reg->forced_response.triggered = true;
+       reg->forced_response.response = talloc_move(reg, &response->response);
+       reg->forced_response.result = response->result;
+
+       tevent_queue_start(reg->async_notify.queue);
+}
+
+static void swn_server_registration_message_done(struct tevent_req *subreq)
+{
+       struct swn_service_registration *reg =
+               tevent_req_callback_data(subreq,
+               struct swn_service_registration);
+       TALLOC_CTX *frame = talloc_stackframe();
+       struct messaging_rec *rec = NULL;
+       struct rpcd_witness_registration_updateB update_blob;
+       enum ndr_err_code ndr_err;
+       NTSTATUS status;
+       int ret;
+
+       SMB_ASSERT(reg->msg.subreq == subreq);
+       reg->msg.subreq = NULL;
+
+       ret = messaging_filtered_read_recv(subreq, frame, &rec);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
+               status = map_nt_error_from_unix_common(ret);
+               DBG_ERR("messaging_filtered_read_recv() - %s\n",
+                       nt_errstr(status));
+               goto wait_for_next;
+       }
+
+       DBG_DEBUG("MSG_RPCD_WITNESS_REGISTRATION_UPDATE: received...\n");
+
+       ndr_err = ndr_pull_struct_blob(&rec->buf, frame, &update_blob,
+                       (ndr_pull_flags_fn_t)ndr_pull_rpcd_witness_registration_updateB);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               DBG_ERR("ndr_pull_struct_blob - %s\n", nt_errstr(status));
+               goto wait_for_next;
+       }
+
+       if (DEBUGLVL(DBGLVL_DEBUG)) {
+               NDR_PRINT_DEBUG(rpcd_witness_registration_updateB, &update_blob);
+       }
+
+       switch (update_blob.type) {
+       case RPCD_WITNESS_REGISTRATION_UPDATE_CLIENT_MOVE_TO_NODE:
+               swn_server_registration_client_move_to_node(reg,
+                       &update_blob.update.client_move_to_node);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_CLIENT_MOVE_TO_IPV4:
+               swn_server_registration_client_move_to_ipv4(reg,
+                       &update_blob.update.client_move_to_ipv4);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_CLIENT_MOVE_TO_IPV6:
+               swn_server_registration_client_move_to_ipv6(reg,
+                       &update_blob.update.client_move_to_ipv6);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_SHARE_MOVE_TO_NODE:
+               swn_server_registration_share_move_to_node(reg,
+                       &update_blob.update.share_move_to_node);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_SHARE_MOVE_TO_IPV4:
+               swn_server_registration_share_move_to_ipv4(reg,
+                       &update_blob.update.share_move_to_ipv4);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_SHARE_MOVE_TO_IPV6:
+               swn_server_registration_share_move_to_ipv6(reg,
+                       &update_blob.update.share_move_to_ipv6);
+               break;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_FORCE_UNREGISTER:
+               TALLOC_FREE(reg);
+               TALLOC_FREE(frame);
+               return;
+       case RPCD_WITNESS_REGISTRATION_UPDATE_FORCE_RESPONSE:
+               swn_server_registration_force_response(reg,
+                       &update_blob.update.force_response);
+               break;
+       }
+
+wait_for_next:
+       TALLOC_FREE(frame);
+       reg->msg.subreq = messaging_filtered_read_send(reg,
+                                                      reg->msg.ev_ctx,
+                                                      reg->msg.msg_ctx,
+                                                      swn_server_registration_message_filter,
+                                                      reg);
+       if (reg->msg.subreq == NULL) {
+               DBG_ERR("messaging_filtered_read_send() failed\n");
+               return;
+       }
+       tevent_req_set_callback(reg->msg.subreq,
+                               swn_server_registration_message_done,
+                               reg);
+}
+
 static WERROR swn_server_registration_create(struct swn_service_globals *swn,
                                             struct pipes_struct *p,
                                             const struct witness_RegisterEx *r,
@@ -1029,6 +1314,7 @@ static WERROR swn_server_registration_create(struct swn_service_globals *swn,
                        TALLOC_FREE(reg);
                        return WERR_NOT_ENOUGH_MEMORY;
                }
+               reg->share_notification.required = true;
        }
 
        reg->async_notify.timeout_secs = r->in.timeout;
@@ -1039,6 +1325,9 @@ static WERROR swn_server_registration_create(struct swn_service_globals *swn,
        }
        tevent_queue_stop(reg->async_notify.queue);
 
+       reg->ip_notification.required = (r->in.flags &
+                       WITNESS_REGISTER_IP_NOTIFICATION);
+
        reg->usage.create_time = p->dce_call->time;
        reg->usage.unused_timeout_secs =
                swn_globals->registrations.unused_timeout_secs;
@@ -1058,6 +1347,21 @@ static WERROR swn_server_registration_create(struct swn_service_globals *swn,
        }
        swn_service_registration_update_usage(reg, reg->usage.create_time);
 
+       reg->msg.ev_ctx = p->dce_call->event_ctx;
+       reg->msg.msg_ctx = p->msg_ctx;
+       reg->msg.subreq = messaging_filtered_read_send(reg,
+                                                      reg->msg.ev_ctx,
+                                                      reg->msg.msg_ctx,
+                                                      swn_server_registration_message_filter,
+                                                      reg);
+       if (reg->msg.subreq == NULL) {
+               TALLOC_FREE(reg);
+               return WERR_NOT_ENOUGH_MEMORY;
+       }
+       tevent_req_set_callback(reg->msg.subreq,
+                               swn_server_registration_message_done,
+                               reg);
+
        reg->key.ptr = create_policy_hnd(p, &reg->key.handle,
                                         SWN_SERVICE_CONTEXT_HANDLE_REGISTRATION,
                                         reg);
@@ -1388,7 +1692,12 @@ static struct tevent_req *swn_service_async_notify_send(TALLOC_CTX *mem_ctx,
        tevent_req_set_cleanup_fn(req, swn_service_async_notify_cleanup);
        tevent_req_set_cancel_fn(req, swn_service_async_notify_cancel);
 
-       if (!reg->change_notification.triggered) {
+       if (!reg->forced_response.triggered &&
+           !reg->change_notification.triggered &&
+           !reg->move_notification.triggered &&
+           !reg->share_notification.triggered &&
+           !reg->ip_notification.triggered)
+       {
                tevent_queue_stop(reg->async_notify.queue);
        }
 
@@ -1442,6 +1751,15 @@ static void swn_service_async_notify_trigger(struct tevent_req *req,
                return;
        }
 
+       if (reg->forced_response.triggered) {
+               resp = talloc_move(state, &reg->forced_response.response);
+               forced_result = reg->forced_response.result;
+
+               reg->forced_response.triggered = false;
+               reg->forced_response.result = WERR_OK;
+               goto finished;
+       }
+
        if (reg->change_notification.triggered) {
                struct swn_service_globals *swn = reg->swn;
                const struct swn_service_interface *iface = NULL;
@@ -1539,8 +1857,303 @@ static void swn_service_async_notify_trigger(struct tevent_req *req,
                goto finished;
        }
 
+       if (reg->move_notification.triggered) {
+               struct swn_service_globals *swn = reg->swn;
+               struct swn_service_interface *iface = NULL;
+               union witness_notifyResponse_message *msgs = NULL;
+               struct witness_IPaddrInfoList *list = NULL;
+               uint32_t num_ips = 0;
+               const uint32_t *new_node = NULL;
+               const struct samba_sockaddr *new_ip = NULL;
+
+               if (reg->move_notification.new_node != NONCLUSTER_VNN) {
+                       new_node = &reg->move_notification.new_node;
+               }
+               if (!is_zero_addr(&reg->move_notification.new_ip.u.ss)) {
+                       new_ip = &reg->move_notification.new_ip;
+               }
+
+               for (iface = swn->interfaces.list;
+                    iface != NULL;
+                    iface = iface->next)
+               {
+                       if (new_node != NULL &&
+                           iface->current_vnn != *new_node)
+                       {
+                               continue;
+                       }
+
+                       if (new_ip != NULL &&
+                           !sockaddr_equal(&new_ip->u.sa, &iface->addr.u.sa))
+                       {
+                               continue;
+                       }
+
+                       num_ips += 1;
+               }
+
+               if (num_ips == 0) {
+                       goto no_moves;
+               }
+
+               resp = talloc_zero(state, struct witness_notifyResponse);
+               if (tevent_req_nomem(resp, req)) {
+                       return;
+               }
+
+               msgs = talloc_zero_array(resp,
+                                        union witness_notifyResponse_message,
+                                        1);
+               if (tevent_req_nomem(msgs, req)) {
+                       return;
+               }
+
+               list = &msgs[0].client_move;
+               list->addr = talloc_zero_array(msgs,
+                                              struct witness_IPaddrInfo,
+                                              num_ips);
+               if (tevent_req_nomem(list->addr, req)) {
+                       return;
+               }
+
+               for (iface = swn->interfaces.list;
+                    iface != NULL;
+                    iface = iface->next)
+               {
+                       struct witness_IPaddrInfo *info = &list->addr[list->num];
+                       char addr[INET6_ADDRSTRLEN] = { 0, };
+                       const char *ipv4 = "0.0.0.0";
+                       const char *ipv6 = "::";
+                       uint32_t flags = 0;
+                       bool is_reg_ip = false;
+
+                       if (new_node != NULL &&
+                           iface->current_vnn != *new_node)
+                       {
+                               continue;
+                       }
+
+                       if (new_ip != NULL &&
+                           !sockaddr_equal(&new_ip->u.sa, &iface->addr.u.sa))
+                       {
+                               continue;
+                       }
+
+                       switch (iface->state) {
+                       case WITNESS_STATE_AVAILABLE:
+                               flags |= WITNESS_IPADDR_ONLINE;
+                               break;
+                       case WITNESS_STATE_UNAVAILABLE:
+                               flags |= WITNESS_IPADDR_OFFLINE;
+                               break;
+                       case WITNESS_STATE_UNKNOWN:
+                               /* We map unknown also to offline */
+                               flags |= WITNESS_IPADDR_OFFLINE;
+                               break;
+                       }
+
+                       print_sockaddr(addr, sizeof(addr), &iface->addr.u.ss);
+                       if (iface->addr.u.sa.sa_family == AF_INET) {
+                               flags |= WITNESS_IPADDR_V4;
+                               ipv4 = addr;
+                       } else if (iface->addr.u.sa.sa_family == AF_INET6) {
+                               flags |= WITNESS_IPADDR_V6;
+                               ipv6 = addr;
+                       }
+
+                       info->ipv4 = talloc_strdup(list, ipv4);
+                       if (tevent_req_nomem(info->ipv4, req)) {
+                               return;
+                       }
+                       info->ipv6 = talloc_strdup(list, ipv6);
+                       if (tevent_req_nomem(info->ipv6, req)) {
+                               return;
+                       }
+                       info->flags = flags;
+                       list->num += 1;
+
+                       is_reg_ip = sockaddr_equal(&reg->ip_address.u.sa,
+                                                  &iface->addr.u.sa);
+                       if (!is_reg_ip) {
+                               /*
+                                * In order to let a Windows server 2022
+                                * correctly re-register after moving
+                                * to a new connection, we force an
+                                * unregistration after 5 seconds.
+                                *
+                                * It means the client gets WERR_NOT_FOUND from
+                                * a pending AsyncNotify() and calls
+                                * Unregister() (which also gets
+                                * WERR_NOT_FOUND).  Then the client calls
+                                * GetInterfaceList() and RegisterEx() again.
+                                */
+                               defer_forced_unregister = true;
+                       }
+               }
+
+               resp->type = WITNESS_NOTIFY_CLIENT_MOVE;
+               resp->num = talloc_array_length(msgs);
+               resp->messages = msgs;
+
+no_moves:
+               reg->move_notification.triggered = false;
+               if (resp != NULL) {
+                       goto finished;
+               }
+       }
+
+       if (reg->share_notification.triggered) {
+               struct swn_service_globals *swn = reg->swn;
+               struct swn_service_interface *iface = NULL;
+               union witness_notifyResponse_message *msgs = NULL;
+               struct witness_IPaddrInfoList *list = NULL;
+               uint32_t num_ips = 0;
+               const uint32_t *new_node = NULL;
+               const struct samba_sockaddr *new_ip = NULL;
+
+               if (reg->share_notification.new_node != NONCLUSTER_VNN) {
+                       new_node = &reg->share_notification.new_node;
+               }
+               if (!is_zero_addr(&reg->share_notification.new_ip.u.ss)) {
+                       new_ip = &reg->share_notification.new_ip;
+               }
+
+               for (iface = swn->interfaces.list;
+                    iface != NULL;
+                    iface = iface->next)
+               {
+                       if (new_node != NULL &&
+                           iface->current_vnn != *new_node)
+                       {
+                               continue;
+                       }
+
+                       if (new_ip != NULL &&
+                           !sockaddr_equal(&new_ip->u.sa, &iface->addr.u.sa))
+                       {
+                               continue;
+                       }
+
+                       num_ips += 1;
+               }
+
+               if (num_ips == 0) {
+                       goto no_share_moves;
+               }
+
+               resp = talloc_zero(state, struct witness_notifyResponse);
+               if (tevent_req_nomem(resp, req)) {
+                       return;
+               }
+
+               msgs = talloc_zero_array(resp,
+                                        union witness_notifyResponse_message,
+                                        1);
+               if (tevent_req_nomem(msgs, req)) {
+                       return;
+               }
+
+               list = &msgs[0].client_move;
+               list->addr = talloc_zero_array(msgs,
+                                              struct witness_IPaddrInfo,
+                                              num_ips);
+               if (tevent_req_nomem(list->addr, req)) {
+                       return;
+               }
+
+               for (iface = swn->interfaces.list;
+                    iface != NULL;
+                    iface = iface->next)
+               {
+                       struct witness_IPaddrInfo *info = &list->addr[list->num];
+                       char addr[INET6_ADDRSTRLEN] = { 0, };
+                       const char *ipv4 = "0.0.0.0";
+                       const char *ipv6 = "::";
+                       uint32_t flags = 0;
+                       bool is_reg_ip = false;
+
+                       if (new_node != NULL &&
+                           iface->current_vnn != *new_node)
+                       {
+                               continue;
+                       }
+
+                       if (new_ip != NULL &&
+                           !sockaddr_equal(&new_ip->u.sa, &iface->addr.u.sa))
+                       {
+                               continue;
+                       }
+
+                       switch (iface->state) {
+                       case WITNESS_STATE_AVAILABLE:
+                               flags |= WITNESS_IPADDR_ONLINE;
+                               break;
+                       case WITNESS_STATE_UNAVAILABLE:
+                               flags |= WITNESS_IPADDR_OFFLINE;
+                               break;
+                       case WITNESS_STATE_UNKNOWN:
+                               /* We map unknown also to offline */
+                               flags |= WITNESS_IPADDR_OFFLINE;
+                               break;
+                       }
+
+                       print_sockaddr(addr, sizeof(addr), &iface->addr.u.ss);
+                       if (iface->addr.u.sa.sa_family == AF_INET) {
+                               flags |= WITNESS_IPADDR_V4;
+                               ipv4 = addr;
+                       } else if (iface->addr.u.sa.sa_family == AF_INET6) {
+                               flags |= WITNESS_IPADDR_V6;
+                               ipv6 = addr;
+                       }
+
+                       info->ipv4 = talloc_strdup(list, ipv4);
+                       if (tevent_req_nomem(info->ipv4, req)) {
+                               return;
+                       }
+                       info->ipv6 = talloc_strdup(list, ipv6);
+                       if (tevent_req_nomem(info->ipv6, req)) {
+                               return;
+                       }
+                       info->flags = flags;
+                       list->num += 1;
+
+                       is_reg_ip = sockaddr_equal(&reg->ip_address.u.sa,
+                                                  &iface->addr.u.sa);
+                       if (!is_reg_ip) {
+                               /*
+                                * In order to let a Windows server 2022
+                                * correctly re-register after moving
+                                * to a new connection, we force an
+                                * unregistration after 5 seconds.
+                                *
+                                * It means the client gets WERR_NOT_FOUND from
+                                * a pending AsyncNotify() and calls
+                                * Unregister() (which also gets
+                                * WERR_NOT_FOUND).  Then the client calls
+                                * GetInterfaceList() and RegisterEx() again.
+                                */
+                               defer_forced_unregister = true;
+                       }
+               }
+
+               resp->type = WITNESS_NOTIFY_SHARE_MOVE;
+               resp->num = talloc_array_length(msgs);
+               resp->messages = msgs;
+
+no_share_moves:
+               reg->share_notification.triggered = false;
+               if (resp != NULL) {
+                       goto finished;
+               }
+       }
+
 finished:
-       if (!reg->change_notification.triggered) {
+       if (!reg->forced_response.triggered &&
+           !reg->change_notification.triggered &&
+           !reg->move_notification.triggered &&
+           !reg->share_notification.triggered &&
+           !reg->ip_notification.triggered)
+       {
                tevent_queue_stop(reg->async_notify.queue);
        }