Channels and Workerpools

Goroutines are part of the Go core and are central to its concurrency model. They are similar to lightweight threads. We start a goroutine using the go keyword.

go matchRecordsWithDb(record)

 

Channels

Channels are a way to synchronize and communicate between goroutines.

ch := make(chan string)
ch <- "test" // send data to the channel
v := <-ch    // receive data from the channel and assign it to v

The receive here will block until data is available on the channel. On an unbuffered channel like this one, the send blocks too, until another goroutine is ready to receive.
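To make that concrete, here is a minimal runnable version (the names are just for illustration) that does the send from a separate goroutine so the unbuffered send and receive can pair up:

package main

import "fmt"

func main() {
    ch := make(chan string)

    // Send from another goroutine; an unbuffered send blocks
    // until someone is ready to receive.
    go func() {
        ch <- "test"
    }()

    v := <-ch // blocks until the goroutine sends
    fmt.Println(v)
}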

Most programs will use multiple goroutines, and buffered channels are vital in synchronizing all of them.

doneCh := make(chan bool, 4)

Here we can start 4 goroutines, have each send on doneCh when it finishes, and then block until all 4 values are received.
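A sketch of that pattern, with a placeholder doWork function standing in for the real work:

package main

import (
    "fmt"
    "time"
)

// doWork is a stand-in for the real task.
func doWork(id int, done chan<- bool) {
    time.Sleep(100 * time.Millisecond)
    fmt.Println("finished job", id)
    done <- true // signal completion
}

func main() {
    const n = 4
    doneCh := make(chan bool, n)

    for i := 0; i < n; i++ {
        go doWork(i, doneCh)
    }

    // Block until all n goroutines have reported back.
    for i := 0; i < n; i++ {
        <-doneCh
    }
}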

Select

The select statement blocks until at least one case is ready. A select with a default clause is a way to implement non-blocking sends and receives.
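For example, a non-blocking receive looks like this (the messages channel here is just for demonstration):

package main

import "fmt"

func main() {
    messages := make(chan string, 1)

    // Nothing has been sent yet, so the default case runs.
    select {
    case msg := <-messages:
        fmt.Println("received", msg)
    default:
        fmt.Println("no message available")
    }

    messages <- "hello"

    // Now the receive case is ready and wins.
    select {
    case msg := <-messages:
        fmt.Println("received", msg)
    default:
        fmt.Println("no message available")
    }
}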

Worker Pools

I encountered the classic scenario where I had to make thousands of database calls to match records in a payment file. Finding viable matches in the database for each line in the file was slow and proving too much of a hassle. I wanted to add concurrency to my calls to get through the file faster. However, I was restricted by the database connection: I could only send a set number of queries at a time or it would error out.

I started with the naive approach: I created a buffered channel of 100 and called the matching routine in batches. The requirement was to match against a key in a table and return the results. It was some improvement. It ran about 100 queries, waited for those to finish, and started the next batch of 100.

const workers = 100
jobsCh := make(chan int, workers)

for rowIndex := 0; rowIndex < len(fileRecords); rowIndex += workers {
    var j int
    for j = 0; j < workers; j++ {
        if rowIndex+j >= len(fileRecords) {
            break
        }
        go matchRecordsWithDb(jobsCh, &fileRecords[rowIndex+j])
    }
    // wait for this batch of workers to return
    for i := 0; i < j; i++ {
        fmt.Printf("%d", <-jobsCh)
    }
}

There was a major snag in this approach. We had a condition where, if for some reason a line in the file didn't have the main key, we had to query on another field. That field was not indexed and took a while to query. It is a very old legacy system, so I can't change the indexing at this point.

In my entire file I had one such record. The batch of 100 workers that included the routine doing that one query waited almost a minute and a half on it, while the other 99 finished. That is when I started looking at design patterns with channels and came across worker pools.

A worker pool is an approach to concurrency in which a fixed number of m workers work through n tasks in a work queue. Rather than waiting on all the workers (channels) at once, workers are assigned new jobs as they become idle.

The three main players are:

Collector:  Gathers all the jobs

AvailableWorkers Pool: Buffered channel of channels that is used to process the requests

Dispatcher: Pulls work requests off the Collector and sends them to available channels

All the jobs are added to the collector pool. The dispatcher picks jobs off the collector. If there is an available worker it gives it the job; otherwise it tries to create one. If all m workers are busy, the dispatcher waits for one to finish before assigning the job.
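Here is a rough, self-contained sketch of this buffered channel-of-channels pattern. It is not the actual package code; the Job type and the worker/dispatcher functions are made up for illustration:

package main

import (
    "fmt"
    "sync"
)

// Job is a unit of work; here just a line of the payment file.
type Job struct {
    ID     int
    Record string
}

// worker owns its own job channel. It registers that channel in the
// available-workers pool and waits for the dispatcher to hand it a job.
func worker(id int, pool chan chan Job, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    jobCh := make(chan Job)
    for {
        pool <- jobCh // announce availability
        job, ok := <-jobCh
        if !ok {
            return // dispatcher closed our channel: no more work
        }
        // Stand-in for the real database match.
        results <- fmt.Sprintf("worker %d matched record %d (%s)", id, job.ID, job.Record)
    }
}

// dispatcher pulls jobs off the collector and gives each one to the next
// idle worker. When the collector is drained it shuts the workers down.
func dispatcher(collector <-chan Job, pool chan chan Job, numWorkers int) {
    for job := range collector {
        jobCh := <-pool // blocks only if every worker is busy
        jobCh <- job
    }
    for i := 0; i < numWorkers; i++ {
        close(<-pool)
    }
}

func main() {
    const numWorkers = 4

    collector := make(chan Job, 100)        // gathers all the jobs
    pool := make(chan chan Job, numWorkers) // buffered channel of channels
    results := make(chan string, 100)

    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for i := 1; i <= numWorkers; i++ {
        go worker(i, pool, results, &wg)
    }
    go dispatcher(collector, pool, numWorkers)

    // Queue up the work; one slow job only ties up one worker.
    for i := 0; i < 10; i++ {
        collector <- Job{ID: i, Record: fmt.Sprintf("line-%d", i)}
    }
    close(collector)

    wg.Wait()
    close(results)
    for r := range results {
        fmt.Println(r)
    }
}

With this design, the slow unindexed query from my file held up only the one worker running it; the other workers kept pulling new jobs off the collector instead of idling for the whole batch.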

After reading about and experimenting with worker pools, I have written a workerpool package that can be used directly or as a guideline to implement your own.

https://github.com/mariadesouza/workerpool
