XNet/XNet.Business/Net/WsConnectionManager.cs
2025-12-31 18:29:29 +08:00

512 lines
19 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using Microsoft.Extensions.ObjectPool;
using NanoidDotNet;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading.Tasks;
using XNet.Business.Entity;
using XNet.Business.PathNavigation;
namespace XNet.Business.Net
{
public class WsConnectionManager
{
// ========== Core state ==========
// Registered connections, keyed by the connection ID generated in AddConnection.
private readonly ConcurrentDictionary<string, ControlPlayer> _connections = new();
/// <summary>
/// Room subscriptions. The outer string key is the instance room ID, the inner string key is
/// the user connection ID, and the value is that player's room info.
/// </summary>
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, PlayerRoomInfo<ControlPlayer>>> _instanceSubscribers = new();
//private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
// ========== Object pools ==========
// Pools are created in the constructor with custom policies.
//private readonly ObjectPool<List<RoomMsg>> _syncMsgListPool;
// Pool of lists used to collect the IDs of connections whose send failed.
private readonly ObjectPool<List<string>> _deadConnListPool;
// Pool of reusable byte buffers used when serializing outgoing messages.
private readonly ObjectPool<byte[]> _byteArrayPool;
// Scene agent used to create/remove map instances and crowds; assigned in the constructor.
private readonly SceneAgent _sceneAgent = null!;
// Public accessors that expose the two dictionaries above directly (callers can mutate them).
public ConcurrentDictionary<string, ConcurrentDictionary<string, PlayerRoomInfo<ControlPlayer>>> InstanceSubscribers => _instanceSubscribers;
public ConcurrentDictionary<string, ControlPlayer> Connections => _connections;
/// <summary>
/// Creates the manager and builds the object pools it relies on.
/// </summary>
/// <param name="sceneAgent">Scene agent stored for later instance and crowd operations.</param>
public WsConnectionManager(SceneAgent sceneAgent)
{
    // Upper bound on the number of idle objects each pool keeps around.
    const int maximumRetained = 200;

    _sceneAgent = sceneAgent;

    // Both pools are built with the project's custom policies.
    var deadConnListPolicy = new DeadConnListPolicy();
    _deadConnListPool = new DefaultObjectPool<List<string>>(deadConnListPolicy, maximumRetained);

    var byteArrayPolicy = new ByteArrayPolicy(4096);
    _byteArrayPool = new DefaultObjectPool<byte[]>(byteArrayPolicy, maximumRetained);
}
// ========== 业务方法(无修改) ==========
/// <summary>
/// Registers a new WebSocket under a freshly generated connection ID.
/// </summary>
/// <param name="socket">The accepted WebSocket to track.</param>
/// <returns>The generated connection ID (prefixed with <c>Conn_</c>).</returns>
public string AddConnection(WebSocket socket)
{
    string playerId = "Conn_" + Nanoid.Generate();

    var player = new ControlPlayer
    {
        WebSocket = socket,
        PlayerId = playerId
    };
    Connections.TryAdd(playerId, player);

    Console.WriteLine($"[WS .NET 10] 新连接:{playerId},当前连接数:{Connections.Count}");
    return playerId;
}
/// <summary>
/// Unregisters a connection and removes the player from every room it had joined.
/// Does nothing when the connection ID is not registered.
/// </summary>
/// <param name="connId">ID of the connection to remove.</param>
public async Task RemoveConnection(string connId)
{
    if (!Connections.TryRemove(connId, out var player))
    {
        return;
    }

    await RemovePlayerFromRoom(connId, player);
}
/// <summary>
/// Removes the player from every room it has joined while keeping the connection registered.
/// Does nothing when the connection ID is not registered.
/// </summary>
/// <param name="connId">ID of the connection that is leaving its rooms.</param>
public async Task LeaveRoom(string connId)
{
    if (!Connections.TryGetValue(connId, out var player))
    {
        return;
    }

    await RemovePlayerFromRoom(connId, player);

    // Unlike RemoveConnection, the player object stays alive after this call. Forget the
    // memberships that were just removed; otherwise a later disconnect would walk the same
    // rooms again and broadcast a second OFFLINE message to rooms the player already left.
    player.RoomIds.Clear();
}
/// <summary>
/// Removes the player from each room listed in <c>player.RoomIds</c>. A room that becomes empty
/// is deleted together with its scene instance; otherwise an OFFLINE notification is broadcast
/// to the room.
/// </summary>
/// <param name="connId">Connection ID of the player being removed.</param>
/// <param name="player">The player whose room memberships are processed.</param>
private async Task RemovePlayerFromRoom(string connId, ControlPlayer player)
{
    foreach (var roomId in player.RoomIds.Keys)
    {
        if (!InstanceSubscribers.TryGetValue(roomId, out var room))
        {
            continue;
        }

        // Drop the subscription and, if the player has a navigation agent, detach it from the crowd.
        if (room.TryRemove(connId, out var removedInfo) && player.Agent != null)
        {
            removedInfo?.Crowd?.RemoveAgent(player.Agent);
        }

        if (!room.IsEmpty)
        {
            // Other players remain: tell the room that this user went offline.
            var offlineNotice = new UserNetStateReply
            {
                Type = WsMsgType.OFFLINE,
                PlayerId = connId
            };
            await SendMessageToRoomBatchAsync(roomId, connId, WsMsgType.ROOM_MSG, offlineNotice);
            continue;
        }

        // Nobody is left: discard the scene instance and the room entry.
        _sceneAgent.RemoveInstance(roomId);
        InstanceSubscribers.TryRemove(roomId, out _);
    }

    Console.WriteLine($"[WS .NET 10] 连接断开:{connId},当前连接数:{Connections.Count}");
}
/// <summary>
/// Subscribes a connection to a room, creating the room (and its scene instance) when it does
/// not exist yet, and broadcasts an ONLINE notification to the room.
/// </summary>
/// <param name="connId">ID of the connection that joins.</param>
/// <param name="mapKey">Map key stored on the player's room info and used when a new instance is created.</param>
/// <param name="isCreateAgent">When true, a crowd is created for this player's room info.</param>
/// <param name="roomId">Target room ID; a new ID is generated when null, empty or whitespace.</param>
/// <param name="isChangeMapKey">When true, the "leave the previous room first" step is skipped.</param>
/// <param name="maxAgentRadius">Maximum agent radius forwarded to crowd creation.</param>
/// <returns>The room ID that was joined, or null when the connection is unknown.</returns>
public async Task<string?> SubscribeInstance(string connId, string mapKey, bool isCreateAgent, string roomId, bool isChangeMapKey, float maxAgentRadius = 0.1f)
{
    if (!Connections.TryGetValue(connId, out ControlPlayer? player))
    {
        Console.WriteLine($"[WS .NET 10] 订阅失败:连接 {connId} 不存在");
        return null;
    }

    // No room ID supplied: generate a unique one for a brand-new room.
    if (string.IsNullOrWhiteSpace(roomId))
    {
        roomId = Nanoid.Generate();
    }

    // The factory runs only when the room was not found, which marks it as a new instance.
    bool isNewInstance = false;
    var playerRooms = InstanceSubscribers.GetOrAdd(roomId, _ =>
    {
        isNewInstance = true;
        return new ConcurrentDictionary<string, PlayerRoomInfo<ControlPlayer>>();
    });

    // A player may only be in one room (versus game): leave the previous room before joining another.
    bool mayLeavePreviousRoom = !isChangeMapKey && player.RoomIds.Any();
    if (mayLeavePreviousRoom)
    {
        var previousRoomId = player.RoomIds.Keys.First();
        if (previousRoomId != roomId)
        {
            player.RoomIds.Clear();
            if (InstanceSubscribers.TryGetValue(previousRoomId, out var previousRoom))
            {
                previousRoom.TryRemove(connId, out _);
                if (previousRoom.IsEmpty)
                {
                    _sceneAgent.RemoveInstance(previousRoomId);
                    InstanceSubscribers.TryRemove(previousRoomId, out _);
                }
            }
        }
    }

    if (playerRooms != null)
    {
        var freshInfo = new PlayerRoomInfo<ControlPlayer> { MapKey = mapKey, RoomId = roomId };
        var playerRoom = playerRooms.GetOrAdd(connId, freshInfo);

        if (!playerRoom.Players.Any())
        {
            playerRoom.Players.TryAdd(connId, player);
        }

        if (playerRoom.Players.Count == 1)
        {
            playerRoom.MapKey = mapKey;
        }
    }

    player.RoomIds[roomId] = true;

    // A newly created room needs its own copy of the map.
    if (isNewInstance)
    {
        _sceneAgent.CreateInstance(roomId, mapKey);
    }

    if (isCreateAgent)
    {
        CreateCrowdForInstance(connId, roomId, maxAgentRadius);
    }

    // Announce to the room that this user is now online.
    var onlineNotice = new UserNetStateReply
    {
        Type = WsMsgType.ONLINE,
        PlayerId = connId
    };
    await SendMessageToRoomBatchAsync(roomId, connId, WsMsgType.ROOM_MSG, onlineNotice);

    return roomId;
}
/// <summary>
/// Creates a crowd for the given player's room info if it does not have one yet.
/// </summary>
/// <param name="connId">Connection ID of the player inside the room.</param>
/// <param name="roomId">ID of the room (scene instance).</param>
/// <param name="maxAgentRadius">Maximum agent radius passed to the scene agent.</param>
/// <returns>
/// True only when a crowd was newly created; false when the room or player is unknown,
/// a crowd already exists, or the scene agent returned null.
/// </returns>
public bool CreateCrowdForInstance(string connId, string roomId, float maxAgentRadius = 0.1f)
{
    if (!InstanceSubscribers.TryGetValue(roomId, out var rooms))
    {
        return false;
    }

    if (!rooms.TryGetValue(connId, out var room))
    {
        return false;
    }

    // An existing crowd is left untouched and reported as "not created".
    if (room.Crowd != null)
    {
        return false;
    }

    room.Crowd = _sceneAgent.CreateCrowdForInstance(roomId, maxAgentRadius);
    return room.Crowd != null;
}
/// <summary>
/// Tells whether a room with the given ID is currently registered.
/// </summary>
/// <param name="roomId">Room ID to look up.</param>
/// <returns>True when the room exists in the subscriber table.</returns>
public bool HasSubscribeInstance(string roomId) => InstanceSubscribers.ContainsKey(roomId);
/// <summary>
/// Looks up the player registered under a connection ID.
/// </summary>
/// <param name="connId">Connection ID to look up.</param>
/// <returns>The registered player, or null when the ID is unknown.</returns>
public ControlPlayer? GetConnectionInfo(string connId)
    => Connections.TryGetValue(connId, out var socketInfo) ? socketInfo : null;
/// <summary>
/// Returns the room info stored for one player inside one room.
/// </summary>
/// <param name="roomId">ID of the room.</param>
/// <param name="connId">Connection ID of the player.</param>
/// <returns>The player's room info, or null when the room or the player is not found.</returns>
public PlayerRoomInfo<ControlPlayer>? GetPlayerRoomInfo(string roomId, string connId)
{
    if (!InstanceSubscribers.TryGetValue(roomId, out var room))
    {
        return null;
    }

    return room.TryGetValue(connId, out var playerRoom) ? playerRoom : null;
}
/// <summary>
/// Finds another public room with the given name whose game started less than 60 seconds ago.
/// Only the first player's room info of each room is inspected.
/// </summary>
/// <param name="selfRoomId">ID of the caller's own room, which is excluded from the search.</param>
/// <param name="roomName">Room name that a candidate must match.</param>
/// <returns>The first matching room info, or null when none matches.</returns>
public PlayerRoomInfo<ControlPlayer>? GetStartRoomInfoForCondition(string selfRoomId, string roomName)
{
    var now = DateTime.Now;
    foreach (var rooms in InstanceSubscribers.Values)
    {
        // FirstOrDefault() yields a default KeyValuePair for an empty room, whose Value is null.
        var room = rooms.FirstOrDefault().Value;
        if (room == null)
        {
            continue;
        }

        // TotalSeconds is the whole elapsed time. The previous code used Seconds, which is only
        // the 0-59 component of the TimeSpan, so the "< 60" check could never fail.
        if (room.IsPublic
            && room.Players.Any()
            && room.RoomName == roomName
            && room.RoomId != selfRoomId
            && room.StartGameTime != null
            && (now - room.StartGameTime)!.Value.TotalSeconds < 60)
        {
            return room;
        }
    }

    return null;
}
//public async Task SendBaseSingleMessage<T>(string connId, WsMsgType type, T msg)
//{
// var baseMessage = new BaseMsg
// {
// SenderId = connId,
// Type = type,
// Data = MessagePackSerializer.Serialize(msg)
// };
// await SendSerializeMessageToPointWsSocket(connId, baseMessage);
//}
/// <summary>
/// Wraps a payload in a <c>TypeBaseMsg</c> envelope and sends it to a single connection.
/// </summary>
/// <typeparam name="T">Payload type.</typeparam>
/// <param name="connId">Target connection ID; also used as the envelope's sender ID.</param>
/// <param name="type">Message type written into the envelope.</param>
/// <param name="msg">Payload to send.</param>
public async Task SendBaseSingleMessage<T>(string connId, WsMsgType type, T msg)
{
    var envelope = new TypeBaseMsg<T> { SenderId = connId, Type = type, Data = msg };
    await SendSerializeMessageToPointWsSocket(connId, envelope);
}
/// <summary>
/// Serializes a message and sends it to one connection, then removes the connection if the
/// send reported it as dead. Does nothing when the connection ID is not registered.
/// </summary>
/// <typeparam name="T">Type of the message to serialize.</typeparam>
/// <param name="connId">Target connection ID.</param>
/// <param name="syncMsg">Message to serialize and send.</param>
public async Task SendSerializeMessageToPointWsSocket<T>(string connId, T syncMsg)
{
    if (!Connections.TryGetValue(connId, out var socketInfo))
    {
        return;
    }

    // Pooled list that collects the IDs of connections whose send failed.
    var deadConnIds = _deadConnListPool.Get();
    try
    {
        // A non-zero length means msgBytes is a pooled buffer holding that many valid bytes;
        // zero means msgBytes is a dedicated, exactly-sized array that must be sent in full.
        int msgBytesLength = CreateSerializeMessage(syncMsg, out byte[] msgBytes);
        bool isPooledBuffer = msgBytesLength != 0;
        int sendLength = isPooledBuffer ? msgBytesLength : msgBytes.Length;

        try
        {
            // The send must complete before the buffer goes back to the pool; returning it while
            // the send is still in flight lets another message overwrite the bytes being sent.
            await SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, sendLength, connId, deadConnIds);
        }
        finally
        {
            if (isPooledBuffer)
            {
                _byteArrayPool.Return(msgBytes);
            }
        }

        // Clean up connections that turned out to be dead. This runs only after the send has
        // finished, so failures reported asynchronously are not missed.
        foreach (var deadConnId in deadConnIds)
        {
            await RemoveConnection(deadConnId);
        }
    }
    finally
    {
        // Safe to hand back now: no pending send still holds a reference to the list.
        _deadConnListPool.Return(deadConnIds);
    }
}
/// <summary>
/// Convenience overload: wraps a single payload in a <c>BaseRoomMsg</c> and broadcasts it to the room.
/// </summary>
/// <typeparam name="T">Payload type.</typeparam>
/// <param name="roomId">Room that receives the message.</param>
/// <param name="connId">Connection ID recorded as the sender.</param>
/// <param name="type">Message type written into the room message.</param>
/// <param name="msg">Payload to broadcast.</param>
public async Task SendMessageToRoomBatchAsync<T>(string roomId, string connId, WsMsgType type, T msg)
{
    var roomMsg = new BaseRoomMsg<T>
    {
        Type = type,
        RoomId = roomId,
        SenderId = connId,
        Data = msg
    };

    // Delegate to the list-based overload with a one-element batch and no exclusions.
    var batch = new List<BaseRoomMsg<T>> { roomMsg };
    await SendMessageToRoomBatchAsync(batch);
}
/// <summary>
/// Broadcasts an already-serialized payload to every subscriber of a room, then removes the
/// connections whose send failed. Does nothing when the room is not registered.
/// </summary>
/// <param name="roomId">Room whose subscribers receive the payload.</param>
/// <param name="msgBytes">Serialized payload; the whole array is sent.</param>
/// <param name="exceptIds">Optional connection IDs to skip.</param>
public async Task SendMessageToRoomBatchAsync(string roomId, byte[] msgBytes, List<string>? exceptIds = null)
{
    if (!InstanceSubscribers.TryGetValue(roomId, out var room))
    {
        return;
    }

    // Pooled list that collects the IDs of connections whose send failed.
    var deadConnIds = _deadConnListPool.Get();
    try
    {
        // Start one send per recipient and keep the tasks so they can be awaited together.
        var pendingSends = new List<Task>();
        foreach (var connKv in room)
        {
            if (exceptIds != null && exceptIds.Contains(connKv.Key))
            {
                continue;
            }

            if (Connections.TryGetValue(connKv.Key, out var socketInfo))
            {
                pendingSends.Add(SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, msgBytes.Length, connKv.Key, deadConnIds));
            }
        }

        // Wait for every send to finish before reading the dead list or returning it to the
        // pool; otherwise a late failure would write into a list that has already been recycled.
        await Task.WhenAll(pendingSends);

        // Clean up connections that turned out to be dead.
        foreach (var deadConnId in deadConnIds)
        {
            await RemoveConnection(deadConnId);
        }
    }
    finally
    {
        _deadConnListPool.Return(deadConnIds);
    }
}
/// <summary>
/// Groups the messages by room, serializes each group once, broadcasts it to that room's
/// subscribers and removes the connections whose send failed.
/// </summary>
/// <typeparam name="T">Payload type carried by the room messages.</typeparam>
/// <param name="syncMsgs">Messages to send; an empty list is a no-op.</param>
/// <param name="exceptIds">Optional connection IDs to skip in every room.</param>
public async Task SendMessageToRoomBatchAsync<T>(List<BaseRoomMsg<T>> syncMsgs, List<string>? exceptIds = null)
{
    if (syncMsgs.Count == 0)
    {
        return;
    }

    foreach (var group in syncMsgs.GroupBy(m => m.RoomId))
    {
        if (!InstanceSubscribers.TryGetValue(group.Key, out var room))
        {
            continue;
        }

        // A non-zero length means msgBytes is a pooled buffer holding that many valid bytes;
        // zero means msgBytes is a dedicated, exactly-sized array that must be sent in full.
        int msgBytesLength = CreateSerializeMessage(group.ToList(), out byte[] msgBytes);
        bool isPooledBuffer = msgBytesLength != 0;
        int sendLength = isPooledBuffer ? msgBytesLength : msgBytes.Length;

        // Pooled list that collects the IDs of connections whose send failed.
        var deadConnIds = _deadConnListPool.Get();
        try
        {
            // Start one send per recipient; all of them share the same serialized buffer.
            var pendingSends = new List<Task>();
            foreach (var connKv in room)
            {
                if (exceptIds != null && exceptIds.Contains(connKv.Key))
                {
                    continue;
                }

                if (Connections.TryGetValue(connKv.Key, out var socketInfo))
                {
                    pendingSends.Add(SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, sendLength, connKv.Key, deadConnIds));
                }
            }

            // Every send must finish before the shared buffer and the dead list are recycled.
            await Task.WhenAll(pendingSends);

            // Clean up connections that turned out to be dead.
            foreach (var deadConnId in deadConnIds)
            {
                await RemoveConnection(deadConnId);
            }
        }
        finally
        {
            // Return the buffer exactly once per group. Returning it once per recipient would
            // put the same array into the pool several times, and skipping the return when the
            // room has no recipients would drop it.
            if (isPooledBuffer)
            {
                _byteArrayPool.Return(msgBytes);
            }

            _deadConnListPool.Return(deadConnIds);
        }
    }
}
/// <summary>
/// Serializes a message with MessagePack, placing the bytes in a pooled buffer when they fit.
/// </summary>
/// <typeparam name="T">Type of the message to serialize.</typeparam>
/// <param name="msg">Message to serialize.</param>
/// <param name="msgBytes">
/// Receives either a pooled buffer (only the first <c>return value</c> bytes are valid, and the
/// caller must return it to <c>_byteArrayPool</c>) or, when the payload does not fit, a
/// dedicated array sized exactly to the payload.
/// </param>
/// <returns>
/// The number of valid bytes when <paramref name="msgBytes"/> is a pooled buffer; 0 when it is
/// a dedicated array that must be sent in full.
/// </returns>
private int CreateSerializeMessage<T>(T msg, out byte[] msgBytes)
{
    using var tempMs = new MemoryStream();
    MessagePackSerializer.Serialize(tempMs, msg);
    int serializedLength = (int)tempMs.Position;

    // Take the pooled buffer only after serialization succeeded, so an exception thrown by the
    // serializer cannot drop a buffer that was already taken out of the pool.
    var pooledBytes = _byteArrayPool.Get();
    if (serializedLength <= pooledBytes.Length)
    {
        Buffer.BlockCopy(tempMs.GetBuffer(), 0, pooledBytes, 0, serializedLength);
        msgBytes = pooledBytes;
        return serializedLength;
    }

    // Payload is larger than the pooled buffer: hand the buffer back instead of dropping it
    // and fall back to a freshly allocated, exactly-sized array.
    _byteArrayPool.Return(pooledBytes);
    msgBytes = tempMs.ToArray();
    return 0;
}
/// <summary>
/// Sends the first <paramref name="dataLength"/> bytes of <paramref name="data"/> as one binary
/// WebSocket message. A socket that is not open, or a send that throws, is recorded in
/// <paramref name="deadConnIds"/> instead of propagating an exception.
/// </summary>
/// <param name="socket">Socket to send on.</param>
/// <param name="data">Buffer containing the payload.</param>
/// <param name="dataLength">Number of valid bytes at the start of <paramref name="data"/>.</param>
/// <param name="connId">Connection ID, used for logging and for the dead list.</param>
/// <param name="deadConnIds">List that receives the connection ID when the send cannot be done.</param>
private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List<string> deadConnIds)
{
    try
    {
        if (socket.State == WebSocketState.Open)
        {
            // Send only the valid bytes rather than the whole buffer.
            await socket.SendAsync(
                new ArraySegment<byte>(data, 0, dataLength),
                WebSocketMessageType.Binary,
                true,
                CancellationToken.None
            );
        }
        else
        {
            MarkDead();
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"[WS] 发送失败 {connId}{ex.Message}");
        MarkDead();
    }

    // Several sends for different sockets share one list and may finish on different threads;
    // List<T>.Add is not safe for concurrent writers, so serialize access to the list.
    void MarkDead()
    {
        lock (deadConnIds)
        {
            deadConnIds.Add(connId);
        }
    }
}
//private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List<string> deadConnIds)
//{
// try
// {
// if (socket.State == WebSocketState.Open)
// {
// await socket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, CancellationToken.None);
// }
// else
// {
// deadConnIds.Add(connId);
// }
// }
// catch (Exception ex)
// {
// Console.WriteLine($"[WS .NET 10] 发送失败 {connId}{ex.Message}");
// deadConnIds.Add(connId);
// }
//}
/// <summary>
/// Closes and disposes every tracked WebSocket, removes each connection from its rooms and
/// finally clears all connection and room state.
/// </summary>
/// <remarks>
/// NOTE(review): the class declaration does not list <c>IAsyncDisposable</c>, so
/// <c>await using</c> will not call this method automatically — confirm how it is invoked.
/// </remarks>
public async ValueTask DisposeAsync()
{
    foreach (var (key, socketInfo) in Connections)
    {
        try
        {
            if (socketInfo.WebSocket.State == WebSocketState.Open)
            {
                await socketInfo.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None);
            }
        }
        catch (Exception ex)
        {
            // A failing close handshake on one socket must not stop the shutdown of the others.
            Console.WriteLine($"[WS] 关闭连接失败 {key}: {ex.Message}");
        }
        finally
        {
            socketInfo.WebSocket.Dispose();
        }

        // Await the removal so room cleanup finishes before the dictionaries are cleared below.
        await RemoveConnection(key);
    }

    Connections.Clear();
    InstanceSubscribers.Clear();
}
}
}