Skip to content

Python API Examples

Complete Python code examples for integrating with the OpenProspect API, from basic requests to advanced workflows.


🚀 Installation & Setup

Install Dependencies

pip install requests python-dotenv pandas aiohttp

Basic Configuration

import os
import requests
from dotenv import load_dotenv

# Pull settings from a local .env file (if present) into the environment
load_dotenv()

# Endpoint and credentials; both can be overridden through environment variables
API_BASE_URL = os.environ.get("OPENPROSPECT_API_URL", "http://localhost:8000/api/v1")
# NOTE: this is None when the variable is unset, in which case the header
# below is sent as the literal text "Bearer None".
AUTH_TOKEN = os.environ.get("OPENPROSPECT_API_KEY")  # Set via: export OPENPROSPECT_API_KEY=your_token

# Shared session: every request made through it carries these headers
session = requests.Session()
session.headers["Authorization"] = f"Bearer {AUTH_TOKEN}"
session.headers["Content-Type"] = "application/json"
session.headers["User-Agent"] = "OpenProspect-Python-Client/1.0"

📚 Basic API Operations

1. Prospect Search Management

# Create a new prospect search
def create_prospect_search(name, criteria):
    """Register a new prospect search built from ICP criteria.

    ``criteria`` is sent unchanged as ``target_criteria``. Its optional
    ``industry``, ``description``, ``roles`` and ``pain_points`` entries are
    additionally used to fill in the search description and the
    ``ideal_customer_profile`` section.

    Returns the decoded JSON response; HTTP error statuses are raised by
    ``raise_for_status``.
    """
    industry_label = criteria.get('industry', 'companies')
    customer_profile = {
        "company_description": criteria.get("description", ""),
        "target_roles": criteria.get("roles", ["Decision Maker"]),
        "pain_points": criteria.get("pain_points", []),
    }
    payload = {
        "name": name,
        "description": f"Search for {industry_label}",
        "target_criteria": criteria,
        "ideal_customer_profile": customer_profile,
    }

    resp = session.post(f"{API_BASE_URL}/prospect-searches", json=payload)
    resp.raise_for_status()
    return resp.json()

# Example usage: the "industry", "description", "roles" and "pain_points"
# keys below are the ones create_prospect_search() reads individually; the
# whole dict is also sent as the search's target criteria.
search = create_prospect_search(
    "B2B SaaS Prospects Q1 2024",
    {
        "industry": "Software",
        "employee_count": {"min": 50, "max": 500},
        "location": ["United States", "Canada"],
        "technologies": ["Salesforce", "HubSpot"],
        "description": "B2B SaaS companies with growing sales teams",
        "roles": ["VP Sales", "Head of Sales", "Sales Director"],
        "pain_points": ["Manual prospect research", "Low email deliverability"]
    }
)

# The response is indexed by 'id' here and by the later examples in this file
print(f"Created search: {search['id']}")

2. Start Discovery Process

import time

def start_crawling(prospect_search_id, max_companies=100):
    """Kick off a company-discovery crawl for the given search.

    Posts the search id and the company cap to ``/crawl/start`` with five
    parallel tasks and screenshots enabled, and returns the decoded JSON
    reply. HTTP error statuses are raised by ``raise_for_status``.
    """
    endpoint = f"{API_BASE_URL}/crawl/start"
    payload = dict(
        prospect_search_id=prospect_search_id,
        max_companies=max_companies,
        parallel_tasks=5,
        enable_screenshots=True,
    )

    resp = session.post(endpoint, json=payload)
    resp.raise_for_status()
    return resp.json()

def monitor_task_progress(task_id, check_interval=30, timeout=None):
    """Poll a crawl task until it reports COMPLETED or FAILED.

    Args:
        task_id: Identifier of the task to poll.
        check_interval: Seconds to sleep between two polls.
        timeout: Optional limit, in seconds, on the total time spent waiting.
            ``None`` (the default) waits indefinitely, which is the original
            behaviour. The limit is checked after each poll, so the actual
            wait can exceed it by up to one ``check_interval``.

    Returns:
        The last status payload, whose ``status`` is COMPLETED or FAILED.

    Raises:
        TimeoutError: If ``timeout`` elapsed before a terminal status was seen.
        requests.HTTPError: If a status request returns an error status.
    """
    terminal_states = ('COMPLETED', 'FAILED')
    # A monotonic clock is immune to wall-clock adjustments during the wait
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        response = session.get(f"{API_BASE_URL}/crawl/status/{task_id}")
        response.raise_for_status()
        status = response.json()

        print(f"Status: {status['status']} - Progress: {status['progress']}%")

        if status['status'] in terminal_states:
            return status

        # Without this check a task that never finishes would block forever
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Task {task_id} did not finish within {timeout} seconds "
                f"(last status: {status['status']})"
            )

        time.sleep(check_interval)

# Start crawling, capped at 50 companies instead of the default 100
crawl_task = start_crawling(search['id'], max_companies=50)
print(f"Crawling started: Task {crawl_task['task_id']}")

# Monitor progress: this call blocks until the task reports COMPLETED or FAILED
result = monitor_task_progress(crawl_task['task_id'])
# NOTE(review): a FAILED task is also announced as "completed" here, and it is
# not visible from this file whether a failed status carries
# 'companies_discovered' - confirm before relying on this line.
print(f"Crawling completed: {result['companies_discovered']} companies found")

3. Retrieve and Filter Prospects

import pandas as pd

def get_prospects(prospect_search_id, filters=None):
    """Fetch the prospects of a search, optionally narrowed by filters.

    ``filters`` entries are merged into the query string after
    ``prospect_search_id``, so a filter using that same key replaces it.
    Returns the decoded JSON response.
    """
    query = dict(prospect_search_id=prospect_search_id)
    # An absent or empty filter set contributes nothing to the query
    query.update(filters or {})

    resp = session.get(f"{API_BASE_URL}/prospects", params=query)
    resp.raise_for_status()
    return resp.json()

def prospects_to_dataframe(prospects):
    """Flatten prospect records into a pandas DataFrame for analysis.

    Args:
        prospects: Iterable of prospect dicts, each with a ``company`` dict
            (``name``, ``website``, ``employee_count``), a ``contact`` dict
            (``name``, ``title``, ``email``, ``email_status``) and a
            top-level ``qualification_score``.

    Returns:
        A DataFrame with one row per prospect. The columns are always
        present, even when ``prospects`` is empty, so column lookups such as
        ``df['qualification_score']`` do not raise on an empty result set.

    Raises:
        KeyError: If a prospect is missing one of the fields listed above.
    """
    columns = [
        "company_name",
        "website",
        "employee_count",
        "contact_name",
        "contact_title",
        "contact_email",
        "qualification_score",
        "email_status",
    ]

    rows = [
        {
            "company_name": p["company"]["name"],
            "website": p["company"]["website"],
            "employee_count": p["company"]["employee_count"],
            "contact_name": p["contact"]["name"],
            "contact_title": p["contact"]["title"],
            "contact_email": p["contact"]["email"],
            "qualification_score": p["qualification_score"],
            "email_status": p["contact"]["email_status"],
        }
        for p in prospects
    ]

    # Explicit columns keep the frame's schema stable for an empty input
    return pd.DataFrame(rows, columns=columns)

# Get high-quality prospects: these filters are merged into the request's
# query parameters by get_prospects()
prospects = get_prospects(
    search['id'],
    filters={
        "qualification_score_min": 8.0,
        "email_status": "deliverable",
        "sort_by": "qualification_score",
        "sort_order": "desc",
        "limit": 100
    }
)

# Convert to DataFrame for analysis; the prospect list is read from the
# 'prospects' key of the response
df = prospects_to_dataframe(prospects['prospects'])
print(f"\nTop Prospects (Score >= 8.0):")
print(df.head(10))

# Analyze the results
print(f"\nProspect Analysis:")
print(f"Total prospects: {len(df)}")
print(f"Average qualification score: {df['qualification_score'].mean():.2f}")
# The mean of a boolean column is the fraction of rows that are True
print(f"Email deliverability rate: {(df['email_status'] == 'deliverable').mean():.1%}")

🧠 Advanced Operations

1. AI-Powered Analysis

def trigger_ai_analysis(prospect_search_id, analysis_type="comprehensive"):
    """Start a bulk AI analysis run for a prospect search.

    The competitive-intel and buying-signal options are always enabled and
    the batch size is fixed at 25. Returns the decoded JSON reply; HTTP error
    statuses are raised by ``raise_for_status``.
    """
    payload = dict(
        prospect_search_id=prospect_search_id,
        analysis_type=analysis_type,
        enable_competitive_intel=True,
        enable_buying_signals=True,
        batch_size=25,
    )

    resp = session.post(f"{API_BASE_URL}/analysis/trigger-bulk", json=payload)
    resp.raise_for_status()
    return resp.json()

def get_ai_insights(prospect_id):
    """Fetch the AI insights recorded for one prospect as decoded JSON."""
    insights_url = "{}/prospects/{}/insights".format(API_BASE_URL, prospect_id)

    resp = session.get(insights_url)
    resp.raise_for_status()
    return resp.json()

# Trigger analysis
analysis_task = trigger_ai_analysis(search['id'])
print(f"AI Analysis started: {analysis_task['task_id']}")

# Wait for completion then get insights.
# This is a fixed 60-second pause: nothing here checks that the analysis task
# has actually finished before the insights are requested.
time.sleep(60)  # Or use monitor_task_progress()

# Get insights for top prospect (assumes at least one prospect was returned;
# indexing [0] fails on an empty list)
top_prospect = prospects['prospects'][0]
insights = get_ai_insights(top_prospect['id'])

# 'pain_points' and 'buying_signals' are joined with ", ", so they are
# expected to be iterables of strings
print(f"\nAI Insights for {top_prospect['company']['name']}:")
print(f"- Pain Points: {', '.join(insights['pain_points'])}")
print(f"- Buying Signals: {', '.join(insights['buying_signals'])}")
print(f"- Recommended Approach: {insights['recommended_approach']}")
print(f"- Estimated Deal Size: {insights['estimated_deal_size']}")

2. Email Generation

def generate_personalized_email(prospect_id, template="default"):
    """Request one personalized email for a single prospect.

    Tone, personalization level and the pain-point / social-proof switches
    are fixed by this helper; only the prospect and the template vary.
    Returns the decoded JSON reply.
    """
    endpoint = f"{API_BASE_URL}/email-generation/generate"
    payload = dict(
        prospect_id=prospect_id,
        template=template,
        tone="professional-friendly",
        personalization_level="high",
        include_pain_points=True,
        include_social_proof=True,
    )

    resp = session.post(endpoint, json=payload)
    resp.raise_for_status()
    return resp.json()

def generate_bulk_emails(prospect_ids, campaign_name):
    """Request emails for several prospects under one campaign name.

    Uses the ``saas_value_prop`` template with three A/B variations and a
    quality threshold of 8.0. Returns the decoded JSON reply.
    """
    endpoint = f"{API_BASE_URL}/email-generation/bulk"
    payload = dict(
        prospect_ids=prospect_ids,
        campaign_name=campaign_name,
        template="saas_value_prop",
        ab_test_variations=3,
        quality_threshold=8.0,
    )

    resp = session.post(endpoint, json=payload)
    resp.raise_for_status()
    return resp.json()

# Generate email for top prospect (uses the default template)
email = generate_personalized_email(top_prospect['id'])
print(f"\nGenerated Email:")
print(f"Subject: {email['subject_line']}")
# Only the first 200 characters of the body are shown as a preview
print(f"Preview: {email['body'][:200]}...")
print(f"Quality Score: {email['quality_score']}/10")

# Bulk generation for campaign: takes at most the first 20 listed prospects
prospect_ids = [p['id'] for p in prospects['prospects'][:20]]
campaign = generate_bulk_emails(prospect_ids, "Q1 Hot Prospects Campaign")
print(f"\nGenerated {campaign['emails_generated']} emails for campaign")

3. Webhook Configuration

def create_webhook(name, url, prospect_search_id, filters=None):
    """Register a webhook that receives data for a prospect search.

    The webhook is created enabled, with ``realtime`` frequency and a retry
    configuration of 3 retries and a delay value of 60. A missing or empty
    ``filters`` argument is sent as an empty object. Returns the decoded JSON
    reply.
    """
    retry_policy = {"max_retries": 3, "retry_delay": 60}
    payload = {
        "name": name,
        "webhook_url": url,
        "prospect_search_id": prospect_search_id,
        "frequency": "realtime",
        "enabled": True,
        "filters": filters if filters else {},
        "retry_config": retry_policy,
    }

    resp = session.post(f"{API_BASE_URL}/delivery/webhooks", json=payload)
    resp.raise_for_status()
    return resp.json()

def test_webhook(webhook_id):
    """Ask the API to run its test action on a webhook and return the JSON reply."""
    test_url = "{}/delivery/webhooks/{}/test".format(API_BASE_URL, webhook_id)

    resp = session.post(test_url)
    resp.raise_for_status()
    return resp.json()

# Create webhook for hot prospects; the filters dict is forwarded as the
# webhook's "filters" field
webhook = create_webhook(
    "Hot Prospects to CRM",
    "https://your-crm.com/webhooks/prospects",
    search['id'],
    filters={
        "qualification_score_min": 8.5,
        "email_status": "deliverable"
    }
)

print(f"Webhook created: {webhook['id']}")

# Test the webhook; the reply is read through its 'status' and 'message' keys
test_result = test_webhook(webhook['id'])
print(f"Webhook test: {test_result['status']} - {test_result['message']}")

🔄 Async Operations

Async Client for High Performance

import asyncio
import aiohttp

class AsyncOpenProspectClient:
    """Minimal asynchronous OpenProspect client built on ``aiohttp``.

    Intended to be used as an async context manager: the HTTP session is
    created on entry and closed on exit.
    """

    def __init__(self, api_base, token):
        """Store the base URL and build the default headers (no network I/O).

        Args:
            api_base: Base URL that endpoint paths are appended to.
            token: Bearer token sent in the ``Authorization`` header.
        """
        self.api_base = api_base
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        # Created in __aenter__; None until the context manager is entered
        self.session = None

    async def __aenter__(self):
        """Open the underlying ``aiohttp`` session and return the client."""
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying session; exceptions are not suppressed."""
        await self.session.close()

    async def get_prospect(self, prospect_id):
        """Fetch one prospect and return its decoded JSON body.

        Raises:
            aiohttp.ClientResponseError: If the API answers with a 4xx/5xx
                status.
        """
        async with self.session.get(f"{self.api_base}/prospects/{prospect_id}") as resp:
            # Fail on error statuses instead of handing an error payload back
            # to the caller as if it were a prospect (matches the synchronous
            # helpers, which all call raise_for_status()).
            resp.raise_for_status()
            return await resp.json()

    async def get_multiple_prospects(self, prospect_ids):
        """Fetch multiple prospects concurrently.

        Returns the results in the same order as ``prospect_ids``. The first
        failing request's exception propagates to the caller.
        """
        tasks = [self.get_prospect(pid) for pid in prospect_ids]
        return await asyncio.gather(*tasks)

# Usage example
async def fetch_prospects_async():
    """Fetch up to the first 50 listed prospects concurrently and return them."""
    async with AsyncOpenProspectClient(API_BASE_URL, AUTH_TOKEN) as client:
        # Ids of at most the first 50 prospects from the earlier listing
        first_batch = prospects['prospects'][:50]
        wanted_ids = [entry['id'] for entry in first_batch]

        fetched = await client.get_multiple_prospects(wanted_ids)
        print(f"Fetched {len(fetched)} prospects asynchronously")
        return fetched

# Drive the coroutine to completion on a fresh event loop
prospects_detailed = asyncio.run(fetch_prospects_async())

📊 Data Export & Reporting

Export to Different Formats

def export_prospects(prospect_search_id, format="csv", filters=None):
    """Request an export of a search's prospects in the given format.

    A fixed set of fields is requested; a missing or empty ``filters``
    argument is sent as an empty object. Returns the decoded JSON reply
    describing the export request.
    """
    exported_fields = [
        "company_name", "website", "employee_count",
        "contact_name", "contact_title", "contact_email",
        "qualification_score", "ai_insights",
    ]
    payload = {
        "prospect_search_id": prospect_search_id,
        "format": format,
        "filters": filters if filters else {},
        "include_fields": exported_fields,
    }

    resp = session.post(f"{API_BASE_URL}/export/request", json=payload)
    resp.raise_for_status()
    return resp.json()

def download_export(export_id, output_file):
    """Download an export and write it to ``output_file``.

    Args:
        export_id: Identifier of the export to download.
        output_file: Path of the file to create or overwrite (binary mode).

    Raises:
        requests.HTTPError: If the download request returns an error status.
        OSError: If the output file cannot be written.
    """
    download_url = f"{API_BASE_URL}/export/download/{export_id}"

    # With stream=True the connection is only returned to the pool once the
    # body is fully consumed or the response is closed. The `with` block
    # guarantees the close even when raise_for_status() or the file write
    # fails part-way through.
    with session.get(download_url, stream=True) as response:
        response.raise_for_status()

        with open(output_file, 'wb') as f:
            # Write in 8 KiB chunks so the whole export is never held in memory
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    print(f"Export saved to: {output_file}")

# Request export (xlsx instead of the default csv), limited by score filter
export_job = export_prospects(
    search['id'],
    format="xlsx",
    filters={"qualification_score_min": 8.0}
)

print(f"Export job created: {export_job['id']}")

# Wait for completion then download.
# This is a fixed 30-second pause: nothing here checks that the export job has
# actually finished before the download is attempted.
time.sleep(30)
download_export(export_job['id'], "hot_prospects.xlsx")

Generate Analytics Report

def generate_analytics_report(prospect_search_id):
    """Build a plain-text analytics report for a prospect search.

    Fetches the search's analytics payload and renders its discovery,
    quality and engagement figures, followed by the top-industries and
    top-technologies tables (formatted through pandas). Returns the report
    as a single string.
    """
    analytics_url = f"{API_BASE_URL}/prospect-searches/{prospect_search_id}/analytics"
    resp = session.get(analytics_url)
    resp.raise_for_status()
    stats = resp.json()

    # The template's whitespace is part of the output, so it is kept verbatim
    return f"""
    Prospect Search Analytics Report
    ================================
    Search ID: {prospect_search_id}
    Generated: {pd.Timestamp.now()}

    Discovery Metrics:
    - Companies Analyzed: {stats['companies_analyzed']}
    - Prospects Found: {stats['prospects_discovered']}
    - Qualification Rate: {stats['qualification_rate']:.1%}

    Quality Metrics:
    - Average Score: {stats['avg_qualification_score']:.1f}/10
    - High Quality (8+): {stats['high_quality_count']} ({stats['high_quality_percentage']:.1%})
    - Email Deliverability: {stats['email_deliverability_rate']:.1%}

    Engagement Potential:
    - With Buying Signals: {stats['buying_signals_count']}
    - Ready to Buy: {stats['ready_now_count']}
    - Estimated Pipeline: ${stats['estimated_pipeline_value']:,.0f}

    Top Industries:
    {pd.DataFrame(stats['top_industries']).to_string()}

    Top Technologies:
    {pd.DataFrame(stats['top_technologies']).to_string()}
    """

# Generate report
report = generate_analytics_report(search['id'])
print(report)

# Save to file. The report embeds text supplied by the API (for example the
# industry and technology tables), so the encoding is pinned to UTF-8 rather
# than left to the platform default, which may be unable to encode it.
with open(f"prospect_report_{search['id']}.txt", "w", encoding="utf-8") as f:
    f.write(report)

🛠️ Complete Integration Example

End-to-End Workflow Class

class OpenProspectWorkflow:
    """Complete OpenProspect workflow automation.

    Wraps a ``requests`` session and exposes one method per pipeline step
    (create search, discover, qualify, generate emails, set up delivery),
    plus ``run_complete_workflow`` which chains them in order.
    """

    def __init__(self, api_base, token):
        """Create the HTTP session used by every step.

        Args:
            api_base: Base URL that endpoint paths are appended to.
            token: Bearer token sent in the ``Authorization`` header.
        """
        self.api_base = api_base
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        })

    def create_search(self, name, criteria):
        """Step 1: Create prospect search and return the API's JSON reply."""
        response = self.session.post(
            f"{self.api_base}/prospect-searches",
            json={
                "name": name,
                "target_criteria": criteria
            }
        )
        response.raise_for_status()
        return response.json()

    def discover_prospects(self, search_id, max_companies=100):
        """Step 2: Discover companies.

        Starts a crawl and blocks until its task is COMPLETED or FAILED.
        Returns the final task status payload (check its ``status``).
        """
        # Start crawling
        response = self.session.post(
            f"{self.api_base}/crawl/start",
            json={
                "prospect_search_id": search_id,
                "max_companies": max_companies
            }
        )
        response.raise_for_status()
        task = response.json()

        # Wait for completion
        return self._wait_for_task(task['task_id'])

    def qualify_prospects(self, search_id):
        """Step 3: AI qualification.

        Triggers a comprehensive bulk analysis and blocks until its task is
        COMPLETED or FAILED. Returns the final task status payload.
        """
        response = self.session.post(
            f"{self.api_base}/analysis/trigger-bulk",
            json={
                "prospect_search_id": search_id,
                "analysis_type": "comprehensive"
            }
        )
        response.raise_for_status()
        task = response.json()

        return self._wait_for_task(task['task_id'])

    def generate_emails(self, search_id, min_score=8.0):
        """Step 4: Generate personalized emails.

        Fetches up to 50 prospects scoring at least ``min_score`` and submits
        their ids for bulk email generation. Returns the bulk endpoint's JSON
        reply.
        """
        # Get qualified prospects
        response = self.session.get(
            f"{self.api_base}/prospects",
            params={
                "prospect_search_id": search_id,
                "qualification_score_min": min_score,
                "limit": 50
            }
        )
        response.raise_for_status()
        prospects = response.json()['prospects']

        # Generate emails
        prospect_ids = [p['id'] for p in prospects]
        response = self.session.post(
            f"{self.api_base}/email-generation/bulk",
            json={
                "prospect_ids": prospect_ids,
                "campaign_name": f"Campaign for {search_id}"
            }
        )
        response.raise_for_status()

        return response.json()

    def setup_delivery(self, search_id, webhook_url):
        """Step 5: Configure delivery via an hourly webhook; returns the JSON reply."""
        response = self.session.post(
            f"{self.api_base}/delivery/webhooks",
            json={
                "name": f"Delivery for {search_id}",
                "webhook_url": webhook_url,
                "prospect_search_id": search_id,
                "frequency": "hourly"
            }
        )
        response.raise_for_status()
        return response.json()

    def _wait_for_task(self, task_id, check_interval=30):
        """Wait for async task completion.

        Polls every ``check_interval`` seconds and returns the status payload
        once its ``status`` is COMPLETED or FAILED. A FAILED task is returned,
        not raised; callers must inspect the status themselves.
        """
        # NOTE(review): this polls /tasks/{id}, whereas the standalone
        # monitor_task_progress() example polls /crawl/status/{id} - confirm
        # which endpoint is correct for crawl and analysis tasks.
        while True:
            response = self.session.get(f"{self.api_base}/tasks/{task_id}")
            response.raise_for_status()
            status = response.json()

            if status['status'] in ['COMPLETED', 'FAILED']:
                return status

            time.sleep(check_interval)

    @staticmethod
    def _require_completed(step_name, task_status):
        """Raise RuntimeError unless a finished task ended as COMPLETED."""
        if task_status['status'] != 'COMPLETED':
            raise RuntimeError(
                f"{step_name} task ended with status "
                f"{task_status['status']}: {task_status}"
            )

    def run_complete_workflow(self, name, criteria, webhook_url):
        """Run complete prospect discovery workflow.

        Executes steps 1-5 in order, printing progress along the way.

        Args:
            name: Name of the prospect search to create.
            criteria: Target criteria sent with the search.
            webhook_url: URL the delivery webhook should call.

        Returns:
            A summary dict with ``search_id``, ``companies_found``,
            ``prospects_qualified``, ``emails_generated`` and ``webhook_id``.

        Raises:
            RuntimeError: If the discovery or qualification task ends FAILED.
                The workflow stops there instead of reporting success and
                running later steps on incomplete data.
            requests.HTTPError: If any API call returns an error status.
        """
        print("🚀 Starting OpenProspect workflow...")

        # Create search
        print("1️⃣ Creating prospect search...")
        search = self.create_search(name, criteria)
        print(f"   ✅ Search created: {search['id']}")

        # Discover prospects
        print("2️⃣ Discovering companies...")
        discovery = self.discover_prospects(search['id'])
        self._require_completed("Discovery", discovery)
        print(f"   ✅ Found {discovery['companies_discovered']} companies")

        # Qualify with AI
        print("3️⃣ Running AI qualification...")
        qualification = self.qualify_prospects(search['id'])
        self._require_completed("Qualification", qualification)
        print(f"   ✅ Qualified {qualification['prospects_qualified']} prospects")

        # Generate emails
        print("4️⃣ Generating personalized emails...")
        emails = self.generate_emails(search['id'])
        print(f"   ✅ Generated {emails['emails_generated']} emails")

        # Setup delivery
        print("5️⃣ Configuring webhook delivery...")
        webhook = self.setup_delivery(search['id'], webhook_url)
        print(f"   ✅ Webhook configured: {webhook['id']}")

        print("\n✨ Workflow complete!")
        return {
            "search_id": search['id'],
            "companies_found": discovery['companies_discovered'],
            "prospects_qualified": qualification['prospects_qualified'],
            "emails_generated": emails['emails_generated'],
            "webhook_id": webhook['id']
        }

# Use the workflow: reuses the base URL and token from the configuration
# section at the top of this page
workflow = OpenProspectWorkflow(API_BASE_URL, AUTH_TOKEN)

# Runs all five steps in order; this call blocks while the discovery and
# qualification tasks are polled
result = workflow.run_complete_workflow(
    name="Q1 2024 SaaS Prospects",
    criteria={
        "industry": "Software",
        "employee_count": {"min": 50, "max": 500},
        "location": ["United States"],
        "technologies": ["Salesforce", "HubSpot"]
    },
    webhook_url="https://your-crm.com/webhooks/prospects"
)

# result is the summary dict returned by run_complete_workflow()
print(f"\nWorkflow Results: {result}")

🚀 Next Steps


💡 Best Practices

  1. Use Session Objects: Reuse connections for better performance
  2. Handle Rate Limits: Implement exponential backoff
  3. Validate Data: Check responses before processing
  4. Log Everything: Track API calls for debugging
  5. Use Async for Bulk: Process multiple requests concurrently

Need Help? Check our Support Documentation or contact support@openprospect.io