1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Fix database

This commit is contained in:
MoonTestUse1
2025-01-07 05:26:33 +06:00
parent d9e276ad6b
commit bdf4ae9d70
29 changed files with 727 additions and 531 deletions

8
backend/.env.test Normal file
View File

@@ -0,0 +1,8 @@
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=test_app
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/test_app
SECRET_KEY=test_secret_key
TESTING=True

View File

@@ -1,25 +1,55 @@
"""Application configuration"""
from functools import lru_cache
import os
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings"""
PROJECT_NAME: str = "Employee Request System"
# Database
DATABASE_URL: str = "postgresql://postgres:postgres@localhost:5432/employee_requests"
# База данных
POSTGRES_USER: str = "postgres"
POSTGRES_PASSWORD: str = "postgres"
POSTGRES_HOST: str = "postgres"
POSTGRES_PORT: str = "5432"
POSTGRES_DB: str = "app"
POSTGRES_TEST_DB: str = "test_app"
DATABASE_URL: str | None = None
# JWT
SECRET_KEY: str = "your-secret-key"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Режим тестирования
TESTING: bool = bool(os.getenv("TESTING"))
# Redis
REDIS_HOST: str = "localhost"
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
REDIS_DB: int = 0
REDIS_TEST_DB: int = 1
def get_database_url(self) -> str:
"""Get database URL"""
if self.DATABASE_URL:
return self.DATABASE_URL
if self.TESTING:
return f"postgresql://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@localhost:5432/{self.POSTGRES_TEST_DB}"
return f"postgresql://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
def get_redis_url(self) -> str:
"""Get Redis URL"""
db = self.REDIS_TEST_DB if self.TESTING else self.REDIS_DB
host = "localhost" if self.TESTING else self.REDIS_HOST
return f"redis://{host}:{self.REDIS_PORT}/{db}"
# Telegram
TELEGRAM_BOT_TOKEN: str = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID: str = os.getenv("TELEGRAM_CHAT_ID", "")
class Config:
"""Pydantic config"""
env_file = ".env"
case_sensitive = True
@lru_cache()

View File

@@ -6,7 +6,12 @@ class TestSettings(BaseSettings):
PROJECT_NAME: str = "Employee Request System Test"
# Database
DATABASE_URL: str = "sqlite:///:memory:"
POSTGRES_USER: str = "postgres"
POSTGRES_PASSWORD: str = "postgres"
POSTGRES_HOST: str = "localhost"
POSTGRES_PORT: str = "5432"
POSTGRES_DB: str = "test_app"
DATABASE_URL: str = "postgresql://postgres:postgres@localhost:5432/test_app"
# JWT
SECRET_KEY: str = "test_secret_key"
@@ -16,9 +21,14 @@ class TestSettings(BaseSettings):
# Redis
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_DB: int = 1
# Testing
TESTING: bool = True
class Config:
"""Pydantic config"""
case_sensitive = True
env_file = ".env.test"
test_settings = TestSettings()

View File

@@ -1,6 +1,6 @@
"""Employee CRUD operations"""
from sqlalchemy.orm import Session
from typing import Optional, List
from typing import List, Optional
from ..models.employee import Employee
from ..schemas.employee import EmployeeCreate, EmployeeUpdate
from ..utils.loggers import auth_logger
@@ -13,57 +13,42 @@ def get_employee(db: Session, employee_id: int) -> Optional[Employee]:
"""Get employee by ID"""
return db.query(Employee).filter(Employee.id == employee_id).first()
def get_employee_by_last_name(db: Session, last_name: str) -> Optional[Employee]:
"""Get employee by last name"""
return db.query(Employee).filter(Employee.last_name == last_name).first()
def get_employee_by_credentials(db: Session, first_name: str, last_name: str) -> Optional[Employee]:
"""Get employee by first name and last name"""
return db.query(Employee).filter(
Employee.first_name == first_name,
Employee.last_name == last_name
).first()
def create_employee(db: Session, employee: EmployeeCreate, hashed_password: str) -> Employee:
"""Create new employee"""
try:
db_employee = Employee(
first_name=employee.first_name,
last_name=employee.last_name,
department=employee.department,
office=employee.office,
hashed_password=hashed_password
hashed_password=hashed_password,
is_admin=employee.is_admin
)
db.add(db_employee)
db.commit()
db.refresh(db_employee)
return db_employee
except Exception as e:
auth_logger.error(f"Error creating employee: {e}")
db.rollback()
raise
def update_employee(db: Session, employee_id: int, employee: EmployeeUpdate) -> Optional[Employee]:
"""Update employee"""
"""Update employee data"""
db_employee = get_employee(db, employee_id)
if not db_employee:
return None
for field, value in employee.model_dump(exclude_unset=True).items():
setattr(db_employee, field, value)
try:
if db_employee:
for key, value in employee.dict(exclude_unset=True).items():
setattr(db_employee, key, value)
db.commit()
db.refresh(db_employee)
return db_employee
except Exception as e:
auth_logger.error(f"Error updating employee: {e}")
db.rollback()
raise
def delete_employee(db: Session, employee_id: int) -> Optional[Employee]:
"""Delete employee"""
db_employee = get_employee(db, employee_id)
if db_employee:
try:
db.delete(db_employee)
db.commit()
return db_employee
except Exception as e:
auth_logger.error(f"Error deleting employee: {e}")
db.rollback()
raise
return None

View File

@@ -3,9 +3,9 @@ from sqlalchemy.orm import Session
from typing import Optional
from ..models.token import Token
def create_token(db: Session, token: str, user_id: int) -> Token:
def create_token(db: Session, token: str, employee_id: int) -> Token:
"""Create new token"""
db_token = Token(token=token, user_id=user_id)
db_token = Token(token=token, employee_id=employee_id)
db.add(db_token)
db.commit()
db.refresh(db_token)
@@ -24,8 +24,8 @@ def delete_token(db: Session, token: str) -> bool:
return True
return False
def delete_user_tokens(db: Session, user_id: int) -> bool:
"""Delete all tokens for a user"""
db.query(Token).filter(Token.user_id == user_id).delete()
def delete_employee_tokens(db: Session, employee_id: int) -> bool:
"""Delete all tokens for an employee"""
db.query(Token).filter(Token.employee_id == employee_id).delete()
db.commit()
return True

View File

@@ -1,25 +1,24 @@
"""Database module"""
"""Database configuration"""
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from .core.config import settings
# Определяем URL базы данных в зависимости от окружения
if os.getenv("TESTING"):
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
else:
SQLALCHEMY_DATABASE_URL = "postgresql://postgres:postgres@postgres:5432/app"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Определяем, используем ли тестовую базу данных
TESTING = os.getenv("TESTING", "False") == "True"
DATABASE_URL = "sqlite:///:memory:" if TESTING else settings.DATABASE_URL
# Создаем базовый класс для моделей
Base = declarative_base()
# Создаем движок базы данных
connect_args = {"check_same_thread": False} if TESTING else {}
engine = create_engine(DATABASE_URL, connect_args=connect_args)
# Создаем фабрику сессий
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Get database session"""
db = SessionLocal()

View File

@@ -1,21 +1,22 @@
"""Database initialization script"""
"""Database initialization"""
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.employee import Employee
from app.utils.auth import get_password_hash
from ..models.employee import Employee
from ..utils.auth import get_password_hash
def init_db(db: Session) -> None:
"""Initialize database with default data"""
# Создаем тестового сотрудника
test_employee = db.query(Employee).filter(Employee.last_name == "User").first()
if not test_employee:
test_employee = Employee(
first_name="Test",
# Проверяем, есть ли уже админ в базе
admin = db.query(Employee).filter(Employee.is_admin == True).first()
if not admin:
# Создаем админа по умолчанию
admin = Employee(
first_name="Admin",
last_name="User",
department="IT",
office="101",
hashed_password=get_password_hash("testpass123")
office="102",
hashed_password=get_password_hash("adminpass123"),
is_admin=True
)
db.add(test_employee)
db.add(admin)
db.commit()
db.refresh(test_employee)
db.refresh(admin)

View File

@@ -0,0 +1,63 @@
"""Dependencies module"""
from typing import Generator, Any
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from .database import SessionLocal
from .core.config import settings
from .utils.jwt import verify_token
from .models.employee import Employee
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
def get_db() -> Generator[Session, Any, None]:
"""Get database session"""
db = SessionLocal()
try:
yield db
finally:
db.close()
async def get_current_employee(
db: Session = Depends(get_db),
token: str = Depends(oauth2_scheme)
) -> Employee:
"""Get current employee"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
employee_id = verify_token(token)
if not employee_id:
raise credentials_exception
employee = db.query(Employee).filter(Employee.id == employee_id).first()
if not employee:
raise credentials_exception
return employee
async def get_current_active_employee(
current_employee: Employee = Depends(get_current_employee),
) -> Employee:
"""Get current active employee"""
if not current_employee.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive employee"
)
return current_employee
async def get_current_admin(
current_employee: Employee = Depends(get_current_employee),
) -> Employee:
"""Get current admin"""
if not current_employee.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The employee doesn't have enough privileges"
)
return current_employee

View File

@@ -1,62 +1,44 @@
"""Main application module"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from pydantic_settings import BaseSettings
import logging
from .models.base import Base
from .database import engine, SessionLocal
from .routers import admin, employees, requests, auth, statistics
from .routers import auth, employees, requests, admin
from .database import engine, Base
from .db.init_db import init_db
from .core.config import settings
from .database import get_db
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_application(app_settings: BaseSettings = settings) -> FastAPI:
"""Создание экземпляра приложения с заданными настройками."""
# Создаем таблицы
Base.metadata.create_all(bind=engine)
# Инициализируем базу данных
db = SessionLocal()
try:
init_db(db)
finally:
db.close()
app = FastAPI(
# Включаем автоматическое перенаправление со слэшем
redirect_slashes=True,
# Добавляем описание API
title="Support System API",
description="API для системы поддержки",
version="1.0.0"
)
# CORS configuration
origins = [
"http://localhost",
"http://localhost:8080",
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://127.0.0.1:8080",
"http://185.139.70.62", # Добавляем ваш production домен
]
# Создаем приложение
app = FastAPI(title="Employee Request System API")
# Настраиваем CORS
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"]
)
# Include routers
# Подключаем роутеры
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(employees.router, prefix="/api/employees", tags=["employees"])
app.include_router(requests.router, prefix="/api/requests", tags=["requests"])
app.include_router(admin.router, prefix="/api/admin", tags=["admin"])
app.include_router(statistics.router, prefix="/api/statistics", tags=["statistics"])
return app
app = get_application()
# Инициализируем базу данных
@app.on_event("startup")
async def startup_event():
"""Initialize database on startup"""
db = next(get_db())
try:
init_db(db)
finally:
db.close()

View File

@@ -1,5 +1,6 @@
"""Token model"""
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
from ..database import Base
@@ -10,5 +11,7 @@ class Token(Base):
id = Column(Integer, primary_key=True, index=True)
token = Column(String, unique=True, index=True)
employee_id = Column(Integer)
employee_id = Column(Integer, ForeignKey("employees.id"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
employee = relationship("Employee", backref="tokens")

View File

@@ -6,7 +6,7 @@ from typing import Optional
from ..database import get_db
from ..crud import employees
from ..schemas.auth import Token
from ..schemas.auth import Token, LoginCredentials
from ..utils.auth import verify_password
from ..utils.jwt import create_and_save_token
@@ -19,8 +19,18 @@ async def login_for_access_token(
db: Session = Depends(get_db)
):
"""Авторизация сотрудника"""
# Разделяем username на имя и фамилию
try:
first_name, last_name = form_data.username.split()
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Username should be in format: 'First Last'",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем учетные данные сотрудника
employee = employees.get_employee_by_last_name(db, form_data.username)
employee = employees.get_employee_by_credentials(db, first_name, last_name)
if not employee or not verify_password(form_data.password, employee.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -42,8 +52,18 @@ async def admin_login(
db: Session = Depends(get_db)
):
"""Авторизация администратора"""
# Разделяем username на имя и фамилию
try:
first_name, last_name = form_data.username.split()
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Username should be in format: 'First Last'",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем учетные данные администратора
employee = employees.get_employee_by_last_name(db, form_data.username)
employee = employees.get_employee_by_credentials(db, first_name, last_name)
if not employee or not employee.is_admin or not verify_password(form_data.password, employee.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,

View File

@@ -1,9 +1,10 @@
"""Schemas package"""
from .employee import Employee, EmployeeCreate, EmployeeUpdate
from .request import Request, RequestCreate, RequestUpdate
from .auth import Token, TokenData
from .auth import Token, TokenData, LoginCredentials
__all__ = [
'Employee', 'EmployeeCreate', 'EmployeeUpdate',
'Request', 'RequestCreate', 'RequestUpdate',
'Token', 'TokenData'
'Token', 'TokenData', 'LoginCredentials'
]

View File

@@ -1,32 +1,18 @@
"""Authentication schemas"""
from pydantic import BaseModel, ConfigDict
class AdminLogin(BaseModel):
username: str
password: str
model_config = ConfigDict(from_attributes=True)
class EmployeeLogin(BaseModel):
last_name: str
password: str
model_config = ConfigDict(from_attributes=True)
class EmployeeResponse(BaseModel):
id: int
first_name: str
last_name: str
department: str
office: str
access_token: str
from pydantic import BaseModel
from typing import Optional
class Token(BaseModel):
"""Token schema"""
access_token: str
token_type: str
class TokenData(BaseModel):
employee_id: int | None = None
"""Token data schema"""
employee_id: Optional[int] = None
is_admin: bool = False
model_config = ConfigDict(from_attributes=True)
class LoginCredentials(BaseModel):
"""Login credentials schema"""
username: str # В формате "Имя Фамилия"
password: str

View File

@@ -1,31 +1,33 @@
"""Employee schemas"""
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class EmployeeBase(BaseModel):
"""Base employee schema"""
first_name: str
last_name: str
department: str
office: str
is_admin: bool = False
model_config = ConfigDict(from_attributes=True)
class EmployeeCreate(EmployeeBase):
"""Employee creation schema"""
password: str
class EmployeeUpdate(BaseModel):
"""Employee update schema"""
first_name: Optional[str] = None
last_name: Optional[str] = None
department: Optional[str] = None
office: Optional[str] = None
is_admin: Optional[bool] = None
model_config = ConfigDict(from_attributes=True)
class Employee(EmployeeBase):
"""Employee schema"""
id: int
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class Config:
"""Pydantic config"""
from_attributes = True

View File

@@ -8,6 +8,6 @@ class Token(BaseModel):
model_config = ConfigDict(from_attributes=True)
class TokenData(BaseModel):
user_id: int | None = None
employee_id: int | None = None
model_config = ConfigDict(from_attributes=True)

View File

@@ -3,11 +3,11 @@ from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
from sqlalchemy.orm import Session
import re
from .jwt import verify_token
from ..database import get_db
from ..crud import employees
from ..models.employee import Employee
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
@@ -23,7 +23,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
def get_current_admin(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> dict:
) -> Employee:
"""Get current admin from token"""
if not credentials:
raise HTTPException(
@@ -34,11 +34,16 @@ def get_current_admin(
try:
token = credentials.credentials
payload = verify_token(token, db)
employee_id = int(payload.get("sub"))
token_data = verify_token(token, db)
if not token_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем, что это админ
employee = employees.get_employee(db, employee_id)
employee = employees.get_employee(db, token_data.employee_id)
if not employee or not employee.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@@ -47,7 +52,7 @@ def get_current_admin(
)
return employee
except Exception as e:
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
@@ -57,7 +62,7 @@ def get_current_admin(
def get_current_employee(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> dict:
) -> Employee:
"""Get current employee from token"""
if not credentials:
raise HTTPException(
@@ -68,11 +73,16 @@ def get_current_employee(
try:
token = credentials.credentials
payload = verify_token(token, db)
employee_id = int(payload.get("sub"))
token_data = verify_token(token, db)
if not token_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем существование сотрудника
employee = employees.get_employee(db, employee_id)
employee = employees.get_employee(db, token_data.employee_id)
if not employee:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,

View File

@@ -2,9 +2,11 @@
from datetime import datetime, timedelta
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from typing import Optional
from ..core.config import settings
from ..models.token import Token
from ..schemas.auth import TokenData
def create_access_token(data: dict) -> str:
"""Create access token"""
@@ -14,13 +16,22 @@ def create_access_token(data: dict) -> str:
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(token: str, db: Session) -> dict:
def verify_token(token: str, db: Session) -> Optional[TokenData]:
"""Verify token"""
try:
# Проверяем, что токен действителен
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
employee_id = int(payload.get("sub"))
if employee_id is None:
return None
# Проверяем, что токен существует в базе
db_token = db.query(Token).filter(Token.token == token).first()
if not db_token:
return None
return TokenData(employee_id=employee_id)
except (JWTError, ValueError):
return None
def create_and_save_token(employee_id: int, db: Session) -> str:

View File

@@ -3,20 +3,17 @@ from aiogram import Bot
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
import asyncio
from datetime import datetime
import os
from logging import getLogger
from ..models.request import RequestStatus, RequestPriority
from ..crud import requests
from ..database import get_db
from ..core.config import settings
# Initialize logger
logger = getLogger(__name__)
# Initialize bot with token
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "7677506032:AAHduD5EePz3bE23DKlo35KoOp2_9lZuS34")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "5057752127")
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Initialize bot with token from settings
bot = Bot(token=settings.TELEGRAM_BOT_TOKEN)
def format_priority(priority: str) -> str:
"""Format priority with emoji"""
@@ -59,7 +56,7 @@ async def send_request_notification(request_id: int):
)
await bot.send_message(
chat_id=TELEGRAM_CHAT_ID,
chat_id=settings.TELEGRAM_CHAT_ID,
text=message,
parse_mode="HTML"
)

View File

@@ -1,5 +1,79 @@
-- Connect to the database
\c support_db;
-- Создаем основную базу данных
CREATE DATABASE app;
\c app;
-- Create extensions if needed
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Создаем таблицы для основной базы данных
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
first_name VARCHAR NOT NULL,
last_name VARCHAR NOT NULL,
department VARCHAR NOT NULL,
office VARCHAR NOT NULL,
hashed_password VARCHAR NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
is_admin BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE requests (
id SERIAL PRIMARY KEY,
request_type VARCHAR NOT NULL,
description TEXT NOT NULL,
priority VARCHAR NOT NULL,
status VARCHAR NOT NULL DEFAULT 'new',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
employee_id INTEGER NOT NULL REFERENCES employees(id)
);
CREATE TABLE tokens (
id SERIAL PRIMARY KEY,
token VARCHAR UNIQUE NOT NULL,
employee_id INTEGER NOT NULL REFERENCES employees(id),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Создаем индексы
CREATE INDEX idx_employees_last_name ON employees(last_name);
CREATE INDEX idx_requests_employee_id ON requests(employee_id);
CREATE INDEX idx_requests_status ON requests(status);
CREATE INDEX idx_tokens_token ON tokens(token);
-- Создаем тестовую базу данных
CREATE DATABASE test_app;
\c test_app;
-- Создаем те же таблицы для тестовой базы данных
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
first_name VARCHAR NOT NULL,
last_name VARCHAR NOT NULL,
department VARCHAR NOT NULL,
office VARCHAR NOT NULL,
hashed_password VARCHAR NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
is_admin BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE requests (
id SERIAL PRIMARY KEY,
request_type VARCHAR NOT NULL,
description TEXT NOT NULL,
priority VARCHAR NOT NULL,
status VARCHAR NOT NULL DEFAULT 'new',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
employee_id INTEGER NOT NULL REFERENCES employees(id)
);
CREATE TABLE tokens (
id SERIAL PRIMARY KEY,
token VARCHAR UNIQUE NOT NULL,
employee_id INTEGER NOT NULL REFERENCES employees(id),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Создаем индексы в тестовой базе
CREATE INDEX idx_employees_last_name ON employees(last_name);
CREATE INDEX idx_requests_employee_id ON requests(employee_id);
CREATE INDEX idx_requests_status ON requests(status);
CREATE INDEX idx_tokens_token ON tokens(token);

View File

@@ -5,8 +5,8 @@ pydantic==2.5.2
pydantic-settings==2.2.1
python-multipart==0.0.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]>=1.7.4
bcrypt>=4.0.1
passlib[bcrypt]==1.7.4
bcrypt==3.2.2
redis>=4.0.0
python-dotenv==1.0.1
psycopg2-binary==2.9.9

View File

@@ -0,0 +1,30 @@
"""Database connection check script"""
import sys
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
def check_database_connection(database_url: str) -> bool:
"""Check database connection"""
try:
engine = create_engine(database_url)
with engine.connect() as connection:
result = connection.execute(text("SELECT 1"))
print(f"Successfully connected to {database_url}")
return True
except SQLAlchemyError as error:
print(f"Error connecting to {database_url}: {error}")
return False
if __name__ == "__main__":
# URL для основной базы данных
main_db_url = "postgresql://postgres:postgres@localhost:5432/app"
# URL для тестовой базы данных
test_db_url = "postgresql://postgres:postgres@localhost:5432/test_app"
main_ok = check_database_connection(main_db_url)
test_ok = check_database_connection(test_db_url)
if not (main_ok and test_ok):
sys.exit(1)
print("All database connections are successful!")

Binary file not shown.

View File

@@ -1,110 +1,72 @@
"""Test fixtures"""
"""Test configuration"""
import os
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import fakeredis.aioredis
from typing import Generator
from fastapi.testclient import TestClient
from typing import Generator, Any
# Устанавливаем флаг тестирования
# Устанавливаем переменную окружения для тестов
os.environ["TESTING"] = "True"
from app.database import Base
from app.main import app
from app.database import Base, get_db
from app.models.employee import Employee
from app.utils.auth import get_password_hash
from app.core.test_config import test_settings
from app.dependencies import get_db
from .fixtures import * # импортируем все фикстуры
# Создаем тестовую базу данных в памяти222
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
# Создаем тестовый движок базы данных
engine = create_engine(test_settings.DATABASE_URL)
# Создаем тестовую фабрику сессий
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Создаем тестовую базу данных
@pytest.fixture(scope="session", autouse=True)
def setup_test_db() -> Generator[None, Any, None]:
"""Setup test database"""
# Пробуем создать базу данных test_app
default_engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
with default_engine.connect() as conn:
conn.execute(text("COMMIT")) # Завершаем текущую транзакцию
try:
conn.execute(text("DROP DATABASE IF EXISTS test_app"))
conn.execute(text("CREATE DATABASE test_app"))
except Exception as e:
print(f"Error creating database: {e}")
# Создаем все таблицы
Base.metadata.create_all(bind=engine)
yield
# Удаляем все таблицы
Base.metadata.drop_all(bind=engine)
# Закрываем соединение с тестовой базой
engine.dispose()
@pytest.fixture
def db() -> Generator:
"""Фикстура для получения тестовой сессии БД."""
def db_session() -> Generator[Any, Any, None]:
"""Get database session"""
connection = engine.connect()
transaction = connection.begin()
session = TestingSessionLocal(bind=connection)
try:
yield session
finally:
session.close()
transaction.rollback()
connection.close()
@pytest.fixture
def client(db) -> TestClient:
"""Фикстура для получения тестового клиента."""
def override_get_db():
def client(db_session: Any) -> Generator[TestClient, Any, None]:
"""Get test client"""
def override_get_db() -> Generator[Any, Any, None]:
try:
yield db
yield db_session
finally:
pass
app.dependency_overrides[get_db] = override_get_db
yield TestClient(app)
with TestClient(app) as test_client:
yield test_client
app.dependency_overrides.clear()
@pytest.fixture
def test_employee(db) -> Employee:
"""Фикстура для создания тестового сотрудника."""
employee = Employee(
first_name="Test",
last_name="Employee",
department="Test Department",
office="Test Office",
hashed_password=get_password_hash("testpassword"),
is_admin=False
)
db.add(employee)
db.commit()
db.refresh(employee)
return employee
@pytest.fixture
def test_admin(db) -> Employee:
"""Фикстура для создания тестового администратора."""
admin = Employee(
first_name="Admin",
last_name="User",
department="Admin Department",
office="Admin Office",
hashed_password=get_password_hash("adminpassword"),
is_admin=True
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def employee_token(client: TestClient, test_employee: Employee) -> str:
"""Фикстура для получения токена сотрудника."""
response = client.post(
"/api/auth/login",
data={"username": test_employee.last_name, "password": "testpassword"}
)
return response.json()["access_token"]
@pytest.fixture
def admin_token(client: TestClient, test_admin: Employee) -> str:
"""Фикстура для получения токена администратора."""
response = client.post(
"/api/auth/admin/login",
data={"username": test_admin.last_name, "password": "adminpassword"}
)
return response.json()["access_token"]
@pytest.fixture
def redis_mock():
"""Фикстура для мока Redis."""
return fakeredis.aioredis.FakeRedis()

77
backend/tests/fixtures.py Normal file
View File

@@ -0,0 +1,77 @@
"""Test fixtures"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.crud import employees
from app.schemas.employee import EmployeeCreate
from app.utils.auth import get_password_hash
from app.models.employee import Employee
@pytest.fixture(scope="function")
def test_employee(db_session: Session) -> Employee:
"""Create test employee"""
# Удаляем существующего сотрудника, если есть
db_session.query(Employee).filter(
Employee.first_name == "Test",
Employee.last_name == "User"
).delete()
db_session.commit()
employee = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123",
is_admin=False
)
hashed_password = get_password_hash(employee.password)
db_employee = employees.create_employee(db_session, employee, hashed_password)
return db_employee
@pytest.fixture(scope="function")
def test_admin(db_session: Session) -> Employee:
"""Create test admin"""
# Удаляем существующего админа, если есть
db_session.query(Employee).filter(
Employee.first_name == "Admin",
Employee.last_name == "User"
).delete()
db_session.commit()
admin = EmployeeCreate(
first_name="Admin",
last_name="User",
department="IT",
office="102",
password="adminpass123",
is_admin=True
)
hashed_password = get_password_hash(admin.password)
db_admin = employees.create_employee(db_session, admin, hashed_password)
return db_admin
@pytest.fixture(scope="function")
def employee_token(client: TestClient, test_employee: Employee) -> str:
"""Get employee token"""
response = client.post(
"/api/auth/login",
data={
"username": f"{test_employee.first_name} {test_employee.last_name}",
"password": "testpass123"
}
)
return response.json()["access_token"]
@pytest.fixture(scope="function")
def admin_token(client: TestClient, test_admin: Employee) -> str:
"""Get admin token"""
response = client.post(
"/api/auth/admin/login",
data={
"username": f"{test_admin.first_name} {test_admin.last_name}",
"password": "adminpass123"
}
)
return response.json()["access_token"]

View File

@@ -1,77 +1,84 @@
"""Authentication tests."""
"""Authentication tests"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.models.employee import Employee
def test_login_employee_success(client: TestClient, test_employee: Employee):
"""Тест успешной авторизации сотрудника."""
def test_login_success(client: TestClient, test_employee: dict):
"""Test successful login"""
response = client.post(
"/api/auth/login",
data={"username": test_employee.last_name, "password": "testpassword"}
data={
"username": f"{test_employee.first_name} {test_employee.last_name}",
"password": "testpass123"
}
)
assert response.status_code == 200
assert "access_token" in response.json()
assert "token_type" in response.json()
assert response.json()["token_type"] == "bearer"
def test_login_employee_wrong_password(client: TestClient, test_employee: Employee):
"""Тест авторизации сотрудника с неверным паролем."""
def test_login_wrong_password(client: TestClient, test_employee: dict):
"""Test login with wrong password"""
response = client.post(
"/api/auth/login",
data={"username": test_employee.last_name, "password": "wrongpassword"}
data={
"username": f"{test_employee.first_name} {test_employee.last_name}",
"password": "wrongpass"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password"
def test_login_employee_wrong_username(client: TestClient):
"""Тест авторизации с несуществующим пользователем."""
def test_login_wrong_username(client: TestClient):
"""Test login with wrong username"""
response = client.post(
"/api/auth/login",
data={"username": "nonexistent", "password": "testpassword"}
data={
"username": "Wrong User",
"password": "testpass123"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password"
def test_login_admin_success(client: TestClient, test_admin: Employee):
"""Тест успешной авторизации администратора."""
def test_login_invalid_username_format(client: TestClient):
"""Test login with invalid username format"""
response = client.post(
"/api/auth/login",
data={
"username": "InvalidFormat",
"password": "testpass123"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Username should be in format: 'First Last'"
def test_admin_login_success(client: TestClient, test_admin: dict):
"""Test successful admin login"""
response = client.post(
"/api/auth/admin/login",
data={"username": test_admin.last_name, "password": "adminpassword"}
data={
"username": f"{test_admin.first_name} {test_admin.last_name}",
"password": "adminpass123"
}
)
assert response.status_code == 200
assert "access_token" in response.json()
assert "token_type" in response.json()
assert response.json()["token_type"] == "bearer"
def test_login_admin_wrong_password(client: TestClient, test_admin: Employee):
"""Тест авторизации администратора с неверным паролем."""
def test_admin_login_not_admin(client: TestClient, test_employee: dict):
"""Test admin login with non-admin user"""
response = client.post(
"/api/auth/admin/login",
data={"username": test_admin.last_name, "password": "wrongpassword"}
data={
"username": f"{test_employee.first_name} {test_employee.last_name}",
"password": "testpass123"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password"
def test_protected_route_with_valid_token(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест доступа к защищенному маршруту с валидным токеном."""
response = client.get(
"/api/employees/me",
headers={"Authorization": f"Bearer {employee_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == test_employee.first_name
assert data["last_name"] == test_employee.last_name
def test_protected_route_without_token(client: TestClient):
"""Тест доступа к защищенному маршруту без токена."""
response = client.get("/api/employees/me")
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_protected_route_with_invalid_token(client: TestClient):
"""Тест доступа к защищенному маршруту с недействительным токеном."""
"""Test accessing protected route with invalid token"""
response = client.get(
"/api/employees/me",
headers={"Authorization": "Bearer invalid_token"}

View File

@@ -1,11 +1,10 @@
"""Employee tests."""
"""Employee tests"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.models.employee import Employee
def test_create_employee(client: TestClient, admin_token: str, db: Session):
"""Тест создания сотрудника."""
def test_create_employee(client: TestClient, admin_token: str):
"""Test employee creation"""
response = client.post(
"/api/employees",
headers={"Authorization": f"Bearer {admin_token}"},
@@ -13,8 +12,9 @@ def test_create_employee(client: TestClient, admin_token: str, db: Session):
"first_name": "New",
"last_name": "Employee",
"department": "IT",
"office": "102",
"password": "newpassword"
"office": "103",
"password": "newpass123",
"is_admin": False
}
)
assert response.status_code == 201
@@ -22,26 +22,44 @@ def test_create_employee(client: TestClient, admin_token: str, db: Session):
assert data["first_name"] == "New"
assert data["last_name"] == "Employee"
assert data["department"] == "IT"
assert data["office"] == "102"
assert "id" in data
assert data["office"] == "103"
assert data["is_admin"] == False
def test_create_employee_unauthorized(client: TestClient):
"""Тест создания сотрудника без авторизации."""
"""Test employee creation without authorization"""
response = client.post(
"/api/employees",
json={
"first_name": "New",
"last_name": "Employee",
"department": "IT",
"office": "102",
"password": "newpassword"
"office": "103",
"password": "newpass123",
"is_admin": False
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_get_employees(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения списка сотрудников."""
def test_create_employee_not_admin(client: TestClient, employee_token: str):
"""Test employee creation by non-admin user"""
response = client.post(
"/api/employees",
headers={"Authorization": f"Bearer {employee_token}"},
json={
"first_name": "New",
"last_name": "Employee",
"department": "IT",
"office": "103",
"password": "newpass123",
"is_admin": False
}
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_get_employees(client: TestClient, admin_token: str):
"""Test getting all employees"""
response = client.get(
"/api/employees",
headers={"Authorization": f"Bearer {admin_token}"}
@@ -50,67 +68,24 @@ def test_get_employees(client: TestClient, admin_token: str, test_employee: Empl
data = response.json()
assert isinstance(data, list)
assert len(data) > 0
assert "first_name" in data[0]
assert "last_name" in data[0]
assert "department" in data[0]
assert "office" in data[0]
def test_get_employee_by_id(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения сотрудника по ID."""
def test_get_employees_unauthorized(client: TestClient):
"""Test getting employees without authorization"""
response = client.get("/api/employees")
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_get_employees_not_admin(client: TestClient, employee_token: str):
"""Test getting employees by non-admin user"""
response = client.get(
f"/api/employees/{test_employee.id}",
headers={"Authorization": f"Bearer {admin_token}"}
"/api/employees",
headers={"Authorization": f"Bearer {employee_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == test_employee.first_name
assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_get_nonexistent_employee(client: TestClient, admin_token: str):
"""Тест получения несуществующего сотрудника."""
response = client.get(
"/api/employees/999",
headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 404
assert response.json()["detail"] == "Employee not found"
def test_update_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест обновления данных сотрудника."""
response = client.put(
f"/api/employees/{test_employee.id}",
headers={"Authorization": f"Bearer {admin_token}"},
json={
"first_name": "Updated",
"last_name": "Name",
"department": "HR",
"office": "103"
}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == "Updated"
assert data["last_name"] == "Name"
assert data["department"] == "HR"
assert data["office"] == "103"
def test_delete_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест удаления сотрудника."""
response = client.delete(
f"/api/employees/{test_employee.id}",
headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == test_employee.first_name
assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
def test_employee_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест получения информации о текущем сотруднике."""
def test_get_me(client: TestClient, employee_token: str, test_employee: dict):
"""Test getting current employee"""
response = client.get(
"/api/employees/me",
headers={"Authorization": f"Bearer {employee_token}"}
@@ -122,21 +97,35 @@ def test_employee_me(client: TestClient, employee_token: str, test_employee: Emp
assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
def test_update_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест обновления информации о текущем сотруднике."""
def test_get_me_unauthorized(client: TestClient):
"""Test getting current employee without authorization"""
response = client.get("/api/employees/me")
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_update_me(client: TestClient, employee_token: str):
"""Test updating current employee"""
response = client.put(
"/api/employees/me",
headers={"Authorization": f"Bearer {employee_token}"},
json={
"first_name": "Updated",
"last_name": "Name",
"department": "Support",
"department": "HR",
"office": "104"
}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == "Updated"
assert data["last_name"] == "Name"
assert data["department"] == "Support"
assert data["department"] == "HR"
assert data["office"] == "104"
def test_update_me_unauthorized(client: TestClient):
"""Test updating current employee without authorization"""
response = client.put(
"/api/employees/me",
json={
"department": "HR",
"office": "104"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"

View File

@@ -1,55 +1,41 @@
"""Request tests."""
"""Request tests"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.models.employee import Employee
from app.models.request import Request
def test_create_request(client: TestClient, employee_token: str, db: Session):
"""Тест создания заявки."""
def test_create_request(client: TestClient, employee_token: str):
"""Test request creation"""
response = client.post(
"/api/requests",
headers={"Authorization": f"Bearer {employee_token}"},
json={
"request_type": "equipment",
"description": "Test Description",
"description": "Need a new laptop",
"priority": "medium"
}
)
assert response.status_code == 201
data = response.json()
assert data["request_type"] == "equipment"
assert data["description"] == "Test Description"
assert data["description"] == "Need a new laptop"
assert data["priority"] == "medium"
assert data["status"] == "new"
assert "id" in data
def test_create_request_unauthorized(client: TestClient):
"""Тест создания заявки без авторизации."""
"""Test request creation without authorization"""
response = client.post(
"/api/requests",
json={
"request_type": "equipment",
"description": "Test Description",
"description": "Need a new laptop",
"priority": "medium"
}
)
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_get_employee_requests(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест получения списка заявок сотрудника."""
# Создаем тестовую заявку
request = Request(
request_type="equipment",
description="Test Description",
priority="medium",
status="new",
employee_id=test_employee.id
)
db.add(request)
db.commit()
def test_get_my_requests(client: TestClient, employee_token: str):
"""Test getting employee's requests"""
response = client.get(
"/api/requests/my",
headers={"Authorization": f"Bearer {employee_token}"}
@@ -57,23 +43,15 @@ def test_get_employee_requests(client: TestClient, employee_token: str, test_emp
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) > 0
assert data[0]["request_type"] == "equipment"
assert data[0]["description"] == "Test Description"
def test_admin_get_all_requests(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения всех заявок администратором."""
# Создаем тестовую заявку
request = Request(
request_type="equipment",
description="Test Description",
priority="medium",
status="new",
employee_id=test_employee.id
)
db.add(request)
db.commit()
def test_get_my_requests_unauthorized(client: TestClient):
"""Test getting employee's requests without authorization"""
response = client.get("/api/requests/my")
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_get_all_requests_admin(client: TestClient, admin_token: str):
"""Test getting all requests by admin"""
response = client.get(
"/api/requests/admin",
headers={"Authorization": f"Bearer {admin_token}"}
@@ -81,25 +59,39 @@ def test_admin_get_all_requests(client: TestClient, admin_token: str, test_emplo
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) > 0
assert data[0]["request_type"] == "equipment"
assert data[0]["description"] == "Test Description"
def test_update_request_status(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест обновления статуса заявки."""
# Создаем тестовую заявку
request = Request(
request_type="equipment",
description="Test Description",
priority="medium",
status="new",
employee_id=test_employee.id
def test_get_all_requests_unauthorized(client: TestClient):
"""Test getting all requests without authorization"""
response = client.get("/api/requests/admin")
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"
def test_get_all_requests_not_admin(client: TestClient, employee_token: str):
"""Test getting all requests by non-admin user"""
response = client.get(
"/api/requests/admin",
headers={"Authorization": f"Bearer {employee_token}"}
)
db.add(request)
db.commit()
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_update_request_status_admin(client: TestClient, admin_token: str):
"""Test updating request status by admin"""
# Сначала создаем запрос
response = client.post(
"/api/requests",
headers={"Authorization": f"Bearer {admin_token}"},
json={
"request_type": "equipment",
"description": "Need a new laptop",
"priority": "medium"
}
)
request_id = response.json()["id"]
# Обновляем статус
response = client.patch(
f"/api/requests/{request.id}/status",
f"/api/requests/{request_id}/status",
headers={"Authorization": f"Bearer {admin_token}"},
json={"status": "in_progress"}
)
@@ -107,42 +99,21 @@ def test_update_request_status(client: TestClient, admin_token: str, test_employ
data = response.json()
assert data["status"] == "in_progress"
def test_get_request_statistics(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения статистики по заявкам."""
# Создаем тестовые заявки с разными статусами
requests = [
Request(
request_type="equipment",
description="Test Description",
priority="medium",
status="new",
employee_id=test_employee.id
),
Request(
request_type="equipment",
description="Test Description",
priority="high",
status="in_progress",
employee_id=test_employee.id
),
Request(
request_type="equipment",
description="Test Description",
priority="low",
status="completed",
employee_id=test_employee.id
def test_update_request_status_not_admin(client: TestClient, employee_token: str):
"""Test updating request status by non-admin user"""
response = client.patch(
"/api/requests/1/status",
headers={"Authorization": f"Bearer {employee_token}"},
json={"status": "in_progress"}
)
]
for req in requests:
db.add(req)
db.commit()
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
response = client.get(
"/api/statistics",
headers={"Authorization": f"Bearer {admin_token}"}
def test_update_request_status_unauthorized(client: TestClient):
"""Test updating request status without authorization"""
response = client.patch(
"/api/requests/1/status",
json={"status": "in_progress"}
)
assert response.status_code == 200
data = response.json()
assert "total" in data
assert "by_status" in data
assert data["total"] >= 3
assert response.status_code == 401
assert response.json()["detail"] == "Not authenticated"

View File

@@ -1,42 +1,22 @@
version: '3.8'
services:
backend:
image: backend:latest
restart: always
environment:
- DATABASE_URL=postgresql://postgres:postgres@db:5432/app
- REDIS_HOST=redis
- REDIS_PORT=6379
- SECRET_KEY=${SECRET_KEY}
depends_on:
- db
- redis
frontend:
image: frontend:latest
restart: always
ports:
- "80:80"
depends_on:
- backend
db:
postgres:
image: postgres:15
restart: always
container_name: postgres
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=app
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- ./backend/docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:latest
restart: always
volumes:
- redis_data:/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
volumes:
postgres_data:
redis_data:

View File

@@ -1,2 +0,0 @@
# Попытка вывести неопределенную переменную
print(my_var) # Вызовет NameError