Overview
The Tessa SDK provides a comprehensive exception hierarchy for robust error handling. All SDK exceptions inherit from TessaError, making it easy to catch any SDK-related error.
Copy
from tessa_sdk import BrowserAgent
from tessa_sdk.exceptions import (
TessaError, # Base exception
AuthenticationError, # Invalid API key
RateLimitError, # Rate limit exceeded
JobNotFoundError, # Job doesn't exist
JobFailedError, # Job execution failed
ValidationError, # Invalid parameters
TimeoutError, # Operation timed out
ConfigurationError # Configuration issues
)
Exception Hierarchy
Copy
TessaError (Base)
├── AuthenticationError
├── RateLimitError
├── JobNotFoundError
├── JobFailedError
├── ValidationError
├── TimeoutError
└── ConfigurationError
Exception Types
TessaError
Base exception for all SDK errors.
Copy
class TessaError(Exception):
message: str # Error message
details: Dict[str, Any] # Additional error details
Copy
try:
result = agent.run("Task")
except TessaError as e:
print(f"SDK Error: {e.message}")
print(f"Details: {e.details}")
AuthenticationError
Raised when API authentication fails.
Copy
class AuthenticationError(TessaError):
# Raised for invalid or missing API keys
- Invalid API key
- Missing API key
- Expired API key
- Incorrect environment variable
Copy
from tessa_sdk.exceptions import AuthenticationError
try:
agent = BrowserAgent("invalid_key")
result = agent.run("Task")
except AuthenticationError as e:
print("Authentication failed!")
print("Get your API key at: https://app.heytessa.ai/settings")
print(f"Error: {e}")
RateLimitError
Raised when API rate limits are exceeded.
Copy
class RateLimitError(TessaError):
retry_after: Optional[int] # Seconds to wait before retry
retry_after: Number of seconds to wait before retrying
Copy
from tessa_sdk.exceptions import RateLimitError
import time
def run_with_rate_limit_handling(agent, task):
    """Run ``task`` on ``agent``, waiting and retrying when rate limited.

    Up to three attempts are made. After a ``RateLimitError`` with a truthy
    ``retry_after`` the function sleeps for that many seconds and tries
    again. The error is re-raised on the final attempt, or as soon as
    ``retry_after`` is missing or zero.
    """
    total_attempts = 3
    final_attempt = total_attempts - 1
    attempt = 0
    while attempt < total_attempts:
        try:
            return agent.run(task)
        except RateLimitError as exc:
            # Out of attempts, or the server gave no wait hint: give up.
            if attempt >= final_attempt or not exc.retry_after:
                raise
            print(f"Rate limited. Waiting {exc.retry_after}s...")
            time.sleep(exc.retry_after)
        attempt += 1
JobNotFoundError
Raised when a job ID doesn’t exist.
Copy
class JobNotFoundError(TessaError):
job_id: str # The job ID that wasn't found
Copy
from tessa_sdk.exceptions import JobNotFoundError
try:
status = client.get_job_status("non_existent_job")
except JobNotFoundError as e:
print(f"Job not found: {e.job_id}")
print("Please check the job ID and try again")
JobFailedError
Raised when a job fails during execution.
Copy
class JobFailedError(TessaError):
job_id: str # Failed job ID
error_message: str # Failure reason
- Browser crash
- Network issues
- Invalid directive
- Timeout during execution
Copy
from tessa_sdk.exceptions import JobFailedError
try:
job = client.run_browser_agent("Complex task")
result = job.wait_for_completion()
except JobFailedError as e:
print(f"Job {e.job_id} failed")
print(f"Reason: {e.error_message}")
# Optionally retry with modified parameters
if "timeout" in e.error_message.lower():
print("Retrying with longer timeout...")
# Retry logic here
ValidationError
Raised when request parameters are invalid.
Copy
class ValidationError(TessaError):
errors: List[str] # List of validation errors
- Invalid browser dimensions
- Invalid timeout values
- Missing required parameters
- Invalid model selection
Copy
from tessa_sdk.exceptions import ValidationError
from tessa_sdk import BrowserConfig
try:
# Invalid width (too large)
config = BrowserConfig(width=5000)
agent = BrowserAgent("API_KEY", viewport_width=5000)
except ValidationError as e:
print("Validation failed:")
for error in e.errors:
print(f" - {error}")
TimeoutError
Raised when an operation exceeds the specified timeout.
Copy
class TimeoutError(TessaError):
job_id: str # Job that timed out
timeout_seconds: float # Timeout duration
Copy
from tessa_sdk.exceptions import TimeoutError
try:
# Set a 30-second timeout
result = agent.run("Long running task", timeout=30)
except TimeoutError as e:
print(f"Job {e.job_id} timed out after {e.timeout_seconds}s")
print("Consider:")
print(" 1. Increasing the timeout")
print(" 2. Simplifying the task")
print(" 3. Breaking it into smaller steps")
ConfigurationError
Raised for SDK configuration issues.
Copy
class ConfigurationError(TessaError):
# Configuration-related errors
- Missing environment variables
- Invalid base URL
- Network configuration issues
Copy
from tessa_sdk.exceptions import ConfigurationError
try:
client = TessaClient(base_url="invalid://url")
except ConfigurationError as e:
print(f"Configuration error: {e}")
print("Check your SDK configuration")
Error Handling Patterns
Basic Error Handling
Copy
from tessa_sdk import BrowserAgent
from tessa_sdk.exceptions import *
agent = BrowserAgent("YOUR_API_KEY")
try:
result = agent.run("Extract data from example.com")
print(f"Success: {result.output}")
except AuthenticationError:
print("❌ Invalid API key")
except RateLimitError as e:
print(f"⏳ Rate limited. Retry after {e.retry_after}s")
except TimeoutError as e:
print(f"⏱️ Timeout after {e.timeout_seconds}s")
except JobFailedError as e:
print(f"❌ Job failed: {e.error_message}")
except ValidationError as e:
print(f"❌ Invalid parameters: {e.errors}")
except TessaError as e:
print(f"❌ SDK Error: {e}")
except Exception as e:
print(f"❌ Unexpected error: {e}")
Retry with Exponential Backoff
Copy
import time
from typing import Optional
from tessa_sdk import BrowserAgent, JobResult
from tessa_sdk.exceptions import RateLimitError, TimeoutError
def run_with_backoff(
    agent: BrowserAgent,
    directive: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> Optional[JobResult]:
    """Run ``directive`` on ``agent``, retrying failed attempts with a delay.

    Args:
        agent: Agent whose ``run(directive, timeout=60)`` is invoked.
        directive: Task text handed to the agent.
        max_retries: Total number of attempts.
        base_delay: Base wait in seconds; rate-limit and timeout waits
            double it on each successive attempt.

    Returns:
        The value returned by ``agent.run``, or ``None`` when the loop
        finishes without an attempt returning or raising.

    Raises:
        Whatever the last attempt raised.
    """
    last_index = max_retries - 1
    for attempt in range(max_retries):
        out_of_attempts = attempt == last_index
        try:
            return agent.run(directive, timeout=60)
        except RateLimitError as rate_err:
            if out_of_attempts:
                raise
            # Prefer the server-provided wait; otherwise back off exponentially.
            delay = rate_err.retry_after or (base_delay * (2 ** attempt))
            print(f"Rate limited. Waiting {delay}s...")
            time.sleep(delay)
        except TimeoutError:
            if out_of_attempts:
                raise
            print(f"Timeout on attempt {attempt + 1}. Retrying...")
            time.sleep(base_delay * (2 ** attempt))
        except Exception as err:
            print(f"Attempt {attempt + 1} failed: {err}")
            if out_of_attempts:
                raise
            # Unclassified errors wait a flat base_delay (no doubling).
            time.sleep(base_delay)
    return None
# Usage
agent = BrowserAgent("YOUR_API_KEY")
result = run_with_backoff(agent, "Complex extraction task")
Async Error Handling
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
from tessa_sdk.exceptions import *
async def safe_extract(client, url, max_retries=3):
    """Extract with comprehensive error handling.

    Every failure is reported as a result dict rather than raised, so the
    coroutine is safe to fan out with ``asyncio.gather``.

    Args:
        client: Object whose awaitable ``run_browser_agent(directive,
            initial_url=...)`` returns a job exposing an awaitable
            ``wait_for_completion()``.
        url: Page to extract text from.
        max_retries: Maximum number of retries after a rate limit. The
            previous implementation recursed without any bound.

    Returns:
        ``{"success": True, "data": <result.output>}`` on success, otherwise
        ``{"success": False, "error": <message>}``.
    """
    rate_limit_hits = 0
    while True:
        try:
            job = await client.run_browser_agent(
                f"Extract all text from {url}",
                initial_url=url
            )
            result = await asyncio.wait_for(
                job.wait_for_completion(),
                timeout=60.0
            )
            return {"success": True, "data": result.output}
        except asyncio.TimeoutError:
            return {"success": False, "error": "Async timeout"}
        except AuthenticationError:
            return {"success": False, "error": "Invalid API key"}
        except RateLimitError as e:
            rate_limit_hits += 1
            if rate_limit_hits > max_retries:
                # Give up instead of retrying forever while still limited.
                return {"success": False, "error": "Rate limit exceeded"}
            await asyncio.sleep(e.retry_after or 5)
            # Loop around for the next attempt.
        except JobFailedError as e:
            return {"success": False, "error": e.error_message}
        except Exception as e:
            return {"success": False, "error": str(e)}
# Usage
async def main():
    """Run ``safe_extract`` for every URL concurrently and print each outcome."""
    # NOTE(review): `urls` is not defined in this snippet; it has to exist
    # at module scope before main() is run.
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        pending = [safe_extract(client, url) for url in urls]
        results = await asyncio.gather(*pending)
        for url, outcome in zip(urls, results):
            if not outcome["success"]:
                print(f"❌ {url}: {outcome['error']}")
                continue
            print(f"✅ {url}: {outcome['data']}")
asyncio.run(main())
Context-Aware Error Handling
Copy
class SmartBrowserAgent:
    """Wrapper with intelligent error handling.

    Attributes:
        agent: The wrapped ``BrowserAgent``.
        retry_count: Timeout retries used so far, keyed by
            ``hash(directive)``.
    """

    # Upper bound on rate-limit retries within one run_with_context call;
    # the earlier version retried by recursion with no limit.
    MAX_RATE_LIMIT_RETRIES = 3

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key)
        self.retry_count = {}

    def run_with_context(self, directive: str, context: str = None):
        """Run with context-aware error handling.

        Args:
            directive: Task text passed to ``self.agent.run``.
            context: Optional extra text; when it contains ``"http"`` it is
                appended to the directive after a navigation failure.

        Returns:
            Whatever ``self.agent.run`` returns for the directive that
            finally succeeds.

        Raises:
            TimeoutError: After two timeout retries for this directive.
            JobFailedError: When the failure cannot be retried with context.
            RateLimitError: After ``MAX_RATE_LIMIT_RETRIES`` waits.
        """
        # Track retries per directive
        retry_key = hash(directive)
        self.retry_count.setdefault(retry_key, 0)
        rate_limit_hits = 0
        while True:
            try:
                return self.agent.run(directive)
            except TimeoutError:
                if self.retry_count[retry_key] < 2:
                    # Retry with simpler directive
                    self.retry_count[retry_key] += 1
                    simplified = self._simplify_directive(directive)
                    print(f"Timeout. Retrying with: {simplified}")
                    return self.run_with_context(simplified, context)
                raise
            except JobFailedError as e:
                if "navigation" in e.error_message.lower():
                    # Add explicit URL if navigation failed
                    if context and "http" in context:
                        modified = f"{directive} at {context}"
                        print(f"Navigation failed. Retrying with URL: {modified}")
                        # Context is dropped so this path cannot repeat.
                        return self.run_with_context(modified)
                raise
            except RateLimitError as e:
                rate_limit_hits += 1
                if rate_limit_hits > self.MAX_RATE_LIMIT_RETRIES:
                    raise
                # Wait as instructed (default 5s), then loop to retry.
                time.sleep(e.retry_after or 5)

    def _simplify_directive(self, directive: str) -> str:
        """Simplify a complex directive.

        Keeps only the text before the first standalone " and ". Matching
        the space-delimited word avoids cutting inside words such as
        "brand" or "handle", which a plain substring split on "and" did.
        """
        head, separator, _ = directive.partition(" and ")
        head = head.strip()
        if separator and head:
            return head
        return directive
# Usage
smart_agent = SmartBrowserAgent("YOUR_API_KEY")
result = smart_agent.run_with_context(
"Go to example.com and extract all product names, prices, and reviews",
context="https://example.com/products"
)
Error Recovery Strategies
Graceful Degradation
Copy
def extract_with_fallback(agent, url):
    """Try progressively simpler extraction directives against ``url``.

    Each directive is run with a 30-second timeout. The first result whose
    ``is_successful`` is truthy is returned; a ``TimeoutError`` or
    ``JobFailedError`` moves on to the next directive.

    Returns:
        The first successful result, or ``None`` if every strategy failed.
    """
    fallback_plan = (
        "Extract all structured data including names, prices, descriptions, and metadata",
        "Extract product names and prices",
        "Extract all visible text",
    )
    for plan in fallback_plan:
        task = f"Go to {url} and {plan}"
        try:
            outcome = agent.run(task, timeout=30)
            if outcome.is_successful:
                return outcome
        except (TimeoutError, JobFailedError):
            print(f"Strategy failed: {plan[:30]}...")
    # All strategies failed
    return None
Circuit Breaker Pattern
Copy
from datetime import datetime, timedelta
class CircuitBreaker:
    """Circuit breaker for API calls.

    States:
        closed: Calls pass through and consecutive failures are counted.
        open: Calls are rejected until ``recovery_timeout`` seconds have
            elapsed since the last failure.
        half-open: One trial call is let through; success closes the
            circuit, failure re-opens it immediately.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: "Circuit breaker is open" while the circuit is open
                and the recovery timeout has not elapsed; otherwise any
                exception raised by ``func`` is propagated unchanged.
        """
        # Check if circuit should be reset
        if self.state == "open":
            # total_seconds() measures the whole interval; .seconds is only
            # the sub-day component and wraps around every 24 hours.
            elapsed = (datetime.now() - self.last_failure_time).total_seconds()
            if elapsed > self.recovery_timeout:
                self.state = "half-open"
                self.failure_count = 0
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # A failed trial call re-opens at once; otherwise wait for the
            # failure threshold.
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
                print(f"Circuit breaker opened after {self.failure_count} failures")
            raise
        # Reset on success
        if self.state == "half-open":
            self.state = "closed"
        self.failure_count = 0
        return result
# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
agent = BrowserAgent("YOUR_API_KEY")
try:
result = breaker.call(agent.run, "Extract data")
except Exception as e:
print(f"Circuit breaker prevented call: {e}")
Logging and Monitoring
Copy
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("tessa")
def monitored_extraction(agent, directive):
    """Extract with comprehensive logging.

    Runs ``agent.run(directive, timeout=60)`` and logs the start, the
    outcome, and the total elapsed time. Every exception is logged and then
    re-raised unchanged.

    Args:
        agent: Object exposing ``run(directive, timeout=...)``.
        directive: Task text; only its first 50 characters are logged.

    Returns:
        The result returned by ``agent.run``.
    """
    start_time = datetime.now()

    def _elapsed():
        # Seconds since this call started.
        return (datetime.now() - start_time).total_seconds()

    try:
        logger.info(f"Starting extraction: {directive[:50]}...")
        # Run the task
        result = agent.run(directive, timeout=60)
        duration = _elapsed()
        logger.info(f"Success! Duration: {duration:.2f}s, Credits: {result.credits_used}")
        return result
    except AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
        raise
    except RateLimitError as e:
        logger.warning(f"Rate limited. Retry after {e.retry_after}s")
        raise
    except TimeoutError as e:
        duration = _elapsed()
        logger.error(f"Timeout after {duration:.2f}s for job {e.job_id}")
        raise
    except JobFailedError as e:
        logger.error(f"Job {e.job_id} failed: {e.error_message}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise
    finally:
        # Log final status (runs on success and on every failure path)
        duration = _elapsed()
        logger.info(f"Total execution time: {duration:.2f}s")
Best Practices
Always Handle Authentication Errors
Always Handle Authentication Errors
Copy
import sys  # needed for sys.exit below; not imported by any earlier example

try:
    agent = BrowserAgent()  # Uses env var
except AuthenticationError:
    # Without a valid key nothing else can work, so stop with a non-zero status.
    print("Please set TESSA_API_KEY environment variable")
    sys.exit(1)
Implement Retry Logic for Transient Errors
Implement Retry Logic for Transient Errors
Copy
# Retry on rate limits and timeouts
transient_errors = (RateLimitError, TimeoutError)
for attempt in range(3):
try:
result = agent.run(task)
break
except transient_errors as e:
if attempt < 2:
time.sleep(5 * (attempt + 1))
else:
raise
Log Errors for Debugging
Log Errors for Debugging
Copy
import json
try:
result = agent.run(task)
except TessaError as e:
# Log full error details
error_log = {
"timestamp": datetime.now().isoformat(),
"error_type": type(e).__name__,
"message": str(e),
"details": e.details
}
with open("errors.log", "a") as f:
f.write(json.dumps(error_log) + "\n")
Provide User-Friendly Error Messages
Provide User-Friendly Error Messages
Copy
ERROR_MESSAGES = {
AuthenticationError: "Invalid API key. Get yours at app.heytessa.ai",
RateLimitError: "Too many requests. Please wait a moment.",
TimeoutError: "The operation took too long. Try a simpler task.",
JobFailedError: "The browser task failed. Please try again.",
ValidationError: "Invalid parameters provided.",
}
try:
result = agent.run(task)
except TessaError as e:
user_message = ERROR_MESSAGES.get(
type(e),
"An error occurred. Please try again."
)
print(f"❌ {user_message}")
logger.error(f"Technical details: {e}")
See Also
- BrowserAgent - Error handling with BrowserAgent
- AsyncTessaClient - Async error handling
- Models - Error-related data models
- API Reference - REST API error codes