"""WebSocket chat endpoint: message relay, receipts, typing events and FCM push.

The module keeps one in-memory registry of live sockets (``manager``), persists
messages through the SQLAlchemy session supplied by ``get_db`` and sends a
Firebase Cloud Messaging data push to receivers that have an FCM token.
"""
import json
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, Optional

import firebase_admin
from fastapi import (
    APIRouter,
    Depends,
    HTTPException,
    Query,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.concurrency import run_in_threadpool
from firebase_admin import credentials, exceptions, messaging
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

# Project imports keep their original relative order.
from app.core.security import test_token
from app.db import models
from app.core.config import config

cred = credentials.Certificate(config.FIREBASE_CREDENTIALS_PATH)
firebase_admin.initialize_app(cred)


# Database
def get_db():
    """Yield a database session and close it when the dependency is torn down."""
    db = models.SessionLocal()
    try:
        yield db
    finally:
        db.close()


wsRouter = APIRouter(
    prefix='/ws'
)


class ConnectionManager:
    """In-memory registry of live sockets, keyed by the stringified user id."""

    def __init__(self):
        # Active connections: {user_id (as str): websocket}
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: int, websocket: WebSocket):
        """Accept the socket and register it, replacing any earlier entry."""
        await websocket.accept()
        self.active_connections[str(user_id)] = websocket

    def disconnect(self, user_id: int, websocket: Optional[WebSocket] = None) -> bool:
        """Remove the user's registered socket.

        Args:
            user_id: Owner of the connection.
            websocket: When given, the entry is removed only if it is this
                exact socket, so a stale handler cannot evict a newer
                connection of the same user.

        Returns:
            True if an entry was removed, False otherwise.
        """
        key = str(user_id)
        current = self.active_connections.get(key)
        if current is None:
            return False
        if websocket is not None and current is not websocket:
            return False
        del self.active_connections[key]
        return True

    async def send_personal_message(self, message: dict, user_id: str) -> bool:
        """Send ``message`` to one user; return True only if the send succeeded."""
        connection = self.active_connections.get(str(user_id))
        if connection is None:
            print('User not active')
            return False
        try:
            await connection.send_json(message)
        except Exception as e:
            print(f'Failed to send to socket: {e}')
            return False
        print('Sent to socket')
        return True

    async def broadcast(self, message: dict):
        """Send ``message`` to every registered socket (e.g. presence events)."""
        # Iterate over a snapshot: the registry can change while we await, and
        # mutating a dict during iteration raises RuntimeError.
        for connection in list(self.active_connections.values()):
            try:
                await connection.send_json(message)
            except Exception as e:
                # One dead socket must not prevent delivery to the others.
                print(f'Failed to broadcast to socket: {e}')


manager = ConnectionManager()


def send_fcm_notification(token, user_id, username, public_key, encrypted_text, timestamp, unread_count='1', message_id='0'):
    """Send a high-priority FCM data message describing a new chat message.

    Best-effort: any failure is printed and swallowed so that a push problem
    never interrupts message delivery over the socket.

    Args:
        token: FCM registration token of the receiving device.
        user_id: Id of the sender; sent as ``sender_id``.
        username: Sender name placed in the payload.
        public_key: Sender public key placed in the payload.
        encrypted_text: Message text placed in ``content``.
        timestamp: Object with ``isoformat()``; sent as ``timestamp``.
        unread_count: Sent stringified as ``unread_count``.
        message_id: Sent stringified as ``message_id``.
    """
    print(
        f"DEBUG: Отправляем FCM уведомление пользователю {user_id} с токеном {token}")
    try:
        message = messaging.Message(
            data={
                "type": "enc_message",
                "sender_id": str(user_id),
                "username": username,
                "public_key": public_key,
                "content": encrypted_text,  # encrypted text
                "timestamp": timestamp.isoformat(),
                "unread_count": str(unread_count),
                "message_id": str(message_id),
            },
            android=messaging.AndroidConfig(
                priority='high',
            ),
            token=token,
        )
        response = messaging.send(message)
        print('Successfully sent message:', response)
    except Exception as e:
        print('Unexpected error sending push:', e)


def _touch_last_online(db: Session, user_id: int) -> None:
    """Set the user's ``last_online`` to the current UTC time and commit."""
    db.query(models.User).filter(models.User.id == user_id).update(
        {"last_online": datetime.now(timezone.utc)},
        synchronize_session="fetch",
    )
    db.commit()


async def _send_error(websocket: WebSocket, detail: str) -> None:
    """Send an ``error`` frame with the given detail to the current client."""
    await websocket.send_json({
        "type": "error",
        "detail": detail,
    })


async def _handle_private_message(websocket: WebSocket, db: Session, user_id: int, message_data: Dict[str, Any]) -> None:
    """Persist a private message, ACK the sender, push and relay to the receiver."""
    receiver_id = message_data.get("receiver_id")
    temp_id = message_data.get("temp_id")
    content = message_data.get("content")
    content50 = message_data.get("content50")
    message_type = message_data.get("message_type") or "text"
    file_id = message_data.get("file_id")
    encrypted_key = message_data.get("encrypted_key")
    print(
        f"DEBUG private_message payload: temp_id={temp_id}, receiver_id={receiver_id}, message_type={message_type}, file_id={file_id}, encrypted_key_present={encrypted_key is not None}",
    )

    if receiver_id is None or content is None:
        await _send_error(websocket, "receiver_id/content required")
        return
    try:
        receiver_id = int(receiver_id)
    except (TypeError, ValueError):
        await _send_error(websocket, "receiver_id must be int")
        return

    new_msg = models.Message(
        sender_id=user_id,
        receiver_id=receiver_id,
        content=content,
        message_type=message_type,
        file_id=file_id,
        encrypted_key=encrypted_key,
        reply_to_id=message_data.get("reply_to_id"),
        reply_to_text=message_data.get("reply_to_text"),
    )
    try:
        db.add(new_msg)
        db.commit()
        db.refresh(new_msg)
    except SQLAlchemyError as exc:
        # Keep the connection alive (as edit/delete do) instead of letting a
        # failed insert tear down the whole socket.
        db.rollback()
        print(f"Failed to save message: {exc}")
        await _send_error(websocket, "failed to save message")
        return
    print(
        f"DEBUG saved message: id={new_msg.id}, sender={new_msg.sender_id}, receiver={new_msg.receiver_id}, message_type={new_msg.message_type}, file_id={new_msg.file_id}, encrypted_key_present={new_msg.encrypted_key is not None}",
    )

    # Computed once so the ACK and the relayed packet carry the same value.
    # NOTE(review): naive UTC (utcnow) is kept because clients already receive
    # this format; last_online uses aware datetimes — consider unifying.
    timestamp_iso = (new_msg.timestamp or datetime.utcnow()).isoformat()

    # ACK to the sender: the server accepted and stored the message.
    await manager.send_personal_message({
        "type": "message_sent",
        "temp_id": temp_id,
        "server_id": new_msg.id,
        "timestamp": timestamp_iso,
    }, str(user_id))

    # Push notification; skipped when sender/receiver rows or keys are missing.
    sender = db.query(models.User).filter(models.User.id == user_id).first()
    if sender is not None and sender.public_key:
        receiver = db.query(models.User).filter(
            models.User.id == receiver_id).first()
        if receiver is not None and receiver.fcm_token:
            unread_count = db.query(models.Message).filter(
                models.Message.receiver_id == receiver_id,
                models.Message.read_at.is_(None),
            ).count()
            # messaging.send is a blocking network call; run it in a worker
            # thread so other sockets are not stalled. Only plain values are
            # passed — the session is not touched from the thread.
            await run_in_threadpool(
                send_fcm_notification,
                receiver.fcm_token,
                user_id,
                sender.first_name,
                sender.public_key,
                content50 if content50 else content,
                datetime.utcnow(),
                unread_count=unread_count,
                message_id=new_msg.id,
            )

    # Packet for the receiver.
    outgoing_message = {
        "id": new_msg.id,
        "type": "private_message",
        "sender_id": user_id,
        "receiver_id": receiver_id,
        "content": content,
        "message_type": message_type,
        "file_id": file_id,
        "encrypted_key": encrypted_key,
        "timestamp": timestamp_iso,
        "reply_to_id": new_msg.reply_to_id,
        "reply_to_text": new_msg.reply_to_text,
    }
    print(
        f"DEBUG outgoing_message: id={outgoing_message['id']}, receiver_id={outgoing_message['receiver_id']}, file_id={outgoing_message['file_id']}, encrypted_key_present={outgoing_message['encrypted_key'] is not None}",
    )

    # Relay to the receiver if they are online.
    sent_to_receiver = await manager.send_personal_message(outgoing_message, str(receiver_id))
    print(f"DEBUG send_personal_message returned: {sent_to_receiver}")
    if not sent_to_receiver:
        return

    # The message really went out over the receiver's socket: mark delivered.
    delivered_at = datetime.utcnow()
    try:
        new_msg.delivered_at = delivered_at
        db.add(new_msg)
        db.commit()
    except Exception as exc:
        db.rollback()
        print(f"Failed to store delivered_at: {exc}")
        return
    await manager.send_personal_message({
        "type": "message_delivered",
        "message_id": new_msg.id,
        "timestamp": delivered_at.isoformat(),
    }, str(user_id))


async def _handle_edit_message(websocket: WebSocket, db: Session, user_id: int, message_data: Dict[str, Any]) -> None:
    """Update the content of a message owned by the caller and notify both sides."""
    message_id = message_data.get("message_id")
    content = message_data.get("content")
    if message_id is None or content is None:
        await _send_error(websocket, "message_id/content required")
        return
    try:
        message_id = int(message_id)
    except (TypeError, ValueError):
        await _send_error(websocket, "message_id must be int")
        return

    msg = db.query(models.Message).filter(
        models.Message.id == message_id).first()
    # Only the original sender may edit; anything else is ignored silently.
    if msg is None or msg.sender_id != user_id:
        return

    try:
        msg.content = content
        msg.edited_at = datetime.utcnow()
        db.add(msg)
        db.commit()
    except Exception as exc:
        db.rollback()
        print(f"Failed to edit message: {exc}")
        return

    event = {
        "type": "message_edited",
        "message_id": msg.id,
        "sender_id": msg.sender_id,
        "content": msg.content,
        "edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
    }
    await manager.send_personal_message(event, str(msg.receiver_id))
    await manager.send_personal_message(event, str(msg.sender_id))


async def _handle_delete_message(websocket: WebSocket, db: Session, user_id: int, message_data: Dict[str, Any]) -> None:
    """Delete a message owned by the caller and notify both sides."""
    message_id = message_data.get("message_id")
    if message_id is None:
        await _send_error(websocket, "message_id required")
        return
    try:
        message_id = int(message_id)
    except (TypeError, ValueError):
        await _send_error(websocket, "message_id must be int")
        return

    msg = db.query(models.Message).filter(
        models.Message.id == message_id).first()
    # Only the original sender may delete; anything else is ignored silently.
    if msg is None or msg.sender_id != user_id:
        return

    # Read before deleting: the row is gone after the commit.
    receiver_id = msg.receiver_id
    try:
        db.delete(msg)
        db.commit()
    except Exception as exc:
        db.rollback()
        print(f"Failed to delete message: {exc}")
        return

    event = {
        "type": "message_deleted",
        "message_id": message_id,
    }
    await manager.send_personal_message(event, str(receiver_id))
    await manager.send_personal_message(event, str(user_id))


async def _handle_read_receipt(websocket: WebSocket, db: Session, user_id: int, message_data: Dict[str, Any]) -> None:
    """Store ``read_at`` for a message and tell its sender it was read."""
    try:
        message_id = int(message_data.get("message_id"))
    except (TypeError, ValueError):
        return

    msg = db.query(models.Message).filter(
        models.Message.id == message_id).first()
    if msg is None:
        return
    # Security: only the receiver of the message may send a read receipt.
    if int(msg.receiver_id) != int(user_id):
        return

    read_at = datetime.utcnow()
    try:
        msg.read_at = read_at
        db.add(msg)
        db.commit()
    except Exception as exc:
        # The sender is still notified below even if persisting failed.
        db.rollback()
        print(f"Failed to store read_at: {exc}")

    sender_id = int(msg.sender_id)
    await manager.send_personal_message({
        "type": "message_read",
        "message_id": message_id,
        "timestamp": read_at.isoformat(),
    }, str(sender_id))


async def _handle_typing_event(websocket: WebSocket, db: Session, user_id: int, message_data: Dict[str, Any]) -> None:
    """Relay a ``typing`` / ``stop_typing`` event to the receiver.

    The outgoing event type equals the incoming one; the dispatch table only
    routes those two types here.
    """
    receiver_id = message_data.get("receiver_id")
    if receiver_id is None:
        return
    try:
        receiver_id = int(receiver_id)
    except (TypeError, ValueError):
        return
    await manager.send_personal_message({
        "type": message_data["type"],
        "sender_id": user_id,
    }, str(receiver_id))


_Handler = Callable[[WebSocket, Session, int, Dict[str, Any]], Awaitable[None]]

# Incoming frame "type" -> coroutine handling it. Unknown types are ignored.
_MESSAGE_HANDLERS: Dict[str, _Handler] = {
    "private_message": _handle_private_message,
    "edit_message": _handle_edit_message,
    "delete_message": _handle_delete_message,
    "read_receipt": _handle_read_receipt,
    "typing": _handle_typing_event,
    "stop_typing": _handle_typing_event,
}


@wsRouter.websocket("")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(None), db: Session = Depends(get_db)):
    """Authenticate the client, then serve chat frames until it disconnects.

    Args:
        websocket: The client connection.
        token: Auth token from the query string; the socket is closed with
            policy-violation code 1008 when it is missing or rejected.
        db: Database session provided by ``get_db``.
    """
    if token is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    try:
        user_id = int(await test_token(token=token))
    except (HTTPException, TypeError, ValueError):
        # A non-numeric subject is treated like an invalid token.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    print("ПОДКЛЮЧЕНИЕ")
    await manager.connect(user_id, websocket)
    print("ПОДКЛЮЧЕНО")

    # Everything after registration runs under try/finally so the socket is
    # always removed from the registry, even if the initial DB update fails.
    try:
        _touch_last_online(db, user_id)
        await manager.broadcast({
            "type": "user_online",
            "user_id": user_id,
        })

        while True:
            print("ОЖИДАНИЕ СООБЩЕНИЙ")
            raw = await websocket.receive_text()
            try:
                message_data = json.loads(raw)
            except ValueError:
                # json.JSONDecodeError is a ValueError; a malformed frame must
                # not kill the connection.
                await _send_error(websocket, "invalid JSON")
                continue
            if not isinstance(message_data, dict):
                await _send_error(websocket, "JSON object required")
                continue
            print(f"DEBUG: Получены данные: {message_data}")

            _touch_last_online(db, user_id)

            frame_type = message_data.get("type")
            # Guard against unhashable "type" values (lists/objects).
            handler = _MESSAGE_HANDLERS.get(frame_type) if isinstance(frame_type, str) else None
            if handler is not None:
                await handler(websocket, db, user_id, message_data)
    except WebSocketDisconnect:
        pass
    finally:
        # Remove only this socket; False means a newer connection of the same
        # user has replaced it and the user is still online.
        was_active = manager.disconnect(user_id, websocket)
        try:
            _touch_last_online(db, user_id)
        except SQLAlchemyError as exc:
            # Best-effort: a DB failure here must not skip the broadcast.
            db.rollback()
            print(f"Failed to update last_online on disconnect: {exc}")
        print("ОТКЛЮЧЕНИЕ")
        if was_active:
            await manager.broadcast({
                "type": "user_offline",
                "user_id": user_id,
            })