From d468f72aee5a38a877791888b17af5f4360d2b37 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Tue, 1 Dec 2009 22:55:43 +0100 Subject: [PATCH 5/5] =?UTF-8?q?Bug=2024446=20=E2=80=94=20Add=20a=20test=20suite?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a test suite which checks operations on a fake device (currently a RAID-0 md built from a RAM disk). This is useful for regression and system/distro integration testing, since it will only succeed if working versions of cryptsetup, udev rules, etc. are installed. Current coverage: - tests local build tree or system binaries, depending from where it is run - devkit-disks --show-info - create/relabel/mount/unmount/fsck/open files/take ownership for supported file systems (ext[234], minix, xfs, ntfs, vfat, swap), with both command line tools (and DK-d detection) as well as DK-D D-Bus interface - LUKS create/teardown/mount/unmount/change password - Partition create/flags/modify/delete for MBR and GPT (APM tests are present, but disabled) - SMART status/real HD/simulate - LVM single LV/single LV with RAID-1 - Manager functionality: Enumerate*, FindDeviceBy*, Inhibition --- testsuite | 1214 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 1214 insertions(+), 0 deletions(-) create mode 100755 testsuite diff --git a/testsuite b/testsuite new file mode 100755 index 0000000..fe6b4fa --- /dev/null +++ b/testsuite @@ -0,0 +1,1214 @@ +#!/usr/bin/python +# +# udisks test suite +# Run in udisks built tree to test local built binaries (needs +# --libexecdir=`pwd`/src/helpers --localstatedir=/var), or from anywhere else +# to test system installed binaries. 
+# +# Usage: +# - Run all tests: +# ./testsuite +# - Run only a particular class of tests: +# ./testsuite Luks +# - Run only a single test: +# ./testsuite FS.test_ext3 + +# Copyright: (C) 2009 Martin Pitt +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# TODO: +# - Add hotplug stresstest: stop/rebuild array with some partitions in a loop, +# and check added/removed devices and emitted signals +# - Add LUKS stresstest (was racy in the past) +# - Test LinuxMd* D-BUS interface + +import subprocess +import os +import unittest +import sys +import tempfile +import atexit +import time +import shutil +import dbus +import signal +import stat +import optparse + +test_ram_dev1 = '/dev/ram8' +test_ram_dev2 = '/dev/ram9' +test_ram_dev3 = '/dev/ram10' +test_md_dev = '/dev/md125' + +I_D = 'org.freedesktop.UDisks.Device' + +# ---------------------------------------------------------------------------- + +class UDisksTestCase(unittest.TestCase): + '''Base class for udisks test cases. + + This provides static functions which are useful for all test cases. 
+ ''' + + tool_path = None + daemon = None + daemon_log = None + device = None + + manager_obj = None + manager_props = None + + @classmethod + def init(klass, logfile=None): + '''start daemon and set up test environment''' + + assert os.geteuid() == 0, 'need to be root for running this' + + # ensure that we can use test_ram_dev* for testing + for r in (test_ram_dev1, test_ram_dev2, test_ram_dev3): + assert os.path.exists(r), r + ' does not exist -- you might need to load the rd module?' + assert stat.S_ISBLK(os.stat(r).st_mode), r + ' is not a block device' + assert r not in open('/proc/mounts').read(), r + ' is already in use' + subprocess.call(['dd', 'if=/dev/zero', 'of='+r], + stderr=subprocess.PIPE) + + assert os.path.basename(test_md_dev) not in open('/proc/mdstat').read(), test_md_dev + ' is already in use' + + klass.device = test_md_dev + assert subprocess.call(['mdadm', '--create', test_md_dev, '--force', '-n', + '1', '-l', 'raid0', test_ram_dev1]) == 0 + + # start with a clean slate: zero out device + klass.zero_device() + + # run from local build tree if we are in one, otherwise use system instance + if (os.access ('src/udisks-daemon', os.X_OK)): + daemon_path = 'src/udisks-daemon' + klass.tool_path = 'tools/udisks' + print 'Testing binaries from local build tree' + else: + print 'Testing installed system binaries' + daemon_path = None + for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks.service'): + if l.startswith('Exec='): + daemon_path = l.split('=', 1)[1].strip() + break + assert daemon_path, 'could not determine daemon path from D-BUS .service file' + + klass.tool_path = 'udisks' + + print 'daemon path:', daemon_path + + # start daemon + if logfile: + klass.daemon_log = open(logfile, 'w') + else: + klass.daemon_log = tempfile.TemporaryFile() + klass.daemon = subprocess.Popen([daemon_path, '--replace'], + stdout=klass.daemon_log, stderr=subprocess.STDOUT) + assert klass.daemon.pid, 'daemon failed to start' + time.sleep(0.5) # give it some time to 
settle + + atexit.register(klass.cleanup) + + obj = dbus.SystemBus().get_object('org.freedesktop.UDisks', + '/org/freedesktop/UDisks') + klass.manager_iface = dbus.Interface(obj, 'org.freedesktop.UDisks') + klass.manager_props = dbus.Interface(obj, dbus.PROPERTIES_IFACE) + + @classmethod + def cleanup(klass): + '''stop daemon again and clean up test environment''' + + subprocess.call(['umount', test_md_dev], stderr=subprocess.PIPE) # if a test failed + subprocess.call(['mdadm', '-S', test_md_dev]) + klass.device = None + + os.kill(klass.daemon.pid, signal.SIGTERM) + klass.daemon.wait() + klass.daemon = None + + @classmethod + def sync(klass): + '''Wait until pending events finished processing.''' + + subprocess.call(['udevadm', 'settle']) + + @classmethod + def zero_device(klass): + subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device], + stderr=subprocess.PIPE) + klass.sync() + + @classmethod + def devname(klass, partition=None): + '''Get name of test device or one of its partitions''' + + if partition: + return klass.device + 'p' + str(partition) + else: + return klass.device + + @classmethod + def partition_obj(klass, partition=None): + '''Get D-Bus object of test device or one of its partitions''' + + p = '/org/freedesktop/UDisks/devices/' + \ + os.path.basename(klass.device) + if partition: + p += 'p' + str(partition) + + return dbus.SystemBus().get_object('org.freedesktop.UDisks', p) + + @classmethod + def partition_iface(klass, partition=None): + '''Get D-Bus Disks interface of test device or one of its partitions''' + + return dbus.Interface(klass.partition_obj(partition), I_D) + + @classmethod + def partition_props(klass, partition=None): + '''Get D-Bus Disks properties of test device or one of its partitions''' + + return dbus.Interface(klass.partition_obj(partition), + dbus.PROPERTIES_IFACE) + + @classmethod + def get_info(klass, partition=None, devname=None): + '''Return udisks --info in a dictionary. 
+ + If no partition number is given, this queries device. If devname is + given, info for that is returned instead. + ''' + info = subprocess.Popen([klass.tool_path, '--show-info', + devname or klass.devname(partition)], + stdout=subprocess.PIPE) + out = info.communicate()[0] + assert info.returncode == 0, 'udisks --info failed' + + props = {} + prefix = '' + for l in out.splitlines(): + if not l.startswith(' ') or not ':' in l: + continue + + if l.startswith(' linux md:'): + prefix = 'md_' + continue + elif l.startswith(' partition table:'): + prefix = 'partition_' + continue + if prefix and not l.startswith(' '): + prefix = '' + + (k, v) = l.split(':', 1) + props[prefix + k.strip()] = v.strip() + + return props + + @classmethod + def get_uuid(klass, partition=None): + '''Use blkid to determine UUID.''' + + uuid = None + blkid = subprocess.Popen(['blkid', '-p', '-o', 'udev', + klass.devname(partition)], stdout=subprocess.PIPE) + for l in blkid.stdout: + if l.startswith('ID_FS_UUID='): + uuid = l.split('=', 1)[1].strip() + assert blkid.wait() == 0 + return uuid + + @classmethod + def get_partitions(klass): + '''Return list of test device partitions known to udisks.''' + + info = subprocess.Popen([klass.tool_path, '--enumerate-device-files'], + stdout=subprocess.PIPE) + out = info.communicate()[0] + assert info.returncode == 0, 'udisks --enumerate-device-files failed' + + partitions = [] + for l in out.splitlines(): + l = l.strip() + if l.startswith(klass.device) and l != klass.device: + partitions.append(l[len(klass.device):]) + return partitions + + @classmethod + def mkfs(klass, type, label=None, partition=None): + '''Create file system using mkfs.''' + + no_stderr = False + if type == 'vfat': + cmd = ['mkfs.vfat', '-F', '32'] + if label: + cmd += ['-n', label] + elif type == 'reiserfs': + cmd = ['mkfs.reiserfs', '-q'] + if label: + cmd += ['-l', label] + no_stderr = True + elif type == 'minix': + assert label is None, 'minix does not support labels' + cmd = 
['mkfs.minix'] + elif type == 'swap': + cmd = ['mkswap', '-f'] + if label: + cmd += ['-L', label] + else: + cmd = ['mkfs.' + type, '-q'] + if label: + cmd += ['-L', label] + + if type == 'xfs': + # XFS complains if there's an existing FS, so --force + cmd.append('-f') + + cmd.append(klass.devname(partition)) + + if no_stderr: + assert subprocess.call(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) == 0 + else: + assert subprocess.call(cmd, stdout=subprocess.PIPE) == 0 + + # kernel/udev generally detect those changes itself, but do not quite + # tell us when they are done; so do a little kludge here to know how + # long we need to wait + subprocess.call(['udevadm', 'trigger', '--action=change', + '--sysname-match=' + os.path.basename(klass.devname(partition))]) + klass.sync() + + @classmethod + def fs_create(klass, partition, type, options): + '''Call FilesystemCreate() on partition with given type and options.''' + + klass.partition_iface(partition).FilesystemCreate(type, options) + # .FilesystemCreate() already blocks until the mkfs job is done; but + # without udevsettle the property updating is racy + klass.sync() + + @classmethod + def retry_busy(klass, fn, *args): + '''Call a function until it does not fail with "Busy".''' + + timeout = 10 + while timeout >= 0: + try: + return fn(*args) + except dbus.DBusException, e: + if e._dbus_error_name != 'org.freedesktop.UDisks.Error.Busy': + raise + print >> sys.stderr, '[busy] ', + time.sleep(0.3) + timeout -= 1 + +# ---------------------------------------------------------------------------- + +class FS(UDisksTestCase): + '''Test detection of all supported file systems''' + + def setUp(self): + self.workdir = tempfile.mkdtemp() + + def tearDown(self): + if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0: + print >> sys.stderr, '[cleanup unmount] ', + shutil.rmtree (self.workdir) + + def test_zero(self): + '''properties of zeroed out md device''' + + self.zero_device() + info = 
self.get_info() + self.assertEqual(info['is mounted'], '0') + self.assertEqual(info['mount paths'], '') + self.assertEqual(info['presentation hide'], '0') + self.assertEqual(info['presentation name'], '') + self.assertEqual(info['usage'], '') + self.assertEqual(info['type'], '') + self.assertEqual(len(info['md_uuid']), 35) + self.assertEqual(info['uuid'], '') + self.assertEqual(info['label'], '') + + self.assertEqual(self.get_partitions(), []) + + def test_ext2(self): + '''fs: ext2''' + self._do_fs_check('ext2') + + def test_ext3(self): + '''fs: ext3''' + self._do_fs_check('ext3') + + def test_ext4(self): + '''fs: ext4''' + self._do_fs_check('ext4') + + def test_minix(self): + '''fs: minix''' + self._do_fs_check('minix') + + def test_xfs(self): + '''fs: XFS''' + self._do_fs_check('xfs') + + def test_ntfs(self): + '''fs: NTFS''' + self._do_fs_check('ntfs') + + def test_vfat(self): + '''fs: FAT''' + self._do_fs_check('vfat') + + def test_reiserfs(self): + '''fs: reiserfs''' + self._do_fs_check('reiserfs') + + def test_swap(self): + '''fs: swap''' + self._do_fs_check('swap') + + def _do_fs_check(self, type): + '''Run checks for a particular file system.''' + + if type != 'swap' and subprocess.call(['which', 'mkfs.' + type], + stdout=subprocess.PIPE) != 0: + print >> sys.stderr, '[no mkfs.%s, skip] ' % type, + + # check correct D-Bus exception + try: + self.fs_create(None, type, []) + self.fail('Expected failure for missing mkfs.' + type) + except dbus.DBusException, e: + self.assertEqual(e._dbus_error_name, + 'org.freedesktop.UDisks.Error.FilesystemToolsMissing', + str(e)) + + return + + # do checks with command line tools (mkfs/mount/umount) + print >> sys.stderr, '[cli]', + + self._do_mkfs_check(type) + if type != 'minix': + self._do_mkfs_check(type, 'test%stst' % type) + + # put a different fs here instead of zeroing, so that we verify that + # DK-D overrides existing FS (e. g. 
XFS complains then), and does not + # leave traces of other FS around + if type == 'ext3': + self.mkfs('swap') + else: + self.mkfs('ext3') + + # do checks with DK-Disks D-BUS operations + print >> sys.stderr, '[ud] ', + self._do_dbus_fs_check(type) + if type != 'minix': + self._do_dbus_fs_check(type, 'test%stst' % type) + + def _do_mkfs_check(self, type, label=None): + '''Run mkfs/mount/umount check for a fs and label. + + This checks that DK-Disks correctly picks up command line tool actions. + ''' + self.mkfs(type, label) + i = self.get_info() + + self.assertEqual(i['usage'], (type == 'swap') and 'other' or 'filesystem') + + self.assertEqual(i['type'], type) + self.assertEqual(i['label'], label or '') + if type != 'swap': + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + self.assertEqual(i['presentation name'], '') + if type != 'minix': + self.assertEqual(i['uuid'], self.get_uuid()) + + if type == 'swap': + return + + # mount it using "mount" + if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'], + stdout=subprocess.PIPE) == 0: + # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu + # defaults to ntfs-3g if installed); TODO: check other distros + mount_prog = 'mount.ntfs-3g' + else: + mount_prog = 'mount' + ret = subprocess.call([mount_prog, self.device, self.workdir]) + + if ret == 32: + # missing fs driver + print >> sys.stderr, '[missing kernel driver, skip] ', + return + + self.assertEqual(ret, 0) + i = self.get_info() + self.assertEqual(i['is mounted'], '1') + self.assertEqual(i['mount paths'], self.workdir) + + # unmount it using "umount" + subprocess.call(['umount', self.workdir]) + i = self.get_info() + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + + def _do_dbus_fs_check(self, type, label=None): + '''Run DK-D FSCreate/Mount/Unmount check for a fs and label. + + This checks the D-Bus methods that DK-Disks offers. 
+ ''' + # check that DK-disks reports the fs as supported + for fs in self.manager_props.Get('org.freedesktop.UDisks', + 'KnownFilesystems'): + if fs[0] == type: + supports_unix_owners = fs[2] + self.assert_(supports_unix_owners in (True, False)) + self.assertEqual(fs[3], type != 'swap') # can_mount + self.assert_(fs[4]) # can_create + supports_label_rename = fs[6] + # minix does not support labels; EXFAIL: swap doesn't have a program for it + self.assertEqual(supports_label_rename, type not in ('minix', 'swap')) + break + else: + self.fail('KnownFilesystems does not contain ' + type) + + options = [] + if label: + options.append('label=' + label) + + # create fs + self.fs_create(None, type, options) + i = self.get_info() + + self.assertEqual(i['usage'], (type == 'swap') and 'other' or 'filesystem') + self.assertEqual(i['type'], type) + self.assertEqual(i['label'], label or '') + if type != 'swap': + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + self.assertEqual(i['presentation name'], '') + if type != 'minix': + self.assertEqual(i['uuid'], self.get_uuid()) + + # open files when unmounted + iface = self.partition_iface() + self.assertRaises(dbus.DBusException, + iface.FilesystemListOpenFiles) + + if type != 'swap': + # mount + try: + mount_path = iface.FilesystemMount('', []) + except dbus.DBusException, e: + self.assertEqual(e._dbus_error_name, + 'org.freedesktop.UDisks.Error.FilesystemDriverMissing', + str(e)) + print >> sys.stderr, '[missing kernel driver, skip] ', + return + + if label: + self.assertEqual(mount_path, '/media/' + label) + else: + self.assert_(mount_path.startswith('/media/')) + i = self.get_info() + self.assertEqual(i['is mounted'], '1') + self.assertEqual(i['mount paths'], mount_path) + + # no ownership taken, should be root owned + st = os.stat(mount_path) + self.assertEqual((st.st_uid, st.st_gid), (0, 0)) + + # open files when mounted + self.assertEqual(iface.FilesystemListOpenFiles(), []) + + f = 
open(os.path.join(mount_path, 'test.txt'), 'w') + result = iface.FilesystemListOpenFiles() + self.assertEqual(len(result), 1) + self.assertEqual(result[0][0], os.getpid()) + self.assertEqual(result[0][1], os.geteuid()) + self.assert_(sys.argv[0] in result[0][2]) + f.close() + self.assertEqual(iface.FilesystemListOpenFiles(), []) + + # unmount + self.retry_busy(self.partition_iface().FilesystemUnmount, []) + + i = self.get_info() + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + + # create fs with taking ownership (daemon:mail == 1:8) + if supports_unix_owners: + options.append('take_ownership_uid=1') + options.append('take_ownership_gid=8') + self.fs_create(None, type, options) + mount_path = iface.FilesystemMount('', []) + st = os.stat(mount_path) + self.assertEqual((st.st_uid, st.st_gid), (1, 8)) + self.retry_busy(self.partition_iface().FilesystemUnmount, []) + + # change label + if supports_label_rename: + l = 'rename' + type + iface.FilesystemSetLabel(l) + self.sync() + i = self.get_info() + if type == 'vfat': + # EXFAIL: often (but not always) the label appears in all upper case + self.assertEqual(i['label'].upper(), l.upper()) + else: + self.assertEqual(i['label'], l) + else: + self.assertRaises(dbus.DBusException, iface.FilesystemSetLabel, 'foo') + + # check fs + self.assertEqual(iface.FilesystemCheck([]), True) + + +# ---------------------------------------------------------------------------- + +class Luks(UDisksTestCase): + '''Check LUKS.''' + + def test_0_create_teardown(self): + '''LUKS create/teardown''' + + self.fs_create(None, 'ext3', ['luks_encrypt=s3kr1t', 'label=treasure']) + + try: + # check crypted device info + i = self.get_info() + self.assertEqual(i['usage'], 'crypto') + self.assertEqual(i['type'], 'crypto_LUKS') + self.assertEqual(i['label'], '') # encrypted device + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + self.assertEqual(i['presentation name'], '') + 
self.assert_(i['holder'].startswith('/org/freedesktop/UDisks/devices/')) + self.assertEqual(i['uuid'], self.get_uuid()) + + # check crypted device properties + crypt_props = self.partition_props() + self.assertEqual(crypt_props.Get(I_D, 'DeviceIsLuks'), True) + self.assertEqual(crypt_props.Get(I_D, 'DeviceIsLuksCleartext'), False) + self.assertEqual(crypt_props.Get(I_D, 'LuksHolder'), i['holder']) + + # check cleartext device properties + clear_obj = dbus.SystemBus().get_object('org.freedesktop.UDisks', i['holder']) + clear_props = dbus.Interface(clear_obj, dbus.PROPERTIES_IFACE) + self.assertEqual(clear_props.Get(I_D, 'DeviceIsLuks'), False) + self.assertEqual(clear_props.Get(I_D, 'DeviceIsLuksCleartext'), True) + self.assertEqual(clear_props.Get(I_D, 'LuksCleartextUnlockedByUid'), 0) + self.assert_(clear_props.Get(I_D, 'LuksCleartextSlave').endswith('/' + + os.path.basename(self.device))) + + # check cleartext device info + self.assert_(i['holder'] in self.manager_iface.EnumerateDevices()) + clear_devname = clear_props.Get(I_D, 'DeviceFile') + ci = self.get_info(devname=clear_devname) + self.assert_(os.path.exists(clear_devname)) + self.assertEqual(ci['usage'], 'filesystem') + self.assertEqual(ci['type'], 'ext3') + self.assertEqual(ci['label'], 'treasure') + self.assertEqual(ci['is mounted'], '0') + self.assertEqual(ci['mount paths'], '') + self.assertEqual(ci['presentation name'], '') + + finally: + # tear down cleartext device + self.partition_iface().LuksLock([]) + self.failIf(i['holder'] in self.manager_iface.EnumerateDevices()) + self.failIf(os.path.exists(clear_devname)) + self.assertRaises(dbus.DBusException, clear_props.Get, I_D, 'DeviceFile') + + def test_luks_mount(self): + '''LUKS mount/unmount''' + + # wrong password + self.assertRaises(dbus.DBusException, + self.partition_iface().LuksUnlock, 'h4ck3rz', []) + + # correct password + clear_objpath = self.retry_busy(self.partition_iface().LuksUnlock, 's3kr1t', []) + self.assert_(clear_objpath in 
self.manager_iface.EnumerateDevices()) + + clear_obj = dbus.SystemBus().get_object('org.freedesktop.UDisks', + clear_objpath) + clear_props = dbus.Interface(clear_obj, dbus.PROPERTIES_IFACE) + clear_iface = dbus.Interface(clear_obj, I_D) + + # mount + mount_path = clear_iface.FilesystemMount('', []) + clear_devname = clear_props.Get(I_D, 'DeviceFile') + self.assertEqual(mount_path, '/media/treasure') + + i = self.get_info(devname=clear_devname) + self.assertEqual(i['is mounted'], '1') + self.assertEqual(i['mount paths'], mount_path) + + # can't lock, busy + self.assertRaises(dbus.DBusException, self.partition_iface().LuksLock, []) + + # umount + self.retry_busy(clear_iface.FilesystemUnmount, []) + i = self.get_info() + self.assertEqual(i['is mounted'], '0') + self.assertEqual(i['mount paths'], '') + + # lock + self.partition_iface().LuksLock([]) + self.failIf(clear_objpath in self.manager_iface.EnumerateDevices()) + + def test_luks_change_passphrase(self): + '''LUKS change passphrase''' + + # wrong password + self.assertRaises(dbus.DBusException, + self.partition_iface().LuksChangePassphrase, 'h4ck3rz', 'foo') + self.assertEqual(self.partition_props().Get(I_D, 'LuksHolder'), '/', + 'changing passphrase does not unlock') + + # correct password + self.partition_iface().LuksChangePassphrase('s3kr1t', 'cl4ss1f13d') + holder = self.partition_props().Get(I_D, 'LuksHolder') + self.assertEqual(holder, '/', + 'changing passphrase does not unlock: ' + holder) + + # old password is invalid now + self.assertRaises(dbus.DBusException, + self.partition_iface().LuksUnlock, 's3kr1t', []) + + # new password is accepted + self.partition_iface().LuksUnlock('cl4ss1f13d', []) + self.partition_iface().LuksLock([]) + + # change it back so that order of tests does not matter + self.partition_iface().LuksChangePassphrase('cl4ss1f13d', 's3kr1t') + + self.sync() + +# ---------------------------------------------------------------------------- + +class Partitions(UDisksTestCase): + '''Check 
partition operations.''' + + def setUp(self): + self.zero_device() + self.assertEqual(self.get_partitions(), []) + + info = self.get_info() + self.assertEqual([k for k in info if k.startswith('partition_')], []) + self.assertEqual(self.fdisk_list(), None) + + def tearDown(self): + self.partition_iface().PartitionTableCreate('none', []) + info = self.get_info() + self.assertEqual([k for k in info if k.startswith('partition_')], []) + self.assertEqual(self.fdisk_list(), None) + + def test_mbr(self): + '''Partitions: mbr''' + + self._do_schema('mbr', '0x82', '0x06', 'boot') + + def test_gpt(self): + '''Partitions: GUID''' + + self._do_schema('gpt', 'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7', + '0657FD6D-A4AB-43C4-84E5-0933C84B4F4F', 'required') + + # TODO: fails in various ways + def disabled_test_apm(self): + '''Partitions: Apple''' + + try: + self._do_schema('apm', 'Apple_Unix_SVR2', 'Foo', 'allow_write', + exp_default_partitions=2) # Apple creates bootstrap stuff by default + self.fail('creating apple partition at offset 0 should fail due to default bootstrap partitions') + except dbus.DBusException, e: + self.assert_("Can't have overlapping partitions." 
in str(e)) + + self._do_schema('apm', 'Apple_Unix_SVR2', 'Foo', 'allow_write', + exp_default_partitions=2, # Apple creates bootstrap stuff by default + first_offset=3000000) + + def _do_schema(self, schema, type1, type2, flag, + exp_default_partitions=0, first_offset=0): + '''Run tests for a particular schema/type''' + + # create partition table + self.partition_iface().PartitionTableCreate(schema, []) + self.sync() + + info = self.get_info() + self.assertEqual(info['partition_scheme'], schema) + self.assertEqual(info['partition_count'], str(exp_default_partitions)) + self.assertEqual(len(self.get_partitions()), exp_default_partitions) + + if schema == 'mbr': + self.assertEqual(self.fdisk_list(), []) + + # check device object properties + props = self.partition_props() + self.assertEqual(props.Get(I_D, 'DeviceIsPartition'), False) + self.assertEqual(props.Get(I_D, 'DeviceIsPartitionTable'), True) + self.assertEqual(props.Get(I_D, 'PartitionTableScheme'), schema) + self.assertEqual(props.Get(I_D, 'PartitionTableCount'), exp_default_partitions) + + # p1: non-flagged, no fs + p1 = self.partition_iface().PartitionCreate(first_offset, 10000000, type1, + '', [], [], '', []) + self.assert_(p1 in self.manager_iface.EnumerateDevices()) + + if schema == 'mbr': + fdisk = self.fdisk_list() + self.assertEqual(len(fdisk), 1) + fdisk = fdisk[0] + self.assert_(os.path.exists(fdisk[0]), 'p1: device file does not exist') + self.assertEqual(fdisk[1], False, 'p1 is bootable') + self.assertEqual(fdisk[5], type1.lstrip('0x')) + + # EXFAIL: /dev/md5p1 still works and appears, but kernel/udev never + # create /dev/md5p2 and following + #self.partition_iface().PartitionCreate(0, 10000000, type2, '', + # [flag], [], '', []) + #print self.fdisk_list() + + # the device is not a partition, so calls should fail + self.assertRaises(dbus.DBusException, + self.partition_iface().PartitionDelete, []) + self.assertRaises(dbus.DBusException, + self.partition_iface().PartitionModify, type2, '', [flag]) 
+ + # check p1 object properties + p1_obj = dbus.SystemBus().get_object('org.freedesktop.UDisks', + p1) + p1_iface = dbus.Interface(p1_obj, I_D) + p1_props = dbus.Interface(p1_obj, dbus.PROPERTIES_IFACE) + + self.assertEqual(p1_props.Get(I_D, 'DeviceIsPartition'), True) + self.assertEqual(p1_props.Get(I_D, 'DeviceIsPartitionTable'), False) + self.assertEqual(p1_props.Get(I_D, 'PartitionSlave'), + '/org/freedesktop/UDisks/devices/' + + os.path.basename(self.device)) + self.assertEqual(p1_props.Get(I_D, 'PartitionScheme'), schema) + self.assertEqual(p1_props.Get(I_D, 'PartitionType'), type1) + self.assertEqual(p1_props.Get(I_D, 'PartitionLabel'), (schema == 'apm' and 'untitled' or '')) + self.assertEqual(p1_props.Get(I_D, 'PartitionFlags'), []) + self.assertEqual(p1_props.Get(I_D, 'PartitionNumber'), (schema == 'apm' and 2 or 1)) + self.assertEqual(p1_props.Get(I_D, 'IdUsage'), '') + self.assertEqual(p1_props.Get(I_D, 'IdType'), '') + off = p1_props.Get(I_D, 'PartitionOffset') + self.assert_(off >= first_offset and off <= first_offset+20000) + size = p1_props.Get(I_D, 'PartitionSize') + self.assert_(size >= 9500000 and size <= 10500000) + self.assertEqual(props.Get(I_D, 'PartitionTableCount'), exp_default_partitions + 1) + + # modify + p1_iface.PartitionModify(type2, '', [flag]) + self.sync() + self.assertEqual(p1_props.Get(I_D, 'PartitionType'), type2) + self.assertEqual(p1_props.Get(I_D, 'PartitionFlags'), [flag]) + + # delete + p1_iface.PartitionDelete([]) + self.sync() + self.assertRaises(dbus.DBusException, p1_iface.PartitionModify, type2, '', [flag]) + self.assertRaises(dbus.DBusException, p1_props.Get, I_D, 'PartitionType') + + self.failIf(p1 in self.manager_iface.EnumerateDevices()) + self.assertEqual(props.Get(I_D, 'PartitionTableCount'), exp_default_partitions) + + if schema == 'mbr': + self.failIf(os.path.exists(fdisk[0]), 'p1: device file still exists') + self.assertEqual(self.fdisk_list(), []) + + # recreate p1: flagged, with fs + p1 = 
self.partition_iface().PartitionCreate(0, 10000000, type1, + '', [flag], [], 'ext3', ['label=e3part']) + self.sync() + self.assert_(p1 in self.manager_iface.EnumerateDevices()) + + p1_obj = dbus.SystemBus().get_object('org.freedesktop.UDisks', + p1) + p1_props = dbus.Interface(p1_obj, dbus.PROPERTIES_IFACE) + self.assertEqual(p1_props.Get(I_D, 'DeviceIsPartition'), True) + self.assertEqual(p1_props.Get(I_D, 'PartitionType'), type1) + self.assertEqual(p1_props.Get(I_D, 'PartitionFlags'), [flag]) + self.assertEqual(p1_props.Get(I_D, 'PartitionNumber'), 1) + self.assertEqual(p1_props.Get(I_D, 'IdUsage'), 'filesystem') + self.assertEqual(p1_props.Get(I_D, 'IdType'), 'ext3') + self.assertEqual(p1_props.Get(I_D, 'IdLabel'), 'e3part') + + def fdisk_list(self): + '''Parse fdisk -l. + + Return None if device does not have a partition table, or a list of + (device, boot, start, end, blocks, id) tuples. + ''' + env = os.environ.copy() + env['LC_ALL'] = 'C' + fdisk = subprocess.Popen(['fdisk', '-l', self.device], env=env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (out, err) = fdisk.communicate() + if err != '': + return None + parts = [] + for l in out.splitlines(): + if l.startswith('/dev/'): + fields = l.split() + if fields[1] == '*': + # boot flag + fields[1] = True + fields = tuple(fields[:6]) + else: + fields = tuple([fields[0], False] + fields[1:5]) + parts.append(fields) + return parts + +# ---------------------------------------------------------------------------- + +hd_smart_blob = None + +class Smart(UDisksTestCase): + '''Check SMART operation.''' + + def test_0_hd_status(self): + '''SMART status of first internal hard disk + + This is a best-effort readonly test. 
+        '''
+        hd = '/dev/sda'
+
+        if not os.path.exists(hd):
+            print >> sys.stderr, '[skip]',
+            return
+
+        has_smart = subprocess.call(['skdump', '--can-smart', hd],
+            stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
+
+        obj = dbus.SystemBus().get_object('org.freedesktop.UDisks',
+            self.manager_iface.FindDeviceByDeviceFile(hd))
+        iface = dbus.Interface(obj, I_D)
+        props = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
+        info = self.get_info(devname=hd)
+
+        self.assertEqual(props.Get(I_D, 'DriveAtaSmartIsAvailable'),
+            has_smart)
+
+        if has_smart:
+            print >> sys.stderr, '[avail] ',
+            self.assert_(info['ATA SMART'].startswith('Updated at '))
+            self.assertNotEqual(props.Get(I_D, 'DriveAtaSmartTimeCollected'), 0)
+            global hd_smart_blob
+            hd_smart_blob = ''.join(map(chr, props.Get(I_D, 'DriveAtaSmartBlob')))
+            # this is of course not truly correct for a test suite, but let's
+            # consider it a courtesy for developers :-)
+            self.assertEqual(info['overall assessment'], 'Good')
+            self.assertEqual(props.Get(I_D, 'DriveAtaSmartStatus'), 'GOOD')
+
+            try:
+                self.partition_iface().DriveAtaSmartInitiateSelftest('bogus', [])
+                self.fail('bogus mode succeeded')
+            except dbus.DBusException, e:
+                self.assertEqual(e._dbus_error_name, 'org.freedesktop.UDisks.Error.Failed')
+        else:
+            print >> sys.stderr, '[N/A] ',
+            self.assertEqual(info['ATA SMART'], 'not available')
+            self.assertEqual(props.Get(I_D, 'DriveAtaSmartTimeCollected'), 0)
+            self.assertEqual(props.Get(I_D, 'DriveAtaSmartBlob'), [])
+            self.failIf('overall assessment' in info)
+
+            try:
+                self.partition_iface().DriveAtaSmartInitiateSelftest('short', [])
+                self.fail('device did not report to have SMART capabilities')
+            except dbus.DBusException, e:
+                self.assert_('does not support ATA SMART' in str(e))
+
+    def test_simulate(self):
+        '''SMART status of simulated data on test device
+
+        This requires SMART being available from the first hard disk, to
+        collect the blob used for testing.
+        '''
+        global hd_smart_blob
+
+        if not hd_smart_blob:
+            print >> sys.stderr, '[skip]',
+            return
+
+        props = self.partition_props()
+        self.failIf(props.Get(I_D, 'DriveAtaSmartIsAvailable'))
+        self.assertEqual(props.Get(I_D, 'DriveAtaSmartTimeCollected'), 0)
+        self.assertEqual(props.Get(I_D, 'DriveAtaSmartBlob'), [])
+
+        # without simulate, the daemon should complain about absent SMART
+        try:
+            self.partition_iface().DriveAtaSmartRefreshData([])
+            self.fail('expected "Device does not support ATA SMART"')
+        except dbus.DBusException, e:
+            self.assert_('does not support ATA SMART' in str(e))
+        try:
+            self.partition_iface().DriveAtaSmartInitiateSelftest('short', [])
+            self.fail('fake device is not expected to have SMART capabilities')
+        except dbus.DBusException, e:
+            self.assert_('does not support ATA SMART' in str(e))
+
+        # load the blob; blob_f must stay open, the file is deleted on close
+        blob_f = tempfile.NamedTemporaryFile()
+        blob_f.write(hd_smart_blob)
+        blob_f.flush()
+        self.partition_iface().DriveAtaSmartRefreshData(['simulate=' + blob_f.name])
+
+        info = self.get_info()
+
+        self.assertEqual(props.Get(I_D, 'DriveAtaSmartIsAvailable'), True)
+
+        self.assert_(info['ATA SMART'].startswith('Updated at '))
+        self.assertNotEqual(props.Get(I_D, 'DriveAtaSmartTimeCollected'), 0)
+
+        self.assertEqual(hd_smart_blob, ''.join(map(chr, props.Get(I_D, 'DriveAtaSmartBlob'))))
+        self.assertEqual(info['overall assessment'], 'Good')
+        self.assertEqual(props.Get(I_D, 'DriveAtaSmartStatus'), 'GOOD')
+
+        # tool should have the entire SMART info
+        tool_info = subprocess.Popen([self.tool_path, '--show-info',
+            self.device], stdout=subprocess.PIPE)
+        out = tool_info.communicate()[0]
+        self.assert_('start-stop-count' in out)
+        self.assert_('Pre-fail' in out)
+
+
+# ----------------------------------------------------------------------------
+
+class LVM(UDisksTestCase):
+    '''Check LVM devices.'''
+
+    def setUp(self):
+        '''Create a VG "udtest".
+
+        This uses two RAM disks as PV.
+        '''
+        if subprocess.call(['which', 'pvcreate'], stdout=subprocess.PIPE) != 0:
+            # fail() raises, so nothing below runs without the LVM tools
+            self.fail('lvm tools not installed')
+        self.assertEqual(subprocess.call(['pvcreate', test_ram_dev2],
+            stdout=subprocess.PIPE), 0)
+        self.assertEqual(subprocess.call(['pvcreate', test_ram_dev3],
+            stdout=subprocess.PIPE), 0)
+        self.vgname = 'udtest'
+        self.assertEqual(subprocess.call(['vgcreate', self.vgname,
+            test_ram_dev2, test_ram_dev3], stdout=subprocess.PIPE), 0)
+
+    def tearDown(self):
+        '''Remove udtest VG.'''
+
+        self.assertEqual(subprocess.call(['vgremove', '-f',
+            self.vgname], stdout=subprocess.PIPE), 0)
+
+    def test_single_lv(self):
+        '''LVM: Single LV, no RAID'''
+
+        objs_old = set(self.manager_iface.EnumerateDevices())
+
+        self.assertEqual(subprocess.call(['lvcreate', '-n', 'udtestlv1', '-L',
+            '52M', self.vgname], stdout=subprocess.PIPE), 0)
+        self.sync()
+
+        # there should be exactly one new device for the LV
+        objs_new = set(self.manager_iface.EnumerateDevices())
+        self.assertEqual(len(objs_old) + 1, len(objs_new))
+        lvname = list(objs_new - objs_old)[0]
+
+        lv_props = dbus.Interface(dbus.SystemBus().get_object(
+            'org.freedesktop.UDisks', lvname), dbus.PROPERTIES_IFACE)
+
+        # the LV is a real volume which should be shown, but not automounted
+        self.assert_(lv_props.Get(I_D, 'DeviceFile').startswith('/dev/mapper/'))
+        self.assertEqual(lv_props.Get(I_D, 'DevicePresentationHide'), False)
+        self.assertEqual(lv_props.Get(I_D, 'DevicePresentationNopolicy'), True)
+
+        # ensure that we have a UUID
+        ids = lv_props.Get(I_D, 'DeviceFileById')
+        found_uuid = any('uuid-LVM' in i for i in ids)
+        # put all links into the failure message, so that an empty or
+        # unexpected list is visible at a glance
+        self.assert_(found_uuid, 'no by-uuid found in ' + str(ids))
+
+    def test_single_lv_raid(self):
+        '''LVM: Single LV, RAID-1'''
+
+        objs_old = set(self.manager_iface.EnumerateDevices())
+
+        self.assertEqual(subprocess.call(['lvcreate', '-n', 'udtestlvr1', '-L',
+            '50M', '-m', '1', '--mirrorlog', 'core', self.vgname],
+            stdout=subprocess.PIPE), 0)
+        self.sync()
+
+        # there should be two new shadow devices for the RAID images, and one
+        # real LV
+        objs_new = set(self.manager_iface.EnumerateDevices())
+        self.assertEqual(len(objs_old) + 3, len(objs_new))
+        lv_objs = objs_new - objs_old
+
+        # find the real LV through its device-mapper node, which is assumed to
+        # be named VGNAME-LVNAME
+        # TODO: is this naming scheme right on all distros?
+        devname = '/dev/mapper/%s-udtestlvr1' % self.vgname
+        real_lv_obj = self.manager_iface.FindDeviceByDeviceFile(devname)
+        self.assert_(real_lv_obj in lv_objs)
+
+        # put a file system onto it, for testing properties
+        iface = dbus.Interface(dbus.SystemBus().get_object(
+            'org.freedesktop.UDisks', real_lv_obj), I_D)
+        iface.FilesystemCreate('ext3', [])
+        self.sync()
+
+        for o in lv_objs:
+            props = dbus.Interface(dbus.SystemBus().get_object(
+                'org.freedesktop.UDisks', o), dbus.PROPERTIES_IFACE)
+
+            if o == real_lv_obj:
+                self.assert_(props.Get(I_D, 'DeviceFile').startswith('/dev/mapper/'))
+                # never hide the real LV
+                self.assertEqual(props.Get(I_D, 'DevicePresentationHide'), False)
+                self.assertEqual(props.Get(I_D, 'IdUsage'), 'filesystem')
+                self.assertEqual(props.Get(I_D, 'IdType'), 'ext3')
+
+                # ensure that we have a UUID
+                ids = props.Get(I_D, 'DeviceFileById')
+                found_uuid = any('uuid-LVM' in i for i in ids)
+                # put all links into the failure message, so that an empty or
+                # unexpected list is visible at a glance
+                self.assert_(found_uuid, 'no by-uuid found in ' + str(ids))
+            else:
+                # mirror images should not have any real FS usage at all
+                self.assertEqual(props.Get(I_D, 'IdUsage'), '')
+
+                # mirror images should not have by-* symlinks (avoid probing);
+                # that is actually the job of the lvm2 udev rules, but check it
+                # here to ensure proper system integration
+                self.assertEqual(props.Get(I_D, 'DeviceFileById'), [])
+
+            self.assertEqual(props.Get(I_D, 'DevicePresentationNopolicy'), True)
+
+# ----------------------------------------------------------------------------
+
+class GlobalOps(UDisksTestCase):
+    '''Check various global operations.'''
+
+    def test_daemon_version(self):
+        '''DaemonVersion property'''
+
+        ver = self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonVersion')
+        self.assertEqual(type(ver), dbus.String)
+        self.assert_(len(ver) > 0)
+
+    def test_enumerate_devices(self):
+        '''EnumerateDevices()'''
+
+        devs = self.manager_iface.EnumerateDevices()
+        self.assert_(len(devs) > 1) # at least our test device and root fs
+        self.assert_('/org/freedesktop/UDisks/devices/' +
+            os.path.basename(self.device) in devs)
+
+    def test_enumerate_device_files(self):
+        '''EnumerateDeviceFiles()'''
+
+        devs = self.manager_iface.EnumerateDeviceFiles()
+        self.assert_(len(devs) > 1) # at least our test device and root fs
+        self.assert_(self.device in devs)
+
+    def test_find_by_devpath(self):
+        '''FindDeviceByDeviceFile()'''
+
+        self.assertEqual(
+            self.manager_iface.FindDeviceByDeviceFile(self.device),
+            '/org/freedesktop/UDisks/devices/' +
+            os.path.basename(self.device))
+
+        self.assertRaises(dbus.DBusException,
+            self.manager_iface.FindDeviceByDeviceFile, '/dev/nonexisting')
+
+    def test_find_by_major_minor(self):
+        '''FindDeviceByMajorMinor()'''
+
+        st = os.stat(self.device)
+        dev = self.manager_iface.FindDeviceByMajorMinor(os.major(st.st_rdev),
+            os.minor(st.st_rdev))
+        self.assertEqual(dev, '/org/freedesktop/UDisks/devices/' +
+            os.path.basename(self.device))
+
+        self.assertRaises(dbus.DBusException,
+            self.manager_iface.FindDeviceByMajorMinor, 42, 42)
+
+    def test_inhibition(self):
+        '''inhibition'''
+
+        # Inhibit(); the daemon must not be inhibited before the first call
+        self.failIf(self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonIsInhibited'))
+        cookie1 = self.manager_iface.Inhibit()
+        self.assert_(self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonIsInhibited'))
+
+        # try mounting, should fail due to inhibition
+        try:
+            self.partition_iface().FilesystemMount('', [])
+            self.fail('.FilesystemMount() succeeded while inhibited')
+        except dbus.DBusException, e:
+            self.assert_(e._dbus_error_name.endswith('Error.Inhibited'))
+
+        # Inhibit() another time
+        cookie2 = self.manager_iface.Inhibit()
+        self.assert_(self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonIsInhibited'))
+
+        # Uninhibit() the first cookie; the second one must keep it inhibited
+        self.manager_iface.Uninhibit(cookie1)
+        self.assert_(self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonIsInhibited'))
+
+        self.assertRaises(dbus.DBusException, self.manager_iface.Uninhibit,
+            '0xDEADBEEF')
+
+        self.manager_iface.Uninhibit(cookie2)
+        self.failIf(self.manager_props.Get('org.freedesktop.UDisks',
+            'DaemonIsInhibited'))
+
+        self.assertRaises(dbus.DBusException, self.manager_iface.Uninhibit,
+            cookie1)
+
+# ----------------------------------------------------------------------------
+
+if __name__ == '__main__':
+    optparser = optparse.OptionParser('%prog [options] [test class] [test name] [...]')
+    optparser.add_option('-l', '--log-file', dest='logfile',
+        help='write daemon log to a file')
+    (opts, args) = optparser.parse_args()
+
+    UDisksTestCase.init(logfile=opts.logfile)
+    if not args:
+        tests = unittest.TestLoader().loadTestsFromName('__main__')
+    else:
+        tests = unittest.TestLoader().loadTestsFromNames(args,
+            __import__('__main__'))
+    result = unittest.TextTestRunner(verbosity=2).run(tests)
+    # exit non-zero on failures, so that callers can detect a broken run
+    sys.exit(not result.wasSuccessful())
+
-- 
1.6.5