Chepuhagram/srv/app/websocket/connection_manager.py

66 lines
2.1 KiB
Python

from fastapi import HTTPException, status, APIRouter, WebSocket, WebSocketDisconnect, Query
from app.core.security import test_token
from typing import Dict
from datetime import datetime
# Router for the WebSocket API; every route declared on it is served under "/ws".
wsRouter = APIRouter(prefix="/ws", tags=[])
@wsRouter.websocket("")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(None)):
    """Authenticate a WebSocket client and relay its chat messages.

    The client supplies its token in the ``token`` query parameter. A missing
    token, or one that ``test_token`` rejects with ``HTTPException``, closes
    the socket with ``WS_1008_POLICY_VIOLATION`` before it is registered.

    Every received JSON object is read for ``receiver_id`` and ``text``. The
    relayed message (``sender_id``, ``text``, ``created_at``) is sent to the
    receiver, if one is registered, and echoed back to the sender.

    Args:
        websocket: The incoming WebSocket connection.
        token: Auth token from the query string; ``None`` when absent.
    """
    if token is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    try:
        user_id = await test_token(token=token)
    except HTTPException:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(user_id, websocket)
    try:
        while True:
            try:
                data = await websocket.receive_json()
            except ValueError:
                # The frame was not valid JSON (json.JSONDecodeError is a
                # ValueError). Skip it instead of tearing the session down.
                continue
            if not isinstance(data, dict):
                # Valid JSON, but not an object: there are no fields to read.
                continue

            # NOTE(review): receiver ids are coerced to str, so lookups only
            # match if test_token returns str user ids -- confirm.
            receiver_id = str(data.get("receiver_id"))
            message_to_send = {
                "sender_id": user_id,
                "text": data.get("text"),
                # NOTE(review): naive server-local timestamp; consider a
                # timezone-aware value if clients compare times across zones.
                "created_at": datetime.now().isoformat(),
            }

            try:
                await manager.send_personal_message(message_to_send, receiver_id)
            except (WebSocketDisconnect, RuntimeError):
                # The receiver's socket is already closed. That must not end
                # the sender's session; the receiver's own handler removes
                # its registration when it unwinds.
                pass

            await manager.send_personal_message(message_to_send, user_id)
    except WebSocketDisconnect:
        # Normal termination: this client went away.
        pass
    finally:
        # Unregister only if the registry still points at *this* socket. If
        # the same user reconnected meanwhile, the entry belongs to the newer
        # connection and must survive this handler's cleanup.
        if manager.active_connections.get(user_id) is websocket:
            manager.disconnect(user_id)
class ConnectionManager:
    """Registry of active WebSocket connections keyed by user id.

    At most one socket is kept per user id: a later ``connect`` for the same
    id replaces the earlier entry.
    """

    def __init__(self):
        # Active connections: {user_id: websocket}
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        """Accept the handshake and register ``websocket`` under ``user_id``."""
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str, websocket: "WebSocket | None" = None):
        """Remove the registration for ``user_id``; a missing id is a no-op.

        Args:
            user_id: Id whose registration should be removed.
            websocket: Optional guard. When given, the entry is removed only
                if it is this exact socket, so cleanup for an old connection
                cannot evict a newer one registered under the same id.
        """
        if websocket is not None:
            if self.active_connections.get(user_id) is not websocket:
                return
        self.active_connections.pop(user_id, None)

    async def send_personal_message(self, message: dict, user_id: str):
        """Send ``message`` as JSON to ``user_id``; no-op if not registered."""
        connection = self.active_connections.get(user_id)
        if connection is not None:
            await connection.send_json(message)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered connection.

        Intended for messages addressed to everyone (e.g. a system notice).
        """
        # Iterate over a snapshot: every ``await`` yields to the event loop,
        # where other handlers may connect or disconnect. Mutating the dict
        # while iterating it directly raises RuntimeError.
        for user_id, connection in list(self.active_connections.items()):
            # Skip entries removed or replaced since the snapshot was taken.
            if self.active_connections.get(user_id) is not connection:
                continue
            await connection.send_json(message)
# Module-level ConnectionManager instance used by websocket_endpoint to
# register sockets and route messages between them.
manager = ConnectionManager()