# Source code for flask_pluginkit.utils

# -*- coding: utf-8 -*-
"""
    flask_pluginkit.utils
    ~~~~~~~~~~~~~~~~~~~~~

    Some tool classes and functions.

    :copyright: (c) 2019 by staugur.
    :license: BSD 3-Clause, see LICENSE for more details.
"""

import json
import shelve
from semver import VersionInfo
from os.path import join, abspath
from tempfile import gettempdir
from collections import deque
from flask import Markup, Response, jsonify
from flask.app import setupmethod, Flask as _BaseFlask
from ._compat import PY2, string_types, text_type, iteritems
from .exceptions import PluginError, NotCallableError


def isValidPrefix(prefix, allow_none=False):
    """Tell whether *prefix* is acceptable as a blueprint url prefix.

    A string qualifies when it starts with ``/``, does not end with ``/``
    and contains neither ``//`` nor a space character.

    :param prefix: the candidate prefix
    :param allow_none: when exactly ``True``, a ``None`` prefix is accepted
    :returns: bool
    """
    if allow_none is True and prefix is None:
        return True
    if not isinstance(prefix, string_types):
        return False
    if not prefix.startswith("/"):
        return False
    if prefix.endswith("/"):
        return False
    return "//" not in prefix and " " not in prefix


def isValidSemver(version):
    """Check whether *version* is a qualified semantic version number.

    The expected format is MAJOR.MINOR.PATCH, see https://semver.org

    :param version: the candidate version string
    :returns: ``False`` for empty or non-string input, otherwise the result
              of :meth:`VersionInfo.isvalid`
    """
    if not version or not isinstance(version, string_types):
        return False
    return VersionInfo.isvalid(version)
def sortedSemver(versions, sort="ASC"):
    """Return the version numbers ordered by semantic version.

    :param versions: a non-empty list or tuple of version strings
    :param sort: ``"DESC"`` (case-insensitive) for descending order, any
                 other value sorts ascending
    :raises TypeError: if *versions* is empty or not a list/tuple
    :returns: a new sorted list
    """
    descending = sort.upper() == "DESC"
    if not versions or not isinstance(versions, (list, tuple)):
        raise TypeError("Invaild versions, a list or tuple is right.")

    def _cmp(left, right):
        # Parse the left operand and compare it against the right one.
        return VersionInfo.parse(left).compare(right)

    if PY2:
        return sorted(versions, cmp=_cmp, reverse=descending)

    from functools import cmp_to_key

    return sorted(versions, key=cmp_to_key(_cmp), reverse=descending)
class BaseStorage(object):
    """Common base class for storage backends.

    Concrete storage classes should inherit from :class:`~BaseStorage` and
    provide `get` and `set` methods; providing `remove` as well is
    recommended. The `__getitem__`, `__setitem__` and `__delitem__` hooks
    defined here forward to those methods so an instance can be used like
    a dict.

    .. versionchanged:: 3.4.1
        Change :attr:`index` to :attr:`DEFAULT_INDEX`
    """

    #: Fallback index (the only key); subclasses may override it.
    DEFAULT_INDEX = "flask_pluginkit_dat"

    @property
    def index(self):
        """The effective index: the covered one if set, else the default.

        .. versionadded:: 3.4.1
        """
        covered = getattr(self, "COVERED_INDEX", None)
        if covered:
            return covered
        return self.DEFAULT_INDEX

    @index.setter
    def index(self, _covered_index):
        """Store an index that takes precedence over the default.

        .. versionadded:: 3.6.0
        """
        self.COVERED_INDEX = _covered_index

    def __getitem__(self, key):
        # Forward ``obj[key]`` to the subclass-provided ``get``.
        if not hasattr(self, "get"):
            raise AttributeError("Please override the get method")
        return self.get(key)

    def __setitem__(self, key, value):
        # Forward ``obj[key] = value`` to the subclass-provided ``set``.
        if not hasattr(self, "set"):
            raise AttributeError("Please override the set method")
        return self.set(key, value)

    def __delitem__(self, key):
        # ``remove`` is optional: without it, deletion is a no-op.
        if not hasattr(self, "remove"):
            return False
        return self.remove(key)

    def __str__(self):
        cls_name = self.__class__.__name__
        address = hex(id(self))
        return "<%s object at %s, index is %s>" % (
            cls_name,
            address,
            self.index,
        )

    __repr__ = __str__
class LocalStorage(BaseStorage):
    """Local file system storage based on the shelve module.

    :param path: file name of the shelf; when falsy, a file named after
                 :attr:`DEFAULT_INDEX` in the system temp directory is used
    """

    def __init__(self, path=None):
        self.COVERED_INDEX = path or join(gettempdir(), self.DEFAULT_INDEX)

    def _open(self, flag="c"):
        """Open the shelf located at :attr:`index`.

        :param flag: the dbm open flag passed on to :func:`shelve.open`
        :returns: the opened shelf; the caller is responsible for closing it
        """
        return shelve.open(
            filename=abspath(self.index),
            flag=flag,
            protocol=2,
            writeback=False,
        )

    @property
    def list(self):
        """list all data

        Any error while opening the shelf read-only yields an empty dict.

        :returns: dict
        """
        db = None
        try:
            db = self._open(flag="r")
        except Exception:
            return dict()
        else:
            return dict(db)
        finally:
            # Compare against None: an empty shelf is falsy (it defines
            # __len__), so a truthiness test would skip closing it.
            if db is not None:
                db.close()

    def __ck(self, key):
        """Normalize *key* to the native ``str`` type of the interpreter.

        On Python 2 text keys are encoded to UTF-8; on Python 3 non-text
        keys are decoded from UTF-8.
        """
        if PY2 and isinstance(key, text_type):
            key = key.encode("utf-8")
        if not PY2 and not isinstance(key, text_type):
            key = key.decode("utf-8")
        return key

    def set(self, key, value):
        """Set persistent data with shelve.

        :param key: str: Index key
        :param value: All supported data types in python
        :returns: None
        """
        db = None
        try:
            db = self._open()
            db[self.__ck(key)] = value
        finally:
            # ``is not None`` so a still-empty shelf is closed as well.
            if db is not None:
                db.close()

    def setmany(self, **mapping):
        """Set more data

        :param mapping: the more k=v

        .. versionadded:: 3.4.1
        """
        if mapping and isinstance(mapping, dict):
            db = self._open()
            try:
                for k, v in iteritems(mapping):
                    db[self.__ck(k)] = v
            finally:
                # Close the shelf even if one of the writes fails.
                db.close()

    def get(self, key, default=None):
        """Get persistent data from shelve.

        :param key: the key to look up
        :param default: value returned when *key* is missing
        :returns: data
        """
        try:
            value = self.list[key]
        except KeyError:
            return default
        else:
            return value

    def remove(self, key):
        """Delete *key* from the shelf.

        The key is normalized the same way as in :meth:`set`, and the shelf
        is always closed afterwards.

        :raises KeyError: if *key* is not stored
        """
        db = self._open()
        try:
            del db[self.__ck(key)]
        finally:
            db.close()

    def __len__(self):
        return len(self.list)
class RedisStorage(BaseStorage):
    """Storage that keeps data in a hash on a stand-alone redis.

    :param redis_url: url handed to ``redis.from_url``; takes precedence
                      when truthy
    :param redis_connection: an already created connection object, used
                             when no url is given
    """

    def __init__(self, redis_url=None, redis_connection=None):
        if redis_url:
            self._db = self._open(redis_url)
        else:
            self._db = redis_connection

    def _open(self, redis_url):
        """Create a connection from *redis_url*.

        :raises ImportError: if the redis module is not installed
        """
        try:
            from redis import from_url
        except ImportError:
            raise ImportError(
                "Please install the redis module, eg: pip install redis"
            )
        return from_url(redis_url)

    @property
    def list(self):
        """All fields of the redis hash with their JSON-decoded values."""
        raw = self._db.hgetall(self.index)
        decoded = {}
        for field, payload in iteritems(raw):
            decoded[field] = json.loads(payload)
        return decoded

    def set(self, key, value):
        """Store *value*, serialized as JSON, under *key*."""
        return self._db.hset(self.index, key, json.dumps(value))

    def setmany(self, **mapping):
        """Store several k=v pairs at once, each value serialized as JSON.

        :param mapping: the more k=v

        .. versionadded:: 3.4.1
        """
        if not (mapping and isinstance(mapping, dict)):
            return None
        serialized = {}
        for field, value in iteritems(mapping):
            serialized[field] = json.dumps(value)
        return self._db.hmset(self.index, serialized)

    def get(self, key, default=None):
        """Fetch the JSON-decoded value of *key*, or *default* if absent."""
        raw = self._db.hget(self.index, key)
        if not raw:
            return default
        # On Python 3 a non-text reply is decoded before JSON parsing.
        if not PY2 and not isinstance(raw, text_type):
            raw = raw.decode("utf-8")
        return json.loads(raw)

    def remove(self, key):
        """Delete *key* from the redis hash."""
        return self._db.hdel(self.index, key)

    def __len__(self):
        return self._db.hlen(self.index)
class JsonResponse(Response):
    """Response class that turns a dict return value into JSON.

    Return values of other types are passed on to the parent class
    unchanged.

    .. versionadded:: 3.4.0
    """

    @classmethod
    def force_type(cls, rv, environ=None):
        """Convert *rv* with ``jsonify`` when it is a dict, then defer to
        the parent ``force_type``.
        """
        payload = jsonify(rv) if isinstance(rv, dict) else rv
        return super(JsonResponse, cls).force_type(payload, environ)
class Flask(_BaseFlask):
    """Flask application class with extra ``before_request`` decorators
    that register functions at fixed positions of the handler list.
    """

    @setupmethod
    def before_request_top(self, f):
        """Register *f* to run before each request, with first priority.

        Used like the :meth:`flask.Flask.before_request` decorator. That
        decorator appends to the end of ``before_request_funcs``, while this
        one inserts the function at the top (index 0).

        flask-pluginkit registers all cep into the app at load time, so if
        your web application uses before_request and a plugin depends on
        something it sets up (like g), the plugin will not run properly.
        In that case the web application should use this decorator instead.

        :param f: the function to register
        :returns: *f* unchanged
        """
        handlers = self.before_request_funcs.setdefault(None, [])
        handlers.insert(0, f)
        return f

    @setupmethod
    def before_request_second(self, f):
        """Register *f* to run before each request, with second priority
        (inserted at index 1).

        :param f: the function to register
        :returns: *f* unchanged
        """
        handlers = self.before_request_funcs.setdefault(None, [])
        handlers.insert(1, f)
        return f
class Attribution(dict):
    """A dict whose keys can also be read as attributes."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails; a missing key is
        # reported as a missing attribute.
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        return value
class DcpManager(object):
    """Registry mapping dcp event names to queues of callbacks."""

    def __init__(self):
        self._listeners = {}

    @property
    def list(self):
        """The underlying mapping of event name to its callbacks."""
        return self._listeners

    def push(self, event, callback, position="right"):
        """Connect a dcp, push a function.

        :param event: a unique identifier name for dcp.
        :param callback: the function to run for the event.
        :param position: where to insert the function: ``right`` or
                         ``after`` (default behaviour) appends it to the
                         end of the event, ``left`` or ``before`` puts it
                         at the front.
        :raises PluginError: the param event or position error
        :raises NotCallableError: the param callback is not callable

        .. versionadded:: 3.2.0
        """
        if not (event and isinstance(event, string_types)):
            raise PluginError("Invalid event")
        if not callable(callback):
            raise NotCallableError("The event %s cannot be called" % event)
        if position not in ("left", "right", "after", "before"):
            raise PluginError("Invalid position")

        if event not in self._listeners:
            # First callback of this event: position is irrelevant.
            self._listeners[event] = deque([callback])
            return

        queue = self._listeners[event]
        if position in ("left", "before"):
            queue.appendleft(callback)
        else:
            queue.append(callback)

    def remove(self, event, callback):
        """Remove a previously pushed callback.

        :returns: ``True`` if it was removed, ``False`` if the event or the
                  callback is unknown
        """
        try:
            self._listeners[event].remove(callback)
        except (KeyError, ValueError):
            return False
        return True

    def emit(self, event, *args, **kwargs):
        """Emits events for the template context.

        Every callback of *event* is called with the given arguments; list
        or tuple results are joined, falsy results are skipped and non-text
        results are decoded as UTF-8.

        :returns: strings with :class:`~flask.Markup`
        """
        pieces = []
        for handler in self._listeners.get(event) or []:
            out = handler(*args, **kwargs)
            if isinstance(out, (list, tuple)):
                out = "".join(out)
            if not out:
                continue
            if not isinstance(out, text_type):
                out = out.decode("utf-8")
            pieces.append(out)
        return Markup("".join(pieces))
def allowed_uploaded_plugin_suffix(filename):
    """Check suffix for uploaded filename

    :param filename: name of the uploaded file
    :returns: ``True`` if *filename* is a string ending with ``.tar.gz``,
              ``.tgz`` or ``.zip``, otherwise ``False``

    .. versionadded:: 3.3.0
    """
    if not isinstance(filename, string_types):
        return False
    return filename.endswith((".tar.gz", ".tgz", ".zip"))


def check_url(addr):
    """Check whether UrlAddr is in a valid format, for example::

        http://ip:port
        https://abc.com

    The pattern accepts an http or https scheme followed by a domain name,
    ``localhost`` or four dot-separated groups of one to three digits, then
    an optional port and an optional path or query part. Matching is
    case-insensitive.

    :param addr: the url to check
    :returns: bool

    .. versionadded:: 3.3.0
    """
    import re

    pattern = re.compile(
        r"^(?:http)s?://"
        r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+"
        r"(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|"
        r"localhost|"
        r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
        r"(?::\d+)?"
        r"(?:/?|[/?]\S+)$",
        re.IGNORECASE,
    )
    if not addr or not isinstance(addr, string_types):
        return False
    return pattern.match(addr) is not None