From 69a0ae63a1b6dce7cbafbe0490ca55b75cc4c282 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Tue, 1 Oct 2013 13:27:16 +0200 Subject: [PATCH 4/6] sync servicetest.py with Gabble https://bugs.freedesktop.org/show_bug.cgi?id=69995 --- tests/twisted/servicetest.py | 287 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 260 insertions(+), 27 deletions(-) diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py index b871e1b..c464d8f 100644 --- a/tests/twisted/servicetest.py +++ b/tests/twisted/servicetest.py @@ -1,6 +1,23 @@ +# Copyright (C) 2009 Nokia Corporation +# Copyright (C) 2009-2013 Collabora Ltd. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +# 02110-1301 USA """ -Infrastructure code for testing connection managers. +Infrastructure code for testing Telepathy services. """ from twisted.internet import glib2reactor @@ -8,29 +25,55 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory glib2reactor.install() import sys import time +import os import pprint import unittest -import dbus.glib +import dbus +import dbus.lowlevel +from dbus.mainloop.glib import DBusGMainLoop +DBusGMainLoop(set_as_default=True) from twisted.internet import reactor import constants as cs -tp_name_prefix = 'org.freedesktop.Telepathy' -tp_path_prefix = '/org/freedesktop/Telepathy' +tp_name_prefix = cs.PREFIX +tp_path_prefix = cs.PATH_PREFIX -class Event: +class DictionarySupersetOf (object): + """Utility class for expecting "a dictionary with at least these keys".""" + def __init__(self, dictionary): + self._dictionary = dictionary + def __repr__(self): + return "DictionarySupersetOf(%s)" % self._dictionary + def __eq__(self, other): + """would like to just do: + return set(other.items()).issuperset(self._dictionary.items()) + but it turns out that this doesn't work if you have another dict + nested in the values of your dicts""" + try: + for k,v in self._dictionary.items(): + if k not in other or other[k] != v: + return False + return True + except TypeError: # other is not iterable + return False + +class Event(object): def __init__(self, type, **kw): self.__dict__.update(kw) self.type = type (self.subqueue, self.subtype) = type.split ("-", 1) + def __str__(self): + return '\n'.join([ str(type(self)) ] + format_event(self)) + def format_event(event): ret = ['- type %s' % event.type] - for key in dir(event): + for key in sorted(dir(event)): if key != 'type' and not key.startswith('_'): ret.append('- %s: %s' % ( key, pprint.pformat(getattr(event, key)))) @@ -79,6 +122,14 @@ class EventPattern: class TimeoutError(Exception): pass +class ForbiddenEventOccurred(Exception): + def __init__(self, event): + Exception.__init__(self) + self.event = event + + def __str__(self): + return '\n' + '\n'.join(format_event(self.event)) + class BaseEventQueue: """Abstract event queue base class. @@ -124,13 +175,16 @@ class BaseEventQueue: """ self.forbidden_events.difference_update(set(patterns)) + def unforbid_all(self): + """ + Remove all patterns from the set of forbidden events. + """ + self.forbidden_events.clear() + def _check_forbidden(self, event): for e in self.forbidden_events: if e.match(event): - print "forbidden event occurred:" - for x in format_event(event): - print x - assert False + raise ForbiddenEventOccurred(event) def expect(self, type, **kw): """ @@ -255,6 +309,11 @@ class IteratingEventQueue(BaseEventQueue): def __init__(self, timeout=None): BaseEventQueue.__init__(self, timeout) + self._dbus_method_impls = [] + self._buses = [] + # a message filter which will claim we handled everything + self._dbus_dev_null = \ + lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED def wait(self, queues=None): stop = [False] @@ -279,6 +338,127 @@ class IteratingEventQueue(BaseEventQueue): else: raise TimeoutError + def add_dbus_method_impl(self, cb, bus=None, **kwargs): + if bus is None: + bus = self._buses[0] + + self._dbus_method_impls.append( + (EventPattern('dbus-method-call', **kwargs), cb)) + + def dbus_emit(self, path, iface, name, *a, **k): + bus = k.pop('bus', self._buses[0]) + assert 'signature' in k, k + message = dbus.lowlevel.SignalMessage(path, iface, name) + message.append(*a, **k) + bus.send_message(message) + + def dbus_return(self, in_reply_to, *a, **k): + bus = k.pop('bus', self._buses[0]) + assert 'signature' in k, k + reply = dbus.lowlevel.MethodReturnMessage(in_reply_to) + reply.append(*a, **k) + bus.send_message(reply) + + def dbus_raise(self, in_reply_to, name, message=None, bus=None): + if bus is None: + bus = self._buses[0] + + reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message) + bus.send_message(reply) + + def attach_to_bus(self, bus): + if not self._buses: + # first-time setup + self._dbus_filter_bound_method = self._dbus_filter + + self._buses.append(bus) + + # Only subscribe to messages on the first bus connection (assumed to + # be the shared session bus connection used by the simulated connection + # manager and most of the test suite), not on subsequent bus + # connections (assumed to represent extra clients). + # + # When we receive a method call on the other bus connections, ignore + # it - the eavesdropping filter installed on the first bus connection + # will see it too. + # + # This is highly counter-intuitive, but it means our messages are in + # a guaranteed order (we don't have races between messages arriving on + # various connections). + if len(self._buses) > 1: + bus.add_message_filter(self._dbus_dev_null) + return + + try: + # for dbus > 1.5 + bus.add_match_string("eavesdrop=true,type='signal'") + except dbus.DBusException: + bus.add_match_string("type='signal'") + bus.add_match_string("type='method_call'") + else: + bus.add_match_string("eavesdrop=true,type='method_call'") + + bus.add_message_filter(self._dbus_filter_bound_method) + + bus.add_signal_receiver( + lambda *args, **kw: + self.append( + Event('dbus-signal', + path=unwrap(kw['path']), + signal=kw['member'], + args=map(unwrap, args), + interface=kw['interface'])), + None, + None, + None, + path_keyword='path', + member_keyword='member', + interface_keyword='interface', + byte_arrays=True, + ) + + def cleanup(self): + if self._buses: + self._buses[0].remove_message_filter(self._dbus_filter_bound_method) + for bus in self._buses[1:]: + bus.remove_message_filter(self._dbus_dev_null) + + self._buses = [] + self._dbus_method_impls = [] + + def _dbus_filter(self, bus, message): + if isinstance(message, dbus.lowlevel.MethodCallMessage): + + destination = message.get_destination() + sender = message.get_sender() + + if (destination == 'org.freedesktop.DBus' or + sender == self._buses[0].get_unique_name()): + # suppress reply and don't make an Event + return dbus.lowlevel.HANDLER_RESULT_HANDLED + + e = Event('dbus-method-call', message=message, + interface=message.get_interface(), path=message.get_path(), + raw_args=message.get_args_list(byte_arrays=True), + args=map(unwrap, message.get_args_list(byte_arrays=True)), + destination=str(destination), + method=message.get_member(), + sender=message.get_sender(), + handled=False) + + for pair in self._dbus_method_impls: + pattern, cb = pair + if pattern.match(e): + cb(e) + e.handled = True + break + + self.append(e) + + return dbus.lowlevel.HANDLER_RESULT_HANDLED + + return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED + class TestEventQueue(BaseEventQueue): def __init__(self, events): BaseEventQueue.__init__(self) @@ -389,17 +569,18 @@ def call_async(test, proxy, method, *args, **kw): kw.update({'reply_handler': reply_func, 'error_handler': error_func}) method_proxy(*args, **kw) -def sync_dbus(bus, q, conn): - # Dummy D-Bus method call - # This won't do the right thing unless the proxy has a unique name. - assert conn.object.bus_name.startswith(':') - root_object = bus.get_object(conn.object.bus_name, '/') - call_async( - q, dbus.Interface(root_object, 'org.freedesktop.DBus.Peer'), 'Ping') - q.expect('dbus-return', method='Ping') +def sync_dbus(bus, q, proxy): + # Dummy D-Bus method call. We can't use DBus.Peer.Ping() because libdbus + # replies to that message immediately, rather than handing it up to + # dbus-glib and thence the application, which means that Ping()ing the + # application doesn't ensure that it's processed all D-Bus messages prior + # to our ping. + call_async(q, dbus.Interface(proxy, 'org.freedesktop.Telepathy.Tests'), + 'DummySyncDBus') + q.expect('dbus-error', method='DummySyncDBus') class ProxyWrapper: - def __init__(self, object, default, others): + def __init__(self, object, default, others={}): self.object = object self.default_interface = dbus.Interface(object, default) self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE) @@ -418,12 +599,29 @@ class ProxyWrapper: return getattr(self.default_interface, name) +class ConnWrapper(ProxyWrapper): + def inspect_contact_sync(self, handle): + return self.inspect_contacts_sync([handle])[0] + + def inspect_contacts_sync(self, handles): + h2asv = self.Contacts.GetContactAttributes(handles, [], True) + ret = [] + for h in handles: + ret.append(h2asv[h][cs.ATTR_CONTACT_ID]) + return ret + + def get_contact_handle_sync(self, identifier): + return self.Contacts.GetContactByID(identifier, [])[0] + + def get_contact_handles_sync(self, ids): + return [self.get_contact_handle_sync(i) for i in ids] + def wrap_connection(conn): - return ProxyWrapper(conn, tp_name_prefix + '.Connection', + return ConnWrapper(conn, tp_name_prefix + '.Connection', dict([ (name, tp_name_prefix + '.Connection.Interface.' + name) for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts', - 'Presence', 'SimplePresence', 'Requests']] + + 'SimplePresence', 'Requests']] + [('Peer', 'org.freedesktop.DBus.Peer'), ('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS), ('ContactInfo', cs.CONN_IFACE_CONTACT_INFO), @@ -432,6 +630,8 @@ def wrap_connection(conn): ('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION), ('ContactList', cs.CONN_IFACE_CONTACT_LIST), ('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS), + ('PowerSaving', cs.CONN_IFACE_POWER_SAVING), + ('Addressing', cs.CONN_IFACE_ADDRESSING), ])) def wrap_channel(chan, type_, extra=None): @@ -447,14 +647,26 @@ def wrap_channel(chan, type_, extra=None): return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces) + +def wrap_content(chan, extra=None): + interfaces = { } + + if extra: + interfaces.update(dict([ + (name, tp_name_prefix + '.Call1.Content.Interface.' + name) + for name in extra])) + + return ProxyWrapper(chan, tp_name_prefix + '.Call1.Content', interfaces) + def make_connection(bus, event_func, name, proto, params): cm = bus.get_object( tp_name_prefix + '.ConnectionManager.%s' % name, - tp_path_prefix + '/ConnectionManager/%s' % name) + tp_path_prefix + '/ConnectionManager/%s' % name, + introspect=False) cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager') connection_name, connection_path = cm_iface.RequestConnection( - proto, params) + proto, dbus.Dictionary(params, signature='sv')) conn = wrap_connection(bus.get_object(connection_name, connection_path)) return conn @@ -470,6 +682,7 @@ def make_channel_proxy(conn, path, iface): class EventProtocol(Protocol): def __init__(self, queue=None, block_reading=False): self.queue = queue + self.block_reading = block_reading def dataReceived(self, data): if self.queue is not None: @@ -566,6 +779,12 @@ def assertFlagsUnset(flags, value): "expected none of flags %u, but %u are set in %u" % ( flags, masked, value)) +def assertDBusError(name, error): + if error.get_dbus_name() != name: + raise AssertionError( + "expected DBus error named:\n %s\ngot:\n %s\n(with message: %s)" + % (name, error.get_dbus_name(), error.message)) + def install_colourer(): def red(s): return '\x1b[31m%s\x1b[0m' % s @@ -584,12 +803,26 @@ def install_colourer(): self.patterns = patterns def write(self, s): - f = self.patterns.get(s, lambda x: x) - self.fh.write(f(s)) + for p, f in self.patterns.items(): + if s.startswith(p): + self.fh.write(f(p) + s[len(p):]) + return + + self.fh.write(s) sys.stdout = Colourer(sys.stdout, patterns) return sys.stdout -if __name__ == '__main__': - unittest.main() +# this is just to shut up unittest. +class DummyStream(object): + def write(self, s): + if 'CHECK_TWISTED_VERBOSE' in os.environ: + print s, + def flush(self): + pass + +if __name__ == '__main__': + stream = DummyStream() + runner = unittest.TextTestRunner(stream=stream) + unittest.main(testRunner=runner) -- 1.8.3.1