通过 tcp 连接在 golang 中并行请求来自对等方的块

问题描述 投票:0回答:1

我正在用 Go 构建一个 P2P 文件传输系统,它可以并行地从对等点获取数据块。虽然顺序请求工作正常,但使用 goroutine 的并行请求行为不一致,从而导致等待组计数器为负或数据传输不完整等错误。

代码概述:

  • main.go:初始化并启动两个对等服务器。它使用 goroutine 模拟从一个对等点到另一个对等点的文件请求。
  • peer.go:定义
    PeerServer
    结构及其处理对等连接和文件请求的方法。
  • tcp_transport.go:实现传输层,处理连接和数据传输。
  • transport.go:定义传输接口。
  • encoding.go:提供消息的编码和解码功能。

关键问题:

  1. 负 WaitGroup 计数器:有时会在并行块请求期间发生。
  2. 不完整的数据传输:使用goroutine时某些块没有传输。

main.go(简化版):

func main() {
    // Initialize and start peer servers
    peerServer1 := NewPeerServer(opts1, "localhost:5000")
    peerServer2 := NewPeerServer(opts2, "localhost:5001")

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); peerServer1.Start() }()
    go func() { defer wg.Done(); peerServer2.Start() }()

    time.Sleep(2 * time.Second) // Wait for servers to start

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            peerServer1.TestFileRequest(i, "127.0.0.1:5001")
        }(i)
    }

    wg.Wait()
    log.Println("Servers shut down successfully.")
}

peer.go(简体):

type PeerServer struct {
    peerLock  sync.Mutex
    peers     map[string]Peer
    Transport Transport
}

func (p *PeerServer) TestFileRequest(i int, ipAddr string) error {
    err := p.Transport.Dial(ipAddr, "peer-server")
    if err != nil {
        return err
    }

    dataMessage := DataMessage{Payload: TestSend{ChunkId: i, ChunkName: fmt.Sprintf("chunk_%d.chunk", i)}}
    var buf bytes.Buffer
    encoder := gob.NewEncoder(&buf)
    if err := encoder.Encode(dataMessage); err != nil {
        return fmt.Errorf("Failed to encode data: %v", err)
    }

    p.peerLock.Lock()
    peer, exists := p.peers[ipAddr]
    p.peerLock.Unlock()
    if !exists {
        return fmt.Errorf("peer with address %s not found", ipAddr)
    }

    if err := peer.Send(buf.Bytes()); err != nil {
        return fmt.Errorf("Failed to send data: %v", err)
    }

    time.Sleep(time.Millisecond * 500)

    var fileSize int64
    if err := binary.Read(peer, binary.LittleEndian, &fileSize); err != nil {
        return fmt.Errorf("Failed to read file size: %v", err)
    }

    file, err := os.Create(fmt.Sprintf("chunk_%d.chunk", i))
    if err != nil {
        return fmt.Errorf("Failed to create file: %v", err)
    }
    defer file.Close()

    if _, err := io.CopyN(file, peer, fileSize); err != nil {
        return fmt.Errorf("Failed to copy file: %v", err)
    }

    fmt.Printf("Successfully received and saved chunk_%d.chunk\n", i)
    peer.CloseStream(i)

    return nil
}

tcp_transport.go(简化):

type TCPPeer struct {
    net.Conn
    outbound bool
    wg       *sync.WaitGroup
}

func (p *TCPPeer) CloseStream(i int) {
    p.wg.Done()
}

func (p *TCPPeer) Send(b []byte) error {
    _, err := p.Write(b)
    return err
}

type TCPTransport struct {
    TransportOpts
    listener net.Listener
    msgch    chan msg
}

func (t *TCPTransport) Dial(addr string, connType string) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }
    go t.handleConn(conn, true, connType)
    return nil
}

func (t *TCPTransport) handleConn(conn net.Conn, outbound bool, connType string) {
    peer := NewTCPPeer(conn, outbound)
    if t.OnPeer != nil {
        if err := t.OnPeer(peer, connType); err != nil {
            return
        }
    }
    for {
        msg := msg{}
        err := t.Decoder.Reader(conn, &msg)//reads the message if stream is sent makes msg.stream==true
        if err != nil {
            return
        }
        msg.From = conn.RemoteAddr().String()
        if msg.Stream {
            peer.wg.Add(1)
            peer.wg.Wait()
            continue
        }
        t.msgch <- msg
    }
}

可能原因:

  1. 同步问题:潜在的竞争条件与
    sync.WaitGroup
  2. 网络延迟/计时:延迟或操作顺序可能会导致并行执行中出现问题。
  3. 在peer中等待组:当我在for循环中运行goroutine时,一些go例程会先访问peer.CloseStream,然后才能访问tcp_transport.go中的wg.wait()。

要求:

有人可以帮助识别问题并提出解决方案以使并行块请求正常工作吗?任何有关改进并发处理或调试技术的见解将不胜感激。

附加信息:

  • 当顺序发出请求而不在 main.go 中使用 goroutine 时,代码可以完美运行。
  • 我尝试添加/删除锁以及调整睡眠持续时间,但没有成功。

提前感谢您的帮助!

从对等服务器并发获取Chunk数据除外

go concurrency network-programming goroutine
1个回答
0
投票

我在你的代码中找不到

TCPPeer.wg.Add(1)
。但是您在 CloseStream() 中使用
TCPPeer.wg.Done()
。 我认为这会导致负 wg 计数器错误。

© www.soinside.com 2019 - 2024. All rights reserved.