The Live Capture API (/analyze/live/*) lets you push microphone or SIP audio chunks into Illocution and receive multi-agent analytics (transcripts, emotion, cognitive, transitions, summaries) as they finalize. Use it only when you need true live capture; for an already recorded file, use the Replay endpoint (POST /analyze/stream) instead.

Endpoint Reference

Purpose | Method & Path | Notes
Start session | POST /analyze/live/start | Returns session_id, chunk/events/control URLs, TTL metadata.
Push audio chunk | POST /analyze/live/{session_id}/chunk | Send sequential binary chunks (chunk_seq 0,1,2,…) up to one active session.
Control session | POST /analyze/live/{session_id}/control | finalize, cancel, or keepalive to extend TTL.
Subscribe to analytics | GET /analyze/live/{session_id}/events | Single SSE stream per session; emits status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, done.

All requests require X-API-Key. Live capture must be enabled server side (ENABLE_LIVE_STREAMING=true).

Prerequisites

pip install httpx==0.27.0 anyio
  • Use 16-bit PCM WAV, MP3, or Opus—transcoding happens server side.
  • Chunk sizes between 32–128 kB keep latency low without spamming HTTP.
  • Only one SSE consumer may attach to events_endpoint at a time.
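
To make the chunk-size guidance concrete, the sketch below estimates how much audio a chunk carries. It assumes uncompressed 16-bit PCM at 16 kHz mono and reads "kB" as KiB (matching the 64 * 1024 default used in Recipe 2); both are assumptions, and compressed formats such as MP3 or Opus hold far more audio per byte. The helper name chunk_duration_seconds is hypothetical.

```python
def chunk_duration_seconds(
    chunk_bytes: int,
    sample_rate: int = 16_000,  # assumption: 16 kHz capture
    channels: int = 1,          # assumption: mono
    sample_width: int = 2,      # 16-bit PCM = 2 bytes per sample
) -> float:
    """Seconds of uncompressed PCM audio carried by `chunk_bytes` bytes."""
    bytes_per_second = sample_rate * channels * sample_width
    return chunk_bytes / bytes_per_second


for size_kib in (32, 64, 128):
    seconds = chunk_duration_seconds(size_kib * 1024)
    print(f"{size_kib} KiB ~ {seconds:.2f} s of audio")
```

Under these assumptions each chunk holds roughly one to four seconds of audio, which is the minimum delay a chunk adds before the server can see it.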

Recipe 1 – Start a session & attach to SSE

import asyncio
import json
import httpx

API_KEY = "YOUR_KEY"
BASE_URL = "https://api.illocution.ai"


async def start_session(client: httpx.AsyncClient, filename: str, goal: str = "") -> dict:
    resp = await client.post(
        "/analyze/live/start",
        json={"filename": filename, "goal": goal},
        headers={"X-API-Key": API_KEY},
    )
    resp.raise_for_status()
    return resp.json()


async def stream_events(client: httpx.AsyncClient, events_url: str) -> None:
    headers = {
        "Accept": "text/event-stream",
        "X-API-Key": API_KEY,
    }
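    async with client.stream("GET", events_url, headers=headers, timeout=None) as resp:
        resp.raise_for_status()
        event_name = None
        buffer = []
        async for line in resp.aiter_lines():
            if not line:
                # A blank line terminates one SSE event: dispatch what was buffered.
                if buffer:
                    # Per the SSE format, multiple data: lines are joined with newlines.
                    payload = json.loads("\n".join(buffer))
                    print(f"{event_name or 'message'}: {payload}")
                    buffer.clear()
                if event_name == "done":
                    return  # terminal event; stop reading
                event_name = None  # the event name applies to one event only
                continue
            if line.startswith("event:"):
                event_name = line.split("event:", 1)[1].strip()
            elif line.startswith("data:"):
                buffer.append(line.split("data:", 1)[1].strip())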


async def main():
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30.0) as client:
        session = await start_session(client, filename="live_demo.wav", goal="Coach for confidence")
        print(f"Session {session['session_id']} started, expires {session['expires_at']}")
        await stream_events(client, session["events_endpoint"])


asyncio.run(main())

Event structure

  • status: {"phase": "ingesting"|"processing"|"completed", "progress": 0.0-1.0}
  • final_transcript: diarized utterance with timings/confidence
  • emotion: PAD + 7-way distribution per utterance
  • cognitive: engagement/focus/load plus textual rationale
  • transition / moment: detected shifts (e.g., objection, cta_commit)
  • summary_update: rolling recap near the end
  • error: structured {code,message}
  • done: {"conversation_id": "..."}
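
One way to consume these events is a small router keyed on the event name. The sketch below relies only on the payload fields listed above (phase and progress for status, code and message for error, conversation_id for done); the schemas of the other events are not spelled out here, so it reports only their field names. The function name describe_event is hypothetical.

```python
from typing import Any


def describe_event(name: str, payload: dict[str, Any]) -> str:
    """Turn one parsed SSE event into a short log line."""
    if name == "status":
        return f"status: {payload['phase']} ({payload['progress']:.0%})"
    if name == "error":
        return f"error {payload['code']}: {payload['message']}"
    if name == "done":
        return f"done: conversation {payload['conversation_id']}"
    # Other event types: schema not documented in this section,
    # so only list which fields arrived.
    return f"{name}: fields={sorted(payload)}"


print(describe_event("status", {"phase": "processing", "progress": 0.5}))
```

A function like this could replace the print call in stream_events once you know which events your application cares about.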

Recipe 2 – Stream audio chunks

from pathlib import Path


async def upload_chunks(
    client: httpx.AsyncClient,
    chunk_url: str,
    file_path: Path,
    chunk_size: int = 64 * 1024,
) -> None:
    headers = {"X-API-Key": API_KEY}
    seq = 0
    with file_path.open("rb") as media:
        while True:
            data = media.read(chunk_size)
            if not data:
                break
            files = {"chunk": (f"chunk-{seq}.bin", data, "application/octet-stream")}
            form = {"chunk_seq": str(seq)}
            resp = await client.post(chunk_url, headers=headers, data=form, files=files)
            resp.raise_for_status()
            ack = resp.json()
            print(f"chunk {seq} acknowledged, total bytes={ack['received_bytes']}")
            seq += 1

Rules of thumb:
  • chunk_seq must be monotonically increasing with no gaps. Retries should resend the same chunk_seq.
  • Keep chunk_size consistent; jitter is acceptable but smaller chunks mean more HTTP overhead.
  • Adjust timeout on the client for large uploads; the server enforces chunk ordering and rejects late chunks after finalize.
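
The retry rule (resend the same chunk_seq) can be sketched as a wrapper around whatever coroutine posts a chunk. Everything here is illustrative: send_with_retry, the SendChunk signature, and the backoff parameters are hypothetical, and the broad except should be narrowed to transport or 5xx errors in real code.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical signature: a coroutine that posts one chunk and returns the ack.
SendChunk = Callable[[int, bytes], Awaitable[dict]]


async def send_with_retry(
    send: SendChunk,
    seq: int,
    data: bytes,
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> dict:
    """Post one chunk, retrying failures with the SAME chunk_seq."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await send(seq, data)  # seq never changes across retries
        except Exception:  # assumption: narrow this in production code
            if attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

In upload_chunks, the send argument would be a small coroutine wrapping the client.post call, so that seq is incremented only after an acknowledgement.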

Recipe 3 – Control session lifecycle

async def finalize_session(client: httpx.AsyncClient, control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = await client.post(control_url, headers=headers, json={"action": "finalize"})
    resp.raise_for_status()
    print(f"Finalize ack: {resp.json()}")


async def keepalive_session(client: httpx.AsyncClient, control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = await client.post(control_url, headers=headers, json={"action": "keepalive"})
    resp.raise_for_status()

  • finalize stops chunk ingestion and lets the pipeline finish.
  • cancel tears down the session and emits error with your reason.
  • keepalive extends the TTL if you expect a long silence; send it before expires_at.
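
To send keepalive before expires_at, a client needs to know how long it can wait. The sketch below computes that delay with a safety margin. It assumes expires_at is an ISO 8601 timestamp and that a timestamp without an offset is UTC; neither is confirmed by this page, so check an actual start response first. The name seconds_until_keepalive and the 10-second margin are hypothetical.

```python
from datetime import datetime, timezone


def seconds_until_keepalive(expires_at: str, now: datetime, margin: float = 10.0) -> float:
    """Seconds to wait before sending keepalive, `margin` seconds ahead of expiry.

    Assumption: `expires_at` is ISO 8601 (e.g. "2024-01-01T12:01:00Z").
    `now` must be timezone-aware.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return max(0.0, (expiry - now).total_seconds() - margin)
```

A background task could sleep for this long, call keepalive_session, then read the new expiry from the response if the server returns one.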

Full end-to-end async runner

import asyncio
from pathlib import Path
import httpx


async def run_live_capture(file_path: Path, api_key: str, base_url: str) -> None:
    headers = {"X-API-Key": api_key}
    async with httpx.AsyncClient(base_url=base_url, timeout=60.0) as client:
        start = await client.post(
            "/analyze/live/start",
            headers=headers,
            json={"filename": file_path.name, "goal": "Live coaching"},
        )
        start.raise_for_status()
        session = start.json()

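        # stream_events, upload_chunks, and finalize_session are the helpers
        # defined in Recipes 1-3 above.
        events_task = asyncio.create_task(stream_events(client, session["events_endpoint"]))
        await asyncio.sleep(0.5)  # let the SSE connection establish

        try:
            await upload_chunks(client, session["chunk_endpoint"], file_path)
            await finalize_session(client, session["control_endpoint"])
        except Exception:
            events_task.cancel()  # do not leave the SSE reader running after a failed upload
            raise

        await events_task  # returns once the event stream ends (after `done`)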


if __name__ == "__main__":
    asyncio.run(run_live_capture(Path("call.wav"), API_KEY, BASE_URL))

Production Notes

  • Mirror microphone capture by feeding an asyncio.Queue into upload_chunks; convert PCM frames to bytes before POSTing.
  • Handle reconnection logic for SSE (e.g., exponential backoff) and surface error events to operators.
  • /metrics exposes event counters and average agent latency for monitoring.
  • Artifact bundles (timeline, summary, segmentation) are persisted and referenced by the done event payload.
  • Only attach the SSE consumer once; the API rejects a second events connection for the same session.
  • For more advanced demos, use scripts/test_live_stream.py in this repo as a baseline.
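
The microphone note above can be sketched as an async generator that drains an asyncio.Queue of PCM frames and regroups them into fixed-size chunks. This is a minimal sketch, not the repo's implementation: batch_frames is a hypothetical name, and using None as the end-of-stream sentinel is an assumption about how your capture code signals that recording has stopped.

```python
import asyncio
from typing import AsyncIterator, Optional


async def batch_frames(
    queue: "asyncio.Queue[Optional[bytes]]",
    chunk_size: int = 64 * 1024,
) -> AsyncIterator[bytes]:
    """Regroup small PCM frames from `queue` into chunks of `chunk_size` bytes.

    A `None` item (assumed sentinel) ends the stream; any remainder is
    yielded as a final, shorter chunk.
    """
    buffer = bytearray()
    while True:
        frame = await queue.get()
        if frame is None:
            break
        buffer.extend(frame)
        # Emit as many full chunks as the buffer currently holds.
        while len(buffer) >= chunk_size:
            yield bytes(buffer[:chunk_size])
            del buffer[:chunk_size]
    if buffer:
        yield bytes(buffer)
```

Each yielded chunk can then be posted with an incrementing chunk_seq, in the same way upload_chunks posts slices of a file.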