如何优化golang制作的负载均衡器来处理高并发请求?

问题描述 投票:0回答:1

我在 golang 中创建了一个简单的负载均衡器,我正在测试它的能力 处理并发请求。为此,我正在运行一个 golang 脚本,其中发送 10,000 个请求,并将最大并发请求设置为 500。问题是,当数量较低时,没有问题,但当数量较高时,如上面提到的,代码给我错误。我附上了机器人负载均衡器和脚本的代码和错误屏幕截图。

// load-balancer.go
package main

import (
    "bytes"
    "errors"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "time"
)

type ServerDataFormat struct {
    // wg *sync.WaitGroup
    lock *sync.Mutex
    index int
}

var reqCountMap = make(map[int]int)

var WorkingServerList []string = []string{
    "http://localhost:7000",
    "http://localhost:7001",
    "http://localhost:7002",
}

var client = &http.Client{
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
    },
}


// this will check and keep track of health status of all the servers
var serverCheckMap = make(map[string]bool)

func AcceptIncomingRequest(w http.ResponseWriter, r *http.Request, srvDataObject *ServerDataFormat) {
    // print the request details
    fmt.Println("Received request from: ", r.RemoteAddr)
    fmt.Println("Method: ", r.Method)
    fmt.Println("Request URI: ", r.RequestURI)
    fmt.Println("Request URL: ", r.URL)
    fmt.Println("Request Path: ", r.URL.Path)
    fmt.Println("Agent: ", r.UserAgent())
    fmt.Println("Headers: ", r.Header)

    ForwardRequestToServer(w, r, srvDataObject)
    return
}

func (srv *ServerDataFormat) GetServerUsingRoundRobin(resp chan string) {
    //defer srv.wg.Done()
    var server string
    srv.lock.Lock()
    nextIndex := (*srv).index+1
    fmt.Println("curr index: ", (*srv).index)
    fmt.Println("len of list: ", len(WorkingServerList))
    if nextIndex == len(WorkingServerList) {
        nextIndex %= len(WorkingServerList)
    }
    // update the current index
    (*srv).index = nextIndex
    fmt.Println("next index: ", nextIndex)
    server = WorkingServerList[nextIndex]
    reqCountMap[nextIndex]++ // increment the count
    srv.lock.Unlock()
    resp <- server
}

func ForwardRequestToServer(w http.ResponseWriter, r *http.Request, srvDataObj *ServerDataFormat) {
    fmt.Println("Inside ForwardRequestToServer...")
    // calling this as a go routine
    server := make(chan string)
    go srvDataObj.GetServerUsingRoundRobin(server)
    BaseUrl := <-server
    // base url
    fmt.Println("base url: ", BaseUrl)
    newReq, err := http.NewRequest("GET", BaseUrl+r.RequestURI, r.Body)
    if err != nil {
        fmt.Println("Error while creating new request: ", err.Error())
        http.Error(w, "failed to create new request", http.StatusInternalServerError)
        return
    }

    // set the headers
    for name, values := range r.Header {
        for _, value := range values {
            w.Header().Set(name, value)
        }
    }

    resp, err := client.Do(newReq)
    if err != nil {
        fmt.Println("Error while making request to the server: ", err.Error())
        http.Error(w, "failed to making request to the server", http.StatusInternalServerError)
        return
    }

    defer resp.Body.Close()

    for name, values := range resp.Header {
        for _, value := range values {
            w.Header().Set(name, value)
        }
    }

    respBytes, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("Error while reading response bytes from the server: ", err.Error())
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    fmt.Println("\n\nResponse status: ", resp.Status)
    fmt.Printf("response from server: %s", string(respBytes))

    w.Header().Set("Content-Type", "text/plain")
    io.Copy(w, resp.Body)
    printServerRequestCountDetails()
}

func printServerRequestCountDetails() {
    fmt.Println("\n****** Request Details *******")
    for server, count := range reqCountMap {
        fmt.Printf("Server %d: %d\n", server+1, count)
    }
    fmt.Println("******************************")
}

func removeServerFromWorkingServerList(targetServer string) {
    for index, server := range WorkingServerList {
        if server == targetServer {
            WorkingServerList = append(WorkingServerList[:index], WorkingServerList[index+1:]...)
            break
        }
    }
}

func ServerHealthCheck() {
    for server, pass := range serverCheckMap {
        newReq, err := http.NewRequest("GET", server + "/healthcheck", bytes.NewReader(nil))
        if err != nil {
            fmt.Println("Error while creating new request for health check: ", err.Error())
            break
        }

        client := &http.Client{}
        resp, err := client.Do(newReq)
        if err != nil {
            fmt.Printf("\nServer %s is down\n", server)
            // remove the server from the list of working servers
            removeServerFromWorkingServerList(server)
            serverCheckMap[server] = false // mark as not working
        }else if resp.StatusCode == 200 && !pass {
            WorkingServerList = append(WorkingServerList, server)
            serverCheckMap[server] = true // mark as working
        }
    }

    // print all the active servers
    fmt.Println("********* Active Servers **********")
    for _, server := range WorkingServerList {
        fmt.Println(server)
    }
    fmt.Println("**********************************")
    return
}

func main() {
    // fill the server check map
    // initially all the servers are working
    for _, server := range WorkingServerList {
        serverCheckMap[server] = true
    }

    interval := flag.Int("time", 10, "this flag is used to specify the time interval between each server healthcheck up")
    flag.Parse()

    http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/plain")
        w.Write([]byte("Hello"))
    })

    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte("Server Health is Ok"))
    })

    serverDataObj := ServerDataFormat {
        index: 0,
        lock: &sync.Mutex{},
    }

    // load balancer route
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        // start a go routine
        AcceptIncomingRequest(w, r, &serverDataObj)
    })

    // heath check anon func
    go func (t int) {
        for {
            ServerHealthCheck()
            if len(WorkingServerList)==0 {
                fmt.Println("No working servers !!!!")
                break
            }
            time.Sleep(time.Second * time.Duration(t))
        }
    }(*interval)

    // setting up http server
    fmt.Println("Starting server on port: 8080")
    err := http.ListenAndServe(":8080", nil)
    if errors.Is(err, http.ErrServerClosed) {
        log.Fatal("error during ListenAndServe: server closed ", err.Error())
        return
    } else if err != nil {
        log.Fatal("error during ListenAndServer: ", err.Error())
        return
    }
}

// test-script.go
package main

import (
    "bytes"
    "fmt"
    "net/http"
    "sync"
)

func makeRequest(wg *sync.WaitGroup) {
    defer wg.Done()
    newReq, err := http.NewRequest("GET", "http://localhost:8080/users/Ashwin", bytes.NewReader(nil))
    if err != nil {
        fmt.Println("error while creating new http request: ", err.Error())
        return
    }
    client := &http.Client{}
    resp, err := client.Do(newReq)
    if err != nil {
        fmt.Println("Error while making request: ", err.Error())
        return
    }
    fmt.Println("status code: ", resp.StatusCode)
}

func main() {
    wg := sync.WaitGroup{}

    semaphore := make(chan struct{}, 500)

    // launch 10,000 go routines
    for i:=0; i<10000; i++ {
        wg.Add(1)
        semaphore <- struct{}{}
        go func() {
            defer func(){<-semaphore}()
            makeRequest(&wg)
        }()
    }

    wg.Wait()
    fmt.Println("All requests completed")
}
// logs for the load balancer
// I am not printing complete logs as the error is repetitive
goroutine 20159 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x140039840e0}, 0x1400380f440, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x140039840e0}, 0x1400380f440, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x140039840e0?}, 0x1400272eb18?)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x140039840e0?}, 0x100252428?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x140039840e0}, 0x1400380f440)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003817200?}, {0x1003809a0?, 0x140039840e0?}, 0x6?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x1400380bef0, {0x100380e38, 0x14000196c60})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0

goroutine 19846 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x10005dfb0?, 0x0?, 0x140024b5ee8?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/sema.go:77 +0x28
sync.(*Mutex).lockSlow(0x140001a20e0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:171 +0x174
sync.(*Mutex).Lock(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:90
main.(*ServerDataFormat).GetServerUsingRoundRobin(0x14000192160, 0x1400368c6c0)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:59 +0x80
created by main.ForwardRequestToServer in goroutine 20227
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:79 +0xe0

goroutine 19103 [IO wait]:
internal/poll.runtime_pollWait(0x1471ff848, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6400?, 0x14002db8eb1?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6400, {0x14002db8eb1, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6400, {0x14002db8eb1?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9470, {0x14002db8eb1?, 0x10031e3c0?, 0x1004fc390?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db8ea0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19102
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19104 [IO wait]:
internal/poll.runtime_pollWait(0x1473df4c0, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6300?, 0x14002db8fd1?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6300, {0x14002db8fd1, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6300, {0x14002db8fd1?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9460, {0x14002db8fd1?, 0x10031e3c0?, 0x1400228d448?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db8fc0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19100
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19105 [IO wait]:
internal/poll.runtime_pollWait(0x147253f48, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6380?, 0x14002db90f1?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6380, {0x14002db90f1, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6380, {0x14002db90f1?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9468, {0x14002db90f1?, 0x10031e3c0?, 0x1400091d238?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db90e0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19101
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19154 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14002d9f0a0?}, 0x140023beb18?)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14002d9f0a0?}, 0x100252428?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14002db9380?}, {0x1003809a0?, 0x14002d9f0a0?}, 0x6?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14002da9d40, {0x100380e38, 0x14000196c60})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0

goroutine 19690 [IO wait]:
internal/poll.runtime_pollWait(0x1473352b0, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fa180?, 0x140033f7661?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fa180, {0x140033f7661, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fa180, {0x140033f7661?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042dd70, {0x140033f7661?, 0x10031e3c0?, 0x1004fc390?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x140033f7650)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19616
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19899 [IO wait]:
internal/poll.runtime_pollWait(0x1473dad10, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033faa80?, 0x14003626cd1?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033faa80, {0x14003626cd1, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033faa80, {0x14003626cd1?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042def0, {0x14003626cd1?, 0x10031e3c0?, 0x1004fc380?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626cc0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19898
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19902 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14003611180}, 0x14003628b40, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14003611180}, 0x14003628b40, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14003611180?}, 0x14000127b18?)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14003611180?}, 0x100252428?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14003611180}, 0x14003628b40)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003626f30?}, {0x1003809a0?, 0x14003611180?}, 0x6?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003624cf0, {0x100380e38, 0x14000196c60})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0

goroutine 19903 [IO wait]:
internal/poll.runtime_pollWait(0x147244428, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fab00?, 0x14003626fa1?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fab00, {0x14003626fa1, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fab00, {0x14003626fa1?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042def8, {0x14003626fa1?, 0x10031e3c0?, 0x1004fc390?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626f90)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19900
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19904 [IO wait]:
internal/poll.runtime_pollWait(0x1472bdbb8, 0x72)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fac00?, 0x14003626f41?, 0x0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fac00, {0x14003626f41, 0x1, 0x1})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fac00, {0x14003626f41?, 0x10032dbe0?, 0x14000196990?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042df08, {0x14003626f41?, 0x10031e3c0?, 0x1004fc380?})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626f30)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19902
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8

goroutine 19905 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x140031277a0}, 0x14003474480, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x140031277a0}, 0x14003474480, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x140031277a0?}, 0x14000e24b18?)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x140031277a0?}, 0x100252428?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x140031277a0}, 0x14003474480)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003476a50?}, {0x1003809a0?, 0x140031277a0?}, 0x6?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003624fc0, {0x100380e38, 0x14000196c60})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0

goroutine 20034 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14003127880}, 0x14003628ea0, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14003127880}, 0x14003628ea0, 0x14000192160)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14003127880?}, 0x14002468b18?)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14003127880?}, 0x100252428?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14003127880}, 0x14003628ea0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003627320?}, {0x1003809a0?, 0x14003127880?}, 0x6?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003625050, {0x100380e38, 0x14000196c60})
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0

goroutine 20037 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x14001a8d6b8?, 0x80?, 0x14001a8d6e8?)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/sema.go:77 +0x28
sync.(*Mutex).lockSlow(0x140001a20e0)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:171 +0x174
sync.(*Mutex).Lock(...)
        /opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:90
main.(*ServerDataFormat).GetServerUsingRoundRobin(0x14000192160, 0x1400362e240)
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:59 +0x80
created by main.ForwardRequestToServer in goroutine 19900
        /Users/bigoh/Documents/learnings/loadbalancer/main.go:79 +0xe0
exit status 2
// logs for the test-script
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50189->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50190->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50191->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50192->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50194->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50199->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50575->[::1]:8080: read: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50861->[::1]:8080: read: connection reset by peer
status code:  200
Error while making request:  Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request:  Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
status code:  200
status code:  200
status code:  200
status code:  200

我没有为负载均衡器和测试脚本添加完整的日志。 请建议我一些处理高并发请求的方法。

go http load-balancing
1个回答
0
投票
var reqCountMap = make(map[int]int)

var serverCheckMap = make(map[string]bool)

我怀疑原因是go中的map并发使用不安全

使用

Mutex
换行地图后,代码在我的机器上运行良好

// load-balancer.go
package main

import (
    "bytes"
    "errors"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "time"
)

type ServerDataFormat struct {
    // wg *sync.WaitGroup
    lock  *sync.Mutex
    index int
}

type LockedMap[K comparable, V any] struct {
    sync.RWMutex

    data map[K]V
}

func NewLockedMap[K comparable, V any]() *LockedMap[K, V] {
    return &LockedMap[K, V]{
        data: make(map[K]V),
    }
}

func (l *LockedMap[K, V]) Set(k K, v V) {
    l.RWMutex.Lock()
    defer l.RWMutex.Unlock()

    l.data[k] = v
}

func (l *LockedMap[K, V]) Snapshot() map[K]V {
    l.RWMutex.Lock()
    defer l.RWMutex.Unlock()

    return l.data
}

func (l *LockedMap[K, V]) Get(k K) V {
    l.RWMutex.RLock()
    defer l.RWMutex.RUnlock()

    return l.data[k]
}

// var reqCountMap = make(map[int]int)
var reqCountMap = NewLockedMap[int, int]()

var WorkingServerList []string = []string{
    "http://localhost:7000",
    "http://localhost:7001",
    "http://localhost:7002",
}

var client = &http.Client{
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
    },
}

// this will check and keep track of health status of all the servers
// var serverCheckMap = make(map[string]bool)
var serverCheckMap = NewLockedMap[string, bool]()

func AcceptIncomingRequest(w http.ResponseWriter, r *http.Request, srvDataObject *ServerDataFormat) {
    // print the request details
    fmt.Println("Received request from: ", r.RemoteAddr)
    fmt.Println("Method: ", r.Method)
    fmt.Println("Request URI: ", r.RequestURI)
    fmt.Println("Request URL: ", r.URL)
    fmt.Println("Request Path: ", r.URL.Path)
    fmt.Println("Agent: ", r.UserAgent())
    fmt.Println("Headers: ", r.Header)

    ForwardRequestToServer(w, r, srvDataObject)
    return
}

func (srv *ServerDataFormat) GetServerUsingRoundRobin(resp chan string) {
    //defer srv.wg.Done()
    var server string
    srv.lock.Lock()
    nextIndex := (*srv).index + 1
    fmt.Println("curr index: ", (*srv).index)
    fmt.Println("len of list: ", len(WorkingServerList))
    if nextIndex == len(WorkingServerList) {
        nextIndex %= len(WorkingServerList)
    }
    // update the current index
    (*srv).index = nextIndex
    fmt.Println("next index: ", nextIndex)
    server = WorkingServerList[nextIndex]
    data := reqCountMap.Get(nextIndex)
    reqCountMap.Set(nextIndex, data+1) // increment the count
    srv.lock.Unlock()
    resp <- server
}

func ForwardRequestToServer(w http.ResponseWriter, r *http.Request, srvDataObj *ServerDataFormat) {
    fmt.Println("Inside ForwardRequestToServer...")
    // calling this as a go routine
    server := make(chan string)
    go srvDataObj.GetServerUsingRoundRobin(server)
    BaseUrl := <-server
    // base url
    fmt.Println("base url: ", BaseUrl)
    newReq, err := http.NewRequest("GET", BaseUrl+r.RequestURI, r.Body)
    if err != nil {
        fmt.Println("Error while creating new request: ", err.Error())
        http.Error(w, "failed to create new request", http.StatusInternalServerError)
        return
    }

    // set the headers
    for name, values := range r.Header {
        for _, value := range values {
            w.Header().Set(name, value)
        }
    }

    resp, err := client.Do(newReq)
    if err != nil {
        fmt.Println("Error while making request to the server: ", err.Error())
        http.Error(w, "failed to making request to the server", http.StatusInternalServerError)
        return
    }

    defer resp.Body.Close()

    for name, values := range resp.Header {
        for _, value := range values {
            w.Header().Set(name, value)
        }
    }

    respBytes, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("Error while reading response bytes from the server: ", err.Error())
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    fmt.Println("\n\nResponse status: ", resp.Status)
    fmt.Printf("response from server: %s", string(respBytes))

    w.Header().Set("Content-Type", "text/plain")
    io.Copy(w, resp.Body)
    printServerRequestCountDetails()
}

func printServerRequestCountDetails() {
    fmt.Println("\n****** Request Details *******")
    for server, count := range reqCountMap.Snapshot() {
        fmt.Printf("Server %d: %d\n", server+1, count)
    }
    fmt.Println("******************************")
}

func removeServerFromWorkingServerList(targetServer string) {
    for index, server := range WorkingServerList {
        if server == targetServer {
            WorkingServerList = append(WorkingServerList[:index], WorkingServerList[index+1:]...)
            break
        }
    }
}

func ServerHealthCheck() {
    for server, pass := range serverCheckMap.Snapshot() {
        newReq, err := http.NewRequest("GET", server+"/healthcheck", bytes.NewReader(nil))
        if err != nil {
            fmt.Println("Error while creating new request for health check: ", err.Error())
            break
        }

        client := &http.Client{}
        resp, err := client.Do(newReq)
        if err != nil {
            fmt.Printf("\nServer %s is down\n", server)
            // remove the server from the list of working servers
            removeServerFromWorkingServerList(server)
            serverCheckMap.Set(server, false) // mark as not working
        } else if resp.StatusCode == 200 && !pass {
            WorkingServerList = append(WorkingServerList, server)
            serverCheckMap.Set(server, true) // mark as working
        }
    }

    // print all the active servers
    fmt.Println("********* Active Servers **********")
    for _, server := range WorkingServerList {
        fmt.Println(server)
    }
    fmt.Println("**********************************")
    return
}

func main() {
    // fill the server check map
    // initially all the servers are working
    for _, server := range WorkingServerList {
        serverCheckMap.Set(server, true)
    }

    interval := flag.Int("time", 10, "this flag is used to specify the time interval between each server healthcheck up")
    flag.Parse()

    http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/plain")
        w.Write([]byte("Hello"))
    })

    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte("Server Health is Ok"))
    })

    serverDataObj := ServerDataFormat{
        index: 0,
        lock:  &sync.Mutex{},
    }

    // load balancer route
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        // start a go routine
        AcceptIncomingRequest(w, r, &serverDataObj)
    })

    // heath check anon func
    go func(t int) {
        for {
            ServerHealthCheck()
            if len(WorkingServerList) == 0 {
                fmt.Println("No working servers !!!!")
                break
            }
            time.Sleep(time.Second * time.Duration(t))
        }
    }(*interval)

    // setting up http server
    fmt.Println("Starting server on port: 8080")
    err := http.ListenAndServe(":8080", nil)
    if errors.Is(err, http.ErrServerClosed) {
        log.Fatal("error during ListenAndServe: server closed ", err.Error())
        return
    } else if err != nil {
        log.Fatal("error during ListenAndServer: ", err.Error())
        return
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.