I was tasked with writing a piece of software that checks a couple of downstream services to make sure they are running (a health check). The main function had an overall timeout, let's say 10 seconds, so each downstream service had to respond within that time; if one didn't, that should be logged as an error saying the downstream service didn't respond in time.
I solved this by using channels in Go, specifically a combination of channels and wait groups that lets me check each downstream process in its own goroutine, giving each one roughly the same amount of time to respond. My mindset behind this approach was to say:
I will instruct X number of gophers to do the work of checking downstream processes, and each one has 10 seconds to return a response to me, whether the task was a success or a failure.
An example of the process is shown below (rewritten and modified from its original). This worked, for the most part, but I noticed that there were times the program crashed because one of the gophers (goroutines) tried to give me a response, but the door was closed (a write to a closed channel). This was difficult to replicate, as it seemed to happen randomly.
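To make the crash concrete, here is a tiny standalone program (not the health-check code itself, just my attempt to reproduce the symptom) where a gopher tries to deliver a result after the door has already been closed. The runtime panics with "send on closed channel" and the whole program dies:

package main

import "time"

func main() {
	ch := make(chan int, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		ch <- 1 // the door is already closed: panic: send on closed channel
	}()
	close(ch) // channel is closed before the goroutine gets to respond
	time.Sleep(200 * time.Millisecond)
}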
After doing some research, I saw the error in my ways.
/*
This function outlines the improper way to use wait groups and channels.
client - http client (can be mocked in tests)
timeout - per-request timeout, passed as a time.Duration
*/
func buggy_function(client *http.Client, timeout time.Duration) {
	service_endpoints := []string{
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com"}
	wg := sync.WaitGroup{}         // set up the wait group
	wg.Add(len(service_endpoints)) // tell the wg how many goroutines it should wait for
	type response_data struct {
		response    string
		err         error
		status_code int
	}
	response_chan := make(chan *response_data, len(service_endpoints)) // buffered channel
	defer close(response_chan) // this is an issue, as the function can return before all the goroutines respond
	for _, endpoint := range service_endpoints {
		// note: the endpoint is passed in and the WaitGroup is passed as a pointer;
		// copying a WaitGroup is a separate bug and not the one this post is about
		go func(endpoint string, wg *sync.WaitGroup, response_chan chan *response_data, timeout time.Duration) {
			// each downstream check should be done in its own goroutine.
			req, err := http.NewRequest(http.MethodGet, endpoint, nil)
			if err != nil {
				// the counter is decremented before the data is written to the channel
				// ( this is just in the wrong place in general )
				wg.Done()
				response_chan <- &response_data{
					err:         err,
					status_code: 500,
				}
				return
			}
			// secondary timeout process: if no response is returned in the allotted time, the gopher should still let us know
			ctx, cancelFunc := context.WithTimeout(req.Context(), timeout)
			defer cancelFunc()
			req = req.WithContext(ctx)
			resp, err := client.Do(req)
			if err != nil {
				wg.Done() // bad
				response_chan <- &response_data{
					err:         err,
					status_code: 500,
				}
				return
			}
			defer resp.Body.Close()
			if resp.StatusCode >= http.StatusBadRequest {
				wg.Done() // calling this explicitly every time is unnecessary
				response_chan <- &response_data{
					err:         errors.New("non 2xx response received from server"),
					status_code: resp.StatusCode,
				}
				return
			}
			wg.Done() // I already said, this is bad
			response_chan <- &response_data{
				response:    "success",
				status_code: resp.StatusCode,
			}
			return
		}(endpoint, &wg, response_chan, timeout)
	}
	// this blocks, but based on how the goroutines are coded,
	// the wg could be completed before all the responses are written to the channel
	wg.Wait()
	var receivedResponses []*response_data
	// a non-infinite loop here means it could exit before all responses were received. Another issue.
	for i := 0; i < len(service_endpoints); i++ {
		select {
		case response := <-response_chan:
			receivedResponses = append(receivedResponses, response)
			if response.err != nil {
				// whatever you want
			}
		default:
			if len(receivedResponses) == len(service_endpoints) {
				break
			}
		}
	}
	// do whatever else here
}
So what I want to do today is show you what I did, how I fixed it, and some things you should look out for when using channels, wait-groups and goroutines in Go. The problems are called out in the comments in the program above; hopefully they are verbose enough to explain the thought process behind the decisions and why they were wrong.
I will also show you how to catch these errors so that the program does not crash if you do end up using channels improperly.
func fixed(client *http.Client, timeout time.Duration) {
	service_endpoints := []string{
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com",
		"http://someservice.com"}
	wg := sync.WaitGroup{}
	wg.Add(len(service_endpoints))
	type response_data struct {
		response    string
		err         error
		status_code int
	}
	response_chan := make(chan *response_data, len(service_endpoints)) // buffered channel
	for _, endpoint := range service_endpoints {
		go func(endpoint string, wg *sync.WaitGroup, response_chan chan *response_data, timeout time.Duration) {
			defer func() {
				if err := recover(); err != nil {
					// in case there's a panic, catch it
					// log out or do something here to indicate the goroutine panicked
					return
				}
			}()
			defer wg.Done() // use defer here (Go adds deferred functions to a stack)
			req, err := http.NewRequest(http.MethodGet, endpoint, nil)
			if err != nil {
				response_chan <- &response_data{
					err:         err,
					status_code: 500,
				}
				return
			}
			ctx, cancelFunc := context.WithTimeout(req.Context(), timeout)
			defer cancelFunc()
			req = req.WithContext(ctx)
			resp, err := client.Do(req)
			if err != nil {
				response_chan <- &response_data{
					err:         err,
					status_code: 500,
				}
				return
			}
			defer resp.Body.Close()
			if resp.StatusCode >= http.StatusBadRequest {
				response_chan <- &response_data{
					err:         errors.New("non 2xx response received from server"),
					status_code: resp.StatusCode,
				}
				return
			}
			response_chan <- &response_data{
				response:    "success",
				status_code: resp.StatusCode,
			}
		}(endpoint, &wg, response_chan, timeout)
	}
	// moved this to its own goroutine so that the main routine can go straight into the read loop
	go func(wg *sync.WaitGroup) {
		wg.Wait()
		close(response_chan)
	}(&wg)
	var receivedResponses []*response_data
	// in this case, we only have one case to look at: receiving a response.
	// We stop reading once the channel is closed, which only happens after every goroutine has written its response.
	// (Sends never block here because the channel is buffered to len(service_endpoints);
	// the panic in the buggy version came from sending on a closed channel.)
readLoop:
	for {
		select {
		case response, ok := <-response_chan:
			if !ok {
				break readLoop // channel closed: all responses received
			}
			receivedResponses = append(receivedResponses, response)
			if response.err != nil {
				// handle error here
			}
		}
	}
	// do whatever else with receivedResponses here
}
A couple of things to note:
1. Go pushes deferred functions onto a stack, so they run last in, first out: the defers within the goroutine are executed in the reverse of the order in which they were added. In the flow above, cancelFunc is called first, then wg.Done(), and lastly the recover function.
defer func () {
	if err := recover(); err != nil {
		// log out or do something here to indicate the program panics
		return
	}
}()
defer wg.Done() // use defer here
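As a quick illustration of that ordering, here is a throwaway function (deferOrder is just a made-up name, and it assumes fmt is imported) that prints third, second, first:

func deferOrder() {
	defer fmt.Println("first")  // registered first, runs last
	defer fmt.Println("second") // registered second, runs second
	defer fmt.Println("third")  // registered last, runs first
}
// calling deferOrder() prints: third, second, first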
2. Instead of deferring close in the main routine, I explicitly call close in a separate goroutine once the wait-group counter reaches zero. This is a safer approach. Combined with defer wg.Done(), this is a better way to accomplish the goal: the counter is only decremented after the data is written to the channel and the goroutine exits, instead of me trying to manage that order of execution myself.
go func (wg *sync.WaitGroup) {
	wg.Wait()
	close(response_chan)
}(&wg)
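Stripped of all the HTTP details, the pattern looks roughly like this. This is a minimal sketch with made-up names (fanIn, inputs) and pretend work, assuming sync is imported; it is not the actual health-check code. Every worker defers wg.Done(), one extra goroutine closes the channel once the wait-group hits zero, and the reader can then safely drain the channel until it is closed:

func fanIn(inputs []string) []string {
	results := make(chan string, len(inputs))
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in string) {
			defer wg.Done()              // counter is decremented only after the send below
			results <- "checked " + in   // pretend work
		}(in)
	}
	go func() {
		wg.Wait()      // all sends are done
		close(results) // now it is safe to close
	}()
	var out []string
	for r := range results { // range exits once the channel is closed
		out = append(out, r)
	}
	return out
}

Because the channel is buffered to len(inputs) the sends never block, and because it is only closed after wg.Wait() returns, there is no window where a worker can write to a closed channel.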
3. The recover function is a builtin in Go, so you don't need to import anything for it to work. A recover only works in the goroutine that panics: for example, if your main routine panics but the recover is deferred in a sub-routine, the panic won't be caught. I find myself in the habit of creating a recover function and deferring it at the start of every goroutine. This prevents the program from crashing, lets you properly see the stack trace, and then you can create monitors etc. around it.
defer func () {
if err := recover(); err != nil {
// log out or do something here to indicate the program panics
return
}
}()
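That habit can live in a small helper. Here is a sketch of what mine looks like in spirit; the names dontPanic and worker are made up for this post, and it assumes the log and runtime/debug packages are imported:

// dontPanic is a hypothetical helper: defer it at the start of every goroutine.
func dontPanic() {
	if r := recover(); r != nil {
		// log the panic value and the stack trace so you can build monitors around it
		log.Printf("recovered from panic: %v\n%s", r, debug.Stack())
	}
}

func worker() {
	go func() {
		defer dontPanic() // must be deferred inside the goroutine that might panic
		// ... work that might panic ...
	}()
}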
Ok, so that brings me to the end of this short tutorial. I am diving a bit deeper into concurrency in Go, so I'll probably find improvements to this in the future, but for now this works and I have not seen any panics since it was implemented. I'm sure there are improvements to what I have here, so please feel free to correct me if you see an error. Another thing to note: I didn't run this code as-is; I just wanted to show the overall concept of what I was trying to do. (Sorry if the code blocks look a bit weird, I'll find a better way in my next post.)
Until next time,
Matt