# -*- coding: utf-8 -*-
"""
flask_pluginkit.utils
~~~~~~~~~~~~~~~~~~~~~
Some tool classes and functions.
:copyright: (c) 2019 by staugur.
:license: BSD 3-Clause, see LICENSE for more details.
"""
import json
import shelve
from semver.version import Version
from functools import cmp_to_key
from os.path import join, abspath
from tempfile import gettempdir
from collections import deque
from markupsafe import Markup
from flask import Response, jsonify
from ._compat import string_types, text_type, iteritems
from .exceptions import PluginError, NotCallableError
from typing import List, Any, Optional, Dict
def isValidPrefix(prefix: str, allow_none: bool = False) -> bool:
"""Check if it can be used for blueprint prefix"""
if prefix is None and allow_none is True:
return True
if isinstance(prefix, string_types):
return (
prefix.startswith("/")
and not prefix.endswith("/")
and "//" not in prefix
and " " not in prefix
)
return False
[docs]
def isValidSemver(version: str) -> bool:
"""Semantic version number - determines whether the version is qualified.
The format is MAJOR.Minor.PATCH, more with https://semver.org
"""
if version and isinstance(version, string_types):
return Version.is_valid(version)
return False
[docs]
def sortedSemver(versions: List[str], sort: str = "ASC") -> List[str]:
"""Semantically sort the list of version Numbers"""
reverse = True if sort.upper() == "DESC" else False
if versions and isinstance(versions, (list, tuple)):
return sorted(
versions,
key=cmp_to_key(lambda v1, v2: Version.parse(v1).compare(v2)),
reverse=reverse,
)
else:
raise TypeError("Invaild versions, a list or tuple is right.")
[docs]
class BaseStorage(object):
"""This is the base class for storage.
The available storage classes need to inherit from :class:`~BaseStorage`
and override the `get` and `set` methods, it's best to implement
the remote method as well.
This base class customizes the `__getitem__`, `__setitem__`
and `__delitem__` methods so that the user can call it like a dict.
.. versionchanged:: 3.4.1
Change :attr:`index` to :attr:`DEFAULT_INDEX`
"""
#: The default index, as the only key, you can override it.
DEFAULT_INDEX: str = "flask_pluginkit_dat"
@property
def index(self):
"""Get the final index
.. versionadded:: 3.4.1
"""
return getattr(self, "COVERED_INDEX", None) or self.DEFAULT_INDEX
@index.setter
def index(self, _covered_index: str):
"""Set the covered index
.. versionadded:: 3.6.0
"""
self.COVERED_INDEX = _covered_index
def __getitem__(self, key: str):
if hasattr(self, "get"):
return self.get(key)
else:
raise AttributeError("Please override the get method")
def __setitem__(self, key: str, value: Any):
if hasattr(self, "set"):
return self.set(key, value)
else:
raise AttributeError("Please override the set method")
def __delitem__(self, key: str):
if hasattr(self, "remove"):
return self.remove(key)
else:
return False
def __str__(self):
return "<%s object at %s, index is %s>" % (
self.__class__.__name__,
hex(id(self)),
self.index,
)
__repr__ = __str__
[docs]
class LocalStorage(BaseStorage):
"""Local file system storage based on the shelve module."""
def __init__(self, path: Optional[str] = None):
self.COVERED_INDEX = path or join(gettempdir(), self.DEFAULT_INDEX)
def _open(self, flag: str = "c") -> shelve.Shelf:
return shelve.open(
filename=abspath(self.index),
flag=flag,
protocol=2,
writeback=False,
)
@property
def list(self) -> Dict[str, Any]:
"""list all data
:returns: dict
"""
db = None
try:
db = self._open(flag="r")
except Exception:
return dict()
else:
return dict(db)
finally:
if db:
db.close()
def __ck(self, key: str) -> str:
if not isinstance(key, text_type):
key = key.decode("utf-8")
return key
[docs]
def set(self, key: str, value: Any):
"""Set persistent data with shelve.
:param key: str: Index key
:param value: All supported data types in python
:raises:
:returns:
"""
db = None
try:
db = self._open()
db[self.__ck(key)] = value
finally:
if db:
db.close()
[docs]
def setmany(self, **mapping: Dict[str, Any]):
"""Set more data
:param mapping: the more k=v
.. versionadded:: 3.4.1
"""
if mapping and isinstance(mapping, dict):
db = self._open()
for k, v in iteritems(mapping):
db[self.__ck(k)] = v
db.close()
[docs]
def get(self, key: str, default: Any = None):
"""Get persistent data from shelve.
:returns: data
"""
try:
value = self.list[key]
except KeyError:
return default
else:
return value
def remove(self, key: str):
db = self._open()
del db[key]
def __len__(self):
return len(self.list)
[docs]
class RedisStorage(BaseStorage):
"""Use redis stand-alone storage"""
def __init__(self, redis_url=None, redis_connection=None):
self._db = self._open(redis_url) if redis_url else redis_connection
def _open(self, redis_url):
try:
from redis import from_url
except ImportError:
raise ImportError("Please install the redis module, eg: pip install redis")
else:
return from_url(redis_url)
@property
def list(self) -> Dict[str, Any]:
"""list redis hash data"""
return {k: json.loads(v) for k, v in iteritems(self._db.hgetall(self.index))}
[docs]
def set(self, key: str, value: Any):
"""set key data"""
return self._db.hset(self.index, key, json.dumps(value))
[docs]
def setmany(self, **mapping: Dict[str, Any]):
"""Set more data
:param mapping: the more k=v
.. versionadded:: 3.4.1
"""
if mapping and isinstance(mapping, dict):
mapping = {k: json.dumps(v) for k, v in iteritems(mapping)}
return self._db.hmset(self.index, mapping)
[docs]
def get(self, key: str, default: Any = None) -> Any:
"""get key original data from redis"""
v = self._db.hget(self.index, key)
if v:
if not isinstance(v, text_type):
v = v.decode("utf-8")
return json.loads(v)
return default
[docs]
def remove(self, key: str):
"""delete key from redis"""
return self._db.hdel(self.index, key)
def __len__(self):
return self._db.hlen(self.index)
[docs]
class JsonResponse(Response):
"""In response to a return type that cannot be processed.
If it is a dict, return json.
.. versionadded:: 3.4.0
"""
[docs]
@classmethod
def force_type(cls, rv, environ=None):
if isinstance(rv, dict):
rv = jsonify(rv)
return super(JsonResponse, cls).force_type(rv, environ)
class Attribution(dict):
"""A dict that allows for object-like property access syntax."""
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
[docs]
class DcpManager(object):
def __init__(self):
self._listeners = {}
@property
def list(self):
return self._listeners
[docs]
def push(self, event, callback, position="right"):
"""Connect a dcp, push a function.
:param event: a unique identifier name for dcp.
:param callback: corresponding to the event to perform a function.
:param position: the position of the insertion function,
right(default) or left. The default right is inserted
at the end of the event, and left is inserted into
the event first.
:raises PluginError: the param event or position error
:raises NotCallableError: the param callback is not callable
.. versionadded:: 3.2.0
"""
if event and isinstance(event, string_types):
if not callable(callback):
raise NotCallableError("The event %s cannot be called" % event)
if position not in ("left", "right", "after", "before"):
raise PluginError("Invalid position")
if event not in self._listeners:
self._listeners[event] = deque([callback])
elif position in ("left", "before"):
self._listeners[event].appendleft(callback)
else:
self._listeners[event].append(callback)
else:
raise PluginError("Invalid event")
[docs]
def remove(self, event, callback):
"""Remove a callback again."""
try:
self._listeners[event].remove(callback)
except (KeyError, ValueError):
return False
else:
return True
[docs]
def emit(self, event, *args, **kwargs):
"""Emits events for the template context.
:returns: strings with :class:`~flask.Markup`
"""
results = []
funcs = self._listeners.get(event) or []
for f in funcs:
rv = f(*args, **kwargs)
if isinstance(rv, (list, tuple)):
rv = "".join(rv)
if rv:
if not isinstance(rv, text_type):
rv = rv.decode("utf-8")
results.append(rv)
return Markup("".join(results))
def allowed_uploaded_plugin_suffix(filename: str) -> bool:
"""Check suffix for uploaded filename
.. versionadded:: 3.3.0
"""
allow_suffix = [".tar.gz", ".tgz", ".zip"]
if isinstance(filename, string_types):
for suffix in allow_suffix:
if filename.endswith(suffix):
return True
return False
def check_url(addr: str) -> bool:
"""Check whether UrlAddr is in a valid format, for example::
http://ip:port
https://abc.com
.. versionadded:: 3.3.0
"""
from re import compile, IGNORECASE
regex = compile(
r"^(?:http)s?://"
r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+"
r"(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|"
r"localhost|"
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
r"(?::\d+)?"
r"(?:/?|[/?]\S+)$",
IGNORECASE,
)
if addr and isinstance(addr, string_types):
if regex.match(addr):
return True
return False