使用importlib加载模块(module)时如何允许相对导入

2024-06-01 11:14:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用PyQt5创建应用程序,并使用QWebEngine作为用户界面。我已经成功地做出了这个应用程序,但现在我想为它加入插件机制,以便以后更容易添加功能。我试过用Yapsy来实现,但后来发现我无法在自己编写的插件中使用相对导入

所以我决定用importlib模块自己创建它,然后我发现了这个问题

Using importlib to dynamically import module(s) containing relative imports

虽然答案本身并没有错,但在我想在包目录外加载模块的情况下,它不起作用

我的源代码可以找到here

正如您在我的源代码中看到的,有两个插件目录:一个在myapp包内,另一个在myapp包外。myapp包内的plugins目录之所以存在,是为了在尝试导入myapp.plugins时不会引发ImportError。

现在的问题是,当我使用python3 -m myapp运行它时,会引发 ImportError: No module named 'myapp.plugins.extras'。这就是我被卡住的地方。


Tags: 模块功能目录插件应用程序源代码pluginsimportlib
1条回答
网友
1楼 · 发布于 2024-06-01 11:14:54

在谷歌上搜索了一段时间后,我发现了这篇文章

Dependency Injection with Import Hooks in Python 3

虽然概念本身正是我所需要的,但他提供模块的方式并不是我所期望的,因为他使用类而不是文件来提供模块。 所以我改变它来满足我的需要

DependencyInjector.py

import _imp
import importlib
import importlib._bootstrap as _bootstrap
import importlib.util
import io
import marshal
import os
import sys
import types
from importlib.abc import Loader


# Directory (next to the source file) that holds cached bytecode.
_PYCACHE = '__pycache__'
# Prefix of the optimization tag embedded in bytecode file names.
_OPT = 'opt-'
SOURCE_SUFFIXES = ['.py']

BYTECODE_SUFFIXES = ['.pyc']

# Take the magic number from the running interpreter instead of hard-coding
# the CPython 3.7 value (3394): with a fixed value, cached bytecode never
# validates on any other Python version and mismatched pycs get written.
MAGIC_NUMBER = importlib.util.MAGIC_NUMBER
# Integer form of the magic number, passed as the key to _imp.source_hash().
_RAW_MAGIC_NUMBER = int.from_bytes(MAGIC_NUMBER, 'little')


def _r_long(int_bytes):
    """Decode ``int_bytes`` as an unsigned little-endian integer.

    Callers in this module pass 4-byte slices of a pyc header.
    """
    value = int.from_bytes(int_bytes, byteorder='little', signed=False)
    return value


def _w_long(x):
    """Encode ``x`` as 4 little-endian bytes, keeping only its low 32 bits."""
    low_32_bits = int(x) & 0xFFFFFFFF
    return low_32_bits.to_bytes(4, byteorder='little')


def _calc_mode(path):
    """Return the permission mode to use for the bytecode file of ``path``.

    The mode is taken from ``os.stat(path)``; when the file cannot be
    stat'ed, 0o666 is used instead.  The owner-write bit is always set so
    the cached file can be rewritten later even if the source is read-only.
    """
    try:
        source_mode = os.stat(path).st_mode
    except OSError:
        source_mode = 0o666
    return source_mode | 0o200


def _write_atomic(path, data, mode=0o666):
    """Write ``data`` to ``path`` through a temporary sibling file.

    The data goes to ``<path>.<id(path)>`` first and is then moved over
    ``path`` with ``os.replace``.  On ``OSError`` the temporary file is
    removed (best effort) and the error is re-raised.
    """
    # id() of the path object only serves as a cheap pseudo-random suffix.
    tmp_path = f'{path}.{id(path)}'
    open_flags = os.O_EXCL | os.O_CREAT | os.O_WRONLY
    fd = os.open(tmp_path, open_flags, mode & 0o666)
    try:
        with io.FileIO(fd, 'wb') as tmp_file:
            tmp_file.write(data)
        os.replace(tmp_path, path)
    except OSError:
        try:
            os.unlink(tmp_path)
        except OSError:
            # Nothing left to clean up; report the original failure.
            pass
        raise


# Type of code objects; _compile_bytecode() checks unmarshalled data against it.
_code_type = type(_write_atomic.__code__)


def cache_from_source(path, debug_override=None, *, optimization=None):
    """Return the ``__pycache__`` bytecode path for the source file ``path``.

    The result is ``<dir>/__pycache__/<name>.<cache_tag>[.opt-<level>].pyc``.

    Args:
        path: Path (str or os.PathLike) of the source file.
        debug_override: Deprecated.  When not None it selects the
            optimization level: truthy means no optimization tag, falsy
            means level 1.
        optimization: Optimization tag to embed in the file name.  ``None``
            uses ``sys.flags.optimize``; ``''`` means no tag.  The string
            form must be alphanumeric.

    Raises:
        TypeError: if both ``debug_override`` and ``optimization`` are given.
        NotImplementedError: if ``sys.implementation.cache_tag`` is None.
        ValueError: if the optimization tag is not alphanumeric.
    """
    if debug_override is not None:
        # Issue a real DeprecationWarning; the previous print() call only
        # wrote the message and the warning class to stdout.
        import warnings

        warnings.warn('the debug_override parameter is deprecated; use '
                      "'optimization' instead", DeprecationWarning)
        if optimization is not None:
            message = 'debug_override or optimization must be set to None'
            raise TypeError(message)
        optimization = '' if debug_override else 1

    path = os.fspath(path)
    head, tail = os.path.split(path)
    base, sep, rest = tail.rpartition('.')
    tag = sys.implementation.cache_tag
    if tag is None:
        raise NotImplementedError('sys.implementation.cache_tag is None')

    # For a file name without a dot, rpartition() leaves the whole name in
    # ``rest`` and ``base``/``sep`` empty.
    almost_filename = ''.join([(base if base else rest), sep, tag])
    if optimization is None:
        if sys.flags.optimize == 0:
            optimization = ''
        else:
            optimization = sys.flags.optimize

    optimization = str(optimization)
    if optimization != '':
        if not optimization.isalnum():
            raise ValueError('{!r} is not alphanumeric'.format(optimization))
        almost_filename = '{}.{}{}'.format(almost_filename, _OPT, optimization)

    return os.path.join(head, _PYCACHE, almost_filename + BYTECODE_SUFFIXES[0])


def _classify_pyc(data, name, exc_details):
    """Check the basic pyc header in ``data`` and return its flags field.

    Args:
        data: Raw contents of the pyc file; only the first 16 bytes are read.
        name: Module name, used in messages.
        exc_details: Keyword arguments forwarded to any ImportError raised.

    Raises:
        ImportError: if the magic number differs from MAGIC_NUMBER or the
            flags field has bits set beyond the two lowest.
        EOFError: if ``data`` is shorter than the 16-byte header.
    """
    header_magic = data[:4]
    if header_magic != MAGIC_NUMBER:
        problem = f'bad magic number in {name!r}: {header_magic!r}'
        _bootstrap._verbose_message('{}', problem)
        raise ImportError(problem, **exc_details)

    header_complete = len(data) >= 16
    if not header_complete:
        problem = f'reached EOF while reading pyc header of {name!r}'
        _bootstrap._verbose_message('{}', problem)
        raise EOFError(problem)

    pyc_flags = _r_long(data[4:8])
    # Only the two lowest flag bits are defined.
    undefined_bits = pyc_flags & ~0b11
    if undefined_bits:
        raise ImportError(f'invalid flags {pyc_flags!r} in {name!r}',
                          **exc_details)
    return pyc_flags


def _validate_timestamp_pyc(data, source_mtime, source_size, name,
                            exc_details):
    """Raise ImportError when a timestamp-based pyc no longer matches its source.

    Bytes 8-12 of ``data`` are compared with ``source_mtime`` and, when
    ``source_size`` is not None, bytes 12-16 with ``source_size`` (both
    truncated to 32 bits).  ``exc_details`` is forwarded to ImportError.
    """
    recorded_mtime = _r_long(data[8:12])
    if recorded_mtime != (source_mtime & 0xFFFFFFFF):
        stale = f'bytecode is stale for {name!r}'
        _bootstrap._verbose_message('{}', stale)
        raise ImportError(stale, **exc_details)

    if source_size is None:
        return

    recorded_size = _r_long(data[12:16])
    if recorded_size != (source_size & 0xFFFFFFFF):
        raise ImportError(f'bytecode is stale for {name!r}', **exc_details)


def _validate_hash_pyc(data, source_hash, name, exc_details):
    """Raise ImportError unless bytes 8-16 of ``data`` equal ``source_hash``.

    ``name`` is used in the message; ``exc_details`` is forwarded to
    ImportError as keyword arguments.
    """
    embedded_hash = data[8:16]
    if embedded_hash == source_hash:
        return
    raise ImportError(
        f'hash in bytecode doesn\'t match hash of source {name!r}',
        **exc_details,
    )


def _compile_bytecode(data, name=None, bytecode_path=None, source_path=None):
    """Unmarshal ``data`` and return the resulting code object.

    When ``source_path`` is given, the code object's file name is rewritten
    to it.  Raises ImportError (carrying ``name`` and ``bytecode_path``) if
    the unmarshalled object is not a code object.
    """
    loaded = marshal.loads(data)
    if not isinstance(loaded, _code_type):
        raise ImportError('Non-code object in {!r}'.format(bytecode_path),
                          name=name, path=bytecode_path)

    _bootstrap._verbose_message('code object from {!r}', bytecode_path)
    if source_path is not None:
        _imp._fix_co_filename(loaded, source_path)
    return loaded


def _code_to_timestamp_pyc(code, mtime=0, source_size=0):
    """Serialize ``code`` as a timestamp-based pyc and return a bytearray.

    Layout: MAGIC_NUMBER, a zero flags word, ``mtime``, ``source_size``
    (each written with ``_w_long``), then the marshalled code object.
    """
    pyc = bytearray(MAGIC_NUMBER)
    for header_field in (0, mtime, source_size):
        pyc += _w_long(header_field)
    pyc += marshal.dumps(code)
    return pyc


def _code_to_hash_pyc(code, source_hash, checked=True):
    """Serialize ``code`` as a hash-based pyc and return a bytearray.

    Layout: MAGIC_NUMBER, a flags word (bit 0 always set, bit 1 set from
    ``checked``), the 8-byte ``source_hash``, then the marshalled code.
    """
    pyc = bytearray(MAGIC_NUMBER)
    flag_bits = (checked << 1) | 0b1
    pyc += _w_long(flag_bits)
    assert len(source_hash) == 8
    pyc.extend(source_hash)
    pyc.extend(marshal.dumps(code))
    return pyc


class DependencyInjectorFinder(importlib.abc.MetaPathFinder):
    """Meta path finder that delegates every decision to a single loader.

    The loader must expose ``provides(fullname)``; names it provides are
    answered with a module spec bound to that loader.
    """

    def __init__(self, loader):
        """Remember ``loader`` as the object that decides and loads modules."""
        self._loader = loader

    def find_spec(self, fullname, path, target=None):
        """Return a spec for ``fullname`` if the loader provides it, else None.

        ``path`` and ``target`` are accepted for the finder protocol but
        are not used.
        """
        if not self._loader.provides(fullname):
            return None
        return self._gen_spec(fullname)

    def _gen_spec(self, fullname):
        """Build a ModuleSpec for ``fullname`` that points at the loader."""
        return importlib.machinery.ModuleSpec(fullname, self._loader)


class DependencyInjectorLoader(Loader):
    """Loader that serves modules registered at runtime through provide().

    A module registered under a service name can be imported under that
    name or as ``myapp.plugins.<name>``.  Each registered module must carry
    the location of its source in ``__path__``: either a file, or a
    directory containing ``__init__.py`` (see get_data()).  The source is
    compiled, cached in ``__pycache__`` and executed on import.
    """

    _COMMON_PREFIX = "myapp.plugins."
    # ``__path__`` of the module most recently run by exec_module().
    path = None

    def __init__(self):
        # Maps service name -> module object registered through provide().
        self._services = {}
        # Placeholder handed out for the prefix packages themselves
        # ("myapp", "myapp.plugins") when nothing is registered for them.
        self._dummy_module = types.ModuleType(self._COMMON_PREFIX[:-1])
        self._dummy_module.__path__ = []

    def path_stats(self, path):
        """Return ``{'mtime': ..., 'size': ...}`` for ``path``.

        Raises OSError if ``path`` cannot be stat'ed.
        """
        st = os.stat(path)
        return {'mtime': st.st_mtime, 'size': st.st_size}

    def _cache_bytecode(self, source_path, bytecode_path, data):
        """Write ``data`` to ``bytecode_path`` using the source file's mode."""
        mode = _calc_mode(source_path)
        return self.set_data(bytecode_path, data, _mode=mode)

    def set_data(self, path, data, *, _mode=0o666):
        """Write ``data`` to ``path``, creating missing parent directories.

        This is best effort: failures to create a directory or to write the
        file are reported through the import system's verbose messages and
        otherwise ignored.
        """
        parent, filename = os.path.split(path)
        path_parts = []
        # Figure out what directories are missing.
        while parent and not os.path.isdir(parent):
            parent, part = os.path.split(parent)
            path_parts.append(part)
        # Create needed directories.
        for part in reversed(path_parts):
            parent = os.path.join(parent, part)
            try:
                os.mkdir(parent)
            except FileExistsError:
                # Probably another Python process already created the dir.
                continue
            except OSError as exc:
                # Could be a permission error, read-only filesystem: just forget
                # about writing the data.
                _bootstrap._verbose_message('could not create {!r}: {!r}',
                                            parent, exc)
                return
        try:
            _write_atomic(path, data, _mode)
            _bootstrap._verbose_message('created {!r}', path)
        except OSError as exc:
            # Same as above: just don't write the bytecode.
            _bootstrap._verbose_message('could not create {!r}: {!r}', path,
                                        exc)

    def get_filename(self, fullname):
        """Return the source location of the service named ``fullname``.

        The location is whatever the registered module stores in
        ``__path__``.  ``fullname`` is looked up as given (the common prefix
        is not stripped).  Returns None for unknown names.
        """
        if fullname in self._services:
            module = self._services[fullname]
            return module.__path__
        return None

    def get_code(self, fullname):
        """Return the code object for the service named ``fullname``.

        A cached pyc next to the source is used when its header validates
        against the source (by timestamp/size or by hash); otherwise the
        source is compiled and, unless ``sys.dont_write_bytecode`` is set,
        the bytecode cache is refreshed.

        Raises:
            ImportError: if no source location is registered for
                ``fullname``.
        """
        source_path = self.get_filename(fullname)
        if source_path is None:
            # Without this guard os.fspath(None) fails with an unrelated
            # TypeError further down.
            raise ImportError(f'no source registered for {fullname!r}',
                              name=fullname)
        source_mtime = None
        source_bytes = None
        source_hash = None
        hash_based = False
        check_source = True
        try:
            bytecode_path = cache_from_source(source_path)
        except NotImplementedError:
            bytecode_path = None
        else:
            try:
                st = self.path_stats(source_path)
            except OSError:
                pass
            else:
                source_mtime = int(st['mtime'])
                try:
                    data = self.get_data(bytecode_path)
                except OSError:
                    # No readable cache file: compile from source below.
                    pass
                else:
                    exc_details = {
                        'name': fullname,
                        'path': bytecode_path,
                    }
                    try:
                        flags = _classify_pyc(data, fullname, exc_details)
                        bytes_data = memoryview(data)[16:]
                        hash_based = flags & 0b1 != 0
                        if hash_based:
                            check_source = flags & 0b10 != 0
                            if (_imp.check_hash_based_pycs != 'never' and
                                (check_source or
                                 _imp.check_hash_based_pycs == 'always')):
                                source_bytes = self.get_data(source_path)
                                source_hash = _imp.source_hash(
                                    _RAW_MAGIC_NUMBER,
                                    source_bytes,
                                )
                                _validate_hash_pyc(data, source_hash, fullname,
                                                   exc_details)
                        else:
                            _validate_timestamp_pyc(
                                data,
                                source_mtime,
                                st['size'],
                                fullname,
                                exc_details,
                            )
                    except (ImportError, EOFError):
                        # Invalid or stale cache: compile from source below.
                        pass
                    else:
                        _bootstrap._verbose_message('{} matches {}', bytecode_path,
                                                    source_path)
                        return _compile_bytecode(bytes_data, name=fullname,
                                                 bytecode_path=bytecode_path,
                                                 source_path=source_path)
        if source_bytes is None:
            source_bytes = self.get_data(source_path)
        code_object = self.source_to_code(source_bytes, source_path)
        _bootstrap._verbose_message('code object from {}', source_path)
        if (not sys.dont_write_bytecode and bytecode_path is not None and
                source_mtime is not None):
            if hash_based:
                if source_hash is None:
                    # _imp.source_hash() takes the key first, then the source.
                    source_hash = _imp.source_hash(_RAW_MAGIC_NUMBER,
                                                   source_bytes)
                data = _code_to_hash_pyc(
                    code_object, source_hash, check_source)
            else:
                data = _code_to_timestamp_pyc(code_object, source_mtime,
                                              len(source_bytes))
            try:
                self._cache_bytecode(source_path, bytecode_path, data)
                _bootstrap._verbose_message('wrote {!r}', bytecode_path)
            except NotImplementedError:
                pass
        return code_object

    def source_to_code(self, data, path, *, _optimize=-1):
        """Return the code object compiled from source.

        The 'data' argument can be any object type that compile() supports.
        """
        return _bootstrap._call_with_frames_removed(compile, data, path, 'exec',
                                                    dont_inherit=True, optimize=_optimize)

    def get_data(self, path):
        """Return the contents of ``path`` as raw bytes.

        If ``path`` is a directory containing ``__init__.py``, that file is
        read instead.  Raises OSError if the file cannot be opened.
        """
        # TODO: raise error if the file is not found
        if os.path.isdir(path):
            init_path = os.path.join(path, '__init__.py')
            if os.path.exists(init_path):
                with io.FileIO(init_path, 'r') as file:
                    return file.read()

        with io.FileIO(path, 'r') as file:
            return file.read()

    def provide(self, service_name, module):
        """Register ``module`` as the provider of ``service_name``."""
        self._services[service_name] = module

    def provides(self, fullname):
        """Return True if this loader should handle ``fullname``.

        That is the case for registered services (with or without the
        common prefix) and for the packages that make up the prefix itself,
        i.e. "myapp" and "myapp.plugins".
        """
        if self._truncate_name(fullname) in self._services:
            return True
        # Compare whole dotted components: a bare startswith(fullname) also
        # matched fragments such as "m", "mya" and the empty string.
        return self._COMMON_PREFIX.startswith(fullname + ".")

    def create_module(self, spec):
        """Return the module object to use for ``spec``.

        This is the registered service module when there is one, otherwise
        the shared dummy module (the name is then one of the packages in
        _COMMON_PREFIX).
        """
        service_name = self._truncate_name(spec.name)
        if service_name not in self._services:
            return self._dummy_module
        module = self._services[service_name]
        return module

    def exec_module(self, module):
        """Compile and execute the source registered for ``module``.

        The source location is looked up by ``module.__name__``.  Modules
        without ``__path__`` and modules with no registered source (the
        dummy prefix package) are left untouched.
        """
        if not hasattr(module, "__path__"):
            return
        if self.get_filename(module.__name__) is None:
            # Dummy prefix package: there is no source to run.
            return
        self.path = module.__path__
        code = self.get_code(module.__name__)
        _bootstrap._call_with_frames_removed(exec, code, module.__dict__)

    def _truncate_name(self, fullname):
        """Strip off _COMMON_PREFIX from the given module name

        Convenience method when checking if a service is provided.
        """
        truncated_name = fullname
        if truncated_name.startswith(self._COMMON_PREFIX):
            truncated_name = fullname[len(self._COMMON_PREFIX):]

        return truncated_name

    def is_package(self, fullname):
        """Report every name this loader provides as a package."""
        return self.provides(fullname)

上面的大部分源代码都取自importlib.machinery.SourceFileLoader

PluginManager.py

import os
import re
import ast
import sys
import types
import typing
import importlib
import configparser

from PyQt5.QtCore import QObject
from PyQt5.QtWidgets import QApplication
from PyQt5.QtWebEngineWidgets import QWebEngineScript

from .utils import Signal, findFiles
from .config import change_filter, getInstance
from .DependencyInjector import DependencyInjectorFinder, DependencyInjectorLoader


class PluginInjector:
    """Bundle a DependencyInjectorLoader with its finder and install them.

    Creating an instance immediately appends the finder to ``sys.meta_path``.
    """

    def __init__(self):
        loader = DependencyInjectorLoader()
        self._loader = loader
        self._finder = DependencyInjectorFinder(loader)
        self.installed = False
        self.install()

    def install(self):
        """Append the finder to ``sys.meta_path`` once; later calls do nothing."""
        if self.installed:
            return
        self.installed = True
        sys.meta_path.append(self._finder)

    def provide(self, service_name, module):
        """Register ``module`` under ``service_name`` with the loader."""
        self._loader.provide(service_name, module)


class PluginInfo(configparser.ConfigParser):
    """Parsed contents of a plugin identity file."""

    def __init__(self, filepath):
        """Read ``filepath`` as UTF-8 and remember where it came from."""
        super().__init__()
        self._filepath = filepath
        self.read(filepath, encoding='utf-8')

    def isValid(self) -> bool:
        """Return True if there is a ``[plugin]`` section with a ``Module`` option."""
        if not self.has_section("plugin"):
            return False
        return self.has_option("plugin", "Module")


class PluginManager(QObject):
    """Discover, import and (de)activate plugins described by ``*.plugin`` files.

    Every plugin directory is searched for identity files (see PluginInfo).
    For each valid identity, the plugin's Python files are registered with
    a PluginInjector under the plugin's name, and the main module is
    executed.  Plugin modules may define the hooks ``activate``,
    ``deactivate``, ``beforeLoad``/``before_load``,
    ``loadStarted``/``load_started`` and ``loadFinished``/``load_finished``.
    """

    pluginAdded = Signal()
    pluginRemoved = Signal()
    pluginActivated = Signal()
    pluginDeactivated = Signal()
    loadStarted = Signal()
    loadFinished = Signal()
    beforeLoad = Signal()
    bridgeInitialize = Signal()

    def __init__(self, pluginDirs: typing.Optional[typing.List[str]] = None, parent=None):
        """Create the manager and load all plugins found in ``pluginDirs``.

        Args:
            pluginDirs: Directories to search for ``*.plugin`` files.  The
                list is kept by reference and extended by addPluginPath();
                ``None`` starts with a fresh empty list.
            parent: Passed through to the QObject constructor.
        """
        super().__init__(parent)
        app = QApplication.instance()

        # NOTE(review): imported here rather than at module level,
        # presumably to avoid a circular import; confirm.
        from .MyApplication import MyApplication
        assert isinstance(app, MyApplication)

        self._injector = PluginInjector()
        # name -> module of plugins that are currently activated
        self._plugins = {}
        # name -> main module of every plugin imported so far
        self._loadedPlugins = {}
        # name -> absolute paths of the plugin's .js/.css resources
        self._pluginsResources = {}
        # A shared ``[]`` default would be mutated by addPluginPath() and
        # leak directories between manager instances.
        self._pluginDirs = [] if pluginDirs is None else pluginDirs
        self.loadStarted.connect(self._loadStarted)
        self.beforeLoad.connect(self._beforeLoad)
        self.loadFinished.connect(self._loadFinished)
        self.bridgeInitialize.connect(self._bridgeInitialize)
        self._loadPlugins()

    def _bridgeInitialize(self, page):
        """Inject every registered ``.js`` / ``.css`` plugin resource into ``page``."""
        for name, resources in self._pluginsResources.items():
            for resource in resources:
                scriptName = name + "_" + os.path.basename(resource)

                if resource.endswith(".js"):
                    page.injectScript(
                        resource, scriptName, QWebEngineScript.DocumentReady)
                elif resource.endswith(".css"):
                    page.injectStylesheet(
                        resource, scriptName, QWebEngineScript.DocumentReady)

    def _beforeLoad(self, channel, page):
        """Call the ``beforeLoad`` / ``before_load`` hook of every active plugin."""
        for plugin in self._plugins.values():
            if 'beforeLoad' in dir(plugin):
                plugin.beforeLoad(channel, page)
            elif 'before_load' in dir(plugin):
                plugin.before_load(channel, page)

    def _loadStarted(self, page):
        """Call the ``loadStarted`` / ``load_started`` hook of every active plugin."""
        for plugin in self._plugins.values():
            if 'loadStarted' in dir(plugin):
                plugin.loadStarted(page)
            elif 'load_started' in dir(plugin):
                plugin.load_started(page)

    def _loadFinished(self, page):
        """Call the ``loadFinished`` / ``load_finished`` hook of every active plugin."""
        for plugin in self._plugins.values():
            # Dispatch to the hook that was tested for; the load-started
            # hooks were called here by mistake before.
            if 'loadFinished' in dir(plugin):
                plugin.loadFinished(page)
            elif 'load_finished' in dir(plugin):
                plugin.load_finished(page)

    def addPluginPath(self, path: str):
        """Add the absolute directory ``path`` and load the plugins found in it."""
        assert os.path.isabs(path)
        if path not in self._pluginDirs:
            self._pluginDirs.append(path)
            self._loadPlugins()

    def _registerPluginNamespace(self, name, module_name, parentdir):
        """Register the plugin's namespace and its helper modules.

        A fake module named ``name`` whose ``__path__`` is ``parentdir`` is
        provided first.  Then every ``*.py`` file found under ``parentdir``
        (except the main module and ``__init__``) is provided as
        ``<name>.<dotted relative path>`` without being executed.
        """
        import importlib.util

        print(f"creating namespace for plugin {name}")
        namespace = types.ModuleType(name)
        namespace.__path__ = parentdir
        self._injector.provide(name, namespace)

        for pyFile in findFiles("*.py", parentdir):
            basename = os.path.splitext(os.path.basename(pyFile))[0]
            if basename == module_name or basename == '__init__':
                continue

            tail = pyFile[len(parentdir + '/'):].replace(
                os.path.sep, '.').replace('.py', '')
            package = f"{name}.{tail}"
            print(
                f"load external module for plugin {name} with name {package}")
            spec = importlib.util.spec_from_file_location(package, pyFile)
            submodule = importlib.util.module_from_spec(spec)
            # The injector's loader reads the source location from __path__.
            submodule.__path__ = pyFile
            self._injector.provide(package, submodule)

    def _importMainModule(self, name, module_name, module_path):
        """Provide, execute and remember the plugin's main module; return it."""
        import importlib.util

        package = f"{name}.{module_name}"
        spec = importlib.util.spec_from_file_location(package, module_path)
        module = importlib.util.module_from_spec(spec)
        module.__path__ = module_path
        self._injector.provide(package, module)
        spec.loader.exec_module(module)
        self._loadedPlugins[name] = module
        return module

    def _loadPlugin(self, pluginName):
        """Import the plugin called ``pluginName`` and return its main module.

        An already imported plugin is returned from the cache.  Returns
        None when no usable identity file is found or the import fails.
        """
        if pluginName in self._loadedPlugins:
            return self._loadedPlugins[pluginName]

        identities_paths = []
        for directory in self._pluginDirs:
            identities_paths += findFiles("*.plugin", directory)

        module = None
        for f in identities_paths:
            info = PluginInfo(f)
            if not (info.has_section("plugin") and info.has_option("plugin", "Name")):
                continue

            name = info.get("plugin", "Name")
            if name != pluginName:
                continue

            if not info.isValid():
                print(f"Plugin identity {name} is not valid, please read documentation "
                      "about how to write plugin.")
                continue

            module_path = os.path.join(
                os.path.dirname(f), info.get("plugin", "Module"))
            if not module_path.endswith(".py"):
                module_path += ".py"

            if not os.path.exists(module_path):
                print(
                    f"module specified in {name} doesn't exists, it will be ignored.")
                continue

            try:
                module_name = info.get("plugin", "Module").replace(".py", "")
                self._registerPluginNamespace(
                    name, module_name, os.path.dirname(module_path))
                # ``module`` is only bound on success, so a failed import
                # leaves it None instead of a half-initialised module.
                module = self._importMainModule(name, module_name, module_path)
            except ImportError:
                print(
                    f"Unable to load plugin module {name}")

            break

        return module

    def _loadPlugins(self):
        """Import every plugin found in the plugin directories.

        Plugins that are already imported are skipped, so this can be
        called again after a directory was added.  Plugins whose config
        entry is enabled get ``activate()`` called; declared resources are
        recorded for _bridgeInitialize().
        """
        identities_paths = []
        for directory in self._pluginDirs:
            identities_paths += findFiles("*.plugin", directory)

        # (identity, absolute path of the main module) of plugins to import
        plugins: typing.List[typing.Tuple[PluginInfo, str]] = []

        for f in identities_paths:
            info = PluginInfo(f)
            hasName = info.has_section("plugin") and info.has_option("plugin", "Name")
            name = info.get("plugin", "Name") if hasName else f

            # if it's already exists it means that user just add a new plugins directory
            if name in self._loadedPlugins:
                continue

            # An identity without a Name cannot be imported; treating it as
            # invalid avoids a NoOptionError further down.
            if not hasName or not info.isValid():
                print(f"Plugin identity {name} is not valid, please read documentation "
                      "about how to write plugin.")
                continue

            module_path = os.path.join(
                os.path.dirname(f), info.get("plugin", "Module"))
            if not module_path.endswith(".py"):
                module_path += ".py"

            if os.path.exists(module_path):
                # Kept next to the identity rather than stored in it:
                # ConfigParser interpolation rejects values containing '%'.
                plugins.append((info, module_path))
            else:
                print(
                    f"module specified in {f} doesn't exists, it will be ignored.")

        print(f"{len(plugins)} plugins found.")
        for plugin, module_path in plugins:
            try:
                name = plugin.get("plugin", "Name")
                module_name = plugin.get("plugin", "Module").replace(".py", "")

                self._registerPluginNamespace(
                    name, module_name, os.path.dirname(module_path))
                print(f"importing main plugin module for plugin {name}")
                module = self._importMainModule(name, module_name, module_path)

                # By default plugin will be enabled if there was no plugin
                # configuration.
                cfg = getInstance().get(f"plugins.{name}")
                shouldLoad = True
                if cfg is None:
                    shouldLoad = False
                    cfg = dict()
                    cfg['enabled'] = True
                    getInstance().set(f"plugins.{name}.enabled", True)

                # Per the original author's note: on first registration the
                # config write above is expected to trigger
                # _pluginsStateChanged, which activates the plugin, so it is
                # not activated again here.
                if cfg['enabled'] and shouldLoad:
                    if 'activate' in dir(module):
                        module.activate()
                        self._plugins[name] = module

                if plugin.has_option("plugin", "Resources"):
                    resources = ast.literal_eval(
                        plugin.get("plugin", "Resources"))
                    base_path = os.path.dirname(module_path)
                    # Relative resource paths are resolved against the
                    # directory of the main module.
                    self._pluginsResources[name] = [
                        res if os.path.isabs(res) else os.path.join(base_path, res)
                        for res in resources
                    ]

            except ImportError as e:
                name = plugin.get("plugin", "Name")
                print(
                    f"Unable to load plugin module {name} : {e.msg}")

    @change_filter("plugins")
    def _pluginsStateChanged(self, key: str, value):
        """Enable or disable a plugin when its ``plugins.<name>.enabled`` key changes.

        Only the plugin name taken from ``key`` and the truthiness of
        ``value`` are used.
        """
        res = re.findall("plugins\\.(.*)\\.enabled", key)
        if key.endswith("enabled") and len(res) > 0:
            name = res[0]
            if value:
                self.enablePlugin(name)
            else:
                self.disablePlugin(name)

    def enablePlugin(self, name: str):
        """Import the plugin ``name`` if needed and activate it.

        Does nothing when the plugin is already active.  A plugin without
        an ``activate`` hook is imported but not marked active.
        """
        print(f"enabling plugin {name}")
        if name not in self._plugins:
            module = self._loadPlugin(name)
            if module is not None:
                if "activate" in dir(module):
                    module.activate()
                    self.pluginActivated.emit(name)
                    self._plugins[name] = module
                    self.pluginAdded.emit(name)
            else:
                print(f"Unable activate plugin {name}")

    def disablePlugin(self, name: str):
        """Deactivate the plugin ``name`` and drop it from the active set.

        The ``deactivate`` hook is called when the module defines one.
        Does nothing when the plugin is not active.
        """
        print(f"disabling plugin {name}")
        if name in self._plugins:
            module = self._plugins[name]
            if "deactivate" in dir(module):
                module.deactivate()
                self.pluginDeactivated.emit(name)

            self._plugins.pop(name, None)
            self.pluginRemoved.emit(name)

虽然上面的代码还不完善,但至少它已经满足了我的需要。

相关问题 更多 >