s3-mdssvc: use tevent_glib_glue in mdssvc RPC service
authorRalph Boehme <slow@samba.org>
Wed, 27 Jan 2016 12:23:51 +0000 (13:23 +0100)
committerRalph Boehme <slow@samba.org>
Wed, 24 Apr 2019 18:32:15 +0000 (18:32 +0000)
Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Noel Power <npower@samba.org>
source3/rpc_server/mdssvc/mdssvc.c
source3/rpc_server/mdssvc/mdssvc.h
source3/rpc_server/wscript_build

index f03cb9d7ebd4a19c3b06739447ff458f78efc9a5..c8545a52e5517f38bbe76ff544ba6ba10980076b 100644 (file)
@@ -28,6 +28,7 @@
 #include "libcli/security/dom_sid.h"
 #include "mdssvc.h"
 #include "rpc_server/mdssvc/sparql_parser.tab.h"
+#include "lib/tevent_glib_glue.h"
 
 #undef DBGC_CLASS
 #define DBGC_CLASS DBGC_RPC_SRV
@@ -64,6 +65,15 @@ struct slq_destroy_state {
        struct sl_query *slq;
 };
 
+/*
+ * This is a static global because we may be called multiple times and
+ * we only want one mdssvc_ctx per connection to Tracker.
+ *
+ * The client will bind multiple times to the mdssvc RPC service, once
+ * for every tree connect.
+ */
+static struct mdssvc_ctx *mdssvc_ctx = NULL;
+
 /*
  * If these functions return an error, they hit something like a non
  * recoverable talloc error. Most errors are dealt with by returning
@@ -731,7 +741,6 @@ static void tracker_con_cb(GObject *object,
        }
 
        DEBUG(10, ("connected to Tracker\n"));
-       g_main_loop_quit(mds_ctx->gmainloop);
 }
 
 static void tracker_cursor_cb_destroy_done(struct tevent_req *subreq);
@@ -770,7 +779,6 @@ static void tracker_cursor_cb(GObject *object,
                 * we return.
                 */
                SLQ_DEBUG(10, slq, "closed");
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
 
                req = slq_destroy_send(slq, global_event_context(), &slq);
                if (req == NULL) {
@@ -785,13 +793,11 @@ static void tracker_cursor_cb(GObject *object,
                DEBUG(1, ("Tracker cursor: %s\n", error->message));
                g_error_free(error);
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
        if (!more_results) {
                slq->state = SLQ_STATE_DONE;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
@@ -799,14 +805,12 @@ static void tracker_cursor_cb(GObject *object,
        if (uri == NULL) {
                DEBUG(1, ("error fetching Tracker URI\n"));
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
        path = tracker_to_unix_path(slq->query_results, uri);
        if (path == NULL) {
                DEBUG(1, ("error converting Tracker URI to path: %s\n", uri));
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
@@ -866,7 +870,6 @@ static void tracker_cursor_cb(GObject *object,
        if (result != 0) {
                DEBUG(1, ("dalloc error\n"));
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
        ok = add_filemeta(slq->reqinfo, slq->query_results->fm_array,
@@ -874,7 +877,6 @@ static void tracker_cursor_cb(GObject *object,
        if (!ok) {
                DEBUG(1, ("add_filemeta error\n"));
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
@@ -882,7 +884,6 @@ static void tracker_cursor_cb(GObject *object,
        if (!ok) {
                DEBUG(1, ("inode_map_add error\n"));
                slq->state = SLQ_STATE_ERROR;
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
@@ -892,7 +893,6 @@ done:
        if (slq->query_results->num_results >= MAX_SL_RESULTS) {
                slq->state = SLQ_STATE_FULL;
                SLQ_DEBUG(10, slq, "full");
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
@@ -929,13 +929,11 @@ static void tracker_query_cb(GObject *object,
                slq->state = SLQ_STATE_ERROR;
                DEBUG(1, ("Tracker query error: %s\n", error->message));
                g_error_free(error);
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                return;
        }
 
        if (slq->state == SLQ_STATE_DONE) {
                SLQ_DEBUG(10, slq, "done");
-               g_main_loop_quit(slq->mds_ctx->gmainloop);
                talloc_free(slq);
                return;
        }
@@ -1302,13 +1300,11 @@ static bool slrpc_open_query(struct mds_ctx *mds_ctx,
 
        DEBUG(10, ("SPARQL query: \"%s\"\n", slq->sparql_query));
 
-       g_main_context_push_thread_default(mds_ctx->gcontext);
        tracker_sparql_connection_query_async(mds_ctx->tracker_con,
                                              slq->sparql_query,
                                              slq->gcancellable,
                                              tracker_query_cb,
                                              slq);
-       g_main_context_pop_thread_default(mds_ctx->gcontext);
        slq->state = SLQ_STATE_RUNNING;
 
        sl_result = 0;
@@ -1401,13 +1397,11 @@ static bool slrpc_fetch_query_results(struct mds_ctx *mds_ctx,
                }
                if (slq->state == SLQ_STATE_FULL) {
                        slq->state = SLQ_STATE_RESULTS;
-                       g_main_context_push_thread_default(mds_ctx->gcontext);
                        tracker_sparql_cursor_next_async(
                                slq->tracker_cursor,
                                slq->gcancellable,
                                tracker_cursor_cb,
                                slq);
-                       g_main_context_pop_thread_default(mds_ctx->gcontext);
                }
                break;
 
@@ -1811,6 +1805,42 @@ done:
        return true;
 }
 
+static struct mdssvc_ctx *mdssvc_init(struct tevent_context *ev)
+{
+       if (mdssvc_ctx != NULL) {
+               return mdssvc_ctx;
+       }
+
+       mdssvc_ctx = talloc_zero(ev, struct mdssvc_ctx);
+       if (mdssvc_ctx == NULL) {
+               return NULL;
+       }
+
+       mdssvc_ctx->ev_ctx = ev;
+
+       mdssvc_ctx->gmain_ctx = g_main_context_new();
+       if (mdssvc_ctx->gmain_ctx == NULL) {
+               DBG_ERR("error from g_main_context_new\n");
+               return NULL;
+       }
+
+       /*
+        * This ensures all glib threads, especially gioi worker threads
+        * dispatch their async callbacks via our gmain_ctx.
+        */
+       g_main_context_push_thread_default(mdssvc_ctx->gmain_ctx);
+
+       mdssvc_ctx->glue = samba_tevent_glib_glue_create(ev,
+                                                        mdssvc_ctx->ev_ctx,
+                                                        mdssvc_ctx->gmain_ctx);
+       if (mdssvc_ctx->glue == NULL) {
+               DBG_ERR("samba_tevent_glib_glue_create failed\n");
+               return NULL;
+       }
+
+       return mdssvc_ctx;
+}
+
 /**
  * Init callbacks at startup
  **/
@@ -1824,21 +1854,25 @@ bool mds_init(struct messaging_context *msg_ctx)
 
 bool mds_shutdown(void)
 {
-       return true;
-}
+       if (mdssvc_ctx == NULL) {
+               return false;
+       }
 
-static gboolean gmainloop_timer(gpointer user_data)
-{
-       struct mds_ctx *ctx = talloc_get_type_abort(user_data, struct mds_ctx);
+       samba_tevent_glib_glue_quit(mdssvc_ctx->glue);
+       TALLOC_FREE(mdssvc_ctx->glue);
 
-       DEBUG(10,("%s\n", __func__));
-       g_main_loop_quit(ctx->gmainloop);
+       g_main_context_pop_thread_default(mdssvc_ctx->gmain_ctx);
 
-       return G_SOURCE_CONTINUE;
+       TALLOC_FREE(mdssvc_ctx);
+
+       return true;
 }
 
 /**
- * Initialise a context per share handle
+ * Initialise a context per RPC bind
+ *
+ * This ends up being called for every tcon, because the client does a
+ * RPC bind for every tcon, so this is acually a per tcon context.
  **/
 struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
                             struct tevent_context *ev,
@@ -1853,6 +1887,11 @@ struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
        }
        talloc_set_destructor(mds_ctx, mds_ctx_destructor_cb);
 
+       mds_ctx->mdssvc_ctx = mdssvc_init(ev);
+       if (mds_ctx->mdssvc_ctx == NULL) {
+               goto error;
+       }
+
        mds_ctx->spath = talloc_strdup(mds_ctx, path);
        if (mds_ctx->spath == NULL) {
                goto error;
@@ -1872,22 +1911,8 @@ struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
                goto error;
        }
 
-       mds_ctx->gcontext = g_main_context_new();
-       if (mds_ctx->gcontext == NULL) {
-               DEBUG(1,("error from g_main_context_new\n"));
-               goto error;
-       }
-
-       mds_ctx->gmainloop = g_main_loop_new(mds_ctx->gcontext, false);
-       if (mds_ctx->gmainloop == NULL) {
-               DEBUG(1,("error from g_main_loop_new\n"));
-               goto error;
-       }
-
-       g_main_context_push_thread_default(mds_ctx->gcontext);
        tracker_sparql_connection_get_async(mds_ctx->gcancellable,
                                            tracker_con_cb, mds_ctx);
-       g_main_context_pop_thread_default(mds_ctx->gcontext);
 
        return mds_ctx;
 
@@ -1920,76 +1945,12 @@ int mds_ctx_destructor_cb(struct mds_ctx *mds_ctx)
                g_cancellable_cancel(mds_ctx->gcancellable);
                g_object_unref(mds_ctx->gcancellable);
        }
-       if (mds_ctx->gmainloop != NULL) {
-               g_main_loop_unref(mds_ctx->gmainloop);
-       }
-       if (mds_ctx->gcontext != NULL) {
-               g_main_context_unref(mds_ctx->gcontext);
-       }
 
        ZERO_STRUCTP(mds_ctx);
 
        return 0;
 }
 
-static bool mds_run_gmainloop(struct mds_ctx *mds_ctx, guint timeout)
-{
-       guint timer_id;
-       GSource *timer;
-
-       /*
-        * It seems the event processing of the libtracker-sparql
-        * async subsystem defers callbacks until *all* events are
-        * processes by the async subsystem main processing loop.
-        *
-        * g_main_context_iteration(may_block=FALSE) can't be used,
-        * because a search that produces a few thousand matches
-        * generates as many events that must be processed in either
-        * g_main_context_iteration() or g_main_loop_run() before
-        * callbacks are called.
-        *
-        * Unfortunately g_main_context_iteration() only processes a
-        * small subset of these event (1-30) at a time when run in
-        * mds_dispatch(), which happens once a second while the
-        * client polls for results.
-        *
-        * Carefully using the blocking g_main_loop_run() fixes
-        * this. It processes events until we exit from the loop at
-        * defined exit points. By adding a 1 ms timeout we at least
-        * try to get as close as possible to non-blocking behaviour.
-        */
-
-       if (!g_main_context_pending(mds_ctx->gcontext)) {
-               return true;
-       }
-
-       g_main_context_push_thread_default(mds_ctx->gcontext);
-
-       timer = g_timeout_source_new(timeout);
-       if (timer == NULL) {
-               DEBUG(1,("g_timeout_source_new_seconds\n"));
-               g_main_context_pop_thread_default(mds_ctx->gcontext);
-               return false;
-       }
-
-       timer_id = g_source_attach(timer, mds_ctx->gcontext);
-       if (timer_id == 0) {
-               DEBUG(1,("g_timeout_add failed\n"));
-               g_source_destroy(timer);
-               g_main_context_pop_thread_default(mds_ctx->gcontext);
-               return false;
-       }
-
-       g_source_set_callback(timer, gmainloop_timer, mds_ctx, NULL);
-
-       g_main_loop_run(mds_ctx->gmainloop);
-
-       g_source_destroy(timer);
-
-       g_main_context_pop_thread_default(mds_ctx->gcontext);
-       return true;
-}
-
 /**
  * Dispatch a Spotlight RPC command
  **/
@@ -2014,34 +1975,6 @@ bool mds_dispatch(struct mds_ctx *mds_ctx,
 
        response_blob->length = 0;
 
-       /*
-        * Process finished glib events.
-        *
-        * FIXME: integrate with tevent instead of piggy packing it
-        * onto the processing of new requests.
-        *
-        * mds_dispatch() is called by the client a few times in a row:
-        *
-        * - first in order to open/start a search query
-        *
-        * - later in order to fetch results asynchronously, typically
-        *   once a second. If no results have been retrieved from the
-        *   search store (Tracker) yet, we return no results.
-        *   The client asks for more results every second as long
-        *   as the "Search Window" in the client gui is open.
-        *
-        * - at some point the query is closed
-        *
-        * This means we try to iterate through the glib event loop
-        * before processing the request in order to get result
-        * from tracker which can be returned to the client.
-        */
-
-       ok = mds_run_gmainloop(mds_ctx, MDS_TRACKER_ASYNC_TIMEOUT_MS);
-       if (!ok) {
-               goto cleanup;
-       }
-
        DEBUG(10, ("share path: %s\n", mds_ctx->spath));
 
        query = dalloc_new(mds_ctx);
@@ -2102,17 +2035,6 @@ bool mds_dispatch(struct mds_ctx *mds_ctx,
                goto cleanup;
        }
 
-       /*
-        * Run g_main_loop a second time in order to dispatch events
-        * that may have been queued at the libtracker-sparql level.
-        * As we only want to dispatch (write out requests) but not
-        * wait for anything, we use a much shorter timeout here.
-        */
-       ok = mds_run_gmainloop(mds_ctx, MDS_TRACKER_ASYNC_TIMEOUT_MS / 10);
-       if (!ok) {
-               goto cleanup;
-       }
-
        response_blob->length = len;
 
 cleanup:
index e1b1ccfc34d53f4bf062a8312d88afd458686dfe..b9e3e56f785a4d07202895fc29eb901f3cf40760 100644 (file)
@@ -96,15 +96,22 @@ struct sl_inode_path_map {
        char              *path;
 };
 
+/* Per process state */
+struct mdssvc_ctx {
+       struct tevent_context *ev_ctx;
+       GMainContext *gmain_ctx;
+       struct tevent_glib_glue *glue;
+};
+
+/* Per tree connect state */
 struct mds_ctx {
+       struct mdssvc_ctx *mdssvc_ctx;
        struct auth_session_info *pipe_session_info;
        struct dom_sid sid;
        uid_t uid;
        const char *spath;
        GCancellable *gcancellable;
        TrackerSparqlConnection *tracker_con;
-       GMainContext *gcontext;
-       GMainLoop *gmainloop;
        struct sl_query *query_list;     /* list of active queries */
        struct db_context *ino_path_map; /* dbwrap rbt for storing inode->path mappings */
 };
index 85655d340a3cefb06f0871348fa4b462d92d7fe1..12f17a6d41d12cc7773f82176e46ab58790b4e7f 100644 (file)
@@ -150,7 +150,7 @@ bld.SAMBA3_MODULE('rpc_mdssvc_module',
                   mdssvc/srv_mdssvc_nt.c
                   ../../librpc/gen_ndr/srv_mdssvc.c''',
                   init_function='',
-                  deps='samba-util ' + bld.env['libtracker'],
+                  deps='samba-util tevent-glib-glue ' + bld.env['libtracker'],
                   internal_module=bld.SAMBA3_IS_STATIC_MODULE('rpc_mdssvc_module'),
                   enabled=bld.SAMBA3_IS_ENABLED_MODULE('rpc_mdssvc_module'))
 
@@ -205,5 +205,5 @@ bld.SAMBA3_SUBSYSTEM('FSSD',
 
 bld.SAMBA3_SUBSYSTEM('MDSSD',
                     source='mdssd.c',
-                    deps='RPC_SOCK_HELPER RPC_MODULES samba-util',
+                    deps='RPC_SOCK_HELPER RPC_MODULES samba-util tevent-glib-glue',
                     enabled=bld.env.with_spotlight)