Skip to main content

Overview

The AsyncTessaClient enables concurrent browser automation for high-volume processing and integration with async Python frameworks.
import asyncio
from tessa_sdk import AsyncTessaClient

async def main():
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        jobs = await asyncio.gather(
            client.run_browser_agent("Task 1"),
            client.run_browser_agent("Task 2"),
            client.run_browser_agent("Task 3")
        )
        results = await asyncio.gather(
            *[job.wait_for_completion() for job in jobs]
        )

asyncio.run(main())

Constructor

AsyncTessaClient(
    api_key: str = None,
    base_url: str = None,
    timeout: float = 60.0,
    max_retries: int = 3
)

Parameters

api_key
string
default:"None"
Your Tessa API key. If not provided, uses the TESSA_API_KEY environment variable.
base_url
string
default:"None"
Override the base API URL. Defaults to https://api.heytessa.ai/v1.
timeout
float
default:"60.0"
Default timeout for API requests in seconds.
max_retries
integer
default:"3"
Maximum number of retries for failed requests.

Example

import asyncio
from tessa_sdk import AsyncTessaClient

async def main():
    # Simple initialization
    client = AsyncTessaClient("YOUR_API_KEY")
    
    # With custom configuration
    client = AsyncTessaClient(
        api_key="YOUR_API_KEY",
        base_url="https://custom-api.heytessa.ai/v1",
        timeout=120.0,
        max_retries=5
    )
    
    # Using context manager (recommended)
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        # Client is automatically closed after use
        job = await client.run_browser_agent("Your task")
        result = await job.wait_for_completion()

asyncio.run(main())

Async Methods

run_browser_agent()

Start a browser agent session asynchronously.
async def run_browser_agent(
    directive: str,
    initial_url: str = None,
    cdp_url: str = None,
    live_url: str = None,
    action_selection_model: str = "claude-sonnet-4-20250514",
    shadow_dom_beta: bool = False,
    browser_config: Union[Dict, BrowserConfig] = None
) -> AsyncJob

Parameters

Same as the synchronous TessaClient.run_browser_agent().

Returns

Returns an AsyncJob object with async methods for monitoring.

Example

async def extract_data():
    async with AsyncTessaClient("YOUR_API_KEY") as client:
        job = await client.run_browser_agent(
            "Extract product data",
            initial_url="https://shop.example.com",
            browser_config={"width": 1920, "height": 1080}
        )
        
        # For sites with complex shadow DOM elements
        shadow_job = await client.run_browser_agent(
            "Extract data from shadow DOM components",
            initial_url="https://complex-spa.example.com",
            shadow_dom_beta=True,  # Enable enhanced shadow DOM support
            browser_config={"width": 1920, "height": 1080}
        )
        
        print(f"Job started: {job.job_id}")
        print(f"Watch live: {job.live_url}")
        
        result = await job.wait_for_completion()
        return result

get_job_status()

Get job status asynchronously.
async def get_job_status(job_id: str) -> JobStatus

Example

async def check_status(client, job_id):
    status = await client.get_job_status(job_id)
    print(f"Status: {status.status}")
    return status

health_check()

Check API health asynchronously.
async def health_check() -> bool

Example

async def check_health(client):
    is_healthy = await client.health_check()
    print("API is" + (" healthy" if is_healthy else " down"))
    return is_healthy

close()

Close the client and clean up resources.
async def close() -> None

AsyncJob Object

The AsyncJob object provides async methods for job management.

Methods

get_status()

async def get_status() -> JobStatus

wait_for_completion()

async def wait_for_completion(
    poll_interval: float = 5.0,
    timeout: float = None,
    verbose: bool = False
) -> JobResult

Concurrent Operations

Parallel Processing

Process multiple URLs concurrently:
import asyncio
from tessa_sdk import AsyncTessaClient

async def process_urls(urls, api_key):
    async with AsyncTessaClient(api_key) as client:
        # Start all jobs concurrently
        jobs = await asyncio.gather(*[
            client.run_browser_agent(
                f"Extract data from {url}",
                initial_url=url
            )
            for url in urls
        ])
        
        # Wait for all to complete
        results = await asyncio.gather(*[
            job.wait_for_completion()
            for job in jobs
        ])
        
        return results

# Run the async function
urls = [
    "https://example1.com",
    "https://example2.com",
    "https://example3.com"
]

results = asyncio.run(process_urls(urls, "YOUR_API_KEY"))

for url, result in zip(urls, results):
    if result.is_successful:
        print(f"✅ {url}: {len(result.output)} items")
    else:
        print(f"❌ {url}: {result.error}")

Rate-Limited Concurrency

Control concurrency with semaphores:
import asyncio
from tessa_sdk import AsyncTessaClient

async def process_with_limit(urls, api_key, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_url(client, url):
        async with semaphore:  # Limit concurrent jobs
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url
            )
            return await job.wait_for_completion()
    
    async with AsyncTessaClient(api_key) as client:
        results = await asyncio.gather(*[
            process_url(client, url) for url in urls
        ])
        return results

# Process with max 3 concurrent jobs
results = asyncio.run(
    process_with_limit(urls, "YOUR_API_KEY", max_concurrent=3)
)

Streaming Results

Process results as they complete:
import asyncio
from tessa_sdk import AsyncTessaClient

async def stream_results(urls, api_key):
    async with AsyncTessaClient(api_key) as client:
        # Create jobs
        jobs = []
        for url in urls:
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url
            )
            jobs.append((url, job))
        
        # Process as they complete
        for url, job in jobs:
            try:
                result = await asyncio.wait_for(
                    job.wait_for_completion(),
                    timeout=60
                )
                if result.is_successful:
                    print(f"✅ {url}: Success")
                    yield url, result.output
                else:
                    print(f"❌ {url}: {result.error}")
            except asyncio.TimeoutError:
                print(f"⏱️ {url}: Timeout")

# Use the streaming function
async def main():
    async for url, data in stream_results(urls, "YOUR_API_KEY"):
        print(f"Processing {url}: {data}")

asyncio.run(main())

Integration with Web Frameworks

FastAPI Integration

from fastapi import FastAPI, BackgroundTasks
from tessa_sdk import AsyncTessaClient
import asyncio

app = FastAPI()
client = AsyncTessaClient("YOUR_API_KEY")

@app.on_event("startup")
async def startup():
    """Initialize client on startup."""
    global client
    client = AsyncTessaClient("YOUR_API_KEY")

@app.on_event("shutdown")
async def shutdown():
    """Clean up on shutdown."""
    await client.close()

@app.post("/extract")
async def extract_data(url: str, description: str):
    """Extract data from a URL."""
    job = await client.run_browser_agent(
        f"Go to {url} and extract: {description}",
        initial_url=url
    )
    
    # Return job ID immediately
    return {
        "job_id": job.job_id,
        "live_url": job.live_url,
        "status": "processing"
    }

@app.get("/status/{job_id}")
async def get_status(job_id: str):
    """Check job status."""
    status = await client.get_job_status(job_id)
    return {
        "status": status.status,
        "output": status.output if status.status == "completed" else None,
        "error": status.error if status.status == "failed" else None
    }

@app.post("/batch")
async def batch_process(urls: list[str], background_tasks: BackgroundTasks):
    """Process multiple URLs in background."""
    
    async def process_batch():
        jobs = await asyncio.gather(*[
            client.run_browser_agent(f"Extract from {url}", initial_url=url)
            for url in urls
        ])
        
        results = await asyncio.gather(*[
            job.wait_for_completion() for job in jobs
        ])
        
        # Save results, send notifications, etc.
        return results
    
    background_tasks.add_task(process_batch)
    return {"message": f"Processing {len(urls)} URLs in background"}

Django Async Views

from django.http import JsonResponse
from tessa_sdk import AsyncTessaClient
import asyncio

client = AsyncTessaClient("YOUR_API_KEY")

async def extract_view(request):
    """Django async view for data extraction."""
    url = request.GET.get('url')
    
    if not url:
        return JsonResponse({'error': 'URL required'}, status=400)
    
    job = await client.run_browser_agent(
        f"Extract main content from {url}",
        initial_url=url
    )
    
    result = await job.wait_for_completion(timeout=60)
    
    if result.is_successful:
        return JsonResponse({
            'success': True,
            'data': result.output,
            'credits_used': result.credits_used
        })
    else:
        return JsonResponse({
            'success': False,
            'error': result.error
        }, status=500)

Advanced Patterns

Retry with Exponential Backoff

import asyncio
from tessa_sdk import AsyncTessaClient
from tessa.exceptions import RateLimitError

async def run_with_backoff(client, directive, max_retries=3):
    """Run with exponential backoff on failure."""
    
    for attempt in range(max_retries):
        try:
            job = await client.run_browser_agent(directive)
            return await job.wait_for_completion()
            
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Waiting {wait_time}s...")
                await asyncio.sleep(wait_time)
            else:
                raise
        except Exception as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(1)
            else:
                raise

Queue-Based Processing

import asyncio
from tessa_sdk import AsyncTessaClient

class TaskQueue:
    """Queue-based task processor."""
    
    def __init__(self, api_key, max_workers=5):
        self.client = AsyncTessaClient(api_key)
        self.queue = asyncio.Queue()
        self.max_workers = max_workers
        self.results = []
    
    async def worker(self):
        """Worker to process tasks from queue."""
        while True:
            task = await self.queue.get()
            if task is None:  # Poison pill
                break
            
            try:
                job = await self.client.run_browser_agent(task['directive'])
                result = await job.wait_for_completion()
                self.results.append({
                    'task': task,
                    'result': result.output if result.is_successful else None,
                    'error': result.error if not result.is_successful else None
                })
            except Exception as e:
                self.results.append({
                    'task': task,
                    'error': str(e)
                })
            finally:
                self.queue.task_done()
    
    async def process(self, tasks):
        """Process all tasks."""
        # Add tasks to queue
        for task in tasks:
            await self.queue.put(task)
        
        # Start workers
        workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.max_workers)
        ]
        
        # Wait for all tasks
        await self.queue.join()
        
        # Stop workers
        for _ in workers:
            await self.queue.put(None)
        
        await asyncio.gather(*workers)
        await self.client.close()
        
        return self.results

# Usage
async def main():
    tasks = [
        {'directive': 'Extract from site1.com'},
        {'directive': 'Extract from site2.com'},
        {'directive': 'Extract from site3.com'},
    ]
    
    processor = TaskQueue("YOUR_API_KEY", max_workers=3)
    results = await processor.process(tasks)
    
    for result in results:
        print(result)

asyncio.run(main())

Progress Tracking

import asyncio
from tessa_sdk import AsyncTessaClient
from typing import Callable

async def process_with_progress(
    urls: list[str],
    api_key: str,
    progress_callback: Callable[[int, int], None]
):
    """Process URLs with progress tracking."""
    
    completed = 0
    total = len(urls)
    
    async with AsyncTessaClient(api_key) as client:
        async def process_url(url):
            nonlocal completed
            
            job = await client.run_browser_agent(
                f"Extract from {url}",
                initial_url=url
            )
            result = await job.wait_for_completion()
            
            completed += 1
            progress_callback(completed, total)
            
            return result
        
        results = await asyncio.gather(*[
            process_url(url) for url in urls
        ])
        
        return results

# Usage with progress bar
from tqdm.asyncio import tqdm

async def main():
    urls = ["https://example1.com", "https://example2.com"]
    
    pbar = tqdm(total=len(urls))
    
    def update_progress(completed, total):
        pbar.update(1)
    
    results = await process_with_progress(
        urls, "YOUR_API_KEY", update_progress
    )
    
    pbar.close()

asyncio.run(main())

Performance Tips

The async client automatically uses connection pooling for better performance:
# Client reuses connections across requests
async with AsyncTessaClient("YOUR_API_KEY") as client:
    # All jobs share the connection pool
    jobs = await asyncio.gather(*[
        client.run_browser_agent(task) for task in tasks
    ])
# ✅ Batch multiple operations
jobs = await asyncio.gather(*[
    client.run_browser_agent(task) for task in tasks
])

# ❌ Sequential operations
for task in tasks:
    job = await client.run_browser_agent(task)
    result = await job.wait_for_completion()
# Limit concurrent operations to avoid overwhelming the API
semaphore = asyncio.Semaphore(10)

async def limited_task(client, task):
    async with semaphore:
        job = await client.run_browser_agent(task)
        return await job.wait_for_completion()
# Set reasonable timeouts
try:
    result = await asyncio.wait_for(
        job.wait_for_completion(),
        timeout=60.0
    )
except asyncio.TimeoutError:
    print("Task took too long")

See Also

I