# Copyright (C) 2012 Collabora Ltd. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA # 02110-1301 USA import dbus import dbus.service from servicetest import sync_dbus, call_async from mctest import exec_test, SimulatedConnection, \ create_fakecm_account, SimulatedChannel, AccountManager import constants as cs # we want a fake channel which might not actually close when Close() # is called. class StubbornChannel(SimulatedChannel): can_close = False def Close(self, e): if self.can_close: SimulatedChannel.Close(self, e) def test(q, bus, mc): params = dbus.Dictionary({"account": "someguy@example.com", "password": "secrecy"}, signature='sv') cm_name_ref, account = create_fakecm_account(q, bus, mc, params) # Enable the account account.Set(cs.ACCOUNT, 'Enabled', True, dbus_interface=cs.PROPERTIES_IFACE) requested_presence = dbus.Struct((dbus.UInt32(2L), dbus.String(u'available'), dbus.String(u''))) account.Set(cs.ACCOUNT, 'RequestedPresence', requested_presence, dbus_interface=cs.PROPERTIES_IFACE) e = q.expect('dbus-method-call', method='RequestConnection', args=['fakeprotocol', params], destination=cs.tp_name_prefix + '.ConnectionManager.fakecm', path=cs.tp_path_prefix + '/ConnectionManager/fakecm', interface=cs.tp_name_prefix + '.ConnectionManager', handled=False) conn = SimulatedConnection(q, bus, 'fakecm', 'fakeprotocol', '_', 'myself') q.dbus_return(e.message, conn.bus_name, conn.object_path, signature='so') q.expect('dbus-method-call', method='Connect', path=conn.object_path, handled=True) conn.StatusChanged(cs.CONN_STATUS_CONNECTED, cs.CONN_STATUS_REASON_NONE) # now that we're connected, time for some fun. channel_properties = dbus.Dictionary({}, signature='sv') channel_properties[cs.CHANNEL + '.ChannelType'] = \ cs.CHANNEL_TYPE_TEXT channel_properties[cs.CHANNEL + '.TargetHandleType'] = \ cs.HT_CONTACT channel_properties[cs.CHANNEL + '.InitiatorHandle'] = 4 channel_properties[cs.CHANNEL + '.TargetID'] = 'foo@bar.com' channel_properties[cs.CHANNEL + '.InitiatorID'] = 'foo@bar.com' channel_properties[cs.CHANNEL + '.Interfaces'] = \ dbus.Array([cs.CHANNEL_IFACE_GROUP], signature='s') channel_properties[cs.CHANNEL + '.TargetHandle'] = 4 channel_properties[cs.CHANNEL + '.Requested'] = False # announce the channel as normal. because it's undispatchable, MC # will try to close it but we'll avoid that by using a # StubbornChannel... chan = StubbornChannel(conn, channel_properties, group=True) chan.announce() sync_dbus(bus, q, mc) # now re-announce the channel even though it's not actually closed # since the last time it was announced. chan.announced = False chan.announce() # see if MC is still alive account.Properties.GetAll(cs.ACCOUNT) # let the channel close now so the test can be cleaned up properly chan.can_close = True if __name__ == '__main__': exec_test(test, {})