Skip to content

Python Integration Guide

This guide covers everything you need to build Python applications on top of the Beyond Retrieval v2 API, from simple scripts to production-grade async pipelines.


Prerequisites

  • Python 3.9+
  • httpx (recommended) or requests
Install the recommended client with: pip install httpx

Authentication Setup

import httpx

# Base URL of the locally running Beyond Retrieval v2 API.
BASE_URL = "http://localhost:8000/api"
TOKEN = "dev"  # Any string works in bypass mode

# Every request must carry the token as a Bearer credential.
headers = {"Authorization": f"Bearer {TOKEN}"}

Production Tokens

In production, obtain a JWT from your authentication provider and pass it as the Bearer token. See the Authentication docs for details.


Client Wrapper

A reusable wrapper that handles the response envelope, error checking, and timeouts.

import httpx
from typing import Any


class BeyondRetrievalClient:
    """Lightweight wrapper around the Beyond Retrieval v2 REST API.

    Unwraps the response envelope ({"success": ..., "data": ..., "error": ...}),
    raises on HTTP and application-level errors, and applies a default timeout.
    Also usable as a context manager so the underlying connection pool is
    released even when an exception escapes:

        with BeyondRetrievalClient() as client:
            notebooks = client.get("/notebooks/")
    """

    def __init__(self, base_url: str = "http://localhost:8000/api", token: str = "dev"):
        # Normalize so path concatenation below never produces a double slash.
        self.base_url = base_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {token}"}
        # A single Client reuses connections across requests.
        self.client = httpx.Client(timeout=60.0)

    def _request(self, method: str, path: str, **kwargs) -> Any:
        """Send a request and return the unwrapped "data" payload.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
            RuntimeError: when the envelope reports success == False.
        """
        url = f"{self.base_url}{path}"
        response = self.client.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        body = response.json()
        if not body.get("success"):
            raise RuntimeError(f"API error: {body.get('error', 'Unknown error')}")
        return body["data"]

    def get(self, path: str, **kwargs) -> Any:
        return self._request("GET", path, **kwargs)

    def post(self, path: str, **kwargs) -> Any:
        return self._request("POST", path, **kwargs)

    def put(self, path: str, **kwargs) -> Any:
        return self._request("PUT", path, **kwargs)

    def patch(self, path: str, **kwargs) -> Any:
        return self._request("PATCH", path, **kwargs)

    def delete(self, path: str, **kwargs) -> Any:
        return self._request("DELETE", path, **kwargs)

    def close(self) -> None:
        """Release the underlying HTTP connection pool."""
        self.client.close()

    # Context-manager support: guarantees close() runs even on exceptions.
    def __enter__(self) -> "BeyondRetrievalClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

Usage:

# Create a client with the default local base URL and dev token.
client = BeyondRetrievalClient()
notebooks = client.get("/notebooks/")  # returns the unwrapped "data" payload
print(f"Found {len(notebooks)} notebooks")
client.close()  # release the underlying connection pool

End-to-End Example

A complete workflow: create a notebook, upload files, ingest, chat, and collect feedback.

import httpx
import uuid
import time

BASE_URL = "http://localhost:8000/api"
TOKEN = "dev"  # Any string works in bypass mode
headers = {"Authorization": f"Bearer {TOKEN}"}


# ── 1. Create a notebook ──────────────────────────────────────────────
# The client supplies the notebook id, so generate a fresh UUID.
notebook_id = str(uuid.uuid4())
response = httpx.post(
    f"{BASE_URL}/notebooks/",
    headers=headers,
    json={
        "notebook_id": notebook_id,
        "notebook_title": "Python Integration Test",
        "user_id": "dev-user",
        "embedding_model": "openai/text-embedding-3-small",
    },
)
response.raise_for_status()  # surface HTTP errors instead of a KeyError below
notebook = response.json()["data"]
print(f"Created notebook: {notebook['notebook_id']}")


# ── 2. Upload files ──────────────────────────────────────────────────
with open("handbook.pdf", "rb") as f:
    response = httpx.post(
        f"{BASE_URL}/notebooks/{notebook_id}/documents/upload",
        headers=headers,
        files={"files": ("handbook.pdf", f, "application/pdf")},
    )
response.raise_for_status()
uploaded = response.json()["data"]
print(f"Uploaded {len(uploaded)} file(s)")


# ── 3. Start ingestion ───────────────────────────────────────────────
# ── 3. Start ingestion ───────────────────────────────────────────────
# Map the upload response into the shape the ingest endpoint expects.
files_payload = [
    {
        "file_id": f["file_id"],
        "file_name": f["file_name"],
        "file_path": f["storage_path"],
    }
    for f in uploaded
]
response = httpx.post(
    f"{BASE_URL}/notebooks/{notebook_id}/documents/ingest",
    headers=headers,
    json={
        "files": files_payload,
        "settings": {
            "parser": "Docling Parser",
            "chunking_strategy": "Recursive Chunking",
            "chunk_size": 1000,
            "chunk_overlap": 200,
        },
        "notebook_name": "Python Integration Test",
    },
)
response.raise_for_status()
jobs = response.json()["data"]["jobs"]
print(f"Started {len(jobs)} ingestion job(s)")


# ── 4. Poll ingestion status ─────────────────────────────────────────
# Bounded polling: a stalled job raises instead of hanging the script forever.
file_id = uploaded[0]["file_id"]
for _ in range(100):  # ~5 minutes at one poll every 3 seconds
    response = httpx.get(
        f"{BASE_URL}/notebooks/{notebook_id}/documents/{file_id}/stage",
        headers=headers,
    )
    response.raise_for_status()
    stage = response.json()["data"]
    print(f"  Status: {stage['status']}, Stage: {stage.get('stage', 'N/A')}")

    if stage["status"] in ("success", "error"):
        break
    time.sleep(3)
else:
    raise TimeoutError("Ingestion did not reach a terminal state in time")


# ── 5. Create a conversation ─────────────────────────────────────────
# ── 5. Create a conversation ─────────────────────────────────────────
response = httpx.post(
    f"{BASE_URL}/notebooks/{notebook_id}/conversations",
    headers=headers,
    json={"title": "Test Chat", "chat_mode": "rag"},
)
response.raise_for_status()  # fail fast instead of a KeyError on "data" below
conv = response.json()["data"]
conversation_id = conv["conversation_id"]
print(f"Created conversation: {conversation_id}")


# ── 6. Send a RAG message ────────────────────────────────────────────
response = httpx.post(
    f"{BASE_URL}/notebooks/{notebook_id}/conversations/{conversation_id}/messages",
    headers=headers,
    json={
        "content": "What is the refund policy?",
        "chat_mode": "rag",
        "strategy_id": "fusion",
        "persona": "professional",
        "language": "en",
    },
    timeout=60.0,  # RAG pipeline can take time
)
response.raise_for_status()
result = response.json()["data"]
assistant_msg = result["assistant_message"]
print(f"\nAI: {assistant_msg['content'][:200]}...")

# ── 7. Print citations ───────────────────────────────────────────────
# Citations may be absent, so default to an empty list.
for cite in assistant_msg.get("citations", []):
    print(f"  [{cite['citation_id']}] {cite['metadata'].get('file_name', 'N/A')}"
          f" (score: {cite.get('similarity', 'N/A')})")

# ── 8. Submit feedback ───────────────────────────────────────────────
message_id = assistant_msg["id"]
response = httpx.post(
    f"{BASE_URL}/notebooks/{notebook_id}/messages/{message_id}/feedback",
    headers=headers,
    json={"is_positive": True, "feedback_text": "Accurate and helpful"},
)
response.raise_for_status()
print(f"\nFeedback saved: {response.json()['data']}")

File Upload

Upload multiple files using multipart/form-data:

import httpx

def upload_files(notebook_id: str, file_paths: list[str], headers: dict) -> list:
    """Upload multiple files to a notebook in one multipart request.

    Args:
        notebook_id: Target notebook UUID.
        file_paths: Local paths of the files to upload.
        headers: Request headers including the Bearer token.

    Returns:
        The API's "data" payload: one entry per uploaded file.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response.
        OSError: if a local file cannot be opened.
    """
    file_handles = []
    try:
        files = []
        for path in file_paths:
            # Open inside the try so handles opened so far are still closed
            # if a later open() raises (the original leaked them here).
            f = open(path, "rb")
            file_handles.append(f)
            # Send only the basename, not the full local path.
            files.append(("files", (path.rsplit("/", 1)[-1], f)))

        response = httpx.post(
            f"{BASE_URL}/notebooks/{notebook_id}/documents/upload",
            headers=headers,
            files=files,
            timeout=120.0,
        )
        response.raise_for_status()
        return response.json()["data"]
    finally:
        for f in file_handles:
            f.close()


# Upload three local documents in a single multipart request.
uploaded = upload_files(
    notebook_id,
    ["docs/handbook.pdf", "docs/faq.docx", "docs/policy.txt"],
    headers,
)
print(f"Uploaded {len(uploaded)} files")

Error Handling

import httpx


def safe_api_call(method: str, url: str, headers: dict, **kwargs):
    """Make an API call with comprehensive error handling.

    Maps transport and HTTP errors onto builtin exception types so callers
    can handle failures without importing httpx themselves.

    Raises:
        PermissionError: 401 (bad/missing token) or 403 (admin required).
        LookupError: 404 resource not found.
        RuntimeError: 409 conflicts, other HTTP errors via raise_for_status,
            or an envelope with success == False.
        ConnectionError: the server is unreachable.
        TimeoutError: the request exceeded its timeout.
    """
    try:
        response = httpx.request(method, url, headers=headers, **kwargs)

        # HTTP-level errors with API-specific meanings
        if response.status_code == 401:
            raise PermissionError("Invalid or missing authentication token")
        if response.status_code == 403:
            raise PermissionError("Admin access required for this endpoint")
        if response.status_code == 404:
            raise LookupError("Resource not found")
        if response.status_code == 409:
            raise RuntimeError("Conflict: resource is being processed or storage disabled")

        response.raise_for_status()

        # Application-level errors reported inside the response envelope
        body = response.json()
        if not body.get("success"):
            raise RuntimeError(f"API error: {body.get('error', 'Unknown')}")

        return body["data"]

    except httpx.ConnectError as exc:
        # Chain the original exception so the root cause stays in tracebacks.
        raise ConnectionError(
            "Cannot connect to Beyond Retrieval API. Is the server running?"
        ) from exc
    except httpx.TimeoutException as exc:
        raise TimeoutError(
            "Request timed out. Try increasing the timeout for long operations."
        ) from exc


# Usage
try:
    notebooks = safe_api_call("GET", f"{BASE_URL}/notebooks/", headers)
except PermissionError as e:
    # 401/403 responses are mapped to PermissionError by safe_api_call.
    print(f"Auth error: {e}")
except ConnectionError as e:
    # Raised when the API server is unreachable.
    print(f"Connection error: {e}")

Async Patterns

For high-throughput applications, use httpx.AsyncClient:

import httpx
import asyncio


class AsyncBeyondRetrievalClient:
    """Async wrapper around the Beyond Retrieval v2 REST API.

    Mirrors the synchronous BeyondRetrievalClient: unwraps the response
    envelope, raises on errors, and exposes one method per HTTP verb.
    Also usable as an async context manager:

        async with AsyncBeyondRetrievalClient() as client:
            notebooks = await client.get("/notebooks/")
    """

    def __init__(self, base_url: str = "http://localhost:8000/api", token: str = "dev"):
        # Normalize so path concatenation below never produces a double slash.
        self.base_url = base_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {token}"}
        self.client = httpx.AsyncClient(timeout=60.0)

    async def _request(self, method: str, path: str, **kwargs):
        """Send a request and return the unwrapped "data" payload.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
            RuntimeError: when the envelope reports success == False.
        """
        url = f"{self.base_url}{path}"
        response = await self.client.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        body = response.json()
        if not body.get("success"):
            raise RuntimeError(f"API error: {body.get('error')}")
        return body["data"]

    async def get(self, path, **kwargs):
        return await self._request("GET", path, **kwargs)

    async def post(self, path, **kwargs):
        return await self._request("POST", path, **kwargs)

    # Added for parity with the synchronous client wrapper.
    async def put(self, path, **kwargs):
        return await self._request("PUT", path, **kwargs)

    async def patch(self, path, **kwargs):
        return await self._request("PATCH", path, **kwargs)

    async def delete(self, path, **kwargs):
        return await self._request("DELETE", path, **kwargs)

    async def close(self):
        """Release the underlying HTTP connection pool."""
        await self.client.aclose()

    # Async context-manager support: guarantees close() runs on exceptions.
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


async def main():
    """Demonstrate concurrent reads with the async client."""
    api = AsyncBeyondRetrievalClient()

    # Issue both reads concurrently; gather preserves argument order,
    # so the results unpack in the same order as the calls.
    notebooks, models = await asyncio.gather(
        api.get("/notebooks/"),
        api.get("/models/openrouter"),
    )

    print(f"Notebooks: {len(notebooks)}")
    print(f"Models: {len(models['models'])}")

    await api.close()


asyncio.run(main())

Retrieval Strategies

Execute different retrieval strategies and compare results:

import httpx

notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

# List available strategies
response = httpx.get(
    f"{BASE_URL}/notebooks/{notebook_id}/retrieval/strategies",
    headers=headers,
)
response.raise_for_status()  # surface HTTP errors instead of a KeyError below
strategies = response.json()["data"]
for s in strategies:
    # Some strategies need an LLM configured; flag them for the reader.
    llm_tag = " [requires LLM]" if s["requires_llm"] else ""
    print(f"  {s['id']}: {s['name']}{llm_tag}")

# Execute a search
response = httpx.post(
    f"{BASE_URL}/notebooks/{notebook_id}/retrieval/retrieve",
    headers=headers,
    json={
        "query": "What are the return policies?",
        "strategy_id": "fusion",
        "top_k": 10,
        "full_text_weight": 1.0,  # equal weights blend lexical and semantic hits
        "semantic_weight": 1.0,
    },
)
response.raise_for_status()
result = response.json()["data"]
print(f"\nFound {result['total_results']} chunks in {result['execution_time_ms']}ms")
for chunk in result["chunks"]:
    print(f"  [{chunk['rank']}] {chunk['content'][:80]}... (score: {chunk['score']:.3f})")

Enhancement Pipeline Automation

Automate the full AI enhancement lifecycle:

import httpx
import time

notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

# 1. List files available for enhancement
response = httpx.get(
    f"{BASE_URL}/notebooks/{notebook_id}/enhance/files",
    headers=headers,
)
response.raise_for_status()  # surface HTTP errors instead of a KeyError below
files = response.json()["data"]
for f in files:
    print(f"  {f['file_name']}: {f['success']}/{f['total_chunks']} enhanced")

# 2. Start enhancement for every file that still has pending chunks
file_ids = [f["file_id"] for f in files if f["pending"] > 0]
if file_ids:
    response = httpx.post(
        f"{BASE_URL}/notebooks/{notebook_id}/enhance",
        headers=headers,
        json={"file_ids": file_ids},
    )
    response.raise_for_status()
    print(f"Enhancement started for {len(file_ids)} files")

    # 3. Poll until complete (bounded so a stalled job cannot hang the script)
    for fid in file_ids:
        for _ in range(150):  # ~10 minutes at one poll every 4 seconds
            response = httpx.get(
                f"{BASE_URL}/notebooks/{notebook_id}/enhance/status",
                headers=headers,
                params={"file_id": fid},
            )
            response.raise_for_status()
            status = response.json()["data"]
            print(f"  Progress: {status['progress_pct']:.1f}%"
                  f" ({status['success']}/{status['total']} done)")

            if status["all_terminated"]:
                break
            time.sleep(4)
        else:
            raise TimeoutError(f"Enhancement for file {fid} did not finish in time")

    # 4. Publish enhanced chunks
    for f in files:
        if f["file_id"] in file_ids:
            response = httpx.post(
                f"{BASE_URL}/notebooks/{notebook_id}/enhance/publish",
                headers=headers,
                json={
                    "file_id": f["file_id"],
                    "file_name": f["file_name"],
                    "notebook_title": "My Notebook",
                },
            )
            response.raise_for_status()
            result = response.json()["data"]
            print(f"  Published: {result['published_count']} chunks")

Health Monitoring

Check notebook data quality and clean up duplicates:

import httpx

notebook_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

# Run health check
response = httpx.get(
    f"{BASE_URL}/notebooks/{notebook_id}/health",
    headers=headers,
)
response.raise_for_status()  # surface HTTP errors instead of a KeyError below
health = response.json()["data"]
print(f"Health Score: {health['health_score']}/100")
print(f"Total chunks: {health['total_chunks']}")
print(f"Duplicates: {health['duplicate_count']}")
print(f"Orphans: {health['orphaned_count']}")
print(f"Enhanced: {health['enhanced_count']}")

# Clean up duplicates if found
if health["duplicate_count"] > 0:
    response = httpx.post(
        f"{BASE_URL}/notebooks/{notebook_id}/health/cleanup",
        headers=headers,
    )
    response.raise_for_status()
    result = response.json()["data"]
    print(f"Removed {result['removed_count']} duplicates")
    print(f"New score: {result['new_health_score']}/100")