"""JWT authentication helpers: password hashing, token issuing, token checks."""

import hashlib
import os
from datetime import datetime, timedelta, timezone

import bcrypt
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from app.db import models

load_dotenv()

# Fail at import time with a clear message instead of an opaque
# "'NoneType' object has no attribute 'strip'" when the key is missing.
SECRET_KEY = os.getenv("JWT_KEY")
if SECRET_KEY is None:
    raise RuntimeError("JWT_KEY environment variable is not set")
SECRET_KEY = SECRET_KEY.strip()

ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 60  # 60 days, expressed in minutes

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")


# Database
def get_db():
    """Yield a session from ``models.SessionLocal`` and close it afterwards."""
    db = models.SessionLocal()
    try:
        yield db
    finally:
        db.close()


def verify_password(plain_password, hashed_password):
    """Return True if ``plain_password`` matches the bcrypt ``hashed_password``.

    ``hashed_password`` may be ``bytes`` or ``str``; a ``str`` is encoded as
    UTF-8 before being handed to ``bcrypt.checkpw``.
    """
    if isinstance(hashed_password, str):
        hashed_password = hashed_password.encode('utf-8')
    return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password)


def get_password_hash(password):
    """Return the bcrypt hash of ``password`` using a freshly generated salt."""
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())


def _create_token(data: dict, lifetime_minutes: int) -> str:
    """Sign a copy of ``data`` with an ``exp`` claim ``lifetime_minutes`` ahead."""
    to_encode = data.copy()  # never mutate the caller's dict
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    expire = datetime.now(timezone.utc) + timedelta(minutes=lifetime_minutes)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_access_token(data: dict):
    """Return a signed JWT for ``data`` valid for ACCESS_TOKEN_EXPIRE_MINUTES."""
    return _create_token(data, ACCESS_TOKEN_EXPIRE_MINUTES)


def create_refresh_token(data: dict):
    """Return a signed JWT for ``data`` valid for REFRESH_TOKEN_EXPIRE_MINUTES."""
    return _create_token(data, REFRESH_TOKEN_EXPIRE_MINUTES)


def _credentials_exception() -> HTTPException:
    """Build the 401 error raised for every token validation failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )


def _decode_subject(token: str):
    """Decode ``token`` and return its ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded/verified or has no
            ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise _credentials_exception() from exc
    subject = payload.get("sub")
    if subject is None:
        raise _credentials_exception()
    return subject


# Token validation
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer ``token`` to the ``models.User`` row it refers to.

    The ``sub`` claim is interpreted as the integer primary key of the user.

    Raises:
        HTTPException: 401 if the token is invalid, ``sub`` is missing or not
            an integer, or no user with that id exists.
    """
    subject = _decode_subject(token)
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        # A validly signed token with a non-numeric subject is still not a
        # usable credential; answer 401 rather than crashing with a 500.
        raise _credentials_exception() from None

    # NOTE(review): this is a synchronous query inside an async function, so
    # it presumably blocks the event loop while it runs - confirm acceptable.
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise _credentials_exception()
    return user


async def test_token(token: str):
    """Validate ``token`` and return its ``sub`` claim.

    Raises:
        HTTPException: 401 if the token is invalid or has no ``sub`` claim.
    """
    return _decode_subject(token)