From eafdad1f3c34cf181df4af944f54653d8c370db4 Mon Sep 17 00:00:00 2001 From: Debarshi Ray Date: Wed, 17 Aug 2011 23:15:45 +0300 Subject: [PATCH 08/18] IdleServerConnection: Async-ify idle_server_connection_send Fixes: https://bugs.freedesktop.org/37145 --- src/idle-connection.c | 111 +++++++++++++++++++++++++++++++---------- src/idle-server-connection.c | 33 +++++++++---- src/idle-server-connection.h | 3 +- 3 files changed, 109 insertions(+), 38 deletions(-) diff --git a/src/idle-connection.c b/src/idle-connection.c index 24ad75b..3bc9b07 100644 --- a/src/idle-connection.c +++ b/src/idle-connection.c @@ -82,12 +82,26 @@ G_DEFINE_TYPE_WITH_CODE(IdleConnection, idle_connection, TP_TYPE_BASE_CONNECTION ); typedef struct _IdleOutputPendingMsg IdleOutputPendingMsg; +typedef struct _MsgQueueTimeoutData MsgQueueTimeoutData; +typedef struct _SendWithPriorityData SendWithPriorityData; + struct _IdleOutputPendingMsg { gchar *message; guint priority; guint64 id; }; +struct _MsgQueueTimeoutData { + IdleConnection *conn; + gint limit; +}; + +struct _SendWithPriorityData { + IdleConnection *conn; + gchar *msg; + guint priority; +}; + static IdleOutputPendingMsg *idle_output_pending_msg_new() { IdleOutputPendingMsg *msg = g_slice_new(IdleOutputPendingMsg); static guint64 last_id = 0; @@ -799,13 +813,39 @@ static gboolean keepalive_timeout_cb(gpointer user_data) { return TRUE; } +static void _msg_queue_timeout_ready(GObject *source_object, GAsyncResult *res, gpointer user_data) { + IdleServerConnection *sconn = IDLE_SERVER_CONNECTION(source_object); + MsgQueueTimeoutData *data = (MsgQueueTimeoutData *) user_data; + IdleConnection *conn = data->conn; + IdleConnectionPrivate *priv = IDLE_CONNECTION_GET_PRIVATE(conn); + IdleOutputPendingMsg *output_msg; + int i; + int limit = data->limit; + GError *error = NULL; + + g_slice_free(MsgQueueTimeoutData, data); + + if (!idle_server_connection_send_finish(sconn, res, &error)) { + IDLE_DEBUG("idle_server_connection_send failed: %s", error->message); + g_error_free(error); + return; + } + + for (i = 0; i < limit; i++) { + output_msg = g_queue_pop_head(priv->msg_queue); + idle_output_pending_msg_free(output_msg); + } + + priv->last_msg_sent = time(NULL); +} + static gboolean msg_queue_timeout_cb(gpointer user_data) { IdleConnection *conn = IDLE_CONNECTION(user_data); IdleConnectionPrivate *priv = IDLE_CONNECTION_GET_PRIVATE(conn); - int i, j; + int i; IdleOutputPendingMsg *output_msg; + MsgQueueTimeoutData *data; gchar msg[IRC_MSG_MAXLEN + 3]; - GError *error = NULL; IDLE_DEBUG("called"); @@ -835,20 +875,44 @@ static gboolean msg_queue_timeout_cb(gpointer user_data) { break; } - if (idle_server_connection_send(priv->conn, msg, &error)) { - for (j = 0; j < i; j++) { - output_msg = g_queue_pop_head(priv->msg_queue); - idle_output_pending_msg_free(output_msg); - } + data = g_slice_new0(MsgQueueTimeoutData); + data->conn = conn; + data->limit = i; - priv->last_msg_sent = time(NULL); - } else { - IDLE_DEBUG("low-level network connection failed to send: %s", error->message); + idle_server_connection_send_async(priv->conn, msg, NULL, _msg_queue_timeout_ready, data); + return TRUE; +} +static void _add_msg_to_queue(IdleConnection *conn, gchar *msg, guint priority) { + IdleConnectionPrivate *priv = IDLE_CONNECTION_GET_PRIVATE(conn); + IdleOutputPendingMsg *output_msg; + + output_msg = idle_output_pending_msg_new(); + output_msg->message = msg; + output_msg->priority = priority; + + g_queue_insert_sorted(priv->msg_queue, output_msg, pending_msg_compare, NULL); + idle_connection_add_queue_timeout (conn); +} + +static void _send_with_max_priority_ready(GObject *source_object, GAsyncResult *res, gpointer user_data) { + IdleServerConnection *sconn = IDLE_SERVER_CONNECTION(source_object); + SendWithPriorityData *data = (SendWithPriorityData *) user_data; + IdleConnection *conn = data->conn; + gchar *msg = data->msg; + guint priority = data->priority; + GError *error = NULL; + + g_slice_free(SendWithPriorityData, data); + + if (!idle_server_connection_send_finish(sconn, res, &error)) { + IDLE_DEBUG("idle_server_connection_send failed: %s", error->message); g_error_free(error); + _add_msg_to_queue(conn, msg, priority); + return; } - return TRUE; + g_free(msg); } static void @@ -885,12 +949,11 @@ idle_connection_clear_queue_timeout (IdleConnection *self) static void _send_with_priority(IdleConnection *conn, const gchar *msg, guint priority) { gchar cmd[IRC_MSG_MAXLEN + 3]; IdleConnectionPrivate *priv = IDLE_CONNECTION_GET_PRIVATE(conn); + SendWithPriorityData *data; int len; - GError *error = NULL; gchar *converted; GError *convert_error = NULL; time_t curr_time = time(NULL); - IdleOutputPendingMsg *output_msg; g_assert(msg != NULL); @@ -919,22 +982,16 @@ static void _send_with_priority(IdleConnection *conn, const gchar *msg, guint pr curr_time - priv->last_msg_sent > MSG_QUEUE_TIMEOUT)) { priv->last_msg_sent = curr_time; - if (!idle_server_connection_send(priv->conn, converted, &error)) { - IDLE_DEBUG("server connection failed to send: %s", error->message); - g_error_free(error); - } else { - g_free(converted); - return; - } - } + data = g_slice_new0(SendWithPriorityData); + data->conn = conn; + data->msg = converted; + data->priority = priority; - output_msg = idle_output_pending_msg_new(); - output_msg->message = converted; - output_msg->priority = priority; - - g_queue_insert_sorted(priv->msg_queue, output_msg, pending_msg_compare, NULL); + idle_server_connection_send_async(priv->conn, converted, NULL, _send_with_max_priority_ready, data); + return; + } - idle_connection_add_queue_timeout (conn); + _add_msg_to_queue(conn, converted, priority); } void idle_connection_send(IdleConnection *conn, const gchar *msg) { diff --git a/src/idle-server-connection.c b/src/idle-server-connection.c index 2259b56..1fd8c26 100644 --- a/src/idle-server-connection.c +++ b/src/idle-server-connection.c @@ -404,37 +404,46 @@ gboolean idle_server_connection_disconnect_full(IdleServerConnection *conn, GErr static void _write_ready(GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *output_stream = G_OUTPUT_STREAM(source_object); - IdleServerConnection *conn = IDLE_SERVER_CONNECTION(user_data); + GSimpleAsyncResult *result = G_SIMPLE_ASYNC_RESULT(user_data); + IdleServerConnection *conn = IDLE_SERVER_CONNECTION(g_async_result_get_source_object(G_ASYNC_RESULT(result))); IdleServerConnectionPrivate *priv = IDLE_SERVER_CONNECTION_GET_PRIVATE(conn); gssize nwrite; GError *error = NULL; + g_object_unref(conn); + nwrite = g_output_stream_write_finish(output_stream, res, &error); if (nwrite == -1) { IDLE_DEBUG("g_output_stream_write failed : %s", error->message); + g_simple_async_result_set_error(result, TP_ERRORS, TP_ERROR_NETWORK_ERROR, "%s", error->message); g_error_free(error); goto cleanup; } priv->nwritten += nwrite; if (priv->nwritten < priv->count) { - g_object_ref(conn); - g_output_stream_write_async(output_stream, priv->output_buffer + priv->nwritten, priv->count - priv->nwritten, G_PRIORITY_DEFAULT, priv->cancellable, _write_ready, conn); + g_output_stream_write_async(output_stream, priv->output_buffer + priv->nwritten, priv->count - priv->nwritten, G_PRIORITY_DEFAULT, priv->cancellable, _write_ready, result); + return; } cleanup: - g_object_unref(conn); + g_simple_async_result_complete(result); + g_object_unref(result); } -gboolean idle_server_connection_send(IdleServerConnection *conn, const gchar *cmd, GError **error) { +void idle_server_connection_send_async(IdleServerConnection *conn, const gchar *cmd, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { IdleServerConnectionPrivate *priv = IDLE_SERVER_CONNECTION_GET_PRIVATE(conn); GOutputStream *output_stream; + GSimpleAsyncResult *result; gsize output_buffer_size = sizeof(priv->output_buffer); if (priv->state != SERVER_CONNECTION_STATE_CONNECTED) { IDLE_DEBUG("connection was not open!"); - g_set_error(error, TP_ERRORS, TP_ERROR_NOT_AVAILABLE, "connection was not open!"); - return FALSE; + g_simple_async_report_error_in_idle(G_OBJECT(conn), + callback, user_data, + TP_ERRORS, TP_ERROR_NOT_AVAILABLE, + "connection was not open!"); + return; } priv->count = strlen(cmd); @@ -448,13 +457,17 @@ gboolean idle_server_connection_send(IdleServerConnection *conn, const gchar *cm strncpy(priv->output_buffer, cmd, output_buffer_size); priv->nwritten = 0; - g_object_ref(conn); output_stream = g_io_stream_get_output_stream(priv->io_stream); - g_output_stream_write_async(output_stream, priv->output_buffer, priv->count, G_PRIORITY_DEFAULT, priv->cancellable, _write_ready, conn); + result = g_simple_async_result_new(G_OBJECT(conn), callback, user_data, idle_server_connection_send_async); + g_output_stream_write_async(output_stream, priv->output_buffer, priv->count, G_PRIORITY_DEFAULT, priv->cancellable, _write_ready, result); IDLE_DEBUG("sending \"%s\" to OutputStream %p", priv->output_buffer, output_stream); - return TRUE; +} + +gboolean idle_server_connection_send_finish(IdleServerConnection *conn, GAsyncResult *result, GError **error) { + g_return_val_if_fail(g_simple_async_result_is_valid(result, G_OBJECT(conn), idle_server_connection_send_async), FALSE); + return !g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT(result), error); } IdleServerConnectionState idle_server_connection_get_state(IdleServerConnection *conn) { diff --git a/src/idle-server-connection.h b/src/idle-server-connection.h index 32d988f..27139d2 100644 --- a/src/idle-server-connection.h +++ b/src/idle-server-connection.h @@ -73,7 +73,8 @@ void idle_server_connection_connect_async(IdleServerConnection *conn, GCancellab gboolean idle_server_connection_connect_finish(IdleServerConnection *conn, GAsyncResult *result, GError **error); gboolean idle_server_connection_disconnect(IdleServerConnection *conn, GError **error); gboolean idle_server_connection_disconnect_full(IdleServerConnection *conn, GError **error, guint reason); -gboolean idle_server_connection_send(IdleServerConnection *conn, const gchar *cmd, GError **error); +void idle_server_connection_send_async(IdleServerConnection *conn, const gchar *cmd, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data); +gboolean idle_server_connection_send_finish(IdleServerConnection *conn, GAsyncResult *result, GError **error); IdleServerConnectionState idle_server_connection_get_state(IdleServerConnection *conn); void idle_server_connection_set_tls(IdleServerConnection *conn, gboolean tls); -- 1.7.6.2