From 5cfada20188c96de9af78be3fe149b87b878cce0 Mon Sep 17 00:00:00 2001 From: Alban Crequy Date: Tue, 8 Jul 2014 16:26:21 +0100 Subject: [PATCH] pending replies: use better data structures than a linked list BusExpireList used to keep track of BusExpireItems/BusPendingReply in a unique linked list without knowledge of the BusPendingReply struct. Iterating on the linked list takes a linear time on the number of items. It is inefficient because it iterates on items not relevant to the DBusConnection considered. The total number of BusExpireItems on the bus can be quite large: with the default limits on the system bus, it is 2097152 per user: max_replies_per_connection * max_connections_per_user = (1024 * 8) * 256 The items in the BusExpireList are looked up in 3 different ways: 1. When sending a D-Bus method call in bus_connections_expect_reply(): check whether the exact triplet (DBusConnection caller, DBusConnection callee, dbus_uint32_t reply_serial) already exists and check whether the number of pending replies associated to the caller is over max_replies_per_connection. 2. When sending a D-Bus return call in bus_connections_check_reply(): find the exact triplet (DBusConnection caller, DBusConnection callee, dbus_uint32_t reply_serial) 3. When a DBusConnection disconnects, iterate on the pending replies associated either with the connection as a caller or as a callee. It becomes overly complicated to make a better data structure for theses use cases without BusExpireItem having a reference to the caller, callee and serial. BusExpireList was written in a generic way but is only used for pending replies. This patch gets rid of the generic programming and store each BusExpireItem in two hash tables keyed respectively by DBusConnection caller and by DBusConnection callee. In order to implement transactions correctly, a pending reply needs to be removed and re-added in the BusExpireList without memory allocations. It was done with linked lists by keeping the reference to the DBusList in CheckPendingReplyData. In this new implementation, BusExpireItem has a new field "dbus_bool_t deleted" and the BusExpireItem is only marked as deleted when preparing the transaction and removed effectively when the transaction finishes, with the bus_transaction_add_cancel_hook callbacks. https://bugs.freedesktop.org/show_bug.cgi?id=81053 v1: first version v2: - add some documentation - use the new API in the unit test - fix counting of deleted & non-deleted items - fix double free in bus_connections_check_reply - fix caller/callee confusion in bus_expire_list_foreach - fix memleaks in case of OOM in _bus_expire_list_add_helper --- bus/connection.c | 226 +++++++++----------- bus/expirelist.c | 633 ++++++++++++++++++++++++++++++++++++++++++++++++------- bus/expirelist.h | 39 ++-- 3 files changed, 682 insertions(+), 216 deletions(-) diff --git a/bus/connection.c b/bus/connection.c index ea2d155..4985e3c 100644 --- a/bus/connection.c +++ b/bus/connection.c @@ -39,17 +39,6 @@ static void bus_connection_remove_transactions (DBusConnection *connection); -typedef struct -{ - BusExpireItem expire_item; - - DBusConnection *will_get_reply; - DBusConnection *will_send_reply; - - dbus_uint32_t reply_serial; - -} BusPendingReply; - struct BusConnections { int refcount; @@ -105,7 +94,7 @@ typedef struct } BusConnectionData; static dbus_bool_t bus_pending_reply_expired (BusExpireList *list, - DBusList *link, + BusExpireItem *pending, void *data); static void bus_connection_drop_pending_replies (BusConnections *connections, @@ -1517,7 +1506,7 @@ bus_connections_check_limits (BusConnections *connections, } static void -bus_pending_reply_free (BusPendingReply *pending) +bus_pending_reply_free (BusExpireItem *pending) { _dbus_verbose ("Freeing pending reply %p, replier %p receiver %p serial %u\n", pending, @@ -1531,7 +1520,7 @@ bus_pending_reply_free (BusPendingReply *pending) static dbus_bool_t bus_pending_reply_send_no_reply (BusConnections *connections, BusTransaction *transaction, - BusPendingReply *pending) + BusExpireItem *pending) { DBusMessage *message; DBusMessageIter iter; @@ -1572,10 +1561,9 @@ bus_pending_reply_send_no_reply (BusConnections *connections, static dbus_bool_t bus_pending_reply_expired (BusExpireList *list, - DBusList *link, + BusExpireItem *pending, void *data) { - BusPendingReply *pending = link->data; BusConnections *connections = data; BusTransaction *transaction; @@ -1602,14 +1590,59 @@ bus_pending_reply_expired (BusExpireList *list, return FALSE; } - bus_expire_list_remove_link (connections->pending_replies, link); + bus_expire_list_mark_deleted (connections->pending_replies, pending); - bus_pending_reply_free (pending); bus_transaction_execute_and_free (transaction); return TRUE; } +static dbus_bool_t +drop_caller_cb (BusExpireList *list, + BusExpireItem *pending, + void *data) +{ + BusConnections *connections = data; + + /* We don't need to track this pending reply anymore */ + + _dbus_verbose ("Dropping pending reply %p, replier %p receiver %p serial %u\n", + pending, + pending->will_send_reply, + pending->will_get_reply, + pending->reply_serial); + + bus_expire_list_mark_deleted (connections->pending_replies, pending); + bus_expire_list_recheck_immediately (connections->pending_replies); + + return TRUE; +} + +static dbus_bool_t +drop_callee_cb (BusExpireList *list, + BusExpireItem *pending, + void *data) +{ + BusConnections *connections = data; + + /* The reply isn't going to be sent, so set things + * up so it will be expired right away + */ + _dbus_verbose ("Will expire pending reply %p, replier %p receiver %p serial %u\n", + pending, + pending->will_send_reply, + pending->will_get_reply, + pending->reply_serial); + + pending->will_send_reply = NULL; + pending->added_tv_sec = 0; + pending->added_tv_usec = 0; + + bus_expire_list_recheck_immediately (connections->pending_replies); + + return TRUE; +} + static void bus_connection_drop_pending_replies (BusConnections *connections, DBusConnection *connection) @@ -1617,61 +1650,17 @@ bus_connection_drop_pending_replies (BusConnections *connections, /* The DBusConnection is almost 100% finalized here, so you can't * do anything with it except check for pointer equality */ - DBusList *link; - _dbus_verbose ("Dropping pending replies that involve connection %p\n", connection); - - link = bus_expire_list_get_first_link (connections->pending_replies); - while (link != NULL) - { - DBusList *next; - BusPendingReply *pending; - next = bus_expire_list_get_next_link (connections->pending_replies, - link); - pending = link->data; - - if (pending->will_get_reply == connection) - { - /* We don't need to track this pending reply anymore */ - - _dbus_verbose ("Dropping pending reply %p, replier %p receiver %p serial %u\n", - pending, - pending->will_send_reply, - pending->will_get_reply, - pending->reply_serial); - - bus_expire_list_remove_link (connections->pending_replies, - link); - bus_pending_reply_free (pending); - } - else if (pending->will_send_reply == connection) - { - /* The reply isn't going to be sent, so set things - * up so it will be expired right away - */ - _dbus_verbose ("Will expire pending reply %p, replier %p receiver %p serial %u\n", - pending, - pending->will_send_reply, - pending->will_get_reply, - pending->reply_serial); - - pending->will_send_reply = NULL; - pending->expire_item.added_tv_sec = 0; - pending->expire_item.added_tv_usec = 0; - - bus_expire_list_recheck_immediately (connections->pending_replies); - } - - link = next; - } + bus_expire_list_foreach (connections->pending_replies, connection, + drop_caller_cb, drop_callee_cb, connections); } typedef struct { - BusPendingReply *pending; + BusExpireItem *pending; BusConnections *connections; } CancelPendingReplyData; @@ -1683,7 +1672,7 @@ cancel_pending_reply (void *data) _dbus_verbose ("d = %p\n", d); if (!bus_expire_list_remove (d->connections->pending_replies, - &d->pending->expire_item)) + d->pending)) _dbus_assert_not_reached ("pending reply did not exist to be cancelled"); bus_pending_reply_free (d->pending); /* since it's been cancelled */ @@ -1715,11 +1704,9 @@ bus_connections_expect_reply (BusConnections *connections, DBusMessage *reply_to_this, DBusError *error) { - BusPendingReply *pending; + BusExpireItem *pending; dbus_uint32_t reply_serial; - DBusList *link; CancelPendingReplyData *cprd; - int count; _dbus_assert (will_get_reply != NULL); _dbus_assert (will_send_reply != NULL); @@ -1730,28 +1717,18 @@ bus_connections_expect_reply (BusConnections *connections, reply_serial = dbus_message_get_serial (reply_to_this); - link = bus_expire_list_get_first_link (connections->pending_replies); - count = 0; - while (link != NULL) + pending = bus_expire_list_lookup (connections->pending_replies, + will_get_reply, will_send_reply, + reply_serial); + if (pending) { - pending = link->data; - - if (pending->reply_serial == reply_serial && - pending->will_get_reply == will_get_reply && - pending->will_send_reply == will_send_reply) - { - dbus_set_error (error, DBUS_ERROR_ACCESS_DENIED, - "Message has the same reply serial as a currently-outstanding existing method call"); - return FALSE; - } - - link = bus_expire_list_get_next_link (connections->pending_replies, - link); - if (pending->will_get_reply == will_get_reply) - ++count; + dbus_set_error (error, DBUS_ERROR_ACCESS_DENIED, + "Message has the same reply serial as a currently-outstanding existing method call"); + return FALSE; } - - if (count >= + + if (bus_expire_list_item_count_per_caller (connections->pending_replies, + will_get_reply) >= bus_context_get_max_replies_per_connection (connections->context)) { dbus_set_error (error, DBUS_ERROR_LIMITS_EXCEEDED, @@ -1759,7 +1736,7 @@ bus_connections_expect_reply (BusConnections *connections, return FALSE; } - pending = dbus_new0 (BusPendingReply, 1); + pending = dbus_new0 (BusExpireItem, 1); if (pending == NULL) { BUS_SET_OOM (error); @@ -1768,8 +1745,8 @@ bus_connections_expect_reply (BusConnections *connections, #ifdef DBUS_ENABLE_VERBOSE_MODE /* so we can see a not-yet-added pending reply */ - pending->expire_item.added_tv_sec = 1; - pending->expire_item.added_tv_usec = 1; + pending->added_tv_sec = 1; + pending->added_tv_usec = 1; #endif pending->will_get_reply = will_get_reply; @@ -1785,7 +1762,7 @@ bus_connections_expect_reply (BusConnections *connections, } if (!bus_expire_list_add (connections->pending_replies, - &pending->expire_item)) + pending)) { BUS_SET_OOM (error); dbus_free (cprd); @@ -1799,7 +1776,7 @@ bus_connections_expect_reply (BusConnections *connections, cancel_pending_reply_data_free)) { BUS_SET_OOM (error); - bus_expire_list_remove (connections->pending_replies, &pending->expire_item); + bus_expire_list_remove (connections->pending_replies, pending); dbus_free (cprd); bus_pending_reply_free (pending); return FALSE; @@ -1808,8 +1785,8 @@ bus_connections_expect_reply (BusConnections *connections, cprd->pending = pending; cprd->connections = connections; - _dbus_get_monotonic_time (&pending->expire_item.added_tv_sec, - &pending->expire_item.added_tv_usec); + _dbus_get_monotonic_time (&pending->added_tv_sec, + &pending->added_tv_usec); _dbus_verbose ("Added pending reply %p, replier %p receiver %p serial %u\n", pending, @@ -1822,45 +1799,42 @@ bus_connections_expect_reply (BusConnections *connections, typedef struct { - DBusList *link; - BusConnections *connections; + BusExpireItem *item; + BusConnections *connections; } CheckPendingReplyData; static void cancel_check_pending_reply (void *data) { CheckPendingReplyData *d = data; + BusExpireItem *item = d->item; _dbus_verbose ("d = %p\n",d); - bus_expire_list_add_link (d->connections->pending_replies, - d->link); - d->link = NULL; + bus_expire_list_restore (d->connections->pending_replies, + item); + d->item = NULL; } static void check_pending_reply_data_free (void *data) { CheckPendingReplyData *d = data; + BusExpireItem *pending = d->item; _dbus_verbose ("d = %p\n",d); - if (d->link != NULL) + if (pending != NULL) { - BusPendingReply *pending = d->link->data; - - _dbus_assert (!bus_expire_list_contains_item (d->connections->pending_replies, - &pending->expire_item)); - + bus_expire_list_remove (d->connections->pending_replies, pending); bus_pending_reply_free (pending); - _dbus_list_free_link (d->link); } dbus_free (d); } /* - * Check whether a reply is allowed, remove BusPendingReply + * Check whether a reply is allowed, remove BusExpireItem * if so, return TRUE if so. */ dbus_bool_t @@ -1872,7 +1846,7 @@ bus_connections_check_reply (BusConnections *connections, DBusError *error) { CheckPendingReplyData *cprd; - DBusList *link; + BusExpireItem *pending; dbus_uint32_t reply_serial; _dbus_assert (sending_reply != NULL); @@ -1880,24 +1854,14 @@ bus_connections_check_reply (BusConnections *connections, reply_serial = dbus_message_get_reply_serial (reply); - link = bus_expire_list_get_first_link (connections->pending_replies); - while (link != NULL) + pending = bus_expire_list_lookup (connections->pending_replies, + receiving_reply, sending_reply, + reply_serial); + if (pending) { - BusPendingReply *pending = link->data; - - if (pending->reply_serial == reply_serial && - pending->will_get_reply == receiving_reply && - pending->will_send_reply == sending_reply) - { - _dbus_verbose ("Found pending reply with serial %u\n", reply_serial); - break; - } - - link = bus_expire_list_get_next_link (connections->pending_replies, - link); + _dbus_verbose ("Found pending reply with serial %u\n", reply_serial); } - - if (link == NULL) + else { _dbus_verbose ("No pending reply expected\n"); @@ -1911,6 +1875,9 @@ bus_connections_check_reply (BusConnections *connections, return FALSE; } + /* The transaction could be either executed or cancelled. In both cases, + * cprd->item will be removed and released by check_pending_reply_data_free() + */ if (!bus_transaction_add_cancel_hook (transaction, cancel_check_pending_reply, cprd, @@ -1921,14 +1888,9 @@ bus_connections_check_reply (BusConnections *connections, return FALSE; } - cprd->link = link; + cprd->item = pending; cprd->connections = connections; - bus_expire_list_unlink (connections->pending_replies, - link); - - _dbus_assert (!bus_expire_list_contains_item (connections->pending_replies, link->data)); - return TRUE; } diff --git a/bus/expirelist.c b/bus/expirelist.c index 9a3886e..4f98d03 100644 --- a/bus/expirelist.c +++ b/bus/expirelist.c @@ -27,10 +27,41 @@ #include #include #include +#include struct BusExpireList { - DBusList *items; /**< List of BusExpireItem */ + /* hash: + * - key: DBusConnection *caller + * - value: hash: + * - key: DBusConnection *callee + * - value: hash: + * - key: int reply_serial + * - value: BusExpireItem *item + */ + DBusHashTable *caller; + + /* hash: + * - key: DBusConnection *callee + * - value: hash: + * - key: DBusConnection *caller + * - value: hash: + * - key: int reply_serial + * - value: BusExpireItem *item + */ + DBusHashTable *callee; + + /* hash: + * - key: DBusConnection *caller + * - value: dbus_uint32_t count + * + * Useful to implement max_replies_per_connection + */ + DBusHashTable *item_count_per_caller; + + int deleted_items_count; + int non_deleted_items_count; + DBusTimeout *timeout; DBusLoop *loop; BusExpireFunc expire_func; @@ -40,6 +71,14 @@ struct BusExpireList static dbus_bool_t expire_timeout_handler (void *data); +static void +_safe_hash_unref (DBusHashTable *table) +{ + if (table) + _dbus_hash_table_unref (table); +} + + BusExpireList* bus_expire_list_new (DBusLoop *loop, int expire_after, @@ -57,6 +96,13 @@ bus_expire_list_new (DBusLoop *loop, list->loop = loop; list->expire_after = expire_after; + list->caller = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + list->callee = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + list->item_count_per_caller = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, NULL); + list->timeout = _dbus_timeout_new (100, /* irrelevant */ expire_timeout_handler, list, NULL); @@ -82,7 +128,11 @@ bus_expire_list_new (DBusLoop *loop, void bus_expire_list_free (BusExpireList *list) { - _dbus_assert (list->items == NULL); + _dbus_assert (list->non_deleted_items_count == 0); + + _dbus_hash_table_unref (list->caller); + _dbus_hash_table_unref (list->callee); + _dbus_hash_table_unref (list->item_count_per_caller); _dbus_loop_remove_timeout (list->loop, list->timeout); @@ -123,63 +173,142 @@ bus_expire_list_recheck_immediately (BusExpireList *list) } static int +_bus_expire_list_cleanup_helper (BusExpireList *list, + DBusHashTable *hash1, + DBusFreeFunction f) +{ + int count = 0; + DBusHashIter iter1; + + _dbus_hash_iter_init (hash1, &iter1); + while (_dbus_hash_iter_next (&iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + + if (!item->deleted) + continue; + + count++; + + /* It is safe to remove the entry while we walk the iterator + * but not to remove iter2 while we are in the iter3 loop. + */ + if (f) + f (item); + _dbus_hash_iter_remove_entry (&iter3); + } + if (_dbus_hash_table_get_n_entries (hash3) == 0) + { + _dbus_hash_iter_remove_entry (&iter2); + } + } + if (_dbus_hash_table_get_n_entries (hash2) == 0) + { + _dbus_hash_iter_remove_entry (&iter1); + } + } + + return count; +} + +static int do_expiration_with_monotonic_time (BusExpireList *list, long tv_sec, long tv_usec) { - DBusList *link; + DBusHashIter iter1; int next_interval, min_wait_time, items_to_expire; next_interval = -1; min_wait_time = 3600 * 1000; /* this is reset anyway if used */ items_to_expire = 0; - - link = _dbus_list_get_first_link (&list->items); - while (link != NULL) - { - DBusList *next = _dbus_list_get_next_link (&list->items, link); - double elapsed; - BusExpireItem *item; - item = link->data; - - elapsed = ELAPSED_MILLISECONDS_SINCE (item->added_tv_sec, - item->added_tv_usec, - tv_sec, tv_usec); - - if (((item->added_tv_sec == 0) && (item->added_tv_usec == 0)) || - ((list->expire_after > 0) && (elapsed >= (double) list->expire_after))) + _dbus_hash_iter_init (list->caller, &iter1); + while (_dbus_hash_iter_next (&iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) { - _dbus_verbose ("Expiring an item %p\n", item); - - /* If the expire function fails, we just end up expiring - * this item next time we walk through the list. This would - * be an indeterminate time normally, so we set up the - * next_interval to be "shortly" (just enough to avoid - * a busy loop) - */ - if (!(* list->expire_func) (list, link, list->data)) + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) { - next_interval = _dbus_get_oom_wait (); - break; + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + double elapsed; + + if (item->deleted) + continue; + + elapsed = ELAPSED_MILLISECONDS_SINCE (item->added_tv_sec, + item->added_tv_usec, + tv_sec, tv_usec); + + if (((item->added_tv_sec == 0) && (item->added_tv_usec == 0)) || + ((list->expire_after > 0) && (elapsed >= (double) list->expire_after))) + { + _dbus_verbose ("Expiring an item %p\n", item); + + /* If the expire function fails, we just end up expiring + * this item next time we walk through the list. This would + * be an indeterminate time normally, so we set up the + * next_interval to be "shortly" (just enough to avoid + * a busy loop) + */ + if (!(* list->expire_func) (list, item, list->data)) + { + next_interval = _dbus_get_oom_wait (); + break; + } + } + else if (list->expire_after > 0) + { + double to_wait; + + items_to_expire = 1; + to_wait = (double) list->expire_after - elapsed; + if (min_wait_time > to_wait) + min_wait_time = to_wait; + } } } - else if (list->expire_after > 0) - { - double to_wait; - - items_to_expire = 1; - to_wait = (double) list->expire_after - elapsed; - if (min_wait_time > to_wait) - min_wait_time = to_wait; - } - - link = next; } if (next_interval < 0 && items_to_expire) next_interval = min_wait_time; + if (list->deleted_items_count > 0) + { +#ifdef DBUS_DISABLE_ASSERT + /* Since each item is both in list->caller and list->callee, we must + * take care not to double-free it. So DBusFreeFunction is NULL for the + * first call and dbus_free for the second call. + */ + _bus_expire_list_cleanup_helper (list, list->caller, NULL); + _bus_expire_list_cleanup_helper (list, list->callee, dbus_free); +#else + int count1 = _bus_expire_list_cleanup_helper (list, list->caller, NULL); + int count2 = _bus_expire_list_cleanup_helper (list, list->callee, dbus_free); + + _dbus_assert (count1 == list->deleted_items_count); + _dbus_assert (count2 == list->deleted_items_count); +#endif + + list->deleted_items_count = 0; + } + return next_interval; } @@ -190,7 +319,7 @@ bus_expirelist_expire (BusExpireList *list) next_interval = -1; - if (list->items != NULL) + if (list->non_deleted_items_count > 0) { long tv_sec, tv_usec; @@ -215,25 +344,193 @@ expire_timeout_handler (void *data) return TRUE; } -void -bus_expire_list_remove_link (BusExpireList *list, - DBusList *link) +static dbus_bool_t +_bus_expire_list_remove_helper (DBusHashTable *hash1, + void *key1, void *key2, + BusExpireItem *item) { - _dbus_list_remove_link (&list->items, link); + DBusHashIter iter1; + DBusHashIter iter2; + DBusHashIter iter3; + DBusHashTable *hash2; + DBusHashTable *hash3; + dbus_bool_t found; + + found = _dbus_hash_iter_lookup (hash1, key1, + FALSE, &iter1); + if (!found) + return FALSE; + + hash2 = _dbus_hash_iter_get_value (&iter1); + found = _dbus_hash_iter_lookup (hash2, key2, + FALSE, &iter2); + if (!found) + return FALSE; + + hash3 = _dbus_hash_iter_get_value (&iter2); + found = _dbus_hash_iter_lookup (hash3, + _DBUS_INT_TO_POINTER (item->reply_serial), + FALSE, &iter3); + if (!found) + return FALSE; + + _dbus_assert (item == _dbus_hash_iter_get_value (&iter3)); + + _dbus_hash_iter_remove_entry (&iter3); + if (_dbus_hash_table_get_n_entries (hash3) == 0) + _dbus_hash_iter_remove_entry (&iter2); + if (_dbus_hash_table_get_n_entries (hash2) == 0) + _dbus_hash_iter_remove_entry (&iter1); + + return TRUE; } +/** + * Remove an item immediately from the list. The item's memory is not released + * and the caller is still responsible to release it. + */ dbus_bool_t bus_expire_list_remove (BusExpireList *list, BusExpireItem *item) { - return _dbus_list_remove (&list->items, item); + DBusHashIter iter; + dbus_bool_t found; + int value; + + /* items marked deleted are released by do_expiration_with_monotonic_time(). + * bus_expire_list_remove() is not supposed to be called on items already + * scheduled for deletion. */ + _dbus_assert (!item->deleted); + + found = _bus_expire_list_remove_helper (list->caller, item->will_get_reply, + item->will_send_reply, item); + if (!found) + return FALSE; + + found = _bus_expire_list_remove_helper (list->callee, item->will_send_reply, + item->will_get_reply, item); + _dbus_assert (found); + + list->non_deleted_items_count -= 1; + + found = _dbus_hash_iter_lookup (list->item_count_per_caller, + item->will_get_reply, + FALSE, &iter); + _dbus_assert (found); + value = _DBUS_POINTER_TO_INT (_dbus_hash_iter_get_value (&iter)); + value += 1; + _dbus_hash_iter_set_value (&iter, _DBUS_INT_TO_POINTER (value)); + + return TRUE; } +/** + * Mark the item for deletion next time do_expiration_with_monotonic_time() is + * called. The item's memory will also be released by dbus_free. + */ void -bus_expire_list_unlink (BusExpireList *list, - DBusList *link) +bus_expire_list_mark_deleted (BusExpireList *list, + BusExpireItem *item) { - _dbus_list_unlink (&list->items, link); + _dbus_assert (item != NULL); + _dbus_assert (!item->deleted); + _dbus_assert (list->non_deleted_items_count > 0); + + item->deleted = TRUE; + list->deleted_items_count += 1; + list->non_deleted_items_count -= 1; +} + +/** + * Add the item in either the caller hash table or the callee hash table. + * + * There are several indirections between the hash table and the item. In case + * of OOM, the changes are reverted. In case of success, rollback_iter + * provides a way to the caller to roll back the changes. + * + * @returns FALSE on OOM + */ +static dbus_bool_t +_bus_expire_list_add_helper (DBusHashTable *hash1, + void *key1, void *key2, + BusExpireItem *item, + DBusHashIter *rollback_iter) +{ + DBusHashIter iter1; + DBusHashIter iter2; + DBusHashIter iter3; + DBusHashTable *hash2; + DBusHashTable *hash3; + dbus_bool_t found1, found2, found3, ret; + + _dbus_assert (hash1); + found1 = _dbus_hash_iter_lookup (hash1, key1, + FALSE, &iter1); + if (found1) + { + hash2 = _dbus_hash_iter_get_value (&iter1); + _dbus_assert (hash2); + } + else + { + hash2 = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + if (!hash2) + return FALSE; + + ret = _dbus_hash_table_insert_uintptr (hash1, + _DBUS_POINTER_TO_INT (key1), hash2); + if (ret) + { + _dbus_hash_iter_lookup (hash1, key1, FALSE, rollback_iter); + } + else + { + _dbus_hash_table_unref (hash2); + return FALSE; + } + } + + found2 = _dbus_hash_iter_lookup (hash2, key2, + FALSE, &iter2); + if (found2) + { + hash3 = _dbus_hash_iter_get_value (&iter2); + _dbus_assert (hash3); + } + else + { + hash3 = _dbus_hash_table_new (DBUS_HASH_INT, NULL, NULL); + if (!hash3) + return FALSE; + + ret = _dbus_hash_table_insert_uintptr (hash2, + _DBUS_POINTER_TO_INT (key2), hash3); + if (ret) + { + if (!found1) + _dbus_hash_iter_lookup (hash2, key2, FALSE, rollback_iter); + } + else + { + _dbus_hash_table_unref (hash3); + return FALSE; + } + } + + found3 = _dbus_hash_iter_lookup (hash3, + _DBUS_INT_TO_POINTER (item->reply_serial), FALSE, &iter3); + _dbus_assert (!found3); + + ret = _dbus_hash_table_insert_int (hash3, item->reply_serial, item); + if (ret) + { + if (!found1 && !found2) + _dbus_hash_iter_lookup (hash3, _DBUS_INT_TO_POINTER (item->reply_serial), + FALSE, rollback_iter); + } + + return ret; } dbus_bool_t @@ -241,44 +538,157 @@ bus_expire_list_add (BusExpireList *list, BusExpireItem *item) { dbus_bool_t ret; + DBusHashIter rollback_iter1, rollback_iter2; + int value; + + _dbus_assert (item != NULL); + _dbus_assert (!item->deleted); + + ret = _bus_expire_list_add_helper (list->caller, item->will_get_reply, + item->will_send_reply, item, + &rollback_iter1); + if (!ret) + return FALSE; + + ret = _bus_expire_list_add_helper (list->callee, item->will_send_reply, + item->will_get_reply, item, + &rollback_iter2); + if (!ret) + { + _dbus_hash_iter_remove_entry (&rollback_iter1); + return FALSE; + } - ret = _dbus_list_prepend (&list->items, item); - if (ret && !dbus_timeout_get_enabled (list->timeout)) + value = _DBUS_POINTER_TO_INT (_dbus_hash_table_lookup_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (item->will_get_reply))); + value += 1; + ret = _dbus_hash_table_insert_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (item->will_get_reply), + _DBUS_INT_TO_POINTER (value)); + if (!ret) + { + _dbus_hash_iter_remove_entry (&rollback_iter1); + _dbus_hash_iter_remove_entry (&rollback_iter2); + return FALSE; + } + + list->non_deleted_items_count += 1; + + if (!dbus_timeout_get_enabled (list->timeout)) bus_expire_timeout_set_interval (list->timeout, 0); - return ret; + return TRUE; } +/** + * Re-add an item that has been marked for deletion with + * bus_expire_list_mark_deleted(). + * + * It is not allowed to restore an item if do_expiration_with_monotonic_time() + * has been called after it was marked for deletion. + */ void -bus_expire_list_add_link (BusExpireList *list, - DBusList *link) +bus_expire_list_restore (BusExpireList *list, + BusExpireItem *item) { - _dbus_assert (link->data != NULL); - - _dbus_list_prepend_link (&list->items, link); + _dbus_assert (item != NULL); + _dbus_assert (item->deleted); + _dbus_assert (list->deleted_items_count > 0); + + item->deleted = FALSE; + + list->non_deleted_items_count += 1; + list->deleted_items_count -= 1; if (!dbus_timeout_get_enabled (list->timeout)) bus_expire_timeout_set_interval (list->timeout, 0); } -DBusList* -bus_expire_list_get_first_link (BusExpireList *list) +int +bus_expire_list_item_count_per_caller (BusExpireList *list, + DBusConnection *caller) { - return _dbus_list_get_first_link (&list->items); + void *value; + + value = _dbus_hash_table_lookup_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (caller)); + return _DBUS_POINTER_TO_INT (value); } -DBusList* -bus_expire_list_get_next_link (BusExpireList *list, - DBusList *link) +void +bus_expire_list_foreach (BusExpireList *list, + DBusConnection *conn, + BusExpireFunc caller_func, + BusExpireFunc callee_func, + void *data) { - return _dbus_list_get_next_link (&list->items, link); + DBusHashIter iter1; + + if (caller_func && _dbus_hash_iter_lookup (list->caller, conn, FALSE, &iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + if (item->deleted) + continue; + caller_func (list, item, data); + } + } + } + + if (callee_func && _dbus_hash_iter_lookup (list->callee, conn, FALSE, &iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + if (item->deleted) + continue; + callee_func (list, item, data); + } + } + } } -dbus_bool_t -bus_expire_list_contains_item (BusExpireList *list, - BusExpireItem *item) +BusExpireItem* +bus_expire_list_lookup (BusExpireList *list, + DBusConnection *caller, + DBusConnection *callee, + dbus_uint32_t reply_serial) { - return _dbus_list_find_last (&list->items, item) != NULL; + DBusHashTable *hash; + BusExpireItem *item; + + hash = _dbus_hash_table_lookup_uintptr (list->caller, + _DBUS_POINTER_TO_INT (caller)); + if (!hash) + return NULL; + + hash = _dbus_hash_table_lookup_uintptr (hash, + _DBUS_POINTER_TO_INT (callee)); + if (!hash) + return NULL; + + item = _dbus_hash_table_lookup_int (hash, reply_serial); + if (!item || item->deleted) + return NULL; + + return item; } #ifdef DBUS_ENABLE_EMBEDDED_TESTS @@ -291,12 +701,12 @@ typedef struct static dbus_bool_t test_expire_func (BusExpireList *list, - DBusList *link, + BusExpireItem *item, void *data) { TestExpireItem *t; - t = (TestExpireItem*) link->data; + t = (TestExpireItem*) item; t->expire_count += 1; @@ -327,9 +737,11 @@ bus_expire_list_test (const DBusString *test_data_dir) long tv_sec_expired, tv_usec_expired; long tv_sec_past, tv_usec_past; TestExpireItem *item; +#define TEST_CONN_COUNT 6 + TestExpireItem *items[TEST_CONN_COUNT][TEST_CONN_COUNT][TEST_CONN_COUNT]; int next_interval; dbus_bool_t result = FALSE; - + int i, j, k; loop = _dbus_loop_new (); _dbus_assert (loop != NULL); @@ -389,10 +801,87 @@ bus_expire_list_test (const DBusString *test_data_dir) bus_expire_list_remove (list, &item->item); dbus_free (item); + + _dbus_assert (list->deleted_items_count == 0); + _dbus_assert (list->non_deleted_items_count == 0); + + /* add plenty of items */ + for (i = 0; i < 6; i++) + for (j = 0; j < 6; j++) + for (k = 0; k < 6; k++) + { + items[i][j][k] = dbus_new0 (TestExpireItem, 1); + items[i][j][k]->item.added_tv_sec = tv_sec; + items[i][j][k]->item.added_tv_usec = tv_usec; + items[i][j][k]->item.will_get_reply = _DBUS_INT_TO_POINTER (i); + items[i][j][k]->item.will_send_reply = _DBUS_INT_TO_POINTER (j); + items[i][j][k]->item.reply_serial = k; + + if (!bus_expire_list_add (list, &items[i][j][k]->item)) + _dbus_assert_not_reached ("out of memory"); + + /* 1 third is marked as deleted, 1 third is removed, 1 third remains + */ + if ((i + j + k) % 3 == 1) + bus_expire_list_mark_deleted (list, &items[i][j][k]->item); + if ((i + j + k) % 3 == 2) + { + bus_expire_list_remove (list, &items[i][j][k]->item); + dbus_free (items[i][j][k]); + } + } + + _dbus_assert (list->deleted_items_count == 6 * 6 * 6 / 3); + _dbus_assert (list->non_deleted_items_count == 6 * 6 * 6 / 3); + + /* not expired: do_expiration_with_monotonic_time() will just clean up the + * items already marked as deleted */ + next_interval = + do_expiration_with_monotonic_time (list, tv_sec_not_expired, + tv_usec_not_expired); + _dbus_verbose ("next_interval = %d\n", next_interval); + _dbus_assert (next_interval == 1); + _dbus_assert (list->deleted_items_count == 0); + _dbus_assert (list->non_deleted_items_count == 6 * 6 * 6 / 3); + for (i = 0; i < 6; i++) + for (j = 0; j < 6; j++) + for (k = 0; k < 6; k++) + if ((i + j + k) % 3 == 0) + { + _dbus_assert (!items[i][j][k]->item.deleted); + _dbus_assert (items[i][j][k]->expire_count == 0); + } + + /* expired: the callback increments expire_count. */ + next_interval = + do_expiration_with_monotonic_time (list, tv_sec_expired, + tv_usec_expired); + for (i = 0; i < 6; i++) + for (j = 0; j < 6; j++) + for (k = 0; k < 6; k++) + if ((i + j + k) % 3 == 0) + { + _dbus_assert (!items[i][j][k]->item.deleted); + _dbus_assert (items[i][j][k]->expire_count == 1); + bus_expire_list_mark_deleted (list, &items[i][j][k]->item); + } + _dbus_verbose ("next_interval = %d\n", next_interval); + _dbus_assert (next_interval == -1); + _dbus_assert (list->deleted_items_count == 6 * 6 * 6 / 3); + _dbus_assert (list->non_deleted_items_count == 0); + + /* cleanup the remaining items */ + next_interval = + do_expiration_with_monotonic_time (list, tv_sec_expired, + tv_usec_expired); + _dbus_verbose ("next_interval = %d\n", next_interval); + _dbus_assert (next_interval == -1); + _dbus_assert (list->deleted_items_count == 0); + _dbus_assert (list->non_deleted_items_count == 0); bus_expire_list_free (list); _dbus_loop_unref (loop); - + result = TRUE; oom: diff --git a/bus/expirelist.h b/bus/expirelist.h index 887cb97..182bf2d 100644 --- a/bus/expirelist.h +++ b/bus/expirelist.h @@ -27,20 +27,31 @@ #include #include #include +#include typedef struct BusExpireList BusExpireList; typedef struct BusExpireItem BusExpireItem; typedef dbus_bool_t (* BusExpireFunc) (BusExpireList *list, - DBusList *link, + BusExpireItem *item, void *data); -/* embed this in a child expire item struct */ struct BusExpireItem { long added_tv_sec; /**< Time we were added (seconds component) */ long added_tv_usec; /**< Time we were added (microsec component) */ + + BusExpireList *set; + + DBusConnection *will_get_reply; /**< caller */ + DBusConnection *will_send_reply; /**< callee */ + dbus_uint32_t reply_serial; + + /* Items are not immediately deleted. Instead, they are marked as deleted + * and when the transaction is executed or cancelled, the deletion is + * completed or rolled back. */ + dbus_bool_t deleted; }; BusExpireList* bus_expire_list_new (DBusLoop *loop, @@ -49,21 +60,25 @@ BusExpireList* bus_expire_list_new (DBusLoop *loop, void *data); void bus_expire_list_free (BusExpireList *list); void bus_expire_list_recheck_immediately (BusExpireList *list); -void bus_expire_list_remove_link (BusExpireList *list, - DBusList *link); dbus_bool_t bus_expire_list_remove (BusExpireList *list, BusExpireItem *item); -DBusList* bus_expire_list_get_first_link (BusExpireList *list); -DBusList* bus_expire_list_get_next_link (BusExpireList *list, - DBusList *link); +void bus_expire_list_mark_deleted (BusExpireList *list, + BusExpireItem *item); dbus_bool_t bus_expire_list_add (BusExpireList *list, BusExpireItem *item); -void bus_expire_list_add_link (BusExpireList *list, - DBusList *link); -dbus_bool_t bus_expire_list_contains_item (BusExpireList *list, +void bus_expire_list_restore (BusExpireList *list, BusExpireItem *item); -void bus_expire_list_unlink (BusExpireList *list, - DBusList *link); +int bus_expire_list_item_count_per_caller (BusExpireList *list, + DBusConnection *caller); +void bus_expire_list_foreach (BusExpireList *list, + DBusConnection *conn, + BusExpireFunc caller_func, + BusExpireFunc callee_func, + void *data); +BusExpireItem* bus_expire_list_lookup (BusExpireList *list, + DBusConnection *caller, + DBusConnection *callee, + dbus_uint32_t reply_serial); /* this macro and function are semi-related utility functions, not really part of the * BusExpireList API -- 1.8.5.3