
StreamSQL


English | 简体中文

StreamSQL is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.

📖 Documentation | Similar to: Apache Flink

Features

Installation

go get github.com/rulego/streamsql

Usage

StreamSQL supports two main processing modes for different business scenarios:

Non-Aggregation Mode - Real-time Data Transformation and Filtering

Suitable for scenarios requiring real-time response and low latency, where each data record is processed and output immediately.

Typical Use Cases:

package main

import (
    "fmt"
    "time"

    "github.com/rulego/streamsql"
)

func main() {
// Create StreamSQL instance
ssql := streamsql.New()
defer ssql.Stop()

// Non-aggregation SQL: Real-time data transformation and filtering
// Feature: Each input data is processed immediately, no need to wait for windows
rsql := `SELECT deviceId, 
                UPPER(deviceType) as device_type,
                temperature * 1.8 + 32 as temp_fahrenheit,
                CASE WHEN temperature > 30 THEN 'hot'
                     WHEN temperature < 15 THEN 'cold'
                     ELSE 'normal' END as temp_category,
                CONCAT(location, '-', deviceId) as full_identifier,
                NOW() as processed_time
         FROM stream 
         WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`

err := ssql.Execute(rsql)
if err != nil {
    panic(err)
}

// Handle real-time transformation results
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Real-time result: %+v\n", results)
})

// Simulate sensor data input
sensorData := []map[string]interface{}{
    {
        "deviceId":     "sensor001",
        "deviceType":   "temperature", 
        "temperature":  25.0,
        "location":     "warehouse-A",
    },
    {
        "deviceId":     "sensor002",
        "deviceType":   "humidity",
        "temperature":  32.5,
        "location":     "warehouse-B", 
    },
    {
        "deviceId":     "pump001",  // Will be filtered out
        "deviceType":   "actuator",
        "temperature":  20.0,
        "location":     "factory",
    },
}

// Process data one by one, each will output results immediately
for _, data := range sensorData {
    ssql.Emit(data)
    // changedData, err := ssql.EmitSync(data) // obtain the processing result synchronously
    time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
}

time.Sleep(500 * time.Millisecond) // Wait for processing completion

}

Aggregation Mode - Windowed Statistical Analysis

Suitable for scenarios requiring statistical analysis and batch processing, collecting data over a period of time for aggregated computation.

Typical Use Cases:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"

    "github.com/rulego/streamsql"
)

// StreamSQL Usage Example
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()

// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
// - FROM stream: Specify the data source as stream data
// - WHERE: Filter condition, excluding device3 data
// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
// - avg(), min(): Aggregation functions for calculating average and minimum values
// - window_start(), window_end(): Window functions to get window start and end times
rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity," +
    "window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"

// Step 3: Execute SQL Statement and Start Stream Analysis Task
// The Execute method parses SQL, builds execution plan, initializes window manager and aggregators
err := ssql.Execute(rsql)
if err != nil {
    panic(err)
}

// Step 4: Setup Test Environment and Concurrency Control
var wg sync.WaitGroup
wg.Add(1)
// Set 30-second test timeout to prevent infinite running
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Step 5: Start Data Producer Goroutine
// Simulate real-time data stream, continuously feeding data into StreamSQL
go func() {
    defer wg.Done()
    // Create ticker to trigger data generation every second
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // Generate 10 random test data points per second, simulating high-frequency data stream
            // This data density tests StreamSQL's real-time processing capability
            for i := 0; i < 10; i++ {
                // Construct device data containing deviceId, temperature, and humidity
                randomData := map[string]interface{}{
                    "deviceId":    fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
                    "temperature": 20.0 + rand.Float64()*10,                // Temperature range: 20-30 degrees
                    "humidity":    50.0 + rand.Float64()*20,                // Humidity range: 50-70%
                }
                // Add data to stream, triggering StreamSQL's real-time processing
                // Emit distributes data to corresponding windows and aggregators
                ssql.Emit(randomData)
            }

        case <-ctx.Done():
            // Timeout or cancellation signal, stop data generation
            return
        }
    }
}()

// Step 6: Setup Result Processing Pipeline
resultChan := make(chan interface{})
// Add computation result callback function (Sink)
// When window triggers computation, results are output through this callback
ssql.AddSink(func(results []map[string]interface{}) {
    resultChan <- results
})

// Step 7: Start Result Consumer Goroutine
// Count received results for effect verification
resultCount := 0
go func() {
    for result := range resultChan {
        // Print results when window computation is triggered (every 5 seconds)
        // This demonstrates StreamSQL's window-based aggregation results
        fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
        resultCount++
    }
}()

// Step 8: Wait for Processing Completion
// Wait for data producer goroutine to finish (30-second timeout or manual cancellation)
wg.Wait()

// Step 9: Display Final Statistics
// Show total number of window results received during the test period
fmt.Printf("\nTotal window results received: %d\n", resultCount)
fmt.Println("StreamSQL processing completed successfully!")

}

Nested Field Access

StreamSQL supports querying nested structured data, using dot notation (.) to access nested fields:

// Nested field access example
package main

import (
    "fmt"
    "time"

    "github.com/rulego/streamsql"
)

func main() {
ssql := streamsql.New()
defer ssql.Stop()

// SQL query using nested fields - supports dot notation syntax for accessing nested structures
rsql := `SELECT device.info.name as device_name, 
                device.location,
                AVG(sensor.temperature) as avg_temp,
                COUNT(*) as sensor_count,
                window_start() as start,
                window_end() as end
         FROM stream 
         WHERE device.info.type = 'temperature'
         GROUP BY device.location, TumblingWindow('5s')`

err := ssql.Execute(rsql)
if err != nil {
    panic(err)
}

// Handle aggregation results
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Aggregation result: %+v\n", results)
})

// Add nested structured data
nestedData := map[string]interface{}{
    "device": map[string]interface{}{
        "info": map[string]interface{}{
            "name": "temperature-sensor-001",
            "type": "temperature",
        },
        "location": "smart-greenhouse-A",
    },
    "sensor": map[string]interface{}{
        "temperature": 25.5,
        "humidity":    60.2,
    },
    "timestamp": time.Now().Unix(),
}

ssql.Emit(nestedData)

}

Functions

StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, window, and more. See the Documentation for the complete list.
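
As an illustrative sketch (not an exhaustive list), a single query can combine several of these categories; every function below already appears in the examples above, with UPPER as a string function, AVG and COUNT as aggregates, and window_start()/window_end() as window functions:

SELECT UPPER(deviceId) as device_id,
       AVG(temperature) as avg_temp,
       COUNT(*) as readings,
       window_start() as start,
       window_end() as end
FROM stream
WHERE temperature > 0
GROUP BY deviceId, TumblingWindow('5s')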

Concepts

Processing Modes

StreamSQL supports two main processing modes:

Aggregation Mode (Windowed Processing)

Used when the SQL query contains aggregate functions (SUM, AVG, COUNT, etc.) or GROUP BY clauses. Data is collected in windows and aggregated results are output when windows are triggered.
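
A minimal aggregation-mode sketch (field names are illustrative, mirroring the examples above):

SELECT deviceId,
       AVG(temperature) as avg_temp,
       COUNT(*) as sample_count
FROM stream
GROUP BY deviceId, TumblingWindow('5s')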

Non-Aggregation Mode (Real-time Processing)

Used for immediate data transformation and filtering without aggregation operations. Each input record is processed and output immediately, providing ultra-low latency for real-time scenarios like data cleaning, enrichment, and filtering.
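
A corresponding non-aggregation sketch contains no aggregate functions and no GROUP BY, so each record is transformed and emitted as soon as it arrives (field names are illustrative):

SELECT deviceId,
       UPPER(deviceType) as device_type,
       temperature * 1.8 + 32 as temp_fahrenheit
FROM stream
WHERE temperature > 0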

Windows

Since stream data is unbounded, it cannot be processed as a whole. Windows provide a mechanism to divide unbounded data into a series of bounded data segments for computation. StreamSQL includes several types of windows (see the documentation for the full list); the examples in this README use TumblingWindow, as sketched below.
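
The window is declared in the GROUP BY clause, and window_start()/window_end() expose the bounds of each emitted window (a sketch reusing the 5-second tumbling window from the examples above):

SELECT deviceId,
       MIN(temperature) as min_temp,
       window_start() as start,
       window_end() as end
FROM stream
GROUP BY deviceId, TumblingWindow('5s')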

Stream

Time Semantics

StreamSQL supports two time concepts that determine how windows are divided and triggered:

Event Time

Processing Time

Event Time vs Processing Time Comparison

| Feature | Event Time | Processing Time |
| --- | --- | --- |
| Time Source | Timestamp field in data | System current time |
| Window Division | Based on event timestamp | Based on data arrival time |
| Late Data Handling | Supported (Watermark mechanism) | Not supported |
| Out-of-Order Handling | Supported (Watermark mechanism) | Not supported |
| Result Accuracy | Accurate | May be inaccurate |
| Processing Latency | Higher (needs to wait for late data) | Low (real-time trigger) |
| Configuration | WITH (TIMESTAMP='field') | Default (no WITH clause) |
| Use Cases | Precise temporal analysis, historical replay | Real-time monitoring, low-latency requirements |
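
As a hedged sketch of the Configuration row above (the timestamp field name ts and the exact placement of the WITH clause are assumptions; consult the documentation for the precise syntax), an event-time query attaches the WITH clause to the windowed query:

SELECT deviceId,
       AVG(temperature) as avg_temp,
       window_start() as start,
       window_end() as end
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts')

Without the WITH clause, the same query is windowed by processing time, i.e. based on data arrival time.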

Window Time

Watermark Mechanism (Event Time Windows Only)

Contribution Guidelines

Pull requests and issues are welcome. Please ensure that your code conforms to Go coding standards and includes relevant test cases.

License

Apache License 2.0