Plugin Example
Coming full circle with all four elements (Middleware, Extension, Permissions, and Router).
Plugin Workflow
graph LR;
Z[Client] --> A;
A[Request] --> B;
B{Middleware} --> C;
C{Extension} --> D;
D{Permissions} --> E;
E[Resolver] --> F;
F[Response] --> Z;
Command
Files Layout
root/
|
|-- apps/
| `-- MY_PLUGIN/ --> <Directory> - Your App in HERE!
| |
| |-- extension.py
| |-- middleware.py
| |-- permission.py
| |-- router.py
| `-- users.py
|
`-- etc...
Other Requirements
Demo Credentials
- Username:
johndoe
or janedoe
- Password:
secret
Python Code
users.py
# -*- coding: utf-8 -*-
"""
[Users]
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Security
# NOTE: demo-only signing key. It is committed in plain text in this example,
# so it must not be reused anywhere that matters.
SECRET_KEY = "fastapi-insecure-09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b"
# Algorithm handed to `jwt.encode` / `jwt.decode`.
ALGORITHM = "HS256"

# In-memory stand-in for a users table, keyed by username.
# Both demo accounts carry the same bcrypt hash.
FAKE_USERS_DB = {
    "johndoe": {
        "id": 1,
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "is_staff": True,
        "super_user": False,
    },
    "janedoe": {
        "id": 2,
        "username": "janedoe",
        "full_name": "Jane Doe",
        "email": "janedoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "is_staff": False,
        "super_user": False,
    },
}

# Role name for each user id (looked up with the record's "id").
FAKE_ROLES_DB = {1: "admin", 2: "public"}
class UserLogin(BaseModel):
    """Username/password pair supplied when logging in."""

    username: str
    password: str
class Token(BaseModel):
    """Shape of the body returned by the `/token` endpoint."""

    access_token: str  # the encoded JWT
    token_type: str  # the login route returns "bearer" here
class User(BaseModel):
    """User as seen by the rest of the plugin (carries no password hash)."""

    # Identity and role (both required)
    id: str
    role: str

    # Optional profile details
    username: str | None = None
    email: str | None = None
    full_name: str | None = None

    # Account flags
    disabled: bool = False
    is_staff: bool = False
    super_user: bool = False

    # Session state; both default to False and are set by whoever builds the user
    is_authenticated: bool = False
    is_anonymous: bool = False
class UserInDB(User):
    """`User` plus the stored password hash, as loaded from the user store."""

    hashed_password: str
# passlib context (bcrypt scheme) used below to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 bearer dependency consumed by `get_current_user`; tokenUrl is "token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
    """Check `plain_password` against `hashed_password`.

    Delegates to `pwd_context.verify` and returns its result unchanged.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash `password` with `pwd_context.hash` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
async def get_user(username: str):
    """Look up a demo user by username and attach its role.

    Args:
        username: Key to look up in `FAKE_USERS_DB`.

    Returns:
        A `UserInDB` built from the stored record plus a `role` entry resolved
        from `FAKE_ROLES_DB` via the record's `id`, or `None` when the
        username is not in the store.
    """
    record = FAKE_USERS_DB.get(username)
    if record is None:
        return None
    role = FAKE_ROLES_DB.get(record.get("id"))
    # Merge into a new dict: the shared FAKE_USERS_DB record must not be
    # modified as a side effect of a read.
    return UserInDB(**{**record, "role": role})
async def get_request_user(request):
    """Resolve the user behind a request's `Authorization` header.

    Args:
        request: Object exposing a `headers` mapping.

    Returns:
        Whatever `get_user` returns for the token's `username` claim, or
        `None` when the header is missing/empty or the token decodes to an
        empty payload.
    """
    header_value = request.headers.get("Authorization")
    if not header_value:
        return None
    # Drop the "Bearer " scheme marker before decoding.
    claims = await decode_token(header_value.replace("Bearer ", ""))
    if not claims:
        return None
    return await get_user(username=claims.get("username"))
async def authenticate_user(username: str, password: str):
    """Validate a username/password pair.

    Returns:
        The user returned by `get_user` when it exists and `password` verifies
        against its `hashed_password`; otherwise `False`.
    """
    candidate = await get_user(username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode `data` as a signed JWT with an expiry claim.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or falsy) the token
            expires after 15 minutes.

    Returns:
        The token produced by `jwt.encode` using `SECRET_KEY` and `ALGORITHM`.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    to_encode = data.copy()
    # Timezone-aware "now": `datetime.utcnow()` is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    to_encode["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def decode_token(token: str):
    """Decode `token` with `SECRET_KEY` / `ALGORITHM`.

    Returns:
        The decoded claims, or an empty dict when `jwt.decode` raises
        `JWTError`.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return {}
async def get_current_user(
    token: Optional[str] = Depends(oauth2_scheme),
):
    """Dependency that turns a bearer token into a user.

    Returns:
        The user returned by `get_user` for the token's `username` claim.

    Raises:
        HTTPException: 401 when the token carries no `username` claim (which
            includes tokens that fail to decode) or the user is unknown.
    """
    claims = await decode_token(token)
    username: str = claims.get("username")
    user = None if username is None else await get_user(username=username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency that only lets non-disabled users through.

    Raises:
        HTTPException: 400 when `current_user.disabled` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
router.py
# -*- coding: utf-8 -*-
"""
[Router]
"""
from datetime import timedelta
from fastapi import Depends, HTTPException, Response, status
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastberry import Router
from .users import (
Token,
User,
authenticate_user,
create_access_token,
get_current_active_user,
)
# Lifetime, in minutes, of the access token issued at login (60 * 24 = one day).
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24
# Router the endpoints below are registered on; created with the "Users" tag.
router = Router(
tags=["Users"],
)
def to_camel_case(text):
    """Convert a snake_case string to camelCase.

    The first "_"-separated segment is lower-cased; every later segment is
    title-cased with `str.title`, and the underscores are dropped.
    """
    head, *rest = text.split("_")
    return head.lower() + "".join(segment.title() for segment in rest)
@router.post("/token", response_model=Token)
async def login_for_access_token(
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    """Exchange form credentials for an access token.

    On success the token is also stored in an `Authorization` cookie
    (httponly, secure) on the outgoing response.

    Raises:
        HTTPException: 401 when `authenticate_user` rejects the credentials.
    """
    user = await authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(
        data={"sub": str(user.id), "username": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    response.set_cookie(key="Authorization", value=token, httponly=True, secure=True)
    return {"access_token": token, "token_type": "bearer"}
@router.get("/logout")
async def logout_user(current_user: User = Depends(get_current_active_user)):
    """Log out by deleting the `Authorization` cookie.

    `current_user` is not read; declaring it makes the route depend on
    `get_current_active_user`.
    """
    logout_response = JSONResponse({"logout": True})
    logout_response.delete_cookie(key="Authorization")
    return logout_response
@router.get("/user-me")  # response_model=User
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the current user's fields with camelCase keys.

    The password hash is never included in the result.

    Returns:
        A dict of the user's attributes keyed by their camelCase names.
    """
    user_me = {to_camel_case(k): v for k, v in current_user.__dict__.items()}
    # pop() with a default instead of `del`: a plain `User` (what the
    # annotation promises) has no hashed password, and `del` would raise
    # KeyError for it.
    user_me.pop("hashedPassword", None)
    return user_me
extension.py
# -*- coding: utf-8 -*-
""" [Extension]
Inject { User } or { Anonymous-User } to GraphQL Context.
"""
import secrets
from fastberry import BaseExtension
from .users import User, get_request_user
def anonymous_user():
    """Build the default anonymous user.

    The user gets the "public" role, `is_anonymous=True`, and a fresh random
    id of the form "anonymous-<url-safe token>".
    """
    return User(
        id=f"anonymous-{secrets.token_urlsafe(38)}",
        role="public",
        is_anonymous=True,
    )
class InjectUser(BaseExtension):
    """Extension that stores the requesting user in the execution context."""

    async def on_executing_start(self):
        """Resolve the request's user and place it under context["user"].

        A user found by `get_request_user` is re-built as a `User` with
        `is_authenticated=True`; otherwise an anonymous user is used.
        """
        incoming = self.execution_context.context.get("request")
        found = await get_request_user(incoming)
        if found:
            # User-Authenticated
            fields = found.__dict__
            fields["is_authenticated"] = True
            resolved = User(**fields)
        else:
            # Anonymous-User
            resolved = anonymous_user()
        # Set-User (Context)
        self.execution_context.context["user"] = resolved
middleware.py
# -*- coding: utf-8 -*-
""" [Middleware]
Get { Authorization } from the { Cookies } and inject it to the { Headers }.
"""
from fastberry import BaseMiddleware
class AuthenticatedCookieMiddleware(BaseMiddleware):
    """Copy the `Authorization` cookie into the request headers."""

    async def dispatch(self, request, call_next):
        """Inject a bearer `authorization` header when only the cookie is set.

        An existing `Authorization` header always wins; the cookie is used
        only when that header is absent. The request is then passed on to
        `call_next` and its response returned.
        """
        header_missing = "Authorization" not in request.headers
        if header_missing and "Authorization" in request.cookies:
            cookie_token = request.cookies.get("Authorization")
            raw_header = (
                "authorization".encode(),
                f"Bearer {cookie_token}".encode(),
            )
            # NOTE(review): appends to the headers object's private `_list`;
            # presumably because the public headers mapping is read-only.
            # Confirm this still holds for the framework version in use.
            request.headers.__dict__["_list"].append(raw_header)
        return await call_next(request)
permission.py
# -*- coding: utf-8 -*-
""" [Permission]
Check GraphQL Context for a { User } or { Anonymous-User }.
"""
import typing
from strawberry.types import Info
from fastberry import BasePermission
# Role name -> names of the operations that role is allowed to run.
ROLES = {
    "public": ["SomeMethod"],
    "admin": [
        "demoDetail",
        "demoSearch",
        "demoCreate",
        "demoUpdate",
        "demoDelete",
    ],
}
def get_perms(role: str = None):
    """Return the operations allowed for `role`.

    Yields the `ROLES` entry for a truthy, known role that has a non-empty
    entry; in every other case an empty list.
    """
    granted = ROLES.get(role, [])
    return granted if (granted and role) else []
class IsAuthorized(BasePermission):
    """Permission that checks the context user's role against `ROLES`."""

    message = "User is not authorized"  # Unauthorized

    async def has_permission(self, source: typing.Any, info: Info, **kwargs) -> bool:
        """Allow the operation only if the user's role lists its field name.

        Returns:
            True when `info.context` holds a user whose role permissions (via
            `get_perms`) contain `info.field_name`; False otherwise, including
            when no user is present in the context.
        """
        requested = info.field_name  # info.python_name
        current_user = info.context.get("user")
        if not current_user:
            return False
        return requested in get_perms(current_user.role)