From 31d14c5643968a3e758447f615cbe83bb37c4551 Mon Sep 17 00:00:00 2001
From: cristi.posoiu@gmail.com
Date: Wed, 29 Oct 2008 02:28:33 +0200
Subject: [PATCH] Fixing big problem w/ GLib integration (queued messages are not
 dispatched if no netw. activity happens; for example if from your client you
 call a method on a server which then runs signals that your client captures
 -> you won't get the signals)

---
Review notes (text in this section is ignored by git-am):
 - Glib::BusDispatcher::attach() takes its reference on _ctx, i.e. the
   context that is actually stored and later unreffed by ~BusDispatcher().
   The ctx argument is NULL when the caller wants the default main context,
   so reffing ctx would leave the destructor's unref unbalanced.
 - That change is a one-line replacement inside an added line, so hunk
   sizes and the diffstat below are unaffected. The post-image blob id on
   the glib-integration.cpp "index" line is informational only.
 - Indentation of context lines is assumed to be tabs; if a context line
   does not match byte-for-byte, apply with `git apply --ignore-whitespace`.

 include/dbus-c++/dispatcher.h       |    1 +
 include/dbus-c++/glib-integration.h |    4 +-
 src/connection.cpp                  |   21 ++++++-
 src/connection_p.h                  |    4 +
 src/dispatcher.cpp                  |   19 ++++++
 src/glib-integration.cpp            |  118 ++++++++++++++++++++++++++++++++---
 6 files changed, 155 insertions(+), 12 deletions(-)

diff --git a/include/dbus-c++/dispatcher.h b/include/dbus-c++/dispatcher.h
index c6f4d12..9389b34 100644
--- a/include/dbus-c++/dispatcher.h
+++ b/include/dbus-c++/dispatcher.h
@@ -101,6 +101,7 @@ public:
 	void queue_connection(Connection::Private *);
 
 	void dispatch_pending();
+	bool has_something_to_dispatch();
 
 	virtual void enter() = 0;
 
diff --git a/include/dbus-c++/glib-integration.h b/include/dbus-c++/glib-integration.h
index 82f8178..3c58d63 100644
--- a/include/dbus-c++/glib-integration.h
+++ b/include/dbus-c++/glib-integration.h
@@ -94,7 +94,8 @@ class DXXAPI BusDispatcher : public Dispatcher
 {
 public:
 
-	BusDispatcher() : _ctx(NULL), _priority(G_PRIORITY_DEFAULT) {}
+	BusDispatcher();
+	~BusDispatcher();
 
 	void attach(GMainContext *);
 
@@ -116,6 +117,7 @@ private:
 
 	GMainContext *_ctx;
 	int _priority;
+	GSource *_source;
 };
 
 } /* namespace Glib */
diff --git a/src/connection.cpp b/src/connection.cpp
index 08f573a..0b12cfa 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -85,7 +85,7 @@ void Connection::Private::init()
 		this, &Connection::Private::disconn_filter_function
 	);
 
-	dbus_connection_add_filter(conn, message_filter_stub, &disconn_filter, NULL);
+	dbus_connection_add_filter(conn, message_filter_stub, &disconn_filter, NULL); // TODO: some assert at least
 
 	dbus_connection_set_dispatch_status_function(conn, dispatch_status_stub, this, 0);
 	dbus_connection_set_exit_on_disconnect(conn, false); //why was this set to true??
@@ -172,6 +172,25 @@ bool Connection::Private::disconn_filter_function(const Message &msg)
 	return false;
 }
 
+
+DBusDispatchStatus Connection::Private::dispatch_status()
+{
+	return dbus_connection_get_dispatch_status(conn);
+}
+
+
+bool Connection::Private::has_something_to_dispatch()
+{
+	return dispatch_status() == DBUS_DISPATCH_DATA_REMAINS;
+}
+
+
+
+
+
+
+
+
 Connection Connection::SystemBus()
 {
 	return Connection(new Private(DBUS_BUS_SYSTEM));
diff --git a/src/connection_p.h b/src/connection_p.h
index 2164382..91aad44 100644
--- a/src/connection_p.h
+++ b/src/connection_p.h
@@ -63,6 +63,10 @@ struct DXXAPILOCAL Connection::Private
 
 	void init();
 
+	DBusDispatchStatus dispatch_status();
+	bool has_something_to_dispatch();
+
+
 	static void dispatch_status_stub(DBusConnection *, DBusDispatchStatus, void *);
 
 	static DBusHandlerResult message_filter_stub(DBusConnection *, DBusMessage *, void *);
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 68c4412..45abc72 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -148,10 +148,29 @@ void Dispatcher::queue_connection(Connection::Private *cp)
 	_mutex_p.unlock();
 }
 
+
+bool Dispatcher::has_something_to_dispatch()
+{
+	_mutex_p.lock();
+	bool has_something = false;
+	for(Connection::PrivatePList::iterator it = _pending_queue.begin();
+		it != _pending_queue.end() && !has_something;
+		++it)
+	{
+		has_something = (*it)->has_something_to_dispatch();
+	}
+
+	_mutex_p.unlock();
+	return has_something;
+}
+
+
 void Dispatcher::dispatch_pending()
 {
 	_mutex_p.lock();
 
+	// SEEME: dbus-glib is dispatching only one message at a time to not starve the loop/other things...
+
 	while (_pending_queue.size() > 0)
 	{
 		Connection::PrivatePList::iterator i, j;
diff --git a/src/glib-integration.cpp b/src/glib-integration.cpp
index bde3c7a..c3a0740 100644
--- a/src/glib-integration.cpp
+++ b/src/glib-integration.cpp
@@ -29,9 +29,10 @@ using namespace DBus;
 
 
 Glib::BusTimeout::BusTimeout(Timeout::Internal *ti, GMainContext *ctx, int priority)
-: Timeout(ti), _ctx(ctx), _priority(priority)
+: Timeout(ti), _ctx(ctx), _priority(priority), _source(NULL)
 {
-	_enable();
+	if (Timeout::enabled())
+		_enable();
 }
 
 Glib::BusTimeout::~BusTimeout()
@@ -58,6 +59,8 @@ gboolean Glib::BusTimeout::timeout_handler(gpointer data)
 
 void Glib::BusTimeout::_enable()
 {
+	if (_source)
+		_disable(); // be sane
 	_source = g_timeout_source_new(Timeout::interval());
 	g_source_set_priority(_source, _priority);
 	g_source_set_callback(_source, timeout_handler, this, NULL);
@@ -66,7 +69,11 @@ void Glib::BusTimeout::_enable()
 
 void Glib::BusTimeout::_disable()
 {
-	g_source_destroy(_source);
+	if (_source)
+	{
+		g_source_destroy(_source);
+		_source = NULL;
+	}
 }
 
 struct BusSource
@@ -77,7 +84,7 @@ struct BusSource
 
 static gboolean watch_prepare(GSource *source, gint *timeout)
 {
-//	debug_log("glib: watch_prepare");
+	//debug_log("glib: watch_prepare");
 
 	*timeout = -1;
 	return FALSE;
@@ -85,7 +92,7 @@ static gboolean watch_prepare(GSource *source, gint *timeout)
 
 static gboolean watch_check(GSource *source)
 {
-//	debug_log("glib: watch_check");
+	//debug_log("glib: watch_check");
 
 	BusSource *io = (BusSource *)source;
 	return io->poll.revents ? TRUE : FALSE;
@@ -96,7 +103,6 @@ static gboolean watch_dispatch(GSource *source, GSourceFunc callback, gpointer d
 	debug_log("glib: watch_dispatch");
 
 	gboolean cb = callback(data);
-	DBus::default_dispatcher->dispatch_pending(); //TODO: won't work in case of multiple dispatchers
 	return cb;
 }
 
@@ -108,9 +114,10 @@ static GSourceFuncs watch_funcs = {
 };
 
 Glib::BusWatch::BusWatch(Watch::Internal *wi, GMainContext *ctx, int priority)
-: Watch(wi), _ctx(ctx), _priority(priority)
+: Watch(wi), _ctx(ctx), _priority(priority), _source(NULL)
 {
-	_enable();
+	if (Watch::enabled())
+		_enable();
 }
 
 Glib::BusWatch::~BusWatch()
@@ -149,6 +156,8 @@ gboolean Glib::BusWatch::watch_handler(gpointer data)
 
 void Glib::BusWatch::_enable()
 {
+	if (_source)
+		_disable(); // be sane
 	_source = g_source_new(&watch_funcs, sizeof(BusSource));
 	g_source_set_priority(_source, _priority);
 	g_source_set_callback(_source, watch_handler, this, NULL);
@@ -158,8 +167,8 @@ void Glib::BusWatch::_enable()
 
 	if (flags &DBUS_WATCH_READABLE)
 		condition |= G_IO_IN;
-//	if (flags &DBUS_WATCH_WRITABLE)
-//		condition |= G_IO_OUT;
+	if (flags &DBUS_WATCH_WRITABLE)
+		condition |= G_IO_OUT;
 	if (flags &DBUS_WATCH_ERROR)
 		condition |= G_IO_ERR;
 	if (flags &DBUS_WATCH_HANGUP)
@@ -176,16 +185,105 @@ void Glib::BusWatch::_enable()
 
 void Glib::BusWatch::_disable()
 {
+	if (!_source)
+		return;
 	GPollFD *poll = &(((BusSource *)_source)->poll);
 	g_source_remove_poll(_source, poll);
 	g_source_destroy(_source);
+	_source = NULL;
+}
+
+/*
+ * We need this on top of the IO handlers, because sometimes
+ * there are messages to dispatch queued up but no IO pending.
+ * (fixes also a previous problem of code not working in case of multiple dispatchers)
+*/
+struct DispatcherSource
+{
+	GSource source;
+	Dispatcher *dispatcher;
+};
+
+
+static gboolean
+dispatcher_prepare (GSource *source,
+                    gint *timeout)
+{
+	Dispatcher *dispatcher = ((DispatcherSource*)source)->dispatcher;
+
+	*timeout = -1;
+
+	return dispatcher->has_something_to_dispatch()? TRUE:FALSE;
+}
+
+
+static gboolean
+dispatcher_check (GSource *source)
+{
+	return FALSE;
+}
+
+
+static gboolean
+dispatcher_dispatch (GSource *source,
+                     GSourceFunc callback,
+                     gpointer user_data)
+{
+	Dispatcher *dispatcher = ((DispatcherSource*)source)->dispatcher;
+
+	dispatcher->dispatch_pending();
+	return TRUE;
 }
 
+
+static const GSourceFuncs dispatcher_funcs = {
+	dispatcher_prepare,
+	dispatcher_check,
+	dispatcher_dispatch,
+	NULL
+};
+
+
+Glib::BusDispatcher::BusDispatcher()
+	: _ctx(NULL), _priority(G_PRIORITY_DEFAULT), _source(NULL)
+{
+}
+
+
+
+Glib::BusDispatcher::~BusDispatcher()
+{
+	if (_source)
+	{
+		GSource *temp = _source;
+		_source = NULL;
+
+
+		g_source_destroy (temp);
+		g_source_unref (temp);
+	}
+
+	if (_ctx)
+		g_main_context_unref(_ctx);
+}
+
+
+
 void Glib::BusDispatcher::attach(GMainContext *ctx)
 {
+	g_assert(_ctx == NULL); // just to be sane
+
 	_ctx = ctx ? ctx : g_main_context_default();
+	g_main_context_ref(_ctx); // ref what we store (ctx may be NULL); paired with the unref in ~BusDispatcher()
+
+	// create the source for dispatching messages
+	_source = g_source_new ((GSourceFuncs *) &dispatcher_funcs,
+	                        sizeof (DispatcherSource));
+	((DispatcherSource*)_source)->dispatcher = this;
+	g_source_attach (_source, _ctx);
 }
 
+
 Timeout *Glib::BusDispatcher::add_timeout(Timeout::Internal *wi)
 {
 	Timeout *t = new Glib::BusTimeout(wi, _ctx, _priority);
-- 
1.5.4.3