XNet/XNet.Business/WsConnectionManager.cs
2025-12-25 15:50:11 +08:00

252 lines
9.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using Microsoft.Extensions.ObjectPool;
using NanoidDotNet;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text.Json;
namespace XNet.Business
{
/// <summary>
/// Tracks live WebSocket connections, records which connections subscribe to which
/// instance, and pushes agent-position batches to the subscribers of each instance.
/// </summary>
public class WsConnectionManager
{
    /// <summary>Maximum number of idle objects each pool keeps.</summary>
    private const int MaximumRetained = 200;

    // ========== Core state ==========

    /// <summary>Connection id -> socket.</summary>
    private readonly ConcurrentDictionary<string, WebSocket> _connections = new();

    /// <summary>
    /// Instance id -> ids of subscribed connections. Each HashSet is guarded by
    /// locking on the set instance itself.
    /// </summary>
    private readonly ConcurrentDictionary<string, HashSet<string>> _instanceSubscribers = new();

    // Not referenced by any member of this class; kept as declared.
    private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

    // ========== Object pools (custom policies are declared elsewhere) ==========

    private readonly ObjectPool<List<AgentPositionSyncMsg>> _syncMsgListPool;
    private readonly ObjectPool<List<string>> _deadConnListPool;

    /// <summary>Creates the manager and its object pools.</summary>
    public WsConnectionManager()
    {
        // The former byte[] pool was dropped: the payload had to be materialised in a
        // MemoryStream before being copied into the pooled array, so it saved no
        // allocation, and the array was handed back to the pool while still in use.
        _syncMsgListPool = new DefaultObjectPool<List<AgentPositionSyncMsg>>(new SyncMsgListPolicy(), MaximumRetained);
        _deadConnListPool = new DefaultObjectPool<List<string>>(new DeadConnListPolicy(), MaximumRetained);
    }

    /// <summary>Registers a socket and returns the connection id generated for it.</summary>
    /// <param name="socket">The socket to track.</param>
    /// <returns>The new connection id, prefixed with <c>Conn_</c>.</returns>
    public string AddConnection(WebSocket socket)
    {
        string connId;
        do
        {
            connId = $"Conn_{Nanoid.Generate()}";
        }
        // Retry on an id collision so the returned id always maps to this socket.
        while (!_connections.TryAdd(connId, socket));

        Console.WriteLine($"[WS .NET 10] 新连接:{connId},当前连接数:{_connections.Count}");
        return connId;
    }

    /// <summary>
    /// Forgets a connection and removes it from every subscriber set.
    /// Does nothing when the id is not registered. The socket itself is not closed or disposed here.
    /// </summary>
    /// <param name="connId">Id returned by <see cref="AddConnection"/>.</param>
    public void RemoveConnection(string connId)
    {
        if (!_connections.TryRemove(connId, out _))
        {
            return;
        }

        // Iterate the values rather than re-indexing by key: an indexer lookup throws
        // KeyNotFoundException if the entry is cleared concurrently (see DisposeAsync).
        foreach (var subscribers in _instanceSubscribers.Values)
        {
            lock (subscribers)
            {
                subscribers.Remove(connId);
            }
        }

        Console.WriteLine($"[WS .NET 10] 连接断开:{connId},当前连接数:{_connections.Count}");
    }

    /// <summary>Subscribes a registered connection to an instance.</summary>
    /// <param name="connId">Id of a registered connection.</param>
    /// <param name="instanceId">Instance whose updates the connection should receive.</param>
    /// <returns><c>false</c> when <paramref name="connId"/> is not registered; otherwise <c>true</c>.</returns>
    public bool SubscribeInstance(string connId, string instanceId)
    {
        if (!_connections.ContainsKey(connId))
        {
            Console.WriteLine($"[WS .NET 10] 订阅失败:连接 {connId} 不存在");
            return false;
        }

        var subscribers = _instanceSubscribers.GetOrAdd(instanceId, static _ => new HashSet<string>());
        lock (subscribers)
        {
            subscribers.Add(connId);
        }

        return true;
    }

    /// <summary>
    /// Groups the messages by instance and sends each group, wrapped in a
    /// <c>WsMessage</c> of type <c>AgentPositionSync</c>, to that instance's subscribers.
    /// Connections whose send fails or whose socket is not open are removed afterwards.
    /// </summary>
    /// <param name="syncMsgs">Messages to broadcast; an empty list is a no-op.</param>
    /// <returns>A task that completes once every send of every group has finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="syncMsgs"/> is null.</exception>
    public async Task SendAgentPositionBatchAsync(List<AgentPositionSyncMsg> syncMsgs)
    {
        ArgumentNullException.ThrowIfNull(syncMsgs);
        if (syncMsgs.Count == 0)
        {
            return;
        }

        foreach (var group in syncMsgs.GroupBy(m => m.InstanceId))
        {
            if (!_instanceSubscribers.TryGetValue(group.Key, out var subscriberConnIds))
            {
                continue;
            }

            // Snapshot under the lock so no I/O is started while holding it.
            string[] targetIds;
            lock (subscriberConnIds)
            {
                targetIds = subscriberConnIds.ToArray();
            }

            if (targetIds.Length == 0)
            {
                continue; // nobody to send to; skip serialization
            }

            byte[] sendBytes = SerializeSyncEnvelope(group);

            var deadConnIds = _deadConnListPool.Get();
            try
            {
                var sendTasks = new List<Task>(targetIds.Length);
                foreach (var connId in targetIds)
                {
                    if (_connections.TryGetValue(connId, out var socket))
                    {
                        sendTasks.Add(SendToSingleConnAsync(socket, sendBytes, sendBytes.Length, connId, deadConnIds));
                    }
                }

                // Wait for every send before reading deadConnIds or handing it back to the
                // pool. Awaiting per group also keeps a socket subscribed to several
                // instances from having two sends outstanding within this call.
                await Task.WhenAll(sendTasks);

                foreach (var deadConnId in deadConnIds)
                {
                    RemoveConnection(deadConnId);
                }
            }
            finally
            {
                // The pool policy (declared elsewhere) is expected to clear the list on return.
                _deadConnListPool.Return(deadConnIds);
            }
        }
    }

    /// <summary>
    /// Serializes the messages with MessagePack and wraps the result in a serialized
    /// <c>WsMessage</c> envelope.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the previous code built this envelope but transmitted the bare
    /// payload instead, leaving the envelope unused. Confirm that clients expect the
    /// <c>WsMessage</c> envelope on the wire.
    /// </remarks>
    private byte[] SerializeSyncEnvelope(IEnumerable<AgentPositionSyncMsg> messages)
    {
        var msgList = _syncMsgListPool.Get();
        try
        {
            msgList.AddRange(messages);

            // Exact-size array: Data must not carry unused trailing bytes.
            byte[] payload = MessagePackSerializer.Serialize(msgList);
            var wsMsg = new WsMessage { Type = WsMsgType.AgentPositionSync, Data = payload };
            return MessagePackSerializer.Serialize(wsMsg);
        }
        finally
        {
            // The pool policy (declared elsewhere) is expected to clear the list on return.
            _syncMsgListPool.Return(msgList);
        }
    }

    /// <summary>
    /// Sends the first <paramref name="dataLength"/> bytes of <paramref name="data"/> as one
    /// binary message. Never throws: when the socket is not open or the send fails, the
    /// connection id is appended to <paramref name="deadConnIds"/> instead.
    /// </summary>
    private static async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List<string> deadConnIds)
    {
        try
        {
            if (socket.State == WebSocketState.Open)
            {
                await socket.SendAsync(
                    new ArraySegment<byte>(data, 0, dataLength),
                    WebSocketMessageType.Binary,
                    true,
                    CancellationToken.None);
                return;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"[WS] 发送失败 {connId}:{ex.Message}");
        }

        // Sends for different connections complete on different threads, and
        // List<string> is not safe for concurrent writers.
        lock (deadConnIds)
        {
            deadConnIds.Add(connId);
        }
    }

    /// <summary>
    /// Closes every open socket with a normal-closure status, disposes all sockets and
    /// clears the connection and subscription tables.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        foreach (var (connId, socket) in _connections)
        {
            try
            {
                if (socket.State == WebSocketState.Open)
                {
                    await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None);
                }
            }
            catch (Exception ex)
            {
                // One failing close handshake must not stop the remaining sockets
                // from being closed and disposed.
                Console.WriteLine($"[WS .NET 10] 关闭连接失败 {connId}:{ex.Message}");
            }
            finally
            {
                socket.Dispose();
            }
        }

        _connections.Clear();
        _instanceSubscribers.Clear();
    }
}
/// <summary>
/// Three-component float vector for MessagePack messages, serialized under the
/// string keys "x", "y" and "z". Per the original author's note, it was added so
/// that Vector3 values can be serialized (serialization fails without it).
/// </summary>
[MessagePackObject]
public struct Vector3Msg
{
    /// <summary>Creates a vector from its three components.</summary>
    public Vector3Msg(float x, float y, float z) => (X, Y, Z) = (x, y, z);

    /// <summary>X component.</summary>
    [Key("x")]
    public float X { get; set; }

    /// <summary>Y component.</summary>
    [Key("y")]
    public float Y { get; set; }

    /// <summary>Z component.</summary>
    [Key("z")]
    public float Z { get; set; }
}
}