Chepuhagram/srv/app/api/endpoints/media.py

585 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import hmac
import json
import logging
import os
import re
import shutil
import urllib.error
import urllib.parse
import urllib.request
import uuid
from io import BytesIO

from fastapi import Depends, FastAPI, HTTPException, status, APIRouter, File, UploadFile, Request, Form
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.core.config import config
from app.core.security import get_current_user
from app.db import models


def _ensure_directory(path: str):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
# Directory (relative to the process working directory) where the
# ``/media/upload`` endpoint stores incoming files.
UPLOAD_FOLDER = 'uploads'
def _parse_multipart_body(body: bytes):
try:
if not body.startswith(b"--"):
return None
boundary, _ = body.split(b"\r\n", 1)
parts = body.split(boundary)
for part in parts:
if not part or part in (b"--", b"--\r\n"):
continue
part = part.strip(b"\r\n")
if not part:
continue
headers, _, content = part.partition(b"\r\n\r\n")
if not headers or content is None:
continue
disposition_match = re.search(
br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
headers,
re.IGNORECASE,
)
if not disposition_match:
continue
field_name = disposition_match.group(
1).decode('utf-8', errors='ignore')
filename = disposition_match.group(2)
if field_name != 'file':
continue
filename = filename.decode(
'utf-8', errors='ignore') if filename else 'upload.bin'
content_type_match = re.search(
br'Content-Type:\s*([\w\-\/]+)', headers, re.IGNORECASE)
content_type = (
content_type_match.group(1).decode('utf-8', errors='ignore')
if content_type_match
else 'application/octet-stream'
)
return filename, content.rstrip(b'\r\n'), content_type
except Exception:
return None
return None
async def _get_upload_file(request: Request, uploaded_file: UploadFile | None):
    """Return the upload to process, falling back to manual body parsing.

    When the framework already produced ``uploaded_file`` it is returned
    unchanged. Otherwise the raw request body is parsed with
    ``_parse_multipart_body`` and wrapped in a new ``UploadFile``.

    Returns:
        An ``UploadFile``, or ``None`` when no ``file`` part could be found.
    """
    if uploaded_file is None:
        parsed = _parse_multipart_body(await request.body())
        if parsed is not None:
            name, payload, mime = parsed
            # NOTE(review): newer Starlette releases appear to have dropped the
            # ``content_type`` constructor argument — confirm against the
            # pinned version.
            return UploadFile(filename=name, file=BytesIO(payload), content_type=mime)
        return None
    return uploaded_file
def _encode_multipart_formdata(fields, files):
boundary = uuid.uuid4().hex
body = BytesIO()
for name, value in fields.items():
body.write(f"--{boundary}\r\n".encode('utf-8'))
body.write(
f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode('utf-8'))
body.write(str(value).encode('utf-8'))
body.write(b"\r\n")
for field_name, filename, content_type, file_bytes in files:
body.write(f"--{boundary}\r\n".encode('utf-8'))
body.write(
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'.encode(
'utf-8')
)
body.write(f"Content-Type: {content_type}\r\n\r\n".encode('utf-8'))
body.write(file_bytes)
body.write(b"\r\n")
body.write(f"--{boundary}--\r\n".encode('utf-8'))
return body.getvalue(), boundary
def _get_cloud_cache_size_bytes(db: Session) -> int:
    """Return the summed ``size_bytes`` of non-avatar items awaiting forwarding.

    Only ``CloudMediaItem`` rows whose status is ``'pending'`` or ``'sending'``
    and whose ``is_avatar`` is 0 are counted. Returns 0 when nothing matches.
    """
    media = models.CloudMediaItem
    size_query = db.query(func.sum(media.size_bytes)).filter(
        media.status.in_(['pending', 'sending']),
        media.is_avatar == 0,
    )
    summed = size_query.scalar()
    if not summed:
        # SUM over zero rows yields NULL/None.
        return 0
    return int(summed)
def _find_local_media_path(file_id: str) -> str | None:
candidates = [
os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, f"{file_id}.enc"),
os.path.join('uploads', f"{file_id}.enc"),
os.path.join(config.HOME_MEDIA_FOLDER, f"{file_id}.enc"),
]
for path in candidates:
if os.path.exists(path):
return path
return None
def _stream_response_from_remote(url: str):
    """Fetch ``url`` and relay its body to the client as a streaming response.

    This function blocks while the connection is opened (up to 45 seconds).

    Args:
        url: Absolute URL of the media file on the home server.

    Returns:
        A ``StreamingResponse`` that forwards the upstream body in 8 KiB
        chunks together with the upstream ``Content-Type`` and
        ``Content-Disposition`` headers (with fallbacks when absent).

    Raises:
        HTTPException: 404 when the upstream answers 404, 502 for any other
            upstream HTTP error or when the server cannot be reached.
    """
    try:
        response = urllib.request.urlopen(urllib.request.Request(url), timeout=45)
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            raise HTTPException(status_code=404, detail='File not found') from exc
        raise HTTPException(
            status_code=502, detail=f'Error fetching media from home server: {exc.code}') from exc
    except Exception as exc:
        raise HTTPException(
            status_code=502, detail=f'Could not reach home server: {exc}') from exc

    headers = {k.lower(): v for k, v in response.getheaders()}
    content_type = headers.get('content-type', 'application/octet-stream')

    def _iter_chunks():
        # Close the upstream connection once the body is exhausted or the
        # generator is closed early; otherwise the socket stays open until
        # garbage collection.
        try:
            while chunk := response.read(8192):
                yield chunk
        finally:
            response.close()

    return StreamingResponse(
        _iter_chunks(),
        media_type=content_type,
        headers={
            'Content-Disposition': headers.get('content-disposition', f'attachment; filename="{os.path.basename(url)}"')
        },
    )
def _post_file_to_home(item: models.CloudMediaItem) -> tuple[bool, str]:
    """Upload one cached media file to the home server's ``/media/receive``.

    The file is read from the cloud cache folder and sent as
    ``multipart/form-data`` with the forwarding secret header. This function
    blocks for the duration of the HTTP request (up to 60 seconds).

    Args:
        item: Cloud media record describing the cached file.

    Returns:
        ``(True, '')`` when the home server answers 200, otherwise
        ``(False, message)``. Failures are reported through the return value,
        not raised.
    """
    file_path = os.path.join(
        config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
    # Open directly instead of checking for existence first: the file can
    # disappear between the check and the read, and read errors must be
    # reported through the return value like every other failure.
    try:
        with open(file_path, 'rb') as cache_file:
            content = cache_file.read()
    except FileNotFoundError:
        return False, 'Local cache file not found'
    except OSError as exc:
        return False, f'Could not read local cache file: {exc}'

    upload_name = item.original_filename or item.local_filename
    fields = {
        # Only a missing owner is sent as an empty value; 0 is kept.
        'owner_id': '' if item.owner_id is None else item.owner_id,
        'cloud_file_id': item.file_id,
        'original_filename': upload_name,
    }
    files = [
        ('file', upload_name,
         item.content_type or 'application/octet-stream', content),
    ]
    body, boundary = _encode_multipart_formdata(fields, files)
    request = urllib.request.Request(
        f"{config.HOME_SERVER_URL}/media/receive",
        data=body,
        headers={
            'Content-Type': f'multipart/form-data; boundary={boundary}',
            'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            if response.status == 200:
                return True, ''
            return False, f'Home server returned {response.status}'
    except urllib.error.HTTPError as exc:
        error_body = exc.read().decode(errors='ignore')
        return False, f'Home server HTTP error {exc.code}: {error_body}'
    except Exception as exc:
        return False, str(exc)
def _cleanup_home_quota(db: Session, owner_id: int | None):
    """Delete an owner's oldest home media files until they fit the quota.

    Files are removed oldest-first (by ``created_at``) from disk and from the
    ``HomeMediaFile`` table until the owner's total ``size_bytes`` is at or
    below ``config.HOME_USER_QUOTA_BYTES``. The deletions are committed once
    at the end.

    Args:
        db: Open database session; it is committed but not closed here.
        owner_id: Owner whose files are checked. ``None`` is a no-op.
    """
    if owner_id is None:
        return
    quota = config.HOME_USER_QUOTA_BYTES
    used = int(
        db.query(func.sum(models.HomeMediaFile.size_bytes)).filter(
            models.HomeMediaFile.owner_id == owner_id
        ).scalar() or 0
    )
    if used <= quota:
        return
    oldest_first = db.query(models.HomeMediaFile).filter(
        models.HomeMediaFile.owner_id == owner_id
    ).order_by(models.HomeMediaFile.created_at.asc()).all()
    for file_record in oldest_first:
        if used <= quota:
            break
        path = os.path.join(config.HOME_MEDIA_FOLDER,
                            file_record.storage_filename)
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone (e.g. removed by a concurrent cleanup); the
            # database row still has to be deleted.
            pass
        # A record without a stored size must not abort the whole cleanup.
        used -= file_record.size_bytes or 0
        db.delete(file_record)
    db.commit()
def _cleanup_all_home_storage():
    """Apply the per-owner quota cleanup to every owner of home media files.

    Opens its own database session, collects the distinct non-null
    ``owner_id`` values of ``HomeMediaFile`` and runs ``_cleanup_home_quota``
    for each one. The session is always closed.
    """
    session = models.SessionLocal()
    try:
        owner_column = models.HomeMediaFile.owner_id
        rows = (
            session.query(owner_column)
            .filter(owner_column.isnot(None))
            .distinct()
            .all()
        )
        for row in rows:
            # Each row is a one-column result; the owner id is its first item.
            _cleanup_home_quota(session, row[0])
    finally:
        session.close()
async def forward_pending_media_loop():
    """Background task: forward pending cloud media to the home server forever.

    On servers whose role is not ``'cloud'`` the loop only sleeps. Otherwise,
    each round takes up to five of the oldest non-avatar ``'pending'`` items,
    marks each ``'sending'``, uploads it with ``_post_file_to_home`` and then
    marks it ``'sent'`` (removing the cached file) or ``'failed'`` (storing the
    error message). Rounds are separated by
    ``config.MEDIA_FORWARD_INTERVAL_SECONDS``.
    """
    logger = logging.getLogger(__name__)
    while True:
        if config.SERVER_ROLE != 'cloud':
            await asyncio.sleep(10)
            continue
        db = models.SessionLocal()
        try:
            # NOTE(review): forwarding is skipped while the pending/sending
            # cache is at or above CLOUD_CACHE_MAX_BYTES. Forwarding is the
            # only visible way that total shrinks, so this gate looks like it
            # can stall the queue permanently — confirm the intent.
            if _get_cloud_cache_size_bytes(db) < config.CLOUD_CACHE_MAX_BYTES:
                pending_items = db.query(models.CloudMediaItem).filter(
                    models.CloudMediaItem.status == 'pending',
                    models.CloudMediaItem.is_avatar == 0,
                ).order_by(models.CloudMediaItem.created_at.asc()).limit(5).all()
                for item in pending_items:
                    item.status = 'sending'
                    item.attempts += 1
                    db.commit()
                    # A commit may expire loaded attributes (SQLAlchemy's
                    # default). Reload them here so the worker thread only
                    # reads plain attributes and never touches this session.
                    db.refresh(item)
                    try:
                        # The upload blocks for up to a minute; run it in a
                        # thread so the event loop keeps serving requests.
                        success, error = await asyncio.to_thread(
                            _post_file_to_home, item)
                    except Exception as exc:
                        # Do not leave the item stuck in 'sending'.
                        success, error = False, str(exc)
                    if success:
                        item.status = 'sent'
                        item.sent_at = func.now()
                        item.error_message = None
                        db.commit()
                        cache_path = os.path.join(
                            config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
                        try:
                            os.remove(cache_path)
                        except FileNotFoundError:
                            pass
                    else:
                        item.status = 'failed'
                        item.error_message = error
                        db.commit()
        except Exception:
            # Keep the loop alive, but make the failure visible and discard
            # any half-applied changes.
            logger.exception('Media forwarding round failed')
            try:
                db.rollback()
            except Exception:
                logger.exception('Rollback after failed forwarding round failed')
        finally:
            db.close()
        await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)
async def home_storage_maintenance_loop():
    """Background task: enforce home storage quotas every 10 minutes.

    On servers whose role is not ``'home'`` the loop only sleeps and re-checks
    the role every 10 seconds. On a home server it runs
    ``_cleanup_all_home_storage`` and then sleeps for 600 seconds.
    """
    while True:
        if config.SERVER_ROLE != 'home':
            await asyncio.sleep(10)
            continue
        try:
            _cleanup_all_home_storage()
        except Exception:
            # One failed round (e.g. a transient database or filesystem error)
            # must not terminate the maintenance task for good.
            logging.getLogger(__name__).exception(
                'Home storage maintenance round failed')
        await asyncio.sleep(600)
# Router for all endpoints in this module; every route is served under the
# ``/media`` prefix.
mediaRouter = APIRouter(
prefix='/media',
tags=['media'],
)
# Create the storage directories when the module is imported so the endpoints
# can write to them without checking first.
_ensure_directory(UPLOAD_FOLDER)
_ensure_directory(config.CLOUD_MEDIA_CACHE_FOLDER)
_ensure_directory(config.HOME_MEDIA_FOLDER)
@mediaRouter.post('/upload')
async def upload_file(
    request: Request,
    file: UploadFile = File(None),
):
    """Store an uploaded file in ``UPLOAD_FOLDER`` under a new random id.

    Returns:
        ``{'status': 'ok', 'file_id': <hex id>}``; the file is saved as
        ``<file_id>.enc``.

    Raises:
        HTTPException: 400 when no file was sent or it exceeds
            ``config.MEDIA_UPLOAD_MAX_BYTES``.
    """
    incoming = await _get_upload_file(request, file)
    if incoming is None or not incoming.filename:
        raise HTTPException(status_code=400, detail='No selected file')

    payload = await incoming.read()
    if len(payload) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')

    file_id = uuid.uuid4().hex
    destination = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc")
    with open(destination, 'wb') as target:
        target.write(payload)

    response = {'status': 'ok'}
    response['file_id'] = file_id
    return response
@mediaRouter.post('/v2/upload')
async def upload_file_v2(
    request: Request,
    file: UploadFile = File(None),
    purpose: str = Form('media'),
    current_user: models.User = Depends(get_current_user),
):
    """Accept an authenticated upload into the cloud media cache.

    The file is written to ``config.CLOUD_MEDIA_CACHE_FOLDER`` as
    ``<file_id>.enc`` and registered as a ``CloudMediaItem``. Items uploaded
    with ``purpose='avatar'`` get status ``'avatar'`` and bypass the
    cache-size limit; all others get status ``'pending'``.

    Returns:
        ``{'status': 'ok', 'file_id': <hex id>}``.

    Raises:
        HTTPException: 503 when this server is not the cloud server or the
            cache is full; 400 when no file was sent or it is too large.
    """
    if config.SERVER_ROLE != 'cloud':
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                            detail='Upload endpoint is available only on cloud server')
    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')
    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
    db = models.SessionLocal()
    try:
        is_avatar = purpose == 'avatar'
        # Avatars are exempt from the limit, so the size query is only needed
        # for regular media.
        if not is_avatar and _get_cloud_cache_size_bytes(db) >= config.CLOUD_CACHE_MAX_BYTES:
            raise HTTPException(
                status_code=503,
                detail='Cloud media cache is full; new uploads are temporarily paused until pending files are forwarded.',
            )
        file_id = uuid.uuid4().hex
        local_filename = f"{file_id}.enc"
        storage_path = os.path.join(
            config.CLOUD_MEDIA_CACHE_FOLDER, local_filename)
        try:
            with open(storage_path, 'wb') as f:
                f.write(content)
            item = models.CloudMediaItem(
                file_id=file_id,
                owner_id=current_user.id,
                original_filename=uploaded_file.filename,
                content_type=uploaded_file.content_type or 'application/octet-stream',
                local_filename=local_filename,
                size_bytes=len(content),
                status='avatar' if is_avatar else 'pending',
                is_avatar=1 if is_avatar else 0,
            )
            db.add(item)
            db.commit()
        except Exception:
            # Without a database row nothing would ever forward or clean up
            # the cached file, so remove it before reporting the error.
            db.rollback()
            try:
                os.remove(storage_path)
            except OSError:
                pass
            raise
    finally:
        db.close()
    return {'status': 'ok', 'file_id': file_id}
@mediaRouter.post('/receive')
async def receive_media(
    request: Request,
    file: UploadFile = File(None),
    owner_id: int | None = Form(None),
    cloud_file_id: str | None = Form(None),
    original_filename: str | None = Form(None),
):
    """Receive a forwarded media file and store it in the home media folder.

    The request must carry the shared ``X-Media-Forwarding-Secret`` header.
    The file is saved as ``<file_id>.enc`` (using ``cloud_file_id`` when
    given, otherwise a new random id), recorded as a ``HomeMediaFile`` and the
    owner's quota is then enforced.

    Returns:
        ``{'status': 'ok', 'file_id': <id>}``.

    Raises:
        HTTPException: 401 for a missing or wrong secret; 400 when no file was
            sent, it is too large, or ``cloud_file_id`` is not a plain name.
    """
    provided_secret = request.headers.get('X-Media-Forwarding-Secret') or ''
    expected_secret = str(config.MEDIA_FORWARDING_SECRET or '')
    # Reject outright when no secret is configured (a plain ``!=`` lets a
    # request without the header through in that case) and compare in
    # constant time so the secret cannot be probed byte by byte.
    if not expected_secret or not hmac.compare_digest(
        provided_secret.encode('utf-8'), expected_secret.encode('utf-8')
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid forwarding secret')
    # The id becomes part of a filesystem path; refuse path separators.
    if cloud_file_id and any(token in cloud_file_id for token in ('/', '\\', '\x00')):
        raise HTTPException(status_code=400, detail='Invalid cloud_file_id')
    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')
    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
    file_id = cloud_file_id or uuid.uuid4().hex
    storage_filename = f"{file_id}.enc"
    file_path = os.path.join(config.HOME_MEDIA_FOLDER, storage_filename)
    db = models.SessionLocal()
    try:
        try:
            with open(file_path, 'wb') as f:
                f.write(content)
            home_record = models.HomeMediaFile(
                file_id=file_id,
                owner_id=owner_id,
                original_filename=original_filename or uploaded_file.filename,
                content_type=uploaded_file.content_type or 'application/octet-stream',
                storage_filename=storage_filename,
                size_bytes=len(content),
            )
            db.add(home_record)
            db.commit()
        except Exception:
            # Do not keep a file on disk that has no database record.
            db.rollback()
            try:
                os.remove(file_path)
            except OSError:
                pass
            raise
        try:
            _cleanup_home_quota(db, owner_id)
        except Exception:
            # The upload itself is already stored and committed; a failed
            # quota pass must not turn it into an error for the sender.
            logging.getLogger(__name__).exception(
                'Quota cleanup failed after receiving media')
            db.rollback()
    finally:
        db.close()
    return {'status': 'ok', 'file_id': file_id}
@mediaRouter.get('/size/{file_id}')
async def get_file_size(file_id: str):
    """Return size and metadata of a media file without downloading it.

    The file is looked up locally first; on a cloud server the home server's
    ``/media/size/{file_id}`` endpoint is queried as a fallback and its JSON
    answer is returned unchanged.

    Returns:
        ``{"file_id", "size", "file_name", "content_type"}`` where
        ``file_name`` is percent-encoded.

    Raises:
        HTTPException: 404 when the file is unknown; the home server's status
            code for other home-server HTTP errors; 502 when the home server
            cannot be reached.
    """
    db = models.SessionLocal()
    try:
        db_file = db.query(models.HomeMediaFile).filter(
            models.HomeMediaFile.file_id == file_id).first()
    finally:
        db.close()

    # 1. Check whether the file is stored locally on this server.
    local_path = _find_local_media_path(file_id)
    if local_path:
        file_size = os.path.getsize(local_path)
        filename = db_file.original_filename if db_file else f"file_{file_id}"
        content_type = db_file.content_type if db_file else 'application/octet-stream'
        encoded_filename = urllib.parse.quote(filename)
        return {"file_id": file_id, "size": file_size, "file_name": encoded_filename, "content_type": content_type}

    # 2. On a cloud server, ask the home server for the size.
    if config.SERVER_ROLE == 'cloud':
        # Escape the id so characters such as '?', '#' or spaces cannot alter
        # the remote URL.
        remote_url = f"{config.HOME_SERVER_URL}/media/size/{urllib.parse.quote(file_id, safe='')}"

        def _fetch_remote_size():
            req = urllib.request.Request(remote_url, method='GET')
            with urllib.request.urlopen(req, timeout=5.0) as response:
                if response.status == 200:
                    return json.loads(response.read().decode('utf-8'))
            return None

        try:
            # The lightweight synchronous request runs in a worker thread so
            # it does not block the event loop.
            remote_data = await asyncio.to_thread(_fetch_remote_size)
        except urllib.error.HTTPError as e:
            if e.code == 404:
                raise HTTPException(
                    status_code=404, detail='File not found on home server')
            raise HTTPException(status_code=e.code, detail='Home server error')
        except Exception as e:
            print(f"Ошибка подключения к домашнему серверу: {e}")
            raise HTTPException(
                status_code=502, detail='Home server is unavailable')
        if remote_data:
            return remote_data

    # 3. The file was found neither locally nor on the remote server.
    raise HTTPException(status_code=404, detail='File not found')
@mediaRouter.get('/{file_id}')
async def get_file(file_id: str):
    """Download a media file, proxying from the home server when needed.

    A locally stored file is returned as an attachment, named after the
    ``HomeMediaFile`` record when one exists. On a cloud server a file that is
    not stored locally is streamed from the home server.

    Raises:
        HTTPException: 404 when the file cannot be found; 502 when the home
            server fails or is unreachable.
    """
    db = models.SessionLocal()
    try:
        db_file = db.query(models.HomeMediaFile).filter(
            models.HomeMediaFile.file_id == file_id).first()
    finally:
        db.close()

    local_path = _find_local_media_path(file_id)
    if local_path:
        filename = db_file.original_filename if db_file else f"file_{file_id}"
        content_type = db_file.content_type if db_file else 'application/octet-stream'
        encoded_filename = urllib.parse.quote(filename)
        headers = {
            "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
        }
        return FileResponse(
            local_path,
            media_type=content_type,
            headers=headers
        )

    if config.SERVER_ROLE == 'cloud':
        # Escape the id so it cannot change the structure of the remote URL.
        remote_url = f"{config.HOME_SERVER_URL}/media/{urllib.parse.quote(file_id, safe='')}"
        # Opening the upstream connection blocks (up to 45 seconds), so do it
        # in a worker thread and keep the event loop free.
        return await asyncio.to_thread(_stream_response_from_remote, remote_url)

    raise HTTPException(status_code=404, detail='File not found')
@mediaRouter.post('/copy_internal')
async def copy_file_internal(
    request: Request,
    file_id: str = Form(...),
    owner_id: int = Form(...),  # id of the new owner (the recipient)
):
    """Duplicate a stored media file for a new owner (server-to-server call).

    The request must carry the shared ``X-Media-Forwarding-Secret`` header.
    The source file is copied into ``config.HOME_MEDIA_FOLDER`` under a new
    random id and a ``HomeMediaFile`` record is created for ``owner_id``,
    reusing the source record's file name and content type when available.

    Returns:
        ``{"status": "ok", "new_file_id": <hex id>}``.

    Raises:
        HTTPException: 401 for a missing or wrong secret; 404 when the source
            file is not stored locally.
    """
    # Secret check: reject when no secret is configured and compare in
    # constant time.
    provided_secret = request.headers.get('X-Media-Forwarding-Secret') or ''
    expected_secret = str(config.MEDIA_FORWARDING_SECRET or '')
    if not expected_secret or not hmac.compare_digest(
        provided_secret.encode('utf-8'), expected_secret.encode('utf-8')
    ):
        raise HTTPException(status_code=401, detail='Unauthorized')

    # 1. Locate the source file.
    source_path = _find_local_media_path(file_id)
    if not source_path:
        raise HTTPException(status_code=404, detail='Source file not found')

    # 2. Allocate a new id and destination path.
    new_file_id = uuid.uuid4().hex
    new_storage_filename = f"{new_file_id}.enc"
    dest_path = os.path.join(config.HOME_MEDIA_FOLDER, new_storage_filename)

    db = models.SessionLocal()
    try:
        try:
            # 3. Physical copy.
            shutil.copyfile(source_path, dest_path)
            # 4. Register the copy in the database.
            old_record = db.query(models.HomeMediaFile).filter(
                models.HomeMediaFile.file_id == file_id).first()
            new_record = models.HomeMediaFile(
                file_id=new_file_id,
                owner_id=owner_id,
                original_filename=old_record.original_filename if old_record else "copy.enc",
                content_type=old_record.content_type if old_record else 'application/octet-stream',
                storage_filename=new_storage_filename,
                size_bytes=os.path.getsize(dest_path),
            )
            db.add(new_record)
            db.commit()
        except Exception:
            # Do not leave a copied file behind without a database record.
            db.rollback()
            try:
                os.remove(dest_path)
            except OSError:
                pass
            raise
    finally:
        db.close()
    return {"status": "ok", "new_file_id": new_file_id}
@mediaRouter.post('/copy')
async def copy(
    file_id: str = Form(...),
    current_user: models.User = Depends(get_current_user),
):
    """Ask the home server to duplicate a media file for the current user.

    Forwards the request to the home server's ``/media/copy_internal``
    endpoint, authenticated with the forwarding secret, and returns its JSON
    answer.

    Raises:
        HTTPException: 503 when this server is not the cloud server; 404 when
            the home server reports the source file as missing; 502 when the
            home server fails or is unreachable; 500 when it answers with an
            unexpected success status.
    """
    if config.SERVER_ROLE != 'cloud':
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                            detail='Copy endpoint is available only on cloud server')

    url = f"{config.HOME_SERVER_URL}/media/copy_internal"
    # ``file_id`` is user input: encode the form properly so characters such
    # as '&' or '=' cannot inject additional parameters.
    body_data = urllib.parse.urlencode(
        {'file_id': file_id, 'owner_id': current_user.id}).encode('utf-8')
    forward_request = urllib.request.Request(
        url,
        data=body_data,
        headers={
            'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
            'Content-Type': 'application/x-www-form-urlencoded'
        },
        method='POST'
    )

    def _send():
        # Returns (ok, payload) so that any JSON value, including null, can be
        # told apart from "no usable answer".
        with urllib.request.urlopen(forward_request, timeout=10) as response:
            if response.status == 200:
                return True, json.loads(response.read().decode('utf-8'))
        return False, None

    try:
        # The request blocks for up to 10 seconds; keep it off the event loop.
        ok, payload = await asyncio.to_thread(_send)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            raise HTTPException(
                status_code=404, detail='Source file not found') from e
        raise HTTPException(
            status_code=502, detail=f'Failed to copy on home server: {e}') from e
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f'Failed to copy on home server: {e}') from e
    if ok:
        return payload
    raise HTTPException(status_code=500, detail='Copying failed')