real time messaging
This commit is contained in:
343
backend/sockets/messageSocket.js
Normal file
343
backend/sockets/messageSocket.js
Normal file
@@ -0,0 +1,343 @@
|
||||
const logger = require("../utils/logger");
|
||||
|
||||
/**
|
||||
* Map to track typing status: { userId_receiverId: timestamp }
|
||||
* Used to prevent duplicate typing events and auto-clear stale states
|
||||
*/
|
||||
const typingStatus = new Map();
|
||||
|
||||
/**
|
||||
* Cleanup interval for stale typing indicators (every 5 seconds)
|
||||
*/
|
||||
setInterval(() => {
|
||||
const now = Date.now();
|
||||
const TYPING_TIMEOUT = 5000; // 5 seconds
|
||||
|
||||
for (const [key, timestamp] of typingStatus.entries()) {
|
||||
if (now - timestamp > TYPING_TIMEOUT) {
|
||||
typingStatus.delete(key);
|
||||
}
|
||||
}
|
||||
}, 5000);
|
||||
|
||||
/**
|
||||
* Generate conversation room ID from two user IDs
|
||||
* Always sorts IDs to ensure consistent room naming regardless of who initiates
|
||||
*/
|
||||
const getConversationRoom = (userId1, userId2) => {
|
||||
const sorted = [userId1, userId2].sort();
|
||||
return `conv_${sorted[0]}_${sorted[1]}`;
|
||||
};
|
||||
|
||||
/**
|
||||
* Get personal user room ID
|
||||
*/
|
||||
const getUserRoom = (userId) => {
|
||||
return `user_${userId}`;
|
||||
};
|
||||
|
||||
/**
|
||||
* Initialize message socket handlers
|
||||
* @param {SocketIO.Server} io - Socket.io server instance
|
||||
*/
|
||||
const initializeMessageSocket = (io) => {
|
||||
io.on("connection", (socket) => {
|
||||
const userId = socket.userId;
|
||||
const userRoom = getUserRoom(userId);
|
||||
|
||||
logger.info("User connected to messaging", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
userEmail: socket.user.email,
|
||||
});
|
||||
|
||||
// Join user's personal room for receiving direct messages
|
||||
socket.join(userRoom);
|
||||
logger.debug("User joined personal room", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
room: userRoom,
|
||||
});
|
||||
|
||||
/**
|
||||
* Join a specific conversation room
|
||||
* Used when user opens a chat with another user
|
||||
*/
|
||||
socket.on("join_conversation", (data) => {
|
||||
try {
|
||||
const { otherUserId } = data;
|
||||
|
||||
if (!otherUserId) {
|
||||
logger.warn("join_conversation - missing otherUserId", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const conversationRoom = getConversationRoom(userId, otherUserId);
|
||||
socket.join(conversationRoom);
|
||||
|
||||
logger.debug("User joined conversation room", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
otherUserId,
|
||||
room: conversationRoom,
|
||||
});
|
||||
|
||||
socket.emit("conversation_joined", {
|
||||
conversationRoom,
|
||||
otherUserId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error joining conversation", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Leave a specific conversation room
|
||||
* Used when user closes a chat
|
||||
*/
|
||||
socket.on("leave_conversation", (data) => {
|
||||
try {
|
||||
const { otherUserId } = data;
|
||||
|
||||
if (!otherUserId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const conversationRoom = getConversationRoom(userId, otherUserId);
|
||||
socket.leave(conversationRoom);
|
||||
|
||||
logger.debug("User left conversation room", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
otherUserId,
|
||||
room: conversationRoom,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error leaving conversation", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Typing start indicator
|
||||
* Notifies the recipient that this user is typing
|
||||
*/
|
||||
socket.on("typing_start", (data) => {
|
||||
try {
|
||||
const { receiverId } = data;
|
||||
|
||||
if (!receiverId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Throttle typing events (prevent spam)
|
||||
const typingKey = `${userId}_${receiverId}`;
|
||||
const lastTyping = typingStatus.get(typingKey);
|
||||
const now = Date.now();
|
||||
|
||||
if (lastTyping && now - lastTyping < 1000) {
|
||||
// Ignore if typed within last 1 second
|
||||
return;
|
||||
}
|
||||
|
||||
typingStatus.set(typingKey, now);
|
||||
|
||||
// Emit to recipient's personal room
|
||||
const receiverRoom = getUserRoom(receiverId);
|
||||
io.to(receiverRoom).emit("user_typing", {
|
||||
userId,
|
||||
firstName: socket.user.firstName,
|
||||
isTyping: true,
|
||||
});
|
||||
|
||||
logger.debug("Typing indicator sent", {
|
||||
socketId: socket.id,
|
||||
senderId: userId,
|
||||
receiverId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error handling typing_start", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Typing stop indicator
|
||||
* Notifies the recipient that this user stopped typing
|
||||
*/
|
||||
socket.on("typing_stop", (data) => {
|
||||
try {
|
||||
const { receiverId } = data;
|
||||
|
||||
if (!receiverId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Clear typing status
|
||||
const typingKey = `${userId}_${receiverId}`;
|
||||
typingStatus.delete(typingKey);
|
||||
|
||||
// Emit to recipient's personal room
|
||||
const receiverRoom = getUserRoom(receiverId);
|
||||
io.to(receiverRoom).emit("user_typing", {
|
||||
userId,
|
||||
firstName: socket.user.firstName,
|
||||
isTyping: false,
|
||||
});
|
||||
|
||||
logger.debug("Typing stop sent", {
|
||||
socketId: socket.id,
|
||||
senderId: userId,
|
||||
receiverId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error handling typing_stop", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Mark message as read (from client)
|
||||
* This is handled by the REST API route, but we listen here for consistency
|
||||
*/
|
||||
socket.on("mark_message_read", (data) => {
|
||||
try {
|
||||
const { messageId, senderId } = data;
|
||||
|
||||
if (!messageId || !senderId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Emit to sender's room to update their UI
|
||||
const senderRoom = getUserRoom(senderId);
|
||||
io.to(senderRoom).emit("message_read", {
|
||||
messageId,
|
||||
readAt: new Date().toISOString(),
|
||||
readBy: userId,
|
||||
});
|
||||
|
||||
logger.debug("Message read notification sent", {
|
||||
socketId: socket.id,
|
||||
messageId,
|
||||
readBy: userId,
|
||||
notifiedUserId: senderId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error handling mark_message_read", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Disconnect handler
|
||||
* Clean up rooms and typing status
|
||||
*/
|
||||
socket.on("disconnect", (reason) => {
|
||||
// Clean up all typing statuses for this user
|
||||
for (const [key] of typingStatus.entries()) {
|
||||
if (key.startsWith(`${userId}_`)) {
|
||||
typingStatus.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("User disconnected from messaging", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
reason,
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Error handler
|
||||
*/
|
||||
socket.on("error", (error) => {
|
||||
logger.error("Socket error", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
error: error.message,
|
||||
stack: error.stack,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
logger.info("Message socket handlers initialized");
|
||||
};
|
||||
|
||||
/**
|
||||
* Emit new message event to a specific user
|
||||
* Called from message routes when a message is created
|
||||
* @param {SocketIO.Server} io - Socket.io server instance
|
||||
* @param {string} receiverId - User ID to send the message to
|
||||
* @param {Object} messageData - Message object with sender info
|
||||
*/
|
||||
const emitNewMessage = (io, receiverId, messageData) => {
|
||||
try {
|
||||
const receiverRoom = getUserRoom(receiverId);
|
||||
io.to(receiverRoom).emit("new_message", messageData);
|
||||
|
||||
logger.info("New message emitted", {
|
||||
receiverId,
|
||||
receiverRoom,
|
||||
messageId: messageData.id,
|
||||
senderId: messageData.senderId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error emitting new message", {
|
||||
receiverId,
|
||||
messageId: messageData.id,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Emit message read event to sender
|
||||
* Called from message routes when a message is marked as read
|
||||
* @param {SocketIO.Server} io - Socket.io server instance
|
||||
* @param {string} senderId - User ID who sent the message
|
||||
* @param {Object} readData - Read status data
|
||||
*/
|
||||
const emitMessageRead = (io, senderId, readData) => {
|
||||
try {
|
||||
const senderRoom = getUserRoom(senderId);
|
||||
io.to(senderRoom).emit("message_read", readData);
|
||||
|
||||
logger.debug("Message read status emitted", {
|
||||
senderId,
|
||||
messageId: readData.messageId,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error emitting message read status", {
|
||||
senderId,
|
||||
messageId: readData.messageId,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
initializeMessageSocket,
|
||||
emitNewMessage,
|
||||
emitMessageRead,
|
||||
getConversationRoom,
|
||||
getUserRoom,
|
||||
};
|
||||
111
backend/sockets/socketAuth.js
Normal file
111
backend/sockets/socketAuth.js
Normal file
@@ -0,0 +1,111 @@
|
||||
const jwt = require("jsonwebtoken");
|
||||
const { User } = require("../models");
|
||||
const logger = require("../utils/logger");
|
||||
const cookie = require("cookie");
|
||||
|
||||
/**
|
||||
* Socket.io authentication middleware
|
||||
* Verifies JWT token and attaches user to socket
|
||||
* Tokens can be provided via:
|
||||
* 1. Cookie (accessToken) - preferred for browser clients
|
||||
* 2. Query parameter (token) - fallback for mobile/other clients
|
||||
*/
|
||||
const authenticateSocket = async (socket, next) => {
|
||||
try {
|
||||
let token = null;
|
||||
|
||||
// Try to get token from cookies first (browser clients)
|
||||
if (socket.handshake.headers.cookie) {
|
||||
const cookies = cookie.parse(socket.handshake.headers.cookie);
|
||||
token = cookies.accessToken;
|
||||
}
|
||||
|
||||
// Fallback to query parameter (mobile/other clients)
|
||||
if (!token && socket.handshake.auth?.token) {
|
||||
token = socket.handshake.auth.token;
|
||||
}
|
||||
|
||||
// Fallback to legacy query parameter
|
||||
if (!token && socket.handshake.query?.token) {
|
||||
token = socket.handshake.query.token;
|
||||
}
|
||||
|
||||
if (!token) {
|
||||
logger.warn("Socket connection rejected - no token provided", {
|
||||
socketId: socket.id,
|
||||
address: socket.handshake.address,
|
||||
});
|
||||
return next(new Error("Authentication required"));
|
||||
}
|
||||
|
||||
// Verify JWT
|
||||
const decoded = jwt.verify(token, process.env.JWT_SECRET);
|
||||
const userId = decoded.id;
|
||||
|
||||
if (!userId) {
|
||||
logger.warn("Socket connection rejected - invalid token format", {
|
||||
socketId: socket.id,
|
||||
});
|
||||
return next(new Error("Invalid token format"));
|
||||
}
|
||||
|
||||
// Look up user
|
||||
const user = await User.findByPk(userId);
|
||||
|
||||
if (!user) {
|
||||
logger.warn("Socket connection rejected - user not found", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
});
|
||||
return next(new Error("User not found"));
|
||||
}
|
||||
|
||||
// Validate JWT version (invalidate old tokens after password change)
|
||||
if (decoded.jwtVersion !== user.jwtVersion) {
|
||||
logger.warn("Socket connection rejected - JWT version mismatch", {
|
||||
socketId: socket.id,
|
||||
userId,
|
||||
tokenVersion: decoded.jwtVersion,
|
||||
userVersion: user.jwtVersion,
|
||||
});
|
||||
return next(
|
||||
new Error("Session expired due to password change. Please log in again.")
|
||||
);
|
||||
}
|
||||
|
||||
// Attach user to socket for use in event handlers
|
||||
socket.userId = user.id;
|
||||
socket.user = {
|
||||
id: user.id,
|
||||
email: user.email,
|
||||
firstName: user.firstName,
|
||||
lastName: user.lastName,
|
||||
};
|
||||
|
||||
logger.info("Socket authenticated successfully", {
|
||||
socketId: socket.id,
|
||||
userId: user.id,
|
||||
email: user.email,
|
||||
});
|
||||
|
||||
next();
|
||||
} catch (error) {
|
||||
// Check if token is expired
|
||||
if (error.name === "TokenExpiredError") {
|
||||
logger.warn("Socket connection rejected - token expired", {
|
||||
socketId: socket.id,
|
||||
});
|
||||
return next(new Error("Token expired"));
|
||||
}
|
||||
|
||||
logger.error("Socket authentication error", {
|
||||
socketId: socket.id,
|
||||
error: error.message,
|
||||
stack: error.stack,
|
||||
});
|
||||
|
||||
return next(new Error("Authentication failed"));
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = { authenticateSocket };
|
||||
Reference in New Issue
Block a user