Skip to main content
Use these synchronous recipes if your integration runs inside a WSGI app, Lambda, or any environment where asyncio is inconvenient. This walkthrough targets the true live capture API (/analyze/live/*) that streams transcripts, emotion, and cognitive analytics as the media arrives. It is not the Replay endpoint (POST /analyze/stream), which simply replays an uploaded file.

Endpoint Reference

PurposeMethod & PathNotes
Start sessionPOST /analyze/live/startReturns session_id, chunk/events/control URLs, TTL metadata.
Push audio chunkPOST /analyze/live/{session_id}/chunkSequential binary chunks with chunk_seq counters.
Control sessionPOST /analyze/live/{session_id}/controlfinalize, cancel, or keepalive.
Subscribe to analyticsGET /analyze/live/{session_id}/eventsSingle SSE stream delivering status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, and done.
All requests require X-API-Key, and live capture must be enabled on the deployment (ENABLE_LIVE_STREAMING=true).

Requirements

pip install requests==2.32.3
  • Audio input: 16-bit PCM WAV, MP3, or Opus are safe defaults (server handles transcoding).
  • Chunking: 32–128 kB provides good latency without overwhelming FastAPI.
  • SSE: Only one consumer may connect to events_endpoint per session.

Recipe 1 – Start a session & read SSE synchronously

import json
import requests

API_KEY = "YOUR_KEY"
BASE_URL = "https://api.illocution.ai"

def start_session(filename: str, goal: str = "") -> dict:
    resp = requests.post(
        f"{BASE_URL}/analyze/live/start",
        headers={"X-API-Key": API_KEY},
        json={"filename": filename, "goal": goal},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

def stream_events(events_url: str) -> None:
    headers = {
        "Accept": "text/event-stream",
        "X-API-Key": API_KEY,
    }
    with requests.get(events_url, headers=headers, stream=True, timeout=None) as resp:
        resp.raise_for_status()
        event_name = None
        buffer = []
        for line in resp.iter_lines(decode_unicode=True):
            if not line:
                if buffer:
                    payload = json.loads("".join(buffer))
                    print(f"{event_name or 'message'}: {payload}")
                    buffer.clear()
                continue
            if line.startswith("event:"):
                event_name = line.split("event:", 1)[1].strip()
            elif line.startswith("data:"):
                buffer.append(line.split("data:", 1)[1].strip())
Event payloads mirror the async cookbook: status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, done.

Recipe 2 – Stream audio chunks synchronously

from pathlib import Path

def upload_chunks(chunk_url: str, file_path: Path, chunk_size: int = 64 * 1024) -> None:
    headers = {"X-API-Key": API_KEY}
    seq = 0
    with file_path.open("rb") as media:
        while True:
            data = media.read(chunk_size)
            if not data:
                break
            files = {"chunk": (f"chunk-{seq}.bin", data, "application/octet-stream")}
            form = {"chunk_seq": str(seq)}
            resp = requests.post(chunk_url, headers=headers, data=form, files=files, timeout=30)
            resp.raise_for_status()
            ack = resp.json()
            print(f"chunk {seq} ack bytes={ack['received_bytes']}")
            seq += 1
Guidelines:
  • chunk_seq must be exactly 0,1,2.... On retry, resend the same sequence number.
  • Keep chunk sizes consistent; smaller chunks reduce latency but increase HTTP overhead.
  • The server rejects additional chunks after you send finalize.

Recipe 3 – Control the session

def finalize_session(control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(control_url, headers=headers, json={"action": "finalize"}, timeout=15)
    resp.raise_for_status()
    print(f"finalize ack: {resp.json()}")

def cancel_session(control_url: str, reason: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(
        control_url,
        headers=headers,
        json={"action": "cancel", "reason": reason},
        timeout=15,
    )
    resp.raise_for_status()

def keepalive_session(control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(control_url, headers=headers, json={"action": "keepalive"}, timeout=15)
    resp.raise_for_status()

Recipe 4 – Full synchronous runner with threads

To ingest while listening for analytics, use a background thread for SSE.
import threading
from pathlib import Path

def run_live_capture(file_path: Path) -> None:
    session = start_session(filename=file_path.name, goal="Live coaching")
    print(f"Session {session['session_id']} (conversation {session['conversation_id']}) started")

    events_thread = threading.Thread(
        target=stream_events,
        args=(session["events_endpoint"],),
        daemon=True,
    )
    events_thread.start()

    upload_chunks(session["chunk_endpoint"], file_path)
    finalize_session(session["control_endpoint"])

    events_thread.join()  # wait for `done`

if __name__ == "__main__":
    run_live_capture(Path("call.wav"))

Production Notes

  • Replace the file reader with microphone capture: enqueue PCM frames and write them out via upload_chunks without staging on disk.
  • Add exponential backoff & retry for requests.post calls; resend the same chunk_seq if you get a transient network error.
  • If the SSE thread exits without a done event, inspect the last error payload and consider restarting the session.
  • Monitor /metrics for events counts and agent latency to ensure the live service keeps up with ingest.
  • Artifacts (timeline, summary, segmentation, agent outputs) are persisted and referenced by the final done event.