XNet/XNet.Business/Net/WsConnectionManager.cs
2025-12-30 11:22:46 +08:00

306 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using Microsoft.Extensions.ObjectPool;
using NanoidDotNet;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text.Json;
using XNet.Business.Entity;
using XNet.Business.PathNavigation;
namespace XNet.Business.Net
{
public class WsConnectionManager
{
// ========== 原有核心字段 ==========
private readonly ConcurrentDictionary<string, WebSocketInfo> _connections = new();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, bool>> _instanceSubscribers = new();
//private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
// ========== 新增:对象池配置 ==========
// 对象池(使用自定义重写的策略)
//private readonly ObjectPool<List<RoomMsg>> _syncMsgListPool;
private readonly ObjectPool<List<string>> _deadConnListPool;
private readonly ObjectPool<byte[]> _byteArrayPool;
private readonly SceneAgent _sceneAgent = null!;
public WsConnectionManager(SceneAgent sceneAgent)
{
_sceneAgent = sceneAgent;
// 配置对象池参数
var maximumRetained = 200;
// 初始化对象池(传入自定义策略)
//_syncMsgListPool = new DefaultObjectPool<List<RoomMsg>>(new SyncMsgListPolicy<RoomMsg>(), maximumRetained);
_deadConnListPool = new DefaultObjectPool<List<string>>(new DeadConnListPolicy(), maximumRetained);
_byteArrayPool = new DefaultObjectPool<byte[]>(new ByteArrayPolicy(4096), maximumRetained);
}
// ========== 业务方法(无修改) ==========
public string AddConnection(WebSocket socket)
{
string connId = $"Conn_{Nanoid.Generate()}";
_connections.TryAdd(connId, new WebSocketInfo
{
WebSocket = socket
});
Console.WriteLine($"[WS .NET 10] 新连接:{connId},当前连接数:{_connections.Count}");
return connId;
}
public void RemoveConnection(string connId)
{
if (_connections.TryRemove(connId, out var socketInfo))
{
foreach (var instanceId in socketInfo.RoomIds.Keys)
{
_instanceSubscribers[instanceId].TryRemove(connId, out _);
if (_instanceSubscribers[instanceId].IsEmpty)
{
_sceneAgent.RemoveInstance(instanceId);
_instanceSubscribers.TryRemove(instanceId, out _);
}
}
Console.WriteLine($"[WS .NET 10] 连接断开:{connId},当前连接数:{_connections.Count}");
}
}
public bool SubscribeInstance(string connId, string mapKey, ref string roomId)
{
if (!_connections.TryGetValue(connId,out WebSocketInfo? socketInfo))
{
Console.WriteLine($"[WS .NET 10] 订阅失败:连接 {connId} 不存在");
return false;
}
//roomId为空时新创建一个唯一房间ID
if (string.IsNullOrWhiteSpace(roomId))
{
roomId = Nanoid.Generate();
}
bool isNewInstance = false;
_instanceSubscribers.GetOrAdd(roomId, (key) =>
{
isNewInstance = true;
return new ConcurrentDictionary<string, bool>();
});
_instanceSubscribers[roomId].TryAdd(connId, true);
socketInfo.RoomIds[roomId] = true;
if (isNewInstance)
{
_sceneAgent.CreateInstance(roomId, mapKey);
}
return true;
}
public bool HasSubscribeInstance(string instanceId)
{
if (_instanceSubscribers.ContainsKey(instanceId))
{
return true;
}
return false;
}
public async Task SendSerializeMessageToPointWsSocket<T>(string connId, T syncMsgs)
{
if (_connections.TryGetValue(connId, out var socketInfo))
{
// 3. 复用:从池获取失效连接列表
var deadConnIds = _deadConnListPool.Get();
try
{
int msgBytesLength = CreateSerializeMessage(syncMsgs, out byte[] msgBytes);
if (msgBytesLength != 0)
{
// 3. 发送时传递有效长度
_ = SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, msgBytesLength, connId, deadConnIds);
}
else
{
// 3. 发送整串字节数组
_ = SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, msgBytes.Length, connId, deadConnIds);
}
// 清理失效连接
foreach (var deadConnId in deadConnIds)
{
RemoveConnection(deadConnId);
}
}
finally
{
_deadConnListPool.Return(deadConnIds); // 归还,自动清空
}
}
}
public async Task SendSerializeBatchMessageToPointWsSocket<T>(RoomMsg<T> syncMsg)
{
await SendMessageToRoomBatchAsync([syncMsg]);
}
public async Task SendMessageToRoomBatchAsync<T>(List<RoomMsg<T>> syncMsgs)
{
if (syncMsgs.Count == 0) return;
foreach (var group in syncMsgs.GroupBy(m => m.RoomId))
{
if (!_instanceSubscribers.TryGetValue(group.Key, out var subscriberConnIds)) continue;
// 1. 复用:从池获取消息列表
//var msgList = _syncMsgListPool.Get();
//try
//{
//msgList.AddRange(group);
//int msgBytesLength = CreateSerializeMessage(group.ToList(), out byte[] msgBytes);
// 封装WebSocket消息
int msgBytesLength = CreateSerializeMessage(group.ToList(), out byte[] msgBytes);
// 3. 复用:从池获取失效连接列表
var deadConnIds = _deadConnListPool.Get();
try
{
foreach (var connKv in subscriberConnIds)
{
if (_connections.TryGetValue(connKv.Key, out var socketInfo))
{
if (msgBytesLength != 0)
{
// 3. 发送时传递有效长度
_ = SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, msgBytesLength, connKv.Key, deadConnIds);
}
else
{
// 3. 发送整串字节数组
_ = SendToSingleConnAsync(socketInfo.WebSocket, msgBytes, msgBytes.Length, connKv.Key, deadConnIds);
}
}
}
// 清理失效连接
foreach (var deadConnId in deadConnIds)
{
RemoveConnection(deadConnId);
}
}
finally
{
_deadConnListPool.Return(deadConnIds); // 归还,自动清空
}
//}
//finally
//{
// _syncMsgListPool.Return(msgList); // 归还,自动清空
//}
}
}
private int CreateSerializeMessage<T>(T msg, out byte[] msgBytes)
{
// 2. 序列化逻辑优化(完全复用池化数组)
int msgBytesLength = 0; // 记录有效长度
// 2. 复用:从池获取字节数组(核心使用 _byteArrayPool
var pooledBytes = _byteArrayPool.Get();
try
{
using (var tempMs = new MemoryStream())
{
MessagePackSerializer.Serialize(tempMs, msg);
msgBytesLength = (int)tempMs.Position;
if (msgBytesLength <= pooledBytes.Length)
{
tempMs.Position = 0;
tempMs.Read(pooledBytes, 0, msgBytesLength);
msgBytes = pooledBytes; // 直接复用池化数组
}
else
{
msgBytes = tempMs.ToArray();// 超出池化数组长度,使用新分配的数组
msgBytesLength = 0;
}
}
}
finally
{
_byteArrayPool.Return(pooledBytes);
}
return msgBytesLength;
}
// 1. 定义带长度的发送方法
private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List<string> deadConnIds)
{
try
{
if (socket.State == WebSocketState.Open)
{
// 只发送有效长度的字节而非全量4KB
await socket.SendAsync(
new ArraySegment<byte>(data, 0, dataLength),
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
else
{
deadConnIds.Add(connId);
}
}
catch (Exception ex)
{
Console.WriteLine($"[WS] 发送失败 {connId}{ex.Message}");
deadConnIds.Add(connId);
}
}
//private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List<string> deadConnIds)
//{
// try
// {
// if (socket.State == WebSocketState.Open)
// {
// await socket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, CancellationToken.None);
// }
// else
// {
// deadConnIds.Add(connId);
// }
// }
// catch (Exception ex)
// {
// Console.WriteLine($"[WS .NET 10] 发送失败 {connId}{ex.Message}");
// deadConnIds.Add(connId);
// }
//}
public async ValueTask DisposeAsync()
{
foreach (var (key, socketInfo) in _connections)
{
if (socketInfo.WebSocket.State == WebSocketState.Open)
{
await socketInfo.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None);
}
socketInfo.WebSocket.Dispose();
RemoveConnection(key);
}
_connections.Clear();
_instanceSubscribers.Clear();
}
}
}