feat: Implement initial Langflow Assistant API with streaming support by Cristhianzl · Pull Request #11374 · langflow-ai/langflow (original) (raw)

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR introduces a new Langflow Agentic Module backend with API endpoints to execute named flows, check assistant configuration, and validate generated component code. It includes flow execution services with validation and retry logic, provider and model management, helper utilities for code extraction and error handling, and comprehensive test coverage.

Changes

Cohort / File(s) Summary
Dependency Management pyproject.toml Added runtime dependency markitdown>=0.1.4 for markdown content parsing.
API Module Documentation src/backend/base/langflow/agentic/README.md Added comprehensive backend documentation covering module architecture, API endpoints, request/response schemas, error handling, validation flows, and provider integration patterns.
API Package & Schemas src/backend/base/langflow/agentic/api/__init__.py, src/backend/base/langflow/agentic/api/schemas.py Introduced API package with request/response schemas (AssistantRequest, ValidationResult, StepType). AssistantRequest accepts flow_id, component/field targeting, retry counts, and provider/model overrides.
API Router & Endpoints src/backend/base/langflow/agentic/api/router.py Implemented four main endpoints: execute_named_flow() for direct flow execution, check_assistant_config() for provider/model inventory, assist() for synchronous validation-based flow execution, and assist_stream() for SSE-based streaming execution with progress events.
Helper Utilities src/backend/base/langflow/agentic/helpers/__init__.py, src/backend/base/langflow/agentic/helpers/code_extraction.py, src/backend/base/langflow/agentic/helpers/error_handling.py, src/backend/base/langflow/agentic/helpers/sse.py, src/backend/base/langflow/agentic/helpers/validation.py Added utilities for extracting Python code blocks from markdown, validating component code via instantiation, converting technical error messages to user-friendly text, formatting SSE events (progress, complete, error, tokens), and extracting class names from code.
Assistant Service src/backend/base/langflow/agentic/services/__init__.py, src/backend/base/langflow/agentic/services/assistant_service.py Implemented core validation and retry orchestration with execute_flow_with_validation() for synchronous execution and execute_flow_with_validation_streaming() for SSE-based streaming. Supports up to max_retries validation attempts with contextual error prompting on failure.
Flow Executor Service src/backend/base/langflow/agentic/services/flow_executor.py Introduced flow loading, execution, and streaming capabilities. Features model injection into Agent nodes, FlowExecutionResult dataclass, synchronous and streaming flow execution, token-level event streaming, and response text extraction.
Provider Service src/backend/base/langflow/agentic/services/provider_service.py Implemented provider discovery, default selection, API key resolution, and default model lookup for supported providers (Anthropic, OpenAI, Google Generative AI, Groq). Prioritizes providers based on availability and preferred order.
API Integration src/backend/base/langflow/api/__init__.py, src/backend/base/langflow/api/router.py, src/backend/base/langflow/main.py Reorganized router imports and added lazy-loading of agentic API router to avoid circular dependencies. Router now includes agentic endpoints under /agentic prefix.
Flow Execution & Streaming src/lfx/src/lfx/run/base.py Extended run_flow() to support user_id, session_id, and event_manager parameters for enabling user context isolation and component-level event streaming.
Model Metadata & Defaults src/lfx/src/lfx/base/models/unified_models.py Enhanced provider metadata with model_class, api_key_param, model_name_param fields. Added get_provider_config() function. Updated default model resolution to fetch from user variables instead of frontend flags.
Test Coverage - Schemas & Code Extraction src/backend/tests/unit/agentic/api/test_schemas.py, src/backend/tests/unit/agentic/api/test_code_extraction.py Added Pydantic schema validation tests and comprehensive code extraction tests covering multiple block formats, edge cases (CRLF, Unicode, long blocks), and validation integration.
Test Coverage - Streaming & Validation src/backend/tests/unit/agentic/api/test_streaming_validation.py Implemented extensive tests for SSE event formatting, streaming validation flows, retry logic, non-streaming execution, error handling, and text-heavy response scenarios.
Test Coverage - Helpers src/backend/tests/unit/agentic/helpers/__init__.py, src/backend/tests/unit/agentic/helpers/test_error_handling.py, src/backend/tests/unit/agentic/helpers/test_sse.py Added test suites validating error pattern matching and truncation, SSE formatting across event types, and edge cases.
Test Coverage - Services src/backend/tests/unit/agentic/services/__init__.py, src/backend/tests/unit/agentic/services/test_flow_executor.py, src/backend/tests/unit/agentic/services/test_provider_service.py Implemented tests for flow execution (model injection, error mapping, file handling), provider selection logic, API key resolution, and default model lookup.
Test Coverage - Flow Runner src/lfx/tests/unit/run/test_base.py Updated mock async_start functions to accept optional event_manager parameter for compatibility with streaming support.

Sequence Diagram(s)

sequenceDiagram participant Client participant APIRouter as API Router
(assist endpoint) participant ProviderSvc as Provider
Service participant AssistSvc as Assistant
Service participant FlowExec as Flow
Executor participant Validation as Code
Validation

Client->>APIRouter: POST /agentic/assist<br/>(flow_id, input_value)
APIRouter->>ProviderSvc: get_enabled_providers_for_user()
ProviderSvc-->>APIRouter: enabled providers list
APIRouter->>ProviderSvc: get_default_provider()
ProviderSvc-->>APIRouter: provider name
APIRouter->>ProviderSvc: check_api_key(provider)
ProviderSvc-->>APIRouter: API key
APIRouter->>AssistSvc: execute_flow_with_validation()
Note over AssistSvc: Attempt 1 (max_retries=3)
AssistSvc->>FlowExec: execute_flow_file()
FlowExec-->>AssistSvc: response text
AssistSvc->>Validation: extract_python_code()
Validation-->>AssistSvc: code block
AssistSvc->>Validation: validate_component_code()
alt Code is valid
    Validation-->>AssistSvc: ValidationResult(is_valid=True)
    AssistSvc-->>APIRouter: result with validation
else Code is invalid
    Validation-->>AssistSvc: ValidationResult(is_valid=False, error)
    Note over AssistSvc: Attempt 2+ with error context
    AssistSvc->>FlowExec: execute_flow_file()<br/>(retry prompt with error)
    FlowExec-->>AssistSvc: corrected response
    AssistSvc->>Validation: validate_component_code()
    Validation-->>AssistSvc: ValidationResult(is_valid=True)
    AssistSvc-->>APIRouter: result with validation
end
APIRouter-->>Client: 200 OK<br/>{validated, class_name, code}

Loading

sequenceDiagram participant Client participant APIRouter as API Router
(assist_stream) participant ProviderSvc as Provider
Service participant AssistSvc as Assistant
Service participant FlowExec as Flow
Executor participant SSEHelper as SSE
Formatter

Client->>APIRouter: POST /agentic/assist_stream
APIRouter->>ProviderSvc: get_enabled_providers_for_user()
ProviderSvc-->>APIRouter: enabled providers
APIRouter->>AssistSvc: execute_flow_with_validation_streaming()
AssistSvc->>SSEHelper: format_progress_event(generating)
SSEHelper-->>AssistSvc: SSE data line
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
AssistSvc->>FlowExec: execute_flow_file_streaming()
loop Token streaming
    FlowExec-->>AssistSvc: token chunks
    AssistSvc-->>APIRouter: token events
    APIRouter-->>Client: SSE token data
end
AssistSvc->>SSEHelper: format_progress_event(extracting_code)
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
AssistSvc->>SSEHelper: format_progress_event(validating)
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
alt Validation passes
    AssistSvc->>SSEHelper: format_complete_event(validated)
    SSEHelper-->>AssistSvc: complete event
    AssistSvc-->>APIRouter: complete event
    APIRouter-->>Client: SSE complete
else Validation fails (retry < max)
    AssistSvc->>SSEHelper: format_progress_event(retrying)
    AssistSvc-->>APIRouter: retry event
    APIRouter-->>Client: SSE event
    AssistSvc->>FlowExec: execute_flow_file_streaming()<br/>(error context)
    loop Retry token streaming
        FlowExec-->>AssistSvc: tokens
        AssistSvc-->>APIRouter: token events
        APIRouter-->>Client: SSE tokens
    end
    AssistSvc->>SSEHelper: format_complete_event(validated)
    AssistSvc-->>APIRouter: complete event
    APIRouter-->>Client: SSE complete
else Max retries exhausted
    AssistSvc->>SSEHelper: format_complete_event(validated=False, error)
    SSEHelper-->>AssistSvc: complete event
    AssistSvc-->>APIRouter: complete event
    APIRouter-->>Client: SSE complete with error
end

Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

🚥 Pre-merge checks | ✅ 6 | ❌ 1 ❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Test Quality And Coverage ⚠️ Warning PR lacks endpoint integration tests for four API router endpoints despite well-tested service layer; missing TestClient-based coverage for request/response validation and error handling. Create endpoint tests in src/backend/tests/unit/agentic/api/test_router.py covering all four endpoints, error scenarios, authentication contexts, and streaming validation with proper async patterns and mocked dependencies.

✅ Passed checks (6 passed)

Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the primary feature: implementing a Langflow Assistant API with streaming support. It is concise, specific, and directly reflects the main changes in the changeset.
Docstring Coverage ✅ Passed Docstring coverage is 91.52% which is sufficient. The required threshold is 80.00%.
Test Coverage For New Implementations ✅ Passed PR includes comprehensive test coverage with 9 test files (2,630+ lines) covering APIs, schemas, code extraction, error handling, and service layers, with test-to-source ratio of approximately 2.8:1.
Test File Naming And Structure ✅ Passed Pull request demonstrates proper test file naming and structure following pytest conventions with correct naming patterns and logical organization.
Excessive Mock Usage Warning ✅ Passed Test suite demonstrates appropriate mock usage with minimal mocking for unit tests of helpers and realistic mocking of external dependencies in service tests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches 🧪 Generate unit tests (beta)


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.