我正在尝试在两台不同的机器上(仅)实现全双工客户端-服务器通信方案,其中每个端点(客户端或服务器)可以随时异步(非阻塞管道)发送内容,并且另一端会拿起它并阅读。
(请不要通过参考命名管道之外的其他技术来回答;我需要使用这种特定方法的答案。)
我读到命名管道必须是单向的,否则就会锁定,但我猜这可能是错误的。我认为管道是基于套接字的,我无法想象底层套接字只是单向的。
此问题的任何答案都需要解决这些问题才能真正有用:
我很抱歉听起来要求很高并且很势利,但是经过几天的互联网搜索,我仍然没有找到一个好的例子,而且我不想使用WCF。如果你知道这个答案的细节并回答得好,我相信这个话题将成为未来几年真正的赢家。如果我弄清楚了,我会自己发布答案。
如果您要写并说“您需要使用两个管道”,请解释为什么,以及您如何知道这是真的,因为我读到的任何内容都无法解释为什么会出现这种情况。
谢谢!
您不必使用两个管道。我在网上找到了很多答案,表明您需要使用两个管道。我四处寻找,彻夜不眠,反复尝试,并弄清楚了如何做到这一点,这非常简单,但你必须把一切都做好(尤其是按照正确的调用顺序),否则它就行不通。另一个技巧是始终确保有一个未完成的读调用,否则它也会锁定。在知道有人正在阅读之前不要写。除非您先设置了事件,否则不要启动读取调用。那种事。
这是我正在使用的管道类。它可能不够强大,无法处理管道错误、关闭和溢出。
好吧,我不知道这里出了什么问题,但是格式有点不对! 呜呜呜
namespace Squall
{
public interface PipeSender
{
Task SendCommandAsync(PipeCommandPlusString pCmd);
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class ClientPipe : BasicPipe
{
NamedPipeClientStream m_pPipe;
public ClientPipe(string szServerName, string szPipeName)
: base("Client")
{
m_szPipeName = szPipeName; // debugging
m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
base.SetPipeStream(m_pPipe); // inform base class what to read/write from
}
public void Connect()
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
StartReadingAsync();
}
// the client's pipe index is always 0
internal override int PipeId() { return 0; }
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class ServerPipe : BasicPipe
{
public event EventHandler<EventArgs> GotConnectionEvent;
NamedPipeServerStream m_pPipe;
int m_nPipeId;
public ServerPipe(string szPipeName, int nPipeId)
: base("Server")
{
m_szPipeName = szPipeName;
m_nPipeId = nPipeId;
m_pPipe = new NamedPipeServerStream(
szPipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
base.SetPipeStream(m_pPipe);
m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
}
static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
{
ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
pThis.GotPipeConnection(pAsyncResult);
}
void GotPipeConnection(IAsyncResult pAsyncResult)
{
m_pPipe.EndWaitForConnection(pAsyncResult);
Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");
if (GotConnectionEvent != null)
{
GotConnectionEvent(this, new EventArgs());
}
// lodge the first read request to get us going
//
StartReadingAsync();
}
internal override int PipeId() { return m_nPipeId; }
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public abstract class BasicPipe : PipeSender
{
public static int MaxLen = 1024 * 1024; // why not
protected string m_szPipeName;
protected string m_szDebugPipeName;
public event EventHandler<PipeEventArgs> ReadDataEvent;
public event EventHandler<EventArgs> PipeClosedEvent;
protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];
PipeStream m_pPipeStream;
public BasicPipe(string szDebugPipeName)
{
m_szDebugPipeName = szDebugPipeName;
}
protected void SetPipeStream(PipeStream p)
{
m_pPipeStream = p;
}
protected string FullPipeNameDebug()
{
return m_szDebugPipeName + "-" + m_szPipeName;
}
internal abstract int PipeId();
public void Close()
{
m_pPipeStream.WaitForPipeDrain();
m_pPipeStream.Close();
m_pPipeStream.Dispose();
m_pPipeStream = null;
}
// called when Server pipe gets a connection, or when Client pipe is created
public void StartReadingAsync()
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");
// okay we're connected, now immediately listen for incoming buffers
//
byte[] pBuffer = new byte[MaxLen];
m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");
int ReadLen = t.Result;
if (ReadLen == 0)
{
Debug.WriteLine("Got a null read length, remote pipe was closed");
if (PipeClosedEvent != null)
{
PipeClosedEvent(this, new EventArgs());
}
return;
}
if (ReadDataEvent != null)
{
ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
}
else
{
Debug.Assert(false, "something happened");
}
// lodge ANOTHER read request
//
StartReadingAsync();
});
}
protected Task WriteByteArray(byte[] pBytes)
{
// this will start writing, but does it copy the memory before returning?
return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
}
public Task SendCommandAsync(PipeCommandPlusString pCmd)
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
Task t = WriteByteArray(pSerializedCmd);
return t;
}
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class PipeEventArgs
{
public byte[] m_pData;
public int m_nDataLen;
public PipeEventArgs(byte[] pData, int nDataLen)
{
// is this a copy, or an alias copy? I can't remember right now.
m_pData = pData;
m_nDataLen = nDataLen;
}
}
/******************************************************************************
* if we're just going to send a string back and forth, then we can use this
* class. It it allows us to get the bytes as a string. sort of silly.
******************************************************************************/
[Serializable]
public class PipeCommandPlusString
{
public string m_szCommand; // must be public to be serialized
public string m_szString; // ditto
public PipeCommandPlusString(string sz, string szString)
{
m_szCommand = sz;
m_szString = szString;
}
public string GetCommand()
{
return m_szCommand;
}
public string GetTransmittedString()
{
return m_szString;
}
}
}
这是我的管道测试,在一个进程上运行。它也运行在两个进程上,我检查过
namespace NamedPipeTest
{
public partial class Form1 : Form
{
SynchronizationContext _context;
Thread m_pThread = null;
volatile bool m_bDieThreadDie;
ServerPipe m_pServerPipe;
ClientPipe m_pClientPipe;
public Form1()
{
InitializeComponent();
}
private void Form1_Load(object sender, EventArgs e)
{
_context = SynchronizationContext.Current;
m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;
// m_pThread = new Thread(StaticThreadProc);
// m_pThread.Start( this );
}
private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
{
Debug.WriteLine("Server: Pipe was closed, shutting down");
// have to post this on the main thread
_context.Post(delegate
{
Close();
}, null);
}
private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
{
// this gets called on an anonymous thread
byte[] pBytes = e.m_pData;
string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
string szValue = pCmd.GetTransmittedString();
if (szValue == "CONNECT")
{
Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
// fire off an async write
Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
}
}
static void StaticThreadProc(Object o)
{
Form1 pThis = o as Form1;
pThis.ThreadProc();
}
void ThreadProc()
{
m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
m_pClientPipe.Connect();
PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
int Counter = 1;
while (Counter++ < 10)
{
Debug.WriteLine("Counter = " + Counter);
m_pClientPipe.SendCommandAsync(pCmd);
Thread.Sleep(3000);
}
while (!m_bDieThreadDie)
{
Thread.Sleep(1000);
}
m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
m_pClientPipe.Close();
m_pClientPipe = null;
}
private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
{
// wait around for server to shut us down
}
private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
{
byte[] pBytes = e.m_pData;
string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
string szValue = pCmd.GetTransmittedString();
Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
if (szValue == "CONNECTED")
{
PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
m_pClientPipe.SendCommandAsync(pCmdToSend);
}
}
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
if (m_pThread != null)
{
m_bDieThreadDie = true;
m_pThread.Join();
m_bDieThreadDie = false;
}
m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
m_pServerPipe.Close();
m_pServerPipe = null;
}
}
}
只需将管道创建为重叠的,您的代码就可以阻止一个线程中的读取,同时从另一个线程写入管道。
void StartServer()
{
Task.Factory.StartNew(() =>
{
var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
server.WaitForConnection();
reader = new StreamReader(server);
writer = new StreamWriter(server);
});
}
private async void timer1_Tick(object sender, EventArgs e)
{
timer1.Stop();
if (null != reader)
{
char[] buf = new char[50];
int count = await reader.ReadAsync(buf, 0, 50);
if (0 < count)
{
m_textBox_from.Text = new string(buf, 0, count);
}
}
timer1.Start();
}
我认为当你进行异步通信时,你必须使用两个管道。
一个是接收管道,另一个是发送管道
因为,你不知道什么时候接收数据。
当您使用一个管道发送一些数据时,recv 数据无法写入管道。
相反,您不能在管道上写入发送数据。
因此您必须使用两个管道进行异步通信。
我发现埃里克的解决方案非常有帮助。我正在构建一个跨平台系统,Windows 和 Linux,并使用 .NET 8 让它在这两个平台上运行。
关于 Linux 兼容性的注意事项:
PipeTransmissionMode.Message 仅在 Windows 上受支持,但我发现 PipeTransmissionMode.Byte 也能正常工作,但如果长度不是固定的,则必须以某种方式“框架”消息。 STX/Len/Bytes/ETX 或类似的东西。
另一件事是,在Linux上,必须设置Unix文件模式。类似于以下内容。
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
File.SetUnixFileMode(
$"/tmp/CoreFxPipe_{pipeName}",
UnixFileMode.UserRead | UnixFileMode.UserWrite |
UnixFileMode.OtherWrite | UnixFileMode.OtherRead
| UnixFileMode.GroupRead | UnixFileMode.GroupWrite);
}