Chepuhagram/srv/app/websocket/connection_manager.py

372 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import HTTPException, status, APIRouter, WebSocket, WebSocketDisconnect, Query, Depends
from app.core.security import test_token
from typing import Dict
from datetime import datetime, timezone
import json
from sqlalchemy.orm import Session
from app.db import models
from firebase_admin import messaging, credentials, exceptions
import firebase_admin
from app.core.config import config
cred = credentials.Certificate(config.FIREBASE_CREDENTIALS_PATH)
firebase_admin.initialize_app(cred)
# бд
def get_db():
db = models.SessionLocal()
try:
yield db
finally:
db.close()
wsRouter = APIRouter(
prefix='/ws'
)
@wsRouter.websocket("")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(None), db: Session = Depends(get_db)):
if token is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
try:
user_id = int(await test_token(token=token))
except HTTPException:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
print("ПОДКЛЮЧЕНИЕ")
await manager.connect(user_id, websocket)
print("ПОДКЛЮЧЕНО")
db.query(models.User).filter(models.User.id == user_id).update({"last_online": datetime.now(timezone.utc)},
synchronize_session="fetch")
db.commit()
await manager.broadcast({
"type": "user_online",
"user_id": user_id,
})
try:
while True:
print("ОЖИДАНИЕ СООБЩЕНИЙ")
data = await websocket.receive_text()
message_data = json.loads(data)
print(f"DEBUG: Получены данные: {message_data}")
db.query(models.User).filter(models.User.id == user_id).update({"last_online": datetime.now(timezone.utc)},
synchronize_session="fetch")
db.commit()
if message_data.get("type") == "private_message":
user = db.query(models.User).filter(
models.User.id == user_id).first()
receiver_id = message_data.get("receiver_id")
temp_id = message_data.get("temp_id")
content = message_data.get("content")
content50 = message_data.get("content50")
message_type = message_data.get("message_type") or "text"
file_id = message_data.get("file_id")
encrypted_key = message_data.get("encrypted_key")
print(
f"DEBUG private_message payload: temp_id={temp_id}, receiver_id={receiver_id}, message_type={message_type}, file_id={file_id}, encrypted_key_present={encrypted_key is not None}",
)
if receiver_id is None or content is None:
await websocket.send_json({
"type": "error",
"detail": "receiver_id/content required",
})
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "receiver_id must be int",
})
continue
new_msg = models.Message(
sender_id=user_id,
receiver_id=receiver_id,
content=content,
message_type=message_type,
file_id=file_id,
encrypted_key=encrypted_key,
reply_to_id=message_data.get("reply_to_id"),
reply_to_text=message_data.get("reply_to_text")
)
db.add(new_msg)
db.commit()
db.refresh(new_msg)
print(
f"DEBUG saved message: id={new_msg.id}, sender={new_msg.sender_id}, receiver={new_msg.receiver_id}, message_type={new_msg.message_type}, file_id={new_msg.file_id}, encrypted_key_present={new_msg.encrypted_key is not None}",
)
# ACK отправителю: сервер принял и сохранил сообщение (нужно для статусов клиента).
await manager.send_personal_message({
"type": "message_sent",
"temp_id": temp_id,
"server_id": new_msg.id,
"timestamp": (new_msg.timestamp or datetime.now()).isoformat(),
}, str(user_id))
# Если получатель оффлайн — отправим пуш (если есть токен и ключи).
if user.public_key:
receiver = db.query(models.User).filter(
models.User.id == receiver_id).first()
if receiver.fcm_token:
send_fcm_notification(
receiver.fcm_token,
user_id,
user.first_name,
user.public_key,
content50 if content50 else content,
datetime.now(),
)
# Формируем пакет для получателя
outgoing_message = {
"id": new_msg.id,
"type": "private_message",
"sender_id": user_id,
"receiver_id": receiver_id,
"content": message_data.get("content"),
"message_type": message_type,
"file_id": file_id,
"encrypted_key": message_data.get("encrypted_key"),
"timestamp": (new_msg.timestamp or datetime.now()).isoformat(),
"reply_to_id": new_msg.reply_to_id,
"reply_to_text": new_msg.reply_to_text,
}
print(
f"DEBUG outgoing_message: id={outgoing_message['id']}, receiver_id={outgoing_message['receiver_id']}, file_id={outgoing_message['file_id']}, encrypted_key_present={outgoing_message['encrypted_key'] is not None}",
)
# Пересылаем получателю, если он в сети
sent_to_receiver = await manager.send_personal_message(outgoing_message, str(receiver_id))
print(f"DEBUG send_personal_message returned: {sent_to_receiver}")
# Если сообщение реально ушло по сокету получателю — отмечаем delivered_at.
if sent_to_receiver:
try:
delivered_at = datetime.now()
new_msg.delivered_at = delivered_at
db.add(new_msg)
db.commit()
await manager.send_personal_message({
"type": "message_delivered",
"message_id": new_msg.id,
"timestamp": delivered_at.isoformat(),
}, str(user_id))
except Exception:
db.rollback()
elif message_data.get("type") == "edit_message":
message_id = message_data.get("message_id")
content = message_data.get("content")
if message_id is None or content is None:
await websocket.send_json({
"type": "error",
"detail": "message_id/content required",
})
continue
try:
message_id = int(message_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "message_id must be int",
})
continue
msg = db.query(models.Message).filter(
models.Message.id == message_id).first()
if msg is None or msg.sender_id != user_id:
continue
try:
msg.content = content
msg.edited_at = datetime.now()
db.add(msg)
db.commit()
except Exception:
db.rollback()
continue
event = {
"type": "message_edited",
"message_id": msg.id,
"content": msg.content,
"edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
}
await manager.send_personal_message(event, str(msg.receiver_id))
await manager.send_personal_message(event, str(msg.sender_id))
elif message_data.get("type") == "delete_message":
message_id = message_data.get("message_id")
if message_id is None:
await websocket.send_json({
"type": "error",
"detail": "message_id required",
})
continue
try:
message_id = int(message_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "message_id must be int",
})
continue
msg = db.query(models.Message).filter(
models.Message.id == message_id).first()
if msg is None or msg.sender_id != user_id:
continue
receiver_id = msg.receiver_id
try:
db.delete(msg)
db.commit()
except Exception:
db.rollback()
continue
event = {
"type": "message_deleted",
"message_id": message_id,
}
await manager.send_personal_message(event, str(receiver_id))
await manager.send_personal_message(event, str(user_id))
elif message_data.get("type") == "read_receipt":
message_id = message_data.get("message_id")
try:
message_id = int(message_id)
except (TypeError, ValueError):
continue
msg = db.query(models.Message).filter(
models.Message.id == message_id).first()
if msg is None:
continue
# Безопасность: read_receipt может отправлять только получатель.
if int(msg.receiver_id) != int(user_id):
continue
# Сохраняем read_at в БД
try:
read_at = datetime.now()
msg.read_at = read_at
db.add(msg)
db.commit()
except Exception:
db.rollback()
sender_id = int(msg.sender_id)
await manager.send_personal_message({
"type": "message_read",
"message_id": message_id,
"timestamp": read_at.isoformat() if 'read_at' in locals() else datetime.now().isoformat(),
}, str(sender_id))
elif message_data.get("type") == "typing":
receiver_id = message_data.get("receiver_id")
if receiver_id is None:
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
continue
await manager.send_personal_message({
"type": "typing",
"sender_id": user_id,
}, str(receiver_id))
elif message_data.get("type") == "stop_typing":
receiver_id = message_data.get("receiver_id")
if receiver_id is None:
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
continue
await manager.send_personal_message({
"type": "stop_typing",
"sender_id": user_id,
}, str(receiver_id))
except WebSocketDisconnect:
pass
finally:
manager.disconnect(user_id)
db.query(models.User).filter(models.User.id == user_id).update(
{"last_online": datetime.now(timezone.utc)}, synchronize_session="fetch")
db.commit()
print("ОТКЛЮЧЕНИЕ")
await manager.broadcast({
"type": "user_offline",
"user_id": user_id,
})
def send_fcm_notification(token, user_id, username, public_key, encrypted_text, timestamp):
print(
f"DEBUG: Отправляем FCM уведомление пользователю {user_id} с токеном {token}")
message = messaging.Message(
data={
"type": "enc_message",
"sender_id": str(user_id),
"username": username,
"public_key": public_key,
"content": encrypted_text, # Зашифрованный текст
"timestamp": timestamp.isoformat(),
},
android=messaging.AndroidConfig(
priority='high',
),
token=token,
)
try:
response = messaging.send(message)
print('Successfully sent message:', response)
except Exception as e:
print('Unexpected error sending push:', e)
class ConnectionManager:
def __init__(self):
# Храним активные соединения: {user_id: websocket}
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, user_id: int, websocket: WebSocket):
await websocket.accept()
self.active_connections[str(user_id)] = websocket
def disconnect(self, user_id: int):
key = str(user_id)
if key in self.active_connections:
del self.active_connections[key]
async def send_personal_message(self, message: dict, user_id: str) -> bool:
if str(user_id) in self.active_connections:
try:
await self.active_connections[str(user_id)].send_json(message)
print('Sent to socket')
return True
except Exception as e:
print(f'Failed to send to socket: {e}')
return False
else:
print('User not active')
return False
async def broadcast(self, message: dict):
# Рассылка вообще всем (например, системное уведомление)
for connection in self.active_connections.values():
await connection.send_json(message)
manager = ConnectionManager()