From cf4c026f6b7630ff90e52b805a1696cd349e5f64 Mon Sep 17 00:00:00 2001 From: Simon McVittie Date: Wed, 26 Jan 2011 16:41:32 +0000 Subject: [PATCH 5/5] Maintain the set of polled sockets over time This enables us to benefit from epoll(4): polling is now O(number of watch changes + number of active fds), whereas with poll(2) it's O(total number of fds). --- dbus/dbus-mainloop.c | 231 ++++++++++++++++++++++++++++--------------------- 1 files changed, 132 insertions(+), 99 deletions(-) diff --git a/dbus/dbus-mainloop.c b/dbus/dbus-mainloop.c index 271578e..d395152 100644 --- a/dbus/dbus-mainloop.c +++ b/dbus/dbus-mainloop.c @@ -60,12 +60,16 @@ struct DBusLoop int refcount; /** fd => dbus_malloc'd DBusList ** of references to DBusWatch */ DBusHashTable *watches; + DBusSocketSet *socket_set; DBusList *timeouts; int callback_list_serial; int watch_count; int timeout_count; int depth; /**< number of recursive runs */ DBusList *need_dispatch; + /** TRUE if we will skip a watch next time because it was OOM; becomes + * FALSE between polling, and dealing with the results of the poll */ + unsigned oom_watch_pending : 1; }; typedef struct @@ -151,6 +155,8 @@ _dbus_loop_new (void) loop->watches = _dbus_hash_table_new (DBUS_HASH_INT, NULL, free_watch_table_entry); + loop->socket_set = _dbus_socket_set_new (0); + if (loop->watches == NULL) { dbus_free (loop); @@ -190,6 +196,7 @@ _dbus_loop_unref (DBusLoop *loop) } _dbus_hash_table_unref (loop->watches); + _dbus_socket_set_free (loop->socket_set); dbus_free (loop); } } @@ -241,20 +248,57 @@ cull_watches_for_fd (DBusLoop *loop, _dbus_hash_table_remove_int (loop->watches, fd); } -static void +static dbus_bool_t gc_watch_table_entry (DBusLoop *loop, DBusList **watches, int fd) { /* If watches is already NULL we have nothing to do */ if (watches == NULL) - return; + return FALSE; /* We can't GC hash table entries if they're non-empty lists */ if (*watches != NULL) - return; + return FALSE; _dbus_hash_table_remove_int (loop->watches, 
fd); + return TRUE; +} + +static void +refresh_watches_for_fd (DBusLoop *loop, + DBusList **watches, + int fd) +{ + DBusList *link; + unsigned int flags = 0; + dbus_bool_t interested = FALSE; /* stays FALSE unless a watch below is enabled and not OOM-flagged */ + + _dbus_assert (fd != -1); + + if (watches == NULL) + watches = _dbus_hash_table_lookup_int (loop->watches, fd); + + /* we allocated this in the first _dbus_loop_add_watch for the fd, and keep + * it until there are none left */ + _dbus_assert (watches != NULL); + + for (link = _dbus_list_get_first_link (watches); + link != NULL; + link = _dbus_list_get_next_link (watches, link)) + { + if (dbus_watch_get_enabled (link->data) && + !_dbus_watch_get_oom_last_time (link->data)) + { + flags |= dbus_watch_get_flags (link->data); + interested = TRUE; + } + } + + if (interested) + _dbus_socket_set_enable (loop->socket_set, fd, flags); + else + _dbus_socket_set_disable (loop->socket_set, fd); } dbus_bool_t @@ -272,19 +316,32 @@ _dbus_loop_add_watch (DBusLoop *loop, if (watches == NULL) return FALSE; - if (_dbus_list_append (watches, _dbus_watch_ref (watch))) - { - loop->callback_list_serial += 1; - loop->watch_count += 1; - } - else + if (!_dbus_list_append (watches, _dbus_watch_ref (watch))) { _dbus_watch_unref (watch); gc_watch_table_entry (loop, watches, fd); - return FALSE; - } + return FALSE; + } + if (_dbus_list_length_is_one (watches)) + { + if (!_dbus_socket_set_add (loop->socket_set, fd, + dbus_watch_get_flags (watch), + dbus_watch_get_enabled (watch))) + { + _dbus_hash_table_remove_int (loop->watches, fd); + return FALSE; + } + } + else + { + /* we're modifying, not adding, which can't fail with OOM */ + refresh_watches_for_fd (loop, watches, fd); + } + + loop->callback_list_serial += 1; + loop->watch_count += 1; return TRUE; } @@ -292,7 +349,7 @@ void _dbus_loop_toggle_watch (DBusLoop *loop, DBusWatch *watch) { - /* stub */ + refresh_watches_for_fd (loop, NULL, dbus_watch_get_socket (watch)); } void @@ -327,8 +384,11 @@ _dbus_loop_remove_watch (DBusLoop *loop, _dbus_watch_unref 
(this); /* if that was the last watch for that fd, drop the hash table - * entry too */ - gc_watch_table_entry (loop, watches, fd); + * entry, and stop reserving space for it in the socket set */ + if (gc_watch_table_entry (loop, watches, fd)) + { + _dbus_socket_set_remove (loop->socket_set, fd); + } return; } @@ -546,20 +606,16 @@ _dbus_loop_iterate (DBusLoop *loop, { #define N_STACK_DESCRIPTORS 64 dbus_bool_t retval; - DBusSocketSet *socket_set = NULL; DBusSocketEvent ready_fds[N_STACK_DESCRIPTORS]; int i; DBusList *link; int n_ready; int initial_serial; long timeout; - dbus_bool_t oom_watch_pending; int orig_depth; - DBusHashIter hash_iter; retval = FALSE; - oom_watch_pending = FALSE; orig_depth = loop->depth; #if MAINLOOP_SPEW @@ -571,77 +627,6 @@ _dbus_loop_iterate (DBusLoop *loop, loop->timeouts == NULL) goto next_iteration; - socket_set = _dbus_socket_set_new (loop->watch_count); - - while (socket_set == NULL) - { - _dbus_wait_for_memory (); - socket_set = _dbus_socket_set_new (loop->watch_count); - } - - /* fill our array of fds and watches */ - _dbus_hash_iter_init (loop->watches, &hash_iter); - - while (_dbus_hash_iter_next (&hash_iter)) - { - DBusList **watches; - unsigned int flags; - int fd; - dbus_bool_t enabled; - - fd = _dbus_hash_iter_get_int_key (&hash_iter); - watches = _dbus_hash_iter_get_value (&hash_iter); - flags = 0; - enabled = FALSE; - - for (link = _dbus_list_get_first_link (watches); - link != NULL; - link = _dbus_list_get_next_link (watches, link)) - { - DBusWatch *watch = link->data; - - if (_dbus_watch_get_oom_last_time (watch)) - { - /* we skip this one this time, but reenable it next time, - * and have a timeout on this iteration - */ - _dbus_watch_set_oom_last_time (watch, FALSE); - oom_watch_pending = TRUE; - - retval = TRUE; /* return TRUE here to keep the loop going, - * since we don't know the watch is inactive - */ - -#if MAINLOOP_SPEW - _dbus_verbose (" skipping watch on fd %d as it was out of memory last time\n", - fd); 
-#endif - } - else if (dbus_watch_get_enabled (watch)) - { - flags |= dbus_watch_get_flags (watch); - enabled = TRUE; - } - } - - if (enabled) - { - _dbus_socket_set_add (socket_set, fd, flags, TRUE); - -#if MAINLOOP_SPEW - _dbus_verbose (" polling watch on fd %d %s\n", - fd, watch_flags_to_string (flags)); -#endif - } - else - { -#if MAINLOOP_SPEW - _dbus_verbose (" skipping disabled watch on fd %d %s\n", - fd, watch_flags_to_string (flags)); -#endif - } - } - timeout = -1; if (loop->timeout_count > 0) { @@ -697,19 +682,59 @@ _dbus_loop_iterate (DBusLoop *loop, #endif } - /* if a watch is OOM, don't wait longer than the OOM + /* if a watch was OOM last time, don't wait longer than the OOM * wait to re-enable it */ - if (oom_watch_pending) + if (loop->oom_watch_pending) timeout = MIN (timeout, _dbus_get_oom_wait ()); #if MAINLOOP_SPEW _dbus_verbose (" polling on %d descriptors timeout %ld\n", n_fds, timeout); #endif - n_ready = _dbus_socket_set_poll (socket_set, ready_fds, + n_ready = _dbus_socket_set_poll (loop->socket_set, ready_fds, _DBUS_N_ELEMENTS (ready_fds), timeout); + /* re-enable any watches we skipped this time */ + if (loop->oom_watch_pending) + { + DBusHashIter hash_iter; + + loop->oom_watch_pending = FALSE; + + _dbus_hash_iter_init (loop->watches, &hash_iter); + + while (_dbus_hash_iter_next (&hash_iter)) + { + DBusList **watches; + int fd; + dbus_bool_t changed; + + changed = FALSE; + fd = _dbus_hash_iter_get_int_key (&hash_iter); + watches = _dbus_hash_iter_get_value (&hash_iter); + + for (link = _dbus_list_get_first_link (watches); + link != NULL; + link = _dbus_list_get_next_link (watches, link)) + { + DBusWatch *watch = link->data; + + if (_dbus_watch_get_oom_last_time (watch)) + { + _dbus_watch_set_oom_last_time (watch, FALSE); + changed = TRUE; + } + } + + if (changed) + refresh_watches_for_fd (loop, watches, fd); + } + + retval = TRUE; /* return TRUE here to keep the loop going, + * since we don't know the watch was inactive */ + } + 
initial_serial = loop->callback_list_serial; if (loop->timeout_count > 0) @@ -779,6 +804,7 @@ _dbus_loop_iterate (DBusLoop *loop, DBusList **watches; DBusList *next; unsigned int condition; + dbus_bool_t any_oom; /* FIXME I think this "restart if we change the watches" * approach could result in starving watches @@ -813,6 +839,8 @@ _dbus_loop_iterate (DBusLoop *loop, if (watches == NULL) continue; + any_oom = FALSE; + for (link = _dbus_list_get_first_link (watches); link != NULL; link = next) @@ -828,6 +856,8 @@ _dbus_loop_iterate (DBusLoop *loop, if (oom) { _dbus_watch_set_oom_last_time (link->data, TRUE); + loop->oom_watch_pending = TRUE; + any_oom = TRUE; } #if MAINLOOP_SPEW @@ -838,13 +868,19 @@ _dbus_loop_iterate (DBusLoop *loop, /* We re-check this every time, in case the callback * added/removed watches, which might make our position in * the linked list invalid. See the FIXME above. */ - if (initial_serial != loop->callback_list_serial) - goto next_iteration; + if (initial_serial != loop->callback_list_serial || + loop->depth != orig_depth) + { + if (any_oom) + refresh_watches_for_fd (loop, NULL, ready_fds[i].fd); - if (loop->depth != orig_depth) - goto next_iteration; + goto next_iteration; + } } } + + if (any_oom) + refresh_watches_for_fd (loop, watches, ready_fds[i].fd); } } @@ -853,9 +889,6 @@ _dbus_loop_iterate (DBusLoop *loop, _dbus_verbose (" moving to next iteration\n"); #endif - if (socket_set) - _dbus_socket_set_free (socket_set); - if (_dbus_loop_dispatch (loop)) retval = TRUE; -- 1.7.2.3