From 96eb4c39cfafe31d0a4e9204c9b3649f1ef421ce Mon Sep 17 00:00:00 2001
From: Simon McVittie
Date: Wed, 25 Sep 2013 17:13:00 +0100
Subject: [PATCH 3/4] servicetest: add infrastructure for faking D-Bus services, from MC

I accidentally a small Python D-Bus binding.
---
NOTE(review): the line structure of this patch was restored from a copy in
which all newlines and indentation had been collapsed. Added ("+") lines and
hunk counts are as recorded; the position of blank context lines and all
leading indentation are inferred, not verified against upstream. If the patch
does not apply cleanly, regenerate it from commit 96eb4c39 with
`git format-patch` rather than hand-editing it.

 tests/twisted/servicetest.py | 146 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 145 insertions(+), 1 deletion(-)

diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index a04b72d..b25fe42 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,6 +1,23 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009-2013 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
 
 """
-Infrastructure code for testing connection managers.
+Infrastructure code for testing Telepathy services.
 """
 
 from twisted.internet import glib2reactor
@@ -14,6 +31,7 @@ import pprint
 import unittest
 
 import dbus
+import dbus.lowlevel
 from dbus.mainloop.glib import DBusGMainLoop
 DBusGMainLoop(set_as_default=True)
 
@@ -291,6 +309,11 @@ class IteratingEventQueue(BaseEventQueue):
 
     def __init__(self, timeout=None):
         BaseEventQueue.__init__(self, timeout)
+        self._dbus_method_impls = []
+        self._buses = []
+        # a message filter which will claim we handled everything
+        self._dbus_dev_null = \
+                lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
 
     def wait(self, queues=None):
         stop = [False]
@@ -315,6 +338,127 @@ class IteratingEventQueue(BaseEventQueue):
         else:
             raise TimeoutError
 
+    def add_dbus_method_impl(self, cb, bus=None, **kwargs):
+        if bus is None:
+            bus = self._buses[0]
+
+        self._dbus_method_impls.append(
+                (EventPattern('dbus-method-call', **kwargs), cb))
+
+    def dbus_emit(self, path, iface, name, *a, **k):
+        bus = k.pop('bus', self._buses[0])
+        assert 'signature' in k, k
+        message = dbus.lowlevel.SignalMessage(path, iface, name)
+        message.append(*a, **k)
+        bus.send_message(message)
+
+    def dbus_return(self, in_reply_to, *a, **k):
+        bus = k.pop('bus', self._buses[0])
+        assert 'signature' in k, k
+        reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+        reply.append(*a, **k)
+        bus.send_message(reply)
+
+    def dbus_raise(self, in_reply_to, name, message=None, bus=None):
+        if bus is None:
+            bus = self._buses[0]
+
+        reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+        bus.send_message(reply)
+
+    def attach_to_bus(self, bus):
+        if not self._buses:
+            # first-time setup
+            self._dbus_filter_bound_method = self._dbus_filter
+
+        self._buses.append(bus)
+
+        # Only subscribe to messages on the first bus connection (assumed to
+        # be the shared session bus connection used by the simulated connection
+        # manager and most of the test suite), not on subsequent bus
+        # connections (assumed to represent extra clients).
+        #
+        # When we receive a method call on the other bus connections, ignore
+        # it - the eavesdropping filter installed on the first bus connection
+        # will see it too.
+        #
+        # This is highly counter-intuitive, but it means our messages are in
+        # a guaranteed order (we don't have races between messages arriving on
+        # various connections).
+        if len(self._buses) > 1:
+            bus.add_message_filter(self._dbus_dev_null)
+            return
+
+        try:
+            # for dbus > 1.5
+            bus.add_match_string("eavesdrop=true,type='signal'")
+        except dbus.DBusException:
+            bus.add_match_string("type='signal'")
+            bus.add_match_string("type='method_call'")
+        else:
+            bus.add_match_string("eavesdrop=true,type='method_call'")
+
+        bus.add_message_filter(self._dbus_filter_bound_method)
+
+        bus.add_signal_receiver(
+                lambda *args, **kw:
+                    self.append(
+                        Event('dbus-signal',
+                            path=unwrap(kw['path']),
+                            signal=kw['member'],
+                            args=map(unwrap, args),
+                            interface=kw['interface'])),
+                None,
+                None,
+                None,
+                path_keyword='path',
+                member_keyword='member',
+                interface_keyword='interface',
+                byte_arrays=True,
+                )
+
+    def cleanup(self):
+        if self._buses:
+            self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
+        for bus in self._buses[1:]:
+            bus.remove_message_filter(self._dbus_dev_null)
+
+        self._buses = []
+        self._dbus_method_impls = []
+
+    def _dbus_filter(self, bus, message):
+        if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+            destination = message.get_destination()
+            sender = message.get_sender()
+
+            if (destination == 'org.freedesktop.DBus' or
+                    sender == self._buses[0].get_unique_name()):
+                # suppress reply and don't make an Event
+                return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+            e = Event('dbus-method-call', message=message,
+                interface=message.get_interface(), path=message.get_path(),
+                raw_args=message.get_args_list(byte_arrays=True),
+                args=map(unwrap, message.get_args_list(byte_arrays=True)),
+                destination=str(destination),
+                method=message.get_member(),
+                sender=message.get_sender(),
+                handled=False)
+
+            for pair in self._dbus_method_impls:
+                pattern, cb = pair
+                if pattern.match(e):
+                    cb(e)
+                    e.handled = True
+                    break
+
+            self.append(e)
+
+            return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+        return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
 class TestEventQueue(BaseEventQueue):
     def __init__(self, events):
         BaseEventQueue.__init__(self)
-- 
1.8.4.rc3