
Channels and Workerpools

Goroutines are part of the Go core and are similar to lightweight threads. We start one using the go keyword.

go matchRecordsWithDb(record)

Channels

Channels are a way to synchronize and communicate with goroutines.

ch := make(chan string)
ch <- "test" // send data to the unbuffered channel
v := <-ch    // receive data from the channel and assign it to v

Both operations on an unbuffered channel block: the send waits until a receiver is ready, and the receive waits until data is available on the channel.
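Because of that, the snippet above is only a fragment; the send and the receive have to happen in different goroutines or the program deadlocks. A minimal runnable sketch (illustrative only) looks like this:

package main

import "fmt"

func main() {
	ch := make(chan string) // unbuffered channel

	// The send blocks until main is ready to receive, so it runs in its own goroutine.
	go func() {
		ch <- "test"
	}()

	v := <-ch // blocks until the goroutine sends
	fmt.Println(v)
}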

Most programs use multiple goroutines and buffered channels.

doneCh := make(chan bool, 4)

Here we can send four values on the channel without blocking, so four goroutines can each report completion without waiting on a receiver.
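A small sketch of how that buffered channel is typically used, continuing with doneCh from above (the work inside the goroutine is illustrative):

// Launch 4 goroutines; each sends on doneCh when it finishes.
for i := 0; i < 4; i++ {
	go func(n int) {
		// ... do some work for item n ...
		doneCh <- true // does not block while the buffer has room
	}(i)
}

// Drain the channel to wait for all 4 goroutines.
for i := 0; i < 4; i++ {
	<-doneCh
}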

select

select is like a switch whose cases let you wait on multiple communication operations. It blocks until one of its cases is ready, then runs that case. A select with a default clause is a way to implement non-blocking sends and receives.

select {
case ac <- a:
	// sent a on ac
case b := <-bc:
	// received b from bc
}
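With a default clause the select no longer blocks; if the channel is not ready it falls through immediately, which gives you the non-blocking receive mentioned above (reusing bc from the example):

select {
case b := <-bc:
	fmt.Println("received", b)
default:
	fmt.Println("no value ready, moving on")
}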

WorkerPools

I encountered the classic scenario where I had to make thousands of database calls to match records from a file to database entries. Finding viable matches in the database for each line in the file was slow and proving to be too much of a hassle. I wanted to add concurrency to my calls to speed this up. However, I was restricted by the database connection: I could only send a set number of queries at a time or it would error out.

I started with the naive approach: I created a buffered channel of 100 and launched the matching routine in batches. The requirement was to match on a key in a table and return results. This was some improvement. It did about 100 queries, waited for those to finish, and then started the next batch of 100.

const workers = 100
jobsCh := make(chan int, workers)
for rowIdx := 0; rowIdx < len(fileRecords); rowIdx += workers {
	j := 0
	for ; j < workers; j++ {
		if (rowIdx + j) >= len(fileRecords) {
			break
		}
		go matchRecordsWithDb(jobsCh, &fileRecords[rowIdx+j])
	}
	// wait for this batch of workers to return
	for i := 0; i < j; i++ {
		fmt.Printf("%d", <-jobsCh)
	}
}

There was a major snag in this approach. We had a condition that if a line in the file didn't have the main id field, we had to query on another field. That field was not indexed and took a while to query. It is a very old legacy system, so I couldn't change the indexing at this point.

In my entire file I had one such record. The batch of 100 workers that contained this one query waited almost a minute and a half on it, while the other 99 finished. That is when I started looking at design patterns with channels and came across worker pools.

A worker pool is an approach to concurrency in which a fixed number of m workers works through n tasks in a work queue. Rather than waiting on all the workers (channels) at once, jobs are assigned to workers as they become idle.

The three main players are:

Collector: Gathers all the jobs

AvailableWorkers Pool: Buffered channel of channels that is used to process the requests

Dispatcher: Pulls work requests off the Collector and sends them to available channels

All the jobs are added to a collector pool. The dispatcher picks jobs off the collector. If there are availableWorkers it gives them the job, else it tries to createOne. If all m workers are busy doing jobs, the dispatcher waits for one to complete before assigning the job.
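Before reaching for the full collector/dispatcher design, the core idea can be sketched with plain channels: a fixed pool of worker goroutines ranging over a shared jobs channel, so a single slow query only ties up one worker instead of stalling a whole batch of 100. This is a simplified sketch that reuses matchRecordsWithDb and fileRecords from the earlier snippet, not the implementation in the package below:

const numWorkers = 100

jobs := make(chan int)    // indices into fileRecords
results := make(chan int) // matchRecordsWithDb sends its result here, as before

// Start a fixed pool of workers; each pulls the next job as soon as it is free.
for w := 0; w < numWorkers; w++ {
	go func() {
		for idx := range jobs {
			matchRecordsWithDb(results, &fileRecords[idx])
		}
	}()
}

// Feed every job, then close the channel so idle workers exit.
go func() {
	for i := range fileRecords {
		jobs <- i
	}
	close(jobs)
}()

// Collect one result per record.
for range fileRecords {
	fmt.Printf("%d", <-results)
}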

After reading and experimenting with worker pools, I have written a workerpool package that can be used directly or as a guideline to implement your own.

https://github.com/mariadesouza/workerpool
