我正在使用C#和SignalR开发实时客户端 - 服务器应用程序。我需要尽快向客户发送消息。我在服务器上的代码:
for (int i = 0; i < totalRecords; i++)
{
hubContext.Clients.Client(clientList[c].Key).addMessage(
serverId, RecordsList[i].type + RecordsList[i].value);
Thread.Sleep(50);
}
如果有延迟> = 50 ms,一切正常,但如果没有延迟或延迟小于50 ms,则会丢失一些消息。我需要尽可能快地发送消息。我想我需要检查是否收到消息,并且只在发送另一个消息之后。 如何以正确的方式做到这一点?
SignalR不保证消息传递。由于SignalR在调用客户端方法时不会阻塞,因此您可以非常快速地调用客户端方法。不幸的是,客户端可能并不总是准备好在发送消息后立即接收消息,因此SignalR必须缓冲消息。
一般来说,SignalR每个客户端最多可缓冲1000条消息。一旦客户端落后超过1000条消息,它将开始丢失消息。可以增加1000的DefaultMessageBufferSize,但这会增加SignalR的内存使用量,但仍然无法保证消息传递。
http://www.asp.net/signalr/overview/signalr-20/performance-and-scaling/signalr-performance#tuning
如果您想保证邮件传递,您必须自己确认。您可以按照建议仅在确认上一条消息后发送消息。如果等待每条消息的ACK太慢,您也可以一次确认多条消息。
在收到其他客户端的确认之前,您需要重新发送消息。
而不是立即发送消息,将它们排队并让后台线程/定时器发送消息。
这是一个可行的高性能队列。
public class MessageQueue : IDisposable
{
private readonly ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();
public int InQueue => _messages.Count;
public int SendInterval { get; }
private readonly Timer _sendTimer;
private readonly ISendMessage _messageSender;
public MessageQueue(ISendMessage messageSender, uint sendInterval) {
_messageSender = messageSender ?? throw new ArgumentNullException(nameof(messageSender));
SendInterval = (int)sendInterval;
_sendTimer = new Timer(timerTick, this, Timeout.Infinite, Timeout.Infinite);
}
public void Start() {
_sendTimer.Change(SendInterval, Timeout.Infinite);
}
private readonly ConcurrentQueue<Guid> _recentlyReceived = new ConcurrentQueue<Guid>();
public void ResponseReceived(Guid id) {
if (_recentlyReceived.Contains(id)) return; // We've already received a reply for this message
// Store current message locally
var message = _currentSendingMessage;
if (message == null || id != message.MessageId)
throw new InvalidOperationException($"Received response {id}, but that message hasn't been sent.");
// Unset to signify that the message has been successfully sent
_currentSendingMessage = null;
// We keep id's of recently received messages because it's possible to receive a reply
// more than once, since we're sending the message more than once.
_recentlyReceived.Enqueue(id);
if(_recentlyReceived.Count > 100) {
_recentlyReceived.TryDequeue(out var _);
}
}
public void Enqueue(Message m) {
_messages.Enqueue(m);
}
// We may access this variable from multiple threads, but there's no need to lock.
// The worst thing that can happen is we send the message again after we've already
// received a reply.
private Message _currentSendingMessage;
private void timerTick(object state) {
try {
var message = _currentSendingMessage;
// Get next message to send
if (message == null) {
_messages.TryDequeue(out message);
// Store so we don't have to peek the queue and conditionally dequeue
_currentSendingMessage = message;
}
if (message == null) return; // Nothing to send
// Send Message
_messageSender.Send(message);
} finally {
// Only start the timer again if we're done ticking.
try {
_sendTimer.Change(SendInterval, Timeout.Infinite);
} catch (ObjectDisposedException) {
}
}
}
public void Dispose() {
_sendTimer.Dispose();
}
}
public interface ISendMessage
{
void Send(Message message);
}
public class Message
{
public Guid MessageId { get; }
public string MessageData { get; }
public Message(string messageData) {
MessageId = Guid.NewGuid();
MessageData = messageData ?? throw new ArgumentNullException(nameof(messageData));
}
}
这是使用MessageQueue
的一些示例代码
public class Program
{
static void Main(string[] args) {
try {
const int TotalMessageCount = 1000;
var messageSender = new SimulatedMessageSender();
using (var messageQueue = new MessageQueue(messageSender, 10)) {
messageSender.Initialize(messageQueue);
for (var i = 0; i < TotalMessageCount; i++) {
messageQueue.Enqueue(new Message(i.ToString()));
}
var startTime = DateTime.Now;
Console.WriteLine("Starting message queue");
messageQueue.Start();
while (messageQueue.InQueue > 0) {
Thread.Yield(); // Want to use Thread.Sleep or Task.Delay in the real world.
}
var endTime = DateTime.Now;
var totalTime = endTime - startTime;
var messagesPerSecond = TotalMessageCount / totalTime.TotalSeconds;
Console.WriteLine($"Messages Per Second: {messagesPerSecond:#.##}");
}
} catch (Exception ex) {
Console.Error.WriteLine($"Unhandled Exception: {ex}");
}
Console.WriteLine();
Console.WriteLine("==== Done ====");
Console.ReadLine();
}
}
public class SimulatedMessageSender : ISendMessage
{
private MessageQueue _queue;
public void Initialize(MessageQueue queue) {
if (_queue != null) throw new InvalidOperationException("Already initialized.");
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
}
private static readonly Random _random = new Random();
public void Send(Message message) {
if (_queue == null) throw new InvalidOperationException("Not initialized");
var chanceOfFailure = _random.Next(0, 20);
// Drop 1 out of 20 messages
// Most connections won't even be this bad.
if (chanceOfFailure != 0) {
_queue.ResponseReceived(message.MessageId);
}
}
}