我在 golang 中创建了一个简单的负载均衡器,我正在测试它的能力 处理并发请求。为此,我正在运行一个 golang 脚本,其中发送 10,000 个请求,并将最大并发请求设置为 500。问题是,当数量较低时,没有问题,但当数量较高时,如上面提到的,代码给我错误。我附上了机器人负载均衡器和脚本的代码和错误屏幕截图。
// load-balancer.go
package main
import (
"bytes"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
type ServerDataFormat struct {
// wg *sync.WaitGroup
lock *sync.Mutex
index int
}
var reqCountMap = make(map[int]int)
var WorkingServerList []string = []string{
"http://localhost:7000",
"http://localhost:7001",
"http://localhost:7002",
}
var client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
// this will check and keep track of health status of all the servers
var serverCheckMap = make(map[string]bool)
func AcceptIncomingRequest(w http.ResponseWriter, r *http.Request, srvDataObject *ServerDataFormat) {
// print the request details
fmt.Println("Received request from: ", r.RemoteAddr)
fmt.Println("Method: ", r.Method)
fmt.Println("Request URI: ", r.RequestURI)
fmt.Println("Request URL: ", r.URL)
fmt.Println("Request Path: ", r.URL.Path)
fmt.Println("Agent: ", r.UserAgent())
fmt.Println("Headers: ", r.Header)
ForwardRequestToServer(w, r, srvDataObject)
return
}
func (srv *ServerDataFormat) GetServerUsingRoundRobin(resp chan string) {
//defer srv.wg.Done()
var server string
srv.lock.Lock()
nextIndex := (*srv).index+1
fmt.Println("curr index: ", (*srv).index)
fmt.Println("len of list: ", len(WorkingServerList))
if nextIndex == len(WorkingServerList) {
nextIndex %= len(WorkingServerList)
}
// update the current index
(*srv).index = nextIndex
fmt.Println("next index: ", nextIndex)
server = WorkingServerList[nextIndex]
reqCountMap[nextIndex]++ // increment the count
srv.lock.Unlock()
resp <- server
}
func ForwardRequestToServer(w http.ResponseWriter, r *http.Request, srvDataObj *ServerDataFormat) {
fmt.Println("Inside ForwardRequestToServer...")
// calling this as a go routine
server := make(chan string)
go srvDataObj.GetServerUsingRoundRobin(server)
BaseUrl := <-server
// base url
fmt.Println("base url: ", BaseUrl)
newReq, err := http.NewRequest("GET", BaseUrl+r.RequestURI, r.Body)
if err != nil {
fmt.Println("Error while creating new request: ", err.Error())
http.Error(w, "failed to create new request", http.StatusInternalServerError)
return
}
// set the headers
for name, values := range r.Header {
for _, value := range values {
w.Header().Set(name, value)
}
}
resp, err := client.Do(newReq)
if err != nil {
fmt.Println("Error while making request to the server: ", err.Error())
http.Error(w, "failed to making request to the server", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
for name, values := range resp.Header {
for _, value := range values {
w.Header().Set(name, value)
}
}
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error while reading response bytes from the server: ", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Println("\n\nResponse status: ", resp.Status)
fmt.Printf("response from server: %s", string(respBytes))
w.Header().Set("Content-Type", "text/plain")
io.Copy(w, resp.Body)
printServerRequestCountDetails()
}
func printServerRequestCountDetails() {
fmt.Println("\n****** Request Details *******")
for server, count := range reqCountMap {
fmt.Printf("Server %d: %d\n", server+1, count)
}
fmt.Println("******************************")
}
func removeServerFromWorkingServerList(targetServer string) {
for index, server := range WorkingServerList {
if server == targetServer {
WorkingServerList = append(WorkingServerList[:index], WorkingServerList[index+1:]...)
break
}
}
}
func ServerHealthCheck() {
for server, pass := range serverCheckMap {
newReq, err := http.NewRequest("GET", server + "/healthcheck", bytes.NewReader(nil))
if err != nil {
fmt.Println("Error while creating new request for health check: ", err.Error())
break
}
client := &http.Client{}
resp, err := client.Do(newReq)
if err != nil {
fmt.Printf("\nServer %s is down\n", server)
// remove the server from the list of working servers
removeServerFromWorkingServerList(server)
serverCheckMap[server] = false // mark as not working
}else if resp.StatusCode == 200 && !pass {
WorkingServerList = append(WorkingServerList, server)
serverCheckMap[server] = true // mark as working
}
}
// print all the active servers
fmt.Println("********* Active Servers **********")
for _, server := range WorkingServerList {
fmt.Println(server)
}
fmt.Println("**********************************")
return
}
func main() {
// fill the server check map
// initially all the servers are working
for _, server := range WorkingServerList {
serverCheckMap[server] = true
}
interval := flag.Int("time", 10, "this flag is used to specify the time interval between each server healthcheck up")
flag.Parse()
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte("Hello"))
})
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Server Health is Ok"))
})
serverDataObj := ServerDataFormat {
index: 0,
lock: &sync.Mutex{},
}
// load balancer route
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// start a go routine
AcceptIncomingRequest(w, r, &serverDataObj)
})
// heath check anon func
go func (t int) {
for {
ServerHealthCheck()
if len(WorkingServerList)==0 {
fmt.Println("No working servers !!!!")
break
}
time.Sleep(time.Second * time.Duration(t))
}
}(*interval)
// setting up http server
fmt.Println("Starting server on port: 8080")
err := http.ListenAndServe(":8080", nil)
if errors.Is(err, http.ErrServerClosed) {
log.Fatal("error during ListenAndServe: server closed ", err.Error())
return
} else if err != nil {
log.Fatal("error during ListenAndServer: ", err.Error())
return
}
}
// test-script.go
package main
import (
"bytes"
"fmt"
"net/http"
"sync"
)
func makeRequest(wg *sync.WaitGroup) {
defer wg.Done()
newReq, err := http.NewRequest("GET", "http://localhost:8080/users/Ashwin", bytes.NewReader(nil))
if err != nil {
fmt.Println("error while creating new http request: ", err.Error())
return
}
client := &http.Client{}
resp, err := client.Do(newReq)
if err != nil {
fmt.Println("Error while making request: ", err.Error())
return
}
fmt.Println("status code: ", resp.StatusCode)
}
func main() {
wg := sync.WaitGroup{}
semaphore := make(chan struct{}, 500)
// launch 10,000 go routines
for i:=0; i<10000; i++ {
wg.Add(1)
semaphore <- struct{}{}
go func() {
defer func(){<-semaphore}()
makeRequest(&wg)
}()
}
wg.Wait()
fmt.Println("All requests completed")
}
// logs for the load balancer
// I am not printing complete logs as the error is repetitive
goroutine 20159 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x140039840e0}, 0x1400380f440, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x140039840e0}, 0x1400380f440, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x140039840e0?}, 0x1400272eb18?)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x140039840e0?}, 0x100252428?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x140039840e0}, 0x1400380f440)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003817200?}, {0x1003809a0?, 0x140039840e0?}, 0x6?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x1400380bef0, {0x100380e38, 0x14000196c60})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0
goroutine 19846 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x10005dfb0?, 0x0?, 0x140024b5ee8?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/sema.go:77 +0x28
sync.(*Mutex).lockSlow(0x140001a20e0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:171 +0x174
sync.(*Mutex).Lock(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:90
main.(*ServerDataFormat).GetServerUsingRoundRobin(0x14000192160, 0x1400368c6c0)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:59 +0x80
created by main.ForwardRequestToServer in goroutine 20227
/Users/bigoh/Documents/learnings/loadbalancer/main.go:79 +0xe0
goroutine 19103 [IO wait]:
internal/poll.runtime_pollWait(0x1471ff848, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6400?, 0x14002db8eb1?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6400, {0x14002db8eb1, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6400, {0x14002db8eb1?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9470, {0x14002db8eb1?, 0x10031e3c0?, 0x1004fc390?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db8ea0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19102
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19104 [IO wait]:
internal/poll.runtime_pollWait(0x1473df4c0, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6300?, 0x14002db8fd1?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6300, {0x14002db8fd1, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6300, {0x14002db8fd1?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9460, {0x14002db8fd1?, 0x10031e3c0?, 0x1400228d448?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db8fc0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19100
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19105 [IO wait]:
internal/poll.runtime_pollWait(0x147253f48, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x14002da6380?, 0x14002db90f1?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14002da6380, {0x14002db90f1, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x14002da6380, {0x14002db90f1?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x140034c9468, {0x14002db90f1?, 0x10031e3c0?, 0x1400091d238?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14002db90e0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19101
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19154 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14002d9f0a0?}, 0x140023beb18?)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14002d9f0a0?}, 0x100252428?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14002d9f0a0}, 0x14002d8d9e0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14002db9380?}, {0x1003809a0?, 0x14002d9f0a0?}, 0x6?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14002da9d40, {0x100380e38, 0x14000196c60})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0
goroutine 19690 [IO wait]:
internal/poll.runtime_pollWait(0x1473352b0, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fa180?, 0x140033f7661?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fa180, {0x140033f7661, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fa180, {0x140033f7661?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042dd70, {0x140033f7661?, 0x10031e3c0?, 0x1004fc390?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x140033f7650)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19616
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19899 [IO wait]:
internal/poll.runtime_pollWait(0x1473dad10, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033faa80?, 0x14003626cd1?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033faa80, {0x14003626cd1, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033faa80, {0x14003626cd1?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042def0, {0x14003626cd1?, 0x10031e3c0?, 0x1004fc380?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626cc0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19898
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19902 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14003611180}, 0x14003628b40, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14003611180}, 0x14003628b40, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14003611180?}, 0x14000127b18?)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14003611180?}, 0x100252428?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14003611180}, 0x14003628b40)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003626f30?}, {0x1003809a0?, 0x14003611180?}, 0x6?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003624cf0, {0x100380e38, 0x14000196c60})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0
goroutine 19903 [IO wait]:
internal/poll.runtime_pollWait(0x147244428, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fab00?, 0x14003626fa1?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fab00, {0x14003626fa1, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fab00, {0x14003626fa1?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042def8, {0x14003626fa1?, 0x10031e3c0?, 0x1004fc390?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626f90)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19900
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19904 [IO wait]:
internal/poll.runtime_pollWait(0x1472bdbb8, 0x72)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/netpoll.go:345 +0xa0
internal/poll.(*pollDesc).wait(0x140033fac00?, 0x14003626f41?, 0x0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x140033fac00, {0x14003626f41, 0x1, 0x1})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/internal/poll/fd_unix.go:164 +0x200
net.(*netFD).Read(0x140033fac00, {0x14003626f41?, 0x10032dbe0?, 0x14000196990?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/fd_posix.go:55 +0x28
net.(*conn).Read(0x1400042df08, {0x14003626f41?, 0x10031e3c0?, 0x1004fc380?})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/net.go:185 +0x34
net/http.(*connReader).backgroundRead(0x14003626f30)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:681 +0x40
created by net/http.(*connReader).startBackgroundRead in goroutine 19902
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:677 +0xc8
goroutine 19905 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x140031277a0}, 0x14003474480, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x140031277a0}, 0x14003474480, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x140031277a0?}, 0x14000e24b18?)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x140031277a0?}, 0x100252428?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x140031277a0}, 0x14003474480)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003476a50?}, {0x1003809a0?, 0x140031277a0?}, 0x6?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003624fc0, {0x100380e38, 0x14000196c60})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0
goroutine 20034 [chan receive]:
main.ForwardRequestToServer({0x1003809a0, 0x14003127880}, 0x14003628ea0, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:80 +0xf4
main.AcceptIncomingRequest({0x1003809a0, 0x14003127880}, 0x14003628ea0, 0x14000192160)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:52 +0x2e8
main.main.func3({0x1003809a0?, 0x14003127880?}, 0x14002468b18?)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:202 +0x28
net/http.HandlerFunc.ServeHTTP(0x100529500?, {0x1003809a0?, 0x14003127880?}, 0x100252428?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2171 +0x38
net/http.(*ServeMux).ServeHTTP(0x0?, {0x1003809a0, 0x14003127880}, 0x14003628ea0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2688 +0x1a4
net/http.serverHandler.ServeHTTP({0x14003627320?}, {0x1003809a0?, 0x14003127880?}, 0x6?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3142 +0xbc
net/http.(*conn).serve(0x14003625050, {0x100380e38, 0x14000196c60})
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:2044 +0x508
created by net/http.(*Server).Serve in goroutine 1
/opt/homebrew/Cellar/go/1.22.5/libexec/src/net/http/server.go:3290 +0x3f0
goroutine 20037 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x14001a8d6b8?, 0x80?, 0x14001a8d6e8?)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/runtime/sema.go:77 +0x28
sync.(*Mutex).lockSlow(0x140001a20e0)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:171 +0x174
sync.(*Mutex).Lock(...)
/opt/homebrew/Cellar/go/1.22.5/libexec/src/sync/mutex.go:90
main.(*ServerDataFormat).GetServerUsingRoundRobin(0x14000192160, 0x1400362e240)
/Users/bigoh/Documents/learnings/loadbalancer/main.go:59 +0x80
created by main.ForwardRequestToServer in goroutine 19900
/Users/bigoh/Documents/learnings/loadbalancer/main.go:79 +0xe0
exit status 2
// logs for the test-script
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50189->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50190->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50191->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50192->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50194->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50199->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50575->[::1]:8080: read: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": read tcp [::1]:50861->[::1]:8080: read: connection reset by peer
status code: 200
Error while making request: Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
Error while making request: Get "http://localhost:8080/users/Ashwin": dial tcp [::1]:8080: connect: connection reset by peer
status code: 200
status code: 200
status code: 200
status code: 200
我没有为负载均衡器和测试脚本添加完整的日志。 请建议我一些处理高并发请求的方法。
var reqCountMap = make(map[int]int)
var serverCheckMap = make(map[string]bool)
我怀疑原因是go中的map并发使用不安全
使用
Mutex
换行地图后,代码在我的机器上运行良好
// load-balancer.go
package main
import (
"bytes"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
type ServerDataFormat struct {
// wg *sync.WaitGroup
lock *sync.Mutex
index int
}
type LockedMap[K comparable, V any] struct {
sync.RWMutex
data map[K]V
}
func NewLockedMap[K comparable, V any]() *LockedMap[K, V] {
return &LockedMap[K, V]{
data: make(map[K]V),
}
}
func (l *LockedMap[K, V]) Set(k K, v V) {
l.RWMutex.Lock()
defer l.RWMutex.Unlock()
l.data[k] = v
}
func (l *LockedMap[K, V]) Snapshot() map[K]V {
l.RWMutex.Lock()
defer l.RWMutex.Unlock()
return l.data
}
func (l *LockedMap[K, V]) Get(k K) V {
l.RWMutex.RLock()
defer l.RWMutex.RUnlock()
return l.data[k]
}
// var reqCountMap = make(map[int]int)
var reqCountMap = NewLockedMap[int, int]()
var WorkingServerList []string = []string{
"http://localhost:7000",
"http://localhost:7001",
"http://localhost:7002",
}
var client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
// this will check and keep track of health status of all the servers
// var serverCheckMap = make(map[string]bool)
var serverCheckMap = NewLockedMap[string, bool]()
func AcceptIncomingRequest(w http.ResponseWriter, r *http.Request, srvDataObject *ServerDataFormat) {
// print the request details
fmt.Println("Received request from: ", r.RemoteAddr)
fmt.Println("Method: ", r.Method)
fmt.Println("Request URI: ", r.RequestURI)
fmt.Println("Request URL: ", r.URL)
fmt.Println("Request Path: ", r.URL.Path)
fmt.Println("Agent: ", r.UserAgent())
fmt.Println("Headers: ", r.Header)
ForwardRequestToServer(w, r, srvDataObject)
return
}
func (srv *ServerDataFormat) GetServerUsingRoundRobin(resp chan string) {
//defer srv.wg.Done()
var server string
srv.lock.Lock()
nextIndex := (*srv).index + 1
fmt.Println("curr index: ", (*srv).index)
fmt.Println("len of list: ", len(WorkingServerList))
if nextIndex == len(WorkingServerList) {
nextIndex %= len(WorkingServerList)
}
// update the current index
(*srv).index = nextIndex
fmt.Println("next index: ", nextIndex)
server = WorkingServerList[nextIndex]
data := reqCountMap.Get(nextIndex)
reqCountMap.Set(nextIndex, data+1) // increment the count
srv.lock.Unlock()
resp <- server
}
func ForwardRequestToServer(w http.ResponseWriter, r *http.Request, srvDataObj *ServerDataFormat) {
fmt.Println("Inside ForwardRequestToServer...")
// calling this as a go routine
server := make(chan string)
go srvDataObj.GetServerUsingRoundRobin(server)
BaseUrl := <-server
// base url
fmt.Println("base url: ", BaseUrl)
newReq, err := http.NewRequest("GET", BaseUrl+r.RequestURI, r.Body)
if err != nil {
fmt.Println("Error while creating new request: ", err.Error())
http.Error(w, "failed to create new request", http.StatusInternalServerError)
return
}
// set the headers
for name, values := range r.Header {
for _, value := range values {
w.Header().Set(name, value)
}
}
resp, err := client.Do(newReq)
if err != nil {
fmt.Println("Error while making request to the server: ", err.Error())
http.Error(w, "failed to making request to the server", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
for name, values := range resp.Header {
for _, value := range values {
w.Header().Set(name, value)
}
}
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error while reading response bytes from the server: ", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Println("\n\nResponse status: ", resp.Status)
fmt.Printf("response from server: %s", string(respBytes))
w.Header().Set("Content-Type", "text/plain")
io.Copy(w, resp.Body)
printServerRequestCountDetails()
}
func printServerRequestCountDetails() {
fmt.Println("\n****** Request Details *******")
for server, count := range reqCountMap.Snapshot() {
fmt.Printf("Server %d: %d\n", server+1, count)
}
fmt.Println("******************************")
}
func removeServerFromWorkingServerList(targetServer string) {
for index, server := range WorkingServerList {
if server == targetServer {
WorkingServerList = append(WorkingServerList[:index], WorkingServerList[index+1:]...)
break
}
}
}
func ServerHealthCheck() {
for server, pass := range serverCheckMap.Snapshot() {
newReq, err := http.NewRequest("GET", server+"/healthcheck", bytes.NewReader(nil))
if err != nil {
fmt.Println("Error while creating new request for health check: ", err.Error())
break
}
client := &http.Client{}
resp, err := client.Do(newReq)
if err != nil {
fmt.Printf("\nServer %s is down\n", server)
// remove the server from the list of working servers
removeServerFromWorkingServerList(server)
serverCheckMap.Set(server, false) // mark as not working
} else if resp.StatusCode == 200 && !pass {
WorkingServerList = append(WorkingServerList, server)
serverCheckMap.Set(server, true) // mark as working
}
}
// print all the active servers
fmt.Println("********* Active Servers **********")
for _, server := range WorkingServerList {
fmt.Println(server)
}
fmt.Println("**********************************")
return
}
func main() {
// fill the server check map
// initially all the servers are working
for _, server := range WorkingServerList {
serverCheckMap.Set(server, true)
}
interval := flag.Int("time", 10, "this flag is used to specify the time interval between each server healthcheck up")
flag.Parse()
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte("Hello"))
})
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Server Health is Ok"))
})
serverDataObj := ServerDataFormat{
index: 0,
lock: &sync.Mutex{},
}
// load balancer route
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// start a go routine
AcceptIncomingRequest(w, r, &serverDataObj)
})
// heath check anon func
go func(t int) {
for {
ServerHealthCheck()
if len(WorkingServerList) == 0 {
fmt.Println("No working servers !!!!")
break
}
time.Sleep(time.Second * time.Duration(t))
}
}(*interval)
// setting up http server
fmt.Println("Starting server on port: 8080")
err := http.ListenAndServe(":8080", nil)
if errors.Is(err, http.ErrServerClosed) {
log.Fatal("error during ListenAndServe: server closed ", err.Error())
return
} else if err != nil {
log.Fatal("error during ListenAndServer: ", err.Error())
return
}
}