GitHub - koss-null/FuncFrog: Stream API (kind of) implementation for Go, plus other useful functions and packages for using Go in a functional way

FuncFrog

License: MIT

FuncFrog is a library for performing efficient, parallel, lazy map, reduce, filter, and many other operations on slices and other data sequences in a pipeline. The sequence can be produced by a variety of generating functions. Every operation can be executed in parallel with minimal overhead from copying and locks. There is built-in support for error handling with the Yeet/Snag methods.
The library is easy to use and has a clean, intuitive API.
You can compare its performance with a vanilla for loop on your machine by running cd perf/; make (spoiler: FuncFrog does better when multithreading).

Getting Started

To use FuncFrog in your project, run the following command:

go get github.com/koss-null/funcfrog

Then, import the library into your Go code (basically you need the pipe package):

import "github.com/koss-null/funcfrog/pkg/pipe"

You can then use the pipe package to create a pipeline of operations on a slice:

res := pipe.Slice(a).
	Map(func(x int) int { return x * x }).
	Filter(func(x *int) bool { return *x > 100 }).
	Parallel(12).
	Do()

All operations are carefully fenced with interfaces, so feel free to use whatever autocompletion suggests.

If you want it fast and short, you may use ff:

import "github.com/koss-null/funcfrog/pkg/ff"

res := ff.Map(strArr, strings.ToUpper).Do()

To see some code snippets, check out the Examples.

Basic information

The Piper (or PiperNoLen for pipes of undetermined length) is an interface that represents a lazily evaluated sequence of data. The Piper interface provides a set of methods for transforming, filtering, collecting, and analyzing the data in the sequence. A pipe can be copied at any moment simply by assigning it to a variable. Some methods (such as Take or Gen) turn a PiperNoLen into a Piper, which makes a wider range of methods available.
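The lazy behaviour described above can be sketched in plain Go. This is not FuncFrog's implementation: the `lazyPipe` type and its methods are hypothetical names, used only to illustrate how stages can be recorded first and evaluated only when `Do` is called, and why assigning a pipe to a variable gives an independent copy.

```go
package main

import "fmt"

// lazyPipe is a hypothetical, minimal stand-in for a lazy sequence:
// it stores the source slice and a list of pending stages.
type lazyPipe struct {
	src    []int
	stages []func(int) (int, bool) // each stage returns (value, keep)
}

// Map records a transformation; nothing is evaluated yet.
func (p lazyPipe) Map(fn func(int) int) lazyPipe {
	stages := append(append([]func(int) (int, bool){}, p.stages...),
		func(x int) (int, bool) { return fn(x), true })
	return lazyPipe{src: p.src, stages: stages}
}

// Filter records a predicate; nothing is evaluated yet.
func (p lazyPipe) Filter(fn func(int) bool) lazyPipe {
	stages := append(append([]func(int) (int, bool){}, p.stages...),
		func(x int) (int, bool) { return x, fn(x) })
	return lazyPipe{src: p.src, stages: stages}
}

// Do runs every recorded stage over the source and returns the result.
func (p lazyPipe) Do() []int {
	out := make([]int, 0, len(p.src))
	for _, x := range p.src {
		keep := true
		for _, st := range p.stages {
			if x, keep = st(x); !keep {
				break
			}
		}
		if keep {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	base := lazyPipe{src: []int{1, 5, 11, 20}}.Map(func(x int) int { return x * x })
	big := base.Filter(func(x int) bool { return x > 100 }) // base is left untouched
	fmt.Println(base.Do(), big.Do())                        // [1 25 121 400] [121 400]
}
```

Because every method returns a new value with its own stage list, `base` can still be evaluated on its own after `big` has been derived from it.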

Supported functions list

The following functions can be used to create a new Pipe (this is what I call the inner representation of a sequence of elements and a sequence of operations on them):

Constructors

Set Pipe length

Split evaluation into n goroutines

Transform data

Retrieve a single element or perform a boolean check

Evaluate the pipeline

Transform Pipe from one type to another

Easy type conversion for Pipe[any]

Error handling

To be done

In addition to the functions described above, the pipe package also provides several utility functions that can be used to create common types of Pipes, such as Range, Repeat, and Cycle. These functions can be useful for creating Pipes of data that follow a certain pattern or sequence.
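To make the three patterns concrete, here is a plain Go sketch of what Range, Repeat, and Cycle produce. The helper names `rangeInts`, `repeat`, and `cycleTake` are hypothetical and are not part of the pipe package; the exclusive upper bound of the range is an assumption taken from the examples in this README.

```go
package main

import "fmt"

// rangeInts returns start, start+step, ... while the value is below finish.
// The exclusive upper bound is an assumption based on this README's examples.
func rangeInts(start, finish, step int) []int {
	if step <= 0 {
		return nil // this sketch only supports positive steps
	}
	var out []int
	for v := start; v < finish; v += step {
		out = append(out, v)
	}
	return out
}

// repeat returns n copies of x.
func repeat[T any](x T, n int) []T {
	out := make([]T, n)
	for i := range out {
		out[i] = x
	}
	return out
}

// cycleTake walks over a again and again and returns the first n elements.
func cycleTake[T any](a []T, n int) []T {
	out := make([]T, 0, n)
	for i := 0; len(a) > 0 && i < n; i++ {
		out = append(out, a[i%len(a)])
	}
	return out
}

func main() {
	fmt.Println(rangeInts(10, 20, 2))         // [10 12 14 16 18]
	fmt.Println(repeat("hello", 3))           // [hello hello hello]
	fmt.Println(cycleTake([]int{1, 2, 3}, 5)) // [1 2 3 1 2]
}
```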

It is also highly recommended to get familiar with the pipies package, which contains some useful predicates, comparators, and accumulators.

Using prefix Pipe to transform Pipe type

You may find that using Erase() is not very convenient, as it forces you to do some pointer conversions. Fortunately, there is another way to convert a pipe type: use the functions from pipe/prefixpipe.go. These functions take a Piper or PiperNoLen as the first parameter and the function to apply as the second, and return the resulting pipe (or the result itself) of the destination type.
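One reason a type-changing Map is written as a prefix function is a Go language rule: a method cannot declare type parameters of its own. The sketch below illustrates that rule with a hypothetical `seq` type and `mapSeq` function; these names are assumptions for illustration and are not FuncFrog's API.

```go
package main

import (
	"fmt"
	"strconv"
)

// seq is a hypothetical stand-in for a typed pipe.
type seq[T any] struct{ items []T }

// Map can only return seq[T]: a Go method cannot introduce a new type parameter.
func (s seq[T]) Map(fn func(T) T) seq[T] {
	out := make([]T, len(s.items))
	for i, x := range s.items {
		out[i] = fn(x)
	}
	return seq[T]{items: out}
}

// mapSeq is a free ("prefix") function, so it may use two type parameters
// and change the element type from S to D.
func mapSeq[S, D any](s seq[S], fn func(S) D) seq[D] {
	out := make([]D, len(s.items))
	for i, x := range s.items {
		out[i] = fn(x)
	}
	return seq[D]{items: out}
}

func main() {
	nums := seq[int]{items: []int{1, 2, 3}}.Map(func(x int) int { return x * 10 })
	strs := mapSeq(nums, strconv.Itoa) // seq[int] -> seq[string]
	fmt.Printf("%q\n", strs.items)     // ["10" "20" "30"]
}
```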

Prefix pipe functions

Using ff package to write shortened pipes

Sometimes you just need to apply a function. Creating a pipe with pipe.Slice and then calling Map looks a little verbose, especially when you need Map or Reduce to convert from one type to another. The solution is the funcfrog/pkg/ff package. It contains shortened Map and Reduce functions that can be called directly with a slice as the first parameter.
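As a rough illustration of the slice-first style, here is a plain Go sketch of a Map and a Reduce that can change the element type. The names `mapSlice` and `reduceSlice` are hypothetical, and unlike the ff functions they return plain values directly instead of something that is finished with Do().

```go
package main

import (
	"fmt"
	"strings"
)

// mapSlice applies fn to every element of a and returns a new slice.
func mapSlice[S, D any](a []S, fn func(S) D) []D {
	out := make([]D, len(a))
	for i, x := range a {
		out[i] = fn(x)
	}
	return out
}

// reduceSlice folds a into a single value of type D, starting from init.
func reduceSlice[S, D any](a []S, init D, fn func(D, S) D) D {
	acc := init
	for _, x := range a {
		acc = fn(acc, x)
	}
	return acc
}

func main() {
	words := []string{"fork", "inspire", "use"}
	upper := mapSlice(words, strings.ToUpper)
	// Fold strings into an int: the total number of characters.
	total := reduceSlice(upper, 0, func(n int, s string) int { return n + len(s) })
	fmt.Println(upper, total) // [FORK INSPIRE USE] 14
}
```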

Look for useful functions in Pipies package

Some of the functions that are passed to Map, Filter, or Reduce (or other Pipe methods) are pretty common, so the pipies package provides them. There is also a common comparator for any integers and floats to use with the Sort method.
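A generic comparator of this kind might look roughly like the sketch below. The names `ordered`, `less`, and `sortBy` are hypothetical and only mirror the pointer-based comparator shape used in this README's Sort example; the real helpers live in the pipies package.

```go
package main

import (
	"fmt"
	"sort"
)

// ordered lists the types that support the < operator in this sketch.
type ordered interface {
	~int | ~int64 | ~float32 | ~float64 | ~string
}

// less compares two values through pointers.
func less[T ordered](x, y *T) bool { return *x < *y }

// sortBy sorts a in place using a pointer-based comparator.
func sortBy[T any](a []T, cmp func(x, y *T) bool) {
	sort.Slice(a, func(i, j int) bool { return cmp(&a[i], &a[j]) })
}

func main() {
	vals := []float32{3.5, -1, 2}
	sortBy(vals, less[float32])
	fmt.Println(vals) // [-1 2 3.5]
}
```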

Examples

Basic example:

res := pipe.Slice(a).
	Map(func(x int) int { return x * x }).
	Map(func(x int) int { return x + 1 }).
	Filter(func(x *int) bool { return *x > 100 }).
	Filter(func(x *int) bool { return *x < 1000 }).
	Parallel(12).
	Do()
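The Parallel(12) call in the basic example asks for the pipeline to be evaluated in 12 goroutines. How FuncFrog does this internally is not described in this README; the sketch below only shows one common way to split a map over goroutines without locks, using a hypothetical `parallelMap` helper.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap applies fn to every element of a using up to workers goroutines.
// Each goroutine owns a contiguous chunk and writes only to its own indexes,
// so no locks are needed.
func parallelMap[S, D any](a []S, workers int, fn func(S) D) []D {
	out := make([]D, len(a))
	if workers < 1 {
		workers = 1
	}
	chunk := (len(a) + workers - 1) / workers // ceiling division
	var wg sync.WaitGroup
	for lo := 0; lo < len(a); lo += chunk {
		hi := lo + chunk
		if hi > len(a) {
			hi = len(a)
		}
		wg.Add(1)
		go func(lo, hi int) {
			defer wg.Done()
			for i := lo; i < hi; i++ {
				out[i] = fn(a[i])
			}
		}(lo, hi)
	}
	wg.Wait()
	return out
}

func main() {
	squares := parallelMap([]int{1, 2, 3, 4, 5}, 12, func(x int) int { return x * x })
	fmt.Println(squares) // [1 4 9 16 25]
}
```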

Example using Func and Take:

p := pipe.Func(func(i int) (v int, b bool) {
	if i < 10 {
		return i * i, true
	}
	return
}).Take(5).Do()
// p will be [0, 1, 4, 9, 16]

Example using Func and Gen:

p := pipe.Func(func(i int) (v int, b bool) {
	if i < 10 {
		return i * i, true
	}
	return
}).Gen(5).Do()
// p will be [0, 1, 4, 9, 16]

Example of the difference between Take and Gen:

Gen(n) generates a sequence of n elements and applies the whole pipeline afterwards.

p := pipe.Func(func(i int) (v int, b bool) {
	return i, true
}).
	Filter(func(x *int) bool { return (*x)%2 == 0 }).
	Gen(10).
	Do()
// p will be [0, 2, 4, 6, 8]

Take(n) expects the result to have length n.

p := pipe.Func(func(i int) (v int, b bool) {
	return i, true
}).
	Filter(func(x *int) bool { return (*x)%2 == 0 }).
	Take(10).
	Do()
// p will be [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Watch out: if the Take value is set carelessly, it may jam the whole pipeline.

// DO NOT DO THIS, IT WILL JAM
p := pipe.Func(func(i int) (v int, b bool) {
	return i, i < 10 // only 10 first values are not skipped
}).
	Take(11).    // we can't get any 11th value ever
	Parallel(4). // why not
	Do()         // Do() will try to evaluate the 11th value in 4 goroutines until it reaches maximum int value
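The reason for the jam is easier to see in a plain loop: the generator accepts only 10 values, so a loop waiting for an 11th never finishes. The sketch below adds a hypothetical `maxTries` cap to make that visible; the cap and the `takeWithLimit` helper are assumptions for illustration, not FuncFrog features.

```go
package main

import "fmt"

// takeWithLimit collects n accepted values from gen, but gives up after
// maxTries calls. The cap is hypothetical; it is not a FuncFrog feature.
func takeWithLimit(n, maxTries int, gen func(int) (int, bool)) ([]int, bool) {
	out := make([]int, 0, n)
	for i := 0; i < maxTries && len(out) < n; i++ {
		if v, ok := gen(i); ok {
			out = append(out, v)
		}
	}
	return out, len(out) == n
}

func main() {
	// Only the first 10 indexes are accepted, exactly as in the example above.
	firstTen := func(i int) (int, bool) { return i, i < 10 }
	vals, ok := takeWithLimit(11, 1000000, firstTen)
	fmt.Println(len(vals), ok) // 10 false
}
```

Without the cap, the loop would keep calling the generator until the index overflows, which matches the behaviour the warning describes.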

Example using Filter and Map:

p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
	Filter(func(x *int) bool { return *x%2 == 0 }).
	Map(func(x int) int { return len(strconv.Itoa(x)) }).
	Do()
// p will be [1, 1, 1, 1, 2]

Example using Map and Reduce:

In this example Reduce is used in its prefix form to be able to convert ints to a string.

p := pipe.Reduce(
	pipe.Slice([]int{1, 2, 3, 4, 5}).
		Map(func(x int) int { return x * x }),
	func(x, y *int) string {
		return strconv.Itoa(*x) + "-" + strconv.Itoa(*y)
	},
)
// p will be "1-4-9-16-25"

In this example Reduce is used as usual in its postfix form.

p := pipe.Slice([]string{"Hello", "darkness", "my", "old", "friend"}).
	Map(strings.Title).
	Reduce(func(x, y *string) string {
		return *x + " " + *y
	})
// p will be "Hello Darkness My Old Friend"

Example of Map and Reduce with the underlying array type change:

p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
strP := pipe.Map(p, func(x int) string { return strconv.Itoa(x) })
result := pipe.Reduce(strP, func(x, y *string) int { return len(*x) + len(*y) }).Do()
// result will be 45

Example using Sort:

p := pipe.Func(func(i int) (float32, bool) {
	return 100500 - float32(i)*0.9, true
}).
	Map(func(x float32) float32 { return x * x * 0.1 }).
	Gen(100500). // Sort is only available on pipes with known length
	Sort(pipies.Less[float32]). // pipies.Less(x, y *T) bool is available to all comparables
	// check out pipies package to find more useful things
	Parallel(12).
	Do()
// p will contain the elements sorted in ascending order

Example of infinite sequence generation:

Here is an example of generating an infinite Fibonacci sequence:

var fib []chan int
p := pipe.Func(func(i int) (int, bool) {
	if i < 2 {
		fib[i] <- i
		return i, true
	}
	p1 := <-fib[i-1]
	fib[i-1] <- p1
	p2 := <-fib[i-2]
	fib[i-2] <- p2

	fib[i] <- p1 + p2
	return p1 + p2, true
}).Parallel(20)

To generate a specific number of values, you can use the Take or Gen method:

// fill the array first
fib = make([]chan int, 60)
for i := range fib {
	fib[i] = make(chan int, 1)
}
// do the Take
p = p.Take(60)

To accumulate the elements of the Pipe, you can use the Reduce or Sum method:

sum := p.Sum(pipe.Sum[float32])
// also you can: sum := p.Reduce(func(x, y *float32) float32 { return *x + *y })
// sum will be the sum of the first 65000 random float32 values greater than 0.5

Example using Range and Map:

p := pipe.Range(10, 20, 2).Map(func(x int) int { return x * x }).Do() // p will be [100, 144, 196, 256, 324]

Example using Repeat and Map:

p := pipe.Repeat("hello", 5).Map(strings.ToUpper).Do() // p will be ["HELLO", "HELLO", "HELLO", "HELLO", "HELLO"]

Here is an example of how you can call multiple functions that return an error and handle the results:

func foo() error {
	// <...>
	return nil
}

errs := pipe.Map(
	pipe.Repeat(foo, 50),
	func(f func() error) error { return f() },
).Do()

for _, e := range errs {
	if e != nil {
		log.Err(e)
	}
}

Example using Cycle and Filter

p := pipe.Cycle([]int{1, 2, 3}).Filter(func(x *int) bool { return *x % 2 == 0 }).Take(4).Do() // p will be [2, 2, 2, 2]

Example using Erase and Collect

p := pipe.Slice([]int{1, 2, 3}).
	Erase().
	Map(func(x any) any {
		i := *(x.(*int))
		return &MyStruct{Weight: i}
	}).Filter(func(x *any) bool {
		return (*x).(*MyStruct).Weight > 10
	})
ms := pipe.Collect[MyStruct](p).Parallel(10).Do()

Example of simple error handling

y := pipe.NewYeti()
p := pipe.Range[int](-10, 10, 1).
	Yeti(y). // it's important to set yeti before yeeting, or the handle process will not be called
	MapFilter(func(x int) (int, bool) {
		if x == 0 {
			y.Yeet(errors.New("zero division")) // yeet the error
			return 0, false                     // use MapFilter to filter out this value
		}
		return int(256.0 / float64(x)), true
	}).Snag(func(err error) {
		fmt.Println("oopsie-doopsie: ", err)
	}).Do()

fmt.Println("p is: ", p)
/////////// output is:
// oopsie-doopsie: zero division
// p is: [-25 -28 -32 -36 -42 -51 -64 -85 -128 -256 256 128 85 64 51 42 36 32 28]

This example demonstrates generating a set of values 256/i, where i ranges from -10 to 9 (excluding 10) with a step of 1. To handle division by zero, the library provides an error handling mechanism.

To begin, you need to create an error handler using the pipe.NewYeti() function. Then, register the error handler by calling the Yeti(yeti) method on your pipe object. This registered yeti will be the last error handler used in the pipe chain.

To yeet an error, you can use y.Yeet(error) from the registered yeti object.

To handle the yeeted error, use the Snag(func(error)) method, which sets up an error handling function. You can set up multiple Snag functions, but all of them will consider the last yeti object set with the Yeti(yeti) method.
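The yeet-then-snag idea can be sketched in plain Go as a small, concurrency-safe error collector. The `collector` type and its `yeet` and `snag` methods are hypothetical names for illustration; they are not FuncFrog's Yeti implementation and do not reproduce its chaining rules.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// collector is a hypothetical stand-in for a yeti: it stores errors
// reported from any goroutine.
type collector struct {
	mu   sync.Mutex
	errs []error
}

// yeet records an error; it is safe for concurrent use.
func (c *collector) yeet(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errs = append(c.errs, err)
}

// snag passes every recorded error to handle.
func (c *collector) snag(handle func(error)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, err := range c.errs {
		handle(err)
	}
}

// divideAll returns 256/x for each non-zero x and reports zeros to c.
func divideAll(xs []int, c *collector) []int {
	out := make([]int, 0, len(xs))
	for _, x := range xs {
		if x == 0 {
			c.yeet(errors.New("zero division"))
			continue // skip the value, like returning false from MapFilter
		}
		out = append(out, 256/x)
	}
	return out
}

func main() {
	c := &collector{}
	res := divideAll([]int{-2, -1, 0, 1, 2}, c)
	c.snag(func(err error) { fmt.Println("oopsie-doopsie:", err) })
	fmt.Println("res is:", res) // res is: [-128 -256 256 128]
}
```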

This is a simple example of how to handle basic errors. Below, you will find a more realistic example of error handling in a real-life scenario.

Example of multiple error handling

y1, y2 := pipe.NewYeti(), pipe.NewYeti()
users := pipe.Func(func(i int) (*domain.DomObj, bool) {
	domObj, err := uc.GetUser(i)
	if err != nil {
		y1.Yeet(err)
		return nil, false
	}
	return domObj, true
}).
	Yeti(y1).Snag(handleGetUserErr). // suppose we have some pre-defined handler
	MapFilter(func(do *domain.DomObj) (*domain.DomObj, bool) {
		enriched, err := uc.EnrichUser(do)
		if err != nil {
			y2.Yeet(err)
			return nil, false
		}
		return enriched, true
	}).Yeti(y2).Snag(handleEnrichUserErr).
	Do()

The full working code with samples of handlers and implementations of usecase functions can be found at: https://go.dev/play/p/YGtM-OeMWqu.

This example demonstrates how multiple error handling functions can be set up at different stages of the data processing pipeline to handle errors specific to each stage.

Let's break down what is happening here.

In this code fragment, there are two instances of pipe.Yeti created: y1 and y2. These Yeti instances are used to handle errors at different stages of the data processing pipeline.

Within the pipe.Func operation, there are error-handling statements. When calling uc.GetUser(i), if an error occurs, it is yeeted using y1.Yeet(err), and the function returns nil and false to indicate the failure.

The Yeti(y1).Snag(handleGetUserErr) statement sets up an error handling function handleGetUserErr to handle the error thrown by uc.GetUser(i). This function is defined elsewhere and specifies how to handle the error.

After that, the MapFilter operation is performed on the resulting *domain.DomObj. If the uc.EnrichUser(do) operation encounters an error, it returns nil and false to filter out the value.

The Yeti(y2).Snag(handleEnrichUserErr) statement sets up another error handling function handleEnrichUserErr to handle the error thrown by uc.EnrichUser(do).

Finally, the Do() method executes the entire pipeline and assigns the result to the users variable.

Is this package stable?

Yes, it has finally been stable since v1.0.0! All listed functionality is fully covered by unit tests. Functionality marked as TBD will be implemented as described in the README and covered by unit tests so that it is delivered stable.

If any method signature changes, the major version will be incremented.

Contributions

I will accept any PRs implementing the functionality marked as TBD.

I will also accept any sane unit tests.

Bug fixes are welcome too.

You are welcome to create issues and to contact me via email.

What's next?

I hope to provide some roadmap of the project soon.

Feel free to fork, get inspired, and use it!