outr/rapid: Stupid simple effects and threading

Rapid is a high-performance, minimal-overhead effect, concurrency, and streaming library for Scala 2 and 3, designed for scenarios where execution speed and allocation efficiency are critical. It focuses on simplicity and convenience, and uses virtual threads to keep execution fast.

It provides:

Inspiration

This project grew out of curiosity about virtual threads and how their performance compares to existing libraries. What began as a benchmark turned into an appreciation for the simplicity and speed of the approach. From there, the project shifted towards building a library that prioritizes practical execution models over purely effect-free semantics. That is why Rapid lets you run tasks in a straightforward, blocking, single-threaded way, or move them to multi-threaded parallel execution when performance demands it.

Benchmarks

Take a look at the benchmarks to see how well it performs compared to the alternatives: https://github.com/outr/rapid/wiki/Benchmarks

Features


SBT Configuration

Scala 2.13+ and Scala 3 are supported.

Core

libraryDependencies += "com.outr" %% "rapid-core" % "2.9.8"

Test (Test features for running Task effects in ScalaTest)

libraryDependencies += "com.outr" %% "rapid-test" % "2.9.8"

Cats (Interoperability with Cats-Effect)

libraryDependencies += "com.outr" %% "rapid-cats" % "2.9.8"


Core Concepts

Task

A Task[A] is a description of a computation that produces a value of type A. It can be run synchronously with .sync() or executed in a Fiber for concurrency.

import rapid.Task
import scala.concurrent.duration._

val hello: Task[Unit] = Task { println("Hello, Rapid!") }
// hello: Task[Unit] = Suspend(
//   f = repl.MdocSession$MdocApp$$Lambda/0x000000009288c5b0@23062ef9,
//   trace = SourcecodeTrace(
//     file = File("README.md"),
//     line = Line(15),
//     enclosing = Enclosing("repl.MdocSession.MdocApp#hello"),
//     kind = "apply"
//   )
// )

val delayed: Task[String] = Task.sleep(500.millis).map(_ => "Done!")
// delayed: Task[String] = FlatMap(
//   input = FlatMap(
//     input = Unit,
//     f = rapid.Task$$Lambda/0x00000000928943d8@557696b1,
//     trace = SourcecodeTrace(
//       file = File("README.md"),
//       line = Line(20),
//       enclosing = Enclosing("repl.MdocSession.MdocApp#delayed"),
//       kind = "flatMap"
//     )
//   ),
//   f = rapid.Task$$Lambda/0x0000000092894f68@37ed2f09,
//   trace = SourcecodeTrace(
//     file = File("README.md"),
//     line = Line(20),
//     enclosing = Enclosing("repl.MdocSession.MdocApp#delayed"),
//     kind = "map"
//   )
// )

hello.sync()
// Hello, Rapid!
println(delayed.sync())
// Done!


Fiber

A Fiber[A] is a lightweight handle to a running Task[A]. You can start tasks on fibers and wait for them to complete.

import rapid.Task

val fiber = Task {
  Thread.sleep(1000)
  "Completed!"
}.start()
// fiber: Fiber[String] = rapid.fiber.VirtualThreadFiber@6655e9b2

println("Running in background...")
// Running in background...
val result = fiber.sync()
// result: String = "Completed!"
println(result) // "Completed!"
// Completed!


Stream

A Stream[A] is a lazy, composable sequence of A values backed by Task. You can transform it sequentially or in parallel.

import rapid.{Stream, Task}

val s = Stream.emits(1 to 5)
// s: Stream[Int] = rapid.Stream@80923d4

val doubled = s.map(_ * 2).toList.sync() // List(2, 4, 6, 8, 10)
// doubled: List[Int] = List(2, 4, 6, 8, 10)

// Parallel map with up to 4 threads
val parallel = s.par(4)(i => Task(i * 2)).toList.sync()
// parallel: List[Int] = List(2, 4, 6, 8, 10)

Common Operations

val filtered = s.filter(_ % 2 == 0).toList.sync() // List(2, 4)
// filtered: List[Int] = List(2, 4)
val taken = s.take(3).toList.sync() // List(1, 2, 3)
// taken: List[Int] = List(1, 2, 3)
val zipped = s.zipWithIndex.toList.sync() // List((1,0), (2,1), ...)
// zipped: List[Tuple2[Int, Int]] = List(
//   (1, 0),
//   (2, 1),
//   (3, 2),
//   (4, 3),
//   (5, 4)
// )


Parallel Operators

.par

Parallel map with a maximum number of threads.

Stream.emits(1 to 10)
  .par(maxThreads = 4)(i => Task(i * 2))
  .toList
  .sync()
// res4: List[Int] = List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

.parForeach

Fire-and-forget parallel processing for side-effects (no allocation of results).

import java.util.concurrent.atomic.AtomicLong
import rapid.Task

val sum = new AtomicLong(0)
// sum: AtomicLong = 5050

Stream.emits(1 to 100)
  .parForeach(threads = 8) { i =>
    sum.addAndGet(i)
    Task.unit
  }
  .sync()

sum.get() // Sum of 1..100
// res6: Long = 5050L

.parFold

Parallel reduction with per-thread accumulation and final merge.

import rapid._

val streamResult = Stream.emits(1 to 100)
  .parFold(0L, threads = 8)(
    (acc, i) => Task.pure(acc + i),
    _ + _
  )
  .sync()
// streamResult: Long = 5050L

streamResult // 5050
// res7: Long = 5050L
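To make "per-thread accumulation and final merge" concrete, here is a conceptual sketch that uses only the JDK standard library. It is not Rapid's implementation: the helper name `chunkedSum` and the static chunking strategy are assumptions chosen for illustration, and this README does not say how Rapid actually partitions work.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical helper: each worker folds its own chunk into a private
// accumulator, then the partial results are merged on the calling thread.
def chunkedSum(values: Seq[Int], threads: Int): Long = {
  val pool = Executors.newFixedThreadPool(threads)
  try {
    // Split the input into at most `threads` chunks (assumed strategy).
    val chunkSize = math.max(1, math.ceil(values.size.toDouble / threads).toInt)
    val partials = values.grouped(chunkSize).toList.map { chunk =>
      pool.submit(new Callable[Long] {
        // Per-thread accumulation: no state is shared between workers.
        def call(): Long = chunk.foldLeft(0L)((acc, i) => acc + i)
      })
    }
    // Final merge: combine the per-thread accumulators.
    partials.map(_.get()).foldLeft(0L)(_ + _)
  } finally pool.shutdown()
}

val total = chunkedSum(1 to 100, threads = 8)
```

The two steps correspond to the two functions passed to `.parFold` above: the first folds an element into an accumulator, the second (`_ + _`) merges accumulators.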


Advanced: ParallelStream

ParallelStream[T, R] lets you control how elements are processed in parallel with a forge function: T => Task[Option[R]]. Results are ordered by input index and None values are dropped.

import rapid.{Stream, ParallelStream, Task}

val base = Stream.emits(1 to 10)
// base: Stream[Int] = rapid.Stream@4dc2ad06
val ps = ParallelStream(
  stream = base,
  forge = (i: Int) => Task.pure(if (i % 2 == 0) Some(i * 10) else None),
  maxThreads = 8,
  maxBuffer = 100000
)
// ps: ParallelStream[Int, Int] = ParallelStream(
//   stream = rapid.Stream@4dc2ad06,
//   forge = repl.MdocSession$MdocApp$$anon$25@6228650e,
//   maxThreads = 8,
//   maxBuffer = 100000
// )

val out = ps.toList.sync() // List(20, 40, 60, 80, 100)
// out: List[Int] = List(20, 40, 60, 80, 100)

You can collect after the forge to transform only kept values:

val doubledEvens = ps.collect { case x if x % 40 == 0 => x / 20 }.toList.sync()
// doubledEvens: List[Int] = List(2, 4)


Error Handling

Task failures raise exceptions. Use attempt to capture errors if you prefer explicit handling.

val t = Task {
  if (System.currentTimeMillis() % 2L == 0L) "ok"
  else throw new RuntimeException("boom")
}
// t: Task[String] = Suspend(
//   f = repl.MdocSession$MdocApp$$Lambda/0x00000000928b86b8@598b29a8,
//   trace = SourcecodeTrace(
//     file = File("README.md"),
//     line = Line(180),
//     enclosing = Enclosing("repl.MdocSession.MdocApp#t"),
//     kind = "apply"
//   )
// )

t.attempt.sync() match {
  case scala.util.Success(v) => println(s"Success: $v")
  case scala.util.Failure(e) => println(s"Error: ${e.getMessage}")
}
// Success: ok


How It Works

Execution Model

  1. Task describes work.
  2. Fiber runs work (typically on a JVM virtual thread).
  3. Stream is a lazy pull that feeds elements through transforms.
  4. Parallel operators batch/pipeline elements across multiple fibers and preserve ordering when appropriate.
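
The four steps above can be walked through in one short sketch. It uses only calls that already appear in this README (`Task { ... }`, `.start()`, `.sync()`, `Stream.emits`, `.map`, `.par`, `.toList`) and assumes `rapid-core` is on the classpath. The value names (`compute`, `squares`, `shifted`) are illustrative, not part of the library.

```scala
import rapid.{Stream, Task}

// 1. A Task only describes work; nothing runs at this point.
val compute: Task[Int] = Task { 21 * 2 }

// 2. start() hands the Task to a Fiber; sync() blocks until it completes.
val fiber = compute.start()
val answer = fiber.sync()

// 3. A Stream is lazy: no element is pulled until toList.sync() is called.
val squares = Stream.emits(1 to 5).map(i => i * i)

// 4. par runs the per-element Task across up to 4 threads.
val shifted = squares.par(maxThreads = 4)(i => Task(i + answer)).toList.sync()
```

Calling `compute.sync()` directly would run the same work on the calling thread without going through `start()`.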

Performance Notes


Benchmarks

Rapid includes JMH benchmarks comparing:

Run:

sbt "jmh:run -i 3 -wi 3 -f1 -t1"