# Chepuhagram/srv/app/api/endpoints/media.py
#
# 133 lines
# 4.2 KiB
# Python

from fastapi import FastAPI, Depends, HTTPException, status, APIRouter, File, UploadFile, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core import security
from app.api import schemas
from app.db import models
from jose import JWTError, jwt
from app.core.security import get_current_user
from fastapi.responses import FileResponse
import os
import re
import uuid
from io import BytesIO
# Database session dependency
def get_db():
    """Yield a session created by ``models.SessionLocal`` and close it afterwards.

    Written as a generator so that the ``finally`` clause closes the session
    once the consumer is finished with it, including when an error is raised
    into the generator.
    """
    session = models.SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Router for the media endpoints; its routes are registered under the /media prefix.
mediaRouter = APIRouter(prefix="/media", tags=[])
# Directory (relative to the process working directory) where uploads are stored.
UPLOAD_FOLDER = 'uploads'
# exist_ok=True makes creation idempotent: a separate exists() check followed by
# makedirs() can fail when several workers import this module at the same time.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def _parse_multipart_body(body: bytes):
try:
if not body.startswith(b'--'):
return None
boundary, rest = body.split(b'\r\n', 1)
parts = body.split(boundary)
for part in parts:
if not part or part in (b'--', b'--\r\n'):
continue
part = part.strip(b'\r\n')
if not part:
continue
headers, _, content = part.partition(b'\r\n\r\n')
if not headers or content is None:
continue
disposition_match = re.search(
br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
headers,
re.IGNORECASE,
)
if not disposition_match:
continue
field_name = disposition_match.group(1).decode('utf-8', errors='ignore')
filename = disposition_match.group(2)
if field_name != 'file':
continue
filename = filename.decode('utf-8', errors='ignore') if filename else 'upload.bin'
content_type_match = re.search(br'Content-Type:\s*([\w\-\/]+)', headers, re.IGNORECASE)
content_type = (
content_type_match.group(1).decode('utf-8', errors='ignore')
if content_type_match
else 'application/octet-stream'
)
return filename, content.rstrip(b'\r\n'), content_type
except Exception:
return None
return None
@mediaRouter.post('/upload')
async def upload_file(request: Request, file: UploadFile = File(None)):
    """Store an uploaded blob under a fresh UUID and return that id.

    The payload is taken from the ``file`` form field when one was supplied;
    otherwise the raw request body is passed to ``_parse_multipart_body`` as
    a fallback.

    Returns:
        ``{"status": "ok", "file_id": <uuid4 string>}``. The data is written
        to ``<UPLOAD_FOLDER>/<file_id>.enc``.

    Raises:
        HTTPException: 400 when no file with a non-empty filename was found,
            or when the payload is larger than 10 MB.
    """
    # NOTE(review): this route declares no authentication dependency — confirm
    # that anonymous uploads are intended.
    MAX_FILE_SIZE = 10 * 1024 * 1024  # upload limit: 10 MB

    filename = None
    content = None
    if file is not None:
        filename = file.filename
        if filename:
            # Read at most one byte past the limit: enough to detect an
            # oversized upload without loading all of it into memory.
            content = await file.read(MAX_FILE_SIZE + 1)
    else:
        # Use the parsed bytes directly rather than wrapping them in a new
        # UploadFile: the parsed content type is not needed here, and the
        # UploadFile constructor's keyword arguments differ between
        # Starlette releases.
        parsed = _parse_multipart_body(await request.body())
        if parsed is not None:
            filename, content, _ = parsed

    if content is None or not filename:
        raise HTTPException(status_code=400, detail="No selected file")

    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="File too large (max 10MB)")

    # Content-type validation is deliberately skipped: uploads are encrypted,
    # so their declared content type is not an image type.

    # A generated name keeps uploads from overwriting each other; the
    # client-supplied filename is never used for the path on disk.
    file_id = str(uuid.uuid4())
    file_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc")

    with open(file_path, "wb") as f:
        f.write(content)

    print(f"Файл сохранен: {file_path}")
    return {
        "status": "ok",
        "file_id": file_id
    }
@mediaRouter.get('/{file_id}')
async def get_file(file_id: str):
    """Return the stored blob for ``file_id`` as an octet-stream response.

    Args:
        file_id: Identifier taken from the URL path; the file looked up is
            ``<UPLOAD_FOLDER>/<file_id>.enc``.

    Raises:
        HTTPException: 404 when ``file_id`` carries a directory or drive
            component, or when no regular file with that name exists.
    """
    filename = f"{file_id}.enc"
    # file_id comes straight from the URL: refuse anything that is not a bare
    # file name so the lookup cannot leave UPLOAD_FOLDER.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(UPLOAD_FOLDER, filename)
    # isfile rather than exists: a directory with a matching name must not be
    # handed to FileResponse.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, media_type="application/octet-stream")