将 SignalR hub 与 Redis 结合使用时,如何跨负载均衡器服务器跨越 ConcurrentDictionary

问题描述 投票:0回答:2

我有 ASP.NET Core Web 应用程序设置,其中 SignalR 通过 Redis 进行横向扩展。 使用内置组效果很好:

Clients.Group("Group_Name");

并且可以在多个负载均衡器中生存。我假设 SignalR 自动将这些组保留在 Redis 中,以便所有服务器都知道我们拥有哪些组以及谁订阅了它们。

但是,在我的情况下,我不能只依赖组(或用户),因为没有办法将connectionId(比如重载OnDisconnectedAsync时并且只知道连接id)映射回其组,并且您总是需要 Group_Name 来标识该组。我需要它来识别该组的哪个部分在线,因此当调用

OnDisconnectedAsync
时,我知道这个人属于哪个组,以及他在对话的哪一边。

我做了一些研究,他们都建议(包括 Microsoft Docs)使用类似的东西:

static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

在集线器本身。

现在,这是一个很好的解决方案(并且是线程安全的),只不过它仅存在于一台负载均衡器服务器的内存上,而其他服务器具有该字典的不同实例。

问题是,我必须手动坚持

connectionMaps
吗?例如使用Redis?

类似:

public class ChatHub : Hub
{
    static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

    ChatHub(IDistributedCache distributedCache)
    {

        connectionMaps = distributedCache.Get("ConnectionMaps");
       /// I think connectionMaps should not be static any more.
    }
}

如果是,它是线程安全的吗?如果没有,您能建议一个与负载平衡配合使用的更好的解决方案吗?

asp.net-core redis signalr asp.net-core-signalr concurrentdictionary
2个回答
1
投票

为此一直在与同样的问题作斗争。我想到的是将集合保留在 Redis 缓存中,同时利用 StackExchange.Redis.IDatabaseAsync 和锁来处理并发。 不幸的是,这使得整个过程同步,但无法找到解决这个问题的方法。

这是我正在做的核心,这获得了锁并从缓存返回反序列化的集合


    private async Task<ConcurrentDictionary<int, HubMedia>> GetMediaAttributes(bool requireLock)
        {
            if(requireLock)
            {
                var retryTime = 0;
                try
                {
                    while (!await _redisDatabase.LockTakeAsync(_mediaAttributesLock, _lockValue, _defaultLockDuration))
                    {
                        //wait till we can get a lock on the data, 100ms by default
                        await Task.Delay(100);
                        retryTime += 10;
                        if (retryTime > _defaultLockDuration.TotalMilliseconds)
                        {
                            _logger.LogError("Failed to get Media Attributes");
                            return null;
                        }
                    }
                }
                catch(TaskCanceledException e)
                {
                    _logger.LogError("Failed to take lock within the default 5 second wait time " + e);
                    return null;
                }

            }
            var mediaAttributes = await _redisDatabase.StringGetAsync(MEDIA_ATTRIBUTES_LIST);
            if (!mediaAttributes.HasValue)
            {
                return new ConcurrentDictionary<int, HubMedia>();
            }
            return JsonConvert.DeserializeObject<ConcurrentDictionary<int, HubMedia>>(mediaAttributes);
        }

在我完成操作后更新集合

        private async Task<bool> UpdateCollection(string redisCollectionKey, object collection, string lockKey)
        {
            var success = false;
            try
            {
                success = await _redisDatabase.StringSetAsync(redisCollectionKey, JsonConvert.SerializeObject(collection, new JsonSerializerSettings
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                }));
            }
            finally
            {
                await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
            }
            return success;
        }

当我完成后,我只是确保释放锁以供其他实例获取和使用


private async Task ReleaseLock(string lockKey)
        {
            await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
        }

如果您找到更好的方法,我们将很高兴听到。很难找到任何有关数据保留和共享扩展的文档。


0
投票

如果只是知道如何向特定用户发送消息,那么您可以使用单用户组,如此处所述。

using Microsoft.AspNet.SignalR;
using System;
using System.Threading.Tasks;

namespace BasicChat
{
    [Authorize]
    public class ChatHub : Hub
    {
        public void SendChatMessage(string who, string message)
        {
            string name = Context.User.Identity.Name;

            Clients.Group(who).addChatMessage(name + ": " + message);
        }

        public override Task OnConnected()
        {
            string name = Context.User.Identity.Name;

            Groups.Add(Context.ConnectionId, name);

            return base.OnConnected();
        }

        public override Task OnDisconnected()
        {
            string name = Context.User.Identity.Name;

            Groups.Remove(Context.ConnectionId, name);

            return base.OnDisconnected();
        }
    }
}

另一方面,您需要保留所有用户的连接状态,这会变得有点棘手。如果这是必要的,那么您需要某种持久性(内存字典、数据库、redis 或其他),也许存储上次看到用户的时间,也许与 OnHeartbeat 事件结合使用:

using Microsoft.AspNet.SignalR;
using System;
using System.Threading.Tasks;

namespace BasicChat
{
    public class ConnectionData
    {
        public string ConnectionId { get; set; } 
        public string UserName { get; set; }
        public DateTime LastSeen { get; set; }
    }

    public interface IConnectionCounter
    {
        public ConcurrentDictionary<string, string> Connections { get; }
        public void RecordConnectionLastSeen(string connectionId, string userName);
        public void RemoveConnection(string connectionId);
    }

    // ... will leave implementing IConnectionCounter to the reader or see
    // https://stackoverflow.com/a/66432477/161735 for a more detailed implementation

    [Authorize]
    public class ChatHub : Hub
    {
        private readonly IConnectionCounter _connectionCounter;

        public ChatHub(IConnectionCounter connectionCounter)
        {
            _connectionCounter = connectionCounter;
        }

        public string[] GetOnlineUserNames()
        {
            // consider a remote persistence layer if your SignalR solution scales out
            return _connectionCounter.Connections
                .Where(c => LastSeen >= DateTime.UtcNow.AddSeconds(-60)
                .Select(c => c.UserName)
                .Distinct();
        }

        public override Task OnConnected()
        {
            string name = Context.User.Identity.Name;
            string connectionId = Context.ConnectionId;

            Groups.Add(connectionId, name);

            var connectionHeartbeat = Context.Features.Get<IConnectionHeartbeatFeature>();

            connectionHeartbeat.OnHeartbeat(heartbeatData =>
            { 
                // Be careful of this. It runs something like every second for each 
                // connected client so if you are hitting a database each time this is
                // called, you'll overwhelmn it. You may want to record individual 
                // changes in memory and batch any changes to a persistence layer
                _connectionCounter.RecordConnectionLastSeen(connectionId, name);
            }, (connectionId, name));

            return base.OnConnected();
        }

        public override Task OnDisconnected()
        {
            string name = Context.User.Identity.Name;
            string connectionId = Context.ConnectionId;

            Groups.Remove(connectionId, name);
            _connectionCounter.RemoveConnection(connectionId);

            return base.OnDisconnected();
        }
    }
}

请参阅此答案,了解更彻底的实现,其中 IConnectionCounter 中的连接被批量保存在数据库中。

© www.soinside.com 2019 - 2024. All rights reserved.