Python

The official Python SDK for Recall lets you execute workflows programmatically from your Python applications.

The SDK supports Python 3.8+ and provides async execution, automatic rate limiting with exponential backoff, and usage tracking.

Installation

Install the SDK using pip:

pip install recall-sdk

Quick Start

Here's a simple example to get you started:

from recall_sdk import RecallClient

# Initialize the client
client = RecallClient(
    api_key="your-api-key-here",
    base_url="{{APP_URL}}"  # optional, defaults to https://tryrecall.com
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

API Reference

RecallClient

Constructor

RecallClient(api_key: str, base_url: str = "{{APP_URL}}")

Parameters:

  • api_key (str): Your Recall API key
  • base_url (str, optional): Base URL for the Recall API

Methods

execute_workflow()

Execute a workflow with optional input data.

result = client.execute_workflow(
    "workflow-id",
    input={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input (dict, optional): Input data to pass to the workflow
  • timeout (float, optional): Timeout in seconds (default: 30.0)
  • stream (bool, optional): Enable streaming responses (default: False)
  • selected_outputs (list[str], optional): Block outputs to stream in blockName.attribute format (e.g., ["agent1.content"])
  • async_execution (bool, optional): Execute asynchronously (default: False)

Returns: WorkflowExecutionResult | AsyncExecutionResult

When async_execution=True, returns immediately with a task ID for polling. Otherwise, waits for completion.

get_workflow_status()

Get the status of a workflow (deployment status, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: WorkflowStatus

validate_workflow()

Validate that a workflow is ready for execution.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: bool

get_job_status()

Get the status of an async job execution.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed', 'cancelled'
if status["status"] == "completed":
    print("Output:", status["output"])

Parameters:

  • task_id (str): The task ID returned from async execution

Returns: Dict[str, Any]

Response fields:

  • success (bool): Whether the request was successful
  • taskId (str): The task ID
  • status (str): One of 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): Contains startedAt, completedAt, and duration
  • output (any, optional): The workflow output (when completed)
  • error (any, optional): Error details (when failed)
  • estimatedDuration (int, optional): Estimated duration in milliseconds (when processing/queued)
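The fields above can be combined into a small polling helper. This is a sketch, not part of the SDK: `wait_for_job` is a hypothetical name, and `client` is any object exposing the `get_job_status()` method described here.

```python
import time

def wait_for_job(client, task_id, poll_interval=2.0, max_wait=300.0):
    """Poll get_job_status() until the job leaves 'queued'/'processing',
    or raise TimeoutError after max_wait seconds."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        status = client.get_job_status(task_id)
        if status["status"] not in ("queued", "processing"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Job {task_id} did not finish within {max_wait}s")
```

The returned dict can then be inspected for `output` (on success) or `error` (on failure) as shown in the field list above.
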

execute_with_retry()

Execute a workflow with automatic retry on rate limit errors using exponential backoff.

result = client.execute_with_retry(
    "workflow-id",
    input={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input (dict, optional): Input data to pass to the workflow
  • timeout (float, optional): Timeout in seconds
  • stream (bool, optional): Enable streaming responses
  • selected_outputs (list, optional): Block outputs to stream
  • async_execution (bool, optional): Execute asynchronously
  • max_retries (int, optional): Maximum number of retries (default: 3)
  • initial_delay (float, optional): Initial delay in seconds (default: 1.0)
  • max_delay (float, optional): Maximum delay in seconds (default: 30.0)
  • backoff_multiplier (float, optional): Backoff multiplier (default: 2.0)

Returns: WorkflowExecutionResult | AsyncExecutionResult

The retry logic uses exponential backoff (1s → 2s → 4s → 8s...) with ±25% jitter to prevent thundering herd. If the API provides a retry-after header, it will be used instead.
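The delay schedule described above can be sketched as follows. This is an illustration of the backoff math, not the SDK's internal code; the function name and `jitter` parameter are assumptions for the example.

```python
import random

def backoff_delay(attempt, initial_delay=1.0, max_delay=30.0,
                  backoff_multiplier=2.0, jitter=0.25):
    """Delay before retry number `attempt` (0-based): exponential growth
    capped at max_delay, with +/-25% jitter applied on top."""
    base = min(initial_delay * (backoff_multiplier ** attempt), max_delay)
    return base * (1 + random.uniform(-jitter, jitter))

# Without jitter the schedule is 1s, 2s, 4s, 8s, ... capped at 30s.
```

When the API supplies a retry-after header, that value would replace the computed delay for that attempt.
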

get_rate_limit_info()

Get the current rate limit information from the last API response.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Returns: RateLimitInfo | None

get_usage_limits()

Get current usage limits and quota information for your account.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Returns: UsageLimits

Response structure:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}

set_api_key()

Update the API key.

client.set_api_key("new-api-key")

set_base_url()

Update the base URL.

client.set_base_url("https://my-custom-domain.com")

close()

Close the underlying HTTP session.

client.close()

Data Classes

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

RecallError

class RecallError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Common error codes:

  • UNAUTHORIZED: Invalid API key
  • TIMEOUT: Request timed out
  • RATE_LIMIT_EXCEEDED: Rate limit exceeded
  • USAGE_LIMIT_EXCEEDED: Usage limit exceeded
  • EXECUTION_ERROR: Workflow execution failed

Examples

Basic Workflow Execution

Set up the RecallClient with your API key.

Check if the workflow is deployed and ready for execution.

Run the workflow with your input data.

Process the execution result and handle any errors.

import os
from recall_sdk import RecallClient

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Error Handling

Handle different types of errors that may occur during workflow execution:

from recall_sdk import RecallClient, RecallError
import os

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except RecallError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Context Manager Usage

Use the client as a context manager to automatically handle resource cleanup:

from recall_sdk import RecallClient
import os

# Using context manager to automatically close the session
with RecallClient(api_key=os.getenv("RECALL_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Batch Workflow Execution

Execute multiple workflows efficiently:

from recall_sdk import RecallClient
import os

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))   

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input=input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Async Workflow Execution

Execute workflows asynchronously for long-running tasks:

import os
import time
from recall_sdk import RecallClient

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Rate Limiting and Retry

Handle rate limits automatically with exponential backoff:

import os
from recall_sdk import RecallClient, RecallError

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except RecallError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Usage Monitoring

Monitor your account usage and limits:

import os
from recall_sdk import RecallClient

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Streaming Workflow Execution

Execute workflows with real-time streaming responses:

from recall_sdk import RecallClient
import os

client = RecallClient(api_key=os.getenv("RECALL_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

The streaming response follows the Server-Sent Events (SSE) format:

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]
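A stream in this format can be consumed line by line. The sketch below shows one way to parse a single SSE line from the stream; `parse_sse_line` is a hypothetical helper, not part of the SDK.

```python
import json

def parse_sse_line(line: str):
    """Parse one SSE line. Returns the decoded JSON dict for a data event,
    the sentinel string '[DONE]' at end of stream, or None for other lines
    (comments, keep-alives, blank lines)."""
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return "[DONE]"
    return json.loads(payload)
```

Chunk events carry `blockId` and `chunk`; the final `done` event carries `success`, `output`, and `metadata`.
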

Flask Streaming Example:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            '{{APP_URL}}/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('RECALL_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Environment Configuration

Configure the client using environment variables:

import os
from recall_sdk import RecallClient

# Development configuration
client = RecallClient(
    api_key=os.getenv("RECALL_API_KEY"),
    base_url=os.getenv("RECALL_BASE_URL", "{{APP_URL}}")
)

import os
from recall_sdk import RecallClient

# Production configuration with error handling
api_key = os.getenv("RECALL_API_KEY")
if not api_key:
    raise ValueError("RECALL_API_KEY environment variable is required")

client = RecallClient(
    api_key=api_key,
    base_url=os.getenv("RECALL_BASE_URL", "{{APP_URL}}")
)

Getting Your API Key

Navigate to Recall and log in to your account.

Navigate to the workflow you want to execute programmatically.

Click on "Deploy" to deploy your workflow if it hasn't been deployed yet.

During the deployment process, select or create an API key.

Copy the API key to use in your Python application.

Requirements

  • Python 3.8+
  • requests >= 2.25.0

Common Questions

Do workflows need to be deployed before they can be executed?

Yes. Workflows must be deployed before they can be executed through the SDK. You can use the validate_workflow() method to check whether a workflow is deployed and ready. If it returns False, deploy the workflow from the Recall UI first and create or select an API key during deployment.

What is the difference between sync and async execution?

Sync execution (the default) blocks until the workflow completes and returns the full result. Async execution (async_execution=True) returns immediately with a task ID that you can poll using get_job_status(). Use async mode for long-running workflows to avoid request timeouts. Async job statuses include queued, processing, completed, failed, and cancelled.

How does the SDK handle rate limiting?

The SDK provides built-in rate limiting support through the execute_with_retry() method. It uses exponential backoff (1s, 2s, 4s, 8s...) with 25% jitter to avoid thundering herd problems. If the API returns a retry-after header, that value is used instead. You can configure max_retries, initial_delay, max_delay, and backoff_multiplier. Use get_rate_limit_info() to check your current rate limit status.

Can the client be used as a context manager?

Yes. The RecallClient supports Python's context manager protocol. Use it with the 'with' statement to automatically close the underlying HTTP session when you are done, which is especially useful for scripts that create and discard client instances.

What errors does the SDK raise, and what do the error codes mean?

The SDK raises RecallError with a code property for API-specific errors. Common error codes are UNAUTHORIZED (invalid API key), TIMEOUT (request timed out), RATE_LIMIT_EXCEEDED (too many requests), USAGE_LIMIT_EXCEEDED (billing limit reached), and EXECUTION_ERROR (workflow failed). Use the error code to implement targeted error handling and recovery logic.

How can I monitor my usage and limits?

Use the get_usage_limits() method to check your current usage. It returns sync and async rate limit details (limit, remaining, reset time, whether you are currently limited), plus your current period cost, usage limit, and plan tier. This lets you monitor consumption and alert before hitting limits.
