# Source listing of testcase.py from the ubuntuone-dev-tools package.

# -*- coding: utf-8 -*-

# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009-2010 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Base tests cases and test utilities."""

from __future__ import with_statement

import contextlib
import os
import shutil
import sys

from functools import wraps

from twisted.internet import defer
from twisted.python import failure
from twisted.trial.unittest import TestCase, SkipTest

# DBusRunner for DBusTestCase using tests
from ubuntuone.devtools.services.dbus import DBusRunner


# pylint: disable=F0401,C0103
try:
    import dbus
except ImportError:
    dbus = None

try:
    import dbus.service as service
except ImportError:
    service = None

try:
    from dbus.mainloop.glib import DBusGMainLoop
except ImportError:
    DBusGMainLoop = None


# pylint: enable=F0401,C0103
@contextlib.contextmanager
def environ(env_var, new_value):
    """Context manager to temporarily replace/add an environ value.

    On entry os.environ[env_var] is set to new_value. On exit the previous
    value is put back, or the variable is removed if it was not set before.
    The restore also happens when the body of the 'with' block raises.

    """
    old_value = os.environ.get(env_var, None)
    os.environ[env_var] = new_value
    try:
        yield
    finally:
        if old_value is None:
            # the body may already have removed the variable itself
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = old_value


def _id(obj):
    """Identity decorator: hand the decorated object back untouched."""
    unchanged = obj
    return unchanged


# pylint: disable=C0103
def skipTest(reason):
    """Build a decorator that marks whatever it decorates as skipped."""

    def decorator(test_item):
        """Flag test_item as skipped and return it (or its stand-in)."""
        is_case_class = (isinstance(test_item, type) and
                         issubclass(test_item, TestCase))
        if not is_case_class:
            # anything that is not a TestCase class gets replaced by a
            # callable that raises SkipTest as soon as it is invoked
            @wraps(test_item)
            def skip_wrapper(*args, **kwargs):
                """Raise SkipTest instead of running the wrapped item."""
                raise SkipTest(reason)
            test_item = skip_wrapper

        # the 'skip' attribute is what twisted.trial.unittest looks at;
        # pylint believes we are redefining a name from an outer scope
        # pylint: disable=W0621,W0612
        setattr(test_item, 'skip', reason)
        # pylint: enable=W0621,W0612
        # a skipped item must not cause any service to be started
        needs_services = hasattr(test_item, "required_services")
        if needs_services:
            # pylint: disable=W0612
            test_item.required_services = lambda *args, **kwargs: []
            # pylint: enable=W0612
        return test_item
    return decorator


def skipIf(condition, reason):
    """Return a skipping decorator when condition holds, a no-op otherwise."""
    return skipTest(reason) if condition else _id


def skipIfOS(current_os, reason):
    """Skip test for a particular os or lists of them.

    current_os is either a single platform name or a collection of platform
    names; each is compared with sys.platform for equality. A false value
    (None, empty string, empty collection) never skips.

    """
    if not current_os:
        return _id
    # Wrap a single name so that the membership test below compares whole
    # names: 'in' applied directly to a string is a substring match, which
    # makes e.g. 'linux' match 'linux2'.
    if isinstance(current_os, (str, type(u''))):
        platforms = (current_os,)
    else:
        platforms = current_os
    if sys.platform in platforms:
        return skipTest(reason)
    return _id


def skipIfNotOS(current_os, reason):
    """Skip test when we are not in a particular os (or list of them).

    current_os is either a single platform name or a collection of platform
    names; the test is skipped when sys.platform equals none of them. A
    false value (None, empty string, empty collection) never skips.

    """
    if not current_os:
        return _id
    # Wrap a single name so that a string and a collection are handled the
    # same way. Comparing sys.platform to a list with '!=' is always true,
    # which used to skip every test decorated with a list of platforms.
    if isinstance(current_os, (str, type(u''))):
        platforms = (current_os,)
    else:
        platforms = current_os
    if sys.platform not in platforms:
        return skipTest(reason)
    return _id


# pylint: enable=C0103


class FakeDBusInterface(object):
    """A fake DBusInterface..."""

    def shutdown(self, with_restart=False):
        """...that only knows how to go away.

        The with_restart argument is accepted and ignored; the method does
        nothing and returns None.

        """


class BaseTestCase(TestCase):
    """Base TestCase with helper methods to handle temp dir.

    This class provides:
        mktemp(name): helper to create temporary dirs
        rmtree(path): support read-only shares
        makedirs(path): support read-only shares

    """

    # Root temp dir of the test case, computed on first use by 'tmpdir'.
    # It is declared here and always accessed as self.__root so that every
    # access uses the same name-mangled attribute; a lookup by the plain
    # string '__root' (e.g. through getattr) is not mangled and never
    # finds it.
    __root = None

    def required_services(self):
        """Return the list of required services for DBusTestCase."""
        return []

    def mktemp(self, name='temp'):
        """Customized mktemp that accepts an optional name argument.

        Create a fresh, empty directory called 'name' under self.tmpdir
        (removing a previous one if it exists) and return its path.

        """
        tempdir = os.path.join(self.tmpdir, name)
        if os.path.exists(tempdir):
            self.rmtree(tempdir)
        self.makedirs(tempdir)
        return tempdir

    @property
    def tmpdir(self):
        """Default tmpdir: module/class/test_method."""
        # check if we already generated the root path
        root_dir = self.__root
        if root_dir:
            return root_dir
        max_filename = 32  # some platforms limit lengths of filenames
        base = os.path.join(self.__class__.__module__[:max_filename],
                            self.__class__.__name__[:max_filename],
                            self._testMethodName[:max_filename])
        # use _trial_temp dir, it should be os.getcwd()
        # define the root temp dir of the testcase, pylint: disable=W0201
        self.__root = os.path.join(os.getcwd(), base)
        return self.__root

    def rmtree(self, path):
        """Custom rmtree that handles read-only parent(s) and children.

        Does nothing if path does not exist. Otherwise the parent of path
        (unless path is the root temp dir), path itself and every
        non-writable directory below it are made writable before the whole
        tree is removed.

        """
        if not os.path.exists(path):
            return
        # change perms to rw, so we can delete the temp dir
        if path != self.__root:
            os.chmod(os.path.dirname(path), 0o755)
        if not os.access(path, os.W_OK):
            os.chmod(path, 0o755)
        # pylint: disable=W0612
        for dirpath, dirs, files in os.walk(path):
            for dirname in dirs:
                subdir = os.path.join(dirpath, dirname)
                if not os.access(subdir, os.W_OK):
                    os.chmod(subdir, 0o777)
        shutil.rmtree(path)

    def makedirs(self, path):
        """Custom makedirs that handles a read-only parent.

        If the parent directory of path exists it is made writable first,
        then path is created along with any missing intermediate dirs.

        """
        parent = os.path.dirname(path)
        if os.path.exists(parent):
            os.chmod(parent, 0o755)
        os.makedirs(path)


@skipIf(dbus is None or service is None or DBusGMainLoop is None,
    "The test requires dbus.")
class DBusTestCase(BaseTestCase):
    """Test the DBus event handling."""

    def required_services(self):
        """Return the list of required services for DBusTestCase."""
        services = super(DBusTestCase, self).required_services()
        services.append(DBusRunner)
        return services

    def setUp(self):
        """Setup the infrastructure for the test (dbus service)."""
        # Class 'BaseTestCase' has no 'setUp' member
        # pylint: disable=E1101
        # dbus modules will be imported by the decorator
        # pylint: disable=E0602
        BaseTestCase.setUp(self)
        self.loop = DBusGMainLoop(set_as_default=True)
        self.bus = dbus.bus.BusConnection(mainloop=self.loop)
        # monkeypatch busName.__del__ to avoid errors on gc
        # we take care of releasing the name in shutdown
        service.BusName.__del__ = lambda _: None
        self.bus.set_exit_on_disconnect(False)
        self.signal_receivers = set()

    def tearDown(self):
        """Cleanup the test.

        Returns a deferred that removes the registered signal matches,
        then shuts the bus connection down and finally runs the base class
        tearDown.

        """
        # Class 'BaseTestCase' has no 'tearDown' member
        # pylint: disable=E1101
        d = self.cleanup_signal_receivers(self.signal_receivers)
        d.addBoth(self._tear_down)
        d.addBoth(lambda _: BaseTestCase.tearDown(self))
        return d

    def _tear_down(self, _result=None):
        """Shutdown: flush and close the bus connection.

        This is used as a deferred callback in tearDown, so it has to
        accept the result (or failure) of the previous step; the value is
        ignored.

        """
        self.bus.flush()
        self.bus.close()

    def error_handler(self, error):
        """Default error handler for DBus calls."""
        if isinstance(error, failure.Failure):
            self.fail(error.getErrorMessage())

    def cleanup_signal_receivers(self, signal_receivers):
        """Cleanup self.signal_receivers and returns a deferred.

        One 'RemoveMatch' call is issued per entry in signal_receivers.
        The result is a DeferredList over those calls, or an already fired
        deferred when there is nothing to remove.

        """
        deferreds = [self._remove_match(match) for match in signal_receivers]
        if deferreds:
            return defer.DeferredList(deferreds)
        return defer.succeed(True)

    def _remove_match(self, match):
        """Ask the bus daemon to remove 'match'; return a deferred for it."""
        # dbus modules will be imported by the decorator
        # pylint: disable=E0602
        # Each call has its own 'd', so the reply callback below fires the
        # deferred that belongs to this match (a closure defined inside a
        # loop would see only the deferred of the last iteration).
        d = defer.Deferred()

        def callback(*args):
            """Callback that accepts *args."""
            if not d.called:
                d.callback(args)
        # NOTE(review): when error_handler is invoked instead of callback,
        # nothing in this class fires 'd' -- confirm whether that can stall
        # tearDown.
        self.bus.call_async(dbus.bus.BUS_DAEMON_NAME,
                            dbus.bus.BUS_DAEMON_PATH,
                            dbus.bus.BUS_DAEMON_IFACE, 'RemoveMatch', 's',
                            (str(match),), callback, self.error_handler)
        return d

# Listing generated by Doxygen 1.6.0.