585 lines
20 KiB
Python
585 lines
20 KiB
Python
import shutil
|
||
from fastapi import Depends, FastAPI, HTTPException, status, APIRouter, File, UploadFile, Request, Form
|
||
from fastapi.responses import FileResponse, StreamingResponse
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy.sql import func
|
||
from app.core.security import get_current_user
|
||
from app.db import models
|
||
from app.core.config import config
|
||
import os
|
||
import re
|
||
import uuid
|
||
import urllib.request
|
||
import urllib.error
|
||
from io import BytesIO
|
||
import asyncio
|
||
|
||
|
||
def _ensure_directory(path: str):
|
||
if not os.path.exists(path):
|
||
os.makedirs(path, exist_ok=True)
|
||
|
||
|
||
UPLOAD_FOLDER = 'uploads'
|
||
|
||
|
||
def _parse_multipart_body(body: bytes):
|
||
try:
|
||
if not body.startswith(b"--"):
|
||
return None
|
||
|
||
boundary, _ = body.split(b"\r\n", 1)
|
||
parts = body.split(boundary)
|
||
for part in parts:
|
||
if not part or part in (b"--", b"--\r\n"):
|
||
continue
|
||
|
||
part = part.strip(b"\r\n")
|
||
if not part:
|
||
continue
|
||
|
||
headers, _, content = part.partition(b"\r\n\r\n")
|
||
if not headers or content is None:
|
||
continue
|
||
|
||
disposition_match = re.search(
|
||
br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
|
||
headers,
|
||
re.IGNORECASE,
|
||
)
|
||
if not disposition_match:
|
||
continue
|
||
|
||
field_name = disposition_match.group(
|
||
1).decode('utf-8', errors='ignore')
|
||
filename = disposition_match.group(2)
|
||
if field_name != 'file':
|
||
continue
|
||
|
||
filename = filename.decode(
|
||
'utf-8', errors='ignore') if filename else 'upload.bin'
|
||
content_type_match = re.search(
|
||
br'Content-Type:\s*([\w\-\/]+)', headers, re.IGNORECASE)
|
||
content_type = (
|
||
content_type_match.group(1).decode('utf-8', errors='ignore')
|
||
if content_type_match
|
||
else 'application/octet-stream'
|
||
)
|
||
return filename, content.rstrip(b'\r\n'), content_type
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
async def _get_upload_file(request: Request, uploaded_file: UploadFile | None):
|
||
if uploaded_file is not None:
|
||
return uploaded_file
|
||
|
||
raw_body = await request.body()
|
||
parsed = _parse_multipart_body(raw_body)
|
||
if parsed is None:
|
||
return None
|
||
|
||
filename, content, content_type = parsed
|
||
return UploadFile(filename=filename, file=BytesIO(content), content_type=content_type)
|
||
|
||
|
||
def _encode_multipart_formdata(fields, files):
|
||
boundary = uuid.uuid4().hex
|
||
body = BytesIO()
|
||
|
||
for name, value in fields.items():
|
||
body.write(f"--{boundary}\r\n".encode('utf-8'))
|
||
body.write(
|
||
f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode('utf-8'))
|
||
body.write(str(value).encode('utf-8'))
|
||
body.write(b"\r\n")
|
||
|
||
for field_name, filename, content_type, file_bytes in files:
|
||
body.write(f"--{boundary}\r\n".encode('utf-8'))
|
||
body.write(
|
||
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'.encode(
|
||
'utf-8')
|
||
)
|
||
body.write(f"Content-Type: {content_type}\r\n\r\n".encode('utf-8'))
|
||
body.write(file_bytes)
|
||
body.write(b"\r\n")
|
||
|
||
body.write(f"--{boundary}--\r\n".encode('utf-8'))
|
||
return body.getvalue(), boundary
|
||
|
||
|
||
def _get_cloud_cache_size_bytes(db: Session) -> int:
|
||
total = db.query(func.sum(models.CloudMediaItem.size_bytes)).filter(
|
||
models.CloudMediaItem.status.in_(['pending', 'sending']),
|
||
models.CloudMediaItem.is_avatar == 0,
|
||
).scalar()
|
||
return int(total or 0)
|
||
|
||
|
||
def _find_local_media_path(file_id: str) -> str | None:
|
||
candidates = [
|
||
os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, f"{file_id}.enc"),
|
||
os.path.join('uploads', f"{file_id}.enc"),
|
||
os.path.join(config.HOME_MEDIA_FOLDER, f"{file_id}.enc"),
|
||
]
|
||
for path in candidates:
|
||
if os.path.exists(path):
|
||
return path
|
||
return None
|
||
|
||
|
||
def _stream_response_from_remote(url: str):
|
||
try:
|
||
request = urllib.request.Request(url)
|
||
response = urllib.request.urlopen(request, timeout=45)
|
||
except urllib.error.HTTPError as exc:
|
||
if exc.code == 404:
|
||
raise HTTPException(status_code=404, detail='File not found')
|
||
raise HTTPException(
|
||
status_code=502, detail=f'Error fetching media from home server: {exc.code}')
|
||
except Exception as exc:
|
||
raise HTTPException(
|
||
status_code=502, detail=f'Could not reach home server: {exc}')
|
||
|
||
headers = {k.lower(): v for k, v in response.getheaders()}
|
||
content_type = headers.get('content-type', 'application/octet-stream')
|
||
return StreamingResponse(
|
||
iter(lambda: response.read(8192), b""),
|
||
media_type=content_type,
|
||
headers={
|
||
'Content-Disposition': headers.get('content-disposition', f'attachment; filename="{os.path.basename(url)}"')
|
||
},
|
||
)
|
||
|
||
|
||
def _post_file_to_home(item: models.CloudMediaItem) -> tuple[bool, str]:
|
||
file_path = os.path.join(
|
||
config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
|
||
if not os.path.exists(file_path):
|
||
return False, 'Local cache file not found'
|
||
|
||
with open(file_path, 'rb') as f:
|
||
content = f.read()
|
||
|
||
fields = {
|
||
'owner_id': item.owner_id or '',
|
||
'cloud_file_id': item.file_id,
|
||
'original_filename': item.original_filename or item.local_filename,
|
||
}
|
||
files = [
|
||
('file', item.original_filename or item.local_filename,
|
||
item.content_type or 'application/octet-stream', content),
|
||
]
|
||
body, boundary = _encode_multipart_formdata(fields, files)
|
||
request = urllib.request.Request(
|
||
f"{config.HOME_SERVER_URL}/media/receive",
|
||
data=body,
|
||
headers={
|
||
'Content-Type': f'multipart/form-data; boundary={boundary}',
|
||
'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
|
||
},
|
||
)
|
||
|
||
try:
|
||
with urllib.request.urlopen(request, timeout=60) as response:
|
||
if response.status == 200:
|
||
return True, ''
|
||
return False, f'Home server returned {response.status}'
|
||
except urllib.error.HTTPError as exc:
|
||
body = exc.read().decode(errors='ignore')
|
||
return False, f'Home server HTTP error {exc.code}: {body}'
|
||
except Exception as exc:
|
||
return False, str(exc)
|
||
|
||
|
||
def _cleanup_home_quota(db: Session, owner_id: int | None):
|
||
if owner_id is None:
|
||
return
|
||
|
||
total = db.query(func.sum(models.HomeMediaFile.size_bytes)).filter(
|
||
models.HomeMediaFile.owner_id == owner_id
|
||
).scalar() or 0
|
||
total = int(total)
|
||
if total <= config.HOME_USER_QUOTA_BYTES:
|
||
return
|
||
|
||
files = db.query(models.HomeMediaFile).filter(
|
||
models.HomeMediaFile.owner_id == owner_id
|
||
).order_by(models.HomeMediaFile.created_at.asc()).all()
|
||
|
||
for file_record in files:
|
||
if total <= config.HOME_USER_QUOTA_BYTES:
|
||
break
|
||
path = os.path.join(config.HOME_MEDIA_FOLDER,
|
||
file_record.storage_filename)
|
||
if os.path.exists(path):
|
||
os.remove(path)
|
||
total -= file_record.size_bytes
|
||
db.delete(file_record)
|
||
db.commit()
|
||
|
||
|
||
def _cleanup_all_home_storage():
|
||
db = models.SessionLocal()
|
||
try:
|
||
owner_ids = db.query(models.HomeMediaFile.owner_id).filter(
|
||
models.HomeMediaFile.owner_id.isnot(None)).distinct().all()
|
||
for owner_id_tuple in owner_ids:
|
||
_cleanup_home_quota(db, owner_id_tuple[0])
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
async def forward_pending_media_loop():
|
||
while True:
|
||
if config.SERVER_ROLE != 'cloud':
|
||
await asyncio.sleep(10)
|
||
continue
|
||
|
||
db = models.SessionLocal()
|
||
try:
|
||
total_cache = _get_cloud_cache_size_bytes(db)
|
||
if total_cache >= config.CLOUD_CACHE_MAX_BYTES:
|
||
await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)
|
||
continue
|
||
|
||
pending_items = db.query(models.CloudMediaItem).filter(
|
||
models.CloudMediaItem.status == 'pending',
|
||
models.CloudMediaItem.is_avatar == 0,
|
||
).order_by(models.CloudMediaItem.created_at.asc()).limit(5).all()
|
||
|
||
for item in pending_items:
|
||
item.status = 'sending'
|
||
item.attempts += 1
|
||
db.commit()
|
||
|
||
success, error = _post_file_to_home(item)
|
||
if success:
|
||
item.status = 'sent'
|
||
item.sent_at = func.now()
|
||
item.error_message = None
|
||
db.commit()
|
||
cache_path = os.path.join(
|
||
config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
|
||
if os.path.exists(cache_path):
|
||
os.remove(cache_path)
|
||
else:
|
||
item.status = 'failed'
|
||
item.error_message = error
|
||
db.commit()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
db.close()
|
||
await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)
|
||
|
||
|
||
async def home_storage_maintenance_loop():
|
||
while True:
|
||
if config.SERVER_ROLE != 'home':
|
||
await asyncio.sleep(10)
|
||
continue
|
||
_cleanup_all_home_storage()
|
||
await asyncio.sleep(600)
|
||
|
||
|
||
mediaRouter = APIRouter(
|
||
prefix='/media',
|
||
tags=['media'],
|
||
)
|
||
|
||
|
||
_ensure_directory(UPLOAD_FOLDER)
|
||
_ensure_directory(config.CLOUD_MEDIA_CACHE_FOLDER)
|
||
_ensure_directory(config.HOME_MEDIA_FOLDER)
|
||
|
||
|
||
@mediaRouter.post('/upload')
|
||
async def upload_file(
|
||
request: Request,
|
||
file: UploadFile = File(None),
|
||
):
|
||
uploaded_file = await _get_upload_file(request, file)
|
||
if uploaded_file is None or not uploaded_file.filename:
|
||
raise HTTPException(status_code=400, detail='No selected file')
|
||
|
||
content = await uploaded_file.read()
|
||
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
||
raise HTTPException(
|
||
status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
||
|
||
file_id = uuid.uuid4().hex
|
||
filename = f"{file_id}.enc"
|
||
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
||
with open(file_path, 'wb') as f:
|
||
f.write(content)
|
||
|
||
return {
|
||
'status': 'ok',
|
||
'file_id': file_id,
|
||
}
|
||
|
||
|
||
@mediaRouter.post('/v2/upload')
|
||
async def upload_file_v2(
|
||
request: Request,
|
||
file: UploadFile = File(None),
|
||
purpose: str = Form('media'),
|
||
current_user: models.User = Depends(get_current_user),
|
||
):
|
||
if config.SERVER_ROLE != 'cloud':
|
||
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||
detail='Upload endpoint is available only on cloud server')
|
||
|
||
uploaded_file = await _get_upload_file(request, file)
|
||
if uploaded_file is None or not uploaded_file.filename:
|
||
raise HTTPException(status_code=400, detail='No selected file')
|
||
|
||
content = await uploaded_file.read()
|
||
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
||
raise HTTPException(
|
||
status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
||
|
||
db = models.SessionLocal()
|
||
try:
|
||
cache_size = _get_cloud_cache_size_bytes(db)
|
||
is_avatar = purpose == 'avatar'
|
||
if cache_size >= config.CLOUD_CACHE_MAX_BYTES and not is_avatar:
|
||
raise HTTPException(
|
||
status_code=503,
|
||
detail='Cloud media cache is full; new uploads are temporarily paused until pending files are forwarded.',
|
||
)
|
||
|
||
file_id = uuid.uuid4().hex
|
||
local_filename = f"{file_id}.enc"
|
||
storage_path = os.path.join(
|
||
config.CLOUD_MEDIA_CACHE_FOLDER, local_filename)
|
||
with open(storage_path, 'wb') as f:
|
||
f.write(content)
|
||
|
||
item = models.CloudMediaItem(
|
||
file_id=file_id,
|
||
owner_id=current_user.id,
|
||
original_filename=uploaded_file.filename,
|
||
content_type=uploaded_file.content_type or 'application/octet-stream',
|
||
local_filename=local_filename,
|
||
size_bytes=len(content),
|
||
status='avatar' if is_avatar else 'pending',
|
||
is_avatar=1 if is_avatar else 0,
|
||
)
|
||
db.add(item)
|
||
db.commit()
|
||
finally:
|
||
db.close()
|
||
|
||
return {'status': 'ok', 'file_id': file_id}
|
||
|
||
|
||
@mediaRouter.post('/receive')
|
||
async def receive_media(
|
||
request: Request,
|
||
file: UploadFile = File(None),
|
||
owner_id: int | None = Form(None),
|
||
cloud_file_id: str | None = Form(None),
|
||
original_filename: str | None = Form(None),
|
||
):
|
||
secret = request.headers.get('X-Media-Forwarding-Secret')
|
||
if secret != config.MEDIA_FORWARDING_SECRET:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid forwarding secret')
|
||
|
||
uploaded_file = await _get_upload_file(request, file)
|
||
if uploaded_file is None or not uploaded_file.filename:
|
||
raise HTTPException(status_code=400, detail='No selected file')
|
||
|
||
content = await uploaded_file.read()
|
||
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
||
raise HTTPException(
|
||
status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
||
|
||
file_id = cloud_file_id or uuid.uuid4().hex
|
||
storage_filename = f"{file_id}.enc"
|
||
file_path = os.path.join(config.HOME_MEDIA_FOLDER, storage_filename)
|
||
with open(file_path, 'wb') as f:
|
||
f.write(content)
|
||
|
||
db = models.SessionLocal()
|
||
try:
|
||
home_record = models.HomeMediaFile(
|
||
file_id=file_id,
|
||
owner_id=owner_id,
|
||
original_filename=original_filename or uploaded_file.filename,
|
||
content_type=uploaded_file.content_type or 'application/octet-stream',
|
||
storage_filename=storage_filename,
|
||
size_bytes=len(content),
|
||
)
|
||
db.add(home_record)
|
||
db.commit()
|
||
_cleanup_home_quota(db, owner_id)
|
||
finally:
|
||
db.close()
|
||
|
||
return {'status': 'ok', 'file_id': file_id}
|
||
|
||
|
||
@mediaRouter.get('/size/{file_id}')
|
||
async def get_file_size(file_id: str):
|
||
db = models.SessionLocal()
|
||
db_file = None
|
||
try:
|
||
db_file = db.query(models.HomeMediaFile).filter(
|
||
models.HomeMediaFile.file_id == file_id).first()
|
||
finally:
|
||
db.close()
|
||
# 1. Проверяем наличие файла локально на этом сервере
|
||
local_path = _find_local_media_path(file_id)
|
||
if local_path and os.path.exists(local_path):
|
||
file_size = os.path.getsize(local_path)
|
||
filename = db_file.original_filename if db_file else f"file_{file_id}"
|
||
content_type = db_file.content_type if db_file else 'application/octet-stream'
|
||
encoded_filename = urllib.parse.quote(filename)
|
||
return {"file_id": file_id, "size": file_size, "file_name": encoded_filename, "content_type": content_type}
|
||
|
||
# 2. Если роль сервера 'cloud', запрашиваем размер у домашнего сервера
|
||
if config.SERVER_ROLE == 'cloud':
|
||
remote_url = f"{config.HOME_SERVER_URL}/media/size/{file_id}"
|
||
try:
|
||
# Выполняем синхронный легковесный подзапрос к домашнему серверу в треде,
|
||
# чтобы не блокировать асинхронный цикл FastAPI (по аналогии с деплоем стримов)
|
||
def _fetch_remote_size():
|
||
req = urllib.request.Request(remote_url, method='GET')
|
||
with urllib.request.urlopen(req, timeout=5.0) as response:
|
||
if response.status == 200:
|
||
import json
|
||
return json.loads(response.read().decode('utf-8'))
|
||
return None
|
||
|
||
remote_data = await asyncio.to_thread(_fetch_remote_size)
|
||
if remote_data:
|
||
return remote_data
|
||
|
||
except urllib.error.HTTPError as e:
|
||
if e.code == 404:
|
||
raise HTTPException(
|
||
status_code=404, detail='File not found on home server')
|
||
raise HTTPException(status_code=e.code, detail='Home server error')
|
||
except Exception as e:
|
||
print(f"Ошибка подключения к домашнему серверу: {e}")
|
||
raise HTTPException(
|
||
status_code=502, detail='Home server is unavailable')
|
||
|
||
# 3. Если файл не найден ни локально, ни на удаленном сервере
|
||
raise HTTPException(status_code=404, detail='File not found')
|
||
|
||
|
||
@mediaRouter.get('/{file_id}')
|
||
async def get_file(file_id: str):
|
||
db = models.SessionLocal()
|
||
db_file = None
|
||
try:
|
||
db_file = db.query(models.HomeMediaFile).filter(
|
||
models.HomeMediaFile.file_id == file_id).first()
|
||
finally:
|
||
db.close()
|
||
local_path = _find_local_media_path(file_id)
|
||
if local_path:
|
||
filename = db_file.original_filename if db_file else f"file_{file_id}"
|
||
content_type = db_file.content_type if db_file else 'application/octet-stream'
|
||
encoded_filename = urllib.parse.quote(filename)
|
||
headers = {
|
||
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
|
||
}
|
||
|
||
return FileResponse(
|
||
local_path,
|
||
media_type=content_type,
|
||
headers=headers
|
||
)
|
||
|
||
if config.SERVER_ROLE == 'cloud':
|
||
return _stream_response_from_remote(f"{config.HOME_SERVER_URL}/media/{file_id}")
|
||
|
||
raise HTTPException(status_code=404, detail='File not found')
|
||
|
||
|
||
@mediaRouter.post('/copy_internal')
|
||
async def copy_file_internal(
|
||
request: Request,
|
||
file_id: str = Form(...),
|
||
owner_id: int = Form(...), # ID нового владельца (получателя)
|
||
):
|
||
# Проверка секрета
|
||
secret = request.headers.get('X-Media-Forwarding-Secret')
|
||
if secret != config.MEDIA_FORWARDING_SECRET:
|
||
raise HTTPException(status_code=401, detail='Unauthorized')
|
||
|
||
# 1. Находим файл
|
||
source_path = _find_local_media_path(file_id)
|
||
if not source_path:
|
||
raise HTTPException(status_code=404, detail='Source file not found')
|
||
|
||
# 2. Создаем новый ID и путь
|
||
new_file_id = uuid.uuid4().hex
|
||
new_storage_filename = f"{new_file_id}.enc"
|
||
dest_path = os.path.join(config.HOME_MEDIA_FOLDER, new_storage_filename)
|
||
|
||
# 3. Физическое копирование
|
||
shutil.copyfile(source_path, dest_path)
|
||
|
||
# 4. Обновляем БД
|
||
db = models.SessionLocal()
|
||
try:
|
||
old_record = db.query(models.HomeMediaFile).filter(
|
||
models.HomeMediaFile.file_id == file_id).first()
|
||
new_record = models.HomeMediaFile(
|
||
file_id=new_file_id,
|
||
owner_id=owner_id,
|
||
original_filename=old_record.original_filename if old_record else "copy.enc",
|
||
content_type=old_record.content_type if old_record else 'application/octet-stream',
|
||
storage_filename=new_storage_filename,
|
||
size_bytes=os.path.getsize(dest_path),
|
||
)
|
||
db.add(new_record)
|
||
db.commit()
|
||
finally:
|
||
db.close()
|
||
|
||
return {"status": "ok", "new_file_id": new_file_id}
|
||
|
||
|
||
@mediaRouter.post('/copy')
|
||
async def copy(
|
||
file_id: str = Form(...),
|
||
current_user: models.User = Depends(get_current_user),
|
||
):
|
||
if config.SERVER_ROLE != 'cloud':
|
||
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||
detail='Upload endpoint is available only on cloud server')
|
||
|
||
# Делаем запрос к домашнему серверу
|
||
url = f"{config.HOME_SERVER_URL}/media/copy_internal"
|
||
|
||
# Используем FormData для передачи параметров на домашний сервер
|
||
body_data = f"file_id={file_id}&owner_id={current_user.id}".encode('utf-8')
|
||
request = urllib.request.Request(
|
||
url,
|
||
data=body_data,
|
||
headers={
|
||
'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
|
||
'Content-Type': 'application/x-www-form-urlencoded'
|
||
},
|
||
method='POST'
|
||
)
|
||
|
||
try:
|
||
with urllib.request.urlopen(request, timeout=10) as response:
|
||
if response.status == 200:
|
||
import json
|
||
return json.loads(response.read().decode('utf-8'))
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=502, detail=f'Failed to copy on home server: {e}')
|
||
|
||
raise HTTPException(status_code=500, detail='Copying failed')
|