Skip to main content

Overview

This guide provides comprehensive examples of advanced browser automation scenarios using the Tessa SDK. Each example includes complete code with error handling and best practices.

E-Commerce Automation

Price Monitoring System

Monitor product prices across multiple e-commerce sites and raise an alert when a price drops by more than a configurable threshold.
import json
import time
from datetime import datetime
from typing import List, Dict
from tessa_sdk import BrowserAgent, TessaClient
from tessa_sdk.exceptions import RateLimitError, TimeoutError

class PriceMonitor:
    """Monitor product prices across multiple sites and alert on price drops.

    The last parsed price of every URL is kept in ``price_history`` (in memory
    only), so a drop can be detected from the second successful check onwards.
    """

    def __init__(self, api_key: str, alert_threshold: float = 0.1):
        """Create the monitor.

        Args:
            api_key: API key handed to ``BrowserAgent``.
            alert_threshold: Fractional drop that triggers an alert; the
                default ``0.1`` means a 10% drop.
        """
        self.agent = BrowserAgent(api_key, residential_ip=True)
        self.alert_threshold = alert_threshold  # 10% price drop alert
        self.price_history = {}

    def monitor_product(self, product: Dict) -> Dict:
        """Check a single product and return a snapshot of its current state.

        Args:
            product: Mapping with at least a ``"url"`` key.

        Returns:
            A dict with ``url``, ``name``, ``price``, ``available``,
            ``timestamp`` and ``credits_used``; ``None`` when the extraction
            is unsuccessful or raises ``RateLimitError`` / ``TimeoutError``.
            ``price`` is ``0.0`` when no price could be parsed.
        """
        url = product["url"]

        # Keep the try body down to the SDK call itself.
        try:
            result = self.agent.extract(
                url=url,
                data_description="product name, current price, availability, and seller name"
            )
        except (RateLimitError, TimeoutError) as e:
            print(f"Error monitoring {url}: {e}")
            return None

        if not result.is_successful:
            print(f"Error monitoring {url}: {getattr(result, 'error', 'extraction failed')}")
            return None

        # Anything other than a mapping is treated as "no fields extracted".
        output = result.output if isinstance(result.output, dict) else {}
        current_price = self._parse_price(output.get("price", ""))
        product_name = output.get("name", "Unknown")

        # 0.0 means "no price could be parsed". Treating it as a real price
        # would raise a bogus "dropped to $0.00" alert and wipe the history,
        # so such a reading neither alerts nor replaces the stored price.
        if current_price > 0:
            last_price = self.price_history.get(url)
            if last_price and current_price < last_price * (1 - self.alert_threshold):
                self._send_alert(product_name, last_price, current_price, url)
            self.price_history[url] = current_price

        return {
            "url": url,
            "name": product_name,
            "price": current_price,
            "available": output.get("availability", "Unknown"),
            "timestamp": datetime.now().isoformat(),
            "credits_used": result.credits_used
        }

    def _parse_price(self, price_str: str) -> float:
        """Parse a price into a float.

        Accepts a display string such as ``"$1,299.99"`` as well as a value
        that is already numeric. Returns ``0.0`` when nothing numeric is found
        (including ``None`` and the empty string).
        """
        import re

        if isinstance(price_str, (int, float)):
            return float(price_str)
        if not price_str:
            return 0.0

        # Drop thousands separators, then take the first number in the text.
        match = re.search(r'\d+\.?\d*', str(price_str).replace(',', ''))
        return float(match.group()) if match else 0.0

    def _send_alert(self, product_name: str, old_price: float, new_price: float, url: str):
        """Print a price drop alert for one product."""
        # Guard the percentage against a zero reference price.
        discount = (old_price - new_price) / old_price * 100 if old_price else 0.0
        print(f"🔔 PRICE ALERT: {product_name}")
        print(f"   Price dropped {discount:.1f}% from ${old_price:.2f} to ${new_price:.2f}")
        print(f"   URL: {url}")

        # Here you could send email, SMS, or push notification

    def monitor_all(self, products: List[Dict], interval_minutes: int = 60):
        """Check every product in a loop that never returns.

        After each pass one JSON line is appended to
        ``price_monitor_<YYYYMMDD>.json`` and the loop sleeps for
        ``interval_minutes``.
        """
        while True:
            results = []
            for product in products:
                result = self.monitor_product(product)
                if result:
                    results.append(result)
                time.sleep(2)  # Avoid rate limiting

            # Save results (append mode: one JSON document per line)
            log_name = f"price_monitor_{datetime.now().strftime('%Y%m%d')}.json"
            with open(log_name, "a", encoding="utf-8") as f:
                json.dump({"timestamp": datetime.now().isoformat(), "results": results}, f)
                f.write("\n")

            print(f"Monitored {len(results)} products. Next check in {interval_minutes} minutes.")
            time.sleep(interval_minutes * 60)

# Usage
# alert_threshold=0.05 asks for an alert on drops of more than 5%.
monitor = PriceMonitor("YOUR_API_KEY", alert_threshold=0.05)

# monitor_product() only reads the "url" key; the reported name comes from
# the extraction result, not from the "name" given here.
products = [
    {"url": "https://amazon.com/dp/B08N5WRWNW", "name": "Echo Dot"},
    {"url": "https://amazon.com/dp/B07FZ8S74R", "name": "Kindle"},
]

# Run monitoring (continuous) - monitor_all() loops forever, so it is left
# commented out here.
# monitor.monitor_all(products, interval_minutes=30)

# Or single check (prints the snapshot dict, or None on failure)
result = monitor.monitor_product(products[0])
print(result)

Inventory Tracker

Track product availability and get notified when items are back in stock.
import asyncio
from typing import List, Dict
from tessa_sdk import AsyncTessaClient
from datetime import datetime

class InventoryTracker:
    """Track product inventory across multiple sites.

    URLs last seen as unavailable are remembered in ``out_of_stock_items`` so
    that ``track_multiple`` can announce when one of them comes back.
    """

    # Phrases that mean the item cannot be bought. They are checked first
    # because several of them contain a positive phrase ("unavailable"
    # contains "available", "not in stock" contains "in stock").
    _OUT_OF_STOCK_INDICATORS = (
        "out of stock",
        "not in stock",
        "unavailable",
        "not available",
        "sold out",
    )
    _IN_STOCK_INDICATORS = ("in stock", "available", "add to cart")

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.out_of_stock_items = set()

    async def check_availability(self, product_url: str) -> Dict:
        """Check if a product is available.

        Returns:
            A dict with ``url``, ``in_stock``, ``details`` and ``checked_at``,
            or ``None`` when the job does not complete successfully.
        """
        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""Go to {product_url} and check:
                1. Is the product in stock?
                2. How many units are available?
                3. Are there any size/color variants available?
                4. What's the estimated delivery date?
                """,
                initial_url=product_url
            )

            result = await job.wait_for_completion(timeout=60)

            if result.is_successful:
                return {
                    "url": product_url,
                    "in_stock": self._parse_availability(result.output),
                    "details": result.output,
                    "checked_at": datetime.now().isoformat()
                }
            return None

    def _parse_availability(self, output: Dict) -> bool:
        """Decide from the agent output whether the product can be bought.

        This is a keyword heuristic over the lower-cased text of ``output``
        (a dict or a plain string). Any out-of-stock phrase wins over an
        in-stock phrase; anything else, including ``None``, is reported as
        not available.
        """
        if not isinstance(output, (dict, str)):
            return False

        output_str = str(output).lower()
        if any(phrase in output_str for phrase in self._OUT_OF_STOCK_INDICATORS):
            return False
        return any(phrase in output_str for phrase in self._IN_STOCK_INDICATORS)

    async def track_multiple(self, products: List[str]) -> List[Dict]:
        """Track multiple products concurrently.

        Args:
            products: Product page URLs.

        Returns:
            One dict (``url``, ``available``, ``details``) per URL whose job
            completed successfully; unsuccessful jobs are left out.
        """
        async with AsyncTessaClient(self.api_key) as client:
            # Start every job first, then wait for all of them together.
            jobs = await asyncio.gather(*[
                client.run_browser_agent(
                    f"Check if product at {url} is in stock and get availability details",
                    initial_url=url
                )
                for url in products
            ])
            results = await asyncio.gather(*[
                job.wait_for_completion(timeout=60) for job in jobs
            ])

            availability = []
            for url, result in zip(products, results):
                if not result.is_successful:
                    continue

                is_available = self._parse_availability(result.output)

                # Check for restock
                if is_available:
                    if url in self.out_of_stock_items:
                        print(f"🎉 BACK IN STOCK: {url}")
                        self.out_of_stock_items.discard(url)
                else:
                    self.out_of_stock_items.add(url)

                availability.append({
                    "url": url,
                    "available": is_available,
                    "details": result.output
                })

            return availability

# Usage
async def main():
    """Check two example product pages and print one status line per result."""
    product_urls = [
        "https://store.example.com/product1",
        "https://store.example.com/product2",
    ]
    inventory = InventoryTracker("YOUR_API_KEY")

    for entry in await inventory.track_multiple(product_urls):
        if entry["available"]:
            label = "✅ In Stock"
        else:
            label = "❌ Out of Stock"
        print(f"{entry['url']}: {label}")

# asyncio.run(main())

Data Extraction & Web Scraping

Multi-Page Data Extraction

Extract data from paginated results.
from tessa_sdk import TessaClient, BrowserConfig
from typing import List, Dict

class PaginatedScraper:
    """Scrape data from paginated websites."""

    def __init__(self, api_key: str):
        self.client = TessaClient(api_key)

    @staticmethod
    def _page_url(base_url: str, page: int) -> str:
        """Return the URL to open for ``page``.

        Page 1 is ``base_url`` unchanged. Later pages get a ``page`` query
        parameter merged into whatever query string ``base_url`` already has,
        so existing parameters and fragments stay intact.
        """
        if page <= 1:
            return base_url

        from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

        parts = urlsplit(base_url)
        query = [
            (key, value)
            for key, value in parse_qsl(parts.query, keep_blank_values=True)
            if key != "page"  # replace a stale page number instead of duplicating it
        ]
        query.append(("page", str(page)))
        return urlunsplit(parts._replace(query=urlencode(query)))

    def scrape_all_pages(
        self,
        base_url: str,
        data_description: str,
        max_pages: int = None
    ) -> List[Dict]:
        """Scrape data from all pages.

        Args:
            base_url: URL of the first page.
            data_description: What to extract from each page.
            max_pages: Stop after this many pages; ``None`` (or 0) means no
                limit.

        Returns:
            The items collected from every page scraped before the last page,
            the page limit, or the first failed page was reached.
        """
        all_data = []
        page = 1

        with self.client as client:
            while True:
                if max_pages and page > max_pages:
                    break

                print(f"Scraping page {page}...")

                job = client.run_browser_agent(
                    f"""
                    1. Go to {base_url} (page {page} if applicable)
                    2. Extract: {data_description}
                    3. Check if there's a 'Next' button or page {page + 1}
                    4. Return the data and whether more pages exist
                    """,
                    initial_url=self._page_url(base_url, page),
                    browser_config=BrowserConfig(
                        width=1920,
                        height=1080,
                        max_duration_minutes=5
                    )
                )

                result = job.wait_for_completion(verbose=True)

                if not result.is_successful:
                    print(f"Failed to scrape page {page}: {result.error}")
                    break

                output = result.output
                if isinstance(output, dict):
                    page_data = output.get("data", [])
                    has_next = output.get("has_next_page", False)
                else:
                    # Output without the expected keys: keep it as this
                    # page's data and stop, since nothing says more exist.
                    page_data, has_next = output, False

                if page_data is not None:
                    all_data.extend(page_data if isinstance(page_data, list) else [page_data])

                if not has_next:
                    print(f"Reached last page (page {page})")
                    break

                page += 1

        return all_data

    def scrape_with_infinite_scroll(
        self,
        url: str,
        data_description: str,
        scroll_count: int = 5
    ) -> List[Dict]:
        """Scrape from infinite scroll pages.

        Returns the job output as-is on success and an empty list on failure.
        """
        with self.client as client:
            job = client.run_browser_agent(
                f"""
                1. Go to {url}
                2. Scroll down {scroll_count} times, waiting for content to load each time
                3. After each scroll, extract new items matching: {data_description}
                4. Return all unique items found
                """,
                initial_url=url,
                browser_config=BrowserConfig(
                    max_duration_minutes=10
                )
            )

            result = job.wait_for_completion()

            if result.is_successful:
                return result.output

            print(f"Scraping failed: {result.error}")
            return []

# Usage
scraper = PaginatedScraper("YOUR_API_KEY")

# Scrape paginated results; max_pages=5 stops the loop after the fifth page
# even if the site reports more.
data = scraper.scrape_all_pages(
    base_url="https://news.site.com/articles",
    data_description="article titles, authors, dates, and URLs",
    max_pages=5
)

# data is the combined list of items from every page that was scraped.
print(f"Scraped {len(data)} items")

Dynamic Content Extraction

Extract data from JavaScript-heavy sites with dynamic content.
from typing import Dict, List

from tessa_sdk import BrowserAgent

class DynamicContentExtractor:
    """Extract data from dynamic JavaScript-rendered content."""

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key, verbose=True)

    def extract_spa_content(self, url: str, wait_conditions: List[str]) -> Dict:
        """Extract from Single Page Applications.

        Args:
            url: Page to open.
            wait_conditions: Conditions that must all hold before extraction;
                they are joined with " AND " into a single wait step. When
                the list is empty, a generic "page has finished loading"
                condition is used so the wait step is never left dangling.

        Returns:
            The agent output, or ``None`` when the run is unsuccessful.
        """
        conditions = [condition for condition in (wait_conditions or []) if condition]
        wait_instruction = " AND ".join(conditions) or "the page has finished loading"

        result = self.agent.run(f"""
            1. Navigate to {url}
            2. Wait until {wait_instruction}
            3. Extract all visible content including:
               - Text content
               - Image URLs and alt texts
               - Link URLs and anchor texts
               - Form fields and their values
            4. Capture any dynamically loaded data
        """, initial_url=url, timeout=120)

        return result.output if result.is_successful else None

    def extract_after_interaction(
        self,
        url: str,
        interactions: List[str],
        target_data: str
    ) -> Dict:
        """Extract data after performing interactions.

        Args:
            url: Page to open.
            interactions: Actions to perform in order; each one becomes a
                numbered step in the prompt.
            target_data: Description of what to extract afterwards.

        Returns:
            The agent output, or ``None`` when the run is unsuccessful.
        """
        interaction_steps = "\n".join(
            f"{number}. {action}"
            for number, action in enumerate(interactions, start=1)
        )

        result = self.agent.run(f"""
            Navigate to {url}
            {interaction_steps}
            After completing all interactions, extract: {target_data}
        """, initial_url=url)

        return result.output if result.is_successful else None

# Usage
extractor = DynamicContentExtractor("YOUR_API_KEY")

# Extract from SPA. The wait conditions are joined with " AND " into one
# "Wait until ..." step, so all of them must hold before extraction.
spa_data = extractor.extract_spa_content(
    url="https://spa-app.example.com",
    wait_conditions=[
        "the loading spinner disappears",
        "product cards are visible",
        "the price information loads"
    ]
)

# Extract after interactions. Each interaction becomes a numbered step, in
# the order given, followed by the extraction request.
interactive_data = extractor.extract_after_interaction(
    url="https://interactive-site.com",
    interactions=[
        "Click on 'Show More' button",
        "Select 'All Categories' from dropdown",
        "Click 'Apply Filters' button",
        "Wait for results to update"
    ],
    target_data="filtered product list with names, prices, and ratings"
)

Social Media Analytics

Social Media Monitoring

Monitor social media profiles and extract analytics.
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict
from tessa_sdk import AsyncTessaClient

class SocialMediaMonitor:
    """Monitor social media profiles and content."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def monitor_profile(self, platform: str, username: str) -> Dict:
        """Monitor a social media profile.

        Args:
            platform: One of "twitter", "instagram", "linkedin" or "youtube"
                (case-insensitive).
            username: Account handle; a leading "@" is ignored.

        Returns:
            A dict with ``platform``, ``username``, ``data`` and
            ``monitored_at``, or ``None`` when the job is unsuccessful.

        Raises:
            ValueError: If ``platform`` is not supported.
        """
        # Tolerate "@name" the same way track_hashtag tolerates "#tag";
        # otherwise the YouTube URL would end up as ".../@@name".
        handle = username.lstrip('@')

        platform_urls = {
            "twitter": f"https://x.com/{handle}",
            "instagram": f"https://instagram.com/{handle}",
            "linkedin": f"https://linkedin.com/in/{handle}",
            "youtube": f"https://youtube.com/@{handle}"
        }

        url = platform_urls.get(platform.lower())
        if not url:
            raise ValueError(f"Unsupported platform: {platform}")

        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""
                Go to {url} and extract:
                1. Profile statistics (followers, following, posts count)
                2. Recent posts (last 5) with:
                   - Content/caption
                   - Engagement metrics (likes, comments, shares)
                   - Post timestamp
                3. Profile bio/description
                4. Verification status
                """,
                initial_url=url,
                browser_config={"residential_ip": True}
            )

            result = await job.wait_for_completion(timeout=90)

            if result.is_successful:
                return {
                    "platform": platform,
                    "username": username,
                    "data": result.output,
                    "monitored_at": datetime.now().isoformat()
                }
            return None

    async def track_hashtag(self, platform: str, hashtag: str, limit: int = 20) -> List[Dict]:
        """Track posts with specific hashtag.

        Args:
            platform: Platform name, inserted into the prompt as given.
            hashtag: Hashtag with or without the leading "#".
            limit: Number of posts to ask for.

        Returns:
            The job output on success, otherwise an empty list.
        """
        hashtag = hashtag.lstrip('#')

        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""
                Search for #{hashtag} on {platform} and extract the first {limit} posts:
                - Post author and username
                - Post content
                - Engagement metrics
                - Post timestamp
                - Media type (photo/video/text)
                """,
                browser_config={"residential_ip": True}
            )

            result = await job.wait_for_completion(timeout=120)

            if result.is_successful:
                return result.output
            return []

    async def analyze_engagement_trends(
        self,
        profiles: List[Dict[str, str]]
    ) -> Dict:
        """Analyze engagement trends across multiple profiles.

        Args:
            profiles: Dicts with ``"platform"`` and ``"username"`` keys.

        Returns:
            A summary dict with ``total_profiles``, ``successful_extractions``,
            ``profiles`` (the successful ``monitor_profile`` results) and
            ``analysis_timestamp``. Profiles that fail are reported on stdout
            and left out of ``profiles``.
        """
        # monitor_profile opens its own client, so no client is needed here.
        tasks = [
            self.monitor_profile(profile["platform"], profile["username"])
            for profile in profiles
        ]

        # return_exceptions=True keeps one bad profile from cancelling the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_data = []
        for profile, result in zip(profiles, results):
            if isinstance(result, dict):
                all_data.append(result)
            else:
                # Either an exception object or None (unsuccessful job).
                print(f"Failed to monitor {profile['platform']}/{profile['username']}: {result}")

        # Analyze trends
        trends = {
            "total_profiles": len(profiles),
            "successful_extractions": len(all_data),
            "profiles": all_data,
            "analysis_timestamp": datetime.now().isoformat()
        }

        return trends

# Usage
async def main():
    """Run the three SocialMediaMonitor examples one after another."""
    social = SocialMediaMonitor("YOUR_API_KEY")

    # Monitor single profile
    print(await social.monitor_profile("twitter", "elonmusk"))

    # Track hashtag
    posts = await social.track_hashtag("twitter", "#AI", limit=10)
    print(f"Found {len(posts)} posts")

    # Analyze multiple profiles
    accounts = [
        {"platform": "twitter", "username": handle}
        for handle in ("openai", "anthropic")
    ]
    summary = await social.analyze_engagement_trends(accounts)
    print(summary)

# asyncio.run(main())

Workflow Automation

Multi-Step Form Submission

Automate complex multi-step forms with validation.
from tessa_sdk import TessaClient
from typing import Dict, List

class FormAutomation:
    """Drive form submissions through the browser agent."""

    def __init__(self, api_key: str):
        self.client = TessaClient(api_key)

    def submit_multi_step_form(
        self,
        url: str,
        form_data: Dict[str, Dict],
        validate_each_step: bool = True
    ) -> Dict:
        """Fill in and submit a form that spans several steps.

        Args:
            url: Page that hosts the form.
            form_data: Step name mapped to that step's field/value pairs.
            validate_each_step: When true, each step also asks the agent to
                verify the fields and look for validation errors.

        Returns:
            ``{"success": True, "confirmation": ..., "credits_used": ...}`` on
            success, otherwise ``{"success": False, "error": ...}``.
        """

        with self.client as client:
            step_blocks = []

            for step_name, step_data in form_data.items():
                field_lines = "\n".join(
                    f"   - {field}: {value}"
                    for field, value in step_data.items()
                )

                block = f"""
                Step '{step_name}':
                {field_lines}
                """

                if validate_each_step:
                    block += (
                        "\n   - Verify all fields are filled correctly"
                        "\n   - Check for any validation errors"
                    )

                step_blocks.append(block)

            directive = f"""
            Go to {url} and complete the multi-step form:
            
            {"".join(step_blocks)}
            
            After completing all steps:
            1. Submit the form
            2. Capture the confirmation message or number
            3. Take a screenshot of the confirmation page
            """

            job = client.run_browser_agent(
                directive=directive,
                initial_url=url,
                browser_config={"max_duration_minutes": 15}
            )

            outcome = job.wait_for_completion(verbose=True)

            # Failure first, so the success payload is the fall-through case.
            if not outcome.is_successful:
                return {
                    "success": False,
                    "error": outcome.error
                }

            return {
                "success": True,
                "confirmation": outcome.output,
                "credits_used": outcome.credits_used
            }

    def fill_dynamic_form(
        self,
        url: str,
        form_rules: List[Dict]
    ) -> Dict:
        """Fill a form whose fields depend on conditions.

        Each rule needs ``"condition"`` and ``"action"`` keys and is rendered
        as one "If ..., then ..." line. Returns the run output, or ``None``
        when the run is unsuccessful.
        """

        rule_lines = "\n".join(
            f"- If {rule['condition']}, then {rule['action']}"
            for rule in form_rules
        )

        outcome = self.client.run_and_wait(
            directive=f"""
            Go to {url} and fill the form following these rules:
            {rule_lines}
            
            Submit the form and capture the result.
            """,
            initial_url=url,
            timeout=120
        )

        if outcome.is_successful:
            return outcome.output
        return None

# Usage
automation = FormAutomation("YOUR_API_KEY")

# Multi-step form: each top-level key is a step name and its value maps
# field names to the values to enter in that step.
form_data = {
    "Personal Information": {
        "first_name": "John",
        "last_name": "Doe",
        "email": "john.doe@example.com",
        "phone": "555-0100"
    },
    "Address": {
        "street": "123 Main St",
        "city": "San Francisco",
        "state": "CA",
        "zip": "94105"
    },
    "Preferences": {
        "newsletter": "Yes",
        "contact_method": "Email"
    }
}

# validate_each_step=True adds "verify fields" / "check for validation
# errors" instructions to every step of the prompt.
result = automation.submit_multi_step_form(
    url="https://example.com/registration",
    form_data=form_data,
    validate_each_step=True
)

# Dynamic form with conditions: every rule is rendered into the prompt as
# "- If <condition>, then <action>".
rules = [
    {"condition": "country field is 'USA'", "action": "show and fill state field"},
    {"condition": "age is over 18", "action": "check 'I agree to terms'"},
    {"condition": "subscription type is 'Premium'", "action": "fill payment details"}
]

# Returns the run output, or None if the run was unsuccessful.
dynamic_result = automation.fill_dynamic_form(
    url="https://example.com/signup",
    form_rules=rules
)

Automated Testing

Use Tessa for automated web application testing.
from tessa_sdk import BrowserAgent
from typing import List, Dict
import json

class WebAppTester:
    """Automated testing for web applications.

    ``test_results`` accumulates the results of every suite run on this
    instance; each report only contains the results of its own run.
    """

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key)
        self.test_results = []

    def run_test_suite(self, base_url: str, test_cases: List[Dict]) -> Dict:
        """Run a suite of test cases and write ``test_report.json``.

        Args:
            base_url: Site root; each test's optional ``"path"`` is appended.
            test_cases: Dicts with ``"name"``, ``"action"`` and ``"expected"``
                keys, plus an optional ``"path"``.

        Returns:
            A report dict with ``total_tests``, ``passed``, ``failed``,
            ``pass_rate`` (formatted percentage) and ``results``.
        """
        passed = 0
        failed = 0
        # Results of this run only, so counts and "results" always agree even
        # when the same tester runs several suites.
        run_results = []

        for test in test_cases:
            print(f"Running test: {test['name']}")

            result = self.agent.run(
                f"""
                Test: {test['name']}
                1. Go to {base_url}{test.get('path', '')}
                2. {test['action']}
                3. Verify: {test['expected']}
                4. Return whether the test passed or failed with details
                """,
                initial_url=f"{base_url}{test.get('path', '')}",
                timeout=60
            )

            if result.is_successful:
                test_passed = self._evaluate_test_result(
                    result.output,
                    test['expected']
                )

                if test_passed:
                    passed += 1
                    status = "✅ PASSED"
                else:
                    failed += 1
                    status = "❌ FAILED"

                run_results.append({
                    "test": test['name'],
                    "status": status,
                    "details": result.output
                })

                print(f"  {status}")
            else:
                # The agent run itself failed; count it as a failed test.
                failed += 1
                run_results.append({
                    "test": test['name'],
                    "status": "❌ ERROR",
                    "error": result.error
                })

        self.test_results.extend(run_results)

        # Generate report (an empty suite reports 0.0% instead of dividing by zero)
        total = len(test_cases)
        pass_percentage = passed / total * 100 if total else 0.0
        report = {
            "total_tests": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": f"{pass_percentage:.1f}%",
            "results": run_results
        }

        # Save report
        with open("test_report.json", "w") as f:
            json.dump(report, f, indent=2)

        return report

    def _evaluate_test_result(self, output: Dict, expected: str) -> bool:
        """Evaluate if test passed based on output.

        An explicit boolean ``passed`` / ``success`` field in a dict output
        is trusted first. Otherwise this is a text heuristic: any failure
        wording means failed, even if the expected text is quoted in the
        failure message; after that, the expected text or the word "passed"
        means passed.
        """
        if isinstance(output, dict):
            for key in ("passed", "success"):
                verdict = output.get(key)
                if isinstance(verdict, bool):
                    return verdict

        output_str = str(output).lower()
        expected_lower = expected.lower()

        if any(marker in output_str for marker in ("failed", "not passed", "did not pass")):
            return False

        # Simple evaluation - check if expected content is in output
        return expected_lower in output_str or "passed" in output_str

    def test_user_journey(self, journey: Dict) -> Dict:
        """Test a complete user journey.

        Args:
            journey: Dict with ``"name"``, ``"start_url"`` and ``"steps"``
                (ordered list of actions).

        Returns:
            A dict with ``journey``, ``success`` and ``result`` (the output on
            success, the error otherwise).
        """
        steps = "\n".join(
            f"{number}. {step}"
            for number, step in enumerate(journey['steps'], start=1)
        )

        result = self.agent.run(
            f"""
            Test User Journey: {journey['name']}
            
            Perform these steps in order:
            {steps}
            
            After each step, verify it completed successfully.
            Report any errors or unexpected behavior.
            """,
            initial_url=journey['start_url'],
            timeout=180
        )

        return {
            "journey": journey['name'],
            "success": result.is_successful,
            "result": result.output if result.is_successful else result.error
        }

# Usage
tester = WebAppTester("YOUR_API_KEY")

# Define test cases. Each case provides a name, an action to perform and
# the expected outcome; "path" is optional and is appended to base_url.
test_cases = [
    {
        "name": "Homepage Load Test",
        "path": "/",
        "action": "Check if page loads within 3 seconds",
        "expected": "Page loaded successfully"
    },
    {
        "name": "Navigation Test",
        "path": "/",
        "action": "Click on 'About' link in navigation",
        "expected": "About page is displayed"
    },
    {
        "name": "Search Functionality",
        "path": "/",
        "action": "Search for 'test product' in search bar",
        "expected": "Search results are displayed"
    },
    {
        "name": "Mobile Responsiveness",
        "path": "/",
        "action": "Resize browser to mobile size (375x667)",
        "expected": "Mobile menu appears and layout adjusts"
    }
]

# Run tests; this also writes the report to test_report.json.
report = tester.run_test_suite(
    base_url="https://example.com",
    test_cases=test_cases
)

print(f"\nTest Results: {report['passed']}/{report['total_tests']} passed ({report['pass_rate']})")

# Test user journey: the steps are numbered and sent to the agent as one
# ordered list, starting from start_url.
journey = {
    "name": "Purchase Flow",
    "start_url": "https://shop.example.com",
    "steps": [
        "Search for 'laptop'",
        "Click on the first product",
        "Add to cart",
        "Go to checkout",
        "Fill in test payment details",
        "Complete purchase"
    ]
}

# journey_result has "journey", "success" and "result" keys.
journey_result = tester.test_user_journey(journey)

Performance Optimization

Batch Processing with Rate Limiting

Process large batches efficiently while respecting rate limits.
import asyncio
from typing import List, Any, Callable
from tessa_sdk import AsyncTessaClient
from tessa_sdk.exceptions import RateLimitError
import time

class BatchProcessor:
    """Process large batches with rate limiting and retries."""

    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 5,
        rate_limit: int = 10,  # requests per minute
        max_retries: int = 3
    ):
        """Configure the processor.

        Args:
            api_key: API key used to open the client in ``process_batch``.
            max_concurrent: Maximum number of items processed at once.
            rate_limit: Maximum number of attempts started per minute.
            max_retries: Number of attempts per item (at least one attempt is
                always made).
        """
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit
        self.max_retries = max_retries
        # NOTE(review): on Python versions before 3.10 an asyncio primitive
        # binds to the event loop current at construction time, so create the
        # processor inside the running loop - confirm for your target version.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_times = []

    async def _rate_limit_check(self):
        """Wait until another request fits into the per-minute budget.

        The window is re-evaluated after every sleep, and the recorded
        timestamp is taken after waiting, so the log reflects when the
        request actually starts.
        """
        while True:
            now = time.time()
            # Remove requests older than 1 minute
            self.request_times = [t for t in self.request_times if now - t < 60]

            if len(self.request_times) < self.rate_limit:
                break

            # Wait until the oldest request leaves the window (+1s margin)
            sleep_time = 60 - (now - self.request_times[0]) + 1
            await asyncio.sleep(sleep_time)

        self.request_times.append(now)

    async def process_item(
        self,
        client: "AsyncTessaClient",
        item: Any,
        processor_func: Callable
    ) -> dict:
        """Process a single item with retries.

        Args:
            client: Client passed through to ``processor_func``.
            item: The item to process.
            processor_func: Coroutine function called as
                ``processor_func(client, item)``.

        Returns:
            ``{"item", "success": True, "result"}`` on success, otherwise
            ``{"item", "success": False, "error"}`` with the last error text.
        """
        async with self.semaphore:
            attempts = max(1, self.max_retries)
            last_error = None

            for attempt in range(attempts):
                # Every attempt is a request, so retries go through the
                # limiter as well.
                await self._rate_limit_check()

                try:
                    result = await processor_func(client, item)
                except RateLimitError as e:
                    last_error = e
                    delay = getattr(e, "retry_after", None) or 10
                except Exception as e:
                    last_error = e
                    delay = 2 ** attempt  # Exponential backoff
                else:
                    return {"item": item, "success": True, "result": result}

                if attempt < attempts - 1:
                    await asyncio.sleep(delay)

            return {"item": item, "success": False, "error": str(last_error)}

    async def process_batch(
        self,
        items: List[Any],
        processor_func: Callable,
        progress_callback: Callable = None
    ) -> List[dict]:
        """Process a batch of items.

        Args:
            items: Items to process.
            processor_func: Coroutine function called as
                ``processor_func(client, item)``.
            progress_callback: Optional ``callback(completed, total)`` invoked
                after each item finishes.

        Returns:
            One result dict per item, in completion order (not input order).
        """
        results = []
        completed = 0

        async with AsyncTessaClient(self.api_key) as client:
            tasks = [
                self.process_item(client, item, processor_func)
                for item in items
            ]

            # Process with progress updates
            for task in asyncio.as_completed(tasks):
                result = await task
                results.append(result)
                completed += 1

                if progress_callback:
                    progress_callback(completed, len(items))

        return results

# Usage
async def extract_page_data(client: AsyncTessaClient, url: str) -> dict:
    """Example processor function.

    Asks the agent for the text content of ``url`` and returns the job
    output, or ``None`` when the job does not complete successfully.
    """
    job = await client.run_browser_agent(
        f"Extract all text content from {url}",
        initial_url=url
    )
    result = await job.wait_for_completion(timeout=60)
    return result.output if result.is_successful else None

async def main():
    """Process 50 example pages in one batch and print a success summary."""

    def progress(done, total):
        # Called by process_batch after each finished item.
        print(f"Progress: {done}/{total} ({done/total*100:.1f}%)")

    urls = []
    for i in range(50):
        urls.append(f"https://example.com/page{i}")

    processor = BatchProcessor(
        api_key="YOUR_API_KEY",
        max_concurrent=3,
        rate_limit=20  # 20 requests per minute
    )

    results = await processor.process_batch(
        items=urls,
        processor_func=extract_page_data,
        progress_callback=progress
    )

    successful = len([entry for entry in results if entry["success"]])
    print(f"\nCompleted: {successful}/{len(results)} successful")

# asyncio.run(main())

See Also

I