417 lines
14 KiB
Python
417 lines
14 KiB
Python
from fastapi import Depends, FastAPI, HTTPException, status, APIRouter, File, UploadFile, Request, Form
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.sql import func
|
|
from app.core.security import get_current_user
|
|
from app.db import models
|
|
from app.core.config import config
|
|
import os
|
|
import re
|
|
import uuid
|
|
import urllib.request
|
|
import urllib.error
|
|
from io import BytesIO
|
|
import asyncio
|
|
|
|
|
|
def _ensure_directory(path: str):
|
|
if not os.path.exists(path):
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
UPLOAD_FOLDER = 'uploads'
|
|
|
|
|
|
def _parse_multipart_body(body: bytes):
|
|
try:
|
|
if not body.startswith(b"--"):
|
|
return None
|
|
|
|
boundary, _ = body.split(b"\r\n", 1)
|
|
parts = body.split(boundary)
|
|
for part in parts:
|
|
if not part or part in (b"--", b"--\r\n"):
|
|
continue
|
|
|
|
part = part.strip(b"\r\n")
|
|
if not part:
|
|
continue
|
|
|
|
headers, _, content = part.partition(b"\r\n\r\n")
|
|
if not headers or content is None:
|
|
continue
|
|
|
|
disposition_match = re.search(
|
|
br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
|
|
headers,
|
|
re.IGNORECASE,
|
|
)
|
|
if not disposition_match:
|
|
continue
|
|
|
|
field_name = disposition_match.group(1).decode('utf-8', errors='ignore')
|
|
filename = disposition_match.group(2)
|
|
if field_name != 'file':
|
|
continue
|
|
|
|
filename = filename.decode('utf-8', errors='ignore') if filename else 'upload.bin'
|
|
content_type_match = re.search(br'Content-Type:\s*([\w\-\/]+)', headers, re.IGNORECASE)
|
|
content_type = (
|
|
content_type_match.group(1).decode('utf-8', errors='ignore')
|
|
if content_type_match
|
|
else 'application/octet-stream'
|
|
)
|
|
return filename, content.rstrip(b'\r\n'), content_type
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
async def _get_upload_file(request: Request, uploaded_file: UploadFile | None):
|
|
if uploaded_file is not None:
|
|
return uploaded_file
|
|
|
|
raw_body = await request.body()
|
|
parsed = _parse_multipart_body(raw_body)
|
|
if parsed is None:
|
|
return None
|
|
|
|
filename, content, content_type = parsed
|
|
return UploadFile(filename=filename, file=BytesIO(content), content_type=content_type)
|
|
|
|
|
|
def _encode_multipart_formdata(fields, files):
|
|
boundary = uuid.uuid4().hex
|
|
body = BytesIO()
|
|
|
|
for name, value in fields.items():
|
|
body.write(f"--{boundary}\r\n".encode('utf-8'))
|
|
body.write(f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode('utf-8'))
|
|
body.write(str(value).encode('utf-8'))
|
|
body.write(b"\r\n")
|
|
|
|
for field_name, filename, content_type, file_bytes in files:
|
|
body.write(f"--{boundary}\r\n".encode('utf-8'))
|
|
body.write(
|
|
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'.encode('utf-8')
|
|
)
|
|
body.write(f"Content-Type: {content_type}\r\n\r\n".encode('utf-8'))
|
|
body.write(file_bytes)
|
|
body.write(b"\r\n")
|
|
|
|
body.write(f"--{boundary}--\r\n".encode('utf-8'))
|
|
return body.getvalue(), boundary
|
|
|
|
|
|
def _get_cloud_cache_size_bytes(db: Session) -> int:
|
|
total = db.query(func.sum(models.CloudMediaItem.size_bytes)).filter(
|
|
models.CloudMediaItem.status.in_(['pending', 'sending']),
|
|
models.CloudMediaItem.is_avatar == 0,
|
|
).scalar()
|
|
return int(total or 0)
|
|
|
|
|
|
def _find_local_media_path(file_id: str) -> str | None:
|
|
candidates = [
|
|
os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, f"{file_id}.enc"),
|
|
os.path.join('uploads', f"{file_id}.enc"),
|
|
os.path.join(config.HOME_MEDIA_FOLDER, f"{file_id}.enc"),
|
|
]
|
|
for path in candidates:
|
|
if os.path.exists(path):
|
|
return path
|
|
return None
|
|
|
|
|
|
def _stream_response_from_remote(url: str):
|
|
try:
|
|
request = urllib.request.Request(url)
|
|
response = urllib.request.urlopen(request, timeout=45)
|
|
except urllib.error.HTTPError as exc:
|
|
if exc.code == 404:
|
|
raise HTTPException(status_code=404, detail='File not found')
|
|
raise HTTPException(status_code=502, detail=f'Error fetching media from home server: {exc.code}')
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=502, detail=f'Could not reach home server: {exc}')
|
|
|
|
headers = {k.lower(): v for k, v in response.getheaders()}
|
|
content_type = headers.get('content-type', 'application/octet-stream')
|
|
return StreamingResponse(
|
|
iter(lambda: response.read(8192), b""),
|
|
media_type=content_type,
|
|
headers={
|
|
'Content-Disposition': headers.get('content-disposition', f'attachment; filename="{os.path.basename(url)}"')
|
|
},
|
|
)
|
|
|
|
|
|
def _post_file_to_home(item: models.CloudMediaItem) -> tuple[bool, str]:
|
|
file_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
|
|
if not os.path.exists(file_path):
|
|
return False, 'Local cache file not found'
|
|
|
|
with open(file_path, 'rb') as f:
|
|
content = f.read()
|
|
|
|
fields = {
|
|
'owner_id': item.owner_id or '',
|
|
'cloud_file_id': item.file_id,
|
|
'original_filename': item.original_filename or item.local_filename,
|
|
}
|
|
files = [
|
|
('file', item.original_filename or item.local_filename, item.content_type or 'application/octet-stream', content),
|
|
]
|
|
body, boundary = _encode_multipart_formdata(fields, files)
|
|
request = urllib.request.Request(
|
|
f"{config.HOME_SERVER_URL}/media/receive",
|
|
data=body,
|
|
headers={
|
|
'Content-Type': f'multipart/form-data; boundary={boundary}',
|
|
'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
|
|
},
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=60) as response:
|
|
if response.status == 200:
|
|
return True, ''
|
|
return False, f'Home server returned {response.status}'
|
|
except urllib.error.HTTPError as exc:
|
|
body = exc.read().decode(errors='ignore')
|
|
return False, f'Home server HTTP error {exc.code}: {body}'
|
|
except Exception as exc:
|
|
return False, str(exc)
|
|
|
|
|
|
def _cleanup_home_quota(db: Session, owner_id: int | None):
|
|
if owner_id is None:
|
|
return
|
|
|
|
total = db.query(func.sum(models.HomeMediaFile.size_bytes)).filter(
|
|
models.HomeMediaFile.owner_id == owner_id
|
|
).scalar() or 0
|
|
total = int(total)
|
|
if total <= config.HOME_USER_QUOTA_BYTES:
|
|
return
|
|
|
|
files = db.query(models.HomeMediaFile).filter(
|
|
models.HomeMediaFile.owner_id == owner_id
|
|
).order_by(models.HomeMediaFile.created_at.asc()).all()
|
|
|
|
for file_record in files:
|
|
if total <= config.HOME_USER_QUOTA_BYTES:
|
|
break
|
|
path = os.path.join(config.HOME_MEDIA_FOLDER, file_record.storage_filename)
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
total -= file_record.size_bytes
|
|
db.delete(file_record)
|
|
db.commit()
|
|
|
|
|
|
def _cleanup_all_home_storage():
|
|
db = models.SessionLocal()
|
|
try:
|
|
owner_ids = db.query(models.HomeMediaFile.owner_id).filter(models.HomeMediaFile.owner_id.isnot(None)).distinct().all()
|
|
for owner_id_tuple in owner_ids:
|
|
_cleanup_home_quota(db, owner_id_tuple[0])
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
async def forward_pending_media_loop():
|
|
while True:
|
|
if config.SERVER_ROLE != 'cloud':
|
|
await asyncio.sleep(10)
|
|
continue
|
|
|
|
db = models.SessionLocal()
|
|
try:
|
|
total_cache = _get_cloud_cache_size_bytes(db)
|
|
if total_cache >= config.CLOUD_CACHE_MAX_BYTES:
|
|
await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)
|
|
continue
|
|
|
|
pending_items = db.query(models.CloudMediaItem).filter(
|
|
models.CloudMediaItem.status == 'pending',
|
|
models.CloudMediaItem.is_avatar == 0,
|
|
).order_by(models.CloudMediaItem.created_at.asc()).limit(5).all()
|
|
|
|
for item in pending_items:
|
|
item.status = 'sending'
|
|
item.attempts += 1
|
|
db.commit()
|
|
|
|
success, error = _post_file_to_home(item)
|
|
if success:
|
|
item.status = 'sent'
|
|
item.sent_at = func.now()
|
|
item.error_message = None
|
|
db.commit()
|
|
cache_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
|
|
if os.path.exists(cache_path):
|
|
os.remove(cache_path)
|
|
else:
|
|
item.status = 'failed'
|
|
item.error_message = error
|
|
db.commit()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
db.close()
|
|
await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)
|
|
|
|
|
|
async def home_storage_maintenance_loop():
|
|
while True:
|
|
if config.SERVER_ROLE != 'home':
|
|
await asyncio.sleep(10)
|
|
continue
|
|
_cleanup_all_home_storage()
|
|
await asyncio.sleep(600)
|
|
|
|
|
|
mediaRouter = APIRouter(
|
|
prefix='/media',
|
|
tags=['media'],
|
|
)
|
|
|
|
|
|
_ensure_directory(UPLOAD_FOLDER)
|
|
_ensure_directory(config.CLOUD_MEDIA_CACHE_FOLDER)
|
|
_ensure_directory(config.HOME_MEDIA_FOLDER)
|
|
|
|
|
|
@mediaRouter.post('/upload')
|
|
async def upload_file(
|
|
request: Request,
|
|
file: UploadFile = File(None),
|
|
):
|
|
uploaded_file = await _get_upload_file(request, file)
|
|
if uploaded_file is None or not uploaded_file.filename:
|
|
raise HTTPException(status_code=400, detail='No selected file')
|
|
|
|
content = await uploaded_file.read()
|
|
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
|
raise HTTPException(status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
|
|
|
file_id = uuid.uuid4().hex
|
|
filename = f"{file_id}.enc"
|
|
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
|
with open(file_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
return {
|
|
'status': 'ok',
|
|
'file_id': file_id,
|
|
}
|
|
|
|
|
|
@mediaRouter.post('/v2/upload')
|
|
async def upload_file_v2(
|
|
request: Request,
|
|
file: UploadFile = File(None),
|
|
purpose: str = Form('media'),
|
|
current_user: models.User = Depends(get_current_user),
|
|
):
|
|
if config.SERVER_ROLE != 'cloud':
|
|
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Upload endpoint is available only on cloud server')
|
|
|
|
uploaded_file = await _get_upload_file(request, file)
|
|
if uploaded_file is None or not uploaded_file.filename:
|
|
raise HTTPException(status_code=400, detail='No selected file')
|
|
|
|
content = await uploaded_file.read()
|
|
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
|
raise HTTPException(status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
|
|
|
db = models.SessionLocal()
|
|
try:
|
|
cache_size = _get_cloud_cache_size_bytes(db)
|
|
is_avatar = purpose == 'avatar'
|
|
if cache_size >= config.CLOUD_CACHE_MAX_BYTES and not is_avatar:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail='Cloud media cache is full; new uploads are temporarily paused until pending files are forwarded.',
|
|
)
|
|
|
|
file_id = uuid.uuid4().hex
|
|
local_filename = f"{file_id}.enc"
|
|
storage_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, local_filename)
|
|
with open(storage_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
item = models.CloudMediaItem(
|
|
file_id=file_id,
|
|
owner_id=current_user.id,
|
|
original_filename=uploaded_file.filename,
|
|
content_type=uploaded_file.content_type or 'application/octet-stream',
|
|
local_filename=local_filename,
|
|
size_bytes=len(content),
|
|
status='avatar' if is_avatar else 'pending',
|
|
is_avatar=1 if is_avatar else 0,
|
|
)
|
|
db.add(item)
|
|
db.commit()
|
|
finally:
|
|
db.close()
|
|
|
|
return {'status': 'ok', 'file_id': file_id}
|
|
|
|
|
|
@mediaRouter.post('/receive')
|
|
async def receive_media(
|
|
request: Request,
|
|
file: UploadFile = File(None),
|
|
owner_id: int | None = Form(None),
|
|
cloud_file_id: str | None = Form(None),
|
|
original_filename: str | None = Form(None),
|
|
):
|
|
secret = request.headers.get('X-Media-Forwarding-Secret')
|
|
if secret != config.MEDIA_FORWARDING_SECRET:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid forwarding secret')
|
|
|
|
uploaded_file = await _get_upload_file(request, file)
|
|
if uploaded_file is None or not uploaded_file.filename:
|
|
raise HTTPException(status_code=400, detail='No selected file')
|
|
|
|
content = await uploaded_file.read()
|
|
if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
|
|
raise HTTPException(status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')
|
|
|
|
file_id = cloud_file_id or uuid.uuid4().hex
|
|
storage_filename = f"{file_id}.enc"
|
|
file_path = os.path.join(config.HOME_MEDIA_FOLDER, storage_filename)
|
|
with open(file_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
db = models.SessionLocal()
|
|
try:
|
|
home_record = models.HomeMediaFile(
|
|
file_id=file_id,
|
|
owner_id=owner_id,
|
|
original_filename=original_filename or uploaded_file.filename,
|
|
content_type=uploaded_file.content_type or 'application/octet-stream',
|
|
storage_filename=storage_filename,
|
|
size_bytes=len(content),
|
|
)
|
|
db.add(home_record)
|
|
db.commit()
|
|
_cleanup_home_quota(db, owner_id)
|
|
finally:
|
|
db.close()
|
|
|
|
return {'status': 'ok', 'file_id': file_id}
|
|
|
|
|
|
@mediaRouter.get('/{file_id}')
|
|
async def get_file(file_id: str):
|
|
local_path = _find_local_media_path(file_id)
|
|
if local_path:
|
|
return FileResponse(local_path, media_type='application/octet-stream')
|
|
|
|
if config.SERVER_ROLE == 'cloud':
|
|
return _stream_response_from_remote(f"{config.HOME_SERVER_URL}/media/{file_id}")
|
|
|
|
raise HTTPException(status_code=404, detail='File not found')
|