Overview
This guide provides comprehensive examples of advanced browser automation scenarios using the Tessa SDK. Each example includes complete code with error handling and best practices.

E-Commerce Automation
Price Monitoring System
Monitor product prices across multiple e-commerce sites and get alerts when a price drops.
import json
import time
from datetime import datetime
from typing import List, Dict
from tessa_sdk import BrowserAgent, TessaClient
from tessa_sdk.exceptions import RateLimitError, TimeoutError
class PriceMonitor:
    """Monitor product prices across multiple sites.

    The last parsed price of every product URL is kept in memory in
    ``price_history``; nothing is persisted between runs.
    """

    def __init__(self, api_key: str, alert_threshold: float = 0.1):
        """Create the browser agent and an empty price history.

        Args:
            api_key: API key handed to ``BrowserAgent``.
            alert_threshold: Fractional price drop that triggers an alert
                (the default 0.1 means a drop of more than 10%).
        """
        self.agent = BrowserAgent(api_key, residential_ip=True)
        self.alert_threshold = alert_threshold
        self.price_history = {}

    def monitor_product(self, product: Dict) -> Dict:
        """Extract the current price of one product and check for a drop.

        Args:
            product: Mapping with at least a ``"url"`` key.

        Returns:
            A dict with ``url``, ``name``, ``price``, ``available``,
            ``timestamp`` and ``credits_used``. ``None`` is returned when
            the extraction is unsuccessful or raises ``RateLimitError`` /
            ``TimeoutError``. ``price`` is ``0.0`` when no price could be
            parsed from the extracted data.
        """
        url = product["url"]
        try:
            result = self.agent.extract(
                url=url,
                data_description="product name, current price, availability, and seller name",
            )
        except (RateLimitError, TimeoutError) as e:
            print(f"Error monitoring {url}: {e}")
            return None

        if not result.is_successful:
            return None

        # Tolerate a non-dict payload instead of failing on ``.get``.
        output = result.output if isinstance(result.output, dict) else {}
        current_price = self._parse_price(output.get("price", ""))
        product_name = output.get("name", "Unknown")

        # 0.0 is the "could not parse" sentinel of _parse_price. Comparing or
        # storing it would raise a bogus 100% price-drop alert and wipe the
        # last known good price, so such readings are reported but not recorded.
        if current_price > 0:
            last_price = self.price_history.get(url)
            if (
                last_price is not None
                and current_price < last_price * (1 - self.alert_threshold)
            ):
                self._send_alert(product_name, last_price, current_price, url)
            self.price_history[url] = current_price

        return {
            "url": url,
            "name": product_name,
            "price": current_price,
            "available": output.get("availability", "Unknown"),
            "timestamp": datetime.now().isoformat(),
            "credits_used": result.credits_used,
        }

    def _parse_price(self, price_str: str) -> float:
        """Parse the first number found in a price string.

        Thousands separators (``,``) are removed before matching. Numeric
        input is accepted as-is. Returns ``0.0`` when nothing can be parsed.
        """
        import re

        if isinstance(price_str, (int, float)):
            return float(price_str)
        if not isinstance(price_str, str):
            return 0.0
        match = re.search(r"\d+(?:\.\d+)?|\.\d+", price_str.replace(",", ""))
        return float(match.group()) if match else 0.0

    def _send_alert(self, product_name: str, old_price: float, new_price: float, url: str):
        """Print a price-drop alert for one product."""
        # Guard against a zero baseline so the alert itself can never crash.
        discount = (old_price - new_price) / old_price * 100 if old_price else 0.0
        print(f"🔔 PRICE ALERT: {product_name}")
        print(f" Price dropped {discount:.1f}% from ${old_price:.2f} to ${new_price:.2f}")
        print(f" URL: {url}")
        # Here you could send email, SMS, or push notification

    def monitor_all(self, products: List[Dict], interval_minutes: int = 60):
        """Continuously monitor all products.

        Runs forever: after each pass over ``products`` one JSON line is
        appended to ``price_monitor_<YYYYMMDD>.json`` and the loop sleeps for
        ``interval_minutes``.
        """
        while True:
            results = []
            for product in products:
                result = self.monitor_product(product)
                if result:
                    results.append(result)
                time.sleep(2)  # Avoid rate limiting

            # Use one timestamp for both the file name and the record so a
            # pass finishing at midnight cannot land in the wrong day's file.
            now = datetime.now()
            with open(f"price_monitor_{now.strftime('%Y%m%d')}.json", "a", encoding="utf-8") as f:
                json.dump({"timestamp": now.isoformat(), "results": results}, f)
                f.write("\n")

            print(f"Monitored {len(results)} products. Next check in {interval_minutes} minutes.")
            time.sleep(interval_minutes * 60)
# Usage
products = [
    {"url": "https://amazon.com/dp/B08N5WRWNW", "name": "Echo Dot"},
    {"url": "https://amazon.com/dp/B07FZ8S74R", "name": "Kindle"},
]
monitor = PriceMonitor("YOUR_API_KEY", alert_threshold=0.05)

# Continuous monitoring: uncomment to poll every 30 minutes (loops forever).
# monitor.monitor_all(products, interval_minutes=30)

# One-off check of the first product.
result = monitor.monitor_product(products[0])
print(result)
Inventory Tracker
Track product availability and get notified when items are back in stock.
import asyncio
from typing import List, Dict
from tessa_sdk import AsyncTessaClient
from datetime import datetime
class InventoryTracker:
    """Track product inventory across multiple sites.

    URLs last seen as unavailable are remembered in ``out_of_stock_items`` so
    that ``track_multiple`` can announce when one of them comes back.
    """

    # Negative phrases are checked first because several of them contain a
    # positive phrase as a substring ("unavailable" contains "available").
    _OUT_OF_STOCK_INDICATORS = (
        "out of stock",
        "sold out",
        "unavailable",
        "not available",
        "not in stock",
        "no longer available",
    )
    _IN_STOCK_INDICATORS = ("in stock", "available", "add to cart")

    def __init__(self, api_key: str):
        """Store the API key; a client is opened per tracking call."""
        self.api_key = api_key
        self.out_of_stock_items = set()

    async def check_availability(self, product_url: str) -> Dict:
        """Check if a single product is available.

        Returns:
            A dict with ``url``, ``in_stock``, ``details`` and ``checked_at``,
            or ``None`` when the job does not complete successfully.
        """
        directive = (
            f"Go to {product_url} and check:\n"
            "1. Is the product in stock?\n"
            "2. How many units are available?\n"
            "3. Are there any size/color variants available?\n"
            "4. What's the estimated delivery date?\n"
        )
        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(directive, initial_url=product_url)
            result = await job.wait_for_completion(timeout=60)
            if result.is_successful:
                return {
                    "url": product_url,
                    "in_stock": self._parse_availability(result.output),
                    "details": result.output,
                    "checked_at": datetime.now().isoformat(),
                }
            return None

    def _parse_availability(self, output: Dict) -> bool:
        """Guess availability from the agent output using keyword matching.

        The output (dict, string or any other value) is stringified and
        lower-cased. Any out-of-stock phrase wins over an in-stock phrase;
        empty or missing output counts as unavailable.
        """
        if not output:
            return False
        text = str(output).lower()
        if any(phrase in text for phrase in self._OUT_OF_STOCK_INDICATORS):
            return False
        return any(phrase in text for phrase in self._IN_STOCK_INDICATORS)

    async def track_multiple(self, products: List[str]) -> List[Dict]:
        """Track multiple products concurrently.

        A failure for one URL (exception or unsuccessful job) does not abort
        the others; that URL is simply left out of the returned list.

        Returns:
            One dict per successfully checked URL with ``url``, ``available``
            and ``details``.
        """

        async def wait_for(job):
            # A job slot may hold the exception raised while starting the job;
            # re-raise it so it ends up as this URL's outcome.
            if isinstance(job, Exception):
                raise job
            return await job.wait_for_completion(timeout=60)

        async with AsyncTessaClient(self.api_key) as client:
            jobs = await asyncio.gather(
                *(
                    client.run_browser_agent(
                        f"Check if product at {url} is in stock and get availability details",
                        initial_url=url,
                    )
                    for url in products
                ),
                return_exceptions=True,
            )
            results = await asyncio.gather(
                *(wait_for(job) for job in jobs),
                return_exceptions=True,
            )

        availability = []
        for url, result in zip(products, results):
            if isinstance(result, Exception):
                print(f"⚠️ Could not check {url}: {result}")
                continue
            if not result.is_successful:
                continue

            is_available = self._parse_availability(result.output)
            # Check for restock
            if url in self.out_of_stock_items and is_available:
                print(f"🎉 BACK IN STOCK: {url}")
                self.out_of_stock_items.remove(url)
            elif not is_available:
                self.out_of_stock_items.add(url)

            availability.append({
                "url": url,
                "available": is_available,
                "details": result.output,
            })
        return availability
# Usage
async def main():
    """Check a couple of example product pages and print their stock status."""
    products = [
        "https://store.example.com/product1",
        "https://store.example.com/product2",
    ]
    tracker = InventoryTracker("YOUR_API_KEY")
    for entry in await tracker.track_multiple(products):
        if entry["available"]:
            status = "✅ In Stock"
        else:
            status = "❌ Out of Stock"
        print(f"{entry['url']}: {status}")
# asyncio.run(main())
Data Extraction & Web Scraping
Multi-Page Data Extraction
Extract data from paginated results.
from tessa_sdk import TessaClient, BrowserConfig
from typing import List, Dict
class PaginatedScraper:
    """Scrape data from paginated websites."""

    def __init__(self, api_key: str):
        """Create the Tessa client used by all scraping methods."""
        self.client = TessaClient(api_key)

    @staticmethod
    def _page_url(base_url: str, page: int) -> str:
        """Return ``base_url`` pointing at ``page`` via a ``page`` query parameter.

        Page 1 is the unmodified ``base_url``. For later pages the parameter
        is merged into any existing query string (replacing a previous
        ``page`` value) rather than blindly appending ``?page=N``.
        """
        from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

        if page <= 1:
            return base_url
        parts = urlsplit(base_url)
        query = [
            (key, value)
            for key, value in parse_qsl(parts.query, keep_blank_values=True)
            if key != "page"
        ]
        query.append(("page", str(page)))
        return urlunsplit(parts._replace(query=urlencode(query)))

    def scrape_all_pages(
        self,
        base_url: str,
        data_description: str,
        max_pages: int = None
    ) -> List[Dict]:
        """Scrape data from consecutive pages until the last one is reached.

        Args:
            base_url: URL of the first page.
            data_description: What to extract from each page.
            max_pages: Upper bound on pages to visit; ``None`` (or 0) means
                no limit.

        Returns:
            All extracted items in page order. Scraping stops at the first
            page that fails, returning what was collected so far.
        """
        all_data = []
        page = 1
        # NOTE(review): entering self.client as a context manager presumably
        # closes it on exit, which would make this scraper single-use —
        # confirm against TessaClient.
        with self.client as client:
            while True:
                if max_pages and page > max_pages:
                    break
                print(f"Scraping page {page}...")
                job = client.run_browser_agent(
                    (
                        "\n"
                        f"1. Go to {base_url} (page {page} if applicable)\n"
                        f"2. Extract: {data_description}\n"
                        f"3. Check if there's a 'Next' button or page {page + 1}\n"
                        # Name the keys explicitly: the code below reads
                        # exactly 'data' and 'has_next_page'.
                        "4. Return a JSON object with the keys 'data' (list of "
                        "extracted items) and 'has_next_page' (true or false)\n"
                    ),
                    initial_url=self._page_url(base_url, page),
                    browser_config=BrowserConfig(
                        width=1920,
                        height=1080,
                        max_duration_minutes=5,
                    ),
                )
                result = job.wait_for_completion(verbose=True)
                if not result.is_successful:
                    print(f"Failed to scrape page {page}: {result.error}")
                    break

                output = result.output
                if isinstance(output, dict):
                    page_data = output.get("data", [])
                    has_next = output.get("has_next_page", False)
                else:
                    # Bare payload without the wrapper object: keep the data
                    # and stop, since nothing says another page exists.
                    page_data, has_next = output, False

                if page_data is not None:
                    all_data.extend(page_data if isinstance(page_data, list) else [page_data])

                if not has_next:
                    print(f"Reached last page (page {page})")
                    break
                page += 1
        return all_data

    def scrape_with_infinite_scroll(
        self,
        url: str,
        data_description: str,
        scroll_count: int = 5
    ) -> List[Dict]:
        """Scrape an infinite-scroll page by scrolling ``scroll_count`` times.

        Returns:
            The extracted items as a list (a single non-list result is
            wrapped); an empty list when the job fails or returns nothing.
        """
        with self.client as client:
            job = client.run_browser_agent(
                (
                    "\n"
                    f"1. Go to {url}\n"
                    f"2. Scroll down {scroll_count} times, waiting for content to load each time\n"
                    f"3. After each scroll, extract new items matching: {data_description}\n"
                    "4. Return all unique items found\n"
                ),
                initial_url=url,
                browser_config=BrowserConfig(
                    max_duration_minutes=10,
                ),
            )
            result = job.wait_for_completion()
            if not result.is_successful:
                print(f"Scraping failed: {result.error}")
                return []

            items = result.output
            if items is None:
                return []
            return items if isinstance(items, list) else [items]
# Usage
scraper = PaginatedScraper("YOUR_API_KEY")

# Scrape paginated results: at most five pages of article metadata.
scrape_request = {
    "base_url": "https://news.site.com/articles",
    "data_description": "article titles, authors, dates, and URLs",
    "max_pages": 5,
}
data = scraper.scrape_all_pages(**scrape_request)
print(f"Scraped {len(data)} items")
Dynamic Content Extraction
Extract data from JavaScript-heavy sites with dynamic content.
from typing import Dict, List

from tessa_sdk import BrowserAgent
class DynamicContentExtractor:
    """Extract data from dynamic JavaScript-rendered content."""

    def __init__(self, api_key: str):
        """Create a verbose ``BrowserAgent`` for the given API key."""
        self.agent = BrowserAgent(api_key, verbose=True)

    def extract_spa_content(self, url: str, wait_conditions: List[str]) -> Dict:
        """Extract visible content from a Single Page Application.

        Args:
            url: Page to open.
            wait_conditions: Plain-language conditions that must all hold
                before extraction starts. When empty, the agent is told to
                wait for the page to finish loading instead.

        Returns:
            The agent output, or ``None`` when the run is unsuccessful.
        """
        # An empty list would otherwise yield a dangling "Wait until " step.
        wait_instruction = (
            " AND ".join(wait_conditions)
            if wait_conditions
            else "the page has finished loading"
        )
        directive = "\n".join([
            f"1. Navigate to {url}",
            f"2. Wait until {wait_instruction}",
            "3. Extract all visible content including:",
            "- Text content",
            "- Image URLs and alt texts",
            "- Link URLs and anchor texts",
            "- Form fields and their values",
            "4. Capture any dynamically loaded data",
        ])
        result = self.agent.run(directive, initial_url=url, timeout=120)
        return result.output if result.is_successful else None

    def extract_after_interaction(
        self,
        url: str,
        interactions: List[str],
        target_data: str
    ) -> Dict:
        """Extract data after performing a sequence of interactions.

        Args:
            url: Page to open.
            interactions: Actions to perform in order; they are numbered
                starting at 1 in the prompt.
            target_data: Description of what to extract afterwards.

        Returns:
            The agent output, or ``None`` when the run is unsuccessful.
        """
        lines = [f"Navigate to {url}"]
        lines.extend(
            f"{number}. {action}"
            for number, action in enumerate(interactions, start=1)
        )
        lines.append(f"After completing all interactions, extract: {target_data}")
        result = self.agent.run("\n".join(lines), initial_url=url)
        return result.output if result.is_successful else None
# Usage
extractor = DynamicContentExtractor("YOUR_API_KEY")

# Extract from an SPA once every listed condition holds.
spa_wait_conditions = [
    "the loading spinner disappears",
    "product cards are visible",
    "the price information loads",
]
spa_data = extractor.extract_spa_content(
    url="https://spa-app.example.com",
    wait_conditions=spa_wait_conditions,
)

# Extract after performing these interactions in order.
filter_interactions = [
    "Click on 'Show More' button",
    "Select 'All Categories' from dropdown",
    "Click 'Apply Filters' button",
    "Wait for results to update",
]
interactive_data = extractor.extract_after_interaction(
    url="https://interactive-site.com",
    interactions=filter_interactions,
    target_data="filtered product list with names, prices, and ratings",
)
Social Media Analytics
Social Media Monitoring
Monitor social media profiles and extract analytics.
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict
from tessa_sdk import AsyncTessaClient
class SocialMediaMonitor:
    """Monitor social media profiles and content."""

    # Profile URL pattern per supported platform (keys are lower-case).
    _PROFILE_URL_TEMPLATES = {
        "twitter": "https://x.com/{username}",
        "instagram": "https://instagram.com/{username}",
        "linkedin": "https://linkedin.com/in/{username}",
        "youtube": "https://youtube.com/@{username}",
    }

    def __init__(self, api_key: str):
        """Store the API key; each call opens its own client."""
        self.api_key = api_key

    async def monitor_profile(self, platform: str, username: str) -> Dict:
        """Extract statistics and recent posts from one profile.

        Args:
            platform: One of twitter, instagram, linkedin, youtube
                (case-insensitive).
            username: Account handle; a leading ``@`` is ignored.

        Returns:
            A dict with ``platform``, ``username``, ``data`` and
            ``monitored_at``, or ``None`` when the job is unsuccessful.

        Raises:
            ValueError: If the platform is unsupported or the username is
                empty.
        """
        template = self._PROFILE_URL_TEMPLATES.get(platform.lower())
        if not template:
            raise ValueError(f"Unsupported platform: {platform}")

        # Strip a leading "@" the same way track_hashtag strips "#"; it would
        # otherwise end up inside the profile URL ("@@name" on YouTube).
        handle = username.strip().lstrip("@")
        if not handle:
            raise ValueError("username must not be empty")
        url = template.format(username=handle)

        directive = (
            "\n"
            f"Go to {url} and extract:\n"
            "1. Profile statistics (followers, following, posts count)\n"
            "2. Recent posts (last 5) with:\n"
            "- Content/caption\n"
            "- Engagement metrics (likes, comments, shares)\n"
            "- Post timestamp\n"
            "3. Profile bio/description\n"
            "4. Verification status\n"
        )
        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                directive,
                initial_url=url,
                browser_config={"residential_ip": True},
            )
            result = await job.wait_for_completion(timeout=90)
            if result.is_successful:
                return {
                    "platform": platform,
                    "username": handle,
                    "data": result.output,
                    "monitored_at": datetime.now().isoformat(),
                }
            return None

    async def track_hashtag(self, platform: str, hashtag: str, limit: int = 20) -> List[Dict]:
        """Collect the first ``limit`` posts for a hashtag.

        A leading ``#`` on ``hashtag`` is ignored. Returns the agent output,
        or an empty list when the job is unsuccessful.
        """
        hashtag = hashtag.lstrip('#')
        directive = (
            "\n"
            f"Search for #{hashtag} on {platform} and extract the first {limit} posts:\n"
            "- Post author and username\n"
            "- Post content\n"
            "- Engagement metrics\n"
            "- Post timestamp\n"
            "- Media type (photo/video/text)\n"
        )
        async with AsyncTessaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                directive,
                browser_config={"residential_ip": True},
            )
            result = await job.wait_for_completion(timeout=120)
            if result.is_successful:
                return result.output
            return []

    async def analyze_engagement_trends(
        self,
        profiles: List[Dict[str, str]]
    ) -> Dict:
        """Monitor several profiles concurrently and summarise the outcome.

        Args:
            profiles: Dicts with ``platform`` and ``username`` keys.

        Returns:
            A dict with ``total_profiles``, ``successful_extractions``,
            ``profiles`` (the successful results), ``errors`` (one entry per
            profile whose monitoring raised) and ``analysis_timestamp``.
        """
        # monitor_profile opens its own client, so none is needed here.
        outcomes = await asyncio.gather(
            *(
                self.monitor_profile(profile["platform"], profile["username"])
                for profile in profiles
            ),
            return_exceptions=True,
        )

        all_data = []
        errors = []
        for profile, outcome in zip(profiles, outcomes):
            if isinstance(outcome, dict):
                all_data.append(outcome)
            elif isinstance(outcome, Exception):
                # Keep the reason instead of dropping the failure silently.
                errors.append({
                    "platform": profile["platform"],
                    "username": profile["username"],
                    "error": str(outcome),
                })

        return {
            "total_profiles": len(profiles),
            "successful_extractions": len(all_data),
            "profiles": all_data,
            "errors": errors,
            "analysis_timestamp": datetime.now().isoformat(),
        }
# Usage
async def main():
    """Run the three monitoring examples in sequence."""
    monitor = SocialMediaMonitor("YOUR_API_KEY")

    # Single profile
    print(await monitor.monitor_profile("twitter", "elonmusk"))

    # Hashtag tracking
    hashtag_posts = await monitor.track_hashtag("twitter", "#AI", limit=10)
    post_count = len(hashtag_posts)
    print(f"Found {post_count} posts")

    # Several profiles at once
    usernames = ("openai", "anthropic")
    profiles = [{"platform": "twitter", "username": name} for name in usernames]
    trends = await monitor.analyze_engagement_trends(profiles)
    print(trends)
# asyncio.run(main())
Workflow Automation
Multi-Step Form Submission
Automate complex multi-step forms with validation.
from tessa_sdk import TessaClient
from typing import Dict, List
class FormAutomation:
    """Automate complex form submissions."""

    def __init__(self, api_key: str):
        """Create the Tessa client used for all submissions."""
        self.client = TessaClient(api_key)

    @staticmethod
    def _format_step(step_name: str, step_data: Dict, validate: bool) -> str:
        """Render one form step as a heading followed by its field bullets."""
        lines = [f"Step '{step_name}':"]
        lines.extend(f"  - {field}: {value}" for field, value in step_data.items())
        if validate:
            lines.append("  - Verify all fields are filled correctly")
            lines.append("  - Check for any validation errors")
        return "\n".join(lines)

    def submit_multi_step_form(
        self,
        url: str,
        form_data: Dict[str, Dict],
        validate_each_step: bool = True
    ) -> Dict:
        """Submit a multi-step form, optionally validating after each step.

        Args:
            url: Page hosting the form.
            form_data: Mapping of step name to ``{field: value}``; steps are
                filled in the mapping's iteration order.
            validate_each_step: Add verification instructions to every step.

        Returns:
            ``{"success": True, "confirmation": ..., "credits_used": ...}``
            on success, otherwise ``{"success": False, "error": ...}``.
        """
        if not form_data:
            # Nothing to fill in: fail fast instead of paying for an agent
            # run that has no instructions.
            return {"success": False, "error": "form_data is empty"}

        steps = [
            self._format_step(step_name, step_data, validate_each_step)
            for step_name, step_data in form_data.items()
        ]
        full_instruction = "\n".join([
            f"Go to {url} and complete the multi-step form:",
            "",
            "\n\n".join(steps),
            "",
            "After completing all steps:",
            "1. Submit the form",
            "2. Capture the confirmation message or number",
            "3. Take a screenshot of the confirmation page",
        ])

        # NOTE(review): this enters self.client as a context manager while
        # fill_dynamic_form uses self.client directly; if exiting closes the
        # client the two methods cannot be mixed on one instance — confirm.
        with self.client as client:
            job = client.run_browser_agent(
                directive=full_instruction,
                initial_url=url,
                browser_config={"max_duration_minutes": 15},
            )
            result = job.wait_for_completion(verbose=True)

        if result.is_successful:
            return {
                "success": True,
                "confirmation": result.output,
                "credits_used": result.credits_used,
            }
        return {
            "success": False,
            "error": result.error,
        }

    def fill_dynamic_form(
        self,
        url: str,
        form_rules: List[Dict]
    ) -> Dict:
        """Fill a form whose fields depend on conditional rules.

        Args:
            url: Page hosting the form.
            form_rules: Dicts with ``condition`` and ``action`` keys.

        Returns:
            The agent output, or ``None`` when the run is unsuccessful or no
            rules were given.
        """
        if not form_rules:
            return None

        rules_instruction = "\n".join(
            f"- If {rule['condition']}, then {rule['action']}"
            for rule in form_rules
        )
        directive = "\n".join([
            f"Go to {url} and fill the form following these rules:",
            rules_instruction,
            "Submit the form and capture the result.",
        ])
        result = self.client.run_and_wait(
            directive=directive,
            initial_url=url,
            timeout=120,
        )
        return result.output if result.is_successful else None
# Usage
automation = FormAutomation("YOUR_API_KEY")

# Multi-step form: one dict of field values per step, in submission order.
personal_information = {
    "first_name": "John",
    "last_name": "Doe",
    "email": "john.doe@example.com",
    "phone": "555-0100",
}
address = {
    "street": "123 Main St",
    "city": "San Francisco",
    "state": "CA",
    "zip": "94105",
}
preferences = {
    "newsletter": "Yes",
    "contact_method": "Email",
}
form_data = {
    "Personal Information": personal_information,
    "Address": address,
    "Preferences": preferences,
}
result = automation.submit_multi_step_form(
    url="https://example.com/registration",
    form_data=form_data,
    validate_each_step=True,
)

# Dynamic form: each rule pairs a condition with the action it triggers.
rule_pairs = [
    ("country field is 'USA'", "show and fill state field"),
    ("age is over 18", "check 'I agree to terms'"),
    ("subscription type is 'Premium'", "fill payment details"),
]
rules = [{"condition": condition, "action": action} for condition, action in rule_pairs]
dynamic_result = automation.fill_dynamic_form(
    url="https://example.com/signup",
    form_rules=rules,
)
Automated Testing
Use Tessa for automated web application testing.
from tessa_sdk import BrowserAgent
from typing import List, Dict
import json
class WebAppTester:
    """Automated testing for web applications."""

    def __init__(self, api_key: str):
        """Create the browser agent; ``test_results`` holds the latest run."""
        self.agent = BrowserAgent(api_key)
        self.test_results = []

    def run_test_suite(self, base_url: str, test_cases: List[Dict]) -> Dict:
        """Run a suite of test cases and write ``test_report.json``.

        Args:
            base_url: Prefix for every test's optional ``path``.
            test_cases: Dicts with ``name``, ``action`` and ``expected`` keys
                plus an optional ``path``.

        Returns:
            A report dict with ``total_tests``, ``passed``, ``failed``,
            ``pass_rate`` and ``results``. The same report is written to
            ``test_report.json`` in the current directory.
        """
        # Start every run from a clean slate; otherwise a second run's
        # "results" would include the previous run and contradict its totals.
        self.test_results = []
        passed = 0
        failed = 0

        for test in test_cases:
            print(f"Running test: {test['name']}")
            target_url = f"{base_url}{test.get('path', '')}"
            result = self.agent.run(
                (
                    "\n"
                    f"Test: {test['name']}\n"
                    f"1. Go to {target_url}\n"
                    f"2. {test['action']}\n"
                    f"3. Verify: {test['expected']}\n"
                    "4. Return whether the test passed or failed with details\n"
                ),
                initial_url=target_url,
                timeout=60,
            )

            if result.is_successful:
                if self._evaluate_test_result(result.output, test['expected']):
                    passed += 1
                    status = "✅ PASSED"
                else:
                    failed += 1
                    status = "❌ FAILED"
                self.test_results.append({
                    "test": test['name'],
                    "status": status,
                    "details": result.output,
                })
            else:
                failed += 1
                status = "❌ ERROR"
                self.test_results.append({
                    "test": test['name'],
                    "status": status,
                    "error": result.error,
                })
            print(f" {status}")

        # Generate report
        total = len(test_cases)
        # An empty suite must not divide by zero.
        pass_rate = passed / total * 100 if total else 0.0
        report = {
            "total_tests": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": f"{pass_rate:.1f}%",
            "results": self.test_results,
        }

        # Save report
        with open("test_report.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        return report

    def _evaluate_test_result(self, output: Dict, expected: str) -> bool:
        """Decide whether a test passed from the agent output.

        Simple heuristic: the test passes when the lower-cased, stringified
        output contains the expected text or the word "passed".
        """
        output_str = str(output).lower()
        return expected.lower() in output_str or "passed" in output_str

    def test_user_journey(self, journey: Dict) -> Dict:
        """Run a multi-step user journey in a single agent session.

        Args:
            journey: Dict with ``name``, ``start_url`` and ``steps`` (a list
                of step descriptions).

        Returns:
            A dict with ``journey``, ``success`` and ``result`` (the agent
            output on success, otherwise the error).
        """
        steps = "\n".join(
            f"{number}. {step}"
            for number, step in enumerate(journey['steps'], start=1)
        )
        result = self.agent.run(
            (
                "\n"
                f"Test User Journey: {journey['name']}\n"
                "Perform these steps in order:\n"
                f"{steps}\n"
                "After each step, verify it completed successfully.\n"
                "Report any errors or unexpected behavior.\n"
            ),
            initial_url=journey['start_url'],
            timeout=180,
        )
        return {
            "journey": journey['name'],
            "success": result.is_successful,
            "result": result.output if result.is_successful else result.error,
        }
# Usage
tester = WebAppTester("YOUR_API_KEY")

# Define test cases: (name, action, expected outcome); all start at "/".
case_specs = [
    (
        "Homepage Load Test",
        "Check if page loads within 3 seconds",
        "Page loaded successfully",
    ),
    (
        "Navigation Test",
        "Click on 'About' link in navigation",
        "About page is displayed",
    ),
    (
        "Search Functionality",
        "Search for 'test product' in search bar",
        "Search results are displayed",
    ),
    (
        "Mobile Responsiveness",
        "Resize browser to mobile size (375x667)",
        "Mobile menu appears and layout adjusts",
    ),
]
test_cases = [
    {"name": name, "path": "/", "action": action, "expected": expected}
    for name, action, expected in case_specs
]

# Run tests
report = tester.run_test_suite(
    base_url="https://example.com",
    test_cases=test_cases,
)
print(f"\nTest Results: {report['passed']}/{report['total_tests']} passed ({report['pass_rate']})")

# Test user journey
purchase_steps = [
    "Search for 'laptop'",
    "Click on the first product",
    "Add to cart",
    "Go to checkout",
    "Fill in test payment details",
    "Complete purchase",
]
journey = {
    "name": "Purchase Flow",
    "start_url": "https://shop.example.com",
    "steps": purchase_steps,
}
journey_result = tester.test_user_journey(journey)
Performance Optimization
Batch Processing with Rate Limiting
Process large batches efficiently while respecting rate limits.
import asyncio
from typing import List, Any, Callable
from tessa_sdk import AsyncTessaClient
from tessa_sdk.exceptions import RateLimitError
import time
class BatchProcessor:
    """Process large batches with rate limiting and retries."""

    # Length of the sliding window, in seconds, that ``rate_limit`` applies to.
    _WINDOW_SECONDS = 60

    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 5,
        rate_limit: int = 10,  # requests per minute
        max_retries: int = 3
    ):
        """Configure concurrency, rate limit and retry behaviour.

        Args:
            api_key: API key used to open the client in ``process_batch``.
            max_concurrent: Maximum number of items processed at once.
            rate_limit: Maximum number of attempts started per minute.
            max_retries: Attempts per item; values below 1 are treated as 1.

        Raises:
            ValueError: If ``max_concurrent`` or ``rate_limit`` is below 1.
        """
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        if rate_limit < 1:
            raise ValueError("rate_limit must be at least 1")
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_times = []

    async def _rate_limit_check(self):
        """Wait until another request may start, then record its start time.

        There is no ``await`` between the capacity check and the append, so
        concurrent coroutines on one event loop cannot claim the same slot.
        After sleeping, the window is re-evaluated rather than assumed free.
        """
        while True:
            now = time.monotonic()
            # Remove requests older than the window
            self.request_times = [
                t for t in self.request_times if now - t < self._WINDOW_SECONDS
            ]
            if len(self.request_times) < self.rate_limit:
                self.request_times.append(now)
                return
            # Sleep until the oldest recorded request leaves the window.
            await asyncio.sleep(self._WINDOW_SECONDS - (now - self.request_times[0]))

    async def process_item(
        self,
        client: AsyncTessaClient,
        item: Any,
        processor_func: Callable
    ) -> dict:
        """Process a single item with retries.

        Every attempt, including retries, passes through the rate limiter.
        After a ``RateLimitError`` the wait is the error's ``retry_after``
        (10 seconds when absent); after any other exception it is
        ``2 ** attempt`` seconds.

        Returns:
            ``{"item", "success": True, "result"}`` on success, otherwise
            ``{"item", "success": False, "error"}`` carrying the last error.
        """
        attempts = max(1, self.max_retries)
        last_error = None
        async with self.semaphore:
            for attempt in range(attempts):
                await self._rate_limit_check()
                try:
                    result = await processor_func(client, item)
                except RateLimitError as e:
                    last_error = e
                    delay = getattr(e, "retry_after", None) or 10
                except Exception as e:
                    last_error = e
                    delay = 2 ** attempt  # Exponential backoff
                else:
                    return {"item": item, "success": True, "result": result}

                if attempt < attempts - 1:
                    await asyncio.sleep(delay)
        return {"item": item, "success": False, "error": str(last_error)}

    async def process_batch(
        self,
        items: List[Any],
        processor_func: Callable,
        progress_callback: Callable = None
    ) -> List[dict]:
        """Process a batch of items concurrently.

        Args:
            items: Items handed one by one to ``processor_func``.
            processor_func: Async callable invoked as
                ``processor_func(client, item)``.
            progress_callback: Optional ``callback(completed, total)`` called
                after each item finishes.

        Returns:
            One result dict per item, in completion order (not input order).
        """
        results = []
        async with AsyncTessaClient(self.api_key) as client:
            tasks = [
                self.process_item(client, item, processor_func)
                for item in items
            ]
            # Process with progress updates
            for completed, task in enumerate(asyncio.as_completed(tasks), start=1):
                results.append(await task)
                if progress_callback:
                    progress_callback(completed, len(items))
        return results
# Usage
async def extract_page_data(client: AsyncTessaClient, url: str) -> Dict:
    """Example processor: return the text extracted from ``url``.

    Yields the job output, or ``None`` when the job is unsuccessful.
    """
    directive = f"Extract all text content from {url}"
    job = await client.run_browser_agent(directive, initial_url=url)
    outcome = await job.wait_for_completion(timeout=60)
    if not outcome.is_successful:
        return None
    return outcome.output
async def main():
    """Extract fifty example pages in a batch and report the success count."""

    def progress(completed, total):
        # Called by the processor after every finished item.
        print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")

    urls = [f"https://example.com/page{i}" for i in range(50)]
    processor = BatchProcessor(
        api_key="YOUR_API_KEY",
        max_concurrent=3,
        rate_limit=20,  # 20 requests per minute
    )
    results = await processor.process_batch(
        items=urls,
        processor_func=extract_page_data,
        progress_callback=progress,
    )
    successful = len([outcome for outcome in results if outcome["success"]])
    print(f"\nCompleted: {successful}/{len(results)} successful")
# asyncio.run(main())
See Also
- Quickstart Guide - Get started quickly
- Error Handling - Handle errors gracefully
- AsyncTessaClient - Async operations
- API Reference - REST API documentation