Chepuhagram/srv/app/core/security.py

84 lines
2.7 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from jose import jwt
import hashlib
from sqlalchemy.orm import Session
from app.db import models
from dotenv import load_dotenv
from jose import JWTError, jwt
import os
import bcrypt
load_dotenv()
SECRET_KEY = os.getenv("JWT_KEY")
if not SECRET_KEY:
raise RuntimeError("JWT_KEY environment variable not set")
SECRET_KEY = SECRET_KEY.strip()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 60
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
# бд
def get_db():
db = models.SessionLocal()
try:
yield db
finally:
db.close()
def verify_password(plain_password, hashed_password):
try:
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password)
except TypeError:
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8'))
def get_password_hash(password):
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# проверка токена
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
id: str = payload.get("sub")
if id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user_id = int(id)
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
raise credentials_exception
return user
async def test_token(token: str):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
id: str = payload.get("sub")
if id is None:
raise credentials_exception
return id
except JWTError:
raise credentials_exception