Index: dbus/Makefile.am =================================================================== RCS file: /usr/local/cvs/linux/libs/dbus-glib/dbus-glib/dbus/Makefile.am,v retrieving revision 1.2 retrieving revision 1.2.4.1 diff -u -r1.2 -r1.2.4.1 --- dbus/Makefile.am 14 Mar 2007 13:27:16 -0000 1.2 +++ dbus/Makefile.am 28 Nov 2008 11:13:18 -0000 1.2.4.1 @@ -31,7 +31,9 @@ dbus-gsignature.h \ dbus-gvalue.h \ dbus-gvalue-utils.c \ - dbus-gvalue-utils.h + dbus-gvalue-utils.h \ + dbus-gasync.c \ + dbus-gasync.h libdbus_glib_1_la_SOURCES = \ dbus-glib-error-switch.h \ Index: dbus/dbus-binding-tool-glib.c =================================================================== RCS file: /usr/local/cvs/linux/libs/dbus-glib/dbus-glib/dbus/dbus-binding-tool-glib.c,v retrieving revision 1.3 retrieving revision 1.3.2.1 diff -u -r1.3 -r1.3.2.1 --- dbus/dbus-binding-tool-glib.c 20 Nov 2008 13:38:07 -0000 1.3 +++ dbus/dbus-binding-tool-glib.c 28 Nov 2008 11:13:18 -0000 1.3.2.1 @@ -137,10 +137,20 @@ GType retval_type; GArray *ret; gboolean is_async; + gboolean is_thread; const char *arg_type; gboolean retval_signals_error; is_async = method_info_get_annotation (method, DBUS_GLIB_ANNOTATION_ASYNC) != NULL; + is_thread = method_info_get_annotation (method, DBUS_GLIB_ANNOTATION_THREAD) != NULL; + if (is_async && is_thread) + { + g_set_error (error, + DBUS_BINDING_TOOL_ERROR, + DBUS_BINDING_TOOL_ERROR_INVALID_ANNOTATION, + "Asynchronous methods can not be threaded"); + return FALSE; + } retval_signals_error = FALSE; ret = g_array_new (TRUE, TRUE, sizeof (GType)); @@ -546,6 +556,7 @@ char *marshaller_name; char *method_c_name; gboolean async = FALSE; + gboolean thread = FALSE; GSList *args; gboolean found_retval = FALSE; @@ -580,6 +591,9 @@ if (method_info_get_annotation (method, DBUS_GLIB_ANNOTATION_ASYNC) != NULL) async = TRUE; + if (method_info_get_annotation (method, DBUS_GLIB_ANNOTATION_THREAD) != NULL) + thread = TRUE; + /* Object method data blob format: * \0\0(\0\0\0)*\0 */ @@ -590,7 +604,7 @@ g_string_append (object_introspection_data_blob, method_info_get_name (method)); g_string_append_c (object_introspection_data_blob, '\0'); - g_string_append_c (object_introspection_data_blob, async ? 'A' : 'S'); + g_string_append_c (object_introspection_data_blob, async ? 'A' : (thread ? 'T' : 'S')); g_string_append_c (object_introspection_data_blob, '\0'); for (args = method_info_get_args (method); args; args = args->next) Index: dbus/dbus-binding-tool-glib.h =================================================================== RCS file: /usr/local/cvs/linux/libs/dbus-glib/dbus-glib/dbus/dbus-binding-tool-glib.h,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.4.1 diff -u -r1.1.1.1 -r1.1.1.1.4.1 --- dbus/dbus-binding-tool-glib.h 14 Mar 2007 13:15:37 -0000 1.1.1.1 +++ dbus/dbus-binding-tool-glib.h 28 Nov 2008 11:13:18 -0000 1.1.1.1.4.1 @@ -27,6 +27,7 @@ #define DBUS_GLIB_ANNOTATION_C_SYMBOL "org.freedesktop.DBus.GLib.CSymbol" #define DBUS_GLIB_ANNOTATION_CLIENT_C_SYMBOL "org.freedesktop.DBus.GLib.ClientCSymbol" +#define DBUS_GLIB_ANNOTATION_THREAD "org.freedesktop.DBus.GLib.Thread" #define DBUS_GLIB_ANNOTATION_ASYNC "org.freedesktop.DBus.GLib.Async" #define DBUS_GLIB_ANNOTATION_CONST "org.freedesktop.DBus.GLib.Const" #define DBUS_GLIB_ANNOTATION_RETURNVAL "org.freedesktop.DBus.GLib.ReturnVal" Index: dbus/dbus-gasync.c =================================================================== RCS file: dbus/dbus-gasync.c diff -N dbus/dbus-gasync.c --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ dbus/dbus-gasync.c 8 Dec 2008 13:09:24 -0000 1.1.4.6 @@ -0,0 +1,151 @@ +#include +#include + +#include "dbus-gasync.h" + +/* --- variables --- */ +static GThreadPool *dbus_pool = NULL; + +typedef struct _DBusTask DBusTask; + +struct _DBusTask { + GThreadedFunc callback; + gpointer data; +}; + +/* --- prototypes --- */ + +static void delete_task (DBusTask *task); +static DBusTask* create_new_task (GThreadedFunc callback, + gpointer data); +static void exec_task (DBusTask *task); +static void async_dbus_func (gpointer data, + gpointer user_data); +static gboolean push_task_to_pool (DBusTask *task); + +/* --- functions --- */ + +static DBusTask* +create_new_task (GThreadedFunc callback, + gpointer data) +{ + DBusTask *task = g_slice_new (DBusTask); + + if (NULL == task) + { + g_error ("Could not create memory slice"); + return NULL; + } + + task->callback = callback; + task->data = data; + + return task; +} + +static void +delete_task (DBusTask *task) +{ + g_slice_free (DBusTask, task); +} + +static void +exec_task (DBusTask *task) +{ + task->callback (task->data); +} + +static gboolean +push_task_to_pool (DBusTask *task) +{ + GError *err = NULL; + + g_thread_pool_push (dbus_pool, task, &err); + if (err) + { + g_error ("Could not push the task to the pool: %s", err->message); + g_error_free (err); + return FALSE; + } + + return TRUE; +} + +static void +async_dbus_func (gpointer data, + gpointer user_data) +{ + DBusTask *task = (DBusTask *) data; + exec_task (task); + delete_task (task); +} + +/** + * _dbus_gasync_thread_pool_start: + * @callback: callback function which is called in a thread + * @data: user data submitted to the callback function + * + * Calls the callback function asynchronously in a thread pool. + * dbus_g_thread_pool_init() must be called prior to this function. + * + * Returns: TRUE on success and FALSE on failure + */ +gboolean +_dbus_gasync_thread_pool_start (GThreadedFunc callback, + gpointer data) +{ + gboolean res = TRUE; + + g_return_val_if_fail (NULL != dbus_pool, FALSE); + + /* Create a new task */ + DBusTask *task = create_new_task (callback, + data); + + g_return_val_if_fail (NULL != task, FALSE); + + /* Execute the task asynchronously */ + res = push_task_to_pool (task); + + return res; +} + +/** + * dbus_g_thread_pool_init: + * @max_threads: the maximal number of threads to execute concurrently in the + * thread pool + * + * Creates a new thread pool. This function should only be called + * once and must be called prior to calling _dbus_gasync_thread_pool_start() + * + * Returns: TRUE on success and FALSE on failure + */ +gboolean +dbus_g_thread_pool_init (gint max_threads) +{ + + GError *err = NULL; + + dbus_pool = g_thread_pool_new (async_dbus_func, NULL, max_threads, TRUE, &err); + if (err) + { + g_error ("Could not create thread pool: %s", err->message); + g_error_free (err); + return FALSE; + } + + return TRUE; +} + +/** + * dbus_g_thread_pool_free: + * + * Frees all resources allocated for the thread_pool. + */ +void +dbus_g_thread_pool_free (void) +{ + g_thread_pool_free (dbus_pool, FALSE, TRUE); + dbus_pool = NULL; +} + Index: dbus/dbus-gasync.h =================================================================== RCS file: dbus/dbus-gasync.h diff -N dbus/dbus-gasync.h --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ dbus/dbus-gasync.h 4 Dec 2008 12:02:16 -0000 1.1.4.4 @@ -0,0 +1,13 @@ +#ifndef __DBUS_GLIB_ASYNC_THREAD_POOL_H__ +#define __DBUS_GLIB_ASYNC_THREAD_POOL_H__ + +#include + +typedef void (*GThreadedFunc) (gpointer data); + +gboolean +_dbus_gasync_thread_pool_start (GThreadedFunc callback, + gpointer data); + +#endif /* __DBUS_GLIB_ASYNC_THREAD_POOL_H__ */ + Index: dbus/dbus-glib.h =================================================================== RCS file: /usr/local/cvs/linux/libs/dbus-glib/dbus-glib/dbus/dbus-glib.h,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.2.2 diff -u -r1.1.1.2 -r1.1.1.2.2.2 --- dbus/dbus-glib.h 20 Nov 2008 12:50:37 -0000 1.1.1.2 +++ dbus/dbus-glib.h 8 Dec 2008 13:09:24 -0000 1.1.1.2.2.2 @@ -307,6 +307,10 @@ void dbus_g_method_return_error (DBusGMethodInvocation *context, GError *error); +gboolean dbus_g_thread_pool_init (gint max_threads); + +void dbus_g_thread_pool_free (void); + /* Probably possible to replace this with a closure */ typedef struct { GCallback cb; Index: dbus/dbus-gobject.c =================================================================== RCS file: /usr/local/cvs/linux/libs/dbus-glib/dbus-glib/dbus/dbus-gobject.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.2.12 diff -u -r1.1.1.2 -r1.1.1.2.2.12 --- dbus/dbus-gobject.c 20 Nov 2008 12:50:37 -0000 1.1.1.2 +++ dbus/dbus-gobject.c 8 Dec 2008 13:09:24 -0000 1.1.1.2.2.12 @@ -33,14 +33,23 @@ #include "dbus-gvalue.h" #include "dbus-gmarshal.h" #include "dbus-gvalue-utils.h" +#include "dbus-gasync.h" #include +typedef enum { + SYNCHRONOUS, + SYNCHRONOUS_THREADED, + ASYNCHRONOUS +} GMethodCallType; + typedef struct { char *default_iface; GType code_enum; } DBusGErrorInfo; +typedef struct _DBusGSyncThreadedData DBusGSyncThreadedData; + static GStaticRWLock globals_lock = G_STATIC_RW_LOCK_INIT; static GHashTable *marshal_table = NULL; static GData *error_metadata = NULL; @@ -1048,6 +1057,151 @@ const DBusGMethodInfo *method; /**< The method called */ }; +/** + * The data provided to the callback function which calls synchronous + * threaded dbus methods + */ +struct _DBusGSyncThreadedData { + const DBusGMethodInfo *method; + GClosure *closure; + GValueArray *value_array; + GArray *out_param_values; + GValueArray *out_param_gvalues; +}; + +static gboolean +add_output_arguments (DBusMessageIter *iter, + const DBusGObjectInfo *object_info, + const DBusGMethodInfo *method, + GArray *out_param_values, + GValueArray *out_param_gvalues); + +/** + * Creates a return message for a given method invocation, with arguments. + * The arguments are the elements in the value array. + */ +static DBusMessage* +method_create_response (DBusGMethodInvocation *context, + GArray *out_param_values, + GValueArray *out_param_gvalues) +{ + DBusMessage *reply; + DBusMessageIter iter; + + reply = dbus_message_new_method_return (dbus_g_message_get_message (context->message)); + if (!reply) + { + g_error ("Out of memory"); + return NULL; + } + + dbus_message_iter_init_append (reply, &iter); + + if (!add_output_arguments (&iter, + context->object, + context->method, + out_param_values, + out_param_gvalues)) + { + g_error ("Out of memory"); + dbus_message_unref (reply); + return NULL; + } + + return reply; +} + +/** + * This function is called in a thread and invokes regular synchronous + * dbus method. + * + * The context which is supposed to be the last element in the value_array + * is used for sending dbus messages. Messages are sent using the + * dbus_g_method_send_reply() function which also unrefs the context. + * + * This function will free all the three arrays used for sending/getting + * arguments: value_array, out_param_values and out_param_gvalues. + * + * Note that the dbus method called by the marshaller is a regular syncronous + * method so the context is hidden from it! + */ +static void +invoke_marshaller_threaded (gpointer data) +{ + GError *error = NULL; + guint last_element; + DBusGMethodInvocation *context; + GValue return_value = {0,}; + DBusMessage *reply = NULL; + DBusGSyncThreadedData *method_data; + + method_data = (DBusGSyncThreadedData *)data; + + g_assert (method_data != NULL); + + const DBusGMethodInfo *method = method_data->method; + GClosure *closure = method_data->closure; + GValueArray *value_array = method_data->value_array; + GArray *out_param_values = method_data->out_param_values; + GValueArray *out_param_gvalues = method_data->out_param_gvalues; + + g_assert (method != NULL); + g_assert (closure != NULL); + g_assert (value_array != NULL); + g_assert (out_param_values != NULL); + g_assert (out_param_gvalues != NULL); + + /* + * Init return_value, it is needed by the marshaller function + * if the dbus function(the one described in the xml file) is not void + * Actually its value is never considered, perhaps that should + * be fixed? + */ + g_value_init (&return_value, G_TYPE_BOOLEAN); + + /* Get the index of the last element in the value_array */ + last_element = value_array->n_values - 1; + + /* Get the context */ + context = g_value_get_pointer (value_array->values + last_element); + g_assert (context != NULL); + + /* Remove the context and add the error instead */ + g_value_unset (g_value_array_get_nth (value_array, last_element)); + g_value_init (g_value_array_get_nth (value_array, last_element), G_TYPE_POINTER); + g_value_set_pointer (g_value_array_get_nth (value_array, last_element), &error); + + /* Invoke marshaller */ + method->marshaller (closure, + &return_value, + value_array->n_values, + value_array->values, + NULL, + method->function); + + /* Return a message with arguments or error */ + if (error) + { + reply = gerror_to_dbus_error_message (context->object, dbus_g_message_get_message (context->message), error); + g_error_free (error); + } + else + { + reply = method_create_response (context, out_param_values, out_param_gvalues); + } + + g_array_free (out_param_values, TRUE); + g_value_array_free (out_param_gvalues); + g_value_array_free (value_array); + + g_slice_free (DBusGSyncThreadedData, method_data); + + if (reply) + { + dbus_g_method_send_reply (context, reply); + } +} + static DBusHandlerResult invoke_object_method (GObject *object, const DBusGObjectInfo *object_info, @@ -1055,7 +1209,8 @@ DBusConnection *connection, DBusMessage *message) { - gboolean had_error, call_only; + GMethodCallType call_type; + gboolean had_error; GError *gerror; GValueArray *value_array; GValue return_value = {0,}; @@ -1072,16 +1227,31 @@ gboolean retval_is_synthetic; gboolean retval_is_constant; const char *arg_metadata; + const char *call_type_str; + DBusGSyncThreadedData *method_data; gerror = NULL; - /* Determine whether or not this method should be invoked in a new - thread + /* Determine how the method is to be called. 3 possible ways: + * + * synchronously - method_call_type 'S' + * synchronously in a thread - method_call_type 'T' + * asynchronously - method_call_type 'A' */ - if (strcmp (string_table_lookup (get_method_data (object_info, method), 2), "A") == 0) - call_only = TRUE; - else - call_only = FALSE; + call_type_str = string_table_lookup (get_method_data (object_info, method), 2); + switch (*call_type_str) + { + case 'A': /* call method asynchronously */ + call_type = ASYNCHRONOUS; + break; + case 'T': /* call synchronous method in a thread */ + call_type = SYNCHRONOUS_THREADED; + break; + case 'S': /* call fully synchronous method */ + default: + call_type = SYNCHRONOUS; + break; + } have_retval = FALSE; retval_signals_error = FALSE; @@ -1131,20 +1301,7 @@ g_value_init (g_value_array_get_nth (value_array, 0), G_TYPE_OBJECT); g_value_set_object (g_value_array_get_nth (value_array, 0), object); - if (call_only) - { - GValue context_value = {0,}; - DBusGMethodInvocation *context; - context = g_new (DBusGMethodInvocation, 1); - context->connection = dbus_g_connection_ref (DBUS_G_CONNECTION_FROM_CONNECTION (connection)); - context->message = dbus_g_message_ref (DBUS_G_MESSAGE_FROM_MESSAGE (message)); - context->object = object_info; - context->method = method; - g_value_init (&context_value, G_TYPE_POINTER); - g_value_set_pointer (&context_value, context); - g_value_array_append (value_array, &context_value); - } - else + if (ASYNCHRONOUS!=call_type) { RetvalType retval; gboolean arg_in; @@ -1200,7 +1357,7 @@ * is a "synthetic" return value; i.e. we aren't going to be * sending it over the bus, it's just to signal an error. */ - if (!have_retval) + if (!have_retval && SYNCHRONOUS_THREADED!=call_type) { have_retval = TRUE; retval_is_synthetic = TRUE; @@ -1267,17 +1424,61 @@ if (retval_signals_error) { g_assert (have_retval); + g_assert (SYNCHRONOUS==call_type); g_value_array_append (value_array, NULL); g_value_init (g_value_array_get_nth (value_array, value_array->n_values - 1), G_TYPE_POINTER); g_value_set_pointer (g_value_array_get_nth (value_array, value_array->n_values - 1), &gerror); } - + + /* + * Append DBusGMethodInvocation as final argument in case of + * asynchronous or synchronous threaded calls + */ + if (ASYNCHRONOUS==call_type || SYNCHRONOUS_THREADED==call_type) + { + GValue context_value = {0,}; + DBusGMethodInvocation *context; + context = g_new (DBusGMethodInvocation, 1); + context->connection = dbus_g_connection_ref (DBUS_G_CONNECTION_FROM_CONNECTION (connection)); + context->message = dbus_g_message_ref (DBUS_G_MESSAGE_FROM_MESSAGE (message)); + context->object = object_info; + context->method = method; + g_value_init (&context_value, G_TYPE_POINTER); + g_value_set_pointer (&context_value, context); + g_value_array_append (value_array, &context_value); + } + /* Actually invoke method */ - method->marshaller (&closure, have_retval ? &return_value : NULL, - value_array->n_values, - value_array->values, - NULL, method->function); - if (call_only) + switch (call_type) + { + case SYNCHRONOUS_THREADED: + method_data = g_slice_new (DBusGSyncThreadedData); + if (!method_data) + goto nomem; + + method_data->method = method; + method_data->closure = &closure; + method_data->value_array = value_array; + method_data->out_param_values = out_param_values; + method_data->out_param_gvalues = out_param_gvalues; + + if (!_dbus_gasync_thread_pool_start (invoke_marshaller_threaded, + method_data)) + { + g_slice_free (DBusGSyncThreadedData, method_data); + goto nomem; + } + break; + case ASYNCHRONOUS: + case SYNCHRONOUS: + method->marshaller (&closure, have_retval ? &return_value : NULL, + value_array->n_values, + value_array->values, + NULL, method->function); + break; + } + + if (ASYNCHRONOUS==call_type || SYNCHRONOUS_THREADED==call_type) { result = DBUS_HANDLER_RESULT_HANDLED; goto done; @@ -1307,61 +1508,13 @@ g_value_unset (&return_value); } - /* Grab the argument metadata and iterate over it */ - arg_metadata = method_arg_info_from_object_info (object_info, method); - /* Now append any remaining return values */ - out_param_pos = 0; - out_param_gvalue_pos = 0; - while (*arg_metadata) - { - GValue gvalue = {0, }; - const char *arg_name; - gboolean arg_in; - gboolean constval; - RetvalType retval; - const char *arg_signature; - DBusSignatureIter argsigiter; - - do - { - /* Iterate over only output values; skip over input - arguments and the return value */ - arg_metadata = arg_iterate (arg_metadata, &arg_name, &arg_in, &constval, &retval, &arg_signature); - } - while ((arg_in || retval != RETVAL_NONE) && *arg_metadata); - - /* If the last argument we saw was input or the return - * value, we must be done iterating over output arguments. - */ - if (arg_in || retval != RETVAL_NONE) - break; - - dbus_signature_iter_init (&argsigiter, arg_signature); - - g_value_init (&gvalue, _dbus_gtype_from_signature_iter (&argsigiter, FALSE)); - if (G_VALUE_TYPE (&gvalue) != G_TYPE_VALUE) - { - if (!_dbus_gvalue_take (&gvalue, - &(g_array_index (out_param_values, GTypeCValue, out_param_pos)))) - g_assert_not_reached (); - out_param_pos++; - } - else - { - g_value_set_static_boxed (&gvalue, out_param_gvalues->values + out_param_gvalue_pos); - out_param_gvalue_pos++; - } - - if (!_dbus_gvalue_marshal (&iter, &gvalue)) - goto nomem; - /* Here we actually free the allocated value; we - * took ownership of it with _dbus_gvalue_take, unless - * an annotation has specified this value as constant. - */ - if (!constval) - g_value_unset (&gvalue); - } + if (!add_output_arguments (&iter, + object_info, + method, + out_param_values, + out_param_gvalues)) + goto nomem; } else reply = gerror_to_dbus_error_message (object_info, message, gerror); @@ -1375,18 +1528,98 @@ result = DBUS_HANDLER_RESULT_HANDLED; done: g_free (in_signature); - if (!call_only) + if (SYNCHRONOUS_THREADED!=call_type) { - g_array_free (out_param_values, TRUE); - g_value_array_free (out_param_gvalues); + /* + * if call_type is synchronous threaded the arrays will be freed + * within the thread_pool by invoke_marshaller_method() + */ + if (out_param_values) + g_array_free (out_param_values, TRUE); + if (out_param_gvalues) + g_value_array_free (out_param_gvalues); + g_value_array_free (value_array); } - g_value_array_free (value_array); return result; nomem: result = DBUS_HANDLER_RESULT_NEED_MEMORY; goto done; } +static gboolean +add_output_arguments (DBusMessageIter *iter, + const DBusGObjectInfo *object_info, + const DBusGMethodInfo *method, + GArray *out_param_values, + GValueArray *out_param_gvalues) +{ + const char *arg_metadata; + int out_param_pos, out_param_gvalue_pos; + gboolean res = TRUE; + + /* Grab the argument metadata and iterate over it */ + arg_metadata = method_arg_info_from_object_info (object_info, method); + + /* Append the remaining return values */ + out_param_pos = 0; + out_param_gvalue_pos = 0; + while (*arg_metadata) + { + GValue gvalue = {0, }; + const char *arg_name; + gboolean arg_in; + gboolean constval; + RetvalType retval; + const char *arg_signature; + DBusSignatureIter argsigiter; + + do + { + /* Iterate over only output values; skip over input + arguments and the return value */ + arg_metadata = arg_iterate (arg_metadata, &arg_name, &arg_in, &constval, &retval, &arg_signature); + } + while ((arg_in || retval != RETVAL_NONE) && *arg_metadata); + + /* If the last argument we saw was input or the return + * value, we must be done iterating over output arguments. + */ + if (arg_in || retval != RETVAL_NONE) + break; + + dbus_signature_iter_init (&argsigiter, arg_signature); + + g_value_init (&gvalue, _dbus_gtype_from_signature_iter (&argsigiter, FALSE)); + if (G_VALUE_TYPE (&gvalue) != G_TYPE_VALUE) + { + if (!_dbus_gvalue_take (&gvalue, + &(g_array_index (out_param_values, GTypeCValue, out_param_pos)))) + g_assert_not_reached (); + out_param_pos++; + } + else + { + g_value_set_static_boxed (&gvalue, out_param_gvalues->values + out_param_gvalue_pos); + out_param_gvalue_pos++; + } + + if (!_dbus_gvalue_marshal (iter, &gvalue)) + { + res = FALSE; + break; + } + + /* Here we actually free the allocated value; we + * took ownership of it with _dbus_gvalue_take, unless + * an annotation has specified this value as constant. + */ + if (!constval) + g_value_unset (&gvalue); + } + + return res; +} + static DBusHandlerResult gobject_message_function (DBusConnection *connection, DBusMessage *message,