1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Testing workable

This commit is contained in:
MoonTestUse1
2025-02-07 00:43:33 +06:00
parent 8543d7fe88
commit 6db95a5eb0
37 changed files with 911 additions and 454 deletions

View File

@@ -1,67 +1,82 @@
"""JWT utilities"""
from datetime import datetime, timedelta
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from typing import Optional
from jose import JWTError, jwt
from fastapi import HTTPException, status
from redis import Redis
from sqlalchemy.orm import Session
from ..core.config import settings
from ..core.test_config import test_settings
from ..models.token import Token
from ..schemas.auth import TokenData
from ..crud.employees import get_employee
def get_settings():
"""Get settings based on environment"""
return test_settings if test_settings.TESTING else settings
redis = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
decode_responses=True
)
def create_access_token(data: dict) -> str:
"""Create access token"""
to_encode = data.copy()
config = get_settings()
expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=config.ALGORITHM)
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[int]:
"""Verify token and return employee_id"""
def verify_token(token: str, db: Session) -> dict:
try:
config = get_settings()
payload = jwt.decode(token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
employee_id = int(payload.get("sub"))
if employee_id is None:
return None
return employee_id
except (JWTError, ValueError):
return None
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
# Проверяем токен в Redis
if not redis.get(f"token:{token}"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
def verify_token_in_db(token: str, db: Session) -> Optional[TokenData]:
"""Verify token in database"""
employee_id = verify_token(token)
if employee_id is None:
return None
# Проверяем, что токен существует в базе
db_token = db.query(Token).filter(Token.token == token).first()
if not db_token:
return None
return TokenData(employee_id=employee_id)
def create_and_save_token(employee_id: int, db: Session) -> str:
"""Create and save token"""
# Создаем токен
access_token = create_access_token({"sub": str(employee_id)})
def create_and_save_token(user_id: int, db: Session) -> str:
# Создаем JWT токен
access_token = create_access_token({"sub": str(user_id)})
# Удаляем старые токены пользователя
db.query(Token).filter(Token.employee_id == employee_id).delete()
# Сохраняем новый токен в базу
# Сохраняем в БД
db_token = Token(
token=access_token,
employee_id=employee_id
user_id=user_id
)
db.add(db_token)
db.commit()
db.refresh(db_token)
return access_token
# Кэшируем в Redis
redis.setex(
f"token:{access_token}",
timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
"valid"
)
return access_token
def get_current_employee(token: str, db: Session):
payload = verify_token(token, db)
employee_id = int(payload.get("sub"))
if employee_id == -1: # Для админа
return {"is_admin": True}
employee = get_employee(db, employee_id)
if employee is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Employee not found",
)
return employee