GitHub - noneback/go-taskflow: A pure Go general-purpose task-parallel programming framework with integrated visualizer and profiler

go-taskflow

go-taskflow is a general-purpose task-parallel programming framework for Go, inspired by taskflow-cpp. It leverages Go's native capabilities and simplicity, making it ideal for managing complex dependencies in concurrent tasks.

Features

Use Cases

Installation

Install the latest version of go-taskflow with:

go get -u github.com/noneback/go-taskflow

Documentation

DeepWiki Page

Example

Below is an example of using go-taskflow to implement a parallel merge sort:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "slices"
    "strconv"
    "sync"

    gtf "github.com/noneback/go-taskflow"
)

// mergeInto merges a sorted source array into a sorted destination array.
func mergeInto(dest, src []int) []int {
    size := len(dest) + len(src)
    tmp := make([]int, 0, size)
    i, j := 0, 0
    for i < len(dest) && j < len(src) {
        if dest[i] < src[j] {
            tmp = append(tmp, dest[i])
            i++
        } else {
            tmp = append(tmp, src[j])
            j++
        }
    }

    // Append whatever remains of the slice that was not exhausted.
    if i < len(dest) {
        tmp = append(tmp, dest[i:]...)
    } else {
        tmp = append(tmp, src[j:]...)
    }

    return tmp
}

func main() {
    size := 100
    randomArr := make([][]int, 10)
    sortedArr := make([]int, 0, 10*size)
    mutex := &sync.Mutex{}

    // Fill ten chunks with random integers.
    for i := 0; i < 10; i++ {
        for j := 0; j < size; j++ {
            randomArr[i] = append(randomArr[i], rand.Int())
        }
    }

    sortTasks := make([]*gtf.Task, 10)
    tf := gtf.NewTaskFlow("merge sort")
    done := tf.NewTask("Done", func() {
        if !slices.IsSorted(sortedArr) {
            log.Fatal("Sorting failed")
        }
        fmt.Println("Sorted successfully")
        fmt.Println(sortedArr[:1000])
    })

    for i := 0; i < 10; i++ {
        // Bind the chunk outside the closure so the task does not depend on
        // the loop variable (which is shared across iterations before Go 1.22).
        arr := randomArr[i]
        sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
            slices.Sort(arr)
            // Merging mutates the shared result, so it is serialized.
            mutex.Lock()
            defer mutex.Unlock()
            sortedArr = mergeInto(sortedArr, arr)
        })
    }
    // Done runs only after every sort task has finished.
    done.Succeed(sortTasks...)

    executor := gtf.NewExecutor(1000)

    executor.Run(tf).Wait()

    if err := tf.Dump(os.Stdout); err != nil {
        log.Fatalf("Error dumping taskflow: %v", err)
    }

    if err := executor.Profile(os.Stdout); err != nil {
        log.Fatalf("Error profiling taskflow: %v", err)
    }
}

For more examples, visit the examples directory.

Benchmark

The following benchmark provides a rough estimate of performance. Note that most realistic workloads are I/O-bound, and their performance cannot be accurately reflected by these results. For CPU-intensive tasks, consider using taskflow-cpp.

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/noneback/go-taskflow/benchmark
cpu: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
BenchmarkC32-4    	   23282	     51891 ns/op	    7295 B/op	     227 allocs/op
BenchmarkS32-4    	    7047	    160199 ns/op	    6907 B/op	     255 allocs/op
BenchmarkC6-4     	   66397	     18289 ns/op	    1296 B/op	      47 allocs/op
BenchmarkC8x8-4   	    7946	    143474 ns/op	   16914 B/op	     504 allocs/op
PASS
ok  	github.com/noneback/go-taskflow/benchmark	5.606s

Understanding Conditional Tasks

Conditional nodes in go-taskflow behave similarly to those in taskflow-cpp. They participate in both conditional control and looping. To avoid common pitfalls, refer to the Conditional Tasking documentation.

Error Handling in go-taskflow

In Go, errors are values, and it is the user's responsibility to handle them appropriately. Only unrecovered panic events are managed by the framework. If a panic occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.

To keep a panic from canceling the graph, recover from it inside the task function when you register the task:

tf.NewTask("not interrupt", func() {
    defer func() {
        if r := recover(); r != nil {
            // Handle the panic.
        }
    }()
    // User-defined logic.
})

Visualizing Taskflows

To generate a visual representation of a taskflow, use the Dump method:

if err := tf.Dump(os.Stdout); err != nil {
    log.Fatal(err)
}

The Dump method writes the graph as raw text in DOT format. Use the Graphviz dot tool to render it as an SVG.


Profiling Taskflows

To profile a taskflow, use the Profile method:

if err := executor.Profile(os.Stdout); err != nil {
    log.Fatal(err)
}

The Profile method writes raw text in flamegraph format. Use the flamegraph tool to render it as a flamegraph SVG.

