XNet/XNet.Business/WsConnectionManager.cs

314 lines
12 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using Microsoft.Extensions.ObjectPool;
using NanoidDotNet;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text.Json;
namespace XNet.Business
{
public class WsConnectionManager
{
// ========== Core state ==========
// Open sockets keyed by the connection id generated in AddConnection.
private readonly ConcurrentDictionary<string, WebSocket> _connections = new();
// Instance id -> ids of the connections subscribed to that instance.
// HashSet<string> is not thread-safe; the methods of this class lock on the set itself before touching it.
private readonly ConcurrentDictionary<string, HashSet<string>> _instanceSubscribers = new();
// NOTE(review): not referenced by any method visible in this class — confirm whether it is still needed.
private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
// ========== Object pools ==========
// Pools are created in the constructor from the custom policies declared in this class.
private readonly ObjectPool<List<AgentPositionSyncMsg>> _syncMsgListPool;
private readonly ObjectPool<List<string>> _deadConnListPool;
private readonly ObjectPool<byte[]> _byteArrayPool;
// ========== Pool policies: derive from DefaultPooledObjectPolicy and override Create/Return ==========
/// <summary>
/// Pool policy for the lists that hold one instance's agent position messages.
/// </summary>
public sealed class SyncMsgListPolicy : DefaultPooledObjectPolicy<List<AgentPositionSyncMsg>>
{
    // Pre-sizing new lists avoids repeated growth while a batch is being filled.
    private const int InitialCapacity = 100;

    /// <summary>Creates an empty list with room for 100 messages.</summary>
    public override List<AgentPositionSyncMsg> Create() => new(InitialCapacity);

    /// <summary>
    /// Empties the list before it goes back to the pool so the next user never sees stale messages.
    /// </summary>
    /// <returns>Always <c>true</c>: every returned list is accepted for reuse.</returns>
    public override bool Return(List<AgentPositionSyncMsg> obj)
    {
        // Clear() drops the items but keeps the allocated capacity.
        obj.Clear();
        return true;
    }
}
/// <summary>
/// Pool policy for the lists that collect ids of connections found to be dead during a broadcast.
/// </summary>
public sealed class DeadConnListPolicy : DefaultPooledObjectPolicy<List<string>>
{
    // Starting capacity for newly created lists.
    private const int InitialCapacity = 50;

    /// <summary>Creates an empty list with room for 50 connection ids.</summary>
    public override List<string> Create() => new(InitialCapacity);

    /// <summary>Empties the list before it is handed back to the pool.</summary>
    /// <returns>Always <c>true</c>: every returned list is accepted for reuse.</returns>
    public override bool Return(List<string> obj)
    {
        obj.Clear();
        return true;
    }
}
// ========== byte[] policy: implements IPooledObjectPolicy directly (arrays cannot satisfy the new() constraint) ==========
/// <summary>
/// Pool policy that hands out fixed-size byte arrays.
/// </summary>
public sealed class ByteArrayPolicy : IPooledObjectPolicy<byte[]>
{
    private readonly int _arrayLength;

    /// <param name="defaultSize">Length of every array created by this policy.</param>
    public ByteArrayPolicy(int defaultSize = 4096) => _arrayLength = defaultSize;

    /// <summary>Allocates a new array of the configured length.</summary>
    public byte[] Create() => new byte[_arrayLength];

    /// <summary>
    /// Accepts an array back into the pool only when it has the configured length.
    /// </summary>
    /// <returns><c>true</c> if the array was zeroed and accepted; <c>false</c> if its length differs.</returns>
    public bool Return(byte[] obj)
    {
        if (obj.Length == _arrayLength)
        {
            // Zero the contents so data from the previous user cannot leak to the next one.
            Array.Clear(obj);
            return true;
        }

        return false;
    }
}
/// <summary>
/// Creates the manager and builds its three object pools from the custom policies.
/// </summary>
public WsConnectionManager()
{
    // Upper bound on how many idle objects each pool keeps.
    const int maxRetainedPerPool = 200;

    var syncMsgListPolicy = new SyncMsgListPolicy();
    var deadConnListPolicy = new DeadConnListPolicy();
    var byteArrayPolicy = new ByteArrayPolicy(defaultSize: 4096);

    _syncMsgListPool = new DefaultObjectPool<List<AgentPositionSyncMsg>>(syncMsgListPolicy, maxRetainedPerPool);
    _deadConnListPool = new DefaultObjectPool<List<string>>(deadConnListPolicy, maxRetainedPerPool);
    _byteArrayPool = new DefaultObjectPool<byte[]>(byteArrayPolicy, maxRetainedPerPool);
}
// ========== Connection and subscription management ==========
/// <summary>
/// Registers a socket under a freshly generated connection id.
/// </summary>
/// <param name="socket">The WebSocket to track.</param>
/// <returns>The generated id, formed as "Conn_" followed by a Nanoid.</returns>
public string AddConnection(WebSocket socket)
{
    var connId = string.Concat("Conn_", Nanoid.Generate());

    // The TryAdd result is intentionally not inspected, matching the existing behaviour.
    _connections.TryAdd(connId, socket);

    var activeCount = _connections.Count;
    Console.WriteLine($"[WS .NET 10] 新连接：{connId}，当前连接数：{activeCount}");
    return connId;
}
/// <summary>
/// Stops tracking a connection and removes it from every instance's subscriber set.
/// </summary>
/// <remarks>
/// Does nothing when the id is not registered. The socket itself is not closed or disposed here.
/// </remarks>
/// <param name="connId">Id previously returned by <c>AddConnection</c>.</param>
public void RemoveConnection(string connId)
{
    if (!_connections.TryRemove(connId, out _))
    {
        return;
    }

    // Enumerate the key/value pairs directly instead of walking Keys and re-indexing the
    // dictionary: the indexer throws KeyNotFoundException if an entry disappears in between
    // (e.g. DisposeAsync clearing the map), and it costs two extra lookups per instance.
    foreach (var (_, subscribers) in _instanceSubscribers)
    {
        // The set is a plain HashSet, so it is only touched while holding its own lock.
        lock (subscribers)
        {
            subscribers.Remove(connId);
        }
    }

    Console.WriteLine($"[WS .NET 10] 连接断开：{connId}，当前连接数：{_connections.Count}");
}
/// <summary>
/// Subscribes a registered connection to position updates for one instance.
/// </summary>
/// <param name="connId">Id previously returned by <c>AddConnection</c>.</param>
/// <param name="instanceId">Instance whose updates the connection should receive.</param>
/// <returns><c>false</c> when the connection id is unknown; otherwise <c>true</c>.</returns>
public bool SubscribeInstance(string connId, string instanceId)
{
    if (!_connections.ContainsKey(connId))
    {
        Console.WriteLine($"[WS .NET 10] 订阅失败：连接 {connId} 不存在");
        return false;
    }

    // Use the set returned by GetOrAdd rather than indexing the dictionary again: the indexer
    // throws KeyNotFoundException if the entry is removed in between, and repeats the lookup.
    var subscribers = _instanceSubscribers.GetOrAdd(instanceId, static _ => new HashSet<string>());

    // The set is a plain HashSet, so it is only touched while holding its own lock.
    lock (subscribers)
    {
        subscribers.Add(connId);
    }

    return true;
}
/// <summary>
/// Broadcasts agent position updates to the connections subscribed to each update's instance.
/// </summary>
/// <remarks>
/// Updates are grouped by <c>InstanceId</c>. Each group is serialized once with MessagePack,
/// wrapped in a <c>WsMessage</c> of type <c>AgentPositionSync</c>, and sent to every subscriber
/// that still has a registered socket. The returned task completes only after all sends for all
/// groups have finished; connections whose socket was not open or whose send threw are then
/// removed through <c>RemoveConnection</c>.
/// NOTE(review): earlier code built the <c>WsMessage</c> envelope but transmitted the inner payload
/// buffer (after it had already been returned to, and zeroed by, the byte-array pool). Sending
/// the envelope assumes clients decode <c>WsMessage</c> first — confirm against the client protocol.
/// </remarks>
/// <param name="syncMsgs">Position updates to broadcast; an empty list is a no-op.</param>
public async Task SendAgentPositionBatchAsync(List<AgentPositionSyncMsg> syncMsgs)
{
    if (syncMsgs.Count == 0)
    {
        return;
    }

    foreach (var group in syncMsgs.GroupBy(m => m.InstanceId))
    {
        if (!_instanceSubscribers.TryGetValue(group.Key, out var subscriberConnIds))
        {
            continue;
        }

        // Resolve the targets first: with nobody to send to there is nothing worth serializing.
        var targets = SnapshotSubscriberSockets(subscriberConnIds);
        if (targets.Count == 0)
        {
            continue;
        }

        byte[] sendBytes = SerializeEnvelope(group);

        var deadConnIds = _deadConnListPool.Get();
        try
        {
            var sends = new Task[targets.Count];
            for (int i = 0; i < sends.Length; i++)
            {
                var (connId, socket) = targets[i];
                sends[i] = SendToSingleConnAsync(socket, sendBytes, connId, deadConnIds);
            }

            // Wait for every send before reading deadConnIds or returning it to the pool;
            // otherwise in-flight sends would write into a list that another caller may own.
            await Task.WhenAll(sends);

            foreach (var deadConnId in deadConnIds)
            {
                RemoveConnection(deadConnId);
            }
        }
        finally
        {
            _deadConnListPool.Return(deadConnIds); // the pool policy clears the list
        }
    }
}

// Copies the (id, socket) pairs of the currently registered subscribers while holding the set's
// lock, so that no socket I/O is started under the lock. Ids with no registered socket are skipped.
private List<(string ConnId, WebSocket Socket)> SnapshotSubscriberSockets(HashSet<string> subscriberConnIds)
{
    lock (subscriberConnIds)
    {
        var targets = new List<(string ConnId, WebSocket Socket)>(subscriberConnIds.Count);
        foreach (var connId in subscriberConnIds)
        {
            if (_connections.TryGetValue(connId, out var socket))
            {
                targets.Add((connId, socket));
            }
        }

        return targets;
    }
}

// Serializes one instance's messages and wraps them in the AgentPositionSync envelope.
// The payload is serialized to an exact-length array because it is stored in WsMessage.Data;
// the fixed-size buffers from _byteArrayPool would carry trailing padding, so they are not used.
private byte[] SerializeEnvelope(IEnumerable<AgentPositionSyncMsg> instanceMsgs)
{
    var msgList = _syncMsgListPool.Get();
    try
    {
        msgList.AddRange(instanceMsgs);
        byte[] payload = MessagePackSerializer.Serialize(msgList);
        var wsMsg = new WsMessage { Type = WsMsgType.AgentPositionSync, Data = payload };
        return MessagePackSerializer.Serialize(wsMsg);
    }
    finally
    {
        _syncMsgListPool.Return(msgList); // the pool policy clears the list
    }
}

/// <summary>
/// Sends the first <paramref name="dataLength"/> bytes of <paramref name="data"/> as one binary
/// WebSocket message. Never throws: a socket that is not open, or any exception raised while
/// sending, results in <paramref name="connId"/> being added to <paramref name="deadConnIds"/>.
/// </summary>
private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List<string> deadConnIds)
{
    try
    {
        if (socket.State == WebSocketState.Open)
        {
            await socket.SendAsync(
                new ArraySegment<byte>(data, 0, dataLength),
                WebSocketMessageType.Binary,
                true,
                CancellationToken.None
            );
            return;
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"[WS] 发送失败 {connId}{ex.Message}");
    }

    // Reached when the socket was not open or the send threw. Several sends can finish
    // concurrently and List<T> is not thread-safe, so the shared list is guarded by a lock.
    lock (deadConnIds)
    {
        deadConnIds.Add(connId);
    }
}

/// <summary>
/// Sends the whole of <paramref name="data"/>; see the overload that takes an explicit length.
/// </summary>
private Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List<string> deadConnIds)
    => SendToSingleConnAsync(socket, data, data.Length, connId, deadConnIds);
/// <summary>
/// Closes and disposes every tracked socket, then clears all connection and subscription state.
/// </summary>
/// <remarks>
/// Shutdown is best-effort: a failure while closing one socket is logged and does not prevent
/// the remaining sockets from being closed and disposed, nor the state from being cleared.
/// NOTE(review): the class declaration in this file does not list IAsyncDisposable — confirm
/// how this method is invoked at shutdown.
/// </remarks>
public async ValueTask DisposeAsync()
{
    foreach (var (connId, socket) in _connections)
    {
        try
        {
            if (socket.State == WebSocketState.Open)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None);
            }
        }
        catch (Exception ex)
        {
            // Without this, one faulted peer would abort the loop and leak every later socket.
            Console.WriteLine($"[WS .NET 10] 关闭连接失败 {connId}：{ex.Message}");
        }
        finally
        {
            socket.Dispose();
        }
    }

    _connections.Clear();
    _instanceSubscribers.Clear();
}
}
// MessagePack-serializable representation of a 3D vector
// (added because, per the original author, serializing Vector3 directly fails).
[MessagePackObject]
public struct Vector3Msg
{
    /// <summary>Creates a vector message from its three components.</summary>
    public Vector3Msg(float x, float y, float z) => (X, Y, Z) = (x, y, z);

    /// <summary>X component, serialized under the key "x".</summary>
    [Key("x")]
    public float X { get; set; }

    /// <summary>Y component, serialized under the key "y".</summary>
    [Key("y")]
    public float Y { get; set; }

    /// <summary>Z component, serialized under the key "z".</summary>
    [Key("z")]
    public float Z { get; set; }
}
}