Source code for djask.auth.abstract
from __future__ import annotations
from base64 import b64decode
from base64 import b64encode
from time import time
from typing import Any
from warnings import warn
import sqlalchemy as sa
from apiflask.exceptions import abort
from authlib.jose import jwt
from flask_login.mixins import UserMixin
from sqlalchemy.ext.declarative import AbstractConcreteBase
from werkzeug.security import check_password_hash
from werkzeug.security import generate_password_hash
from ..extensions import db
from .permission import Permission
from .permission import PermissionExistingWarning
[docs]class AbstractUser(AbstractConcreteBase, UserMixin):
"""
A base class for all user models.
It enables you to define a user model other than the
:class:`~djask.auth.models.User` model below.
.. versionchanged:: 0.7.0
Add permissions
.. versionadded:: 0.1.0
"""
__table_args__ = {"extend_existing": True}
username = sa.Column(sa.String(128), index=True, unique=True)
name = sa.Column(sa.String(128))
email = sa.Column(sa.String(256), unique=True)
password_hash = sa.Column(sa.String(256))
permissions = sa.Column(sa.Text)
is_admin = sa.Column(sa.Boolean, default=False)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for mapper in db.Model.registry.mappers:
print(mapper.class_.__tablename__)
self.add_permission(Permission(mapper.class_.__tablename__, "read"))
db.session.commit()
def __repr__(self): # pragma: no cover
return f"<User {self.username}>"
[docs] def set_password(self, password: str) -> None:
"""Set the password for the user.
:param password: The password to set
.. versionadded:: 0.1.0
"""
self.password_hash = generate_password_hash(password)
[docs] def check_password(self, password: str) -> bool:
"""Check if the password is correct.
:param password: The password to check
.. versionadded:: 0.1.0
"""
return check_password_hash(self.password_hash, password)
[docs] def api_token(self, expiration=3600 * 24 * 7) -> str:
"""Generate a new API token for the user.
:param expiration: The expiration time of the token in seconds
:returns: The API token
.. versionadded:: 0.3.0
.. versionchanged:: 0.4.2
"""
from ..globals import current_app # noreorder
header = {"alg": "HS256"}
data = {"id": self.id, "created": time(), "expiration": expiration}
return jwt.encode(header, data, current_app.config["SECRET_KEY"]).decode()
[docs] def update(self, data: dict[str, Any]) -> None:
"""Update the user with the given dict.
:param data: The dict containing user data
.. versionadded:: 0.4.2
"""
for attr, value in data.items():
if not hasattr(self, attr) and attr != "password": # pragma: no cover
abort(400, f"User model has no attribute {attr}.")
elif attr == "password":
self.set_password(value)
continue
if attr == "password_hash": # pragma: no cover
abort(400, "You should not hard-code the password hash.")
self.__setattr__(attr, value)
db.session.add(self)
db.session.commit()
[docs] def has_permission(self, perm: Permission) -> bool:
"""Check if a permission is in a user's permissions"""
return perm in (
b64decode(bytes(self_permission_b64, encoding="utf-8")).decode()
for self_permission_b64 in self.permissions.split(",")
)
[docs] def add_permission(self, perm: Permission) -> None:
"""Add a permission to a user"""
if self.permissions is not None and self.has_permission(perm):
warn(PermissionExistingWarning(self.username, perm))
to_append = b64encode(bytes(perm, encoding="utf-8")).decode() + ","
self.permissions = (
"" if self.permissions is None else self.permissions + to_append
)