当我在另一个线程上睡觉时,为什么套接字被阻止接收?

问题描述 投票:1回答:2

我有一个简单的套接字监听器应用程序。它需要能够接收请求并给出响应,并且还自己发送请求并接收响应。

一旦我的应用程序启动,它将开始在一个单独的线程中接收并发送响应。这部分工作正常。

但是,当我通过SendRequest()-Method发送请求时,我需要过滤传入的响应,因此正确的响应会转到之前所做的正确的requets。我使用类ResponseHandler执行此操作(如下面的代码所示),它允许我注册请求,并在收到正确响应后立即通知我的注册请求。但是,放置请求应在10秒后超时,因此我使用的是CountdownEvent,等待这10秒,但是如果早些时候出现响应的话会更早发布。

问题:我的CountdownEvent总是等待整整10秒,只有在那之后,我收到消息的线程才会继续,从而收到响应。当我收到另一个帖子时,这怎么可能?我想,即使CountdownEvent.Wait()处于活动状态,我的程序仍继续在该单独的线程中接收。

注意:在我发出请求后,等待的响应会立即回复,如同NetworkTool WireShark所示。因此超时不正确。

编辑:在一个简单的WPF应用程序中,从按钮调用SendRequest(),它可以工作。不幸的是,这意味着我的大计划就是问题。

服务:

public class Service
{
    private readonly ResponseHandler _responseHandler;
    private readonly SyncSocketServer _serverSocket;

    private static readonly int ServerPort = 9090;

    public Service()
    {
        _responseHandler = new ResponseHandler();

        _serverSocket = new SyncSocketServer(ServerPort);
        _serverSocket.StartListening();
        _serverSocket.DataReceived += ServerSocket_DataReceived;
    }

    public void ServerSocket_DataReceived(object sender, string message)
    {
        // Here I left irrelevant code out: Originally, I check here,
        // whether the message is a request or response and so on, and 
        // I only forward the message to the _responseHandler, if it is
        // indeed a response. If it is a request I send an answer.

        string messageId = GetIdFromMessage(message);
        _responseHandler.DataReceived(messageId, message);
    }

    public void SendRequest(string message)
    {
        string messageId = Guid.NewGuid().ToString();
        string request = CreateRequest(messageId, message);

        _responseHandler.Register(messageId);
        _serverSocket.Send(request);
        string response = _responseHandler.WaitForResponse(messageId);

        Debug.WriteLine("I got the correct response: " + response);
    }
}

SyncSocketServer:

public class SyncSocketServer
{
    public event EventHandler<string> DataReceived;

    private const int BufferSize = 1024;
    private const string EndDelimiter = "\n";

    private Socket _listenerSocket;
    private Socket _client;
    private string _data;
    private Byte[] _buffer;

    private readonly int _port;

    public SyncSocketServer(int port)
    {
        _port = port;
        _buffer = new Byte[BufferSize];
    }

    public void StartListening()
    {
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[3];
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port);

        _listenerSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

        _listenerSocket.Bind(localEndPoint);
        _listenerSocket.Listen(5);

        _client = _listenerSocket.Accept();
        Debug.WriteLine("Local socket opened on: {0}", _listenerSocket.LocalEndPoint);

        StartReceiving();
    }

    private void StartReceiving()
    {
        Thread d = new Thread(() => {
            Thread.CurrentThread.IsBackground = true;
            while (true)
            {
                _data = null;

                while (true)
                {
                    int bytesReceived = _client.Receive(_buffer);
                    _data += Encoding.ASCII.GetString(_buffer, 0, bytesReceived);

                    if (_data.IndexOf(EndDelimiter, StringComparison.OrdinalIgnoreCase) > -1)
                        break;
                }

                Debug.WriteLine("Message received:" + _data);
                OnDataReceived(_data);
            }
        });
        d.Start();
    }

    public void Send(string message)
    {
        byte[] bytesMessage = Encoding.ASCII.GetBytes(message + EndDelimiter);
        _client.Send(bytesMessage);
        Debug.WriteLine("Message sent: " + message);
    }

    protected virtual void OnDataReceived(string data)
    {
        EventHandler<string> handler = DataReceived;

        if (handler != null)
            handler(this, data);
    }
}

ResponseHandler所:

public class ResponseHandler
{
    private const int WaitForResponseTimeout = 10000;

    private readonly Dictionary<string, PendingRequest> _pendingRequests;

    public ResponseHandler()
    {
        _pendingRequests = new Dictionary<string, PendingRequest>();
    }

    public void DataReceived(string messageId, string response)
    {
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            Debug.WriteLine("Received response for request, that has been removed");
        else
        {
            pendingRequest.ResponseReceived(response);
            _pendingRequests.Remove(messageId);
        }
    }

    public void Register(string messageId)
    {
        _pendingRequests.Add(messageId, new PendingRequest());
    }

    public string WaitForResponse(string messageId)
    {
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            return null;

        pendingRequest.Await();
        return pendingRequest.Response;
    }

    private class PendingRequest
    {
        public string Response { get; private set; }

        private readonly CountdownEvent _countdownEvent;

        public PendingRequest()
        {
            _countdownEvent = new CountdownEvent(1);
        }

        public void Await()
        {
            // Here, the current thread gets blocked, but
            // I expect, that the thread, where I receive
            // would continue receiving
            _countdownEvent.Wait(WaitForResponseTimeout);
        }

        public void ResponseReceived(stringresponse)
        {
            Response = response;
            _countdownEvent.Signal();
        }
    }
}
c# multithreading sockets tcp
2个回答
1
投票

因此,您的PendingRequestResponseHandler类正在从不同的线程访问。因此,为了您的计划的完整性,您需要做一些事情:

a)确保在添加和删除待处理请求字典中的请求时,您会获得锁定,因为您同时从不同的线程访问共享数据结构。否则,您可能会损坏您的数据结构。

b)你更直接的问题是Await()中的PendingRequest方法。您正在调用CountdownEvent.Wait()而不验证您的响应是否已设置。如果您的响应已经设置,则意味着您需要等待10秒才能处理它。如果您的响应到达,甚至在您调用CountdownEvent.Wait()之前就会发生这种情况。在这种情况下,CountdownEvent.Signal()将被忽略。您应该更改PendingRequest.Wait()如下:

while (Response is not set) {
      CountdownEvent.Await();
}

另外,你的CountdownEvent.Wait()信号量不需要传递给它的互斥量吗?请记住,您的Response对象正在线程之间共享。这是使用wait()方法的一般范例:

mutex.lock();
while (Response is not set) {
          CountdownEvent.Await(mutex);
    }

// Do your stuff, since your condition is satisfied
mutext.unlock();

0
投票

问题实际上是错误的假设,就像我在下面所做的那样触发事件会导致火灾并忘记:

protected virtual void OnDataReceived(string data)
{
    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler(this, data);
}

在函数StartReceiving()中,我接收数据并将其转发给订阅者,它会在呼叫时暂停,触发事件并等待所有订阅者完成其工作(当然,包括等待10秒的响应) 。这导致我的接收器线程等待另一个线程的事实。


解决方案是,为了实现调用,它会发生火灾并忘记:

protected virtual void OnDataReceived(string data)
{
    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler.BeginInvoke(this, data, null, null);
}
© www.soinside.com 2019 - 2024. All rights reserved.