"""Media endpoints and background loops for the cloud/home server pair.

The module exposes ``mediaRouter`` (prefix ``/media``) with upload, receive,
size, download and copy endpoints, plus two coroutines meant to run as
background tasks: ``forward_pending_media_loop`` (active when
``config.SERVER_ROLE == 'cloud'``) and ``home_storage_maintenance_loop``
(active when ``config.SERVER_ROLE == 'home'``).
"""
import asyncio
import hmac
import json
import logging
import os
import re
import shutil
import urllib.error
import urllib.parse
import urllib.request
import uuid
from io import BytesIO

from fastapi import Depends, FastAPI, HTTPException, status, APIRouter, File, UploadFile, Request, Form
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.core.security import get_current_user
from app.db import models
from app.core.config import config

logger = logging.getLogger(__name__)

UPLOAD_FOLDER = 'uploads'

# Characters that would let a file id escape the storage folder once it is
# passed to os.path.join (path separators, Windows drive colon, NUL).
_FILE_ID_FORBIDDEN_CHARS = ('/', '\\', ':', '\x00')


def _ensure_directory(path: str):
    """Create *path* (and missing parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)


def _is_safe_file_id(file_id: str | None) -> bool:
    """Return True when *file_id* is usable as a single file-name component."""
    if not file_id:
        return False
    return not any(ch in file_id for ch in _FILE_ID_FORBIDDEN_CHARS)


def _forwarding_secret_is_valid(provided: str | None) -> bool:
    """Check the ``X-Media-Forwarding-Secret`` header value.

    Fails closed: a missing header or an unset/empty configured secret is
    never accepted (with ``!=`` an unset secret and an absent header compared
    equal). The comparison is constant-time.
    """
    expected = config.MEDIA_FORWARDING_SECRET
    if not expected or provided is None:
        return False
    return hmac.compare_digest(
        provided.encode('utf-8'), str(expected).encode('utf-8'))


def _write_file(path: str, content: bytes) -> None:
    """Write *content* to *path*, replacing any existing file."""
    with open(path, 'wb') as f:
        f.write(content)


def _remove_if_exists(path: str) -> None:
    """Delete *path*; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _parse_multipart_body(body: bytes):
    """Extract the ``file`` part from a raw multipart/form-data body.

    The boundary is taken from the first line of the body rather than from
    the Content-Type header.

    Returns:
        ``(filename, content, content_type)`` for the first part whose field
        name is ``file``, or ``None`` if the body is not multipart or has no
        such part. ``filename`` defaults to ``'upload.bin'`` and
        ``content_type`` to ``'application/octet-stream'``.
    """
    if not body.startswith(b"--"):
        return None
    boundary, line_end, _ = body.partition(b"\r\n")
    if not line_end:
        return None

    # Each delimiter is CRLF + boundary, so splitting on that removes exactly
    # the framing CRLF and leaves the payload bytes untouched. Stripping
    # CR/LF from the content would corrupt binary files ending in 0x0D/0x0A.
    delimiter = b"\r\n" + boundary
    for part in (b"\r\n" + body).split(delimiter)[1:]:
        if part.startswith(b"--"):
            break  # closing delimiter "--boundary--"
        if not part.startswith(b"\r\n"):
            continue
        headers, separator, content = part[2:].partition(b"\r\n\r\n")
        if not headers or not separator:
            continue

        disposition_match = re.search(
            br'Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?',
            headers,
            re.IGNORECASE,
        )
        if not disposition_match:
            continue
        field_name = disposition_match.group(1).decode('utf-8', errors='ignore')
        if field_name != 'file':
            continue

        raw_filename = disposition_match.group(2)
        filename = (
            raw_filename.decode('utf-8', errors='ignore')
            if raw_filename else 'upload.bin'
        )

        # Accept every character up to whitespace or ';' so that types such as
        # "image/svg+xml" or "application/vnd.ms-excel" are not truncated.
        content_type_match = re.search(
            br'Content-Type:\s*([^\s;]+)', headers, re.IGNORECASE)
        content_type = (
            content_type_match.group(1).decode('utf-8', errors='ignore')
            if content_type_match else 'application/octet-stream'
        )
        return filename, content, content_type
    return None


async def _get_upload_file(request: Request, uploaded_file: UploadFile | None):
    """Return the uploaded file, falling back to manual body parsing.

    If FastAPI already bound an ``UploadFile`` it is returned as is. Otherwise
    the raw request body is parsed with ``_parse_multipart_body``.

    Returns:
        An ``UploadFile`` or ``None`` when no file could be found.
    """
    if uploaded_file is not None:
        return uploaded_file
    try:
        raw_body = await request.body()
    except RuntimeError:
        # NOTE(review): Starlette raises RuntimeError("Stream consumed") when
        # the body was already read by form parsing; treat as "no file".
        return None
    parsed = _parse_multipart_body(raw_body)
    if parsed is None:
        return None
    filename, content, content_type = parsed
    return UploadFile(filename=filename,
                      file=BytesIO(content), content_type=content_type)


def _escape_multipart_param(value) -> str:
    """Percent-escape characters that would break a quoted header parameter."""
    return (
        str(value)
        .replace('"', '%22')
        .replace('\r', '%0D')
        .replace('\n', '%0A')
    )


def _encode_multipart_formdata(fields, files):
    """Build a multipart/form-data body.

    Args:
        fields: mapping of plain form field name -> value (stringified).
        files: iterable of ``(field_name, filename, content_type, file_bytes)``.

    Returns:
        ``(body_bytes, boundary)``.
    """
    boundary = uuid.uuid4().hex
    body = BytesIO()
    for name, value in fields.items():
        body.write(f"--{boundary}\r\n".encode('utf-8'))
        body.write(
            f'Content-Disposition: form-data; name="{_escape_multipart_param(name)}"\r\n\r\n'.encode('utf-8'))
        body.write(str(value).encode('utf-8'))
        body.write(b"\r\n")
    for field_name, filename, content_type, file_bytes in files:
        # File names originate from user uploads: escape quotes and line
        # breaks so they cannot terminate the parameter or inject headers.
        safe_field = _escape_multipart_param(field_name)
        safe_filename = _escape_multipart_param(filename)
        safe_content_type = str(content_type).replace('\r', '').replace('\n', '')
        body.write(f"--{boundary}\r\n".encode('utf-8'))
        body.write(
            f'Content-Disposition: form-data; name="{safe_field}"; filename="{safe_filename}"\r\n'.encode(
                'utf-8')
        )
        body.write(f"Content-Type: {safe_content_type}\r\n\r\n".encode('utf-8'))
        body.write(file_bytes)
        body.write(b"\r\n")
    body.write(f"--{boundary}--\r\n".encode('utf-8'))
    return body.getvalue(), boundary


def _get_cloud_cache_size_bytes(db: Session) -> int:
    """Sum ``size_bytes`` of non-avatar cloud items in 'pending' or 'sending'."""
    total = db.query(func.sum(models.CloudMediaItem.size_bytes)).filter(
        models.CloudMediaItem.status.in_(['pending', 'sending']),
        models.CloudMediaItem.is_avatar == 0,
    ).scalar()
    return int(total or 0)


def _find_local_media_path(file_id: str) -> str | None:
    """Return the path of ``<file_id>.enc`` in the first folder that has it.

    Folders are checked in order: cloud cache, ``UPLOAD_FOLDER``, home media.
    Ids containing path separators are rejected (returns ``None``) so the
    lookup can never leave those folders.
    """
    if not _is_safe_file_id(file_id):
        return None
    filename = f"{file_id}.enc"
    for folder in (config.CLOUD_MEDIA_CACHE_FOLDER, UPLOAD_FOLDER, config.HOME_MEDIA_FOLDER):
        path = os.path.join(folder, filename)
        if os.path.exists(path):
            return path
    return None


def _stream_response_from_remote(url: str):
    """Open *url* and relay its body as a ``StreamingResponse``.

    This call blocks on network I/O (45 s timeout); async callers should run
    it in a worker thread.

    Raises:
        HTTPException: 404 if the remote answers 404, 502 for any other
            HTTP error or connection failure.
    """
    try:
        response = urllib.request.urlopen(urllib.request.Request(url), timeout=45)
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            raise HTTPException(status_code=404, detail='File not found') from exc
        raise HTTPException(
            status_code=502, detail=f'Error fetching media from home server: {exc.code}') from exc
    except Exception as exc:
        raise HTTPException(
            status_code=502, detail=f'Could not reach home server: {exc}') from exc

    def _iter_chunks():
        # Close the upstream connection once the body is exhausted or the
        # client disconnects; it was previously left open.
        try:
            while chunk := response.read(8192):
                yield chunk
        finally:
            response.close()

    headers = {k.lower(): v for k, v in response.getheaders()}
    content_type = headers.get('content-type', 'application/octet-stream')
    return StreamingResponse(
        _iter_chunks(),
        media_type=content_type,
        headers={
            'Content-Disposition': headers.get('content-disposition', f'attachment; filename="{os.path.basename(url)}"')
        },
    )


def _post_file_to_home(item: models.CloudMediaItem) -> tuple[bool, str]:
    """POST a cached file to ``{HOME_SERVER_URL}/media/receive``.

    Returns:
        ``(True, '')`` when the home server answers 200, otherwise
        ``(False, error_message)``. Blocks on file and network I/O.
    """
    file_path = os.path.join(
        config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename)
    if not os.path.exists(file_path):
        return False, 'Local cache file not found'

    with open(file_path, 'rb') as f:
        content = f.read()

    fields = {
        'owner_id': item.owner_id or '',
        'cloud_file_id': item.file_id,
        'original_filename': item.original_filename or item.local_filename,
    }
    files = [
        ('file', item.original_filename or item.local_filename,
         item.content_type or 'application/octet-stream', content),
    ]
    body, boundary = _encode_multipart_formdata(fields, files)
    request = urllib.request.Request(
        f"{config.HOME_SERVER_URL}/media/receive",
        data=body,
        headers={
            'Content-Type': f'multipart/form-data; boundary={boundary}',
            'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            if response.status == 200:
                return True, ''
            return False, f'Home server returned {response.status}'
    except urllib.error.HTTPError as exc:
        error_body = exc.read().decode(errors='ignore')
        return False, f'Home server HTTP error {exc.code}: {error_body}'
    except Exception as exc:
        return False, str(exc)


def _cleanup_home_quota(db: Session, owner_id: int | None):
    """Delete an owner's oldest home files until they fit the quota.

    Does nothing when *owner_id* is ``None`` or the owner's total size is
    within ``config.HOME_USER_QUOTA_BYTES``. Commits the session.
    """
    if owner_id is None:
        return

    total = db.query(func.sum(models.HomeMediaFile.size_bytes)).filter(
        models.HomeMediaFile.owner_id == owner_id
    ).scalar() or 0
    total = int(total)
    if total <= config.HOME_USER_QUOTA_BYTES:
        return

    files = db.query(models.HomeMediaFile).filter(
        models.HomeMediaFile.owner_id == owner_id
    ).order_by(models.HomeMediaFile.created_at.asc()).all()

    for file_record in files:
        if total <= config.HOME_USER_QUOTA_BYTES:
            break
        _remove_if_exists(
            os.path.join(config.HOME_MEDIA_FOLDER, file_record.storage_filename))
        # SUM() ignores NULL sizes, so subtract 0 for them as well.
        total -= file_record.size_bytes or 0
        db.delete(file_record)
    db.commit()


def _cleanup_all_home_storage():
    """Apply ``_cleanup_home_quota`` to every owner that has home files."""
    db = models.SessionLocal()
    try:
        owner_ids = db.query(models.HomeMediaFile.owner_id).filter(
            models.HomeMediaFile.owner_id.isnot(None)).distinct().all()
        for (owner_id,) in owner_ids:
            _cleanup_home_quota(db, owner_id)
    finally:
        db.close()


def _forward_pending_batch():
    """Forward up to five of the oldest pending non-avatar items to home.

    Synchronous (DB + network); meant to run in a worker thread.
    """
    db = models.SessionLocal()
    try:
        pending_items = db.query(models.CloudMediaItem).filter(
            models.CloudMediaItem.status == 'pending',
            models.CloudMediaItem.is_avatar == 0,
        ).order_by(models.CloudMediaItem.created_at.asc()).limit(5).all()

        for item in pending_items:
            item.status = 'sending'
            item.attempts += 1
            db.commit()

            try:
                success, error = _post_file_to_home(item)
            except Exception as exc:
                # e.g. OSError while reading the cache file: record it rather
                # than leaving the item stuck in 'sending'.
                success, error = False, str(exc)

            if success:
                item.status = 'sent'
                item.sent_at = func.now()
                item.error_message = None
                db.commit()
                _remove_if_exists(os.path.join(
                    config.CLOUD_MEDIA_CACHE_FOLDER, item.local_filename))
            else:
                # NOTE(review): 'failed' items are never picked up again by
                # the query above; confirm whether a retry policy is wanted.
                item.status = 'failed'
                item.error_message = error
                db.commit()
    finally:
        db.close()


async def forward_pending_media_loop():
    """Periodically forward cached cloud uploads to the home server.

    Runs forever. On non-cloud servers it only sleeps. The cache-size check
    that used to skip forwarding when the cache was full has been removed:
    forwarding is the only thing that empties the cache, so skipping it left
    a full cache full forever while uploads were rejected.
    """
    while True:
        if config.SERVER_ROLE != 'cloud':
            await asyncio.sleep(10)
            continue

        try:
            # Blocking DB and HTTP work must not run on the event loop.
            await asyncio.to_thread(_forward_pending_batch)
        except Exception:
            logger.exception('Forwarding pending media failed')

        await asyncio.sleep(config.MEDIA_FORWARD_INTERVAL_SECONDS)


async def home_storage_maintenance_loop():
    """Every 10 minutes enforce per-user quotas on the home server.

    Runs forever. On non-home servers it only sleeps.
    """
    while True:
        if config.SERVER_ROLE != 'home':
            await asyncio.sleep(10)
            continue

        try:
            await asyncio.to_thread(_cleanup_all_home_storage)
        except Exception:
            # Keep the loop alive; one failed pass must not stop maintenance.
            logger.exception('Home storage cleanup failed')
        await asyncio.sleep(600)


mediaRouter = APIRouter(
    prefix='/media',
    tags=['media'],
)

_ensure_directory(UPLOAD_FOLDER)
_ensure_directory(config.CLOUD_MEDIA_CACHE_FOLDER)
_ensure_directory(config.HOME_MEDIA_FOLDER)


@mediaRouter.post('/upload')
async def upload_file(
    request: Request,
    file: UploadFile = File(None),
):
    """Store an uploaded file in ``UPLOAD_FOLDER`` under a fresh id.

    NOTE(review): this endpoint has no authentication dependency.

    Raises:
        HTTPException: 400 when no file was sent or it exceeds
            ``config.MEDIA_UPLOAD_MAX_BYTES``.
    """
    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')

    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')

    file_id = uuid.uuid4().hex
    file_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc")
    await asyncio.to_thread(_write_file, file_path, content)

    return {
        'status': 'ok',
        'file_id': file_id,
    }


@mediaRouter.post('/v2/upload')
async def upload_file_v2(
    request: Request,
    file: UploadFile = File(None),
    purpose: str = Form('media'),
    current_user: models.User = Depends(get_current_user),
):
    """Store an upload in the cloud cache and register a ``CloudMediaItem``.

    Avatar uploads (``purpose == 'avatar'``) bypass the cache-size limit and
    are stored with status ``'avatar'``; everything else becomes ``'pending'``.

    Raises:
        HTTPException: 503 on non-cloud servers or when the cache is full,
            400 when no file was sent or it is too large.
    """
    if config.SERVER_ROLE != 'cloud':
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                            detail='Upload endpoint is available only on cloud server')

    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')

    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')

    db = models.SessionLocal()
    try:
        cache_size = _get_cloud_cache_size_bytes(db)
        is_avatar = purpose == 'avatar'
        if cache_size >= config.CLOUD_CACHE_MAX_BYTES and not is_avatar:
            raise HTTPException(
                status_code=503,
                detail='Cloud media cache is full; new uploads are temporarily paused until pending files are forwarded.',
            )

        file_id = uuid.uuid4().hex
        local_filename = f"{file_id}.enc"
        storage_path = os.path.join(
            config.CLOUD_MEDIA_CACHE_FOLDER, local_filename)
        await asyncio.to_thread(_write_file, storage_path, content)

        item = models.CloudMediaItem(
            file_id=file_id,
            owner_id=current_user.id,
            original_filename=uploaded_file.filename,
            content_type=uploaded_file.content_type or 'application/octet-stream',
            local_filename=local_filename,
            size_bytes=len(content),
            status='avatar' if is_avatar else 'pending',
            is_avatar=1 if is_avatar else 0,
        )
        try:
            db.add(item)
            db.commit()
        except Exception:
            # Do not leave an untracked file in the cache if the row is lost.
            db.rollback()
            _remove_if_exists(storage_path)
            raise
    finally:
        db.close()

    return {'status': 'ok', 'file_id': file_id}


@mediaRouter.post('/receive')
async def receive_media(
    request: Request,
    file: UploadFile = File(None),
    owner_id: int | None = Form(None),
    cloud_file_id: str | None = Form(None),
    original_filename: str | None = Form(None),
):
    """Accept a forwarded file and store it in the home media folder.

    The file is stored as ``<cloud_file_id>.enc`` (or under a fresh id when
    none is given), a ``HomeMediaFile`` row is added and the owner's quota is
    enforced afterwards.

    Raises:
        HTTPException: 401 on a bad forwarding secret, 400 when no file was
            sent, it is too large, or ``cloud_file_id`` is not a plain name.
    """
    if not _forwarding_secret_is_valid(request.headers.get('X-Media-Forwarding-Secret')):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid forwarding secret')

    uploaded_file = await _get_upload_file(request, file)
    if uploaded_file is None or not uploaded_file.filename:
        raise HTTPException(status_code=400, detail='No selected file')

    content = await uploaded_file.read()
    if len(content) > config.MEDIA_UPLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=400, detail=f'File too large (max {config.MEDIA_UPLOAD_MAX_BYTES} bytes)')

    # The id becomes part of a file path; refuse anything with separators.
    if cloud_file_id and not _is_safe_file_id(cloud_file_id):
        raise HTTPException(status_code=400, detail='Invalid file id')

    file_id = cloud_file_id or uuid.uuid4().hex
    storage_filename = f"{file_id}.enc"
    file_path = os.path.join(config.HOME_MEDIA_FOLDER, storage_filename)
    await asyncio.to_thread(_write_file, file_path, content)

    db = models.SessionLocal()
    try:
        home_record = models.HomeMediaFile(
            file_id=file_id,
            owner_id=owner_id,
            original_filename=original_filename or uploaded_file.filename,
            content_type=uploaded_file.content_type or 'application/octet-stream',
            storage_filename=storage_filename,
            size_bytes=len(content),
        )
        try:
            db.add(home_record)
            db.commit()
        except Exception:
            db.rollback()
            _remove_if_exists(file_path)
            raise
        _cleanup_home_quota(db, owner_id)
    finally:
        db.close()

    return {'status': 'ok', 'file_id': file_id}


@mediaRouter.get('/size/{file_id}')
async def get_file_size(file_id: str):
    """Return size, URL-quoted file name and content type of a media file.

    Raises:
        HTTPException: 404 when the file is unknown; on the cloud server also
            the home server's error code, or 502 when it is unreachable.
    """
    db = models.SessionLocal()
    db_file = None
    try:
        db_file = db.query(models.HomeMediaFile).filter(
            models.HomeMediaFile.file_id == file_id).first()
    finally:
        db.close()

    # 1. Check whether the file is present locally on this server.
    local_path = _find_local_media_path(file_id)
    if local_path and os.path.exists(local_path):
        file_size = os.path.getsize(local_path)
        filename = db_file.original_filename if db_file else f"file_{file_id}"
        content_type = db_file.content_type if db_file else 'application/octet-stream'
        encoded_filename = urllib.parse.quote(filename)
        return {"file_id": file_id, "size": file_size, "file_name": encoded_filename, "content_type": content_type}

    # 2. On the cloud server, ask the home server for the size.
    if config.SERVER_ROLE == 'cloud':
        remote_url = f"{config.HOME_SERVER_URL}/media/size/{urllib.parse.quote(file_id, safe='')}"

        def _fetch_remote_size():
            req = urllib.request.Request(remote_url, method='GET')
            with urllib.request.urlopen(req, timeout=5.0) as response:
                if response.status == 200:
                    return json.loads(response.read().decode('utf-8'))
            return None

        try:
            # Run the synchronous request in a thread so the event loop is
            # not blocked while waiting for the home server.
            remote_data = await asyncio.to_thread(_fetch_remote_size)
            if remote_data:
                return remote_data
        except urllib.error.HTTPError as e:
            if e.code == 404:
                raise HTTPException(
                    status_code=404, detail='File not found on home server') from e
            raise HTTPException(status_code=e.code, detail='Home server error') from e
        except Exception as e:
            logger.warning('Could not reach home server: %s', e)
            raise HTTPException(
                status_code=502, detail='Home server is unavailable') from e

    # 3. The file was found neither locally nor on the remote server.
    raise HTTPException(status_code=404, detail='File not found')


@mediaRouter.get('/{file_id}')
async def get_file(file_id: str):
    """Serve a media file from local storage or, on cloud, proxy it from home.

    Raises:
        HTTPException: 404 when the file is not found; 502 when the home
            server cannot be reached or answers with an error.
    """
    db = models.SessionLocal()
    db_file = None
    try:
        db_file = db.query(models.HomeMediaFile).filter(
            models.HomeMediaFile.file_id == file_id).first()
    finally:
        db.close()

    local_path = _find_local_media_path(file_id)
    if local_path:
        filename = db_file.original_filename if db_file else f"file_{file_id}"
        content_type = db_file.content_type if db_file else 'application/octet-stream'
        encoded_filename = urllib.parse.quote(filename)
        headers = {
            "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
        }
        return FileResponse(
            local_path,
            media_type=content_type,
            headers=headers
        )

    if config.SERVER_ROLE == 'cloud':
        remote_url = f"{config.HOME_SERVER_URL}/media/{urllib.parse.quote(file_id, safe='')}"
        # Opening the remote connection blocks for up to 45 s; keep it off
        # the event loop.
        return await asyncio.to_thread(_stream_response_from_remote, remote_url)

    raise HTTPException(status_code=404, detail='File not found')


@mediaRouter.post('/copy_internal')
async def copy_file_internal(
    request: Request,
    file_id: str = Form(...),
    owner_id: int = Form(...),  # id of the new owner (the recipient)
):
    """Duplicate a stored file for another owner on the home server.

    Raises:
        HTTPException: 401 on a bad forwarding secret, 404 when the source
            file is not found locally.
    """
    # Verify the shared secret.
    if not _forwarding_secret_is_valid(request.headers.get('X-Media-Forwarding-Secret')):
        raise HTTPException(status_code=401, detail='Unauthorized')

    # 1. Locate the source file.
    source_path = _find_local_media_path(file_id)
    if not source_path:
        raise HTTPException(status_code=404, detail='Source file not found')

    # 2. Allocate a new id and destination path.
    new_file_id = uuid.uuid4().hex
    new_storage_filename = f"{new_file_id}.enc"
    dest_path = os.path.join(config.HOME_MEDIA_FOLDER, new_storage_filename)

    # 3. Copy the bytes on disk (in a thread: may be slow for large files).
    await asyncio.to_thread(shutil.copyfile, source_path, dest_path)

    # 4. Register the copy in the database.
    db = models.SessionLocal()
    try:
        old_record = db.query(models.HomeMediaFile).filter(
            models.HomeMediaFile.file_id == file_id).first()

        new_record = models.HomeMediaFile(
            file_id=new_file_id,
            owner_id=owner_id,
            original_filename=old_record.original_filename if old_record else "copy.enc",
            content_type=old_record.content_type if old_record else 'application/octet-stream',
            storage_filename=new_storage_filename,
            size_bytes=os.path.getsize(dest_path),
        )
        try:
            db.add(new_record)
            db.commit()
        except Exception:
            db.rollback()
            _remove_if_exists(dest_path)
            raise
    finally:
        db.close()

    return {"status": "ok", "new_file_id": new_file_id}


@mediaRouter.post('/copy')
async def copy(
    file_id: str = Form(...),
    current_user: models.User = Depends(get_current_user),
):
    """Ask the home server to copy a file for the current user.

    Returns the JSON answer of ``/media/copy_internal``.

    Raises:
        HTTPException: 503 on non-cloud servers, 404 when the home server
            does not have the source file, 502 on other home-server
            failures, 500 when it answers with a non-200 success status.
    """
    if config.SERVER_ROLE != 'cloud':
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                            detail='Copy endpoint is available only on cloud server')

    url = f"{config.HOME_SERVER_URL}/media/copy_internal"

    # urlencode escapes '&' and '=' so a crafted file_id cannot inject
    # additional form parameters into the internal request.
    body_data = urllib.parse.urlencode(
        {'file_id': file_id, 'owner_id': current_user.id}).encode('utf-8')

    copy_request = urllib.request.Request(
        url,
        data=body_data,
        headers={
            'X-Media-Forwarding-Secret': config.MEDIA_FORWARDING_SECRET,
            'Content-Type': 'application/x-www-form-urlencoded'
        },
        method='POST'
    )

    def _request_copy():
        with urllib.request.urlopen(copy_request, timeout=10) as response:
            if response.status == 200:
                return json.loads(response.read().decode('utf-8'))
        return None

    try:
        result = await asyncio.to_thread(_request_copy)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            raise HTTPException(
                status_code=404, detail='Source file not found') from e
        raise HTTPException(
            status_code=502, detail=f'Failed to copy on home server: {e}') from e
    except Exception as e:
        raise HTTPException(
            status_code=502, detail=f'Failed to copy on home server: {e}') from e

    if result is not None:
        return result
    raise HTTPException(status_code=500, detail='Copying failed')