notifyd: Use messaging_register for MSG_SMB_NOTIFY_DB
[metze/samba/wip.git] / source3 / smbd / notifyd / notifyd.c
index 377d9535aa813a2b426cfee7ad058851a0612e13..bc694850e28cf45f919d60a93f0cb1e0b7976b90 100644 (file)
@@ -32,6 +32,7 @@
 #include "notifyd.h"
 #include "lib/util/server_id_db.h"
 #include "lib/util/tevent_unix.h"
+#include "lib/util/tevent_ntstatus.h"
 #include "ctdbd_conn.h"
 #include "ctdb_srvids.h"
 #include "server_id_db_util.h"
@@ -122,20 +123,20 @@ struct notifyd_peer {
        time_t last_broadcast;
 };
 
-static bool notifyd_rec_change(struct messaging_context *msg_ctx,
-                              struct messaging_rec **prec,
-                              void *private_data);
-static bool notifyd_trigger(struct messaging_context *msg_ctx,
-                           struct messaging_rec **prec,
-                           void *private_data);
-static bool notifyd_get_db(struct messaging_context *msg_ctx,
-                          struct messaging_rec **prec,
-                          void *private_data);
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+                              void *private_data, uint32_t msg_type,
+                              struct server_id src, DATA_BLOB *data);
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+                           void *private_data, uint32_t msg_type,
+                           struct server_id src, DATA_BLOB *data);
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id src, DATA_BLOB *data);
 
 #ifdef CLUSTER_SUPPORT
-static bool notifyd_got_db(struct messaging_context *msg_ctx,
-                          struct messaging_rec **prec,
-                          void *private_data);
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id src, DATA_BLOB *data);
 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
                                     struct server_id src,
                                     struct messaging_reclog *log);
@@ -192,9 +193,13 @@ struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                                sys_notify_watch_fn sys_notify_watch,
                                struct sys_notify_context *sys_notify_ctx)
 {
-       struct tevent_req *req, *subreq;
+       struct tevent_req *req;
+#ifdef CLUSTER_SUPPORT
+       struct tevent_req *subreq;
+#endif
        struct notifyd_state *state;
        struct server_id_db *names_db;
+       NTSTATUS status;
        int ret;
 
        req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
@@ -217,41 +222,23 @@ struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                return tevent_req_post(req, ev);
        }
 
-       subreq = messaging_handler_send(state, ev, msg_ctx,
-                                       MSG_SMB_NOTIFY_REC_CHANGE,
-                                       notifyd_rec_change, state);
-       if (tevent_req_nomem(subreq, req)) {
-               return tevent_req_post(req, ev);
-       }
-       tevent_req_set_callback(subreq, notifyd_handler_done, req);
-
-       subreq = messaging_handler_send(state, ev, msg_ctx,
-                                       MSG_SMB_NOTIFY_TRIGGER,
-                                       notifyd_trigger, state);
-       if (tevent_req_nomem(subreq, req)) {
+       status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
+                                   notifyd_rec_change);
+       if (tevent_req_nterror(req, status)) {
                return tevent_req_post(req, ev);
        }
-       tevent_req_set_callback(subreq, notifyd_handler_done, req);
 
-       subreq = messaging_handler_send(state, ev, msg_ctx,
-                                       MSG_SMB_NOTIFY_GET_DB,
-                                       notifyd_get_db, state);
-       if (tevent_req_nomem(subreq, req)) {
-               return tevent_req_post(req, ev);
+       status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
+                                   notifyd_trigger);
+       if (tevent_req_nterror(req, status)) {
+               goto deregister_rec_change;
        }
-       tevent_req_set_callback(subreq, notifyd_handler_done, req);
 
-#ifdef CLUSTER_SUPPORT
-       if (ctdbd_conn != NULL) {
-               subreq = messaging_handler_send(state, ev, msg_ctx,
-                                               MSG_SMB_NOTIFY_DB,
-                                               notifyd_got_db, state);
-               if (tevent_req_nomem(subreq, req)) {
-                       return tevent_req_post(req, ev);
-               }
-               tevent_req_set_callback(subreq, notifyd_handler_done, req);
+       status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
+                                   notifyd_get_db);
+       if (tevent_req_nterror(req, status)) {
+               goto deregister_trigger;
        }
-#endif
 
        names_db = messaging_names_db(msg_ctx);
 
@@ -260,7 +247,7 @@ struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                DEBUG(10, ("%s: server_id_db_add failed: %s\n",
                           __func__, strerror(ret)));
                tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+               goto deregister_get_db;
        }
 
        if (ctdbd_conn == NULL) {
@@ -272,41 +259,57 @@ struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        }
 
 #ifdef CLUSTER_SUPPORT
-       if (ctdbd_conn != NULL) {
-               state->log = talloc_zero(state, struct messaging_reclog);
-               if (tevent_req_nomem(state->log, req)) {
-                       return tevent_req_post(req, ev);
-               }
+       status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
+                                   notifyd_got_db);
+       if (tevent_req_nterror(req, status)) {
+               goto deregister_get_db;
+       }
 
-               subreq = notifyd_broadcast_reclog_send(
-                       state->log, ev, ctdbd_conn,
-                       messaging_server_id(msg_ctx),
-                       state->log);
-               if (tevent_req_nomem(subreq, req)) {
-                       return tevent_req_post(req, ev);
-               }
-               tevent_req_set_callback(subreq,
-                                       notifyd_broadcast_reclog_finished,
-                                       req);
+       state->log = talloc_zero(state, struct messaging_reclog);
+       if (tevent_req_nomem(state->log, req)) {
+               goto deregister_db;
+       }
 
-               subreq = notifyd_clean_peers_send(state, ev, state);
-               if (tevent_req_nomem(subreq, req)) {
-                       return tevent_req_post(req, ev);
-               }
-               tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
-                                       req);
+       subreq = notifyd_broadcast_reclog_send(
+               state->log, ev, ctdbd_conn,
+               messaging_server_id(msg_ctx),
+               state->log);
+       if (tevent_req_nomem(subreq, req)) {
+               goto deregister_db;
+       }
+       tevent_req_set_callback(subreq,
+                               notifyd_broadcast_reclog_finished,
+                               req);
 
-               ret = register_with_ctdbd(ctdbd_conn,
-                                         CTDB_SRVID_SAMBA_NOTIFY_PROXY,
-                                         notifyd_snoop_broadcast, state);
-               if (ret != 0) {
-                       tevent_req_error(req, ret);
-                       return tevent_req_post(req, ev);
-               }
+       subreq = notifyd_clean_peers_send(state, ev, state);
+       if (tevent_req_nomem(subreq, req)) {
+               goto deregister_db;
+       }
+       tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
+                               req);
+
+       ret = register_with_ctdbd(ctdbd_conn,
+                                 CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+                                 notifyd_snoop_broadcast, state);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               goto deregister_db;
        }
 #endif
 
        return req;
+
+#ifdef CLUSTER_SUPPORT
+deregister_db:
+       messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
+#endif
+deregister_get_db:
+       messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
+deregister_trigger:
+       messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
+deregister_rec_change:
+       messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
+       return tevent_req_post(req, ev);
 }
 
 static void notifyd_handler_done(struct tevent_req *subreq)
@@ -574,40 +577,38 @@ static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
        return true;
 }
 
-static bool notifyd_rec_change(struct messaging_context *msg_ctx,
-                              struct messaging_rec **prec,
-                              void *private_data)
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+                              void *private_data, uint32_t msg_type,
+                              struct server_id src, DATA_BLOB *data)
 {
        struct notifyd_state *state = talloc_get_type_abort(
                private_data, struct notifyd_state);
        struct server_id_buf idbuf;
-       struct messaging_rec *rec = *prec;
        struct notify_rec_change_msg *msg;
        size_t pathlen;
        bool ok;
 
-       DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
-                  (unsigned)rec->buf.length,
-                  server_id_str_buf(rec->src, &idbuf)));
+       DBG_DEBUG("Got %zu bytes from %s\n", data->length,
+                 server_id_str_buf(src, &idbuf));
 
-       ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
+       ok = notifyd_parse_rec_change(data->data, data->length,
                                      &msg, &pathlen);
        if (!ok) {
-               return true;
+               return;
        }
 
        ok = notifyd_apply_rec_change(
-               &rec->src, msg->path, pathlen, &msg->instance,
+               &src, msg->path, pathlen, &msg->instance,
                state->entries, state->sys_notify_watch, state->sys_notify_ctx,
                state->msg_ctx);
        if (!ok) {
                DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
                          __func__));
-               return true;
+               return;
        }
 
        if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
-               return true;
+               return;
        }
 
 #ifdef CLUSTER_SUPPORT
@@ -615,6 +616,7 @@ static bool notifyd_rec_change(struct messaging_context *msg_ctx,
 
        struct messaging_rec **tmp;
        struct messaging_reclog *log;
+       struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
 
        log = state->log;
 
@@ -622,11 +624,19 @@ static bool notifyd_rec_change(struct messaging_context *msg_ctx,
                             log->num_recs+1);
        if (tmp == NULL) {
                DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
-               return true;
+               return;
        }
        log->recs = tmp;
 
-       log->recs[log->num_recs] = talloc_move(log->recs, prec);
+       log->recs[log->num_recs] = messaging_rec_create(
+               log->recs, src, messaging_server_id(msg_ctx),
+               msg_type, &iov, 1, NULL, 0);
+
+       if (log->recs[log->num_recs] == NULL) {
+               DBG_WARNING("messaging_rec_create failed, ignoring\n");
+               return;
+       }
+
        log->num_recs += 1;
 
        if (log->num_recs >= 100) {
@@ -639,8 +649,6 @@ static bool notifyd_rec_change(struct messaging_context *msg_ctx,
 
        }
 #endif
-
-       return true;
 }
 
 struct notifyd_trigger_state {
@@ -653,34 +661,33 @@ struct notifyd_trigger_state {
 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
                                   void *private_data);
 
-static bool notifyd_trigger(struct messaging_context *msg_ctx,
-                           struct messaging_rec **prec,
-                           void *private_data)
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+                           void *private_data, uint32_t msg_type,
+                           struct server_id src, DATA_BLOB *data)
 {
        struct notifyd_state *state = talloc_get_type_abort(
                private_data, struct notifyd_state);
        struct server_id my_id = messaging_server_id(msg_ctx);
-       struct messaging_rec *rec = *prec;
        struct notifyd_trigger_state tstate;
        const char *path;
        const char *p, *next_p;
 
-       if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
-               DEBUG(1, ("message too short, ignoring: %u\n",
-                         (unsigned)rec->buf.length));
-               return true;
+       if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
+               DBG_WARNING("message too short, ignoring: %zu\n",
+                           data->length);
+               return;
        }
-       if (rec->buf.data[rec->buf.length-1] != 0) {
+       if (data->data[data->length-1] != 0) {
                DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
-               return true;
+               return;
        }
 
        tstate.msg_ctx = msg_ctx;
 
-       tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
-       tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
+       tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
+       tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
 
-       tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
+       tstate.msg = (struct notify_trigger_msg *)data->data;
        path = tstate.msg->path;
 
        DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
@@ -690,7 +697,7 @@ static bool notifyd_trigger(struct messaging_context *msg_ctx,
        if (path[0] != '/') {
                DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
                          __func__, path));
-               return true;
+               return;
        }
 
        for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
@@ -714,7 +721,7 @@ static bool notifyd_trigger(struct messaging_context *msg_ctx,
                        continue;
                }
 
-               if (rec->src.vnn != my_id.vnn) {
+               if (src.vnn != my_id.vnn) {
                        continue;
                }
 
@@ -729,8 +736,6 @@ static bool notifyd_trigger(struct messaging_context *msg_ctx,
                                            notifyd_trigger_parser, &tstate);
                }
        }
-
-       return true;
 }
 
 static void notifyd_send_delete(struct messaging_context *msg_ctx,
@@ -852,13 +857,12 @@ static void notifyd_send_delete(struct messaging_context *msg_ctx,
        }
 }
 
-static bool notifyd_get_db(struct messaging_context *msg_ctx,
-                          struct messaging_rec **prec,
-                          void *private_data)
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id src, DATA_BLOB *data)
 {
        struct notifyd_state *state = talloc_get_type_abort(
                private_data, struct notifyd_state);
-       struct messaging_rec *rec = *prec;
        struct server_id_buf id1, id2;
        NTSTATUS status;
        uint64_t rec_index = UINT64_MAX;
@@ -869,11 +873,11 @@ static bool notifyd_get_db(struct messaging_context *msg_ctx,
 
        dbsize = dbwrap_marshall(state->entries, NULL, 0);
 
-       buf = talloc_array(rec, uint8_t, dbsize);
+       buf = talloc_array(talloc_tos(), uint8_t, dbsize);
        if (buf == NULL) {
                DEBUG(1, ("%s: talloc_array(%ju) failed\n",
                          __func__, (uintmax_t)dbsize));
-               return true;
+               return;
        }
 
        dbsize = dbwrap_marshall(state->entries, buf, dbsize);
@@ -883,7 +887,7 @@ static bool notifyd_get_db(struct messaging_context *msg_ctx,
                          (uintmax_t)talloc_get_size(buf),
                          (uintmax_t)dbsize));
                TALLOC_FREE(buf);
-               return true;
+               return;
        }
 
        if (state->log != NULL) {
@@ -899,17 +903,15 @@ static bool notifyd_get_db(struct messaging_context *msg_ctx,
        DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
                   (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
                   server_id_str_buf(messaging_server_id(msg_ctx), &id1),
-                  server_id_str_buf(rec->src, &id2)));
+                  server_id_str_buf(src, &id2)));
 
-       status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
+       status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
                                    iov, ARRAY_SIZE(iov), NULL, 0);
        TALLOC_FREE(buf);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
                          __func__, nt_errstr(status)));
        }
-
-       return true;
 }
 
 #ifdef CLUSTER_SUPPORT
@@ -917,13 +919,12 @@ static bool notifyd_get_db(struct messaging_context *msg_ctx,
 static int notifyd_add_proxy_syswatches(struct db_record *rec,
                                        void *private_data);
 
-static bool notifyd_got_db(struct messaging_context *msg_ctx,
-                          struct messaging_rec **prec,
-                          void *private_data)
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id src, DATA_BLOB *data)
 {
        struct notifyd_state *state = talloc_get_type_abort(
                private_data, struct notifyd_state);
-       struct messaging_rec *rec = *prec;
        struct notifyd_peer *p = NULL;
        struct server_id_buf idbuf;
        NTSTATUS status;
@@ -931,52 +932,49 @@ static bool notifyd_got_db(struct messaging_context *msg_ctx,
        size_t i;
 
        for (i=0; i<state->num_peers; i++) {
-               if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
+               if (server_id_equal(&src, &state->peers[i]->pid)) {
                        p = state->peers[i];
                        break;
                }
        }
 
        if (p == NULL) {
-               DEBUG(10, ("%s: Did not find peer for db from %s\n",
-                          __func__, server_id_str_buf(rec->src, &idbuf)));
-               return true;
+               DBG_DEBUG("Did not find peer for db from %s\n",
+                         server_id_str_buf(src, &idbuf));
+               return;
        }
 
-       if (rec->buf.length < 8) {
-               DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
-                          (unsigned)rec->buf.length,
-                          server_id_str_buf(rec->src, &idbuf)));
+       if (data->length < 8) {
+               DBG_DEBUG("Got short db length %zu from %s\n", data->length,
+                          server_id_str_buf(src, &idbuf));
                TALLOC_FREE(p);
-               return true;
+               return;
        }
 
-       p->rec_index = BVAL(rec->buf.data, 0);
+       p->rec_index = BVAL(data->data, 0);
 
        p->db = db_open_rbt(p);
        if (p->db == NULL) {
                DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
                TALLOC_FREE(p);
-               return true;
+               return;
        }
 
-       status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
-                                  rec->buf.length - 8);
+       status = dbwrap_unmarshall(p->db, data->data + 8,
+                                  data->length - 8);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
                           __func__, nt_errstr(status),
-                          server_id_str_buf(rec->src, &idbuf)));
+                          server_id_str_buf(src, &idbuf)));
                TALLOC_FREE(p);
-               return true;
+               return;
        }
 
        dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
                             &count);
 
        DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
-                  server_id_str_buf(rec->src, &idbuf), count));
-
-       return true;
+                  server_id_str_buf(src, &idbuf), count));
 }
 
 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,