Concurrency is part of the Go core. Goroutines are similar to lightweight threads. We run a goroutine using the go keyword.
go matchRecordsWithDb(record)
Channels
Channels are a way to synchronize and communicate with goroutines.
ch := make(chan string)
ch <- "test" //Send data to unbuffered channel
v := <-ch //receive data from channel and assign to v
The receive here will block until data is available on the channel.
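Putting the snippets above together, a minimal complete program looks like this (wrapping the send in an anonymous goroutine is my addition; with an unbuffered channel, sending from the same goroutine that receives would deadlock):

```go
package main

import "fmt"

func main() {
	ch := make(chan string) // unbuffered channel

	// The send blocks until a receiver is ready,
	// so it must run in its own goroutine.
	go func() {
		ch <- "test" // send data to unbuffered channel
	}()

	v := <-ch // receive blocks until data is available
	fmt.Println(v) // prints "test"
}
```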
Most programs use multiple goroutines and buffered channels.
doneCh := make(chan bool, 4)
Here we will be able to run a routine 4 times without blocking, since the channel can buffer 4 values.
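A quick sketch of the buffering behavior: four sends complete immediately, and only a fifth would block until a receiver drains the channel.

```go
package main

import "fmt"

func main() {
	doneCh := make(chan bool, 4) // buffered channel, capacity 4

	// All four sends return immediately; no receiver is needed yet.
	for i := 0; i < 4; i++ {
		doneCh <- true
	}

	fmt.Println(len(doneCh)) // prints 4: four values sit in the buffer
}
```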
select
select is like a switch with cases that allows you to wait on multiple communication operations. It blocks until one of its cases is ready. A select with a default clause is a way to implement non-blocking sends and receives.
select {
case ac <- a:
// sent a on ac
case b := <-bc:
// received b from bc
}
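To illustrate the default clause, here is a small runnable sketch of a non-blocking receive (the channel name bc follows the snippet above; the messages are illustrative):

```go
package main

import "fmt"

func main() {
	bc := make(chan string, 1)

	// Non-blocking receive: bc is empty, so default runs immediately.
	select {
	case b := <-bc:
		fmt.Println("received", b)
	default:
		fmt.Println("no data ready") // prints "no data ready"
	}

	bc <- "hello"

	// Now the receive case is ready and is chosen over default.
	select {
	case b := <-bc:
		fmt.Println("received", b) // prints "received hello"
	default:
		fmt.Println("no data ready")
	}
}
```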
Worker Pools
I encountered the classic scenario where I had to make thousands of database calls to match records from a file to database entries. Finding viable matches in the database for each line in the file was slow and proving too much of a hassle. I wanted to add concurrency to my calls to finish faster. However, I was restricted by the database connection: I could only send a set number of queries at a time or it would error out.
I started with the naive approach. I created a buffered channel of 100 and called the matching routine. The requirement was to match against a key in a table and return the results. This gave some improvement: it ran about 100 queries, waited for those to finish, and started the next batch of 100.
const workers = 100
jobsCh := make(chan int, workers)
for rowIdx := 0; rowIdx < len(fileRecords); rowIdx += workers {
	var j int // declared outside the inner loop so the drain loop can use it
	for j = 0; j < workers; j++ {
		if (rowIdx + j) >= len(fileRecords) {
			break
		}
		go matchRecordsWithDb(jobsCh, &fileRecords[rowIdx+j])
	}
	// wait for the (up to) 100 workers in this batch to return
	for i := 0; i < j; i++ {
		fmt.Printf("%d", <-jobsCh)
	}
}
There was a major snag in this approach. We had a condition that if the line in the file didn't have the main id field, we had to query on another field. This field was not indexed and took a while to query. It is a very old legacy system, so I can't change the indexing at this point.
In my entire file I had one such record. The batch of 100 workers that contained the routine for this one query waited almost a minute and a half on it while the other 99 finished. That is when I started looking at design patterns with channels and came across worker pools.
A worker pool is an approach to concurrency in which a fixed number of m workers process n tasks in a work queue. Rather than waiting on all the workers (channels) at once, jobs are assigned to workers as they become idle.
The three main players are:
Collector: Gathers all the jobs
AvailableWorkers Pool: Buffered channel of channels that is used to process the requests
Dispatcher: Pulls work requests off the Collector and sends them to available channels
All the jobs are added to a collector pool. The dispatcher picks jobs off the collector. If there is an available worker it hands over the job; otherwise it tries to create one. Once all m workers are busy, the dispatcher waits for a job to complete before assigning the next one.
After reading about and experimenting with worker pools, I have written a workerPool package that can be used directly or as a guideline for implementing your own.