Python API Examples
Complete Python code examples for integrating with the OpenProspect API, from basic requests to advanced workflows.
🚀 Installation & Setup
Install Dependencies
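The examples on this page use `requests`, `python-dotenv`, `pandas`, and `aiohttp`. Install them into your environment first, e.g. `pip install requests python-dotenv pandas aiohttp`.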
Basic Configuration
import os
import requests
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
API_BASE_URL = os.getenv("OPENPROSPECT_API_URL", "http://localhost:8000/api/v1")
AUTH_TOKEN = os.getenv("OPENPROSPECT_API_KEY") # Set via: export OPENPROSPECT_API_KEY=your_token
# Create session with default headers
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {AUTH_TOKEN}",
"Content-Type": "application/json",
"User-Agent": "OpenProspect-Python-Client/1.0"
})
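If `OPENPROSPECT_API_KEY` is unset, `os.getenv` returns `None` and every request is sent with an invalid `Authorization` header. A small sanity check right after configuration fails fast instead:

```python
# Fail fast if the API key is missing rather than sending "Bearer None"
if not AUTH_TOKEN:
    raise RuntimeError(
        "OPENPROSPECT_API_KEY is not set. "
        "Export it first, e.g.: export OPENPROSPECT_API_KEY=your_token"
    )
```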
📚 Basic API Operations
1. Prospect Search Management
# Create a new prospect search
def create_prospect_search(name, criteria):
"""Create a new prospect search with ICP criteria"""
data = {
"name": name,
"description": f"Search for {criteria.get('industry', 'companies')}",
"target_criteria": criteria,
"ideal_customer_profile": {
"company_description": criteria.get("description", ""),
"target_roles": criteria.get("roles", ["Decision Maker"]),
"pain_points": criteria.get("pain_points", [])
}
}
response = session.post(f"{API_BASE_URL}/prospect-searches", json=data)
response.raise_for_status()
return response.json()
# Example usage
search = create_prospect_search(
"B2B SaaS Prospects Q1 2024",
{
"industry": "Software",
"employee_count": {"min": 50, "max": 500},
"location": ["United States", "Canada"],
"technologies": ["Salesforce", "HubSpot"],
"description": "B2B SaaS companies with growing sales teams",
"roles": ["VP Sales", "Head of Sales", "Sales Director"],
"pain_points": ["Manual prospect research", "Low email deliverability"]
}
)
print(f"Created search: {search['id']}")
2. Start Discovery Process
import time
def start_crawling(prospect_search_id, max_companies=100):
"""Start crawling to discover companies"""
data = {
"prospect_search_id": prospect_search_id,
"max_companies": max_companies,
"parallel_tasks": 5,
"enable_screenshots": True
}
response = session.post(f"{API_BASE_URL}/crawl/start", json=data)
response.raise_for_status()
return response.json()
def monitor_task_progress(task_id, check_interval=30):
"""Monitor task progress until completion"""
while True:
response = session.get(f"{API_BASE_URL}/crawl/status/{task_id}")
response.raise_for_status()
status = response.json()
print(f"Status: {status['status']} - Progress: {status['progress']}%")
if status['status'] in ['COMPLETED', 'FAILED']:
return status
time.sleep(check_interval)
# Start crawling
crawl_task = start_crawling(search['id'], max_companies=50)
print(f"Crawling started: Task {crawl_task['task_id']}")
# Monitor progress
result = monitor_task_progress(crawl_task['task_id'])
print(f"Crawling completed: {result['companies_discovered']} companies found")
3. Retrieve and Filter Prospects
import pandas as pd
def get_prospects(prospect_search_id, filters=None):
"""Get prospects with optional filtering"""
params = {"prospect_search_id": prospect_search_id}
if filters:
params.update(filters)
response = session.get(f"{API_BASE_URL}/prospects", params=params)
response.raise_for_status()
return response.json()
def prospects_to_dataframe(prospects):
"""Convert prospects to pandas DataFrame for analysis"""
data = []
for p in prospects:
data.append({
"company_name": p["company"]["name"],
"website": p["company"]["website"],
"employee_count": p["company"]["employee_count"],
"contact_name": p["contact"]["name"],
"contact_title": p["contact"]["title"],
"contact_email": p["contact"]["email"],
"qualification_score": p["qualification_score"],
"email_status": p["contact"]["email_status"]
})
return pd.DataFrame(data)
# Get high-quality prospects
prospects = get_prospects(
search['id'],
filters={
"qualification_score_min": 8.0,
"email_status": "deliverable",
"sort_by": "qualification_score",
"sort_order": "desc",
"limit": 100
}
)
# Convert to DataFrame for analysis
df = prospects_to_dataframe(prospects['prospects'])
print(f"\nTop Prospects (Score >= 8.0):")
print(df.head(10))
# Analyze the results
print(f"\nProspect Analysis:")
print(f"Total prospects: {len(df)}")
print(f"Average qualification score: {df['qualification_score'].mean():.2f}")
print(f"Email deliverability rate: {(df['email_status'] == 'deliverable').mean():.1%}")
🧠 Advanced Operations
1. AI-Powered Analysis
def trigger_ai_analysis(prospect_search_id, analysis_type="comprehensive"):
"""Trigger AI analysis for prospect qualification"""
data = {
"prospect_search_id": prospect_search_id,
"analysis_type": analysis_type,
"enable_competitive_intel": True,
"enable_buying_signals": True,
"batch_size": 25
}
response = session.post(f"{API_BASE_URL}/analysis/trigger-bulk", json=data)
response.raise_for_status()
return response.json()
def get_ai_insights(prospect_id):
"""Get detailed AI insights for a specific prospect"""
response = session.get(f"{API_BASE_URL}/prospects/{prospect_id}/insights")
response.raise_for_status()
return response.json()
# Trigger analysis
analysis_task = trigger_ai_analysis(search['id'])
print(f"AI Analysis started: {analysis_task['task_id']}")
# Wait for completion then get insights
time.sleep(60) # Or use monitor_task_progress()
# Get insights for top prospect
top_prospect = prospects['prospects'][0]
insights = get_ai_insights(top_prospect['id'])
print(f"\nAI Insights for {top_prospect['company']['name']}:")
print(f"- Pain Points: {', '.join(insights['pain_points'])}")
print(f"- Buying Signals: {', '.join(insights['buying_signals'])}")
print(f"- Recommended Approach: {insights['recommended_approach']}")
print(f"- Estimated Deal Size: {insights['estimated_deal_size']}")
2. Email Generation
def generate_personalized_email(prospect_id, template="default"):
"""Generate personalized email for a prospect"""
data = {
"prospect_id": prospect_id,
"template": template,
"tone": "professional-friendly",
"personalization_level": "high",
"include_pain_points": True,
"include_social_proof": True
}
response = session.post(f"{API_BASE_URL}/email-generation/generate", json=data)
response.raise_for_status()
return response.json()
def generate_bulk_emails(prospect_ids, campaign_name):
"""Generate emails for multiple prospects"""
data = {
"prospect_ids": prospect_ids,
"campaign_name": campaign_name,
"template": "saas_value_prop",
"ab_test_variations": 3,
"quality_threshold": 8.0
}
response = session.post(f"{API_BASE_URL}/email-generation/bulk", json=data)
response.raise_for_status()
return response.json()
# Generate email for top prospect
email = generate_personalized_email(top_prospect['id'])
print(f"\nGenerated Email:")
print(f"Subject: {email['subject_line']}")
print(f"Preview: {email['body'][:200]}...")
print(f"Quality Score: {email['quality_score']}/10")
# Bulk generation for campaign
prospect_ids = [p['id'] for p in prospects['prospects'][:20]]
campaign = generate_bulk_emails(prospect_ids, "Q1 Hot Prospects Campaign")
print(f"\nGenerated {campaign['emails_generated']} emails for campaign")
3. Webhook Configuration
def create_webhook(name, url, prospect_search_id, filters=None):
"""Create webhook for real-time data delivery"""
data = {
"name": name,
"webhook_url": url,
"prospect_search_id": prospect_search_id,
"frequency": "realtime",
"enabled": True,
"filters": filters or {},
"retry_config": {
"max_retries": 3,
"retry_delay": 60
}
}
response = session.post(f"{API_BASE_URL}/delivery/webhooks", json=data)
response.raise_for_status()
return response.json()
def test_webhook(webhook_id):
"""Test webhook configuration"""
response = session.post(f"{API_BASE_URL}/delivery/webhooks/{webhook_id}/test")
response.raise_for_status()
return response.json()
# Create webhook for hot prospects
webhook = create_webhook(
"Hot Prospects to CRM",
"https://your-crm.com/webhooks/prospects",
search['id'],
filters={
"qualification_score_min": 8.5,
"email_status": "deliverable"
}
)
print(f"Webhook created: {webhook['id']}")
# Test the webhook
test_result = test_webhook(webhook['id'])
print(f"Webhook test: {test_result['status']} - {test_result['message']}")
🔄 Async Operations
Async Client for High Performance
import asyncio
import aiohttp
class AsyncOpenProspectClient:
def __init__(self, api_base, token):
self.api_base = api_base
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(headers=self.headers)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def get_prospect(self, prospect_id):
async with self.session.get(f"{self.api_base}/prospects/{prospect_id}") as resp:
return await resp.json()
async def get_multiple_prospects(self, prospect_ids):
"""Fetch multiple prospects concurrently"""
tasks = [self.get_prospect(pid) for pid in prospect_ids]
return await asyncio.gather(*tasks)
# Usage example
async def fetch_prospects_async():
async with AsyncOpenProspectClient(API_BASE_URL, AUTH_TOKEN) as client:
# Fetch 50 prospects concurrently
prospect_ids = [p['id'] for p in prospects['prospects'][:50]]
results = await client.get_multiple_prospects(prospect_ids)
print(f"Fetched {len(results)} prospects asynchronously")
return results
# Run async operation
prospects_detailed = asyncio.run(fetch_prospects_async())
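Firing 50 requests at once can trip rate limits. A common refinement, sketched below as a variant of `get_multiple_prospects`, is to cap the number of in-flight requests with an `asyncio.Semaphore`; the concurrency limit shown is an illustrative value, not an API requirement.

```python
# Bounded-concurrency variant of get_multiple_prospects.
# max_concurrency is illustrative -- tune it to your rate limits.
async def get_multiple_prospects_bounded(client, prospect_ids, max_concurrency=10):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch_one(pid):
        async with semaphore:  # at most max_concurrency requests in flight
            return await client.get_prospect(pid)

    return await asyncio.gather(*(fetch_one(pid) for pid in prospect_ids))
```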
📊 Data Export & Reporting
Export to Different Formats
def export_prospects(prospect_search_id, format="csv", filters=None):
"""Export prospects in various formats"""
data = {
"prospect_search_id": prospect_search_id,
"format": format,
"filters": filters or {},
"include_fields": [
"company_name", "website", "employee_count",
"contact_name", "contact_title", "contact_email",
"qualification_score", "ai_insights"
]
}
response = session.post(f"{API_BASE_URL}/export/request", json=data)
response.raise_for_status()
return response.json()
def download_export(export_id, output_file):
"""Download completed export"""
response = session.get(
f"{API_BASE_URL}/export/download/{export_id}",
stream=True
)
response.raise_for_status()
with open(output_file, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Export saved to: {output_file}")
# Request export
export_job = export_prospects(
search['id'],
format="xlsx",
filters={"qualification_score_min": 8.0}
)
print(f"Export job created: {export_job['id']}")
# Wait for completion then download
time.sleep(30)
download_export(export_job['id'], "hot_prospects.xlsx")
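The fixed `time.sleep(30)` above is the simplest approach but breaks if the export takes longer. A more robust pattern, sketched here on the assumption that the download endpoint returns a non-2xx status while the export is still being generated, is to retry the download with a delay using the `download_export` helper defined above:

```python
# Retry the download until the export is ready (assumes the download
# endpoint responds with an HTTP error while the export is still running).
def download_export_when_ready(export_id, output_file, attempts=20, delay=15):
    for attempt in range(attempts):
        try:
            download_export(export_id, output_file)
            return
        except requests.HTTPError:
            print(f"Export not ready yet (attempt {attempt + 1}/{attempts}), retrying...")
            time.sleep(delay)
    raise TimeoutError(f"Export {export_id} was not ready after {attempts} attempts")
```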
Generate Analytics Report
def generate_analytics_report(prospect_search_id):
"""Generate comprehensive analytics report"""
# Get search analytics
response = session.get(f"{API_BASE_URL}/prospect-searches/{prospect_search_id}/analytics")
response.raise_for_status()
analytics = response.json()
# Create report
report = f"""
Prospect Search Analytics Report
================================
Search ID: {prospect_search_id}
Generated: {pd.Timestamp.now()}
Discovery Metrics:
- Companies Analyzed: {analytics['companies_analyzed']}
- Prospects Found: {analytics['prospects_discovered']}
- Qualification Rate: {analytics['qualification_rate']:.1%}
Quality Metrics:
- Average Score: {analytics['avg_qualification_score']:.1f}/10
- High Quality (8+): {analytics['high_quality_count']} ({analytics['high_quality_percentage']:.1%})
- Email Deliverability: {analytics['email_deliverability_rate']:.1%}
Engagement Potential:
- With Buying Signals: {analytics['buying_signals_count']}
- Ready to Buy: {analytics['ready_now_count']}
- Estimated Pipeline: ${analytics['estimated_pipeline_value']:,.0f}
Top Industries:
{pd.DataFrame(analytics['top_industries']).to_string()}
Top Technologies:
{pd.DataFrame(analytics['top_technologies']).to_string()}
"""
return report
# Generate report
report = generate_analytics_report(search['id'])
print(report)
# Save to file
with open(f"prospect_report_{search['id']}.txt", "w") as f:
f.write(report)
🛠️ Complete Integration Example
End-to-End Workflow Class
class OpenProspectWorkflow:
"""Complete OpenProspect workflow automation"""
def __init__(self, api_base, token):
self.api_base = api_base
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
})
def create_search(self, name, criteria):
"""Step 1: Create prospect search"""
response = self.session.post(
f"{self.api_base}/prospect-searches",
json={
"name": name,
"target_criteria": criteria
}
)
response.raise_for_status()
return response.json()
def discover_prospects(self, search_id, max_companies=100):
"""Step 2: Discover companies"""
# Start crawling
response = self.session.post(
f"{self.api_base}/crawl/start",
json={
"prospect_search_id": search_id,
"max_companies": max_companies
}
)
response.raise_for_status()
task = response.json()
# Wait for completion
return self._wait_for_task(task['task_id'])
def qualify_prospects(self, search_id):
"""Step 3: AI qualification"""
response = self.session.post(
f"{self.api_base}/analysis/trigger-bulk",
json={
"prospect_search_id": search_id,
"analysis_type": "comprehensive"
}
)
response.raise_for_status()
task = response.json()
return self._wait_for_task(task['task_id'])
def generate_emails(self, search_id, min_score=8.0):
"""Step 4: Generate personalized emails"""
# Get qualified prospects
response = self.session.get(
f"{self.api_base}/prospects",
params={
"prospect_search_id": search_id,
"qualification_score_min": min_score,
"limit": 50
}
)
response.raise_for_status()
prospects = response.json()['prospects']
# Generate emails
prospect_ids = [p['id'] for p in prospects]
response = self.session.post(
f"{self.api_base}/email-generation/bulk",
json={
"prospect_ids": prospect_ids,
"campaign_name": f"Campaign for {search_id}"
}
)
response.raise_for_status()
return response.json()
def setup_delivery(self, search_id, webhook_url):
"""Step 5: Configure delivery"""
response = self.session.post(
f"{self.api_base}/delivery/webhooks",
json={
"name": f"Delivery for {search_id}",
"webhook_url": webhook_url,
"prospect_search_id": search_id,
"frequency": "hourly"
}
)
response.raise_for_status()
return response.json()
def _wait_for_task(self, task_id, check_interval=30):
"""Wait for async task completion"""
while True:
response = self.session.get(f"{self.api_base}/tasks/{task_id}")
response.raise_for_status()
status = response.json()
if status['status'] in ['COMPLETED', 'FAILED']:
return status
time.sleep(check_interval)
def run_complete_workflow(self, name, criteria, webhook_url):
"""Run complete prospect discovery workflow"""
print("🚀 Starting OpenProspect workflow...")
# Create search
print("1️⃣ Creating prospect search...")
search = self.create_search(name, criteria)
print(f" ✅ Search created: {search['id']}")
# Discover prospects
print("2️⃣ Discovering companies...")
discovery = self.discover_prospects(search['id'])
print(f" ✅ Found {discovery['companies_discovered']} companies")
# Qualify with AI
print("3️⃣ Running AI qualification...")
qualification = self.qualify_prospects(search['id'])
print(f" ✅ Qualified {qualification['prospects_qualified']} prospects")
# Generate emails
print("4️⃣ Generating personalized emails...")
emails = self.generate_emails(search['id'])
print(f" ✅ Generated {emails['emails_generated']} emails")
# Setup delivery
print("5️⃣ Configuring webhook delivery...")
webhook = self.setup_delivery(search['id'], webhook_url)
print(f" ✅ Webhook configured: {webhook['id']}")
print("\n✨ Workflow complete!")
return {
"search_id": search['id'],
"companies_found": discovery['companies_discovered'],
"prospects_qualified": qualification['prospects_qualified'],
"emails_generated": emails['emails_generated'],
"webhook_id": webhook['id']
}
# Use the workflow
workflow = OpenProspectWorkflow(API_BASE_URL, AUTH_TOKEN)
result = workflow.run_complete_workflow(
name="Q1 2024 SaaS Prospects",
criteria={
"industry": "Software",
"employee_count": {"min": 50, "max": 500},
"location": ["United States"],
"technologies": ["Salesforce", "HubSpot"]
},
webhook_url="https://your-crm.com/webhooks/prospects"
)
print(f"\nWorkflow Results: {result}")
🚀 Next Steps
- Authentication Guide - Detailed auth implementation
- API Reference - Complete endpoint documentation
- Error Handling - Comprehensive error reference
💡 Best Practices
- Use Session Objects: Reuse connections for better performance
- Handle Rate Limits: Implement exponential backoff (see the sketch after this list)
- Validate Data: Check responses before processing
- Log Everything: Track API calls for debugging
- Use Async for Bulk: Process multiple requests concurrently
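The snippet below is a minimal sketch of the backoff practice above. It assumes the API signals throttling with HTTP 429; adjust the status check and delays to whatever your deployment actually returns.

```python
import time
import requests

def request_with_backoff(session, method, url, max_retries=5, base_delay=1.0, **kwargs):
    """Retry a request with exponential backoff (assumes HTTP 429 = rate limited)."""
    for attempt in range(max_retries):
        try:
            response = session.request(method, url, **kwargs)
            if response.status_code != 429:
                response.raise_for_status()
                return response
        except requests.ConnectionError:
            pass  # transient network error -- retry below
        delay = base_delay * (2 ** attempt)
        print(f"Rate limited or connection error, retrying in {delay:.0f}s "
              f"(attempt {attempt + 1}/{max_retries})")
        time.sleep(delay)
    raise RuntimeError(f"Request to {url} failed after {max_retries} retries")

# Example usage with the session configured earlier:
# request_with_backoff(session, "GET", f"{API_BASE_URL}/prospects",
#                      params={"prospect_search_id": search["id"], "limit": 100})
```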
Need Help? Check our Support Documentation or contact support@openprospect.io