feat: Implement initial Langflow Assistant API with streaming support by Cristhianzl · Pull Request #11374 · langflow-ai/langflow (original) (raw)
Important
Review skipped
Auto incremental reviews are disabled on this repository.
Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.
You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.
- 🔍 Trigger a full review
Walkthrough
This PR introduces a new Langflow Agentic Module backend with API endpoints to execute named flows, check assistant configuration, and validate generated component code. It includes flow execution services with validation and retry logic, provider and model management, helper utilities for code extraction and error handling, and comprehensive test coverage.
Changes
| Cohort / File(s) | Summary |
|---|---|
| Dependency Management pyproject.toml | Added runtime dependency markitdown>=0.1.4 for markdown content parsing. |
| API Module Documentation src/backend/base/langflow/agentic/README.md | Added comprehensive backend documentation covering module architecture, API endpoints, request/response schemas, error handling, validation flows, and provider integration patterns. |
| API Package & Schemas src/backend/base/langflow/agentic/api/__init__.py, src/backend/base/langflow/agentic/api/schemas.py | Introduced API package with request/response schemas (AssistantRequest, ValidationResult, StepType). AssistantRequest accepts flow_id, component/field targeting, retry counts, and provider/model overrides. |
| API Router & Endpoints src/backend/base/langflow/agentic/api/router.py | Implemented four main endpoints: execute_named_flow() for direct flow execution, check_assistant_config() for provider/model inventory, assist() for synchronous validation-based flow execution, and assist_stream() for SSE-based streaming execution with progress events. |
| Helper Utilities src/backend/base/langflow/agentic/helpers/__init__.py, src/backend/base/langflow/agentic/helpers/code_extraction.py, src/backend/base/langflow/agentic/helpers/error_handling.py, src/backend/base/langflow/agentic/helpers/sse.py, src/backend/base/langflow/agentic/helpers/validation.py | Added utilities for extracting Python code blocks from markdown, validating component code via instantiation, converting technical error messages to user-friendly text, formatting SSE events (progress, complete, error, tokens), and extracting class names from code. |
| Assistant Service src/backend/base/langflow/agentic/services/__init__.py, src/backend/base/langflow/agentic/services/assistant_service.py | Implemented core validation and retry orchestration with execute_flow_with_validation() for synchronous execution and execute_flow_with_validation_streaming() for SSE-based streaming. Supports up to max_retries validation attempts with contextual error prompting on failure. |
| Flow Executor Service src/backend/base/langflow/agentic/services/flow_executor.py | Introduced flow loading, execution, and streaming capabilities. Features model injection into Agent nodes, FlowExecutionResult dataclass, synchronous and streaming flow execution, token-level event streaming, and response text extraction. |
| Provider Service src/backend/base/langflow/agentic/services/provider_service.py | Implemented provider discovery, default selection, API key resolution, and default model lookup for supported providers (Anthropic, OpenAI, Google Generative AI, Groq). Prioritizes providers based on availability and preferred order. |
| API Integration src/backend/base/langflow/api/__init__.py, src/backend/base/langflow/api/router.py, src/backend/base/langflow/main.py | Reorganized router imports and added lazy-loading of agentic API router to avoid circular dependencies. Router now includes agentic endpoints under /agentic prefix. |
| Flow Execution & Streaming src/lfx/src/lfx/run/base.py | Extended run_flow() to support user_id, session_id, and event_manager parameters for enabling user context isolation and component-level event streaming. |
| Model Metadata & Defaults src/lfx/src/lfx/base/models/unified_models.py | Enhanced provider metadata with model_class, api_key_param, model_name_param fields. Added get_provider_config() function. Updated default model resolution to fetch from user variables instead of frontend flags. |
| Test Coverage - Schemas & Code Extraction src/backend/tests/unit/agentic/api/test_schemas.py, src/backend/tests/unit/agentic/api/test_code_extraction.py | Added Pydantic schema validation tests and comprehensive code extraction tests covering multiple block formats, edge cases (CRLF, Unicode, long blocks), and validation integration. |
| Test Coverage - Streaming & Validation src/backend/tests/unit/agentic/api/test_streaming_validation.py | Implemented extensive tests for SSE event formatting, streaming validation flows, retry logic, non-streaming execution, error handling, and text-heavy response scenarios. |
| Test Coverage - Helpers src/backend/tests/unit/agentic/helpers/__init__.py, src/backend/tests/unit/agentic/helpers/test_error_handling.py, src/backend/tests/unit/agentic/helpers/test_sse.py | Added test suites validating error pattern matching and truncation, SSE formatting across event types, and edge cases. |
| Test Coverage - Services src/backend/tests/unit/agentic/services/__init__.py, src/backend/tests/unit/agentic/services/test_flow_executor.py, src/backend/tests/unit/agentic/services/test_provider_service.py | Implemented tests for flow execution (model injection, error mapping, file handling), provider selection logic, API key resolution, and default model lookup. |
| Test Coverage - Flow Runner src/lfx/tests/unit/run/test_base.py | Updated mock async_start functions to accept optional event_manager parameter for compatibility with streaming support. |
Sequence Diagram(s)
sequenceDiagram
participant Client
participant APIRouter as API Router
(assist endpoint)
participant ProviderSvc as Provider
Service
participant AssistSvc as Assistant
Service
participant FlowExec as Flow
Executor
participant Validation as Code
Validation
Client->>APIRouter: POST /agentic/assist<br/>(flow_id, input_value)
APIRouter->>ProviderSvc: get_enabled_providers_for_user()
ProviderSvc-->>APIRouter: enabled providers list
APIRouter->>ProviderSvc: get_default_provider()
ProviderSvc-->>APIRouter: provider name
APIRouter->>ProviderSvc: check_api_key(provider)
ProviderSvc-->>APIRouter: API key
APIRouter->>AssistSvc: execute_flow_with_validation()
Note over AssistSvc: Attempt 1 (max_retries=3)
AssistSvc->>FlowExec: execute_flow_file()
FlowExec-->>AssistSvc: response text
AssistSvc->>Validation: extract_python_code()
Validation-->>AssistSvc: code block
AssistSvc->>Validation: validate_component_code()
alt Code is valid
Validation-->>AssistSvc: ValidationResult(is_valid=True)
AssistSvc-->>APIRouter: result with validation
else Code is invalid
Validation-->>AssistSvc: ValidationResult(is_valid=False, error)
Note over AssistSvc: Attempt 2+ with error context
AssistSvc->>FlowExec: execute_flow_file()<br/>(retry prompt with error)
FlowExec-->>AssistSvc: corrected response
AssistSvc->>Validation: validate_component_code()
Validation-->>AssistSvc: ValidationResult(is_valid=True)
AssistSvc-->>APIRouter: result with validation
end
APIRouter-->>Client: 200 OK<br/>{validated, class_name, code}Loading
sequenceDiagram
participant Client
participant APIRouter as API Router
(assist_stream)
participant ProviderSvc as Provider
Service
participant AssistSvc as Assistant
Service
participant FlowExec as Flow
Executor
participant SSEHelper as SSE
Formatter
Client->>APIRouter: POST /agentic/assist_stream
APIRouter->>ProviderSvc: get_enabled_providers_for_user()
ProviderSvc-->>APIRouter: enabled providers
APIRouter->>AssistSvc: execute_flow_with_validation_streaming()
AssistSvc->>SSEHelper: format_progress_event(generating)
SSEHelper-->>AssistSvc: SSE data line
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
AssistSvc->>FlowExec: execute_flow_file_streaming()
loop Token streaming
FlowExec-->>AssistSvc: token chunks
AssistSvc-->>APIRouter: token events
APIRouter-->>Client: SSE token data
end
AssistSvc->>SSEHelper: format_progress_event(extracting_code)
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
AssistSvc->>SSEHelper: format_progress_event(validating)
AssistSvc-->>APIRouter: progress event
APIRouter-->>Client: SSE event
alt Validation passes
AssistSvc->>SSEHelper: format_complete_event(validated)
SSEHelper-->>AssistSvc: complete event
AssistSvc-->>APIRouter: complete event
APIRouter-->>Client: SSE complete
else Validation fails (retry < max)
AssistSvc->>SSEHelper: format_progress_event(retrying)
AssistSvc-->>APIRouter: retry event
APIRouter-->>Client: SSE event
AssistSvc->>FlowExec: execute_flow_file_streaming()<br/>(error context)
loop Retry token streaming
FlowExec-->>AssistSvc: tokens
AssistSvc-->>APIRouter: token events
APIRouter-->>Client: SSE tokens
end
AssistSvc->>SSEHelper: format_complete_event(validated)
AssistSvc-->>APIRouter: complete event
APIRouter-->>Client: SSE complete
else Max retries exhausted
AssistSvc->>SSEHelper: format_complete_event(validated=False, error)
SSEHelper-->>AssistSvc: complete event
AssistSvc-->>APIRouter: complete event
APIRouter-->>Client: SSE complete with error
endLoading
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~55 minutes
🚥 Pre-merge checks | ✅ 6 | ❌ 1 ❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Test Quality And Coverage | ⚠️ Warning | PR lacks endpoint integration tests for four API router endpoints despite well-tested service layer; missing TestClient-based coverage for request/response validation and error handling. | Create endpoint tests in src/backend/tests/unit/agentic/api/test_router.py covering all four endpoints, error scenarios, authentication contexts, and streaming validation with proper async patterns and mocked dependencies. |
✅ Passed checks (6 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly describes the primary feature: implementing a Langflow Assistant API with streaming support. It is concise, specific, and directly reflects the main changes in the changeset. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 91.52% which is sufficient. The required threshold is 80.00%. |
| Test Coverage For New Implementations | ✅ Passed | PR includes comprehensive test coverage with 9 test files (2,630+ lines) covering APIs, schemas, code extraction, error handling, and service layers, with test-to-source ratio of approximately 2.8:1. |
| Test File Naming And Structure | ✅ Passed | Pull request demonstrates proper test file naming and structure following pytest conventions with correct naming patterns and logical organization. |
| Excessive Mock Usage Warning | ✅ Passed | Test suite demonstrates appropriate mock usage with minimal mocking for unit tests of helpers and realistic mocking of external dependencies in service tests. |
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches 🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
cz/agentic-api
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
❤️ Share
Comment @coderabbitai help to get the list of available commands and usage tips.