260 lines
9.4 KiB
C#
260 lines
9.4 KiB
C#
using MessagePack;
|
||
using Microsoft.Extensions.ObjectPool;
|
||
using NanoidDotNet;
|
||
using System.Collections.Concurrent;
|
||
using System.Diagnostics;
|
||
using System.Net.WebSockets;
|
||
using System.Text.Json;
|
||
using XNet.Business.PathNavigation;
|
||
|
||
namespace XNet.Business.Net
|
||
{
|
||
public class WsConnectionManager
|
||
{
|
||
// ========== Core state ==========
// Live sockets keyed by connection id (ids are produced by AddConnection).
private readonly ConcurrentDictionary<string, WebSocket> _connections = new ConcurrentDictionary<string, WebSocket>();
// Room/instance id -> set of subscribed connection ids. The bool value is only a placeholder and is never read.
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, bool>> _instanceSubscribers = new ConcurrentDictionary<string, ConcurrentDictionary<string, bool>>();

// ========== Object pools ==========
// Both pools are created in the constructor with project-specific pooling policies.
private readonly ObjectPool<List<string>> _deadConnListPool;
private readonly ObjectPool<byte[]> _byteArrayPool;

// Scene-side collaborator; assigned exactly once, in the constructor.
private readonly SceneAgent _sceneAgent;
|
||
|
||
/// <summary>
/// Creates the manager and the object pools it draws from.
/// </summary>
/// <param name="sceneAgent">Scene agent that this manager asks to create and remove room instances.</param>
public WsConnectionManager(SceneAgent sceneAgent)
{
    // Maximum number of idle objects each pool retains for reuse.
    const int maximumRetained = 200;

    _sceneAgent = sceneAgent;

    // Pools are built with custom policies declared elsewhere in the project.
    _deadConnListPool = new DefaultObjectPool<List<string>>(new DeadConnListPolicy(), maximumRetained);
    // NOTE(review): 4096 is presumably the size in bytes of each pooled buffer - confirm in ByteArrayPolicy.
    _byteArrayPool = new DefaultObjectPool<byte[]>(new ByteArrayPolicy(4096), maximumRetained);
}
|
||
|
||
// ========== 业务方法(无修改) ==========
|
||
/// <summary>
/// Registers a WebSocket under a freshly generated connection id.
/// </summary>
/// <param name="socket">The socket to track.</param>
/// <returns>The generated id, formed as <c>Conn_</c> followed by a Nanoid.</returns>
public string AddConnection(WebSocket socket)
{
    var connectionId = "Conn_" + Nanoid.Generate();

    // The TryAdd result is not inspected; the id is returned either way.
    _connections.TryAdd(connectionId, socket);

    Console.WriteLine($"[WS .NET 10] 新连接:{connectionId},当前连接数:{_connections.Count}");
    return connectionId;
}
|
||
|
||
/// <summary>
/// Forgets a connection and removes it from every room it was subscribed to.
/// A room left without subscribers is unregistered and its scene instance is removed.
/// </summary>
/// <param name="connId">Connection id to remove; ids that are not registered are ignored.</param>
public void RemoveConnection(string connId)
{
    if (!_connections.TryRemove(connId, out _))
    {
        return;
    }

    // Enumerate key/value pairs instead of Keys + indexer: the indexer throws
    // KeyNotFoundException if another thread removes the room between the two steps.
    foreach (var (instanceId, subscribers) in _instanceSubscribers)
    {
        subscribers.TryRemove(connId, out _);

        // Only the caller whose TryRemove succeeds tears the scene instance down, so two
        // connections leaving the same room at once cannot both call RemoveInstance.
        // NOTE(review): a subscribe racing with this teardown is still not atomic - confirm
        // whether callers serialize subscribe/unsubscribe per room.
        if (subscribers.IsEmpty && _instanceSubscribers.TryRemove(instanceId, out _))
        {
            _sceneAgent.RemoveInstance(instanceId);
        }
    }

    Console.WriteLine($"[WS .NET 10] 连接断开:{connId},当前连接数:{_connections.Count}");
}
|
||
|
||
/// <summary>
/// Subscribes a connection to a room, creating the room (and its scene instance) on first use.
/// </summary>
/// <param name="connId">Id of an already registered connection.</param>
/// <param name="mapKey">Map key passed to the scene agent when a new instance is created.</param>
/// <param name="roomId">
/// Room to join. When null, empty or whitespace, a new unique id is generated and written back.
/// </param>
/// <returns><c>true</c> when subscribed; <c>false</c> when <paramref name="connId"/> is unknown.</returns>
public bool SubscribeInstance(string connId, string mapKey, ref string roomId)
{
    if (!_connections.ContainsKey(connId))
    {
        Console.WriteLine($"[WS .NET 10] 订阅失败:连接 {connId} 不存在");
        return false;
    }

    // No room id supplied: mint a unique one and hand it back through the ref parameter.
    if (string.IsNullOrWhiteSpace(roomId))
    {
        roomId = Nanoid.Generate();
    }

    // A GetOrAdd value factory may run on a thread that then loses the insertion race, so a
    // flag set inside it can be true for several callers. Offering a pre-built candidate and
    // checking which object was actually stored identifies exactly one creator.
    var candidate = new ConcurrentDictionary<string, bool>();
    var subscribers = _instanceSubscribers.GetOrAdd(roomId, candidate);
    bool isNewInstance = ReferenceEquals(subscribers, candidate);

    // Use the returned set directly; a second indexer lookup could throw if the room
    // were removed concurrently.
    subscribers.TryAdd(connId, true);

    if (isNewInstance)
    {
        _sceneAgent.CreateInstance(roomId, mapKey);
    }

    return true;
}
|
||
|
||
/// <summary>
/// Tells whether a room/instance id is currently present in the subscriber map.
/// </summary>
/// <param name="instanceId">Room/instance id to look up.</param>
/// <returns><c>true</c> when the id is registered; otherwise <c>false</c>.</returns>
public bool HasSubscribeInstance(string instanceId) => _instanceSubscribers.ContainsKey(instanceId);
|
||
|
||
|
||
/// <summary>
/// Groups the messages by room, serializes each group once and sends it to every
/// subscriber of that room. Completes when all sends have finished.
/// </summary>
/// <typeparam name="T">Payload type carried by the room messages.</typeparam>
/// <param name="syncMsgs">Messages to deliver; null or empty is a no-op.</param>
public async Task SendMessageToRoomBatchAsync<T>(List<RoomMsg<T>> syncMsgs)
{
    if (syncMsgs is null || syncMsgs.Count == 0)
    {
        return;
    }

    foreach (var group in syncMsgs.GroupBy(m => m.RoomId))
    {
        if (!_instanceSubscribers.TryGetValue(group.Key, out var subscriberConnIds))
        {
            continue;
        }

        // One serialized payload per room, shared by all of its subscribers.
        int validLength = CreateSerializeMessage(group.ToList(), out byte[] msgBytes);
        // A returned length of 0 means the whole array is the payload.
        int sendLength = validLength != 0 ? validLength : msgBytes.Length;

        var deadConnIds = _deadConnListPool.Get();
        try
        {
            var pendingSends = new List<Task>();
            foreach (var connKv in subscriberConnIds)
            {
                if (_connections.TryGetValue(connKv.Key, out var socket))
                {
                    pendingSends.Add(SendToSingleConnAsync(socket, msgBytes, sendLength, connKv.Key, deadConnIds));
                }
            }

            // Wait for the whole room before moving on. This keeps the payload and the
            // dead-connection list alive until no send can still touch them, and keeps a
            // connection subscribed to several rooms from receiving overlapping sends
            // (a WebSocket supports only one outstanding send at a time).
            await Task.WhenAll(pendingSends);

            // Safe to enumerate now: every send has reported its outcome.
            foreach (var deadConnId in deadConnIds)
            {
                RemoveConnection(deadConnId);
            }
        }
        finally
        {
            // NOTE(review): the list is presumably cleared by the pool policy on return - confirm in DeadConnListPolicy.
            _deadConnListPool.Return(deadConnIds);
        }
    }
}
|
||
|
||
/// <summary>
/// Serializes a message with MessagePack into a buffer owned exclusively by the caller.
/// </summary>
/// <typeparam name="T">Type of the message to serialize.</typeparam>
/// <param name="msg">Message to serialize.</param>
/// <param name="msgBytes">Receives the serialized bytes; the array is exactly as long as the payload.</param>
/// <returns>The number of valid bytes in <paramref name="msgBytes"/> (always its full length).</returns>
private int CreateSerializeMessage<T>(T msg, out byte[] msgBytes)
{
    // The previous version copied the payload into a pooled array and returned that array
    // to the pool before the caller had used it, so a later serialization could overwrite
    // bytes that were still being sent. A dedicated array has no such aliasing, and it also
    // avoids the temporary MemoryStream and the extra copy.
    msgBytes = MessagePackSerializer.Serialize<T>(msg);
    return msgBytes.Length;
}
|
||
|
||
// 1. 定义带长度的发送方法
|
||
/// <summary>
/// Sends the first <paramref name="dataLength"/> bytes of <paramref name="data"/> to one socket
/// as a single binary message. Failures are logged and reported through
/// <paramref name="deadConnIds"/>; the returned task does not fault.
/// </summary>
/// <param name="socket">Target socket.</param>
/// <param name="data">Buffer holding the payload.</param>
/// <param name="dataLength">Number of bytes from the start of <paramref name="data"/> to send.</param>
/// <param name="connId">Connection id, used for logging and dead-connection reporting.</param>
/// <param name="deadConnIds">List that receives the id when the socket is not open or the send fails.</param>
private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List<string> deadConnIds)
{
    bool isDead;
    try
    {
        if (socket.State == WebSocketState.Open)
        {
            // Send only the valid prefix of the buffer, not the whole array.
            await socket.SendAsync(
                new ArraySegment<byte>(data, 0, dataLength),
                WebSocketMessageType.Binary,
                true,
                CancellationToken.None);
            isDead = false;
        }
        else
        {
            isDead = true;
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"[WS] 发送失败 {connId}:{ex.Message}");
        isDead = true;
    }

    if (isDead)
    {
        // Several sends share this list and may finish on different threads;
        // List<T> is not safe for concurrent writers, so serialize the Add.
        lock (deadConnIds)
        {
            deadConnIds.Add(connId);
        }
    }
}
|
||
|
||
//private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List<string> deadConnIds)
|
||
//{
|
||
// try
|
||
// {
|
||
// if (socket.State == WebSocketState.Open)
|
||
// {
|
||
// await socket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, CancellationToken.None);
|
||
// }
|
||
// else
|
||
// {
|
||
// deadConnIds.Add(connId);
|
||
// }
|
||
// }
|
||
// catch (Exception ex)
|
||
// {
|
||
// Console.WriteLine($"[WS .NET 10] 发送失败 {connId}:{ex.Message}");
|
||
// deadConnIds.Add(connId);
|
||
// }
|
||
//}
|
||
|
||
/// <summary>
/// Closes every open socket with a normal-closure status, disposes all sockets and
/// clears the connection and subscription maps.
/// </summary>
/// <returns>A task that completes once every socket has been processed.</returns>
public async ValueTask DisposeAsync()
{
    foreach (var (connId, socket) in _connections)
    {
        try
        {
            if (socket.State == WebSocketState.Open)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None);
            }
        }
        catch (Exception ex)
        {
            // A failing close on one socket must not stop shutdown of the remaining ones.
            Console.WriteLine($"[WS .NET 10] 关闭连接失败 {connId}:{ex.Message}");
        }
        finally
        {
            socket.Dispose();
        }
    }

    _connections.Clear();
    _instanceSubscribers.Clear();
}
|
||
}
|
||
} |