diff --git a/XNet.Business/Dto/SyncMsg.cs b/XNet.Business/Dto/SyncMsg.cs index 99e9a42..4df31e1 100644 --- a/XNet.Business/Dto/SyncMsg.cs +++ b/XNet.Business/Dto/SyncMsg.cs @@ -1,6 +1,4 @@ using MessagePack; -using System.Numerics; -using System.Text.Json; namespace XNet.Business { @@ -28,7 +26,7 @@ namespace XNet.Business [Key("agentIdx")] public int AgentIdx { get; set; } [Key("position")] - public Vector3 Position { get; set; } + public Vector3Msg Position { get; set; } [Key("rotation")] public float Rotation { get; set; } // 简化为绕Y轴旋转(弧度) } @@ -40,6 +38,6 @@ namespace XNet.Business [Key("type")] public WsMsgType Type { get; set; } = WsMsgType.SubscribeInstance; [Key("data")] - public byte[] Data { get; set; } = string.Empty; + public byte[] Data { get; set; } = []; } } \ No newline at end of file diff --git a/XNet.Business/GameLoopService.cs b/XNet.Business/GameLoopService.cs index 41df02c..06bca05 100644 --- a/XNet.Business/GameLoopService.cs +++ b/XNet.Business/GameLoopService.cs @@ -12,6 +12,7 @@ namespace XNet.Business private const int TARGET_FPS = 30; private const int FRAME_TIME_MS = 1000 / TARGET_FPS; + private readonly List _instanceIds = new List(); public GameLoopService(NavMeshManager navMeshManager, SceneAgent sceneAgent, WsConnectionManager wsManager) { @@ -20,6 +21,19 @@ namespace XNet.Business _wsManager = wsManager; } + public void CreateInstance(string instanceId, string mapId) + { + _navMeshManager.CreateInstance(instanceId, mapId); + _instanceIds.Add(instanceId); + } + + public void RemoveInstance(string instanceId) + { + _navMeshManager.RemoveInstance(instanceId); + _instanceIds.Remove(instanceId); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Console.WriteLine("=== Server Initializing Resources... ==="); @@ -35,8 +49,8 @@ namespace XNet.Business string instanceGuid_B = "Instance_TeamB_" + Guid.NewGuid(); // 注册 NavMesh 映射 - _navMeshManager.CreateInstance(instanceGuid_A, "Map_Forest"); - _navMeshManager.CreateInstance(instanceGuid_B, "Map_Forest"); // 复用同一份 NavMesh 内存 + CreateInstance(instanceGuid_A, "Map_Forest"); + CreateInstance(instanceGuid_B, "Map_Forest"); // 复用同一份 NavMesh 内存 // 为这两个副本创建独立的物理/避障模拟器 _sceneAgent.CreateCrowdForInstance(instanceGuid_A); @@ -74,19 +88,36 @@ namespace XNet.Business // 这行代码利用 SceneAgent 内部的 Parallel.ForEach,效率极高 _sceneAgent.UpdateAll(deltaTime); - // 2. 收集所有需要同步的Agent状态 - List allSyncMsgs = - [ - // 遍历所有实例(可从_navMeshManager获取实例列表,或SceneAgent维护) - .. _sceneAgent.GetAgentsNeedSync(instanceGuid_A), - .. _sceneAgent.GetAgentsNeedSync(instanceGuid_B), - ]; + //// 2. 收集所有需要同步的Agent状态 + //List allSyncMsgs = []; + //foreach (var instanceId in _instanceIds) + //{ + // var syncMsgs = _sceneAgent.GetAgentsNeedSync(instanceId); + // allSyncMsgs.AddRange(syncMsgs); + //} - // 3. 异步发送WebSocket消息(不阻塞游戏循环) - if (allSyncMsgs.Count > 0) + //// 3. 异步发送WebSocket消息(不阻塞游戏循环) + //if (allSyncMsgs.Count > 0) + //{ + // _ = _wsManager.SendAgentPositionBatchAsync(allSyncMsgs); + //} + + + // 2. 异步收集+推送消息(IO密集型,提交到专用线程池,不阻塞主循环) + _ = Task.Run(async () => { - _ = _wsManager.SendAgentPositionBatchAsync(allSyncMsgs); - } + List allSyncMsgs = []; + foreach (var instanceId in _instanceIds) + { + var syncMsgs = _sceneAgent.GetAgentsNeedSync(instanceId); + allSyncMsgs.AddRange(syncMsgs); + } + if (allSyncMsgs.Count > 0) + { + await _wsManager.SendAgentPositionBatchAsync(allSyncMsgs); + } + }, stoppingToken); + // 帧率控制 long frameWorkTime = stopwatch.ElapsedMilliseconds - currentTime; diff --git a/XNet.Business/SceneAgent.cs b/XNet.Business/SceneAgent.cs index 2c20544..2296e9b 100644 --- a/XNet.Business/SceneAgent.cs +++ b/XNet.Business/SceneAgent.cs @@ -113,7 +113,7 @@ namespace XNet.Business { InstanceId = instanceId, AgentIdx = agentIdx, - Position = currPos, + Position = new Vector3Msg { X = currPos.X, Y = currPos.Y, Z = currPos.Z }, Rotation = currRot }); // 更新缓存 @@ -127,7 +127,7 @@ namespace XNet.Business { InstanceId = instanceId, AgentIdx = agentIdx, - Position = currPos, + Position = new Vector3Msg { X = currPos.X, Y = currPos.Y, Z = currPos.Z }, Rotation = currRot }); lastStates.TryAdd(agentIdx, new AgentState { Position = currPos, Rotation = currRot }); @@ -303,7 +303,10 @@ namespace XNet.Business public void UpdateAll(float deltaTime) { - Parallel.ForEach(_crowdInstances.Values, ci => + Parallel.ForEach(_crowdInstances.Values, new ParallelOptions + { + MaxDegreeOfParallelism = Environment.ProcessorCount // 限制并行数=CPU核心数,避免过载 + }, ci => { lock (ci.SyncRoot) { diff --git a/XNet.Business/WsConnectionManager.cs b/XNet.Business/WsConnectionManager.cs index 349eadf..12ee4f8 100644 --- a/XNet.Business/WsConnectionManager.cs +++ b/XNet.Business/WsConnectionManager.cs @@ -1,38 +1,113 @@ using MessagePack; +using Microsoft.Extensions.ObjectPool; using NanoidDotNet; using System.Collections.Concurrent; +using System.Net.Sockets; using System.Net.WebSockets; -using System.Text; using System.Text.Json; namespace XNet.Business { public class WsConnectionManager { - // 存储所有活跃的WebSocket连接 + // ========== 原有核心字段 ========== private readonly ConcurrentDictionary _connections = new(); - // 存储:实例ID -> 订阅该实例的连接ID列表 private readonly ConcurrentDictionary> _instanceSubscribers = new(); - // 线程安全的随机数生成器(生成连接ID) - private readonly Random _random = new(); - // 序列化选项(复用避免重复创建) private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; - // 新增WebSocket连接 + // ========== 新增:对象池配置 ========== + // 对象池(使用自定义重写的策略) + private readonly ObjectPool> _syncMsgListPool; + private readonly ObjectPool> _deadConnListPool; + private readonly ObjectPool _byteArrayPool; + + + // ========== 核心:继承 DefaultPooledObjectPolicy 并重写 Create/Return ========== + /// + /// 自定义 Agent 消息列表池化策略(重写 Create/Return) + /// + public sealed class SyncMsgListPolicy : DefaultPooledObjectPolicy> + { + // 重写 Create 方法:指定初始容量,减少扩容开销 + public override List Create() + { + return new List(100); // 初始容量100 + } + + // 重写 Return 方法:清空列表数据,保留容量 + public override bool Return(List obj) + { + obj.Clear(); // 关键:归还前清空数据,避免脏数据 + return true; // 返回true表示可复用 + } + } + + /// + /// 自定义失效连接列表池化策略 + /// + public sealed class DeadConnListPolicy : DefaultPooledObjectPolicy> + { + public override List Create() + { + return new List(50); // 初始容量50 + } + + public override bool Return(List obj) + { + obj.Clear(); + return true; + } + } + + // ========== 2. byte[] 直接实现 IPooledPolicy(绕过 new() 约束) ========== + public sealed class ByteArrayPolicy : IPooledObjectPolicy + { + private readonly int _defaultSize; + + public ByteArrayPolicy(int defaultSize = 4096) + { + _defaultSize = defaultSize; + } + + // 直接实现 Create 方法,自定义数组创建逻辑(指定长度) + public byte[] Create() + { + return new byte[_defaultSize]; // 合法:数组创建必须指定长度 + } + + // 直接实现 Return 方法,自定义归还逻辑 + public bool Return(byte[] obj) + { + if (obj.Length != _defaultSize) return false; // 仅复用指定大小的数组 + Array.Clear(obj); // 清空数据,避免泄露 + return true; + } + } + + public WsConnectionManager() + { + // 配置对象池参数 + var maximumRetained = 200; + + // 初始化对象池(传入自定义策略) + _syncMsgListPool = new DefaultObjectPool>(new SyncMsgListPolicy(), maximumRetained); + _deadConnListPool = new DefaultObjectPool>(new DeadConnListPolicy(), maximumRetained); + _byteArrayPool = new DefaultObjectPool(new ByteArrayPolicy(4096), maximumRetained); + } + + // ========== 业务方法(无修改) ========== public string AddConnection(WebSocket socket) { - string connId = GenerateConnId(); + string connId = $"Conn_{Nanoid.Generate()}"; _connections.TryAdd(connId, socket); - Console.WriteLine($"[WS] 新连接:{connId},当前连接数:{_connections.Count}"); + Console.WriteLine($"[WS .NET 10] 新连接:{connId},当前连接数:{_connections.Count}"); return connId; } - // 移除WebSocket连接(清理订阅关系) public void RemoveConnection(string connId) { if (_connections.TryRemove(connId, out _)) { - // 清理该连接的所有订阅 foreach (var instanceId in _instanceSubscribers.Keys) { lock (_instanceSubscribers[instanceId]) @@ -40,84 +115,134 @@ namespace XNet.Business _instanceSubscribers[instanceId].Remove(connId); } } - Console.WriteLine($"[WS] 连接断开:{connId},当前连接数:{_connections.Count}"); + Console.WriteLine($"[WS .NET 10] 连接断开:{connId},当前连接数:{_connections.Count}"); } } - // 客户端订阅实例 public bool SubscribeInstance(string connId, string instanceId) { if (!_connections.ContainsKey(connId)) { - Console.WriteLine($"[WS] 订阅失败:连接 {connId} 不存在"); + Console.WriteLine($"[WS .NET 10] 订阅失败:连接 {connId} 不存在"); return false; } - // 初始化实例的订阅列表(不存在则创建) _instanceSubscribers.GetOrAdd(instanceId, _ => new HashSet()); - lock (_instanceSubscribers[instanceId]) { _instanceSubscribers[instanceId].Add(connId); } - Console.WriteLine($"[WS] 连接 {connId} 订阅实例 {instanceId}"); return true; } - // 批量发送Agent位置同步消息(异步,不阻塞游戏循环) public async Task SendAgentPositionBatchAsync(List syncMsgs) { - if (syncMsgs.Count == 0) return; + if(syncMsgs.Count == 0) return; - // 按实例ID分组,减少重复发送 - var groupedMsgs = syncMsgs.GroupBy(m => m.InstanceId); - - foreach (var group in groupedMsgs) + foreach (var group in syncMsgs.GroupBy(m => m.InstanceId)) { - string instanceId = group.Key; - if (!_instanceSubscribers.TryGetValue(instanceId, out var subscriberConnIds)) - { - continue; // 无订阅者,跳过 - } + if (!_instanceSubscribers.TryGetValue(group.Key, out var subscriberConnIds)) continue; - // 序列化该实例的所有同步消息 - byte[] bytesData = MessagePackSerializer.Serialize(group.ToList()); - var wsMsg = new WsMessage + // 1. 复用:从池获取消息列表 + var msgList = _syncMsgListPool.Get(); + try { - Type = WsMsgType.AgentPositionSync, - Data = bytesData - }; - byte[] sendBytes = MessagePackSerializer.Serialize(wsMsg); + msgList.AddRange(group); - // 异步发送给所有房间订阅者(逐个发送,失败则清理连接) - List deadConnIds = new(); - lock (subscriberConnIds) - { - foreach (var connId in subscriberConnIds) + // 2. 序列化逻辑优化(完全复用池化数组) + int msgBytesLength = 0; // 记录有效长度 + byte[]? msgBytes = null; + // 2. 复用:从池获取字节数组(核心使用 _byteArrayPool) + var pooledBytes = _byteArrayPool.Get(); + bool needReturnPooledBytes = false; + try { - if (_connections.TryGetValue(connId, out var socket)) + using (var tempMs = new MemoryStream()) { - _ = SendToSingleConnAsync(socket, sendBytes, connId, deadConnIds); + MessagePackSerializer.Serialize(tempMs, msgList); + msgBytesLength = (int)tempMs.Position; + + if (msgBytesLength <= pooledBytes.Length) + { + tempMs.Position = 0; + tempMs.Read(pooledBytes, 0, msgBytesLength); + msgBytes = pooledBytes; // 直接复用池化数组 + needReturnPooledBytes = true; + } + else + { + msgBytes = tempMs.ToArray(); + needReturnPooledBytes = false; + } + } + } + finally + { + if (needReturnPooledBytes) + { + _byteArrayPool.Return(pooledBytes); } } - } - // 清理失效连接 - foreach (var deadConnId in deadConnIds) + // 封装WebSocket消息 + var wsMsg = new WsMessage { Type = WsMsgType.AgentPositionSync, Data = msgBytes }; + byte[] sendBytes = MessagePackSerializer.Serialize(wsMsg); + + // 3. 复用:从池获取失效连接列表 + var deadConnIds = _deadConnListPool.Get(); + try + { + lock (subscriberConnIds) + { + foreach (var connId in subscriberConnIds) + { + if (_connections.TryGetValue(connId, out var socket)) + { + // 3. 发送时传递有效长度 + if (needReturnPooledBytes) + { + _ = SendToSingleConnAsync(socket, msgBytes, msgBytesLength, connId, deadConnIds); + } + else + { + _ = SendToSingleConnAsync(socket, msgBytes, msgBytes.Length, connId, deadConnIds); + } + } + } + } + + // 清理失效连接 + foreach (var deadConnId in deadConnIds) + { + RemoveConnection(deadConnId); + } + } + finally + { + _deadConnListPool.Return(deadConnIds); // 归还,自动清空 + } + } + finally { - RemoveConnection(deadConnId); + _syncMsgListPool.Return(msgList); // 归还,自动清空 } } } - // 内部:发送消息给单个连接(异步) - private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List deadConnIds) + // 1. 定义带长度的发送方法 + private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, int dataLength, string connId, List deadConnIds) { try { if (socket.State == WebSocketState.Open) { - await socket.SendAsync(new ArraySegment(data), WebSocketMessageType.Text, true, CancellationToken.None); + // 只发送有效长度的字节,而非全量4KB + await socket.SendAsync( + new ArraySegment(data, 0, dataLength), + WebSocketMessageType.Binary, + true, + CancellationToken.None + ); } else { @@ -131,10 +256,59 @@ namespace XNet.Business } } - // 生成唯一连接ID - private string GenerateConnId() + private async Task SendToSingleConnAsync(WebSocket socket, byte[] data, string connId, List deadConnIds) { - return $"Conn_{Nanoid.Generate()}";// {DateTime.Now.Ticks}_{_random.Next(1000, 9999)}"; + try + { + if (socket.State == WebSocketState.Open) + { + await socket.SendAsync(new ArraySegment(data), WebSocketMessageType.Binary, true, CancellationToken.None); + } + else + { + deadConnIds.Add(connId); + } + } + catch (Exception ex) + { + Console.WriteLine($"[WS .NET 10] 发送失败 {connId}:{ex.Message}"); + deadConnIds.Add(connId); + } + } + + public async ValueTask DisposeAsync() + { + foreach (var (_, socket) in _connections) + { + if (socket.State == WebSocketState.Open) + { + await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server shutdown", CancellationToken.None); + } + socket.Dispose(); + } + _connections.Clear(); + _instanceSubscribers.Clear(); + } + } + + // 补充 Vector3 的 MessagePack 序列化(否则会序列化失败) + [MessagePackObject] + public struct Vector3Msg + { + [Key("x")] + public float X { get; set; } + + [Key("y")] + public float Y { get; set; } + + [Key("z")] + public float Z { get; set; } + + public Vector3Msg(float x, float y, float z) + { + X = x; + Y = y; + Z = z; } } } \ No newline at end of file diff --git a/XNet.Business/XNet.Business.csproj b/XNet.Business/XNet.Business.csproj index cddbb56..ed4b1ac 100644 --- a/XNet.Business/XNet.Business.csproj +++ b/XNet.Business/XNet.Business.csproj @@ -15,6 +15,7 @@ + diff --git a/XNet/Controllers/MapController.cs b/XNet/Controllers/MapController.cs index f6439d1..fd216af 100644 --- a/XNet/Controllers/MapController.cs +++ b/XNet/Controllers/MapController.cs @@ -18,7 +18,7 @@ namespace XNet.Api.Controllers [HttpGet] public void MapInit() { - _mapAgent!.FindPath("Instance_TeamA_1", new Vector3(-4, 0, -4), new Vector3(5, 0, 3)); + //_mapAgent!.FindPath("Instance_TeamA_1", new Vector3(-4, 0, -4), new Vector3(5, 0, 3)); } } }