c# 全双工异步命名管道.NET

问题描述 投票:0回答:4

我正在尝试在两台不同的机器上(仅)实现全双工客户端-服务器通信方案,其中每个端点(客户端或服务器)可以随时异步(非阻塞管道)发送内容,并且另一端会拿起它并阅读。

(请不要通过参考命名管道之外的其他技术来回答;我需要使用这种特定方法的答案。)

我读到命名管道必须是单向的,否则就会锁定,但我猜这可能是错误的。我认为管道是基于套接字的,我无法想象底层套接字只是单向的。

此问题的任何答案都需要解决这些问题才能真正有用:

  1. 答案需要解决异步管道,我无法使用同步解决方案。
  2. 答案需要证明或考虑到管道保持打开状态的事实。我厌倦了阅读打开管道、传输字符串、然后立即关闭管道的示例。我想要一个答案,假设管道保持打开状态并随机传输大量垃圾,并不断重复。没有挂起。
  3. 基于C#的解决方案

我很抱歉听起来要求很高并且很势利,但是经过几天的互联网搜索,我仍然没有找到一个好的例子,而且我不想使用WCF。如果你知道这个答案的细节并回答得好,我相信这个话题将成为未来几年真正的赢家。如果我弄清楚了,我会自己发布答案。

如果您要写并说“您需要使用两个管道”,请解释为什么,以及您如何知道这是真的,因为我读到的任何内容都无法解释为什么会出现这种情况。

谢谢!

.net asynchronous named-pipes
4个回答
24
投票

您不必使用两个管道。我在网上找到了很多答案,表明您需要使用两个管道。我四处寻找,彻夜不眠,反复尝试,并弄清楚了如何做到这一点,这非常简单,但你必须把一切都做好(尤其是按照正确的调用顺序),否则它就行不通。另一个技巧是始终确保有一个未完成的读调用,否则它也会锁定。在知道有人正在阅读之前不要写。除非您先设置了事件,否则不要启动读取调用。那种事。

这是我正在使用的管道类。它可能不够强大,无法处理管道错误、关闭和溢出。

好吧,我不知道这里出了什么问题,但是格式有点不对! 呜呜呜

namespace Squall
{
    public interface PipeSender
    {
        Task SendCommandAsync(PipeCommandPlusString pCmd);
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ClientPipe : BasicPipe
    {
        NamedPipeClientStream m_pPipe;

        public ClientPipe(string szServerName, string szPipeName)
            : base("Client")
        {
            m_szPipeName = szPipeName; // debugging
            m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe); // inform base class what to read/write from
        }

        public void Connect()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
            m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
            StartReadingAsync();
        }

        // the client's pipe index is always 0
        internal override int PipeId() { return 0; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ServerPipe : BasicPipe
    {
        public event EventHandler<EventArgs> GotConnectionEvent;

        NamedPipeServerStream m_pPipe;
        int m_nPipeId;

        public ServerPipe(string szPipeName, int nPipeId)
            : base("Server")
        {
            m_szPipeName = szPipeName;
            m_nPipeId = nPipeId;
            m_pPipe = new NamedPipeServerStream(
                szPipeName,
                PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Message,
                PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe);
            m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
        }

        static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
        {
            ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
            pThis.GotPipeConnection(pAsyncResult);
        }

        void GotPipeConnection(IAsyncResult pAsyncResult)
        {
            m_pPipe.EndWaitForConnection(pAsyncResult);

            Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");

            if (GotConnectionEvent != null)
            {
                GotConnectionEvent(this, new EventArgs());
            }

            // lodge the first read request to get us going
            //
            StartReadingAsync();
        }

        internal override int PipeId() { return m_nPipeId; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public abstract class BasicPipe : PipeSender
    {
        public static int MaxLen = 1024 * 1024; // why not
        protected string m_szPipeName;
        protected string m_szDebugPipeName;

        public event EventHandler<PipeEventArgs> ReadDataEvent;
        public event EventHandler<EventArgs> PipeClosedEvent;

        protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];

        PipeStream m_pPipeStream;

        public BasicPipe(string szDebugPipeName)
        {
            m_szDebugPipeName = szDebugPipeName;
        }

        protected void SetPipeStream(PipeStream p)
        {
            m_pPipeStream = p;
        }

        protected string FullPipeNameDebug()
        {
            return m_szDebugPipeName + "-" + m_szPipeName;
        }

        internal abstract int PipeId();

        public void Close()
        {
            m_pPipeStream.WaitForPipeDrain();
            m_pPipeStream.Close();
            m_pPipeStream.Dispose();
            m_pPipeStream = null;
        }

        // called when Server pipe gets a connection, or when Client pipe is created
        public void StartReadingAsync()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");

            // okay we're connected, now immediately listen for incoming buffers
            //
            byte[] pBuffer = new byte[MaxLen];
            m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");

                int ReadLen = t.Result;
                if (ReadLen == 0)
                {
                    Debug.WriteLine("Got a null read length, remote pipe was closed");
                    if (PipeClosedEvent != null)
                    {
                        PipeClosedEvent(this, new EventArgs());
                    }
                    return;
                }

                if (ReadDataEvent != null)
                {
                    ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
                }
                else
                {
                    Debug.Assert(false, "something happened");
                }

                // lodge ANOTHER read request
                //
                StartReadingAsync();

            });
        }

        protected Task WriteByteArray(byte[] pBytes)
        {
            // this will start writing, but does it copy the memory before returning?
            return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
        }

        public Task SendCommandAsync(PipeCommandPlusString pCmd)
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
            string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
            byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
            Task t = WriteByteArray(pSerializedCmd);
            return t;
        }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public class PipeEventArgs
    {
        public byte[] m_pData;
        public int m_nDataLen;

        public PipeEventArgs(byte[] pData, int nDataLen)
        {
            // is this a copy, or an alias copy? I can't remember right now.
            m_pData = pData;
            m_nDataLen = nDataLen;
        }
    }

    /******************************************************************************
     * if we're just going to send a string back and forth, then we can use this
     * class. It it allows us to get the bytes as a string. sort of silly.
     ******************************************************************************/

    [Serializable]
    public class PipeCommandPlusString
    {
        public string m_szCommand;  // must be public to be serialized
        public string m_szString;   // ditto

        public PipeCommandPlusString(string sz, string szString)
        {
            m_szCommand = sz;
            m_szString = szString;
        }

        public string GetCommand()
        {
            return m_szCommand;
        }

        public string GetTransmittedString()
        {
            return m_szString;
        }
    }
}

这是我的管道测试,在一个进程上运行。它也运行在两个进程上,我检查过

namespace NamedPipeTest
{
    public partial class Form1 : Form
    {
        SynchronizationContext _context;
        Thread m_pThread = null;
        volatile bool m_bDieThreadDie;
        ServerPipe m_pServerPipe;
        ClientPipe m_pClientPipe;

        public Form1()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            _context = SynchronizationContext.Current;

            m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
            m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;

            // m_pThread = new Thread(StaticThreadProc);
            // m_pThread.Start( this );
        }

        private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            Debug.WriteLine("Server: Pipe was closed, shutting down");

            // have to post this on the main thread
            _context.Post(delegate
            {
                Close();
            }, null);
        }

        private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            // this gets called on an anonymous thread

            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
            string szValue = pCmd.GetTransmittedString();

            if (szValue == "CONNECT")
            {
                Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
                // fire off an async write
                Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
            }
        }

        static void StaticThreadProc(Object o)
        {
            Form1 pThis = o as Form1;
            pThis.ThreadProc();
        }

        void ThreadProc()
        {
            m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
            m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Connect();

            PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
            int Counter = 1;
            while (Counter++ < 10)
            {
                Debug.WriteLine("Counter = " + Counter);
                m_pClientPipe.SendCommandAsync(pCmd);
                Thread.Sleep(3000);
            }

            while (!m_bDieThreadDie)
            {
                Thread.Sleep(1000);
            }

            m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Close();
            m_pClientPipe = null;
        }

        private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            // wait around for server to shut us down
        }

        private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
            string szValue = pCmd.GetTransmittedString();

            Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());

            if (szValue == "CONNECTED")
            {
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
                m_pClientPipe.SendCommandAsync(pCmdToSend);
            }
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (m_pThread != null)
            {
                m_bDieThreadDie = true;
                m_pThread.Join();
                m_bDieThreadDie = false;
            }

            m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
            m_pServerPipe.Close();
            m_pServerPipe = null;

        }
    }
}

4
投票

只需将管道创建为重叠的,您的代码就可以阻止一个线程中的读取,同时从另一个线程写入管道。

    void StartServer()
    {
        Task.Factory.StartNew(() =>
        {
            var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
            server.WaitForConnection();
            reader = new StreamReader(server);
            writer = new StreamWriter(server);
        });
    }

    private async void timer1_Tick(object sender, EventArgs e)
    {
        timer1.Stop();
        if (null != reader)
        {
            char[] buf = new char[50];

            int count = await reader.ReadAsync(buf, 0, 50);

            if (0 < count)
            {
                m_textBox_from.Text = new string(buf, 0, count);
            }
        }
        timer1.Start();
    }

0
投票

我认为当你进行异步通信时,你必须使用两个管道。

一个是接收管道,另一个是发送管道

因为,你不知道什么时候接收数据。

当您使用一个管道发送一些数据时,recv 数据无法写入管道。

相反,您不能在管道上写入发送数据。

因此您必须使用两个管道进行异步通信。


0
投票

我发现埃里克的解决方案非常有帮助。我正在构建一个跨平台系统,Windows 和 Linux,并使用 .NET 8 让它在这两个平台上运行。

关于 Linux 兼容性的注意事项:

PipeTransmissionMode.Message 仅在 Windows 上受支持,但我发现 PipeTransmissionMode.Byte 也能正常工作,但如果长度不是固定的,则必须以某种方式“框架”消息。 STX/Len/Bytes/ETX 或类似的东西。

另一件事是,在Linux上,必须设置Unix文件模式。类似于以下内容。

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            File.SetUnixFileMode(
                $"/tmp/CoreFxPipe_{pipeName}",
                UnixFileMode.UserRead | UnixFileMode.UserWrite |
                UnixFileMode.OtherWrite | UnixFileMode.OtherRead 
                | UnixFileMode.GroupRead | UnixFileMode.GroupWrite);
        }
© www.soinside.com 2019 - 2024. All rights reserved.