我有 ASP.NET Core Web 应用程序设置,其中 SignalR 通过 Redis 进行横向扩展。 使用内置组效果很好:
Clients.Group("Group_Name");
并且可以在多个负载均衡器中生存。我假设 SignalR 自动将这些组保留在 Redis 中,以便所有服务器都知道我们拥有哪些组以及谁订阅了它们。
但是,在我的情况下,我不能只依赖组(或用户),因为没有办法将connectionId(比如重载OnDisconnectedAsync时并且只知道连接id)映射回其组,并且您总是需要 Group_Name 来标识该组。我需要它来识别该组的哪个部分在线,因此当调用
OnDisconnectedAsync
时,我知道这个人属于哪个组,以及他在对话的哪一边。
我做了一些研究,他们都建议(包括 Microsoft Docs)使用类似的东西:
static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;
在集线器本身。
现在,这是一个很好的解决方案(并且是线程安全的),只不过它仅存在于一台负载均衡器服务器的内存上,而其他服务器具有该字典的不同实例。
问题是,我必须手动坚持
connectionMaps
吗?例如使用Redis?
类似:
public class ChatHub : Hub
{
static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;
ChatHub(IDistributedCache distributedCache)
{
connectionMaps = distributedCache.Get("ConnectionMaps");
/// I think connectionMaps should not be static any more.
}
}
如果是,它是线程安全的吗?如果没有,您能建议一个与负载平衡配合使用的更好的解决方案吗?
为此一直在与同样的问题作斗争。我想到的是将集合保留在 Redis 缓存中,同时利用 StackExchange.Redis.IDatabaseAsync 和锁来处理并发。 不幸的是,这使得整个过程同步,但无法找到解决这个问题的方法。
这是我正在做的核心,这获得了锁并从缓存返回反序列化的集合
private async Task<ConcurrentDictionary<int, HubMedia>> GetMediaAttributes(bool requireLock)
{
if(requireLock)
{
var retryTime = 0;
try
{
while (!await _redisDatabase.LockTakeAsync(_mediaAttributesLock, _lockValue, _defaultLockDuration))
{
//wait till we can get a lock on the data, 100ms by default
await Task.Delay(100);
retryTime += 10;
if (retryTime > _defaultLockDuration.TotalMilliseconds)
{
_logger.LogError("Failed to get Media Attributes");
return null;
}
}
}
catch(TaskCanceledException e)
{
_logger.LogError("Failed to take lock within the default 5 second wait time " + e);
return null;
}
}
var mediaAttributes = await _redisDatabase.StringGetAsync(MEDIA_ATTRIBUTES_LIST);
if (!mediaAttributes.HasValue)
{
return new ConcurrentDictionary<int, HubMedia>();
}
return JsonConvert.DeserializeObject<ConcurrentDictionary<int, HubMedia>>(mediaAttributes);
}
在我完成操作后更新集合
private async Task<bool> UpdateCollection(string redisCollectionKey, object collection, string lockKey)
{
var success = false;
try
{
success = await _redisDatabase.StringSetAsync(redisCollectionKey, JsonConvert.SerializeObject(collection, new JsonSerializerSettings
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
}));
}
finally
{
await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
}
return success;
}
当我完成后,我只是确保释放锁以供其他实例获取和使用
private async Task ReleaseLock(string lockKey)
{
await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
}
如果您找到更好的方法,我们将很高兴听到。很难找到任何有关数据保留和共享扩展的文档。
如果只是知道如何向特定用户发送消息,那么您可以使用单用户组,如此处所述。
using Microsoft.AspNet.SignalR;
using System;
using System.Threading.Tasks;
namespace BasicChat
{
[Authorize]
public class ChatHub : Hub
{
public void SendChatMessage(string who, string message)
{
string name = Context.User.Identity.Name;
Clients.Group(who).addChatMessage(name + ": " + message);
}
public override Task OnConnected()
{
string name = Context.User.Identity.Name;
Groups.Add(Context.ConnectionId, name);
return base.OnConnected();
}
public override Task OnDisconnected()
{
string name = Context.User.Identity.Name;
Groups.Remove(Context.ConnectionId, name);
return base.OnDisconnected();
}
}
}
另一方面,您需要保留所有用户的连接状态,这会变得有点棘手。如果这是必要的,那么您需要某种持久性(内存字典、数据库、redis 或其他),也许存储上次看到用户的时间,也许与 OnHeartbeat 事件结合使用:
using Microsoft.AspNet.SignalR;
using System;
using System.Threading.Tasks;
namespace BasicChat
{
public class ConnectionData
{
public string ConnectionId { get; set; }
public string UserName { get; set; }
public DateTime LastSeen { get; set; }
}
public interface IConnectionCounter
{
public ConcurrentDictionary<string, string> Connections { get; }
public void RecordConnectionLastSeen(string connectionId, string userName);
public void RemoveConnection(string connectionId);
}
// ... will leave implementing IConnectionCounter to the reader or see
// https://stackoverflow.com/a/66432477/161735 for a more detailed implementation
[Authorize]
public class ChatHub : Hub
{
private readonly IConnectionCounter _connectionCounter;
public ChatHub(IConnectionCounter connectionCounter)
{
_connectionCounter = connectionCounter;
}
public string[] GetOnlineUserNames()
{
// consider a remote persistence layer if your SignalR solution scales out
return _connectionCounter.Connections
.Where(c => LastSeen >= DateTime.UtcNow.AddSeconds(-60)
.Select(c => c.UserName)
.Distinct();
}
public override Task OnConnected()
{
string name = Context.User.Identity.Name;
string connectionId = Context.ConnectionId;
Groups.Add(connectionId, name);
var connectionHeartbeat = Context.Features.Get<IConnectionHeartbeatFeature>();
connectionHeartbeat.OnHeartbeat(heartbeatData =>
{
// Be careful of this. It runs something like every second for each
// connected client so if you are hitting a database each time this is
// called, you'll overwhelmn it. You may want to record individual
// changes in memory and batch any changes to a persistence layer
_connectionCounter.RecordConnectionLastSeen(connectionId, name);
}, (connectionId, name));
return base.OnConnected();
}
public override Task OnDisconnected()
{
string name = Context.User.Identity.Name;
string connectionId = Context.ConnectionId;
Groups.Remove(connectionId, name);
_connectionCounter.RemoveConnection(connectionId);
return base.OnDisconnected();
}
}
}
请参阅此答案,了解更彻底的实现,其中 IConnectionCounter 中的连接被批量保存在数据库中。