Python Integration Guide¶
This guide covers everything you need to build Python applications on top of the Beyond Retrieval v2 API, from simple scripts to production-grade async pipelines.
Prerequisites¶
- Python 3.9+
`httpx` (recommended) or `requests`
Authentication Setup¶
import httpx
# Base URL of a local development server; adjust for your deployment.
BASE_URL = "http://localhost:8000/api"
TOKEN = "dev" # Any string works in bypass mode
# Every request in this guide sends this Bearer header.
headers = {"Authorization": f"Bearer {TOKEN}"}
Production Tokens
In production, obtain a JWT from your authentication provider and pass it as the Bearer token. See the Authentication docs for details.
Client Wrapper¶
A reusable wrapper that handles the response envelope, error checking, and timeouts.
import httpx
from typing import Any


class BeyondRetrievalClient:
    """Lightweight wrapper around the Beyond Retrieval v2 REST API.

    Handles the response envelope ({"success": ..., "data": ..., "error": ...}),
    HTTP error checking, and timeouts. Also usable as a context manager
    (``with BeyondRetrievalClient() as client: ...``) so the underlying
    connection pool is always released, even on error.
    """

    def __init__(self, base_url: str = "http://localhost:8000/api", token: str = "dev"):
        self.base_url = base_url.rstrip("/")  # avoid "//" when joining paths
        self.headers = {"Authorization": f"Bearer {token}"}
        self.client = httpx.Client(timeout=60.0)  # generous default; RAG calls can be slow

    def __enter__(self) -> "BeyondRetrievalClient":
        """Enter a ``with`` block; the client itself is the context value."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection pool on ``with``-block exit."""
        self.close()

    def _request(self, method: str, path: str, **kwargs) -> Any:
        """Issue a request and unwrap the API response envelope.

        Returns:
            The envelope's "data" payload.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP status.
            RuntimeError: when the envelope reports success=False.
        """
        url = f"{self.base_url}{path}"
        response = self.client.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        body = response.json()
        if not body.get("success"):
            raise RuntimeError(f"API error: {body.get('error', 'Unknown error')}")
        return body["data"]

    def get(self, path: str, **kwargs) -> Any:
        return self._request("GET", path, **kwargs)

    def post(self, path: str, **kwargs) -> Any:
        return self._request("POST", path, **kwargs)

    def put(self, path: str, **kwargs) -> Any:
        return self._request("PUT", path, **kwargs)

    def patch(self, path: str, **kwargs) -> Any:
        return self._request("PATCH", path, **kwargs)

    def delete(self, path: str, **kwargs) -> Any:
        return self._request("DELETE", path, **kwargs)

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()
Usage:
# Remember to close() so the HTTP connection pool is released.
client = BeyondRetrievalClient()
notebooks = client.get("/notebooks/")  # returns the unwrapped "data" payload
print(f"Found {len(notebooks)} notebooks")
client.close()
End-to-End Example¶
A complete workflow: create a notebook, upload files, ingest, chat, and collect feedback.
import httpx
import uuid
import time
# Complete workflow against a local dev server.
# NOTE(review): for brevity these calls don't check response.raise_for_status();
# see the Error Handling section for a robust pattern.
BASE_URL = "http://localhost:8000/api"
TOKEN = "dev"
headers = {"Authorization": f"Bearer {TOKEN}"}
# ── 1. Create a notebook ──────────────────────────────────────────────
notebook_id = str(uuid.uuid4())  # the client supplies its own notebook UUID
response = httpx.post(
f"{BASE_URL}/notebooks/",
headers=headers,
json={
"notebook_id": notebook_id,
"notebook_title": "Python Integration Test",
"user_id": "dev-user",
"embedding_model": "openai/text-embedding-3-small",
},
)
notebook = response.json()["data"]
print(f"Created notebook: {notebook['notebook_id']}")
# ── 2. Upload files ──────────────────────────────────────────────────
# Multipart upload; the file handle must stay open for the whole request.
with open("handbook.pdf", "rb") as f:
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/documents/upload",
headers=headers,
files={"files": ("handbook.pdf", f, "application/pdf")},
)
uploaded = response.json()["data"]
print(f"Uploaded {len(uploaded)} file(s)")
# ── 3. Start ingestion ───────────────────────────────────────────────
# Ingestion references the server-side storage_path returned by the upload step.
files_payload = [
{
"file_id": f["file_id"],
"file_name": f["file_name"],
"file_path": f["storage_path"],
}
for f in uploaded
]
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/documents/ingest",
headers=headers,
json={
"files": files_payload,
"settings": {
"parser": "Docling Parser",
"chunking_strategy": "Recursive Chunking",
"chunk_size": 1000,
"chunk_overlap": 200,
},
"notebook_name": "Python Integration Test",
},
)
jobs = response.json()["data"]["jobs"]
print(f"Started {len(jobs)} ingestion job(s)")
# ── 4. Poll ingestion status ─────────────────────────────────────────
# Loops until the first file reaches a terminal status ("success" or "error").
# NOTE(review): no upper bound — a production poller should add a timeout.
file_id = uploaded[0]["file_id"]
while True:
response = httpx.get(
f"{BASE_URL}/notebooks/{notebook_id}/documents/{file_id}/stage",
headers=headers,
)
stage = response.json()["data"]
print(f" Status: {stage['status']}, Stage: {stage.get('stage', 'N/A')}")
if stage["status"] in ("success", "error"):
break
time.sleep(3)
# ── 5. Create a conversation ─────────────────────────────────────────
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/conversations",
headers=headers,
json={"title": "Test Chat", "chat_mode": "rag"},
)
conv = response.json()["data"]
conversation_id = conv["conversation_id"]
print(f"Created conversation: {conversation_id}")
# ── 6. Send a RAG message ────────────────────────────────────────────
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/conversations/{conversation_id}/messages",
headers=headers,
json={
"content": "What is the refund policy?",
"chat_mode": "rag",
"strategy_id": "fusion",
"persona": "professional",
"language": "en",
},
timeout=60.0, # RAG pipeline can take time
)
result = response.json()["data"]
assistant_msg = result["assistant_message"]
print(f"\nAI: {assistant_msg['content'][:200]}...")
# ── 7. Print citations ───────────────────────────────────────────────
# Citations may be absent, hence the .get() with a default empty list.
for cite in assistant_msg.get("citations", []):
print(f" [{cite['citation_id']}] {cite['metadata'].get('file_name', 'N/A')}"
f" (score: {cite.get('similarity', 'N/A')})")
# ── 8. Submit feedback ───────────────────────────────────────────────
message_id = assistant_msg["id"]
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/messages/{message_id}/feedback",
headers=headers,
json={"is_positive": True, "feedback_text": "Accurate and helpful"},
)
print(f"\nFeedback saved: {response.json()['data']}")
File Upload¶
Upload multiple files using multipart/form-data:
import httpx
from contextlib import ExitStack
from pathlib import Path


def upload_files(notebook_id: str, file_paths: list[str], headers: dict) -> list:
    """Upload multiple files to a notebook via multipart/form-data.

    Args:
        notebook_id: Target notebook UUID.
        file_paths: Local paths of the files to upload.
        headers: Auth headers, e.g. {"Authorization": "Bearer ..."}.

    Returns:
        The uploaded-file records from the response envelope's "data" field.

    Raises:
        OSError: if a local file cannot be opened.
        httpx.HTTPStatusError: on a non-2xx response.
    """
    # ExitStack guarantees every handle is closed even if a later open() or
    # the request itself raises. (The previous open-all-then-try pattern
    # leaked already-opened handles when an open() failed mid-loop.)
    with ExitStack() as stack:
        files = [
            # Path(...).name is portable; splitting on "/" breaks on Windows paths.
            ("files", (Path(path).name, stack.enter_context(open(path, "rb"))))
            for path in file_paths
        ]
        response = httpx.post(
            f"{BASE_URL}/notebooks/{notebook_id}/documents/upload",
            headers=headers,
            files=files,
            timeout=120.0,  # uploads of several files can exceed the 60s default
        )
        response.raise_for_status()
        return response.json()["data"]
# Usage — assumes notebook_id and headers from the earlier setup snippets.
uploaded = upload_files(
notebook_id,
["docs/handbook.pdf", "docs/faq.docx", "docs/policy.txt"],
headers,
)
print(f"Uploaded {len(uploaded)} files")
Error Handling¶
import httpx


def safe_api_call(method: str, url: str, headers: dict, **kwargs):
    """Make an API call with comprehensive error handling.

    Translates transport- and HTTP-level failures into builtin exception
    types so callers don't need to know httpx internals.

    Returns:
        The "data" payload of the response envelope.

    Raises:
        PermissionError: 401 (bad/missing token) or 403 (admin required).
        LookupError: 404, resource not found.
        RuntimeError: 409 conflict, or an envelope with success=False.
        ConnectionError: the server is unreachable.
        TimeoutError: the request timed out.
        httpx.HTTPStatusError: any other non-2xx status.
    """
    try:
        response = httpx.request(method, url, headers=headers, **kwargs)
        # HTTP-level errors: map the common statuses onto builtins.
        if response.status_code == 401:
            raise PermissionError("Invalid or missing authentication token")
        if response.status_code == 403:
            raise PermissionError("Admin access required for this endpoint")
        if response.status_code == 404:
            raise LookupError("Resource not found")
        if response.status_code == 409:
            raise RuntimeError("Conflict: resource is being processed or storage disabled")
        response.raise_for_status()
        # Application-level errors: the envelope can report failure on a 200.
        body = response.json()
        if not body.get("success"):
            raise RuntimeError(f"API error: {body.get('error', 'Unknown')}")
        return body["data"]
    except httpx.ConnectError as exc:
        # Chain the cause so the original traceback isn't lost.
        raise ConnectionError("Cannot connect to Beyond Retrieval API. Is the server running?") from exc
    except httpx.TimeoutException as exc:
        raise TimeoutError("Request timed out. Try increasing the timeout for long operations.") from exc
# Usage
# PermissionError covers both 401 and 403; ConnectionError means the
# server was unreachable.
try:
notebooks = safe_api_call("GET", f"{BASE_URL}/notebooks/", headers)
except PermissionError as e:
print(f"Auth error: {e}")
except ConnectionError as e:
print(f"Connection error: {e}")
Async Patterns¶
For high-throughput applications, use httpx.AsyncClient:
import httpx
import asyncio


class AsyncBeyondRetrievalClient:
    """Async wrapper around the Beyond Retrieval v2 REST API.

    Mirrors BeyondRetrievalClient but uses httpx.AsyncClient so multiple
    requests can be in flight concurrently. Also usable as an async context
    manager (``async with AsyncBeyondRetrievalClient() as client: ...``) so
    the connection pool is always released.
    """

    def __init__(self, base_url: str = "http://localhost:8000/api", token: str = "dev"):
        self.base_url = base_url.rstrip("/")  # avoid "//" when joining paths
        self.headers = {"Authorization": f"Bearer {token}"}
        self.client = httpx.AsyncClient(timeout=60.0)

    async def __aenter__(self) -> "AsyncBeyondRetrievalClient":
        """Enter an ``async with`` block; the client itself is the value."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the connection pool on ``async with``-block exit."""
        await self.close()

    async def _request(self, method: str, path: str, **kwargs):
        """Issue a request and unwrap the API response envelope.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP status.
            RuntimeError: when the envelope reports success=False.
        """
        url = f"{self.base_url}{path}"
        response = await self.client.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        body = response.json()
        if not body.get("success"):
            # Default message kept consistent with the sync wrapper (previously
            # a missing "error" field printed "API error: None").
            raise RuntimeError(f"API error: {body.get('error', 'Unknown error')}")
        return body["data"]

    async def get(self, path, **kwargs):
        return await self._request("GET", path, **kwargs)

    async def post(self, path, **kwargs):
        return await self._request("POST", path, **kwargs)

    async def close(self):
        """Dispose of the underlying async connection pool."""
        await self.client.aclose()
async def main():
"""Demo: fetch two endpoints concurrently with one async client."""
client = AsyncBeyondRetrievalClient()
# Fetch notebooks and models in parallel
notebooks, models = await asyncio.gather(
client.get("/notebooks/"),
client.get("/models/openrouter"),
)
print(f"Notebooks: {len(notebooks)}")
print(f"Models: {len(models['models'])}")
await client.close()
asyncio.run(main())
Retrieval Strategies¶
Execute different retrieval strategies and compare results:
import httpx
# Assumes BASE_URL and headers from the Authentication Setup section.
notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
# List available strategies
response = httpx.get(
f"{BASE_URL}/notebooks/{notebook_id}/retrieval/strategies",
headers=headers,
)
strategies = response.json()["data"]
for s in strategies:
# requires_llm flags strategies that need an LLM configured to run.
llm_tag = " [requires LLM]" if s["requires_llm"] else ""
print(f" {s['id']}: {s['name']}{llm_tag}")
# Execute a search
# NOTE(review): the weight fields presumably balance keyword vs. semantic
# scoring for the fusion strategy — confirm against the API reference.
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/retrieval/retrieve",
headers=headers,
json={
"query": "What are the return policies?",
"strategy_id": "fusion",
"top_k": 10,
"full_text_weight": 1.0,
"semantic_weight": 1.0,
},
)
result = response.json()["data"]
print(f"\nFound {result['total_results']} chunks in {result['execution_time_ms']}ms")
for chunk in result["chunks"]:
print(f" [{chunk['rank']}] {chunk['content'][:80]}... (score: {chunk['score']:.3f})")
Enhancement Pipeline Automation¶
Automate the full AI enhancement lifecycle:
import httpx
import time
# Assumes BASE_URL and headers from the Authentication Setup section.
notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
# 1. List files available for enhancement
response = httpx.get(
f"{BASE_URL}/notebooks/{notebook_id}/enhance/files",
headers=headers,
)
files = response.json()["data"]
for f in files:
print(f" {f['file_name']}: {f['success']}/{f['total_chunks']} enhanced")
# 2. Start enhancement for all files
# Only files that still report pending (un-enhanced) chunks are submitted.
file_ids = [f["file_id"] for f in files if f["pending"] > 0]
if file_ids:
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/enhance",
headers=headers,
json={"file_ids": file_ids},
)
print(f"Enhancement started for {len(file_ids)} files")
# 3. Poll until complete
# Per-file loop ends when the status reports all_terminated.
# NOTE(review): unbounded loop — add a timeout/attempt cap in production.
for fid in file_ids:
while True:
response = httpx.get(
f"{BASE_URL}/notebooks/{notebook_id}/enhance/status",
headers=headers,
params={"file_id": fid},
)
status = response.json()["data"]
print(f" Progress: {status['progress_pct']:.1f}%"
f" ({status['success']}/{status['total']} done)")
if status["all_terminated"]:
break
time.sleep(4)
# 4. Publish enhanced chunks
# Publish only the files that were enhanced in step 2.
for f in files:
if f["file_id"] in file_ids:
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/enhance/publish",
headers=headers,
json={
"file_id": f["file_id"],
"file_name": f["file_name"],
"notebook_title": "My Notebook",
},
)
result = response.json()["data"]
print(f" Published: {result['published_count']} chunks")
Health Monitoring¶
Check notebook data quality and clean up duplicates:
import httpx
# Assumes BASE_URL and headers from the Authentication Setup section.
notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
# Run health check
response = httpx.get(
f"{BASE_URL}/notebooks/{notebook_id}/health",
headers=headers,
)
health = response.json()["data"]
print(f"Health Score: {health['health_score']}/100")
print(f"Total chunks: {health['total_chunks']}")
print(f"Duplicates: {health['duplicate_count']}")
print(f"Orphans: {health['orphaned_count']}")
print(f"Enhanced: {health['enhanced_count']}")
# Clean up duplicates if found
# Cleanup is skipped entirely when the health report shows no duplicates.
if health["duplicate_count"] > 0:
response = httpx.post(
f"{BASE_URL}/notebooks/{notebook_id}/health/cleanup",
headers=headers,
)
result = response.json()["data"]
print(f"Removed {result['removed_count']} duplicates")
print(f"New score: {result['new_health_score']}/100")