我有一个简单的套接字监听器应用程序。它需要能够接收请求并给出响应,并且还自己发送请求并接收响应。
一旦我的应用程序启动,它将开始在一个单独的线程中接收并发送响应。这部分工作正常。
但是,当我通过SendRequest()
-Method发送请求时,我需要过滤传入的响应,因此正确的响应会转到之前所做的正确的requets。我使用类ResponseHandler
执行此操作(如下面的代码所示),它允许我注册请求,并在收到正确响应后立即通知我的注册请求。但是,放置请求应在10秒后超时,因此我使用的是CountdownEvent
,等待这10秒,但是如果早些时候出现响应的话会更早发布。
问题:我的CountdownEvent
总是等待整整10秒,只有在那之后,我收到消息的线程才会继续,从而收到响应。当我收到另一个帖子时,这怎么可能?我想,即使CountdownEvent.Wait()
处于活动状态,我的程序仍继续在该单独的线程中接收。
注意:在我发出请求后,等待的响应会立即回复,如同NetworkTool WireShark所示。因此超时不正确。
编辑:在一个简单的WPF应用程序中,从按钮调用SendRequest(),它可以工作。不幸的是,这意味着我的大计划就是问题。
服务:
public class Service
{
private readonly ResponseHandler _responseHandler;
private readonly SyncSocketServer _serverSocket;
private static readonly int ServerPort = 9090;
public Service()
{
_responseHandler = new ResponseHandler();
_serverSocket = new SyncSocketServer(ServerPort);
_serverSocket.StartListening();
_serverSocket.DataReceived += ServerSocket_DataReceived;
}
public void ServerSocket_DataReceived(object sender, string message)
{
// Here I left irrelevant code out: Originally, I check here,
// whether the message is a request or response and so on, and
// I only forward the message to the _responseHandler, if it is
// indeed a response. If it is a request I send an answer.
string messageId = GetIdFromMessage(message);
_responseHandler.DataReceived(messageId, message);
}
public void SendRequest(string message)
{
string messageId = Guid.NewGuid().ToString();
string request = CreateRequest(messageId, message);
_responseHandler.Register(messageId);
_serverSocket.Send(request);
string response = _responseHandler.WaitForResponse(messageId);
Debug.WriteLine("I got the correct response: " + response);
}
}
SyncSocketServer:
public class SyncSocketServer
{
public event EventHandler<string> DataReceived;
private const int BufferSize = 1024;
private const string EndDelimiter = "\n";
private Socket _listenerSocket;
private Socket _client;
private string _data;
private Byte[] _buffer;
private readonly int _port;
public SyncSocketServer(int port)
{
_port = port;
_buffer = new Byte[BufferSize];
}
public void StartListening()
{
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[3];
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port);
_listenerSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_listenerSocket.Bind(localEndPoint);
_listenerSocket.Listen(5);
_client = _listenerSocket.Accept();
Debug.WriteLine("Local socket opened on: {0}", _listenerSocket.LocalEndPoint);
StartReceiving();
}
private void StartReceiving()
{
Thread d = new Thread(() => {
Thread.CurrentThread.IsBackground = true;
while (true)
{
_data = null;
while (true)
{
int bytesReceived = _client.Receive(_buffer);
_data += Encoding.ASCII.GetString(_buffer, 0, bytesReceived);
if (_data.IndexOf(EndDelimiter, StringComparison.OrdinalIgnoreCase) > -1)
break;
}
Debug.WriteLine("Message received:" + _data);
OnDataReceived(_data);
}
});
d.Start();
}
public void Send(string message)
{
byte[] bytesMessage = Encoding.ASCII.GetBytes(message + EndDelimiter);
_client.Send(bytesMessage);
Debug.WriteLine("Message sent: " + message);
}
protected virtual void OnDataReceived(string data)
{
EventHandler<string> handler = DataReceived;
if (handler != null)
handler(this, data);
}
}
ResponseHandler所:
public class ResponseHandler
{
private const int WaitForResponseTimeout = 10000;
private readonly Dictionary<string, PendingRequest> _pendingRequests;
public ResponseHandler()
{
_pendingRequests = new Dictionary<string, PendingRequest>();
}
public void DataReceived(string messageId, string response)
{
_pendingRequests.TryGetValue(messageId, out var pendingRequest);
if (pendingRequest == null)
Debug.WriteLine("Received response for request, that has been removed");
else
{
pendingRequest.ResponseReceived(response);
_pendingRequests.Remove(messageId);
}
}
public void Register(string messageId)
{
_pendingRequests.Add(messageId, new PendingRequest());
}
public string WaitForResponse(string messageId)
{
_pendingRequests.TryGetValue(messageId, out var pendingRequest);
if (pendingRequest == null)
return null;
pendingRequest.Await();
return pendingRequest.Response;
}
private class PendingRequest
{
public string Response { get; private set; }
private readonly CountdownEvent _countdownEvent;
public PendingRequest()
{
_countdownEvent = new CountdownEvent(1);
}
public void Await()
{
// Here, the current thread gets blocked, but
// I expect, that the thread, where I receive
// would continue receiving
_countdownEvent.Wait(WaitForResponseTimeout);
}
public void ResponseReceived(stringresponse)
{
Response = response;
_countdownEvent.Signal();
}
}
}
因此,您的PendingRequest
和ResponseHandler
类正在从不同的线程访问。因此,为了您的计划的完整性,您需要做一些事情:
a)确保在添加和删除待处理请求字典中的请求时,您会获得锁定,因为您同时从不同的线程访问共享数据结构。否则,您可能会损坏您的数据结构。
b)你更直接的问题是Await()
中的PendingRequest
方法。您正在调用CountdownEvent.Wait()
而不验证您的响应是否已设置。如果您的响应已经设置,则意味着您需要等待10秒才能处理它。如果您的响应到达,甚至在您调用CountdownEvent.Wait()
之前就会发生这种情况。在这种情况下,CountdownEvent.Signal()
将被忽略。您应该更改PendingRequest.Wait()
如下:
while (Response is not set) {
CountdownEvent.Await();
}
另外,你的CountdownEvent.Wait()
信号量不需要传递给它的互斥量吗?请记住,您的Response
对象正在线程之间共享。这是使用wait()方法的一般范例:
mutex.lock();
while (Response is not set) {
CountdownEvent.Await(mutex);
}
// Do your stuff, since your condition is satisfied
mutext.unlock();
问题实际上是错误的假设,就像我在下面所做的那样触发事件会导致火灾并忘记:
protected virtual void OnDataReceived(string data)
{
EventHandler<string> handler = DataReceived;
if (handler != null)
handler(this, data);
}
在函数StartReceiving()
中,我接收数据并将其转发给订阅者,它会在呼叫时暂停,触发事件并等待所有订阅者完成其工作(当然,包括等待10秒的响应) 。这导致我的接收器线程等待另一个线程的事实。
解决方案是,为了实现调用,它会发生火灾并忘记:
protected virtual void OnDataReceived(string data)
{
EventHandler<string> handler = DataReceived;
if (handler != null)
handler.BeginInvoke(this, data, null, null);
}