streamsql package - github.com/rulego/streamsql - Go Packages

Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.

StreamSQL provides efficient unbounded data stream processing and analysis capabilities, supporting multiple window types, aggregate functions, custom functions, and seamless integration with the RuleGo ecosystem.

Core Features

• Lightweight design - Pure in-memory operations, no external dependencies
• SQL syntax support - Process stream data using familiar SQL syntax
• Multiple window types - Sliding, tumbling, counting, and session windows
• Event time and processing time - Support both time semantics for accurate stream processing
• Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components

Getting Started

Basic stream data processing:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/rulego/streamsql"
)

func main() {
    // Create StreamSQL instance
    ssql := streamsql.New()

    // Define SQL query - Calculate temperature average by device ID every 5 seconds
    sql := `SELECT deviceId,
        AVG(temperature) as avg_temp,
        MIN(humidity) as min_humidity,
        window_start() as start,
        window_end() as end
    FROM stream
    WHERE deviceId != 'device3'
    GROUP BY deviceId, TumblingWindow('5s')`

    // Execute SQL, create stream processing task
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }

    // Add result processing callback
    ssql.AddSink(func(result []map[string]interface{}) {
        fmt.Printf("Aggregation result: %v\n", result)
    })

    // Simulate sending stream data
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for range ticker.C {
            // Generate random device data
            data := map[string]interface{}{
                "deviceId":    fmt.Sprintf("device%d", rand.Intn(3)+1),
                "temperature": 20.0 + rand.Float64()*10,
                "humidity":    50.0 + rand.Float64()*20,
            }
            ssql.Emit(data)
        }
    }()

    // Run for 30 seconds
    time.Sleep(30 * time.Second)
}

Window Functions

StreamSQL supports multiple window types:

// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')

// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')

// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)

// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
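These queries plug into the same Go harness shown in Getting Started; a minimal sketch for the session window (the user_id field and event payload are illustrative, not prescribed by the engine):

ssql := streamsql.New()
// Count events per user; a session closes after 5 minutes without
// new events for that user_id.
err := ssql.Execute(`SELECT user_id, COUNT(*) as event_count
FROM stream
GROUP BY user_id, SessionWindow('5m')`)
if err != nil {
    panic(err)
}
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Session result: %v\n", results)
})
// Each Emit extends the session for the given user_id
ssql.Emit(map[string]interface{}{"user_id": "user123", "action": "click"})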

Event Time vs Processing Time

StreamSQL supports two time semantics for window processing:

Processing Time (Default)

Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:

// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives

Event Time

Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps, allowing correct handling of out-of-order and late-arriving data:

// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')

// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')

Watermark and Late Data Handling

Event time windows use a watermark mechanism to handle out-of-order and late data:

// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
    TIMESTAMP='order_time',
    MAXOUTOFORDERNESS='5s'  // Watermark = max(event_time) - 5s
)

// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
    TIMESTAMP='order_time',
    ALLOWEDLATENESS='2s'  // Window stays open for 2s after trigger
)

// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
    TIMESTAMP='order_time',
    MAXOUTOFORDERNESS='5s',  // Tolerate 5s out-of-order before trigger
    ALLOWEDLATENESS='2s'     // Accept 2s late data after trigger
)

// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
    TIMESTAMP='order_time',
    IDLETIMEOUT='5s'  // If no data arrives within 5s, watermark advances based on processing time
)

Key concepts:

• MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
• AllowedLateness: Keeps window open after trigger to accept late data and update results
• IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
• Watermark: Indicates that no events with timestamp less than watermark are expected
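Putting these pieces together, a minimal end-to-end sketch of an event-time query (the order_time field, millisecond timestamps, and 3-second tolerance are illustrative choices, not defaults):

ssql := streamsql.New()
// Windows are assigned by the order_time field; the watermark lags
// the maximum seen event time by 3s before a window triggers.
err := ssql.Execute(`SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5s')
WITH (TIMESTAMP='order_time', TIMEUNIT='ms', MAXOUTOFORDERNESS='3s')`)
if err != nil {
    panic(err)
}
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Window result: %v\n", results)
})

// Events may arrive out of order
now := time.Now().UnixMilli()
ssql.Emit(map[string]interface{}{"order_time": now, "amount": 10.0})
ssql.Emit(map[string]interface{}{"order_time": now - 2000, "amount": 5.0}) // out-of-order but within tolerance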

Custom Functions

StreamSQL supports plugin-based custom functions with runtime dynamic registration:

// Register temperature conversion function
functions.RegisterCustomFunction(
    "fahrenheit_to_celsius",
    functions.TypeConversion,
    "Temperature conversion",
    "Fahrenheit to Celsius",
    1, 1,
    func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
        f, _ := functions.ConvertToFloat64(args[0])
        return (f - 32) * 5 / 9, nil
    },
)

// Use immediately in SQL
sql := `SELECT deviceId,
    AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream
GROUP BY deviceId, TumblingWindow('5s')`

Supported custom function types:

• TypeMath - Mathematical calculation functions
• TypeString - String processing functions
• TypeConversion - Type conversion functions
• TypeDateTime - Date and time functions
• TypeAggregation - Aggregate functions
• TypeAnalytical - Analytical functions
• TypeWindow - Window functions
• TypeCustom - General custom functions
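As a further illustration, a TypeString function is registered through the same RegisterCustomFunction call shown above; a hedged sketch (the mask_id name and its masking logic are hypothetical, and fmt/strings imports are assumed):

// Hypothetical string function: mask all but the last 4 characters of an ID
functions.RegisterCustomFunction(
    "mask_id",
    functions.TypeString,
    "String processing",
    "Mask an identifier, keeping the last 4 characters",
    1, 1,
    func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
        s := fmt.Sprintf("%v", args[0])
        if len(s) <= 4 {
            return s, nil
        }
        return strings.Repeat("*", len(s)-4) + s[len(s)-4:], nil
    },
)

// Usage: SELECT mask_id(deviceId) as masked FROM stream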

Log Configuration

StreamSQL provides flexible log configuration options:

// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))

// Output to file
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))

// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())

RuleGo Integration

StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:

• streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
• streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries

Basic integration example:

package main

import ( "github.com/rulego/rulego" "github.com/rulego/rulego/api/types" // Register StreamSQL components _ "github.com/rulego/rulego-components/external/streamsql" )

func main() { // Rule chain configuration ruleChainJson := { "ruleChain": {"id": "rule01"}, "metadata": { "nodes": [{ "id": "transform1", "type": "x/streamTransform", "configuration": { "sql": "SELECT deviceId, temperature * 1.8 + 32 as temp_f FROM stream WHERE temperature > 20" } }, { "id": "aggregator1", "type": "x/streamAggregator", "configuration": { "sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('5s')" } }], "connections": [{ "fromId": "transform1", "toId": "aggregator1", "type": "Success" }] } }

// Create rule engine
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))

// Send data
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)

}


type Option func(*Streamsql)

Option defines the configuration option type for StreamSQL

func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option

WithBufferSizes sets custom buffer sizes
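A hedged usage sketch (the sizes below are arbitrary example values, not recommendations):

ssql := streamsql.New(
    streamsql.WithBufferSizes(
        10000, // dataChannelSize: buffer for incoming Emit data
        1000,  // resultChannelSize: buffer for result delivery
        500,   // windowOutputSize: buffer for window output
    ),
)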

WithCustomPerformance uses custom performance configuration

func WithDiscardLog() Option

WithDiscardLog disables log output

func WithHighPerformance() Option

WithHighPerformance uses the high-performance configuration. Suitable for scenarios requiring maximum throughput.

WithLogLevel sets the log level

func WithLowLatency() Option

WithLowLatency uses the low-latency configuration. Suitable for real-time interactive applications, minimizing latency.

WithMonitoring enables detailed monitoring

WithOverflowStrategy sets the overflow strategy

func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option

WithWorkerConfig sets the worker pool configuration
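A hedged usage sketch (the counts below are arbitrary example values, not recommendations):

ssql := streamsql.New(
    streamsql.WithWorkerConfig(
        8,  // sinkPoolSize
        16, // sinkWorkerCount
        4,  // maxRetryRoutines
    ),
)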

func WithZeroDataLoss() Option

WithZeroDataLoss uses the zero-data-loss configuration. Suitable for critical business data, ensuring no data loss.

type Streamsql struct {
    // contains filtered or unexported fields
}

Streamsql is the main interface for the StreamSQL streaming engine. It encapsulates core functionality including SQL parsing, stream processing, and window management.

Usage example:

ssql := streamsql.New()
err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
ssql.Emit(map[string]interface{}{"temperature": 25.5})

func New(options ...Option) *Streamsql

New creates a new StreamSQL instance. Supports configuration through optional Option parameters.

Parameters:

Returns:

Examples:

// Create default instance
ssql := streamsql.New()

// Create high performance instance
ssql := streamsql.New(streamsql.WithHighPerformance())

// Create zero data loss instance
ssql := streamsql.New(streamsql.WithZeroDataLoss())

func (s *Streamsql) AddSink(sink func([]map[string]interface{}))

AddSink directly adds result processing callback functions. Convenience wrapper for Stream().AddSink() for cleaner API calls.

Parameters:

Examples:

// Directly add result processing
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Processing results: %v\n", results)
})

// Add multiple processors
ssql.AddSink(func(results []map[string]interface{}) {
    // Save to database
    saveToDatabase(results)
})
ssql.AddSink(func(results []map[string]interface{}) {
    // Send to message queue
    sendToQueue(results)
})

func (s *Streamsql) Emit(data map[string]interface{})

Emit adds data to the stream processing pipeline. Accepts type-safe map[string]interface{} format data.

Parameters:

Examples:

// Add device data
ssql.Emit(map[string]interface{}{
    "deviceId":    "sensor001",
    "temperature": 25.5,
    "humidity":    60.0,
    "timestamp":   time.Now(),
})

// Add user behavior data
ssql.Emit(map[string]interface{}{
    "userId": "user123",
    "action": "click",
    "page":   "/home",
})

func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)

EmitSync processes data synchronously, returning results immediately. Only applicable to non-aggregation queries; aggregation queries will return an error. Accepts type-safe map[string]interface{} format data.

Parameters:

Returns:

Examples:

result, err := ssql.EmitSync(map[string]interface{}{
    "deviceId":    "sensor001",
    "temperature": 25.5,
})
if err != nil {
    log.Printf("processing error: %v", err)
} else if result != nil {
    // Use processed result immediately (result is map[string]interface{} type)
    fmt.Printf("Processing result: %v\n", result)
}

func (s *Streamsql) Execute(sql string) error

Execute parses and executes SQL queries, creating the corresponding stream processing pipeline. This is the core method of StreamSQL, responsible for converting SQL into actual stream processing logic.

Supported SQL syntax:

Window functions:

Parameters:

Returns:

Examples:

// Basic aggregation query
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")

// Query with filtering conditions
err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30")

// Complex window aggregation
err := ssql.Execute(`SELECT deviceId,
    AVG(temperature) as avg_temp,
    MAX(humidity) as max_humidity
FROM stream
WHERE deviceId != 'test'
GROUP BY deviceId, SlidingWindow('1m', '30s')
HAVING avg_temp > 25
LIMIT 100`)

func (s *Streamsql) GetDetailedStats() map[string]interface{}

GetDetailedStats returns detailed performance statistics
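A minimal usage sketch (the statistic keys are engine-defined and not enumerated here):

// Inspect runtime statistics; available keys depend on the engine version
stats := ssql.GetDetailedStats()
for k, v := range stats {
    fmt.Printf("%s: %v\n", k, v)
}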

GetStats returns stream processing statistics

func (s *Streamsql) IsAggregationQuery() bool

IsAggregationQuery checks if the current query is an aggregation query
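For illustration, a sketch of routing data based on the query type, following the Emit/EmitSync semantics documented above (the sample payload is illustrative):

data := map[string]interface{}{"deviceId": "sensor001", "temperature": 25.5}
if ssql.IsAggregationQuery() {
    // Aggregation queries deliver results asynchronously via sinks
    ssql.Emit(data)
} else {
    // Non-aggregation queries can be processed synchronously
    result, err := ssql.EmitSync(data)
    if err == nil && result != nil {
        fmt.Printf("Transformed: %v\n", result)
    }
}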

func (s *Streamsql) PrintTable()

PrintTable prints results to console in table format, similar to database output. Displays column names first, then data rows.

Supported data formats:

Example:

// Print results in table format
ssql.PrintTable()

// Output format:
// +--------+----------+
// | device | max_temp |
// +--------+----------+
// | aa     | 30.0     |
// | bb     | 22.0     |
// +--------+----------+

func (s *Streamsql) Stop()

Stop stops the stream processor and releases related resources. After calling this method, the stream processor will stop receiving and processing new data.

Recommended to call this method for cleanup before application exit:

defer ssql.Stop()

Note: a StreamSQL instance cannot be restarted after stopping; create a new instance instead.

Stream returns the underlying stream processor instance. Provides access to lower-level stream processing functionality.

Returns:

Common use cases:

Examples:

// Add result processing callback
ssql.Stream().AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Processing results: %v\n", results)
})

// Get result channel
resultChan := ssql.Stream().GetResultsChan()
go func() {
    for result := range resultChan {
        _ = result // Process result here
    }
}()

func (s *Streamsql) ToChannel() <-chan []map[string]interface{}

ToChannel converts query results to channel output. Returns a read-only channel for receiving query results.

Notes:
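Example (a minimal consumption sketch; assumes Execute has already been called on ssql):

// Consume results from the channel instead of registering a sink
resultChan := ssql.ToChannel()
go func() {
    for results := range resultChan {
        fmt.Printf("Batch of %d results: %v\n", len(results), results)
    }
}()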