如何使用SignalR保证消息传递?

问题描述 投票:21回答:2

我正在使用C#和SignalR开发实时客户端 - 服务器应用程序。我需要尽快向客户发送消息。我在服务器上的代码:

for (int i = 0; i < totalRecords; i++)
{
    hubContext.Clients.Client(clientList[c].Key).addMessage(
    serverId, RecordsList[i].type + RecordsList[i].value);
    Thread.Sleep(50);       
}

如果有延迟> = 50 ms,一切正常,但如果没有延迟或延迟小于50 ms,则会丢失一些消息。我需要尽可能快地发送消息。我想我需要检查是否收到消息,并且只在发送另一个消息之后。 如何以正确的方式做到这一点?

c# signalr
2个回答
20
投票

SignalR不保证消息传递。由于SignalR在调用客户端方法时不会阻塞,因此您可以非常快速地调用客户端方法。不幸的是,客户端可能并不总是准备好在发送消息后立即接收消息,因此SignalR必须缓冲消息。

一般来说,SignalR每个客户端最多可缓冲1000条消息。一旦客户端落后超过1000条消息,它将开始丢失消息。可以增加1000的DefaultMessageBufferSize,但这会增加SignalR的内存使用量,但仍然无法保证消息传递。

http://www.asp.net/signalr/overview/signalr-20/performance-and-scaling/signalr-performance#tuning

如果您想保证邮件传递,您必须自己确认。您可以按照建议仅在确认上一条消息后发送消息。如果等待每条消息的ACK太慢,您也可以一次确认多条消息。


8
投票

在收到其他客户端的确认之前,您需要重新发送消息。

而不是立即发送消息,将它们排队并让后台线程/定时器发送消息。

这是一个可行的高性能队列。

public class MessageQueue : IDisposable
{
    private readonly ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();

    public int InQueue => _messages.Count;

    public int SendInterval { get; }

    private readonly Timer _sendTimer;
    private readonly ISendMessage _messageSender;

    public MessageQueue(ISendMessage messageSender, uint sendInterval) {
        _messageSender = messageSender ?? throw new ArgumentNullException(nameof(messageSender));
        SendInterval = (int)sendInterval;
        _sendTimer = new Timer(timerTick, this, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start() {
        _sendTimer.Change(SendInterval, Timeout.Infinite);
    }

    private readonly ConcurrentQueue<Guid> _recentlyReceived = new ConcurrentQueue<Guid>();

    public void ResponseReceived(Guid id) {
        if (_recentlyReceived.Contains(id)) return; // We've already received a reply for this message

        // Store current message locally
        var message = _currentSendingMessage;

        if (message == null || id != message.MessageId)
            throw new InvalidOperationException($"Received response {id}, but that message hasn't been sent.");

        // Unset to signify that the message has been successfully sent
        _currentSendingMessage = null;

        // We keep id's of recently received messages because it's possible to receive a reply
        // more than once, since we're sending the message more than once.
        _recentlyReceived.Enqueue(id);

        if(_recentlyReceived.Count > 100) {
            _recentlyReceived.TryDequeue(out var _);
        }
    }

    public void Enqueue(Message m) {
        _messages.Enqueue(m);
    }

    // We may access this variable from multiple threads, but there's no need to lock.
    // The worst thing that can happen is we send the message again after we've already
    // received a reply.
    private Message _currentSendingMessage;

    private void timerTick(object state) {
        try {
            var message = _currentSendingMessage;

            // Get next message to send
            if (message == null) {
                _messages.TryDequeue(out message);

                // Store so we don't have to peek the queue and conditionally dequeue
                _currentSendingMessage = message;
            }

            if (message == null) return; // Nothing to send

            // Send Message
            _messageSender.Send(message);
        } finally {
            // Only start the timer again if we're done ticking.
            try {
                _sendTimer.Change(SendInterval, Timeout.Infinite);
            } catch (ObjectDisposedException) {

            }
        }
    }

    public void Dispose() {
        _sendTimer.Dispose();
    }
}

public interface ISendMessage
{
    void Send(Message message);
}

public class Message
{
    public Guid MessageId { get; }

    public string MessageData { get; }

    public Message(string messageData) {
        MessageId = Guid.NewGuid();
        MessageData = messageData ?? throw new ArgumentNullException(nameof(messageData));
    }
}

这是使用MessageQueue的一些示例代码

public class Program
{
    static void Main(string[] args) {
        try {
            const int TotalMessageCount = 1000;

            var messageSender = new SimulatedMessageSender();

            using (var messageQueue = new MessageQueue(messageSender, 10)) {
                messageSender.Initialize(messageQueue);

                for (var i = 0; i < TotalMessageCount; i++) {
                    messageQueue.Enqueue(new Message(i.ToString()));
                }

                var startTime = DateTime.Now;

                Console.WriteLine("Starting message queue");

                messageQueue.Start();

                while (messageQueue.InQueue > 0) {
                    Thread.Yield(); // Want to use Thread.Sleep or Task.Delay in the real world.
                }

                var endTime = DateTime.Now;

                var totalTime = endTime - startTime;

                var messagesPerSecond = TotalMessageCount / totalTime.TotalSeconds;

                Console.WriteLine($"Messages Per Second: {messagesPerSecond:#.##}");
            }
        } catch (Exception ex) {
            Console.Error.WriteLine($"Unhandled Exception: {ex}");
        }

        Console.WriteLine();
        Console.WriteLine("==== Done ====");

        Console.ReadLine();
    }
}

public class SimulatedMessageSender : ISendMessage
{
    private MessageQueue _queue;

    public void Initialize(MessageQueue queue) {
        if (_queue != null) throw new InvalidOperationException("Already initialized.");

        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    private static readonly Random _random = new Random();

    public void Send(Message message) {
        if (_queue == null) throw new InvalidOperationException("Not initialized");

        var chanceOfFailure = _random.Next(0, 20);

        // Drop 1 out of 20 messages
        // Most connections won't even be this bad.
        if (chanceOfFailure != 0) {
            _queue.ResponseReceived(message.MessageId);
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.