Documentation Index
Fetch the complete documentation index at: https://docs.illocution.ai/llms.txt
Use this file to discover all available pages before exploring further.
The Live Capture API (/analyze/live/*) lets you push microphone or SIP audio chunks into Illocution and receive multi-agent analytics (transcripts, emotion, cognitive, transitions, summaries) as they finalize. Use it only when you need true live capture; to analyze an already recorded file, use the Replay endpoint (POST /analyze/stream) instead.
Endpoint Reference
| Purpose | Method & Path | Notes |
| --- | --- | --- |
| Start session | POST /analyze/live/start | Returns session_id, chunk/events/control URLs, and TTL metadata. |
| Push audio chunk | POST /analyze/live/{session_id}/chunk | Send sequential binary chunks (chunk_seq 0, 1, 2, …) to the active session. |
| Control session | POST /analyze/live/{session_id}/control | finalize, cancel, or keepalive to extend the TTL. |
| Subscribe to analytics | GET /analyze/live/{session_id}/events | Single SSE stream per session; emits status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, and done. |
All requests require the X-API-Key header. Live capture must be enabled server-side (ENABLE_LIVE_STREAMING=true).
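Since every call needs the key, one convenient pattern (plain httpx usage, nothing Illocution-specific) is to set it once as a default header on the client; the recipes below pass the header explicitly on each request instead, for clarity:

import httpx

# Default headers are attached to every request made through this client.
client = httpx.AsyncClient(
    base_url="https://api.illocution.ai",
    headers={"X-API-Key": "YOUR_KEY"},
)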
Prerequisites
pip install httpx==0.27.0 anyio
- Use 16-bit PCM WAV, MP3, or Opus; transcoding happens server-side.
- Chunk sizes between 32 and 128 kB keep latency low without spamming HTTP.
- Only one SSE consumer may attach to events_endpoint at a time.
Recipe 1 – Start a session & attach to SSE
import asyncio
import json

import httpx

API_KEY = "YOUR_KEY"
BASE_URL = "https://api.illocution.ai"


async def start_session(client: httpx.AsyncClient, filename: str, goal: str = "") -> dict:
    resp = await client.post(
        "/analyze/live/start",
        json={"filename": filename, "goal": goal},
        headers={"X-API-Key": API_KEY},
    )
    resp.raise_for_status()
    return resp.json()


async def stream_events(client: httpx.AsyncClient, events_url: str) -> None:
    headers = {
        "Accept": "text/event-stream",
        "X-API-Key": API_KEY,
    }
    async with client.stream("GET", events_url, headers=headers, timeout=None) as resp:
        resp.raise_for_status()
        event_name = None
        buffer = []
        async for line in resp.aiter_lines():
            if not line:  # a blank line terminates one SSE event
                if buffer:
                    payload = json.loads("\n".join(buffer))  # SSE joins multi-line data with newlines
                    print(f"{event_name or 'message'}: {payload}")
                    buffer.clear()
                    event_name = None  # reset for the next event
                continue
            if line.startswith("event:"):
                event_name = line.split("event:", 1)[1].strip()
            elif line.startswith("data:"):
                buffer.append(line.split("data:", 1)[1].strip())


async def main():
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30.0) as client:
        session = await start_session(client, filename="live_demo.wav", goal="Coach for confidence")
        print(f"Session {session['session_id']} started, expires {session['expires_at']}")
        await stream_events(client, session["events_endpoint"])


asyncio.run(main())
Event structure
status: {"phase": "ingesting"|"processing"|"completed", "progress": 0.0-1.0}
final_transcript: diarized utterance with timings/confidence
emotion: PAD + 7-way distribution per utterance
cognitive: engagement/focus/load plus textual rationale
transition / moment: detected shifts (e.g., objection, cta_commit)
summary_update: rolling recap near the end
error: structured {code,message}
done: {"conversation_id": "..."}
Recipe 2 – Stream audio chunks
from pathlib import Path


async def upload_chunks(
    client: httpx.AsyncClient,
    chunk_url: str,
    file_path: Path,
    chunk_size: int = 64 * 1024,
) -> None:
    headers = {"X-API-Key": API_KEY}
    seq = 0
    with file_path.open("rb") as media:
        while True:
            data = media.read(chunk_size)
            if not data:  # end of file
                break
            # Each chunk is a multipart POST: the binary part plus its sequence number.
            files = {"chunk": (f"chunk-{seq}.bin", data, "application/octet-stream")}
            form = {"chunk_seq": str(seq)}
            resp = await client.post(chunk_url, headers=headers, data=form, files=files)
            resp.raise_for_status()
            ack = resp.json()
            print(f"chunk {seq} acknowledged, total bytes={ack['received_bytes']}")
            seq += 1
Rules of thumb:
- chunk_seq must be monotonically increasing with no gaps; retries should resend the same chunk_seq (see the sketch below).
- Keep chunk_size consistent; some jitter is acceptable, but smaller chunks mean more HTTP overhead.
- Adjust the client timeout for large uploads; the server enforces chunk ordering and rejects late chunks after finalize.
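Because a failed POST may or may not have been recorded server-side, retry with the same chunk_seq rather than skipping ahead. A minimal sketch with bounded retries (the attempt count and backoff values are arbitrary placeholders):

async def post_chunk_with_retry(client: httpx.AsyncClient, chunk_url: str, seq: int, data: bytes, attempts: int = 3) -> dict:
    # Resend the same chunk_seq on failure; only advance seq once acknowledged.
    for attempt in range(attempts):
        try:
            resp = await client.post(
                chunk_url,
                headers={"X-API-Key": API_KEY},
                data={"chunk_seq": str(seq)},
                files={"chunk": (f"chunk-{seq}.bin", data, "application/octet-stream")},
            )
            resp.raise_for_status()
            return resp.json()
        except httpx.HTTPError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(0.5 * 2 ** attempt)  # simple exponential backoff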
Recipe 3 – Control session lifecycle
async def finalize_session(client: httpx.AsyncClient, control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = await client.post(control_url, headers=headers, json={"action": "finalize"})
    resp.raise_for_status()
    print(f"Finalize ack: {resp.json()}")


async def keepalive_session(client: httpx.AsyncClient, control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = await client.post(control_url, headers=headers, json={"action": "keepalive"})
    resp.raise_for_status()
- finalize stops chunk ingestion and lets the pipeline finish.
- cancel tears down the session and emits an error event with your reason.
- keepalive extends the TTL if you expect a long silence; send it before expires_at (see the sketch below).
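If a session may sit idle, run keepalive_session on a timer so the TTL never lapses. A minimal sketch; the 60-second interval is an arbitrary placeholder, so derive the real period from the expires_at your session returns:

async def keepalive_loop(client: httpx.AsyncClient, control_url: str, interval_s: float = 60.0) -> None:
    # Refresh the session TTL periodically until the task is cancelled.
    try:
        while True:
            await asyncio.sleep(interval_s)
            await keepalive_session(client, control_url)
    except asyncio.CancelledError:
        pass  # expected when the caller finalizes the session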
Full end-to-end async runner
import asyncio
from pathlib import Path

import httpx


async def run_live_capture(file_path: Path, api_key: str, base_url: str) -> None:
    headers = {"X-API-Key": api_key}
    async with httpx.AsyncClient(base_url=base_url, timeout=60.0) as client:
        start = await client.post(
            "/analyze/live/start",
            headers=headers,
            json={"filename": file_path.name, "goal": "Live coaching"},
        )
        start.raise_for_status()
        session = start.json()

        # Reuses stream_events, upload_chunks, and finalize_session from the recipes above.
        events_task = asyncio.create_task(stream_events(client, session["events_endpoint"]))
        await asyncio.sleep(0.5)  # let the SSE connection establish
        await upload_chunks(client, session["chunk_endpoint"], file_path)
        await finalize_session(client, session["control_endpoint"])
        await events_task  # returns once the `done` event closes the stream


if __name__ == "__main__":
    asyncio.run(run_live_capture(Path("call.wav"), API_KEY, BASE_URL))
Production Notes
- Mirror microphone capture by feeding an asyncio.Queue into the chunk uploader; convert PCM frames to bytes before POSTing (see the sketch after this list).
- Handle reconnection for SSE (e.g., exponential backoff) and surface error events to operators.
- /metrics exposes event counters and average agent latency for monitoring.
- Artifact bundles (timeline, summary, segmentation) are persisted and referenced by the done event payload.
- Attach the SSE consumer only once; the API rejects a second events connection for the same session.
- For more advanced demos, use scripts/test_live_stream.py in this repo as a baseline.
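To adapt the chunk uploader for live microphone capture, let a producer task push PCM byte frames onto an asyncio.Queue while a consumer POSTs them in order. A minimal sketch, assuming your capture layer already yields raw byte frames; the None sentinel convention is this sketch's, not the API's:

async def upload_from_queue(client: httpx.AsyncClient, chunk_url: str, queue: asyncio.Queue) -> None:
    # Consume byte frames and POST them with a strictly increasing chunk_seq.
    seq = 0
    while True:
        frame = await queue.get()
        if frame is None:  # sentinel: capture has stopped
            break
        resp = await client.post(
            chunk_url,
            headers={"X-API-Key": API_KEY},
            data={"chunk_seq": str(seq)},
            files={"chunk": (f"chunk-{seq}.bin", frame, "application/octet-stream")},
        )
        resp.raise_for_status()
        seq += 1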