XNet/XNet.Business/WsConnectionManager.cs
2025-12-25 14:32:23 +08:00

140 lines
5.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MessagePack;
using NanoidDotNet;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace XNet.Business
{
/// <summary>
/// Tracks active WebSocket connections, records which connections subscribe to which
/// instance, and broadcasts agent position updates to the subscribers of each instance.
/// </summary>
/// <remarks>
/// Connections and the instance-to-subscribers map are stored in
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> instances. Each per-instance subscriber
/// set is a plain <see cref="HashSet{T}"/> and is only read or modified while holding a
/// lock on that set. Subscriber sets are never removed from the map once created, even
/// when they become empty.
/// </remarks>
public class WsConnectionManager
{
    // All active WebSocket connections, keyed by connection ID.
    private readonly ConcurrentDictionary<string, WebSocket> _connections = new();

    // Instance ID -> IDs of the connections subscribed to that instance.
    // Every HashSet stored here is guarded by locking the set object itself.
    private readonly ConcurrentDictionary<string, HashSet<string>> _instanceSubscribers = new();

    /// <summary>Registers a WebSocket connection under a newly generated connection ID.</summary>
    /// <param name="socket">The socket to track.</param>
    /// <returns>The connection ID assigned to <paramref name="socket"/>.</returns>
    public string AddConnection(WebSocket socket)
    {
        // Retry on the (very unlikely) ID collision so the returned ID always maps to this socket.
        string connId;
        do
        {
            connId = GenerateConnId();
        }
        while (!_connections.TryAdd(connId, socket));

        Console.WriteLine($"[WS] 新连接:{connId},当前连接数:{_connections.Count}");
        return connId;
    }

    /// <summary>
    /// Removes a connection and drops it from every instance subscriber set.
    /// Does nothing when the connection ID is not registered.
    /// </summary>
    /// <param name="connId">ID of the connection to remove.</param>
    public void RemoveConnection(string connId)
    {
        if (!_connections.TryRemove(connId, out _))
        {
            return;
        }

        // Enumerate the dictionary directly instead of taking a Keys snapshot and
        // re-reading each set through the indexer.
        foreach (var entry in _instanceSubscribers)
        {
            HashSet<string> subscribers = entry.Value;
            lock (subscribers)
            {
                subscribers.Remove(connId);
            }
        }

        Console.WriteLine($"[WS] 连接断开:{connId},当前连接数:{_connections.Count}");
    }

    /// <summary>Subscribes a registered connection to an instance.</summary>
    /// <param name="connId">ID of the subscribing connection.</param>
    /// <param name="instanceId">ID of the instance to subscribe to.</param>
    /// <returns>
    /// <c>true</c> when the subscription was recorded; <c>false</c> when the connection is
    /// not (or no longer) registered.
    /// </returns>
    public bool SubscribeInstance(string connId, string instanceId)
    {
        if (!_connections.ContainsKey(connId))
        {
            Console.WriteLine($"[WS] 订阅失败:连接 {connId} 不存在");
            return false;
        }

        // GetOrAdd returns the set that actually lives in the dictionary; use it directly.
        HashSet<string> subscribers = _instanceSubscribers.GetOrAdd(instanceId, _ => new HashSet<string>());
        lock (subscribers)
        {
            subscribers.Add(connId);
        }

        // The connection may have been removed between the check above and the add.
        // RemoveConnection would then have scanned the sets before our entry existed,
        // so undo the add here to avoid leaving a stale subscriber behind.
        if (!_connections.ContainsKey(connId))
        {
            lock (subscribers)
            {
                subscribers.Remove(connId);
            }

            Console.WriteLine($"[WS] 订阅失败:连接 {connId} 不存在");
            return false;
        }

        Console.WriteLine($"[WS] 连接 {connId} 订阅实例 {instanceId}");
        return true;
    }

    /// <summary>
    /// Sends a batch of agent position updates, grouped by instance, to every connection
    /// subscribed to the corresponding instance.
    /// </summary>
    /// <remarks>
    /// Sends within one instance group run concurrently and are awaited before the next
    /// group starts, so a connection subscribed to several instances never has two sends
    /// in flight from the same call. Connections whose send fails, or whose socket is not
    /// open, are removed afterwards. The returned task completes only after all sends have
    /// finished; a caller that must not wait on slow clients can choose not to await it.
    /// </remarks>
    /// <param name="syncMsgs">Position updates to broadcast; a null or empty list is a no-op.</param>
    public async Task SendAgentPositionBatchAsync(List<AgentPositionSyncMsg> syncMsgs)
    {
        if (syncMsgs is null || syncMsgs.Count == 0)
        {
            return;
        }

        // Group by instance so each subscriber receives one message per instance.
        foreach (var group in syncMsgs.GroupBy(m => m.InstanceId))
        {
            string instanceId = group.Key;
            if (instanceId is null)
            {
                continue; // A null key cannot be looked up in the subscriber map.
            }

            if (!_instanceSubscribers.TryGetValue(instanceId, out var subscribers))
            {
                continue; // Nobody subscribed to this instance.
            }

            // Snapshot the live sockets under the lock, then send outside of it so the
            // lock is never held across network I/O.
            var targets = new List<(string ConnId, WebSocket Socket)>();
            lock (subscribers)
            {
                foreach (string connId in subscribers)
                {
                    if (_connections.TryGetValue(connId, out var socket))
                    {
                        targets.Add((connId, socket));
                    }
                }
            }

            if (targets.Count == 0)
            {
                continue; // Skip serialization when there is nobody to send to.
            }

            // Serialize once per instance and reuse the same buffer for every subscriber.
            byte[] positionsPayload = MessagePackSerializer.Serialize(group.ToList());
            var wsMsg = new WsMessage
            {
                Type = WsMsgType.AgentPositionSync,
                Data = positionsPayload
            };
            byte[] sendBytes = MessagePackSerializer.Serialize(wsMsg);

            // Await every send so failures are known before the cleanup pass below.
            bool[] delivered = await Task.WhenAll(
                targets.Select(t => SendToSingleConnAsync(t.Socket, sendBytes, t.ConnId)));

            for (int i = 0; i < targets.Count; i++)
            {
                if (!delivered[i])
                {
                    RemoveConnection(targets[i].ConnId);
                }
            }
        }
    }

    /// <summary>Sends one already-serialized message to a single connection.</summary>
    /// <param name="socket">Destination socket.</param>
    /// <param name="data">Serialized MessagePack bytes to send as one complete message.</param>
    /// <param name="connId">Connection ID, used only for logging.</param>
    /// <returns>
    /// <c>true</c> when the message was sent; <c>false</c> when the socket was not open or
    /// the send threw. This method never throws.
    /// </returns>
    private static async Task<bool> SendToSingleConnAsync(WebSocket socket, byte[] data, string connId)
    {
        try
        {
            if (socket.State != WebSocketState.Open)
            {
                return false;
            }

            // The payload is MessagePack (binary), not UTF-8 text, so it goes out as a binary frame.
            await socket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, CancellationToken.None);
            return true;
        }
        catch (Exception ex)
        {
            // Best effort: any failure marks the connection as dead for the caller to clean up.
            Console.WriteLine($"[WS] 发送失败 {connId}:{ex.Message}");
            return false;
        }
    }

    /// <summary>Generates a connection ID of the form <c>Conn_</c> followed by a Nanoid.</summary>
    private static string GenerateConnId()
    {
        return $"Conn_{Nanoid.Generate()}";
    }
}
}