1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Fix database

This commit is contained in:
MoonTestUse1
2025-01-07 05:36:07 +06:00
parent 24f969425f
commit 298c7f5f53
3 changed files with 27 additions and 31 deletions

View File

@@ -6,7 +6,7 @@ from fastapi.security import OAuth2PasswordBearer
from .database import SessionLocal from .database import SessionLocal
from .core.config import settings from .core.config import settings
from .utils.jwt import verify_token from .utils.jwt import verify_token, verify_token_in_db
from .models.employee import Employee from .models.employee import Employee
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
@@ -30,11 +30,13 @@ async def get_current_employee(
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
employee_id = verify_token(token) # Проверяем токен
if not employee_id: token_data = verify_token_in_db(token, db)
if not token_data:
raise credentials_exception raise credentials_exception
employee = db.query(Employee).filter(Employee.id == employee_id).first() # Получаем сотрудника
employee = db.query(Employee).filter(Employee.id == token_data.employee_id).first()
if not employee: if not employee:
raise credentials_exception raise credentials_exception

View File

@@ -16,24 +16,31 @@ def create_access_token(data: dict) -> str:
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt return encoded_jwt
def verify_token(token: str, db: Session) -> Optional[TokenData]: def verify_token(token: str) -> Optional[int]:
"""Verify token""" """Verify token and return employee_id"""
try: try:
# Проверяем, что токен действителен # Проверяем, что токен действителен
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
employee_id = int(payload.get("sub")) employee_id = int(payload.get("sub"))
if employee_id is None: if employee_id is None:
return None return None
return employee_id
# Проверяем, что токен существует в базе
db_token = db.query(Token).filter(Token.token == token).first()
if not db_token:
return None
return TokenData(employee_id=employee_id)
except (JWTError, ValueError): except (JWTError, ValueError):
return None return None
def verify_token_in_db(token: str, db: Session) -> Optional[TokenData]:
"""Verify token in database"""
employee_id = verify_token(token)
if employee_id is None:
return None
# Проверяем, что токен существует в базе
db_token = db.query(Token).filter(Token.token == token).first()
if not db_token:
return None
return TokenData(employee_id=employee_id)
def create_and_save_token(employee_id: int, db: Session) -> str: def create_and_save_token(employee_id: int, db: Session) -> str:
"""Create and save token""" """Create and save token"""
# Создаем токен # Создаем токен

View File

@@ -6,6 +6,7 @@ from sqlalchemy.orm import Session
from app.crud import employees from app.crud import employees
from app.schemas.employee import EmployeeCreate from app.schemas.employee import EmployeeCreate
from app.utils.auth import get_password_hash from app.utils.auth import get_password_hash
from app.utils.jwt import create_and_save_token
from app.models.employee import Employee from app.models.employee import Employee
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@@ -53,25 +54,11 @@ def test_admin(db_session: Session) -> Employee:
return db_admin return db_admin
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def employee_token(client: TestClient, test_employee: Employee) -> str: def employee_token(db_session: Session, test_employee: Employee) -> str:
"""Get employee token""" """Get employee token"""
response = client.post( return create_and_save_token(test_employee.id, db_session)
"/api/auth/login",
data={
"username": f"{test_employee.first_name} {test_employee.last_name}",
"password": "testpass123"
}
)
return response.json()["access_token"]
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def admin_token(client: TestClient, test_admin: Employee) -> str: def admin_token(db_session: Session, test_admin: Employee) -> str:
"""Get admin token""" """Get admin token"""
response = client.post( return create_and_save_token(test_admin.id, db_session)
"/api/auth/admin/login",
data={
"username": f"{test_admin.first_name} {test_admin.last_name}",
"password": "adminpass123"
}
)
return response.json()["access_token"]