orderedconcurrently package - github.com/tejzpr/ordered-concurrently
Ordered Concurrently
A library for parallel processing with ordered output in Go. This module processes work concurrently and returns the results on a channel in the same order as the inputs. It is useful for processing items from a queue concurrently while still receiving the output in the order the queue provided them.
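To make the idea of "concurrent work, ordered output" concrete, here is a minimal standard-library sketch of one common way to get that behaviour. It is not the library's implementation: `processOrdered` is a hypothetical helper written for illustration, and it works on plain `int` values rather than the library's types. Each input gets a one-slot result channel that is queued in input order; workers fill those channels in whatever order they finish, and a collector drains them in queue order.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// processOrdered is a hypothetical helper (not part of ordered-concurrently).
// It applies fn to every value read from in, using `workers` goroutines,
// and emits the results in input order.
func processOrdered(in <-chan int, fn func(int) int, workers int) <-chan int {
	type job struct {
		val    int
		result chan int // one-slot "future" for this job's result
	}
	jobs := make(chan job)
	// pending holds the futures in input order. Its buffer bounds how far
	// the workers can run ahead of the consumer.
	pending := make(chan chan int, workers)
	out := make(chan int)

	// Workers: results may become ready in any order.
	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				j.result <- fn(j.val) // never blocks: buffer of 1
			}
		}()
	}

	// Dispatcher: register each future in input order, then hand off the job.
	go func() {
		for v := range in {
			r := make(chan int, 1)
			pending <- r
			jobs <- job{val: v, result: r}
		}
		close(jobs)
		close(pending)
	}()

	// Collector: wait on the futures strictly in input order.
	go func() {
		for r := range pending {
			out <- <-r
		}
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()

	// Random delays make the workers finish out of order.
	double := func(v int) int {
		time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
		return v * 2
	}
	for v := range processOrdered(in, double, 4) {
		fmt.Println(v) // 0, 2, 4, ... in input order
	}
}
```

The buffer on `pending` is a design choice in this sketch: a slow item at the head of the queue holds back later results, so bounding the queue keeps memory use predictable.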
Usage
Get Module
go get github.com/tejzpr/ordered-concurrently
Import Module in your source code
import concurrently "github.com/tejzpr/ordered-concurrently"
Create a work function
// The work that needs to be performed.
// Requires the "math/rand" and "time" imports.
func workFn(val interface{}) interface{} {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
	return val.(int) * 2
}
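Note that `val.(int)` panics if a value of any other type reaches the work function. Where the input type is not guaranteed, a defensive variant could use the comma-ok form of the type assertion. The following is only a sketch: `safeWorkFn` is a hypothetical name, and returning an `error` as the result value is an assumption made for illustration, not a convention of the library.

```go
package main

import "fmt"

// safeWorkFn is a hypothetical variant of workFn that does not panic on
// unexpected input. It returns an error value instead of the doubled int.
func safeWorkFn(val interface{}) interface{} {
	n, ok := val.(int) // comma-ok form: ok is false if val is not an int
	if !ok {
		return fmt.Errorf("safeWorkFn: expected int, got %T", val)
	}
	return n * 2
}

func main() {
	fmt.Println(safeWorkFn(21))  // 42
	fmt.Println(safeWorkFn("x")) // safeWorkFn: expected int, got string
}
```

With this variant, the consumer would need to check whether each result is an `error` before using it.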
Run
Example - 1
func main() {
	max := 10
	inputChan := make(chan *concurrently.OrderedInput)
	output := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
	go func() {
		for work := 0; work < max; work++ {
			inputChan <- &concurrently.OrderedInput{work}
		}
		close(inputChan)
	}()
	for out := range output {
		log.Println(out.Value)
	}
}
Example - 2
func main() {
	max := 100
	// Can be a buffered channel as well
	inputChan := make(chan *concurrently.OrderedInput)
	wg := &sync.WaitGroup{}
	outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10})
	go func() {
		for out := range outChan {
			log.Println(out.Value)
			wg.Done()
		}
	}()
	// Create work and send it to the input channel.
	// Output will be in the order of input.
	for work := 0; work < max; work++ {
		wg.Add(1)
		input := &concurrently.OrderedInput{work}
		inputChan <- input
	}
	close(inputChan)
	wg.Wait()
}
Credits
- u/justinisrael for inputs on improving resource usage.
- mh-cbon for identifying potential deadlocks.