"""Media endpoints and background jobs for the cloud/home server pair.

The module exposes the ``/media`` router (upload, receive, download) and two
long-running coroutines: one forwards cached uploads from the cloud server to
the home server, the other enforces the per-user storage quota on the home
server.
"""

import asyncio
import contextlib
import hmac
import logging
import os
import re
import urllib.error
import urllib.request
import uuid
from io import BytesIO

from fastapi import (
    APIRouter,
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile,
    status,
)
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.core.security import get_current_user
from app.db import models
from app.core.config import config

logger = logging.getLogger(__name__)

UPLOAD_FOLDER = 'uploads'

# Compiled once: both patterns are applied to every part of a raw multipart body.
_DISPOSITION_RE = re.compile(
    br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
    re.IGNORECASE,
)
# Accept the full media type token (e.g. "image/svg+xml", "application/vnd.ms-excel").
_CONTENT_TYPE_RE = re.compile(br'Content-Type:\s*([^\s;]+)', re.IGNORECASE)


def _ensure_directory(path: str):
    """Create *path* (and any missing parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)


def _is_safe_file_id(file_id: str) -> bool:
    """Return True when *file_id* can be used as a bare file name.

    Ids end up inside ``os.path.join(folder, f"{file_id}.enc")``, so anything
    containing a path separator or NUL byte must be rejected.
    """
    if not file_id or file_id in ('.', '..'):
        return False
    return not any(ch in file_id for ch in ('/', '\\', '\x00'))


def _secrets_match(provided: str | None, expected: str | None) -> bool:
    """Compare two secrets in constant time.

    A missing value only matches another missing value, which mirrors a plain
    equality check.
    """
    if provided is None or expected is None:
        return provided is expected
    return hmac.compare_digest(
        provided.encode('utf-8'), str(expected).encode('utf-8')
    )


def _parse_multipart_body(body: bytes):
    """Extract the ``file`` field from a raw multipart/form-data body.

    Returns:
        ``(filename, content, content_type)`` for the first part whose field
        name is ``file``, or ``None`` when the body is not multipart or holds
        no such part.
    """
    if not body.startswith(b"--"):
        return None

    # The first line of the body is the boundary delimiter ("--<boundary>").
    boundary, line_end, _ = body.partition(b"\r\n")
    if not line_end:
        return None

    for part in body.split(boundary):
        if not part or part in (b"--", b"--\r\n"):
            continue
        # Drop exactly one framing CRLF on each side. Stripping every CR/LF
        # byte would corrupt binary payloads that end in newline bytes.
        part = part.removeprefix(b"\r\n").removesuffix(b"\r\n")
        if not part:
            continue

        headers, _, content = part.partition(b"\r\n\r\n")
        if not headers:
            continue

        disposition = _DISPOSITION_RE.search(headers)
        if not disposition:
            continue
        field_name = disposition.group(1).decode('utf-8', errors='ignore')
        if field_name != 'file':
            continue

        raw_filename = disposition.group(2)
        filename = (
            raw_filename.decode('utf-8', errors='ignore')
            if raw_filename
            else 'upload.bin'
        )
        type_match = _CONTENT_TYPE_RE.search(headers)
        content_type = (
            type_match.group(1).decode('utf-8', errors='ignore')
            if type_match
            else 'application/octet-stream'
        )
        return filename, content, content_type

    return None


async def _get_upload_file(request: Request, uploaded_file: UploadFile | None):
    """Return the uploaded file, parsing the raw body when FastAPI found none.

    Returns ``None`` when no ``file`` part can be located either way.
    """
    if uploaded_file is not None:
        return uploaded_file

    parsed = _parse_multipart_body(await request.body())
    if parsed is None:
        return None

    filename, content, content_type = parsed
    # NOTE(review): recent Starlette releases derive ``content_type`` from
    # ``headers`` and may not accept this keyword - confirm against the
    # pinned framework version.
    return UploadFile(
        filename=filename, file=BytesIO(content), content_type=content_type
    )


async def _read_upload_content(request: Request, file: UploadFile | None):
    """Resolve the uploaded file and read it fully, enforcing the size limit.

    Returns:
        ``(uploaded_file, content)``.

    Raises:
        HTTPException: 400 when no file was sent or it exceeds
            ``config.MEDIA_UPLOAD_MAX_BYTES``.
    """
    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')

    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400,
            detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)',
        )
    return uploaded_file, content


def _escape_multipart_param(value) -> str:
    """Escape a value for use inside a quoted multipart header parameter.

    Double quotes and line breaks are percent-encoded so a hostile file name
    cannot terminate the parameter or inject extra headers.
    """
    return (
        str(value)
        .replace('"', '%22')
        .replace('\r', '%0D')
        .replace('\n', '%0A')
    )


def _encode_multipart_formdata(fields, files):
    """Build a multipart/form-data request body.

    Args:
        fields: mapping of plain form field names to values.
        files: iterable of ``(field_name, filename, content_type, file_bytes)``.

    Returns:
        ``(body_bytes, boundary)``; the caller puts *boundary* in the
        ``Content-Type`` request header.
    """
    boundary = uuid.uuid4().hex
    delimiter = f"--{boundary}\r\n".encode('utf-8')
    chunks = []

    for name, value in fields.items():
        chunks.append(delimiter)
        chunks.append(
            f'Content-Disposition: form-data; name="{_escape_multipart_param(name)}"\r\n\r\n'.encode('utf-8')
        )
        chunks.append(str(value).encode('utf-8'))
        chunks.append(b"\r\n")

    for field_name, filename, content_type, file_bytes in files:
        # A header value must stay on one line.
        safe_type = str(content_type).replace('\r', '').replace('\n', '')
        chunks.append(delimiter)
        chunks.append(
            (
                f'Content-Disposition: form-data; name="{_escape_multipart_param(field_name)}"; '
                f'filename="{_escape_multipart_param(filename)}"\r\n'
            ).encode('utf-8')
        )
        chunks.append(f"Content-Type: {safe_type}\r\n\r\n".encode('utf-8'))
        chunks.append(file_bytes)
        chunks.append(b"\r\n")

    chunks.append(f"--{boundary}--\r\n".encode('utf-8'))
    return b"".join(chunks), boundary


def _get_cloud_cache_size_bytes(db: Session) -> int:
    """Return the summed size of non-avatar items still pending or sending."""
    total = db.query(func.sum(models.CloudMediaItem.size_bytes)).filter(
        models.CloudMediaItem.status.in_(['pending', 'sending']),
        models.CloudMediaItem.is_avatar == 0,
    ).scalar()
    return int(total or 0)


def _find_local_media_path(file_id: str) -> str | None:
    """Return the path of ``<file_id>.enc`` in the first folder that has it.

    Folders are checked in order: cloud cache, legacy upload folder, home
    media folder. Returns ``None`` when the id is unsafe or nothing matches.
    """
    if not _is_safe_file_id(file_id):
        return None

    filename = f"{file_id}.enc"
    for folder in (
        config.CLOUD_MEDIA_CACHE_FOLDER,
        UPLOAD_FOLDER,
        config.HOME_MEDIA_FOLDER,
    ):
        path = os.path.join(folder, filename)
        if os.path.exists(path):
            return path
    return None


def _stream_response_from_remote(url: str):
    """Fetch *url* and relay its body as a ``StreamingResponse``.

    Raises:
        HTTPException: 404 when the remote answers 404, 502 for any other
            remote error or connection failure.
    """
    try:
        response = urllib.request.urlopen(urllib.request.Request(url), timeout=45)
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            raise HTTPException(status_code=404, detail='File not found') from exc
        raise HTTPException(
            status_code=502,
            detail=f'Error fetching media from home server: {exc.code}',
        ) from exc
    except Exception as exc:
        raise HTTPException(
            status_code=502, detail=f'Could not reach home server: {exc}'
        ) from exc

    headers = {k.lower(): v for k, v in response.getheaders()}
    content_type = headers.get('content-type', 'application/octet-stream')

    def _iter_chunks():
        # Close the upstream connection once the body is fully relayed or the
        # consumer stops iterating.
        try:
            while chunk := response.read(8192):
                yield chunk
        finally:
            response.close()

    return StreamingResponse(
        _iter_chunks(),
        media_type=content_type,
        headers={
            'Content-Disposition': headers.get(
                'content-disposition',
                f'attachment; filename="{os.path.basename(url)}"',
            )
        },
    )


def _post_file_to_home(item: models.CloudMediaItem) -> tuple[bool, str]:
    """Upload one cached item to the home server's ``/media/receive`` endpoint.

    Returns:
        ``(True, '')`` on HTTP 200, otherwise ``(False, reason)``.
    """
    file_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
    try:
        with open(file_path, 'rb') as f:
            content = f.read()
    except FileNotFoundError:
        return False, 'Local cache file not found'

    display_name = item.original_filename or item.local_filename
    fields = {
        'owner_id': item.owner_id or '',
        'cloud_file_id': item.file_id,
        'original_filename': display_name,
    }
    files = [
        (
            'file',
            display_name,
            item.content_type or 'application/octet-stream',
            content,
        ),
    ]
    payload, boundary = _encode_multipart_formdata(fields, files)
    request = urllib.request.Request(
        f"{config.HOME_SERVER_URL}/media/receive",
        data=payload,
        headers={
            'Content-Type': f'multipart/form-data; boundary={boundary}',
            'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
        },
    )

    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            if response.status == 200:
                return True, ''
            return False, f'Home server returned {response.status}'
    except urllib.error.HTTPError as exc:
        error_text = exc.read().decode(errors='ignore')
        return False, f'Home server HTTP error {exc.code}: {error_text}'
    except Exception as exc:
        return False, str(exc)


def _cleanup_home_quota(db: Session, owner_id: int | None):
    """Delete an owner's oldest home files until they fit the quota.

    Does nothing when *owner_id* is ``None`` or the owner's total is already
    within ``config.HOME_USER_QUOTA_BYTES``.
    """
    if owner_id is None:
        return

    total = int(
        db.query(func.sum(models.HomeMediaFile.size_bytes))
        .filter(models.HomeMediaFile.owner_id == owner_id)
        .scalar()
        or 0
    )
    if total <= config.HOME_USER_QUOTA_BYTES:
        return

    oldest_first = (
        db.query(models.HomeMediaFile)
        .filter(models.HomeMediaFile.owner_id == owner_id)
        .order_by(models.HomeMediaFile.created_at.asc())
        .all()
    )
    for file_record in oldest_first:
        if total <= config.HOME_USER_QUOTA_BYTES:
            break
        path = os.path.join(config.HOME_MEDIA_FOLDER, file_record.storage_filename)
        # The file may already be gone; the record is removed regardless.
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)
        total -= file_record.size_bytes or 0
        db.delete(file_record)
        # Commit per record so the table matches the disk even if a later
        # deletion fails.
        db.commit()


def _cleanup_all_home_storage():
    """Run the quota cleanup for every owner that has home media files."""
    db = models.SessionLocal()
    try:
        owner_rows = (
            db.query(models.HomeMediaFile.owner_id)
            .filter(models.HomeMediaFile.owner_id.isnot(None))
            .distinct()
            .all()
        )
        for (owner_id,) in owner_rows:
            _cleanup_home_quota(db, owner_id)
    finally:
        db.close()


def _forward_pending_batch(db: Session) -> None:
    """Send up to five of the oldest pending non-avatar items to the home server."""
    pending_items = (
        db.query(models.CloudMediaItem)
        .filter(
            models.CloudMediaItem.status == 'pending',
            models.CloudMediaItem.is_avatar == 0,
        )
        .order_by(models.CloudMediaItem.created_at.asc())
        .limit(5)
        .all()
    )

    for item in pending_items:
        item.status = 'sending'
        item.attempts = (item.attempts or 0) + 1
        db.commit()

        try:
            # NOTE: this is a blocking HTTP call executed on the event loop.
            success, error = _post_file_to_home(item)
        except Exception as exc:
            # Never leave an item stuck in 'sending' after an unexpected error.
            success, error = False, str(exc)

        if not success:
            item.status = 'failed'
            item.error_message = error
            db.commit()
            continue

        item.status = 'sent'
        item.sent_at = func.now()
        item.error_message = None
        db.commit()

        cache_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
        with contextlib.suppress(FileNotFoundError):
            os.remove(cache_path)


async def forward_pending_media_loop():
    """Forever forward cached uploads to the home server (cloud role only).

    Forwarding runs even when the cache is at its size limit: it is the only
    thing that drains the cache, and uploads are paused while the cache is
    full, so skipping a full cache would stall it permanently.
    """
    while True:
        if config.SERVER_ROLE != 'cloud':
            await asyncio.sleep(10)
            continue

        try:
            with contextlib.closing(models.SessionLocal()) as db:
                _forward_pending_batch(db)
        except Exception:
            # Keep the loop alive, but leave a trace of what went wrong.
            logger.exception('Forwarding pending media to the home server failed')

        await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)


async def home_storage_maintenance_loop():
    """Forever enforce home storage quotas every ten minutes (home role only)."""
    while True:
        if config.SERVER_ROLE != 'home':
            await asyncio.sleep(10)
            continue

        try:
            # NOTE: blocking database and filesystem work on the event loop.
            _cleanup_all_home_storage()
        except Exception:
            # One failed pass must not end maintenance for good.
            logger.exception('Home storage cleanup failed')

        await asyncio.sleep(600)


mediaRouter = APIRouter(
    prefix='/media',
    tags=['media'],
)

_ensure_directory(UPLOAD_FOLDER)
_ensure_directory(config.CLOUD_MEDIA_CACHE_FOLDER)
_ensure_directory(config.HOME_MEDIA_FOLDER)


@mediaRouter.post('/upload')
async def upload_file(
    request: Request,
    file: UploadFile = File(None),
):
    """Store an uploaded file in the upload folder and return its new id."""
    _, content = await _read_upload_content(request, file)

    file_id = uuid.uuid4().hex
    file_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc")
    with open(file_path, 'wb') as f:
        f.write(content)

    return {
        'status': 'ok',
        'file_id': file_id,
    }


@mediaRouter.post('/v2/upload')
async def upload_file_v2(
    request: Request,
    file: UploadFile = File(None),
    purpose: str = Form('media'),
    current_user: models.User = Depends(get_current_user),
):
    """Cache an authenticated upload on the cloud server and record it.

    Raises:
        HTTPException: 503 on non-cloud servers or when the cache is full
            (avatars bypass the cache limit), 400 for a missing or oversized
            file.
    """
    if config.SERVER_ROLE != 'cloud':
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail='Upload endpoint is available only on cloud server',
        )

    uploaded_file, content = await _read_upload_content(request, file)
    is_avatar = purpose == 'avatar'

    db = models.SessionLocal()
    try:
        if not is_avatar and _get_cloud_cache_size_bytes(db) >= config.CLOUD_CACHE_MAX_BYTES:
            raise HTTPException(
                status_code=503,
                detail='Cloud media cache is full; new uploads are temporarily paused until pending files are forwarded.',
            )

        file_id = uuid.uuid4().hex
        local_filename = f"{file_id}.enc"
        storage_path = os.path.join(config.CLOUD_MEDIA_CACHE_FOLDER, local_filename)
        with open(storage_path, 'wb') as f:
            f.write(content)

        item = models.CloudMediaItem(
            file_id=file_id,
            owner_id=current_user.id,
            original_filename=uploaded_file.filename,
            content_type=uploaded_file.content_type or 'application/octet-stream',
            local_filename=local_filename,
            size_bytes=len(content),
            status='avatar' if is_avatar else 'pending',
            is_avatar=1 if is_avatar else 0,
        )
        try:
            db.add(item)
            db.commit()
        except Exception:
            # Without a record nothing would ever forward or delete the file.
            db.rollback()
            with contextlib.suppress(OSError):
                os.remove(storage_path)
            raise
    finally:
        db.close()

    return {'status': 'ok', 'file_id': file_id}


@mediaRouter.post('/receive')
async def receive_media(
    request: Request,
    file: UploadFile = File(None),
    owner_id: int | None = Form(None),
    cloud_file_id: str | None = Form(None),
    original_filename: str | None = Form(None),
):
    """Accept a forwarded file, store it in the home folder and record it.

    Raises:
        HTTPException: 401 for a wrong forwarding secret, 400 for an unsafe
            ``cloud_file_id`` or a missing or oversized file.
    """
    secret = request.headers.get('X-Media-Forwarding-Secret')
    if not _secrets_match(secret, config.MEDIA_FORWARDING_SECRET):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Invalid forwarding secret',
        )

    # The id becomes part of a filesystem path, so refuse path separators.
    if cloud_file_id and not _is_safe_file_id(cloud_file_id):
        raise HTTPException(status_code=400, detail='Invalid file id')

    uploaded_file, content = await _read_upload_content(request, file)

    file_id = cloud_file_id or uuid.uuid4().hex
    storage_filename = f"{file_id}.enc"
    file_path = os.path.join(config.HOME_MEDIA_FOLDER, storage_filename)
    with open(file_path, 'wb') as f:
        f.write(content)

    db = models.SessionLocal()
    try:
        home_record = models.HomeMediaFile(
            file_id=file_id,
            owner_id=owner_id,
            original_filename=original_filename or uploaded_file.filename,
            content_type=uploaded_file.content_type or 'application/octet-stream',
            storage_filename=storage_filename,
            size_bytes=len(content),
        )
        db.add(home_record)
        db.commit()
        _cleanup_home_quota(db, owner_id)
    finally:
        db.close()

    return {'status': 'ok', 'file_id': file_id}


@mediaRouter.get('/{file_id}')
async def get_file(file_id: str):
    """Serve a stored file, proxying to the home server from the cloud role.

    Raises:
        HTTPException: 404 when the file is unknown; 502 when the home server
            cannot be reached or returns an error.
    """
    if not _is_safe_file_id(file_id):
        raise HTTPException(status_code=404, detail='File not found')

    local_path = _find_local_media_path(file_id)
    if local_path:
        return FileResponse(local_path, media_type='application/octet-stream')

    if config.SERVER_ROLE == 'cloud':
        # urlopen blocks for up to 45 seconds; keep it off the event loop.
        return await asyncio.to_thread(
            _stream_response_from_remote,
            f"{config.HOME_SERVER_URL}/media/{file_id}",
        )

    raise HTTPException(status_code=404, detail='File not found')