Logo Search packages:      
Sourcecode: ubuntuone-dev-tools version File versions  Download package

dbus.py

#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009-2010 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Utilities for finding and running a dbus session bus for testing."""

import os
import signal
import subprocess

from distutils.spawn import find_executable
from urllib import quote
from xdg.BaseDirectory import load_data_paths


00028 class DBusLaunchError(Exception):
    """Error while launching dbus-daemon"""
    pass


00033 class NotFoundError(Exception):
    """Not found error"""
    pass


00038 class DBusRunner(object):
    """Class for running dbus-daemon with a private session."""

    def __init__(self):
        self.dbus_address = None
        self.dbus_pid = None
        self.running = False
        self.config_file = None

00047     def _find_config_file(self, tempdir=None):
        """Find the first appropriate dbus-session.conf to use."""
        # In case we're running from within the source tree
        path = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            os.path.pardir, os.path.pardir,
                                            os.path.pardir,
                                            "data", "dbus-session.conf.in"))
        if not os.path.exists(path):
            # Use the installed file in $pkgdatadir as source
            for path in load_data_paths("ubuntuone-dev-tools",
                                        "dbus-session.conf.in"):
                if os.path.exists(path):
                    break

        # Check to make sure we didn't just fall out of the loop
        if not os.path.exists(path):
            raise IOError('Could not locate suitable dbus-session.conf.in')

        self.config_file = os.path.join(tempdir, 'dbus-session.conf')
        dbus_address = 'unix:tmpdir=%s' % quote(tempdir)
        with open(path) as in_file:
            content = in_file.read()
            with open(self.config_file, 'w') as out_file:
                out_file.write(content.replace('@ADDRESS@', dbus_address))
                out_file.close()
            in_file.close()

00074     def start_service(self, tempdir=None):
        """Start our own session bus daemon for testing."""
        if not tempdir:
            tempdir = os.path.join(os.getcwd(), '_trial_temp')
        dbus = find_executable("dbus-daemon")
        if not dbus:
            raise NotFoundError("dbus-daemon was not found.")

        self._find_config_file(tempdir)

        dbus_args = ["--fork",
                     "--config-file=" + self.config_file,
                     "--print-address=1",
                     "--print-pid=2"]
        sp = subprocess.Popen([dbus] + dbus_args,
                              bufsize=4096, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)

        # Call wait here as under the qt4 reactor we get an error about
        # interrupted system call if we don't.
        sp.wait()
        self.dbus_address = "".join(sp.stdout.readlines()).strip()
        self.dbus_pid = int("".join(sp.stderr.readlines()).strip())

        if self.dbus_address != "":
            os.environ["DBUS_SESSION_BUS_ADDRESS"] = self.dbus_address
        else:
            os.kill(self.dbus_pid, signal.SIGKILL)
            raise DBusLaunchError("There was a problem launching dbus-daemon.")
        self.running = True

00105     def stop_service(self):
        """Stop our DBus session bus daemon."""
        try:
            del os.environ["DBUS_SESSION_BUS_ADDRESS"]
        except KeyError:
            pass
        os.kill(self.dbus_pid, signal.SIGKILL)
        self.running = False
        os.unlink(self.config_file)
        self.config_file = None

Generated by  Doxygen 1.6.0   Back to index