Chepuhagram/srv/app/api/endpoints/auth.py

72 lines
2.7 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core import security
from app.db import models
from jose import JWTError, jwt
# Database session dependency
def get_db():
    """Yield a database session and close it when the consumer is finished.

    Written as a generator so it can be plugged into ``Depends``: the code
    after ``yield`` runs once the consumer of the session is done with it.

    Yields:
        A session created by ``models.SessionLocal()``.
    """
    session = models.SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, even if the consumer raised.
        session.close()
# Router that groups the authentication endpoints under the "/auth" prefix.
authRouter = APIRouter(prefix="/auth", tags=[])
# Registration
@authRouter.post("/register")
async def register(username: str, password: str, db: Session = Depends(get_db)):
    """Create a new user account.

    Args:
        username: Login name for the new account; must not already exist.
        password: Plain-text password; at most 72 bytes once UTF-8 encoded.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"status": "ok", "message": "User created"}`` on success.

    Raises:
        HTTPException: 400 when the password exceeds 72 bytes or the
            username is already taken (including when a concurrent request
            registered it between the existence check and the commit).
    """
    # Imported locally so the handler does not depend on a change to the
    # module-level import block; sqlalchemy is already used by this file.
    from sqlalchemy.exc import IntegrityError

    # NOTE(review): as plain ``str`` parameters these are presumably read
    # from the query string, which would expose the password in URLs and
    # access logs. Consider moving them to a request body; left unchanged
    # here to keep the client-facing interface intact.

    # Length is measured in bytes, not characters (presumably the input
    # limit of the hashing scheme -- confirm in security.get_password_hash).
    if len(password.encode("utf-8")) > 72:
        raise HTTPException(status_code=400, detail="Пароль слишком длинный (макс. 72 байта)")

    existing_user = db.query(models.User).filter(models.User.username == username).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Пользователь уже существует")

    hashed_pwd = security.get_password_hash(password)
    db.add(models.User(username=username, hashed_password=hashed_pwd))
    try:
        db.commit()
    except IntegrityError as exc:
        # The check above and this insert are not atomic: another request
        # may have created the same username in between. Report it the same
        # way as the explicit check instead of failing with a 500.
        # Assumes a unique constraint on username -- TODO confirm in models.
        db.rollback()
        raise HTTPException(status_code=400, detail="Пользователь уже существует") from exc
    return {"status": "ok", "message": "User created"}
# Login
@authRouter.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate a user and issue an access/refresh token pair.

    Args:
        form_data: Submitted login form carrying ``username`` and ``password``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token``, ``refresh_token``, ``token_type``
        (always ``"bearer"``) and the authenticated ``user_id``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the user is unknown or the password does not verify.
    """
    account = (
        db.query(models.User)
        .filter(models.User.username == form_data.username)
        .first()
    )

    # The password is only verified when an account was found; both failure
    # modes produce the same response.
    if not (account and security.verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный логин или пароль",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The token subject is the user's id rendered as a string.
    issued_access = security.create_access_token(data={"sub": str(account.id)})
    issued_refresh = security.create_refresh_token(data={"sub": str(account.id)})

    response = {
        "access_token": issued_access,
        "refresh_token": issued_refresh,
        "token_type": "bearer",
        "user_id": account.id,
    }
    return response
@authRouter.post("/refresh")
async def refresh_token(data: models.RefreshRequest, db: Session = Depends(get_db)):
    """Exchange a refresh token for a new access/refresh token pair.

    Args:
        data: Request payload carrying the ``refresh_token`` to validate.
        db: Database session supplied by the ``get_db`` dependency
            (not used by this handler).

    Returns:
        A dict with a new ``refresh_token``, a new ``access_token`` and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when the token fails to decode, or decodes
            without a ``sub`` claim.
    """
    # NOTE(review): ``db`` is injected but never read, so the user is not
    # re-checked against the database before new tokens are issued -- confirm
    # this is intended.
    try:
        claims = jwt.decode(
            data.refresh_token,
            security.SECRET_KEY,
            algorithms=[security.ALGORITHM],
        )
        subject = claims.get("sub")
        if subject is None:
            # Not a JWTError, so it passes through the handler below as-is.
            raise HTTPException(status_code=401)

        rotated_access = security.create_access_token(data={"sub": subject})
        rotated_refresh = security.create_refresh_token(data={"sub": subject})
        return {
            "refresh_token": rotated_refresh,
            "access_token": rotated_access,
            "token_type": "bearer",
        }
    except JWTError:
        # NOTE(review): every JWTError raised in the block above is reported
        # as "expired", not only actual expiry.
        raise HTTPException(status_code=401, detail="Refresh token expired")