GitHub - tejzpr/ordered-concurrently: Ordered-concurrently a library for concurrent processing with ordered output in Go. Process work concurrently and returns output in a channel in the order of input. It is useful in concurrently processing items in a queue, and get output in the order provided by the queue. (original) (raw)

Tests codecov Go Reference Gitpod ready-to-code Go Report Card

Ordered Concurrently

A library for parallel processing with ordered output in Go. This module processes work concurrently / in parallel and returns output in a channel in the order of input. It is useful in concurrently / parallelly processing items in a queue, and get output in the order provided by the queue.

Usage

Get Module

go get github.com/tejzpr/ordered-concurrently/v3

Import Module in your source code

import concurrently "github.com/tejzpr/ordered-concurrently/v3"

Create a work function by implementing WorkFunction interface

// Create a type based on your input to the work function type loadWorker int

// The work that needs to be performed // The input type should implement the WorkFunction interface func (w loadWorker) Run(ctx context.Context) interface{} { time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) return w * 2 }

Demo

Go Playground

Run

Example - 1

func main() { max := 10 inputChan := make(chan concurrently.WorkFunction) ctx := context.Background() output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10}) go func() { for work := 0; work < max; work++ { inputChan <- loadWorker(work) } close(inputChan) }() for out := range output { log.Println(out.Value) } }

Example - 2 - Process unknown number of inputs

func main() { inputChan := make(chan concurrently.WorkFunction, 10) ctx := context.Background() output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})

ticker := time.NewTicker(100 * time.Millisecond)
done := make(chan bool)
wg := &sync.WaitGroup{}
go func() {
    input := 0
    for {
        select {
        case <-done:
            return
        case <-ticker.C:
            inputChan <- loadWorker(input)
            wg.Add(1)
            input++
        default:
        }
    }
}()

var res []loadWorker
go func() {
    for out := range output {
        res = append(res, out.Value.(loadWorker))
        wg.Done()
    }
}()

time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
close(inputChan)
wg.Wait()

// Check if output is sorted
isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
    return res[i] < res[j]
})
if !isSorted {
    log.Println("output is not sorted")
}

}

Credits

  1. u/justinisrael for inputs on improving resource usage.
  2. mh-cbon for identifying potential deadlocks.