我正在用 Go 构建一个 P2P 文件传输系统,它可以并行地从对等点获取数据块。虽然顺序请求工作正常,但使用 goroutine 的并行请求行为不一致,从而导致等待组计数器为负或数据传输不完整等错误。
代码概述:
`PeerServer` 结构及其处理对等连接和文件请求的方法。
关键问题:
main.go(简化版):
// main starts two peer servers, then issues three chunk requests in parallel
// from peerServer1 to the peer at 127.0.0.1:5001 and waits for all goroutines.
//
// The same WaitGroup tracks both the server goroutines and the request
// goroutines, so wg.Wait returns only after both Start calls have returned as
// well as all three requests.
func main() {
	// Initialize and start peer servers
	peerServer1 := NewPeerServer(opts1, "localhost:5000")
	peerServer2 := NewPeerServer(opts2, "localhost:5001")

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); peerServer1.Start() }()
	go func() { defer wg.Done(); peerServer2.Start() }()

	time.Sleep(2 * time.Second) // Wait for servers to start

	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Surface failures instead of dropping them: a request that
			// fails silently is indistinguishable from one that never ran.
			if err := peerServer1.TestFileRequest(i, "127.0.0.1:5001"); err != nil {
				log.Printf("chunk %d request failed: %v", i, err)
			}
		}(i)
	}

	wg.Wait()
	log.Println("Servers shut down successfully.")
}
peer.go(简化版):
// PeerServer keeps a table of known peers and the transport used to dial
// remote nodes.
type PeerServer struct {
// peerLock is held around accesses to peers (see TestFileRequest).
peerLock sync.Mutex
// peers maps an address string to its Peer; TestFileRequest looks entries up
// by the address it dialed.
peers map[string]Peer
// Transport is used to open connections to remote peers via Dial.
Transport Transport
}
// TestFileRequest requests chunk i from the peer at ipAddr and stores the
// reply in the file "chunk_<i>.chunk" in the current working directory.
//
// It dials ipAddr through p.Transport, gob-encodes a DataMessage wrapping a
// TestSend, sends it through the Peer registered in p.peers under ipAddr, and
// then reads a little-endian int64 length followed by that many payload bytes
// from the same Peer. peer.CloseStream(i) is called once nothing more will be
// read for this reply (payload fully read, or a negative length received).
//
// The Dial error is returned unchanged. Every later failure is returned as a
// wrapped error whose cause stays reachable through errors.Is / errors.As.
//
// NOTE(review): TCPTransport.Dial hands the connection to a goroutine, so the
// peer may not be in p.peers yet when it is looked up here — confirm how
// OnPeer registers peers.
// NOTE(review): parallel calls with the same ipAddr use the same p.peers key;
// presumably they can end up sharing one Peer and interleaving reads on one
// connection, and each call still invokes CloseStream — verify.
func (p *PeerServer) TestFileRequest(i int, ipAddr string) error {
	if err := p.Transport.Dial(ipAddr, "peer-server"); err != nil {
		return err
	}

	chunkName := fmt.Sprintf("chunk_%d.chunk", i)
	dataMessage := DataMessage{Payload: TestSend{ChunkId: i, ChunkName: chunkName}}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(dataMessage); err != nil {
		return fmt.Errorf("Failed to encode data: %w", err)
	}

	// Hold the lock only for the map read; the network I/O below runs unlocked.
	p.peerLock.Lock()
	peer, exists := p.peers[ipAddr]
	p.peerLock.Unlock()
	if !exists {
		return fmt.Errorf("peer with address %s not found", ipAddr)
	}

	if err := peer.Send(buf.Bytes()); err != nil {
		return fmt.Errorf("Failed to send data: %w", err)
	}

	// NOTE(review): fixed delay kept as-is; presumably it gives the transport's
	// read loop time to see the stream announcement — confirm, and prefer an
	// explicit handshake.
	time.Sleep(time.Millisecond * 500)

	var fileSize int64
	if err := binary.Read(peer, binary.LittleEndian, &fileSize); err != nil {
		return fmt.Errorf("Failed to read file size: %w", err)
	}
	if fileSize < 0 {
		// io.CopyN with a negative count copies nothing and reports success,
		// which would leave an empty file behind and claim the chunk arrived.
		// No payload will be read, so release the stream before reporting.
		peer.CloseStream(i)
		return fmt.Errorf("invalid file size %d received from %s", fileSize, ipAddr)
	}

	file, err := os.Create(chunkName)
	if err != nil {
		return fmt.Errorf("Failed to create file: %w", err)
	}
	if _, err := io.CopyN(file, peer, fileSize); err != nil {
		_ = file.Close() // the copy error is the one worth reporting
		return fmt.Errorf("Failed to copy file: %w", err)
	}

	// The whole payload has been consumed from the connection, so release the
	// stream regardless of how closing the local file turns out.
	peer.CloseStream(i)

	// A failed Close can mean buffered data never reached the disk.
	if err := file.Close(); err != nil {
		return fmt.Errorf("Failed to close file: %w", err)
	}

	fmt.Printf("Successfully received and saved chunk_%d.chunk\n", i)
	return nil
}
tcp_transport.go(简化版):
// TCPPeer represents one end of a connection to a remote node. The net.Conn
// is embedded, so a TCPPeer can be read from and written to directly.
type TCPPeer struct {
	net.Conn

	// outbound is the flag supplied when the peer was created; presumably
	// true when this side dialed the connection — confirm in NewTCPPeer.
	outbound bool

	// wg is the counter CloseStream decrements. handleConn increments it and
	// waits on it when a stream message arrives.
	wg *sync.WaitGroup
}

// CloseStream decrements the peer's WaitGroup by one. The argument i is not
// used. Every call must be matched by an earlier wg.Add(1): sync.WaitGroup
// panics when its counter goes negative.
func (p *TCPPeer) CloseStream(i int) {
	p.wg.Done()
}

// Send writes b to the underlying connection and reports any write error.
func (p *TCPPeer) Send(b []byte) error {
	if _, err := p.Write(b); err != nil {
		return err
	}
	return nil
}
// TCPTransport moves messages between peers over TCP connections.
type TCPTransport struct {
// Embedded options. handleConn reads OnPeer and Decoder on the transport;
// presumably both are promoted from here — confirm in TransportOpts.
TransportOpts
// NOTE(review): presumably the accept socket for inbound connections; it is
// not referenced by the methods shown here.
listener net.Listener
// msgch receives every decoded message that is not flagged as a stream
// (sent by handleConn).
msgch chan msg
}
// Dial opens a TCP connection to addr and hands it to handleConn on a new
// goroutine, marked as outbound and tagged with connType.
//
// Only the dial error is returned. Dial does not wait for handleConn, so
// anything handleConn does (such as invoking OnPeer) may not have happened
// yet when Dial returns nil.
func (t *TCPTransport) Dial(addr string, connType string) error {
	const outbound = true

	c, dialErr := net.Dial("tcp", addr)
	if dialErr != nil {
		return dialErr
	}

	go t.handleConn(c, outbound, connType)
	return nil
}
// handleConn wraps conn in a TCPPeer, reports it through OnPeer when that
// callback is set, and then decodes messages from conn until the decoder
// returns an error.
//
// Each decoded message is stamped with the remote address. A message flagged
// as a stream makes the loop increment peer.wg and block in Wait until the
// counter is back at zero (CloseStream calls Done); the message itself is not
// forwarded. Every other message is sent on t.msgch.
//
// conn is closed when handleConn returns, whatever the reason.
//
// NOTE(review): each stream message adds exactly one to peer.wg, so exactly
// one CloseStream call is expected per stream message; extra calls would
// drive the counter negative and panic — verify the callers.
func (t *TCPTransport) handleConn(conn net.Conn, outbound bool, connType string) {
	// Close on every exit path so a rejected peer or a decode failure does
	// not leak the socket.
	defer conn.Close()

	peer := NewTCPPeer(conn, outbound)
	if t.OnPeer != nil {
		if err := t.OnPeer(peer, connType); err != nil {
			return
		}
	}

	for {
		// Named m rather than msg so the local does not shadow the msg type.
		m := msg{}
		// Per the original note, the decoder sets m.Stream when the remote
		// side announces a stream.
		if err := t.Decoder.Reader(conn, &m); err != nil {
			return
		}
		m.From = conn.RemoteAddr().String()

		if m.Stream {
			// Stop decoding while another goroutine consumes the raw stream
			// from the same connection; resume once it calls CloseStream.
			peer.wg.Add(1)
			peer.wg.Wait()
			continue
		}

		t.msgch <- m
	}
}
可能原因:对 `sync.WaitGroup` 的使用不当。
要求:
有人可以帮助识别问题并提出解决方案以使并行块请求正常工作吗?任何有关改进并发处理或调试技术的见解将不胜感激。
附加信息:
提前感谢您的帮助!
从对等服务器并发获取Chunk数据除外
我在你的代码中找不到 `TCPPeer.wg.Add(1)`,但是你在 `CloseStream()` 中调用了 `TCPPeer.wg.Done()`。
我认为这会导致 WaitGroup 计数器为负的错误。