Chepuhagram/srv/app/api/endpoints/auth.py

108 lines
3.7 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core import security
from app.api import schemas
from app.db import models
from jose import JWTError, jwt
from app.core.security import get_current_user
# бд
def get_db():
db = models.SessionLocal()
try:
yield db
finally:
db.close()
authRouter = APIRouter(
prefix="/auth",
tags=[],
)
# регистрация
@authRouter.post("/register")
async def register(username: str, password: str, db: Session = Depends(get_db)):
if len(password.encode('utf-8')) > 72:
raise HTTPException(
status_code=400, detail="Пароль слишком длинный (макс. 72 байта)")
db_user = db.query(models.User).filter(
models.User.username == username).first()
if db_user:
raise HTTPException(
status_code=400, detail="Пользователь уже существует")
hashed_pwd = security.get_password_hash(password)
new_user = models.User(username=username, hashed_password=hashed_pwd)
db.add(new_user)
db.commit()
return {"status": "ok", "message": "User created"}
@authRouter.post("/hash")
async def register(password: str):
if len(password.encode('utf-8')) > 72:
raise HTTPException(
status_code=400, detail="Пароль слишком длинный (макс. 72 байта)")
hashed_pwd = security.get_password_hash(password)
return {"password": hashed_pwd}
# вход
@authRouter.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(
models.User.username == form_data.username).first()
if not user or not security.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный логин или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = security.create_access_token(data={"sub": str(user.id)})
refresh_token = security.create_refresh_token(data={"sub": str(user.id)})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"user_id": user.id
}
@authRouter.post("/refresh")
async def refresh_token(data: schemas.RefreshRequest):
try:
payload = jwt.decode(data.refresh_token, security.SECRET_KEY, algorithms=[
security.ALGORITHM])
user_id = str(payload.get("sub"))
if user_id is None:
raise HTTPException(status_code=401)
new_access_token = security.create_access_token(data={"sub": user_id})
new_refresh_token = security.create_refresh_token(
data={"sub": user_id})
return {"refresh_token": new_refresh_token, "access_token": new_access_token, "token_type": "bearer"}
except JWTError:
raise HTTPException(status_code=401, detail="Refresh token expired")
@authRouter.post("/setup-account")
async def setup_account(data: schemas.SetupAccount, current_user: models.User = Depends(get_current_user), db: Session = Depends(get_db)):
user_to_update = db.merge(current_user)
user_to_update.first_name = data.first_name
user_to_update.last_name = data.last_name
user_to_update.public_key = data.public_key
user_to_update.encrypted_private_key = data.encrypted_private_key
db.commit()
db.refresh(user_to_update)
return {"status": "ok", "message": "Account setup completed"}