Overview
The AsyncTessaClient enables concurrent browser automation for high-volume processing and integration with async Python frameworks.
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
async def main():
    """Start three browser-agent jobs concurrently and wait for all of them.

    Returns:
        The job results, in the same order the tasks were submitted.
    """
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        # Submit every task at once instead of awaiting them one by one.
        jobs = await asyncio.gather(
            client.run_browser_agent("Task 1"),
            client.run_browser_agent("Task 2"),
            client.run_browser_agent("Task 3"),
        )
        # Wait for all jobs in parallel as well.
        results = await asyncio.gather(
            *[job.wait_for_completion() for job in jobs]
        )
    return results


asyncio.run(main())
Constructor
Copy
AsyncTessaClient(
    api_key: str = None,  # Tessa API key; if omitted, the TESSA_API_KEY environment variable is used
    base_url: str = None,  # overrides the base API URL (defaults to https://api.heytessa.ai/v1)
    timeout: float = 60.0,  # default timeout for API requests, in seconds
    max_retries: int = 3  # maximum number of retries for failed requests
)
Parameters
- api_key: Your Tessa API key. If not provided, uses the TESSA_API_KEY environment variable.
- base_url: Override the base API URL. Defaults to https://api.heytessa.ai/v1.
- timeout: Default timeout for API requests in seconds.
- max_retries: Maximum number of retries for failed requests.
Example
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
async def main():
    """Show the three ways to construct an AsyncTessaClient.

    Returns:
        The result of the job run through the context-managed client.
    """
    # Simple initialization
    simple_client = AsyncTessaClient("YOUR_API_KEY")

    # With custom configuration
    custom_client = AsyncTessaClient(
        api_key="YOUR_API_KEY",
        base_url="https://custom-api.heytessa.ai/v1",
        timeout=120.0,
        max_retries=5,
    )

    # Without a context manager, call close() to release each client's
    # resources; distinct names keep the first client from being overwritten.
    await simple_client.close()
    await custom_client.close()

    # Using context manager (recommended)
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        # Client is automatically closed after use
        job = await client.run_browser_agent("Your task")
        result = await job.wait_for_completion()
    return result


asyncio.run(main())
Async Methods
run_browser_agent()
Start a browser agent session asynchronously.
Copy
async def run_browser_agent(
    directive: str,  # the task text, e.g. "Extract product data"
    initial_url: str = None,  # presumably the page the agent starts on — confirm against the sync client docs
    cdp_url: str = None,
    live_url: str = None,
    action_selection_model: str = "claude-sonnet-4-20250514",
    shadow_dom_beta: bool = False,  # True enables enhanced shadow DOM support
    browser_config: Union[Dict, BrowserConfig] = None  # e.g. {"width": 1920, "height": 1080}
) -> AsyncJob
Parameters
Same as the synchronous TessaClient.run_browser_agent().
Returns
Returns an AsyncJob object with async methods for monitoring.
Example
Copy
async def extract_data():
    """Start two extraction jobs and return the result of the first one.

    The second job only demonstrates the ``shadow_dom_beta`` flag: it is
    started, but this example does not wait for it to finish.
    """
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        job = await client.run_browser_agent(
            "Extract product data",
            initial_url="https://shop.example.com",
            browser_config={"width": 1920, "height": 1080},
        )

        # For sites with complex shadow DOM elements
        shadow_job = await client.run_browser_agent(
            "Extract data from shadow DOM components",
            initial_url="https://complex-spa.example.com",
            shadow_dom_beta=True,  # Enable enhanced shadow DOM support
            browser_config={"width": 1920, "height": 1080},
        )

        print(f"Job started: {job.job_id}")
        print(f"Watch live: {job.live_url}")

        result = await job.wait_for_completion()
        return result
get_job_status()
Get job status asynchronously.
Copy
async def get_job_status(job_id: str) -> JobStatus
Example
Copy
async def check_status(client, job_id):
    """Fetch, print and return the current status of a job.

    Args:
        client: Object exposing an awaitable ``get_job_status(job_id)``.
        job_id: Identifier of the job to look up.

    Returns:
        The status object returned by the client.
    """
    status = await client.get_job_status(job_id)
    print(f"Status: {status.status}")
    return status
health_check()
Check API health asynchronously.
Copy
async def health_check() -> bool
Example
Copy
async def check_health(client):
    """Print whether the API is healthy and return the health flag.

    Args:
        client: Object exposing an awaitable ``health_check()``.

    Returns:
        The value returned by ``client.health_check()``.
    """
    is_healthy = await client.health_check()
    print(f"API is {'healthy' if is_healthy else 'down'}")
    return is_healthy
close()
Close the client and clean up resources.
Copy
async def close() -> None
AsyncJob Object
The AsyncJob object provides async methods for job management.
Methods
get_status()
Copy
async def get_status() -> JobStatus
wait_for_completion()
Copy
async def wait_for_completion(
    poll_interval: float = 5.0,  # presumably seconds between status polls — confirm against the SDK
    timeout: float = None,  # presumably the maximum wait in seconds, None meaning no limit — confirm
    verbose: bool = False
) -> JobResult
Concurrent Operations
Parallel Processing
Process multiple URLs concurrently:
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
async def process_urls(urls, api_key):
    """Run one extraction job per URL concurrently and return all results.

    Args:
        urls: URLs to process.
        api_key: Tessa API key used to create the client.

    Returns:
        One result per URL, in the same order as ``urls``.
    """
    async with AsyncTessaClient(api_key) as client:
        # Start all jobs concurrently
        jobs = await asyncio.gather(*[
            client.run_browser_agent(
                f"Extract data from {url}",
                initial_url=url,
            )
            for url in urls
        ])

        # Wait for all to complete
        results = await asyncio.gather(*[
            job.wait_for_completion()
            for job in jobs
        ])
        return results


# Run the async function
urls = [
    "https://example1.com",
    "https://example2.com",
    "https://example3.com",
]
results = asyncio.run(process_urls(urls, "YOUR_API_KEY"))

# gather preserves input order, so results line up with urls.
for url, result in zip(urls, results):
    if result.is_successful:
        print(f"✅ {url}: {len(result.output)} items")
    else:
        print(f"❌ {url}: {result.error}")
Rate-Limited Concurrency
Control concurrency with semaphores:
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
async def process_with_limit(urls, api_key, max_concurrent=3):
    """Process URLs with at most ``max_concurrent`` jobs in flight.

    Args:
        urls: URLs to process.
        api_key: Tessa API key used to create the client.
        max_concurrent: Upper bound on jobs running at the same time.

    Returns:
        One result per URL, in the same order as ``urls``.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_url(client, url):
        # Hold the semaphore for the whole job so a slot is only freed
        # once the job has actually finished.
        async with semaphore:  # Limit concurrent jobs
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url,
            )
            return await job.wait_for_completion()

    async with AsyncTessaClient(api_key) as client:
        results = await asyncio.gather(*[
            process_url(client, url) for url in urls
        ])
        return results


# Process with max 3 concurrent jobs
urls = [
    "https://example1.com",
    "https://example2.com",
    "https://example3.com",
]
results = asyncio.run(
    process_with_limit(urls, "YOUR_API_KEY", max_concurrent=3)
)
Streaming Results
Process results as they complete:
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
async def stream_results(urls, api_key):
    """Yield ``(url, output)`` for each successful job as soon as it finishes.

    Jobs are awaited in completion order, not submission order. Failed and
    timed-out jobs are reported with ``print`` and skipped. Each job gets
    60 seconds, counted from the moment all jobs have been created.

    Args:
        urls: URLs to process.
        api_key: Tessa API key used to create the client.
    """
    async with AsyncTessaClient(api_key) as client:
        # Create jobs
        jobs = []
        for url in urls:
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url,
            )
            jobs.append((url, job))

        async def wait_for_job(url, job):
            # Carry the URL along: as_completed yields in finish order, so
            # position no longer identifies which job a result belongs to.
            try:
                result = await asyncio.wait_for(
                    job.wait_for_completion(),
                    timeout=60,
                )
            except asyncio.TimeoutError:
                return url, None, True
            return url, result, False

        # Process as they complete
        waiters = [wait_for_job(url, job) for url, job in jobs]
        for next_done in asyncio.as_completed(waiters):
            url, result, timed_out = await next_done
            if timed_out:
                print(f"⏱️ {url}: Timeout")
            elif result.is_successful:
                print(f"✅ {url}: Success")
                yield url, result.output
            else:
                print(f"❌ {url}: {result.error}")


# Use the streaming function
async def main():
    """Consume the stream and print each successful result."""
    urls = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com",
    ]
    async for url, data in stream_results(urls, "YOUR_API_KEY"):
        print(f"Processing {url}: {data}")


asyncio.run(main())
Integration with Web Frameworks
FastAPI Integration
Copy
from fastapi import FastAPI, BackgroundTasks
from tessa_sdk import AsyncTessaClient
import asyncio
app = FastAPI()

# Created in the startup hook so that exactly one client exists and the
# shutdown hook closes that same instance.
client = None


@app.on_event("startup")
async def startup():
    """Initialize client on startup."""
    global client
    client = AsyncTessaClient("YOUR_API_KEY")


@app.on_event("shutdown")
async def shutdown():
    """Clean up on shutdown."""
    # Startup may not have run (or may have failed) before shutdown fires.
    if client is not None:
        await client.close()


@app.post("/extract")
async def extract_data(url: str, description: str):
    """Extract data from a URL."""
    job = await client.run_browser_agent(
        f"Go to {url} and extract: {description}",
        initial_url=url,
    )

    # Return job ID immediately
    return {
        "job_id": job.job_id,
        "live_url": job.live_url,
        "status": "processing",
    }


@app.get("/status/{job_id}")
async def get_status(job_id: str):
    """Check job status."""
    status = await client.get_job_status(job_id)
    return {
        "status": status.status,
        "output": status.output if status.status == "completed" else None,
        "error": status.error if status.status == "failed" else None,
    }


@app.post("/batch")
async def batch_process(urls: list[str], background_tasks: BackgroundTasks):
    """Process multiple URLs in background."""

    async def process_batch():
        jobs = await asyncio.gather(*[
            client.run_browser_agent(f"Extract from {url}", initial_url=url)
            for url in urls
        ])
        results = await asyncio.gather(*[
            job.wait_for_completion() for job in jobs
        ])
        # Save results, send notifications, etc.
        return results

    # The response is sent right away; the batch runs afterwards.
    background_tasks.add_task(process_batch)
    return {"message": f"Processing {len(urls)} URLs in background"}
Django Async Views
Copy
from django.http import JsonResponse
from tessa_sdk import AsyncTessaClient
import asyncio
client = AsyncTessaClient("YOUR_API_KEY")


async def extract_view(request):
    """Django async view for data extraction.

    Reads the ``url`` query parameter, runs an extraction job for it and
    responds with JSON: 400 when ``url`` is missing, 500 when the job is
    not successful, otherwise the job output and credits used.
    """
    url = request.GET.get('url')
    if not url:
        return JsonResponse({'error': 'URL required'}, status=400)

    job = await client.run_browser_agent(
        f"Extract main content from {url}",
        initial_url=url,
    )
    result = await job.wait_for_completion(timeout=60)

    if not result.is_successful:
        return JsonResponse({
            'success': False,
            'error': result.error,
        }, status=500)

    return JsonResponse({
        'success': True,
        'data': result.output,
        'credits_used': result.credits_used,
    })
Advanced Patterns
Retry with Exponential Backoff
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
from tessa_sdk.exceptions import RateLimitError


async def run_with_backoff(client, directive, max_retries=3):
    """Run with exponential backoff on failure.

    Rate-limit errors wait 1s, 2s, 4s, ... between attempts; any other
    exception waits a fixed 1s. The exception raised by the final attempt
    is propagated to the caller.

    Args:
        client: Client used to start the browser agent.
        directive: Task text passed to ``run_browser_agent``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first attempt that completes.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (previously this
            silently returned ``None`` without running anything).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            job = await client.run_browser_agent(directive)
            return await job.wait_for_completion()
        except RateLimitError:
            if is_last_attempt:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time}s...")
            await asyncio.sleep(wait_time)
        except Exception:
            # Any other failure is retried after a short fixed pause.
            if is_last_attempt:
                raise
            await asyncio.sleep(1)
Queue-Based Processing
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
class TaskQueue:
"""Queue-based task processor."""
def __init__(self, api_key, max_workers=5):
self.client = AsyncTessaClient(api_key)
self.queue = asyncio.Queue()
self.max_workers = max_workers
self.results = []
async def worker(self):
"""Worker to process tasks from queue."""
while True:
task = await self.queue.get()
if task is None: # Poison pill
break
try:
job = await self.client.run_browser_agent(task['directive'])
result = await job.wait_for_completion()
self.results.append({
'task': task,
'result': result.output if result.is_successful else None,
'error': result.error if not result.is_successful else None
})
except Exception as e:
self.results.append({
'task': task,
'error': str(e)
})
finally:
self.queue.task_done()
async def process(self, tasks):
"""Process all tasks."""
# Add tasks to queue
for task in tasks:
await self.queue.put(task)
# Start workers
workers = [
asyncio.create_task(self.worker())
for _ in range(self.max_workers)
]
# Wait for all tasks
await self.queue.join()
# Stop workers
for _ in workers:
await self.queue.put(None)
await asyncio.gather(*workers)
await self.client.close()
return self.results
# Usage
async def main():
    """Run three sample tasks through a TaskQueue and print each result."""
    tasks = [
        {'directive': 'Extract from site1.com'},
        {'directive': 'Extract from site2.com'},
        {'directive': 'Extract from site3.com'},
    ]

    processor = TaskQueue("YOUR_API_KEY", max_workers=3)
    results = await processor.process(tasks)

    for result in results:
        print(result)


asyncio.run(main())
Progress Tracking
Copy
import asyncio
from tessa_sdk import AsyncTessaClient
from typing import Callable
async def process_with_progress(
    urls: list[str],
    api_key: str,
    progress_callback: Callable[[int, int], None],
):
    """Process URLs with progress tracking.

    Args:
        urls: URLs to process concurrently.
        api_key: Tessa API key used to create the client.
        progress_callback: Called as ``progress_callback(completed, total)``
            each time a URL finishes.

    Returns:
        One result per URL, in the same order as ``urls``.
    """
    completed = 0
    total = len(urls)

    async with AsyncTessaClient(api_key) as client:

        async def process_url(url):
            nonlocal completed
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url,
            )
            result = await job.wait_for_completion()

            # No await between the increment and the callback, so the
            # reported count is never interleaved with another URL's update.
            completed += 1
            progress_callback(completed, total)
            return result

        results = await asyncio.gather(*[
            process_url(url) for url in urls
        ])
        return results
# Usage with progress bar
from tqdm.asyncio import tqdm
async def main():
    """Process two URLs while advancing a tqdm progress bar.

    Returns:
        The results returned by ``process_with_progress``.
    """
    urls = ["https://example1.com", "https://example2.com"]

    # The context manager closes the bar even if processing raises.
    with tqdm(total=len(urls)) as pbar:

        def update_progress(completed, total):
            # Called once per finished URL, so advance by exactly one step.
            pbar.update(1)

        results = await process_with_progress(
            urls, "YOUR_API_KEY", update_progress
        )
    return results


asyncio.run(main())
Performance Tips
Use Connection Pooling
Use Connection Pooling
The async client automatically uses connection pooling for better performance:
Copy
# Client reuses connections across requests
async with AsyncTessaClient("YOUR_API_KEY") as client:
# All jobs share the connection pool
jobs = await asyncio.gather(*[
client.run_browser_agent(task) for task in tasks
])
Batch Operations
Batch Operations
Copy
# ✅ Batch multiple operations
jobs = await asyncio.gather(*[
client.run_browser_agent(task) for task in tasks
])
# ❌ Sequential operations
for task in tasks:
job = await client.run_browser_agent(task)
result = await job.wait_for_completion()
Control Concurrency
Control Concurrency
Copy
# Limit concurrent operations to avoid overwhelming the API
semaphore = asyncio.Semaphore(10)


async def limited_task(client, task):
    """Run one task to completion while holding a semaphore slot.

    Args:
        client: Object exposing an awaitable ``run_browser_agent(task)``.
        task: Task text passed to the client.

    Returns:
        The value returned by the job's ``wait_for_completion()``.
    """
    async with semaphore:
        job = await client.run_browser_agent(task)
        return await job.wait_for_completion()
Use Timeouts
Use Timeouts
Copy
# Set reasonable timeouts
try:
result = await asyncio.wait_for(
job.wait_for_completion(),
timeout=60.0
)
except asyncio.TimeoutError:
print("Task took too long")
See Also
- TessaClient - Synchronous client
- BrowserAgent - Simple one-line interface
- Advanced Examples - Complex real-world scenarios
- Error Handling - Async error handling patterns